You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/05 23:52:48 UTC
[14/15] flink git commit: [FLINK-6093] [table] Add stream TableSinks
and DataStream conversion with support for retraction.
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
index 809afd2..7214394 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
@@ -25,7 +25,6 @@ import org.apache.flink.types.Row
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
/**
* A simple [[TableSink]] to emit data as CSV files.
@@ -40,7 +39,7 @@ class CsvTableSink(
fieldDelim: Option[String],
numFiles: Option[Int],
writeMode: Option[WriteMode])
- extends TableSinkBase[Row] with BatchTableSink[Row] with StreamTableSink[Row] {
+ extends TableSinkBase[Row] with BatchTableSink[Row] with AppendStreamTableSink[Row] {
/**
* A simple [[TableSink]] to emit data as CSV files.
@@ -134,100 +133,3 @@ class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] {
builder.mkString
}
}
-
-/**
- * A simple [[TableSink]] to emit data as CSV files.
- *
- * @param path The output path to write the Table to.
- * @param fieldDelim The field delimiter
- * @param numFiles The number of files to write to
- * @param writeMode The write mode to specify whether existing files are overwritten or not.
- */
-class CsvRetractTableSink(
- path: String,
- fieldDelim: Option[String],
- numFiles: Option[Int],
- writeMode: Option[WriteMode])
- extends TableSinkBase[Row] with StreamRetractSink[Row] {
-
- override def needsUpdatesAsRetraction: Boolean = true
-
- /**
- * A simple [[TableSink]] to emit data as CSV files.
- *
- * @param path The output path to write the Table to.
- * @param fieldDelim The field delimiter, ',' by default.
- */
- def this(path: String, fieldDelim: String = ",") {
- this(path, Some(fieldDelim), None, None)
- }
-
- /**
- * A simple [[TableSink]] to emit data as CSV files.
- *
- * @param path The output path to write the Table to.
- * @param fieldDelim The field delimiter.
- * @param numFiles The number of files to write to.
- * @param writeMode The write mode to specify whether existing files are overwritten or not.
- */
- def this(path: String, fieldDelim: String, numFiles: Int, writeMode: WriteMode) {
- this(path, Some(fieldDelim), Some(numFiles), Some(writeMode))
- }
-
-
- override def emitDataStreamWithChange(dataStream: DataStream[JTuple2[Boolean,Row]]): Unit = {
- val csvRows = dataStream
- .map(new CsvRetractFormatter(fieldDelim.getOrElse(",")))
- .returns(TypeInformation.of(classOf[String]))
-
-
- if (numFiles.isDefined) {
- csvRows.setParallelism(numFiles.get)
- }
-
- val sink = writeMode match {
- case None => csvRows.writeAsText(path)
- case Some(wm) => csvRows.writeAsText(path, wm)
- }
-
- if (numFiles.isDefined) {
- sink.setParallelism(numFiles.get)
- }
- }
-
- override protected def copy: TableSinkBase[Row] = {
- new CsvRetractTableSink(path, fieldDelim, numFiles, writeMode)
- }
-
- override def getOutputType: TypeInformation[Row] = {
- new RowTypeInfo(getFieldTypes: _*)
- }
-}
-
-/**
- * Formats a [[Tuple2]] with change information into a [[String]] with fields separated by the
- * field delimiter.
- *
- * @param fieldDelim The field delimiter.
- */
-class CsvRetractFormatter(fieldDelim: String) extends MapFunction[JTuple2[Boolean,Row], String] {
- override def map(rowT: JTuple2[Boolean,Row]): String = {
-
- val row: Row = rowT.f1
-
- val builder = new StringBuilder
-
- builder.append(rowT.f0.toString)
-
- // write following values
- for (i <- 0 until row.getArity) {
- builder.append(fieldDelim)
- val v = row.getField(i)
- if (v != null) {
- builder.append(v.toString)
- }
- }
- builder.mkString
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala
new file mode 100644
index 0000000..3ab997e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.api.Types
+
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.Table
+
+/**
+ * Defines an external [[TableSink]] to emit a streaming [[Table]] with insert, update, and delete
+ * changes.
+ *
+ * The table will be converted into a stream of accumulate and retraction messages which are
+ * encoded as [[JTuple2]].
+ * The first field is a [[JBool]] flag to indicate the message type.
+ * The second field holds the record of the requested type [[T]].
+ *
+ * A message with true [[JBool]] flag is an accumulate (or add) message.
+ * A message with false flag is a retract message.
+ *
+ * @tparam T Type of records that this [[TableSink]] expects and supports.
+ */
+trait RetractStreamTableSink[T] extends TableSink[JTuple2[JBool, T]] {
+
+ /** Returns the requested record type */
+ def getRecordType: TypeInformation[T]
+
+ /** Emits the DataStream. */
+ def emitDataStream(dataStream: DataStream[JTuple2[JBool, T]]): Unit
+
+ override def getOutputType = new TupleTypeInfo(Types.BOOLEAN, getRecordType)
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala
deleted file mode 100644
index 7f7c944..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sinks
-
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-import org.apache.flink.streaming.api.datastream.DataStream
-
-trait StreamRetractSink[T] extends TableSink[T]{
-
- /**
- * Whether the [[StreamTableSink]] requires that update and delete changes are sent with
- * retraction messages.
- */
- def needsUpdatesAsRetraction: Boolean = false
-
- /** Emits the DataStream with change infomation. */
- def emitDataStreamWithChange(dataStream: DataStream[JTuple2[Boolean,T]]): Unit
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala
deleted file mode 100644
index 360252e..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sinks
-
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.Table
-
-/** Defines an external [[TableSink]] to emit a batch [[Table]].
- *
- * @tparam T Type of [[DataStream]] that this [[TableSink]] expects and supports.
- */
-trait StreamTableSink[T] extends TableSink[T] {
-
- /** Emits the DataStream. */
- def emitDataStream(dataStream: DataStream[T]): Unit
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
new file mode 100644
index 0000000..2ae3406
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{Table, Types}
+
+/**
+ * Defines an external [[TableSink]] to emit a streaming [[Table]] with insert, update, and delete
+ * changes. The [[Table]] must have unique key fields (atomic or composite) or be append-only.
+ *
+ * If the [[Table]] does not have a unique key and is not append-only, a
+ * [[org.apache.flink.table.api.TableException]] will be thrown.
+ *
+ * The unique key of the table is configured by the [[UpsertStreamTableSink#setKeyFields()]]
+ * method.
+ *
+ * The [[Table]] will be converted into a stream of upsert and delete messages which are encoded as
+ * [[JTuple2]]. The first field is a [[JBool]] flag to indicate the message type. The second field
+ * holds the record of the requested type [[T]].
+ *
+ * A message with true [[JBool]] flag is an upsert message for the configured key.
+ * A message with false flag is a delete message for the configured key.
+ *
+ * If the table is append-only, all messages will have a true flag and must be interpreted
+ * as insertions.
+ *
+ * @tparam T Type of records that this [[TableSink]] expects and supports.
+ */
+trait UpsertStreamTableSink[T] extends TableSink[JTuple2[JBool, T]] {
+
+ /**
+ * Configures the unique key fields of the [[Table]] to write.
+ * The method is called after [[TableSink.configure()]].
+ *
+ * The keys array might be empty, if the table consists of a single (updated) record.
+ * If the table does not have a key and is append-only, the keys attribute is null.
+ *
+ * @param keys the field names of the table's keys, an empty array if the table has a single
+ * row, and null if the table is append-only and has no key.
+ */
+ def setKeyFields(keys: Array[String]): Unit
+
+ /**
+ * Specifies whether the [[Table]] to write is append-only or not.
+ *
+ * @param isAppendOnly true if the table is append-only, false otherwise.
+ */
+ def setIsAppendOnly(isAppendOnly: Boolean): Unit
+
+ /** Returns the requested record type */
+ def getRecordType: TypeInformation[T]
+
+ /** Emits the DataStream. */
+ def emitDataStream(dataStream: DataStream[JTuple2[JBool, T]]): Unit
+
+ override def getOutputType = new TupleTypeInfo(Types.BOOLEAN, getRecordType)
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
index 675e5d9..ba3b591 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
@@ -21,7 +21,6 @@ package org.apache.flink.table
import org.apache.flink.api.scala._
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
import org.apache.flink.table.api.scala._
import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor}
import org.apache.flink.table.api.TableException
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
index d490763..40f4c7d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
@@ -27,10 +27,8 @@ import org.junit.Test
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.table.utils.TableFunc0
-import scala.collection.mutable
/**
* tests for retraction
@@ -55,51 +53,47 @@ class RetractionITCase extends StreamingWithStateTestBase {
def testWordCount(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- env.setParallelism(1)
+ StreamITCase.clear
env.setStateBackend(getStateBackend)
val stream = env.fromCollection(data)
val table = stream.toTable(tEnv, 'word, 'num)
val resultTable = table
.groupBy('word)
- .select('word as 'word, 'num.sum as 'count)
+ .select('num.sum as 'count)
.groupBy('count)
- .select('count, 'word.count as 'frequency)
+ .select('count, 'count.count as 'frequency)
- val results = resultTable.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ val results = resultTable.toRetractStream[Row]
+ results.addSink(new StreamITCase.RetractingSink)
env.execute()
- val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2", "1,1", "2,2", "2,1", "3,1", "3,0",
- "4,1", "4,0", "5,1", "5,0", "6,1", "1,2")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ val expected = Seq("1,2", "2,1", "6,1")
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
}
// keyed groupby + non-keyed groupby
@Test
def testGroupByAndNonKeyedGroupBy(): Unit = {
-
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- env.setParallelism(1)
+ StreamITCase.clear
env.setStateBackend(getStateBackend)
val stream = env.fromCollection(data)
val table = stream.toTable(tEnv, 'word, 'num)
val resultTable = table
.groupBy('word)
- .select('word as 'word, 'num.sum as 'count)
- .select('count.sum)
+ .select('word as 'word, 'num.sum as 'cnt)
+ .select('cnt.sum)
+
+ val results = resultTable.toRetractStream[Row]
- val results = resultTable.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
env.execute()
- val expected = Seq("1", "2", "1", "3", "4", "3", "5", "3", "6", "3", "7", "3", "8", "3", "9",
- "10")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ val expected = Seq("10")
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
}
// non-keyed groupby + keyed groupby
@@ -108,8 +102,7 @@ class RetractionITCase extends StreamingWithStateTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- env.setParallelism(1)
+ StreamITCase.clear
env.setStateBackend(getStateBackend)
val stream = env.fromCollection(data)
@@ -119,13 +112,12 @@ class RetractionITCase extends StreamingWithStateTestBase {
.groupBy('count)
.select('count, 'count.count)
- val results = resultTable.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ val results = resultTable.toRetractStream[Row]
+ results.addSink(new StreamITCase.RetractingSink)
env.execute()
- val expected = Seq("1,1", "1,0", "2,1", "2,0", "3,1", "3,0", "4,1", "4,0", "5,1", "5,0", "6," +
- "1", "6,0", "7,1", "7,0", "8,1", "8,0", "9,1", "9,0", "10,1")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ val expected = Seq("10,1")
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
}
// test unique process, if the current output message of unbounded groupby equals the
@@ -150,9 +142,9 @@ class RetractionITCase extends StreamingWithStateTestBase {
)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- env.setParallelism(1)
+ StreamITCase.clear
env.setStateBackend(getStateBackend)
+ env.setParallelism(1)
val stream = env.fromCollection(data)
val table = stream.toTable(tEnv, 'pk, 'value)
@@ -162,12 +154,13 @@ class RetractionITCase extends StreamingWithStateTestBase {
.groupBy('sum)
.select('sum, 'pk.count as 'count)
- val results = resultTable.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ val results = resultTable.toRetractStream[Row]
+ results.addSink(new StreamITCase.RetractMessagesSink)
env.execute()
- val expected = Seq("1,1", "2,1", "3,1", "3,0", "6,1", "1,2", "1,3", "6,2", "6,1", "12,1","12," +
- "0", "18,1", "8,1")
+ val expected = Seq(
+ "+1,1", "+2,1", "+3,1", "-3,1", "+6,1", "-1,1", "+1,2", "-1,2", "+1,3", "-6,1", "+6,2",
+ "-6,2", "+6,1", "+12,1", "-12,1", "+18,1", "+8,1")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
@@ -176,8 +169,7 @@ class RetractionITCase extends StreamingWithStateTestBase {
def testCorrelate(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- env.setParallelism(1)
+ StreamITCase.clear
env.setStateBackend(getStateBackend)
val func0 = new TableFunc0
@@ -186,19 +178,17 @@ class RetractionITCase extends StreamingWithStateTestBase {
val table = stream.toTable(tEnv, 'word, 'num)
val resultTable = table
.groupBy('word)
- .select('word as 'word, 'num.sum as 'count)
+ .select('word as 'word, 'num.sum as 'cnt)
.leftOuterJoin(func0('word))
- .groupBy('count)
- .select('count, 'word.count as 'frequency)
+ .groupBy('cnt)
+ .select('cnt, 'word.count as 'frequency)
- val results = resultTable.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ val results = resultTable.toRetractStream[Row]
+ results.addSink(new StreamITCase.RetractingSink)
env.execute()
- val expected = Seq(
- "1,1", "1,2", "1,1", "2,1", "1,2", "1,1", "2,2", "2,1", "3,1", "3,0", "4,1", "4,0", "5,1",
- "5,0", "6,1", "1,2")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ val expected = Seq("1,2", "2,1", "6,1")
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
index ceae6c6..c446d64 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
@@ -23,7 +23,7 @@ import java.io.File
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.stream.utils.StreamTestData
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sinks.{CsvTableSink, CsvRetractTableSink}
+import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.table.api.TableEnvironment
@@ -59,34 +59,5 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
TestBaseUtils.compareResultsByLinesInMemory(expected, path)
}
-
- @Test
- def testStreamTableSinkNeedRetraction(): Unit = {
-
- val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
- tmpFile.deleteOnExit()
- val path = tmpFile.toURI.toString
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setParallelism(4)
-
- val input = StreamTestData.get3TupleDataStream(env)
- .map(x => x).setParallelism(1) // increase DOP to 4
-
- val results = input.toTable(tEnv, 'a, 'b, 'c)
- .where('a < 5 || 'a > 17)
- .select('c, 'b)
- .groupBy('b)
- .select('b, 'c.count)
- .writeToSink(new CsvRetractTableSink(path))
-
- env.execute()
-
- val expected = Seq(
- "true,1,1", "true,2,1", "false,2,1", "true,2,2", "true,3,1", "true,6,1", "false,6,1",
- "true,6,2", "false,6,2", "true,6,3", "false,6,3", "true,6,4").mkString("\n")
-
- TestBaseUtils.compareResultsByLinesInMemory(expected, path)
- }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index abbcbdd..249d505 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -53,19 +53,19 @@ class SqlITCase extends StreamingWithStateTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
val sqlQuery = "SELECT b, COUNT(a) FROM MyTable GROUP BY b"
- val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", t)
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
+ val result = tEnv.sql(sqlQuery).toRetractStream[Row]
+ result.addSink(new StreamITCase.RetractingSink).setParallelism(1)
env.execute()
- val expected = mutable.MutableList("1,1", "2,1", "2,2")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ val expected = List("1,1", "2,2", "3,3", "4,4", "5,5", "6,6")
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
}
/** test selection **/
@@ -74,7 +74,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
val sqlQuery = "SELECT a * 2, b - 1 FROM MyTable"
@@ -85,7 +85,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList("2,0", "4,1", "6,1")
+ val expected = List("2,0", "4,1", "6,1")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
@@ -95,7 +95,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
val sqlQuery = "SELECT * FROM MyTable WHERE a = 3"
@@ -106,7 +106,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList("3,2,Hello world")
+ val expected = List("3,2,Hello world")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
@@ -116,7 +116,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
@@ -127,7 +127,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList("3,2,Hello world")
+ val expected = List("3,2,Hello world")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
@@ -136,7 +136,7 @@ class SqlITCase extends StreamingWithStateTestBase {
def testUnion(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
val sqlQuery = "SELECT * FROM T1 " +
"UNION ALL " +
@@ -151,7 +151,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList(
+ val expected = List(
"1,1,Hi", "1,1,Hi",
"2,2,Hello", "2,2,Hello",
"3,2,Hello world", "3,2,Hello world")
@@ -163,7 +163,7 @@ class SqlITCase extends StreamingWithStateTestBase {
def testUnionWithFilter(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
val sqlQuery = "SELECT * FROM T1 WHERE a = 3 " +
"UNION ALL " +
@@ -178,7 +178,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList(
+ val expected = List(
"2,2,Hello",
"3,2,Hello world")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
@@ -189,7 +189,7 @@ class SqlITCase extends StreamingWithStateTestBase {
def testUnionTableWithDataSet(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
val sqlQuery = "SELECT c FROM T1 WHERE a = 3 " +
"UNION ALL " +
@@ -204,7 +204,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList("Hello", "Hello world")
+ val expected = List("Hello", "Hello world")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
@@ -213,7 +213,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(getStateBackend)
val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
// for sum aggregation ensure that every time the order of each element is consistent
env.setParallelism(1)
@@ -232,7 +232,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList(
+ val expected = List(
"Hello World,1,7", "Hello World,2,15", "Hello World,3,35",
"Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
@@ -243,7 +243,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(getStateBackend)
val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
@@ -259,7 +259,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList(
+ val expected = List(
"Hello World,1", "Hello World,2", "Hello World,3",
"Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
@@ -270,7 +270,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(getStateBackend)
val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
// for sum aggregation ensure that every time the order of each element is consistent
env.setParallelism(1)
@@ -289,7 +289,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList(
+ val expected = List(
"Hello World,7,28", "Hello World,8,36", "Hello World,9,56",
"Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
@@ -300,7 +300,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(getStateBackend)
val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
@@ -314,7 +314,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList("1", "2", "3", "4", "5", "6", "7", "8", "9")
+ val expected = List("1", "2", "3", "4", "5", "6", "7", "8", "9")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
@@ -363,7 +363,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList(
+ val expected = List(
"Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
"Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
"Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12",
@@ -420,7 +420,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList(
+ val expected = List(
"Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
"Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
"Hello,3,3,7",
@@ -490,7 +490,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList(
+ val expected = List(
"Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
"Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
"Hello,3,4,9",
@@ -562,7 +562,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList(
+ val expected = List(
"Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
"Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
"Hello,3,4,9",
@@ -584,7 +584,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(getStateBackend)
val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
@@ -608,7 +608,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val tEnv = TableEnvironment.getTableEnvironment(env)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setStateBackend(getStateBackend)
- StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
env.setParallelism(1)
val sqlQuery = "SELECT a, b, c, " +
@@ -661,7 +661,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList(
+ val expected = List(
"1,2,Hello,2,1,2,2,2",
"1,3,Hello world,5,2,2,3,2",
"1,1,Hi,6,3,2,3,1",
@@ -687,7 +687,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val tEnv = TableEnvironment.getTableEnvironment(env)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setStateBackend(getStateBackend)
- StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
val sqlQuery = "SELECT a, b, c, " +
"SUM(b) over (" +
@@ -731,7 +731,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList(
+ val expected = List(
"1,2,Hello,2,1,2,2,2",
"1,3,Hello world,5,2,2,3,2",
"1,1,Hi,6,3,2,3,1",
@@ -757,7 +757,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val tEnv = TableEnvironment.getTableEnvironment(env)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setStateBackend(getStateBackend)
- StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
env.setParallelism(1)
val sqlQuery = "SELECT a, b, c, " +
@@ -793,7 +793,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList(
+ val expected = List(
"2,2,Hello,2,1,2,2,2",
"3,5,Hello,7,2,3,5,2",
"1,3,Hello,10,3,3,5,2",
@@ -812,7 +812,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val tEnv = TableEnvironment.getTableEnvironment(env)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setStateBackend(getStateBackend)
- StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
env.setParallelism(1)
val sqlQuery = "SELECT a, b, c, " +
@@ -849,7 +849,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList(
+ val expected = List(
"2,2,Hello,2,1,2,2,2",
"3,5,Hello,7,2,3,5,2",
"1,3,Hello,10,3,3,5,2",
@@ -869,7 +869,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val tEnv = TableEnvironment.getTableEnvironment(env)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setStateBackend(getStateBackend)
- StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
env.setParallelism(1)
val sqlQuery = "SELECT a, b, c, " +
@@ -907,7 +907,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList(
+ val expected = List(
"2,1,Hello,1,1,1,1,1",
"1,1,Hello,7,4,1,3,1",
"1,2,Hello,7,4,1,3,1",
@@ -932,7 +932,7 @@ class SqlITCase extends StreamingWithStateTestBase {
val tEnv = TableEnvironment.getTableEnvironment(env)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setStateBackend(getStateBackend)
- StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
env.setParallelism(1)
val sqlQuery = "SELECT a, b, c, " +
@@ -975,7 +975,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList(
+ val expected = List(
"1,1,Hello,6,3,2,3,1",
"1,2,Hello,6,3,2,3,1",
"1,3,Hello world,6,3,2,3,1",
@@ -1000,7 +1000,7 @@ class SqlITCase extends StreamingWithStateTestBase {
env.setStateBackend(getStateBackend)
val tEnv = TableEnvironment.getTableEnvironment(env)
env.setParallelism(1)
- StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
val t = StreamTestData.get5TupleDataStream(env)
.toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
@@ -1017,7 +1017,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList(
+ val expected = List(
"1,0,0",
"2,1,1",
"2,3,1",
@@ -1043,7 +1043,7 @@ class SqlITCase extends StreamingWithStateTestBase {
env.setStateBackend(getStateBackend)
val tEnv = TableEnvironment.getTableEnvironment(env)
env.setParallelism(1)
- StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
val t = StreamTestData.get5TupleDataStream(env)
.toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
@@ -1060,7 +1060,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList(
+ val expected = List(
"1,0,0",
"2,1,1",
"2,3,1",
@@ -1087,7 +1087,7 @@ class SqlITCase extends StreamingWithStateTestBase {
env.setStateBackend(getStateBackend)
val tEnv = TableEnvironment.getTableEnvironment(env)
env.setParallelism(1)
- StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
val t = StreamTestData.get5TupleDataStream(env)
.toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
@@ -1104,7 +1104,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList(
+ val expected = List(
"1,0,0",
"2,1,0",
"2,3,0",
@@ -1130,7 +1130,7 @@ class SqlITCase extends StreamingWithStateTestBase {
env.setStateBackend(getStateBackend)
val tEnv = TableEnvironment.getTableEnvironment(env)
env.setParallelism(1)
- StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
val t = StreamTestData.get5TupleDataStream(env)
.toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
@@ -1146,7 +1146,7 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = mutable.MutableList(
+ val expected = List(
"1,0,0",
"2,1,0",
"2,3,0",
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
index 271e90b..910cbf2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase.RetractingSink
import org.apache.flink.types.Row
import org.junit.Assert.assertEquals
import org.junit.Test
@@ -38,29 +39,24 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase {
def testNonKeyedGroupAggregate(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(getStateBackend)
- env.setParallelism(1)
val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.clear
val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
.select('a.sum, 'b.sum)
- val results = t.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ val results = t.toRetractStream[Row]
+ results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
env.execute()
- val expected = mutable.MutableList(
- "1,1", "3,3", "6,5", "10,8", "15,11", "21,14", "28,18", "36,22", "45,26", "55,30", "66,35",
- "78,40", "91,45", "105,50", "120,55", "136,61", "153,67", "171,73", "190,79", "210,85",
- "231,91")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ val expected = List("231,91")
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
}
@Test
def testGroupAggregate(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(getStateBackend)
- env.setParallelism(1)
val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.clear
@@ -68,15 +64,12 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase {
.groupBy('b)
.select('b, 'a.sum)
- val results = t.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ val results = t.toRetractStream[Row]
+ results.addSink(new StreamITCase.RetractingSink)
env.execute()
- val expected = mutable.MutableList(
- "1,1", "2,2", "2,5", "3,4", "3,9", "3,15", "4,7", "4,15",
- "4,24", "4,34", "5,11", "5,23", "5,36", "5,50", "5,65", "6,16", "6,33", "6,51", "6,70",
- "6,90", "6,111")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ val expected = List("1,1", "2,5", "3,15", "4,34", "5,65", "6,111")
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
}
@Test
@@ -88,30 +81,22 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase {
val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
.groupBy('b)
- .select('a.sum as 'd, 'b)
- .groupBy('b, 'd)
- .select('b)
+ .select('a.count as 'cnt, 'b)
+ .groupBy('cnt)
+ .select('cnt, 'b.count as 'freq)
- val results = t.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
+ val results = t.toRetractStream[Row]
- val expected = mutable.MutableList(
- "1",
- "2", "2",
- "3", "3", "3",
- "4", "4", "4", "4",
- "5", "5", "5", "5", "5",
- "6", "6", "6", "6", "6", "6")
-
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ results.addSink(new RetractingSink)
+ env.execute()
+ val expected = List("1,1", "2,1", "3,1", "4,1", "5,1", "6,1")
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
}
@Test
def testGroupAggregateWithExpression(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(getStateBackend)
- env.setParallelism(1)
val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.clear
@@ -119,14 +104,14 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase {
.groupBy('e, 'b % 3)
.select('c.min, 'e, 'a.avg, 'd.count)
- val results = t.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
+ val results = t.toRetractStream[Row]
+ results.addSink(new RetractingSink)
env.execute()
val expected = mutable.MutableList(
- "0,1,1,1", "1,2,2,1", "2,1,2,1", "3,2,3,1", "1,2,2,2",
- "5,3,3,1", "3,2,3,2", "7,1,4,1", "2,1,3,2", "3,2,3,3", "7,1,4,2", "5,3,4,2", "12,3,5,1",
- "1,2,3,3", "14,2,5,1")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ "0,1,1,1", "7,1,4,2", "2,1,3,2",
+ "3,2,3,3", "1,2,3,3", "14,2,5,1",
+ "12,3,5,1", "5,3,4,2")
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
index ea3ab22..96e5eb5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
@@ -35,7 +35,7 @@ class OverWindowTest extends TableTestBase {
val result = table
.window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows as 'w)
.select('c, 'b.count over 'x)
- streamUtil.tEnv.optimize(result.getRelNode)
+ streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
}
@Test(expected = classOf[ValidationException])
@@ -43,7 +43,7 @@ class OverWindowTest extends TableTestBase {
val result = table
.window(Over partitionBy 'c orderBy 'abc preceding 2.rows as 'w)
.select('c, 'b.count over 'w)
- streamUtil.tEnv.optimize(result.getRelNode)
+ streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
}
@Test(expected = classOf[ValidationException])
@@ -51,7 +51,7 @@ class OverWindowTest extends TableTestBase {
val result = table
.window(Over partitionBy 'c orderBy 'rowtime preceding 2 following "xx" as 'w)
.select('c, 'b.count over 'w)
- streamUtil.tEnv.optimize(result.getRelNode)
+ streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
}
@Test(expected = classOf[ValidationException])
@@ -59,7 +59,7 @@ class OverWindowTest extends TableTestBase {
val result = table
.window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_RANGE as 'w)
.select('c, 'b.count over 'w)
- streamUtil.tEnv.optimize(result.getRelNode)
+ streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
}
@Test(expected = classOf[ValidationException])
@@ -67,7 +67,7 @@ class OverWindowTest extends TableTestBase {
val result = table
.window(Over partitionBy 'a + 'b orderBy 'rowtime preceding 2.rows as 'w)
.select('c, 'b.count over 'w)
- streamUtil.tEnv.optimize(result.getRelNode)
+ streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
}
@Test(expected = classOf[ValidationException])
@@ -77,7 +77,7 @@ class OverWindowTest extends TableTestBase {
val result = table2
.window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows as 'w)
.select('c, 'b.count over 'w)
- streamUtil.tEnv.optimize(result.getRelNode)
+ streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
}
@Test(expected = classOf[ValidationException])
@@ -85,7 +85,7 @@ class OverWindowTest extends TableTestBase {
val result = table
.window(Over orderBy 'rowtime preceding -1.rows as 'w)
.select('c, 'b.count over 'w)
- streamUtil.tEnv.optimize(result.getRelNode)
+ streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
}
@Test(expected = classOf[ValidationException])
@@ -93,7 +93,7 @@ class OverWindowTest extends TableTestBase {
val result = table
.window(Over orderBy 'rowtime preceding 1.rows following -2.rows as 'w)
.select('c, 'b.count over 'w)
- streamUtil.tEnv.optimize(result.getRelNode)
+ streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
}
@Test(expected = classOf[ValidationException])
@@ -103,7 +103,7 @@ class OverWindowTest extends TableTestBase {
val result = table
.window(Over orderBy 'rowtime preceding 1.minutes as 'w)
.select('c, weightedAvg('b, 'a) over 'w)
- streamUtil.tEnv.optimize(result.getRelNode)
+ streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
index 497869d..effde8e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
@@ -25,16 +25,18 @@ import org.junit.Assert._
import scala.collection.mutable
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
-import org.apache.flink.table.runtime.types.CRow
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
object StreamITCase {
- var testResults = mutable.MutableList.empty[String]
+ var testResults: mutable.MutableList[String] = mutable.MutableList.empty[String]
+ var retractedResults: ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]
def clear = {
StreamITCase.testResults.clear()
+ StreamITCase.retractedResults.clear()
}
def compareWithList(expected: java.util.List[String]): Unit = {
@@ -49,4 +51,33 @@ object StreamITCase {
}
}
}
+
+ final class RetractMessagesSink extends RichSinkFunction[(Boolean, Row)]() {
+ def invoke(v: (Boolean, Row)) {
+ testResults.synchronized {
+ testResults += (if (v._1) "+" else "-") + v._2
+ }
+ }
+ }
+
+ final class RetractingSink() extends RichSinkFunction[(Boolean, Row)] {
+ def invoke(v: (Boolean, Row)) {
+ retractedResults.synchronized {
+ val value = v._2.toString
+ if (v._1) {
+ retractedResults += value
+ } else {
+ val idx = retractedResults.indexOf(value)
+ if (idx >= 0) {
+ retractedResults.remove(idx)
+ } else {
+ throw new RuntimeException("Tried to retract a value that wasn't added first. " +
+ "This is probably an incorrectly implemented test. " +
+ "Try to set the parallelism of the sink to 1.")
+ }
+ }
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala
index 580029f..861f70e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala
@@ -292,7 +292,7 @@ class StreamTableTestForRetractionUtil extends StreamTableTestUtil {
def verifyTableTrait(resultTable: Table, expected: String): Unit = {
val relNode = resultTable.getRelNode
- val optimized = tEnv.optimize(relNode)
+ val optimized = tEnv.optimize(relNode, updatesAsRetraction = false)
val actual = TraitUtil.toString(optimized)
assertEquals(
expected.split("\n").map(_.trim).mkString("\n"),
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala
new file mode 100644
index 0000000..2dfb658
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.StreamTestData
+import org.apache.flink.types.Row
+
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase {
+
+ @Test(expected = classOf[TableException])
+ def testAppendSinkOnUpdatingTable(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'id, 'num, 'text)
+
+ t.groupBy('text)
+ .select('text, 'id.count, 'num.sum)
+ .writeToSink(new TestAppendSink)
+
+ // must fail because table is not append-only
+ env.execute()
+ }
+
+ @Test
+ def testAppendSinkOnAppendTable(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.get3TupleDataStream(env)
+ .assignAscendingTimestamps(_._1.toLong)
+ .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+ t.window(Tumble over 5.millis on 'rowtime as 'w)
+ .groupBy('w)
+ .select('w.end, 'id.count, 'num.sum)
+ .writeToSink(new TestAppendSink)
+
+ env.execute()
+
+ val result = RowCollector.getAndClearValues.map(_.f1.toString).sorted
+ val expected = List(
+ "1970-01-01 00:00:00.005,4,8",
+ "1970-01-01 00:00:00.01,5,18",
+ "1970-01-01 00:00:00.015,5,24",
+ "1970-01-01 00:00:00.02,5,29",
+ "1970-01-01 00:00:00.025,2,12")
+ .sorted
+ assertEquals(expected, result)
+ }
+
+ @Test
+ def testRetractSinkOnUpdatingTable(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.get3TupleDataStream(env)
+ .assignAscendingTimestamps(_._1.toLong)
+ .toTable(tEnv, 'id, 'num, 'text)
+
+ t.select('id, 'num, 'text.charLength() as 'len)
+ .groupBy('len)
+ .select('len, 'id.count, 'num.sum)
+ .writeToSink(new TestRetractSink)
+
+ env.execute()
+ val results = RowCollector.getAndClearValues
+
+ val retracted = restractResults(results).sorted
+ val expected = List(
+ "2,1,1",
+ "5,1,2",
+ "11,1,2",
+ "25,1,3",
+ "10,7,39",
+ "14,1,3",
+ "9,9,41").sorted
+ assertEquals(expected, retracted)
+
+ }
+
+ @Test
+ def testRetractSinkOnAppendTable(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.get3TupleDataStream(env)
+ .assignAscendingTimestamps(_._1.toLong)
+ .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+ t.window(Tumble over 5.millis on 'rowtime as 'w)
+ .groupBy('w)
+ .select('w.end, 'id.count, 'num.sum)
+ .writeToSink(new TestRetractSink)
+
+ env.execute()
+ val results = RowCollector.getAndClearValues
+
+ assertFalse(
+ "Received retraction messages for append only table",
+ results.exists(!_.f0))
+
+ val retracted = restractResults(results).sorted
+ val expected = List(
+ "1970-01-01 00:00:00.005,4,8",
+ "1970-01-01 00:00:00.01,5,18",
+ "1970-01-01 00:00:00.015,5,24",
+ "1970-01-01 00:00:00.02,5,29",
+ "1970-01-01 00:00:00.025,2,12")
+ .sorted
+ assertEquals(expected, retracted)
+
+ }
+
+ @Test
+ def testUpsertSinkOnUpdatingTableWithFullKey(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.get3TupleDataStream(env)
+ .assignAscendingTimestamps(_._1.toLong)
+ .toTable(tEnv, 'id, 'num, 'text)
+
+ t.select('id, 'num, 'text.charLength() as 'len, ('id > 0) as 'cTrue)
+ .groupBy('len, 'cTrue)
+ .select('len, 'id.count as 'cnt, 'cTrue)
+ .groupBy('cnt, 'cTrue)
+ .select('cnt, 'len.count, 'cTrue)
+ .writeToSink(new TestUpsertSink(Array("cnt", "cTrue"), false))
+
+ env.execute()
+ val results = RowCollector.getAndClearValues
+
+ assertTrue(
+ "Results must include delete messages",
+ results.exists(_.f0 == false)
+ )
+
+ val retracted = upsertResults(results, Array(0, 2)).sorted
+ val expected = List(
+ "1,5,true",
+ "7,1,true",
+ "9,1,true").sorted
+ assertEquals(expected, retracted)
+
+ }
+
+ @Test(expected = classOf[TableException])
+ def testUpsertSinkOnUpdatingTableWithoutFullKey(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.get3TupleDataStream(env)
+ .assignAscendingTimestamps(_._1.toLong)
+ .toTable(tEnv, 'id, 'num, 'text)
+
+ t.select('id, 'num, 'text.charLength() as 'len, ('id > 0) as 'cTrue)
+ .groupBy('len, 'cTrue)
+ .select('len, 'id.count, 'num.sum)
+ .writeToSink(new TestUpsertSink(Array("len", "cTrue"), false))
+
+ // must fail because the table is an updating table without a full key
+ env.execute()
+ }
+
+ @Test
+ def testUpsertSinkOnAppendingTableWithFullKey1(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.get3TupleDataStream(env)
+ .assignAscendingTimestamps(_._1.toLong)
+ .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+ t.window(Tumble over 5.millis on 'rowtime as 'w)
+ .groupBy('w, 'num)
+ .select('num, 'w.end as 'wend, 'id.count)
+ .writeToSink(new TestUpsertSink(Array("wend", "num"), true))
+
+ env.execute()
+ val results = RowCollector.getAndClearValues
+
+ assertFalse(
+ "Received retraction messages for append only table",
+ results.exists(!_.f0))
+
+ val retracted = upsertResults(results, Array(0, 1, 2)).sorted
+ val expected = List(
+ "1,1970-01-01 00:00:00.005,1",
+ "2,1970-01-01 00:00:00.005,2",
+ "3,1970-01-01 00:00:00.005,1",
+ "3,1970-01-01 00:00:00.01,2",
+ "4,1970-01-01 00:00:00.01,3",
+ "4,1970-01-01 00:00:00.015,1",
+ "5,1970-01-01 00:00:00.015,4",
+ "5,1970-01-01 00:00:00.02,1",
+ "6,1970-01-01 00:00:00.02,4",
+ "6,1970-01-01 00:00:00.025,2").sorted
+ assertEquals(expected, retracted)
+ }
+
+ @Test
+ def testUpsertSinkOnAppendingTableWithFullKey2(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.get3TupleDataStream(env)
+ .assignAscendingTimestamps(_._1.toLong)
+ .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+ t.window(Tumble over 5.millis on 'rowtime as 'w)
+ .groupBy('w, 'num)
+ .select('w.start as 'wstart, 'w.end as 'wend, 'num, 'id.count)
+ .writeToSink(new TestUpsertSink(Array("wstart", "wend", "num"), true))
+
+ env.execute()
+ val results = RowCollector.getAndClearValues
+
+ assertFalse(
+ "Received retraction messages for append only table",
+ results.exists(!_.f0))
+
+ val retracted = upsertResults(results, Array(0, 1, 2)).sorted
+ val expected = List(
+ "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1,1",
+ "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,2,2",
+ "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,3,1",
+ "1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,3,2",
+ "1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,4,3",
+ "1970-01-01 00:00:00.01,1970-01-01 00:00:00.015,4,1",
+ "1970-01-01 00:00:00.01,1970-01-01 00:00:00.015,5,4",
+ "1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,5,1",
+ "1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,6,4",
+ "1970-01-01 00:00:00.02,1970-01-01 00:00:00.025,6,2").sorted
+ assertEquals(expected, retracted)
+ }
+
/**
 * Verifies that an upsert sink without a key (only the window end is selected,
 * which does not form a full key) still receives only insert messages for an
 * append-only windowed aggregation.
 */
@Test
def testUpsertSinkOnAppendingTableWithoutFullKey1(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val tEnv = TableEnvironment.getTableEnvironment(env)

  val table = StreamTestData.get3TupleDataStream(env)
    .assignAscendingTimestamps(_._1.toLong)
    .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)

  // Only 'w.end is projected, so no complete key exists; the sink therefore
  // expects setKeyFields(null) and append-only mode.
  table
    .window(Tumble over 5.millis on 'rowtime as 'w)
    .groupBy('w, 'num)
    .select('w.end as 'wend, 'id.count as 'cnt)
    .writeToSink(new TestUpsertSink(null, true))

  env.execute()
  val results = RowCollector.getAndClearValues

  // An append-only query must never emit deletions (f0 == false).
  assertFalse(
    "Received retraction messages for append only table",
    results.exists(!_.f0))

  // Without a key there is no upsert resolution; compare the raw rows.
  val retracted = results.map(_.f1.toString).sorted
  val expected = List(
    "1970-01-01 00:00:00.005,1",
    "1970-01-01 00:00:00.005,2",
    "1970-01-01 00:00:00.005,1",
    "1970-01-01 00:00:00.01,2",
    "1970-01-01 00:00:00.01,3",
    "1970-01-01 00:00:00.015,1",
    "1970-01-01 00:00:00.015,4",
    "1970-01-01 00:00:00.02,1",
    "1970-01-01 00:00:00.02,4",
    "1970-01-01 00:00:00.025,2").sorted
  assertEquals(expected, retracted)
}
+
/**
 * Verifies that an upsert sink without a key (only 'num is selected, which does
 * not form a full key without the window) receives only insert messages for an
 * append-only windowed aggregation.
 */
@Test
def testUpsertSinkOnAppendingTableWithoutFullKey2(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val tEnv = TableEnvironment.getTableEnvironment(env)

  val table = StreamTestData.get3TupleDataStream(env)
    .assignAscendingTimestamps(_._1.toLong)
    .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)

  // Dropping the window attributes leaves 'num alone, which is not unique per
  // result row; the sink therefore expects setKeyFields(null).
  table
    .window(Tumble over 5.millis on 'rowtime as 'w)
    .groupBy('w, 'num)
    .select('num, 'id.count as 'cnt)
    .writeToSink(new TestUpsertSink(null, true))

  env.execute()
  val results = RowCollector.getAndClearValues

  // An append-only query must never emit deletions (f0 == false).
  assertFalse(
    "Received retraction messages for append only table",
    results.exists(!_.f0))

  // Without a key there is no upsert resolution; compare the raw rows.
  val retracted = results.map(_.f1.toString).sorted
  val expected = List(
    "1,1",
    "2,2",
    "3,1",
    "3,2",
    "4,3",
    "4,1",
    "5,4",
    "5,1",
    "6,4",
    "6,2").sorted
  assertEquals(expected, retracted)
}
+
/**
 * Converts a list of accumulate/retract messages into the final result rows.
 *
 * Each message is a (flag, row) pair: flag == true adds one occurrence of the
 * row, flag == false removes one. Rows whose net count is zero are dropped; a
 * negative net count means a row was retracted more often than accumulated and
 * fails the test. Rows with a count > 1 appear that many times in the output.
 *
 * NOTE(review): the method name carries an existing typo ("restract"); it is
 * kept because callers elsewhere in this file reference it.
 */
private def restractResults(results: List[JTuple2[JBool, Row]]): List[String] = {

  // Net occurrence count per distinct row string.
  val counts = results.foldLeft(Map.empty[String, Int]) { (acc, msg) =>
    val key = msg.f1.toString
    val delta = if (msg.f0) 1 else -1
    acc + (key -> (acc.getOrElse(key, 0) + delta))
  }
  val nonZero = counts.filter { case (_, c) => c != 0 }

  assertFalse(
    "Received retracted rows which have not been accumulated.",
    nonZero.exists { case (_, c) => c < 0 })

  // Expand each row according to its remaining multiplicity.
  nonZero.flatMap { case (row, c) => (0 until c).map(_ => row) }.toList
}
+
/**
 * Replays a list of upsert/delete messages into the final table contents.
 *
 * Each message is a (flag, row) pair: flag == true upserts the row under its
 * key, flag == false deletes the entry for that key.
 *
 * @param results the collected (flag, row) messages in emission order
 * @param keys    indexes of the fields that form the upsert key
 * @return the remaining rows as strings (iteration order is unspecified;
 *         callers sort before comparing)
 */
private def upsertResults(results: List[JTuple2[JBool, Row]], keys: Array[Int]): List[String] = {

  // Concatenates the key fields (in reverse field order, which is fine since
  // the string is only used as a consistent map key).
  def keyOf(row: Row): String =
    keys.foldLeft(List[String]())((acc, i) => row.getField(i).toString :: acc).mkString("")

  val finalState = results.foldLeft(Map[String, String]()) { (table, msg) =>
    if (msg.f0) {
      table + (keyOf(msg.f1) -> msg.f1.toString)
    } else {
      table - keyOf(msg.f1)
    }
  }

  finalState.values.toList
}
+
+}
+
/**
 * An [[AppendStreamTableSink]] that flags every record as an insert
 * ((true, row)) and forwards it to the global [[RowCollector]].
 */
private class TestAppendSink extends AppendStreamTableSink[Row] {

  var fNames: Array[String] = _
  var fTypes: Array[TypeInformation[_]] = _

  override def emitDataStream(s: DataStream[Row]): Unit = {
    // Append-only streams carry no deletions, so the flag is always true.
    val flagged = s.map(
      new MapFunction[Row, JTuple2[JBool, Row]] {
        override def map(value: Row): JTuple2[JBool, Row] = new JTuple2(true, value)
      })
    flagged.addSink(new RowSink)
  }

  override def getOutputType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)

  override def getFieldNames: Array[String] = fNames

  override def getFieldTypes: Array[TypeInformation[_]] = fTypes

  override def configure(
      fieldNames: Array[String],
      fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
    // configure() must return a configured copy, not mutate this instance.
    val configured = new TestAppendSink
    configured.fNames = fieldNames
    configured.fTypes = fieldTypes
    configured
  }
}
+
/**
 * A [[RetractStreamTableSink]] that forwards the (accumulate/retract flag, row)
 * pairs unchanged to the global [[RowCollector]].
 */
private class TestRetractSink extends RetractStreamTableSink[Row] {

  var fNames: Array[String] = _
  var fTypes: Array[TypeInformation[_]] = _

  override def emitDataStream(s: DataStream[JTuple2[JBool, Row]]): Unit =
    s.addSink(new RowSink)

  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)

  override def getFieldNames: Array[String] = fNames

  override def getFieldTypes: Array[TypeInformation[_]] = fTypes

  override def configure(
      fieldNames: Array[String],
      fieldTypes: Array[TypeInformation[_]]): TableSink[JTuple2[JBool, Row]] = {
    // configure() must return a configured copy, not mutate this instance.
    val configured = new TestRetractSink
    configured.fNames = fieldNames
    configured.fTypes = fieldTypes
    configured
  }

}
+
/**
 * An [[UpsertStreamTableSink]] that asserts the key fields and append-only flag
 * derived by the planner against expected values, and forwards all messages to
 * the global [[RowCollector]].
 *
 * @param expectedKeys         the key fields the planner is expected to derive,
 *                             or null if no key is expected
 * @param expectedIsAppendOnly the append-only property the planner is expected
 *                             to derive
 */
private class TestUpsertSink(
    expectedKeys: Array[String],
    expectedIsAppendOnly: Boolean)
  extends UpsertStreamTableSink[Row] {

  var fNames: Array[String] = _
  var fTypes: Array[TypeInformation[_]] = _

  override def setKeyFields(keys: Array[String]): Unit =
    if (keys != null) {
      // Fail with a meaningful assertion instead of an NPE on expectedKeys.sorted
      // when the planner derives keys but the test expected none.
      assertFalse(
        "Provided key fields but none were expected.",
        expectedKeys == null)
      assertEquals("Provided key fields do not match expected keys",
        expectedKeys.sorted.mkString(","),
        keys.sorted.mkString(","))
    } else {
      assertNull("Provided key fields should not be null.", expectedKeys)
    }

  override def setIsAppendOnly(isAppendOnly: Boolean): Unit =
    assertEquals(
      "Provided isAppendOnly does not match expected isAppendOnly",
      expectedIsAppendOnly,
      isAppendOnly)

  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)

  override def emitDataStream(s: DataStream[JTuple2[JBool, Row]]): Unit = {
    s.addSink(new RowSink)
  }

  override def getFieldNames: Array[String] = fNames

  override def getFieldTypes: Array[TypeInformation[_]] = fTypes

  override def configure(
      fieldNames: Array[String],
      fieldTypes: Array[TypeInformation[_]]): TableSink[JTuple2[JBool, Row]] = {
    // configure() must return a configured copy, not mutate this instance.
    val copy = new TestUpsertSink(expectedKeys, expectedIsAppendOnly)
    copy.fNames = fieldNames
    copy.fTypes = fieldTypes
    copy
  }
}
+
/** A [[SinkFunction]] that forwards every (flag, row) message to the shared [[RowCollector]]. */
class RowSink extends SinkFunction[JTuple2[JBool, Row]] {
  override def invoke(value: JTuple2[JBool, Row]): Unit = {
    RowCollector.addValue(value)
  }
}
+
/**
 * Process-wide collector for the (flag, row) messages emitted by the test
 * sinks. Sink instances may run on parallel task threads, so every access to
 * the underlying buffer is synchronized on the buffer itself.
 */
object RowCollector {
  private val sink: mutable.ArrayBuffer[JTuple2[JBool, Row]] =
    new mutable.ArrayBuffer[JTuple2[JBool, Row]]()

  /** Appends a single message; may be called concurrently from sink tasks. */
  def addValue(value: JTuple2[JBool, Row]): Unit = {
    sink.synchronized {
      sink += value
    }
  }

  /**
   * Atomically snapshots and clears the collected messages.
   *
   * Fix: the snapshot-and-clear was previously unsynchronized, racing with
   * concurrent [[addValue]] calls from parallel sink tasks and potentially
   * losing or duplicating rows.
   */
  def getAndClearValues: List[JTuple2[JBool, Row]] = {
    sink.synchronized {
      val out = sink.toList
      sink.clear()
      out
    }
  }

}
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 79e957a..8626b07 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -39,9 +39,4 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) {
override protected def getBuiltInPhysicalOptRuleSet: RuleSet = ???
- override protected def getConversionMapper[IN, OUT](
- physicalTypeInfo: TypeInformation[IN],
- logicalRowType: RelDataType,
- requestedTypeInfo: TypeInformation[OUT],
- functionName: String) = ???
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 73bc2f8..0e6d461 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -211,7 +211,7 @@ case class StreamTableTestUtil() extends TableTestUtil {
def verifyTable(resultTable: Table, expected: String): Unit = {
val relNode = resultTable.getRelNode
- val optimized = tEnv.optimize(relNode)
+ val optimized = tEnv.optimize(relNode, updatesAsRetraction = false)
val actual = RelOptUtil.toString(optimized)
assertEquals(
expected.split("\n").map(_.trim).mkString("\n"),
@@ -221,7 +221,7 @@ case class StreamTableTestUtil() extends TableTestUtil {
// the print methods are for debugging purposes only
def printTable(resultTable: Table): Unit = {
val relNode = resultTable.getRelNode
- val optimized = tEnv.optimize(relNode)
+ val optimized = tEnv.optimize(relNode, updatesAsRetraction = false)
println(RelOptUtil.toString(optimized))
}