You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/05 23:52:44 UTC
[10/15] flink git commit: [FLINK-6091] [table] Implement and turn on
retraction for non-windowed aggregates.
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
index 5247685..60de1f1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
@@ -22,9 +22,10 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.scala._
-import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo, TypeExtractor}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor}
import org.apache.flink.table.api.TableException
import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
+import org.apache.flink.table.runtime.types.CRowTypeInfo
import org.apache.flink.table.utils.{MockTableEnvironment, TableTestBase}
import org.apache.flink.table.utils.TableTestUtil.{batchTableNode, binaryNode, streamTableNode, term, unaryNode}
import org.apache.flink.types.Row
@@ -40,9 +41,13 @@ class TableEnvironmentTest extends TableTestBase {
STRING_TYPE_INFO,
DOUBLE_TYPE_INFO)
- val caseClassType = implicitly[TypeInformation[CClass]]
+ val rowType = new RowTypeInfo(INT_TYPE_INFO, STRING_TYPE_INFO,DOUBLE_TYPE_INFO)
- val pojoType = TypeExtractor.createTypeInfo(classOf[PojoClass])
+ val cRowType = new CRowTypeInfo(rowType)
+
+ val caseClassType: TypeInformation[CClass] = implicitly[TypeInformation[CClass]]
+
+ val pojoType: TypeInformation[PojoClass] = TypeExtractor.createTypeInfo(classOf[PojoClass])
val atomicType = INT_TYPE_INFO
@@ -57,6 +62,14 @@ class TableEnvironmentTest extends TableTestBase {
}
@Test
+ def testGetFieldInfoCRow(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(cRowType)
+
+ fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test
def testGetFieldInfoCClass(): Unit = {
val fieldInfo = tEnv.getFieldInfo(caseClassType)
@@ -100,6 +113,20 @@ class TableEnvironmentTest extends TableTestBase {
}
@Test
+ def testGetFieldInfoCRowNames(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(
+ cRowType,
+ Array(
+ UnresolvedFieldReference("name1"),
+ UnresolvedFieldReference("name2"),
+ UnresolvedFieldReference("name3")
+ ))
+
+ fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test
def testGetFieldInfoCClassNames(): Unit = {
val fieldInfo = tEnv.getFieldInfo(
caseClassType,
@@ -198,6 +225,45 @@ class TableEnvironmentTest extends TableTestBase {
}
@Test
+ def testGetFieldInfoCRowAlias1(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(
+ cRowType,
+ Array(
+ Alias(UnresolvedFieldReference("f0"), "name1"),
+ Alias(UnresolvedFieldReference("f1"), "name2"),
+ Alias(UnresolvedFieldReference("f2"), "name3")
+ ))
+
+ fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test
+ def testGetFieldInfoCRowAlias2(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(
+ cRowType,
+ Array(
+ Alias(UnresolvedFieldReference("f2"), "name1"),
+ Alias(UnresolvedFieldReference("f0"), "name2"),
+ Alias(UnresolvedFieldReference("f1"), "name3")
+ ))
+
+ fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test(expected = classOf[TableException])
+ def testGetFieldInfoCRowAlias3(): Unit = {
+ tEnv.getFieldInfo(
+ cRowType,
+ Array(
+ Alias(UnresolvedFieldReference("xxx"), "name1"),
+ Alias(UnresolvedFieldReference("yyy"), "name2"),
+ Alias(UnresolvedFieldReference("zzz"), "name3")
+ ))
+ }
+
+ @Test
def testGetFieldInfoCClassAlias1(): Unit = {
val fieldInfo = tEnv.getFieldInfo(
caseClassType,
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
index 57ee3b3..ebfac0a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
@@ -28,6 +28,7 @@ import org.apache.flink.table.api.scala.batch.utils.{TableProgramsCollectionTest
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.types.Row
import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.test.util.TestBaseUtils
import org.junit._
import org.junit.runner.RunWith
@@ -164,6 +165,25 @@ class TableEnvironmentITCase(
}
@Test
+ def testToDataSetWithTypeOfCRow(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env)
+ .toTable(tEnv, 'a, 'b, 'c)
+ .select('a, 'b, 'c)
+
+ val expected = "+1,1,Hi\n" + "+2,2,Hello\n" + "+3,2,Hello world\n" +
+ "+4,3,Hello world, how are you?\n" + "+5,3,I am fine.\n" + "+6,3,Luke Skywalker\n" +
+ "+7,4,Comment#1\n" + "+8,4,Comment#2\n" + "+9,4,Comment#3\n" + "+10,4,Comment#4\n" +
+ "+11,5,Comment#5\n" + "+12,5,Comment#6\n" + "+13,5,Comment#7\n" + "+14,5,Comment#8\n" +
+ "+15,5,Comment#9\n" + "+16,6,Comment#10\n" + "+17,6,Comment#11\n" + "+18,6,Comment#12\n" +
+ "+19,6,Comment#13\n" + "+20,6,Comment#14\n" + "+21,6,Comment#15\n"
+ val results = t.toDataSet[CRow].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
def testToTableFromCaseClass(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
new file mode 100644
index 0000000..dde7f89
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.utils.TableFunc0
+
+import scala.collection.mutable
+
+/**
+ * Integration tests for retraction in streaming Table API queries.
+ */
+class RetractionITCase extends StreamingWithStateTestBase {
+ // input data
+ val data = List(
+ ("Hello", 1),
+ ("word", 1),
+ ("Hello", 1),
+ ("bark", 1),
+ ("bark", 1),
+ ("bark", 1),
+ ("bark", 1),
+ ("bark", 1),
+ ("bark", 1),
+ ("flink", 1)
+ )
+
+ // keyed groupby + keyed groupby
+ @Test
+ def testWordCount(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+ env.setParallelism(1)
+ env.setStateBackend(getStateBackend)
+
+ val stream = env.fromCollection(data)
+ val table = stream.toTable(tEnv, 'word, 'num)
+ val resultTable = table
+ .groupBy('word)
+ .select('word as 'word, 'num.sum as 'count)
+ .groupBy('count)
+ .select('count, 'word.count as 'frequency)
+
+ // to DataStream with CRow
+ val results = resultTable.toDataStream[CRow]
+ results.addSink(new StreamITCase.StringSinkWithCRow)
+ env.execute()
+
+ val expected = Seq("+1,1", "+1,2", "+1,1", "+2,1", "+1,2", "+1,1", "+2,2", "+2,1", "+3,1",
+ "+3,0", "+4,1", "+4,0", "+5,1", "+5,0", "+6,1", "+1,2")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ // keyed groupby + non-keyed groupby
+ @Test
+ def testGroupByAndNonKeyedGroupBy(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+ env.setParallelism(1)
+ env.setStateBackend(getStateBackend)
+
+ val stream = env.fromCollection(data)
+ val table = stream.toTable(tEnv, 'word, 'num)
+ val resultTable = table
+ .groupBy('word)
+ .select('word as 'word, 'num.sum as 'count)
+ .select('count.sum)
+
+ val results = resultTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq("1", "2", "1", "3", "4", "3", "5", "3", "6", "3", "7", "3", "8", "3", "9",
+ "10")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ // non-keyed groupby + keyed groupby
+ @Test
+ def testNonKeyedGroupByAndGroupBy(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+ env.setParallelism(1)
+ env.setStateBackend(getStateBackend)
+
+ val stream = env.fromCollection(data)
+ val table = stream.toTable(tEnv, 'word, 'num)
+ val resultTable = table
+ .select('num.sum as 'count)
+ .groupBy('count)
+ .select('count, 'count.count)
+
+ val results = resultTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq("1,1", "1,0", "2,1", "2,0", "3,1", "3,0", "4,1", "4,0", "5,1", "5,0", "6," +
+ "1", "6,0", "7,1", "7,0", "8,1", "8,0", "9,1", "9,0", "10,1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ // Tests the unique process: if the current output message of an unbounded groupBy equals
+ // the previous message, the unbounded groupBy ignores the current one.
+ @Test
+ def testUniqueProcess(): Unit = {
+ // data input
+ val data = List(
+ (1, 1L),
+ (2, 2L),
+ (3, 3L),
+ (3, 3L),
+ (4, 1L),
+ (4, 0L),
+ (4, 0L),
+ (4, 0L),
+ (5, 1L),
+ (6, 6L),
+ (6, 6L),
+ (6, 6L),
+ (7, 8L)
+ )
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+ env.setParallelism(1)
+ env.setStateBackend(getStateBackend)
+
+ val stream = env.fromCollection(data)
+ val table = stream.toTable(tEnv, 'pk, 'value)
+ val resultTable = table
+ .groupBy('pk)
+ .select('pk as 'pk, 'value.sum as 'sum)
+ .groupBy('sum)
+ .select('sum, 'pk.count as 'count)
+
+ val results = resultTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq("1,1", "2,1", "3,1", "3,0", "6,1", "1,2", "1,3", "6,2", "6,1", "12,1","12," +
+ "0", "18,1", "8,1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ // correlate should handle retraction messages correctly
+ @Test
+ def testCorrelate(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+ env.setParallelism(1)
+ env.setStateBackend(getStateBackend)
+
+ val func0 = new TableFunc0
+
+ val stream = env.fromCollection(data)
+ val table = stream.toTable(tEnv, 'word, 'num)
+ val resultTable = table
+ .groupBy('word)
+ .select('word as 'word, 'num.sum as 'count)
+ .leftOuterJoin(func0('word))
+ .groupBy('count)
+ .select('count, 'word.count as 'frequency)
+
+ val results = resultTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq(
+ "1,1", "1,2", "1,1", "2,1", "1,2", "1,1", "2,2", "2,1", "3,1", "3,0", "4,1", "4,0", "5,1",
+ "5,0", "6,1", "1,2")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
index cdc4329..c446d64 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
@@ -59,5 +59,5 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
TestBaseUtils.compareResultsByLinesInMemory(expected, path)
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
index f826bba..6c75d53 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
@@ -25,6 +25,7 @@ import org.junit.Assert._
import scala.collection.mutable
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.table.runtime.types.CRow
import scala.collection.JavaConverters._
@@ -44,7 +45,15 @@ object StreamITCase {
final class StringSink extends RichSinkFunction[Row]() {
def invoke(value: Row) {
testResults.synchronized {
- testResults += value.toString
+ testResults += value.toString
+ }
+ }
+ }
+
+ final class StringSinkWithCRow extends RichSinkFunction[CRow]() {
+ def invoke(value: CRow) {
+ testResults.synchronized {
+ testResults += value.toString
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
index 5e3e995..eadcfc8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
@@ -33,6 +33,7 @@ import org.apache.flink.table.codegen.GeneratedAggregationsFunction
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.functions.aggfunctions.{LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction}
import org.apache.flink.table.runtime.aggregate.BoundedProcessingOverRangeProcessFunctionTest._
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.apache.flink.types.Row
import org.junit.Test
@@ -41,13 +42,13 @@ class BoundedProcessingOverRangeProcessFunctionTest {
@Test
def testProcTimePartitionedOverRange(): Unit = {
- val rT = new RowTypeInfo(Array[TypeInformation[_]](
+ val rT = new CRowTypeInfo(new RowTypeInfo(Array[TypeInformation[_]](
INT_TYPE_INFO,
LONG_TYPE_INFO,
INT_TYPE_INFO,
STRING_TYPE_INFO,
LONG_TYPE_INFO),
- Array("a", "b", "c", "d", "e"))
+ Array("a", "b", "c", "d", "e")))
val aggregates =
Array(new LongMinWithRetractAggFunction,
@@ -183,14 +184,14 @@ class BoundedProcessingOverRangeProcessFunctionTest {
val funcName = "BoundedOverAggregateHelper$33"
val genAggFunction = GeneratedAggregationsFunction(funcName, funcCode)
- val processFunction = new KeyedProcessOperator[String, Row, Row](
+ val processFunction = new KeyedProcessOperator[String, CRow, CRow](
new ProcTimeBoundedRangeOver(
genAggFunction,
1000,
aggregationStateType,
rT))
- val testHarness = new KeyedOneInputStreamOperatorTestHarness[JInt, Row, Row](
+ val testHarness = new KeyedOneInputStreamOperatorTestHarness[JInt, CRow, CRow](
processFunction,
new TupleRowSelector(0),
BasicTypeInfo.INT_TYPE_INFO)
@@ -201,26 +202,26 @@ class BoundedProcessingOverRangeProcessFunctionTest {
testHarness.setProcessingTime(3)
// key = 1
testHarness.processElement(new StreamRecord(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), 0))
+ new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0))
// key = 2
testHarness.processElement(new StreamRecord(
- Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), 0))
+ new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 0))
// Time = 4
testHarness.setProcessingTime(4)
// key = 1
testHarness.processElement(new StreamRecord(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), 0))
+ new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 0))
testHarness.processElement(new StreamRecord(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), 0))
+ new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 0))
// key = 2
testHarness.processElement(new StreamRecord(
- Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), 0))
+ new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 0))
// Time = 5
testHarness.setProcessingTime(5)
testHarness.processElement(new StreamRecord(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), 0))
+ new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 0))
// Time = 6
testHarness.setProcessingTime(6)
@@ -229,33 +230,33 @@ class BoundedProcessingOverRangeProcessFunctionTest {
testHarness.setProcessingTime(1002)
// key = 1
testHarness.processElement(new StreamRecord(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), 0))
+ new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 0))
testHarness.processElement(new StreamRecord(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), 0))
+ new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 0))
// key = 2
testHarness.processElement(new StreamRecord(
- Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), 0))
+ new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0))
// Time = 1003
testHarness.setProcessingTime(1003)
testHarness.processElement(new StreamRecord(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), 0))
+ new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 0))
// Time = 1004
testHarness.setProcessingTime(1004)
testHarness.processElement(new StreamRecord(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), 0))
+ new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 0))
// Time = 1005
testHarness.setProcessingTime(1005)
// key = 1
testHarness.processElement(new StreamRecord(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), 0))
+ new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 0))
testHarness.processElement(new StreamRecord(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), 0))
+ new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 0))
// key = 2
testHarness.processElement(new StreamRecord(
- Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), 0))
+ new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 0))
testHarness.setProcessingTime(1006)
@@ -264,34 +265,34 @@ class BoundedProcessingOverRangeProcessFunctionTest {
val expectedOutput = new ConcurrentLinkedQueue[Object]()
// all elements at the same proc timestamp have the same value
- expectedOutput.add(new StreamRecord(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), 4))
- expectedOutput.add(new StreamRecord(
- Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), 4))
- expectedOutput.add(new StreamRecord(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), 5))
- expectedOutput.add(new StreamRecord(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), 5))
- expectedOutput.add(new StreamRecord(
- Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), 5))
- expectedOutput.add(new StreamRecord(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), 6))
- expectedOutput.add(new StreamRecord(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), 1003))
- expectedOutput.add(new StreamRecord(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), 1003))
- expectedOutput.add(new StreamRecord(
- Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), 1003))
- expectedOutput.add(new StreamRecord(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), 1004))
- expectedOutput.add(new StreamRecord(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), 1005))
- expectedOutput.add(new StreamRecord(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 4L: JLong, 10L: JLong), 1006))
- expectedOutput.add(new StreamRecord(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 4L: JLong, 10L: JLong), 1006))
- expectedOutput.add(new StreamRecord(
- Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), 1006))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 4))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 4))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), true), 5))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 5))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 5))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 6))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), true), 1003))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 1003))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 1003))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 1004))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), true), 1005))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 4L: JLong, 10L: JLong), true), 1006))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 4L: JLong, 10L: JLong), true), 1006))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), true), 1006))
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.",
expectedOutput, result, new RowResultSortComparator(6))
@@ -304,7 +305,7 @@ class BoundedProcessingOverRangeProcessFunctionTest {
object BoundedProcessingOverRangeProcessFunctionTest {
/**
- * Return 0 for equal Rows and non zero for different rows
+ * Returns 0 for equal CRows and a non-zero value for different CRows
*/
class RowResultSortComparator(indexCounter: Int) extends Comparator[Object] with Serializable {
@@ -314,8 +315,8 @@ class RowResultSortComparator(indexCounter: Int) extends Comparator[Object] with
// watermark is not expected
-1
} else {
- val row1 = o1.asInstanceOf[StreamRecord[Row]].getValue
- val row2 = o2.asInstanceOf[StreamRecord[Row]].getValue
+ val row1 = o1.asInstanceOf[StreamRecord[CRow]].getValue
+ val row2 = o2.asInstanceOf[StreamRecord[CRow]].getValue
row1.toString.compareTo(row2.toString)
}
}
@@ -325,10 +326,10 @@ class RowResultSortComparator(indexCounter: Int) extends Comparator[Object] with
* Simple test class that returns a specified field as the selector function
*/
class TupleRowSelector(
- private val selectorField:Int) extends KeySelector[Row, Integer] {
+ private val selectorField:Int) extends KeySelector[CRow, Integer] {
- override def getKey(value: Row): Integer = {
- value.getField(selectorField).asInstanceOf[Integer]
+ override def getKey(value: CRow): Integer = {
+ value.row.getField(selectorField).asInstanceOf[Integer]
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowComparatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowComparatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowComparatorTest.scala
new file mode 100644
index 0000000..e574084
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowComparatorTest.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.types
+
+import java.lang.{Integer => JInt, Long => JLong}
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeutils.{ComparatorTestBase, TypeComparator, TypeSerializer}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+
+class CRowComparatorTest extends ComparatorTestBase[CRow] {
+
+ val rowType = new RowTypeInfo(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO
+ )
+
+ val cRowType = new CRowTypeInfo(rowType)
+
+ override protected def createComparator(asc: Boolean): TypeComparator[CRow] = {
+ cRowType.createComparator(
+ Array[Int](0, 2),
+ Array[Boolean](asc, asc),
+ 0,
+ new ExecutionConfig
+ )
+ }
+
+ override protected def createSerializer(): TypeSerializer[CRow] =
+ cRowType.createSerializer(new ExecutionConfig)
+
+ override protected def getSortedTestData: Array[CRow] = Array[CRow](
+ new CRow(Row.of(new JInt(1), "Hello", new JLong(1L)), true),
+ new CRow(Row.of(new JInt(1), "Hello", new JLong(2L)), true),
+ new CRow(Row.of(new JInt(2), "Hello", new JLong(2L)), false),
+ new CRow(Row.of(new JInt(2), "Hello", new JLong(3L)), true),
+ new CRow(Row.of(new JInt(3), "World", new JLong(0L)), false),
+ new CRow(Row.of(new JInt(4), "Hello", new JLong(0L)), true),
+ new CRow(Row.of(new JInt(5), "Hello", new JLong(1L)), true),
+ new CRow(Row.of(new JInt(5), "Hello", new JLong(4L)), false)
+ )
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index c5e13a1..79e957a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -18,7 +18,9 @@
package org.apache.flink.table.utils
+import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.tools.RuleSet
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment}
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.sources.TableSource
@@ -36,4 +38,10 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) {
override protected def getBuiltInNormRuleSet: RuleSet = ???
override protected def getBuiltInPhysicalOptRuleSet: RuleSet = ???
+
+ override protected def getConversionMapper[IN, OUT](
+ physicalTypeInfo: TypeInformation[IN],
+ logicalRowType: RelDataType,
+ requestedTypeInfo: TypeInformation[OUT],
+ functionName: String) = ???
}