You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:52 UTC

[45/50] [abbrv] flink git commit: [FLINK-6091] [table] Implement and turn on retraction for non-windowed aggregates.

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
index e61e190..c7c553b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
@@ -28,6 +28,7 @@ import org.apache.flink.table.api.scala.batch.utils.{TableProgramsCollectionTest
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.types.Row
 import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
 import org.junit.runner.RunWith
@@ -164,6 +165,25 @@ class TableEnvironmentITCase(
   }
 
   @Test
+  def testToDataSetWithTypeOfCRow(): Unit = { // converts a batch Table to DataSet[CRow]
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env)
+      .toTable(tEnv, 'a, 'b, 'c)
+      .select('a, 'b, 'c)
+
+    val expected = "+1,1,Hi\n" + "+2,2,Hello\n" + "+3,2,Hello world\n" +
+      "+4,3,Hello world, how are you?\n" + "+5,3,I am fine.\n" + "+6,3,Luke Skywalker\n" +
+      "+7,4,Comment#1\n" + "+8,4,Comment#2\n" + "+9,4,Comment#3\n" + "+10,4,Comment#4\n" +
+      "+11,5,Comment#5\n" + "+12,5,Comment#6\n" + "+13,5,Comment#7\n" + "+14,5,Comment#8\n" +
+      "+15,5,Comment#9\n" + "+16,6,Comment#10\n" + "+17,6,Comment#11\n" + "+18,6,Comment#12\n" +
+      "+19,6,Comment#13\n" + "+20,6,Comment#14\n" + "+21,6,Comment#15\n"
+    val results = t.toDataSet[CRow].collect() // expected text has a leading '+' per row
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
   def testToTableFromCaseClass(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
new file mode 100644
index 0000000..62fcfcd
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.utils.TableFunc0
+
+import scala.collection.mutable
+
+/**
+  * Integration tests for retraction in streaming Table API / SQL queries.
+  */
+class RetractionITCase extends StreamingWithStateTestBase {
+  // shared input: (word, 1) pairs; "Hello" occurs 2x, "bark" 6x, "word" and "flink" once
+  val data = List(
+    ("Hello", 1),
+    ("word", 1),
+    ("Hello", 1),
+    ("bark", 1),
+    ("bark", 1),
+    ("bark", 1),
+    ("bark", 1),
+    ("bark", 1),
+    ("bark", 1),
+    ("flink", 1)
+  )
+
+  // keyed groupBy feeding a second keyed groupBy: sum per word, then number of words per sum
+  @Test
+  def testWordCount(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+    env.setStateBackend(getStateBackend)
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'word, 'num)
+    val resultTable = table
+      .groupBy('word)
+      .select('word as 'word, 'num.sum as 'count)
+      .groupBy('count)
+      .select('count, 'word.count as 'frequency)
+
+    // convert to DataStream[CRow]; the expected strings below carry a leading '+'
+    val results = resultTable.toDataStream[CRow]
+    results.addSink(new StreamITCase.StringSinkWithCRow)
+    env.execute()
+
+    val expected = Seq("+1,1", "+1,2", "+1,1", "+2,1", "+1,2", "+1,1", "+2,2", "+2,1", "+3,1",
+      "+3,0", "+4,1", "+4,0", "+5,1", "+5,0", "+6,1", "+1,2")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted) // order-insensitive compare
+  }
+
+  // keyed groupBy followed by a global (non-keyed) sum over the per-word sums
+  @Test
+  def testGroupByAndNonKeyedGroupBy(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+    env.setStateBackend(getStateBackend)
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'word, 'num)
+    val resultTable = table
+      .groupBy('word)
+      .select('word as 'word, 'num.sum as 'count)
+      .select('count.sum)
+
+    val results = resultTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("1", "2", "1", "3", "4", "3", "5", "3", "6", "3", "7", "3", "8", "3", "9",
+      "10")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted) // order-insensitive compare
+  }
+
+  // global (non-keyed) sum followed by a keyed groupBy on the resulting sum
+  @Test
+  def testNonKeyedGroupByAndGroupBy(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+    env.setStateBackend(getStateBackend)
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'word, 'num)
+    val resultTable = table
+      .select('num.sum as 'count)
+      .groupBy('count)
+      .select('count, 'count.count)
+
+    val results = resultTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("1,1", "1,0", "2,1", "2,0", "3,1", "3,0", "4,1", "4,0", "5,1", "5,0",
+      "6,1", "6,0", "7,1", "7,0", "8,1", "8,0", "9,1", "9,0", "10,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted) // order-insensitive compare
+  }
+
+  // Duplicate suppression: if an unbounded groupBy produces the same output as its previous
+  // message, the repeated message is expected to be ignored rather than emitted again.
+  @Test
+  def testUniqueProcess(): Unit = {
+    // local input of (pk, value) pairs; shadows the class-level data
+    val data = List(
+      (1, 1L),
+      (2, 2L),
+      (3, 3L),
+      (3, 3L),
+      (4, 1L),
+      (4, 0L),
+      (4, 0L),
+      (4, 0L),
+      (5, 1L),
+      (6, 6L),
+      (6, 6L),
+      (6, 6L),
+      (7, 8L)
+    )
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+    env.setStateBackend(getStateBackend)
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'pk, 'value)
+    val resultTable = table
+      .groupBy('pk)
+      .select('pk as 'pk, 'value.sum as 'sum)
+      .groupBy('sum)
+      .select('sum, 'pk.count as 'count)
+
+    val results = resultTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("1,1", "2,1", "3,1", "3,0", "6,1", "1,2", "1,3", "6,2", "6,1", "12,1",
+      "12,0", "18,1", "8,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted) // order-insensitive compare
+  }
+
+  // correlate (leftOuterJoin with a table function) must handle retraction messages correctly
+  @Test
+  def testCorrelate(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+    env.setStateBackend(getStateBackend)
+
+    val func0 = new TableFunc0
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'word, 'num)
+    val resultTable = table
+      .groupBy('word)
+      .select('word as 'word, 'num.sum as 'count)
+      .leftOuterJoin(func0('word))
+      .groupBy('count)
+      .select('count, 'word.count as 'frequency)
+
+    val results = resultTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "1,1", "1,2", "1,1", "2,1", "1,2", "1,1", "2,2", "2,1", "3,1", "3,0", "4,1", "4,0", "5,1",
+      "5,0", "6,1", "1,2")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted) // order-insensitive compare
+  }
+
+  // keyed groupBy + OVER agg (unbounded, proc-time, partitioned): expects TableException
+  @Test(expected = classOf[TableException])
+  def testGroupByAndUnboundPartitionedProcessingWindowWithRow(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+
+    val t1 = env.fromCollection(data).toTable(tEnv).as('word, 'number)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT word, cnt, count(word) " +
+      "OVER (PARTITION BY cnt ORDER BY ProcTime() " +
+      "ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
+      "FROM " +
+      "(SELECT word, count(number) as cnt from T1 group by word) "
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+  }
+
+  // keyed groupBy + OVER agg (unbounded, proc-time, non-partitioned): expects TableException
+  @Test(expected = classOf[TableException])
+  def testGroupByAndUnboundNonPartitionedProcessingWindowWithRow(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+
+    val t1 = env.fromCollection(data).toTable(tEnv).as('word, 'number)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT word, cnt, count(word) " +
+      "OVER (ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
+      "FROM (SELECT word , count(number) as cnt from T1 group by word) "
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+  }
+
+
+
+  // keyed groupBy + group-window aggregate (Tumble over 2 rows): expects TableException
+  @Test(expected = classOf[TableException])
+  def testGroupByAndProcessingTimeSlidingGroupWindow(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+    env.setParallelism(1)
+    env.setStateBackend(getStateBackend)
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'word, 'num)
+    val windowedTable = table
+      .groupBy('word)
+      .select('word as 'word, 'num.sum as 'count)
+      .window(Tumble over 2.rows as 'w) // NOTE(review): name says sliding, window is Tumble
+      .groupBy('w, 'count)
+      .select('count, 'word.count)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+  }
+
+  // keyed groupBy + OVER agg (unbounded, row-time, non-partitioned): expects TableException
+  @Test(expected = classOf[TableException])
+  def testEventTimeOverWindow(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    StreamITCase.testResults = mutable.MutableList()
+
+    val t1 = env.fromCollection(data).toTable(tEnv).as('word, 'number)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT word, cnt, count(word) " +
+      "OVER (ORDER BY rowtime() ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
+      "FROM (SELECT word, count(number) as cnt from T1 group by word) "
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+  }
+
+  // keyed groupBy + OVER agg (bounded to 2 preceding rows, proc-time): expects TableException
+  @Test(expected = classOf[TableException])
+  def testBoundedOverWindow(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    StreamITCase.testResults = mutable.MutableList()
+
+    val t1 = env.fromCollection(data).toTable(tEnv).as('word, 'number)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT word, cnt, count(word) " +
+      "OVER (ORDER BY ProcTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
+      "FROM (SELECT word, count(number) as cnt from T1 group by word) "
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
index cdc4329..c446d64 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
@@ -59,5 +59,5 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
 
     TestBaseUtils.compareResultsByLinesInMemory(expected, path)
   }
-
+  
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
index f826bba..6c75d53 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
@@ -25,6 +25,7 @@ import org.junit.Assert._
 
 import scala.collection.mutable
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.table.runtime.types.CRow
 
 import scala.collection.JavaConverters._
 
@@ -44,7 +45,15 @@ object StreamITCase {
   final class StringSink extends RichSinkFunction[Row]() {
     def invoke(value: Row) {
       testResults.synchronized {
-        testResults += value.toString 
+        testResults += value.toString
+      }
+    }
+  }
+
+  final class StringSinkWithCRow extends RichSinkFunction[CRow]() {
+    def invoke(value: CRow) { // records value.toString, as StringSink does for Row
+      testResults.synchronized {
+        testResults += value.toString // appended while holding the testResults monitor
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
index 5e3e995..eadcfc8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
@@ -33,6 +33,7 @@ import org.apache.flink.table.codegen.GeneratedAggregationsFunction
 import org.apache.flink.table.functions.AggregateFunction
 import org.apache.flink.table.functions.aggfunctions.{LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction}
 import org.apache.flink.table.runtime.aggregate.BoundedProcessingOverRangeProcessFunctionTest._
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.types.Row
 import org.junit.Test
 
@@ -41,13 +42,13 @@ class BoundedProcessingOverRangeProcessFunctionTest {
   @Test
   def testProcTimePartitionedOverRange(): Unit = {
 
-    val rT =  new RowTypeInfo(Array[TypeInformation[_]](
+    val rT =  new CRowTypeInfo(new RowTypeInfo(Array[TypeInformation[_]](
       INT_TYPE_INFO,
       LONG_TYPE_INFO,
       INT_TYPE_INFO,
       STRING_TYPE_INFO,
       LONG_TYPE_INFO),
-      Array("a", "b", "c", "d", "e"))
+      Array("a", "b", "c", "d", "e")))
 
     val aggregates =
       Array(new LongMinWithRetractAggFunction,
@@ -183,14 +184,14 @@ class BoundedProcessingOverRangeProcessFunctionTest {
     val funcName = "BoundedOverAggregateHelper$33"
 
     val genAggFunction = GeneratedAggregationsFunction(funcName, funcCode)
-    val processFunction = new KeyedProcessOperator[String, Row, Row](
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
       new ProcTimeBoundedRangeOver(
         genAggFunction,
         1000,
         aggregationStateType,
         rT))
 
-    val testHarness = new KeyedOneInputStreamOperatorTestHarness[JInt, Row, Row](
+    val testHarness = new KeyedOneInputStreamOperatorTestHarness[JInt, CRow, CRow](
       processFunction,
       new TupleRowSelector(0),
       BasicTypeInfo.INT_TYPE_INFO)
@@ -201,26 +202,26 @@ class BoundedProcessingOverRangeProcessFunctionTest {
     testHarness.setProcessingTime(3)
     // key = 1
     testHarness.processElement(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), 0))
+      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0))
     // key = 2
     testHarness.processElement(new StreamRecord(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), 0))
+      new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 0))
 
     // Time = 4
     testHarness.setProcessingTime(4)
     // key = 1
     testHarness.processElement(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), 0))
+      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 0))
     testHarness.processElement(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), 0))
+      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 0))
     // key = 2
     testHarness.processElement(new StreamRecord(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), 0))
+      new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 0))
 
     // Time = 5
     testHarness.setProcessingTime(5)
     testHarness.processElement(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), 0))
+      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 0))
 
     // Time = 6
     testHarness.setProcessingTime(6)
@@ -229,33 +230,33 @@ class BoundedProcessingOverRangeProcessFunctionTest {
     testHarness.setProcessingTime(1002)
     // key = 1
     testHarness.processElement(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), 0))
+      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 0))
     testHarness.processElement(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), 0))
+      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 0))
     // key = 2
     testHarness.processElement(new StreamRecord(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), 0))
+      new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0))
 
     // Time = 1003
     testHarness.setProcessingTime(1003)
     testHarness.processElement(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), 0))
+      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 0))
 
     // Time = 1004
     testHarness.setProcessingTime(1004)
     testHarness.processElement(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), 0))
+      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 0))
 
     // Time = 1005
     testHarness.setProcessingTime(1005)
     // key = 1
     testHarness.processElement(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), 0))
+      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 0))
     testHarness.processElement(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), 0))
+      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 0))
     // key = 2
     testHarness.processElement(new StreamRecord(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), 0))
+      new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 0))
 
     testHarness.setProcessingTime(1006)
 
@@ -264,34 +265,34 @@ class BoundedProcessingOverRangeProcessFunctionTest {
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     // all elements at the same proc timestamp have the same value
-    expectedOutput.add(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), 4))
-    expectedOutput.add(new StreamRecord(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), 4))
-    expectedOutput.add(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), 5))
-    expectedOutput.add(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), 5))
-    expectedOutput.add(new StreamRecord(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), 5))
-    expectedOutput.add(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), 6))
-    expectedOutput.add(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), 1003))
-    expectedOutput.add(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), 1003))
-    expectedOutput.add(new StreamRecord(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), 1003))
-    expectedOutput.add(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), 1004))
-    expectedOutput.add(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), 1005))
-    expectedOutput.add(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 4L: JLong, 10L: JLong), 1006))
-    expectedOutput.add(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 4L: JLong, 10L: JLong), 1006))
-    expectedOutput.add(new StreamRecord(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), 1006))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 4))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 4))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), true), 5))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 5))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 5))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 6))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), true), 1003))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 1003))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 1003))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 1004))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), true), 1005))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 4L: JLong, 10L: JLong), true), 1006))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 4L: JLong, 10L: JLong), true), 1006))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), true), 1006))
 
     TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.",
         expectedOutput, result, new RowResultSortComparator(6))
@@ -304,7 +305,7 @@ class BoundedProcessingOverRangeProcessFunctionTest {
 object BoundedProcessingOverRangeProcessFunctionTest {
 
 /**
- * Return 0 for equal Rows and non zero for different rows
+ * Return 0 for equal CRows and non zero for different CRows
  */
 class RowResultSortComparator(indexCounter: Int) extends Comparator[Object] with Serializable {
 
@@ -314,8 +315,8 @@ class RowResultSortComparator(indexCounter: Int) extends Comparator[Object] with
         // watermark is not expected
          -1
        } else {
-        val row1 = o1.asInstanceOf[StreamRecord[Row]].getValue
-        val row2 = o2.asInstanceOf[StreamRecord[Row]].getValue
+        val row1 = o1.asInstanceOf[StreamRecord[CRow]].getValue
+        val row2 = o2.asInstanceOf[StreamRecord[CRow]].getValue
         row1.toString.compareTo(row2.toString)
       }
    }
@@ -325,10 +326,10 @@ class RowResultSortComparator(indexCounter: Int) extends Comparator[Object] with
  * Simple test class that returns a specified field as the selector function
  */
 class TupleRowSelector(
-    private val selectorField:Int) extends KeySelector[Row, Integer] {
+    private val selectorField:Int) extends KeySelector[CRow, Integer] {
 
-  override def getKey(value: Row): Integer = {
-    value.getField(selectorField).asInstanceOf[Integer]
+  override def getKey(value: CRow): Integer = {
+    value.row.getField(selectorField).asInstanceOf[Integer]
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowComparatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowComparatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowComparatorTest.scala
new file mode 100644
index 0000000..e574084
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowComparatorTest.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.types
+
+import java.lang.{Integer => JInt, Long => JLong}
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeutils.{ComparatorTestBase, TypeComparator, TypeSerializer}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+
+class CRowComparatorTest extends ComparatorTestBase[CRow] {
+
+  val rowType = new RowTypeInfo(
+    BasicTypeInfo.INT_TYPE_INFO,
+    BasicTypeInfo.STRING_TYPE_INFO,
+    BasicTypeInfo.LONG_TYPE_INFO
+  )
+
+  val cRowType = new CRowTypeInfo(rowType) // CRow type info wrapping the (INT, STRING, LONG) row
+
+  override protected def createComparator(asc: Boolean): TypeComparator[CRow] = {
+    cRowType.createComparator(
+      Array[Int](0, 2), // presumably the key positions: INT (0) and LONG (2) - TODO confirm
+      Array[Boolean](asc, asc), // same direction flag for both entries
+      0,
+      new ExecutionConfig
+    )
+  }
+
+  override protected def createSerializer(): TypeSerializer[CRow] =
+    cRowType.createSerializer(new ExecutionConfig)
+
+  override protected def getSortedTestData: Array[CRow] = Array[CRow](
+    new CRow(Row.of(new JInt(1), "Hello", new JLong(1L)), true),
+    new CRow(Row.of(new JInt(1), "Hello", new JLong(2L)), true),
+    new CRow(Row.of(new JInt(2), "Hello", new JLong(2L)), false),
+    new CRow(Row.of(new JInt(2), "Hello", new JLong(3L)), true),
+    new CRow(Row.of(new JInt(3), "World", new JLong(0L)), false),
+    new CRow(Row.of(new JInt(4), "Hello", new JLong(0L)), true),
+    new CRow(Row.of(new JInt(5), "Hello", new JLong(1L)), true),
+    new CRow(Row.of(new JInt(5), "Hello", new JLong(4L)), false)
+  ) // rows ascend by (field 0, field 2); the boolean flag varies independently of that order
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index c5e13a1..79e957a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -18,7 +18,9 @@
 
 package org.apache.flink.table.utils
 
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.tools.RuleSet
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment}
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.TableSource
@@ -36,4 +38,10 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) {
   override protected def getBuiltInNormRuleSet: RuleSet = ???
 
   override protected def getBuiltInPhysicalOptRuleSet: RuleSet = ???
+
+  override protected def getConversionMapper[IN, OUT](
+      physicalTypeInfo: TypeInformation[IN],
+      logicalRowType: RelDataType,
+      requestedTypeInfo: TypeInformation[OUT],
+      functionName: String) = ??? // stub: ??? throws NotImplementedError if called
 }