You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/09/21 11:43:42 UTC

[flink] 09/11: [hotfix][table] Add convenient constructors for CRow

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 61c1161860e6f500a5c11b08bc4f17231a2b79e0
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Jul 20 12:14:44 2018 +0200

    [hotfix][table] Add convenient constructors for CRow
---
 .../apache/flink/table/runtime/types/CRow.scala    |   8 +-
 .../table/runtime/harness/JoinHarnessTest.scala    | 648 ++++++++++-----------
 .../runtime/harness/NonWindowHarnessTest.scala     |  88 +--
 .../runtime/harness/OverWindowHarnessTest.scala    | 444 +++++++-------
 4 files changed, 596 insertions(+), 592 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala
index 25ec8c4..7e6f9b2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala
@@ -49,7 +49,11 @@ object CRow {
     new CRow()
   }
 
-  def apply(row: Row, change: Boolean): CRow = {
-    new CRow(row, change)
+  def apply(values: Any*): CRow = {
+    new CRow(Row.of(values.map(_.asInstanceOf[Object]): _*), true)
+  }
+
+  def apply(change: Boolean, values: Any*): CRow = {
+    new CRow(Row.of(values.map(_.asInstanceOf[Object]): _*), change)
   }
 }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
index c499a9d..86133a0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
@@ -167,28 +167,28 @@ class JoinHarnessTest extends HarnessTestBase {
 
     testHarness.setProcessingTime(1)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1L: JLong, "1a1"), change = true), 1))
+      CRow(1L: JLong, "1a1"), 1))
     assertEquals(1, testHarness.numProcessingTimeTimers())
     testHarness.setProcessingTime(2)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(2L: JLong, "2a2"), change = true), 2))
+      CRow(2L: JLong, "2a2"), 2))
 
     // timers for key = 1 and key = 2
     assertEquals(2, testHarness.numProcessingTimeTimers())
 
     testHarness.setProcessingTime(3)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1L: JLong, "1a3"), change = true), 3))
+      CRow(1L: JLong, "1a3"), 3))
     assertEquals(4, testHarness.numKeyedStateEntries())
 
     // The number of timers won't increase.
     assertEquals(2, testHarness.numProcessingTimeTimers())
 
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1L: JLong, "1b3"), change = true), 3))
+      CRow(1L: JLong, "1b3"), 3))
     testHarness.setProcessingTime(4)
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(2L: JLong, "2b4"), change = true), 4))
+      CRow(2L: JLong, "2b4"), 4))
 
     // The number of states should be doubled.
     assertEquals(8, testHarness.numKeyedStateEntries())
@@ -198,38 +198,38 @@ class JoinHarnessTest extends HarnessTestBase {
     // The left row (key = 1) with timestamp = 1 will be eagerly removed here.
     testHarness.setProcessingTime(13)
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1L: JLong, "1b13"), change = true), 13))
+      CRow(1L: JLong, "1b13"), 13))
 
     // Test for +20 boundary (13 + 20 = 33).
     testHarness.setProcessingTime(33)
     assertEquals(4, testHarness.numKeyedStateEntries())
     assertEquals(2, testHarness.numProcessingTimeTimers())
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1L: JLong, "1a33"), change = true), 33))
+      CRow(1L: JLong, "1a33"), 33))
 
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(2L: JLong, "2a33"), change = true), 33))
+      CRow(2L: JLong, "2a33"), 33))
 
     // The left row (key = 2) with timestamp = 2 will be eagerly removed here.
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(2L: JLong, "2b33"), change = true), 33))
+      CRow(2L: JLong, "2b33"), 33))
 
     val result = testHarness.getOutput
 
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1L: JLong, "1a1", 1L: JLong, "1b3"), change = true), 3))
+      CRow(1L: JLong, "1a1", 1L: JLong, "1b3"), 3))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1L: JLong, "1a3", 1L: JLong, "1b3"), change = true), 3))
+      CRow(1L: JLong, "1a3", 1L: JLong, "1b3"), 3))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2L: JLong, "2a2", 2L: JLong, "2b4"), change = true), 4))
+      CRow(2L: JLong, "2a2", 2L: JLong, "2b4"), 4))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1L: JLong, "1a3", 1L: JLong, "1b13"), change = true), 13))
+      CRow(1L: JLong, "1a3", 1L: JLong, "1b13"), 13))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1L: JLong, "1a33", 1L: JLong, "1b13"), change = true), 33))
+      CRow(1L: JLong, "1a33", 1L: JLong, "1b13"), 33))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2L: JLong, "2a33", 2L: JLong, "2b33"), change = true), 33))
+      CRow(2L: JLong, "2a33", 2L: JLong, "2b33"), 33))
 
     verify(expectedOutput, result, new RowResultSortComparator())
 
@@ -257,19 +257,19 @@ class JoinHarnessTest extends HarnessTestBase {
 
     testHarness.setProcessingTime(1)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1L: JLong, "1a1"), change = true), 1))
+      CRow(1L: JLong, "1a1"), 1))
     testHarness.setProcessingTime(2)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(2L: JLong, "2a2"), change = true), 2))
+      CRow(2L: JLong, "2a2"), 2))
     testHarness.setProcessingTime(3)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1L: JLong, "1a3"), change = true), 3))
+      CRow(1L: JLong, "1a3"), 3))
     assertEquals(4, testHarness.numKeyedStateEntries())
     assertEquals(2, testHarness.numProcessingTimeTimers())
 
     // All the right rows will not be cached.
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1L: JLong, "1b3"), change = true), 3))
+      CRow(1L: JLong, "1b3"), 3))
     assertEquals(4, testHarness.numKeyedStateEntries())
     assertEquals(2, testHarness.numProcessingTimeTimers())
 
@@ -278,14 +278,14 @@ class JoinHarnessTest extends HarnessTestBase {
     // Meets a.proctime <= b.proctime - 5.
     // This row will only be joined without being cached (7 >= 7 - 5).
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(2L: JLong, "2b7"), change = true), 7))
+      CRow(2L: JLong, "2b7"), 7))
     assertEquals(4, testHarness.numKeyedStateEntries())
     assertEquals(2, testHarness.numProcessingTimeTimers())
 
     testHarness.setProcessingTime(12)
     // The left row (key = 1) with timestamp = 1 will be eagerly removed here.
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1L: JLong, "1b12"), change = true), 12))
+      CRow(1L: JLong, "1b12"), 12))
 
     // We add a delay (relativeWindowSize / 2) for cleaning up state.
     // No timers will be triggered here.
@@ -309,9 +309,9 @@ class JoinHarnessTest extends HarnessTestBase {
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2L: JLong, "2a2", 2L: JLong, "2b7"), change = true), 7))
+      CRow(2L: JLong, "2a2", 2L: JLong, "2b7"), 7))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1L: JLong, "1a3", 1L: JLong, "1b12"), change = true), 12))
+      CRow(1L: JLong, "1a3", 1L: JLong, "1b12"), 12))
 
     verify(expectedOutput, result, new RowResultSortComparator())
 
@@ -344,24 +344,24 @@ class JoinHarnessTest extends HarnessTestBase {
 
     // Test late data.
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(1L: JLong, "k1"), change = true), 0))
+      CRow(1L: JLong, "k1"), 0))
 
     // Though (1L, "k1") is actually late, it will also be cached.
     assertEquals(1, testHarness.numEventTimeTimers())
 
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(2L: JLong, "k1"), change = true), 0))
+      CRow(2L: JLong, "k1"), 0))
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(2L: JLong, "k1"), change = true), 0))
+      CRow(2L: JLong, "k1"), 0))
 
     assertEquals(2, testHarness.numEventTimeTimers())
     assertEquals(4, testHarness.numKeyedStateEntries())
 
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(5L: JLong, "k1"), change = true), 0))
+      CRow(5L: JLong, "k1"), 0))
 
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(15L: JLong, "k1"), change = true), 0))
+      CRow(15L: JLong, "k1"), 0))
 
     testHarness.processWatermark1(new Watermark(20))
     testHarness.processWatermark2(new Watermark(20))
@@ -369,7 +369,7 @@ class JoinHarnessTest extends HarnessTestBase {
     assertEquals(4, testHarness.numKeyedStateEntries())
 
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(35L: JLong, "k1"), change = true), 0))
+      CRow(35L: JLong, "k1"), 0))
 
     // The right rows with timestamp = 2 and 5 will be removed here.
     // The left rows with timestamp = 2 and 15 will be removed here.
@@ -377,9 +377,9 @@ class JoinHarnessTest extends HarnessTestBase {
     testHarness.processWatermark2(new Watermark(38))
 
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(40L: JLong, "k2"), change = true), 0))
+      CRow(40L: JLong, "k2"), 0))
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(39L: JLong, "k2"), change = true), 0))
+      CRow(39L: JLong, "k2"), 0))
 
     assertEquals(6, testHarness.numKeyedStateEntries())
 
@@ -393,19 +393,19 @@ class JoinHarnessTest extends HarnessTestBase {
     expectedOutput.add(new Watermark(-19))
     // This result is produced by the late row (1, "k1").
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1L: JLong, "k1", 2L: JLong, "k1"), change = true), 0))
+      CRow(1L: JLong, "k1", 2L: JLong, "k1"), 0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2L: JLong, "k1", 2L: JLong, "k1"), change = true), 0))
+      CRow(2L: JLong, "k1", 2L: JLong, "k1"), 0))
     expectedOutput.add(new StreamRecord(
-        CRow(Row.of(5L: JLong, "k1", 2L: JLong, "k1"), change = true), 0))
+        CRow(5L: JLong, "k1", 2L: JLong, "k1"), 0))
     expectedOutput.add(new StreamRecord(
-        CRow(Row.of(5L: JLong, "k1", 15L: JLong, "k1"), change = true), 0))
+        CRow(5L: JLong, "k1", 15L: JLong, "k1"), 0))
     expectedOutput.add(new Watermark(0))
     expectedOutput.add(new StreamRecord(
-        CRow(Row.of(35L: JLong, "k1", 15L: JLong, "k1"), change = true), 0))
+        CRow(35L: JLong, "k1", 15L: JLong, "k1"), 0))
     expectedOutput.add(new Watermark(18))
     expectedOutput.add(new StreamRecord(
-        CRow(Row.of(40L: JLong, "k2", 39L: JLong, "k2"), change = true), 0))
+        CRow(40L: JLong, "k2", 39L: JLong, "k2"), 0))
     expectedOutput.add(new Watermark(41))
 
     val result = testHarness.getOutput
@@ -443,7 +443,7 @@ class JoinHarnessTest extends HarnessTestBase {
 
     // This row will not be cached.
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(2L: JLong, "k1"), change = true), 0))
+      CRow(2L: JLong, "k1"), 0))
 
     assertEquals(0, testHarness.numKeyedStateEntries())
 
@@ -451,19 +451,19 @@ class JoinHarnessTest extends HarnessTestBase {
     testHarness.processWatermark2(new Watermark(2))
 
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(3L: JLong, "k1"), change = true), 0))
+      CRow(3L: JLong, "k1"), 0))
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(3L: JLong, "k1"), change = true), 0))
+      CRow(3L: JLong, "k1"), 0))
 
     // Test for -10 boundary (13 - 10 = 3).
     // This row from the right stream will be cached.
     // The clean time for the left stream is 13 - 7 + 1 - 1 = 8
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(13L: JLong, "k1"), change = true), 0))
+      CRow(13L: JLong, "k1"), 0))
 
     // Test for -7 boundary (13 - 7 = 6).
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(6L: JLong, "k1"), change = true), 0))
+      CRow(6L: JLong, "k1"), 0))
 
     assertEquals(4, testHarness.numKeyedStateEntries())
 
@@ -484,9 +484,9 @@ class JoinHarnessTest extends HarnessTestBase {
     expectedOutput.add(new Watermark(-9))
     expectedOutput.add(new Watermark(-8))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(3L: JLong, "k1", 13L: JLong, "k1"), change = true), 0))
+      CRow(3L: JLong, "k1", 13L: JLong, "k1"), 0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(6L: JLong, "k1", 13L: JLong, "k1"), change = true), 0))
+      CRow(6L: JLong, "k1", 13L: JLong, "k1"), 0))
     expectedOutput.add(new Watermark(0))
     expectedOutput.add(new Watermark(8))
 
@@ -521,9 +521,9 @@ class JoinHarnessTest extends HarnessTestBase {
     testHarness.open()
 
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(1L: JLong, "k1"), change = true), 0))
+      CRow(1L: JLong, "k1"), 0))
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(1L: JLong, "k2"), change = true), 0))
+      CRow(1L: JLong, "k2"), 0))
 
     assertEquals(2, testHarness.numEventTimeTimers())
     assertEquals(4, testHarness.numKeyedStateEntries())
@@ -543,9 +543,9 @@ class JoinHarnessTest extends HarnessTestBase {
     assertEquals(0, testHarness.numKeyedStateEntries())
 
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(2L: JLong, "k1"), change = true), 0))
+      CRow(2L: JLong, "k1"), 0))
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(2L: JLong, "k2"), change = true), 0))
+      CRow(2L: JLong, "k2"), 0))
 
     // The late rows with timestamp = 2 will not be cached, but a null padding result for the left
     // row will be emitted.
@@ -554,26 +554,26 @@ class JoinHarnessTest extends HarnessTestBase {
 
     // Make sure the common (inner) join can be performed.
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(19L: JLong, "k1"), change = true), 0))
+      CRow(19L: JLong, "k1"), 0))
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(20L: JLong, "k1"), change = true), 0))
+      CRow(20L: JLong, "k1"), 0))
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(26L: JLong, "k1"), change = true), 0))
+      CRow(26L: JLong, "k1"), 0))
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(25L: JLong, "k1"), change = true), 0))
+      CRow(25L: JLong, "k1"), 0))
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(21L: JLong, "k1"), change = true), 0))
+      CRow(21L: JLong, "k1"), 0))
 
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(39L: JLong, "k2"), change = true), 0))
+      CRow(39L: JLong, "k2"), 0))
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(40L: JLong, "k2"), change = true), 0))
+      CRow(40L: JLong, "k2"), 0))
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(50L: JLong, "k2"), change = true), 0))
+      CRow(50L: JLong, "k2"), 0))
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(49L: JLong, "k2"), change = true), 0))
+      CRow(49L: JLong, "k2"), 0))
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(41L: JLong, "k2"), change = true), 0))
+      CRow(41L: JLong, "k2"), 0))
 
     testHarness.processWatermark1(new Watermark(100))
     testHarness.processWatermark2(new Watermark(100))
@@ -581,26 +581,26 @@ class JoinHarnessTest extends HarnessTestBase {
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
     // The timestamp 14 is set with the triggered timer.
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1L: JLong, "k1", null: JLong, null: String), change = true), 14))
+      CRow(1L: JLong, "k1", null: JLong, null: String), 14))
     expectedOutput.add(new Watermark(5))
     expectedOutput.add(new Watermark(9))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2L: JLong, "k1", null: JLong, null: String), change = true), 0))
+      CRow(2L: JLong, "k1", null: JLong, null: String), 0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(20L: JLong, "k1", 25L: JLong, "k1": String), change = true), 0))
+      CRow(20L: JLong, "k1", 25L: JLong, "k1": String), 0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(21L: JLong, "k1", 25L: JLong, "k1": String), change = true), 0))
+      CRow(21L: JLong, "k1", 25L: JLong, "k1": String), 0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(21L: JLong, "k1", 26L: JLong, "k1": String), change = true), 0))
+      CRow(21L: JLong, "k1", 26L: JLong, "k1": String), 0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(49L: JLong, "k2", 40L: JLong, "k2": String), change = true), 0))
+      CRow(49L: JLong, "k2", 40L: JLong, "k2": String), 0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(49L: JLong, "k2", 41L: JLong, "k2": String), change = true), 0))
+      CRow(49L: JLong, "k2", 41L: JLong, "k2": String), 0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(50L: JLong, "k2", 41L: JLong, "k2": String), change = true), 0))
+      CRow(50L: JLong, "k2", 41L: JLong, "k2": String), 0))
     // The timestamp 32 is set with the triggered timer.
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(19L: JLong, "k1", null: JLong, null: String), change = true), 32))
+      CRow(19L: JLong, "k1", null: JLong, null: String), 32))
     expectedOutput.add(new Watermark(91))
 
 
@@ -635,9 +635,9 @@ class JoinHarnessTest extends HarnessTestBase {
     testHarness.open()
 
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(1L: JLong, "k1"), change = true), 0))
+      CRow(1L: JLong, "k1"), 0))
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(1L: JLong, "k2"), change = true), 0))
+      CRow(1L: JLong, "k2"), 0))
 
     assertEquals(2, testHarness.numEventTimeTimers())
     assertEquals(4, testHarness.numKeyedStateEntries())
@@ -657,9 +657,9 @@ class JoinHarnessTest extends HarnessTestBase {
     assertEquals(0, testHarness.numKeyedStateEntries())
 
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(2L: JLong, "k1"), change = true), 0))
+      CRow(2L: JLong, "k1"), 0))
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(2L: JLong, "k2"), change = true), 0))
+      CRow(2L: JLong, "k2"), 0))
 
     // The late rows with timestamp = 2 will not be cached, but a null padding result for the right
     // row will be emitted.
@@ -668,26 +668,26 @@ class JoinHarnessTest extends HarnessTestBase {
 
     // Make sure the common (inner) join can be performed.
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(19L: JLong, "k1"), change = true), 0))
+      CRow(19L: JLong, "k1"), 0))
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(20L: JLong, "k1"), change = true), 0))
+      CRow(20L: JLong, "k1"), 0))
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(26L: JLong, "k1"), change = true), 0))
+      CRow(26L: JLong, "k1"), 0))
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(25L: JLong, "k1"), change = true), 0))
+      CRow(25L: JLong, "k1"), 0))
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(21L: JLong, "k1"), change = true), 0))
+      CRow(21L: JLong, "k1"), 0))
 
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(39L: JLong, "k2"), change = true), 0))
+      CRow(39L: JLong, "k2"), 0))
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(40L: JLong, "k2"), change = true), 0))
+      CRow(40L: JLong, "k2"), 0))
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(50L: JLong, "k2"), change = true), 0))
+      CRow(50L: JLong, "k2"), 0))
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(49L: JLong, "k2"), change = true), 0))
+      CRow(49L: JLong, "k2"), 0))
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(41L: JLong, "k2"), change = true), 0))
+      CRow(41L: JLong, "k2"), 0))
 
     testHarness.processWatermark1(new Watermark(100))
     testHarness.processWatermark2(new Watermark(100))
@@ -696,25 +696,25 @@ class JoinHarnessTest extends HarnessTestBase {
     expectedOutput.add(new Watermark(5))
     // The timestamp 18 is set with the triggered timer.
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JLong, null: String, 1L: JLong, "k2": String), change = true), 18))
+      CRow(null: JLong, null: String, 1L: JLong, "k2": String), 18))
     expectedOutput.add(new Watermark(9))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JLong, null: String, 2L: JLong, "k2": String), change = true), 0))
+      CRow(null: JLong, null: String, 2L: JLong, "k2": String), 0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(20L: JLong, "k1", 25L: JLong, "k1": String), change = true), 0))
+      CRow(20L: JLong, "k1", 25L: JLong, "k1": String), 0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(21L: JLong, "k1", 25L: JLong, "k1": String), change = true), 0))
+      CRow(21L: JLong, "k1", 25L: JLong, "k1": String), 0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(21L: JLong, "k1", 26L: JLong, "k1": String), change = true), 0))
+      CRow(21L: JLong, "k1", 26L: JLong, "k1": String), 0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(49L: JLong, "k2", 40L: JLong, "k2": String), change = true), 0))
+      CRow(49L: JLong, "k2", 40L: JLong, "k2": String), 0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(49L: JLong, "k2", 41L: JLong, "k2": String), change = true), 0))
+      CRow(49L: JLong, "k2", 41L: JLong, "k2": String), 0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(50L: JLong, "k2", 41L: JLong, "k2": String), change = true), 0))
+      CRow(50L: JLong, "k2", 41L: JLong, "k2": String), 0))
     // The timestamp 56 is set with the triggered timer.
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JLong, null: String, 39L: JLong, "k2": String), change = true), 56))
+      CRow(null: JLong, null: String, 39L: JLong, "k2": String), 56))
     expectedOutput.add(new Watermark(91))
 
     val result = testHarness.getOutput
@@ -748,9 +748,9 @@ class JoinHarnessTest extends HarnessTestBase {
     testHarness.open()
 
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(1L: JLong, "k1"), change = true), 0))
+      CRow(1L: JLong, "k1"), 0))
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(1L: JLong, "k2"), change = true), 0))
+      CRow(1L: JLong, "k2"), 0))
 
     assertEquals(2, testHarness.numEventTimeTimers())
     assertEquals(4, testHarness.numKeyedStateEntries())
@@ -770,9 +770,9 @@ class JoinHarnessTest extends HarnessTestBase {
     assertEquals(0, testHarness.numKeyedStateEntries())
 
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(2L: JLong, "k1"), change = true), 0))
+      CRow(2L: JLong, "k1"), 0))
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(2L: JLong, "k2"), change = true), 0))
+      CRow(2L: JLong, "k2"), 0))
 
     // The late rows with timestamp = 2 will not be cached, but a null padding result for the right
     // row will be emitted.
@@ -781,26 +781,26 @@ class JoinHarnessTest extends HarnessTestBase {
 
     // Make sure the common (inner) join can be performed.
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(19L: JLong, "k1"), change = true), 0))
+      CRow(19L: JLong, "k1"), 0))
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(20L: JLong, "k1"), change = true), 0))
+      CRow(20L: JLong, "k1"), 0))
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(26L: JLong, "k1"), change = true), 0))
+      CRow(26L: JLong, "k1"), 0))
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(25L: JLong, "k1"), change = true), 0))
+      CRow(25L: JLong, "k1"), 0))
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(21L: JLong, "k1"), change = true), 0))
+      CRow(21L: JLong, "k1"), 0))
 
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(39L: JLong, "k2"), change = true), 0))
+      CRow(39L: JLong, "k2"), 0))
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(40L: JLong, "k2"), change = true), 0))
+      CRow(40L: JLong, "k2"), 0))
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(50L: JLong, "k2"), change = true), 0))
+      CRow(50L: JLong, "k2"), 0))
     testHarness.processElement1(new StreamRecord[CRow](
-      CRow(Row.of(49L: JLong, "k2"), change = true), 0))
+      CRow(49L: JLong, "k2"), 0))
     testHarness.processElement2(new StreamRecord[CRow](
-      CRow(Row.of(41L: JLong, "k2"), change = true), 0))
+      CRow(41L: JLong, "k2"), 0))
 
     testHarness.processWatermark1(new Watermark(100))
     testHarness.processWatermark2(new Watermark(100))
@@ -808,34 +808,34 @@ class JoinHarnessTest extends HarnessTestBase {
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
     // The timestamp 14 is set with the triggered timer.
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1L: JLong, "k1", null: JLong, null: String), change = true), 14))
+      CRow(1L: JLong, "k1", null: JLong, null: String), 14))
     expectedOutput.add(new Watermark(5))
     // The timestamp 18 is set with the triggered timer.
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JLong, null: String, 1L: JLong, "k2": String), change = true), 18))
+      CRow(null: JLong, null: String, 1L: JLong, "k2": String), 18))
     expectedOutput.add(new Watermark(9))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2L: JLong, "k1", null: JLong, null: String), change = true), 0))
+      CRow(2L: JLong, "k1", null: JLong, null: String), 0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JLong, null: String, 2L: JLong, "k2": String), change = true), 0))
+      CRow(null: JLong, null: String, 2L: JLong, "k2": String), 0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(20L: JLong, "k1", 25L: JLong, "k1": String), change = true), 0))
+      CRow(20L: JLong, "k1", 25L: JLong, "k1": String), 0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(21L: JLong, "k1", 25L: JLong, "k1": String), change = true), 0))
+      CRow(21L: JLong, "k1", 25L: JLong, "k1": String), 0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(21L: JLong, "k1", 26L: JLong, "k1": String), change = true), 0))
+      CRow(21L: JLong, "k1", 26L: JLong, "k1": String), 0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(49L: JLong, "k2", 40L: JLong, "k2": String), change = true), 0))
+      CRow(49L: JLong, "k2", 40L: JLong, "k2": String), 0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(49L: JLong, "k2", 41L: JLong, "k2": String), change = true), 0))
+      CRow(49L: JLong, "k2", 41L: JLong, "k2": String), 0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(50L: JLong, "k2", 41L: JLong, "k2": String), change = true), 0))
+      CRow(50L: JLong, "k2", 41L: JLong, "k2": String), 0))
     // The timestamp 32 is set with the triggered timer.
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(19L: JLong, "k1", null: JLong, null: String), change = true), 32))
+      CRow(19L: JLong, "k1", null: JLong, null: String), 32))
     // The timestamp 56 is set with the triggered timer.
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JLong, null: String, 39L: JLong, "k2": String), change = true), 56))
+      CRow(null: JLong, null: String, 39L: JLong, "k2": String), 56))
     expectedOutput.add(new Watermark(91))
 
     val result = testHarness.getOutput
@@ -880,37 +880,37 @@ class JoinHarnessTest extends HarnessTestBase {
     // left stream input
     testHarness.setProcessingTime(1)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = true)))
+      CRow(1: JInt, "aaa")))
     assertEquals(1, testHarness.numProcessingTimeTimers())
     assertEquals(2, testHarness.numKeyedStateEntries())
     testHarness.setProcessingTime(2)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = true)))
+      CRow(1: JInt, "aaa")))
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(2: JInt, "bbb"), change = true)))
+      CRow(2: JInt, "bbb")))
     assertEquals(2, testHarness.numProcessingTimeTimers())
     assertEquals(4, testHarness.numKeyedStateEntries())
     testHarness.setProcessingTime(3)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = true)))
+      CRow(1: JInt, "aaa")))
     assertEquals(4, testHarness.numKeyedStateEntries())
     assertEquals(2, testHarness.numProcessingTimeTimers())
 
     // right stream input and output normally
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi1"), change = true)))
+      CRow(1: JInt, "Hi1")))
     assertEquals(6, testHarness.numKeyedStateEntries())
     assertEquals(3, testHarness.numProcessingTimeTimers())
     testHarness.setProcessingTime(4)
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(2: JInt, "Hello1"), change = true)))
+      CRow(2: JInt, "Hello1")))
     assertEquals(8, testHarness.numKeyedStateEntries())
     assertEquals(4, testHarness.numProcessingTimeTimers())
 
     // expired left stream record with key value of 1
     testHarness.setProcessingTime(5)
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi2"), change = true)))
+      CRow(1: JInt, "Hi2")))
     assertEquals(6, testHarness.numKeyedStateEntries())
     assertEquals(3, testHarness.numProcessingTimeTimers())
 
@@ -936,13 +936,13 @@ class JoinHarnessTest extends HarnessTestBase {
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), change = true)))
+      CRow(1: JInt, "aaa", 1: JInt, "Hi1")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), change = true)))
+      CRow(1: JInt, "aaa", 1: JInt, "Hi1")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), change = true)))
+      CRow(1: JInt, "aaa", 1: JInt, "Hi1")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2: JInt, "bbb", 2: JInt, "Hello1"), change = true)))
+      CRow(2: JInt, "bbb", 2: JInt, "Hello1")))
 
     verify(expectedOutput, result, new RowResultSortComparator())
 
@@ -974,43 +974,43 @@ class JoinHarnessTest extends HarnessTestBase {
     // left stream input
     testHarness.setProcessingTime(1)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = true)))
+      CRow(1: JInt, "aaa")))
     assertEquals(1, testHarness.numProcessingTimeTimers())
     assertEquals(2, testHarness.numKeyedStateEntries())
     testHarness.setProcessingTime(2)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = true)))
+      CRow(1: JInt, "aaa")))
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(2: JInt, "bbb"), change = true)))
+      CRow(2: JInt, "bbb")))
     assertEquals(2, testHarness.numProcessingTimeTimers())
     assertEquals(4, testHarness.numKeyedStateEntries())
     testHarness.setProcessingTime(3)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = false)))
+      CRow(false, 1: JInt, "aaa")))
     assertEquals(4, testHarness.numKeyedStateEntries())
     assertEquals(2, testHarness.numProcessingTimeTimers())
 
     // right stream input and output normally
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi1"), change = true)))
+      CRow(1: JInt, "Hi1")))
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi1"), change = false)))
+      CRow(false, 1: JInt, "Hi1")))
     assertEquals(5, testHarness.numKeyedStateEntries())
     assertEquals(3, testHarness.numProcessingTimeTimers())
     testHarness.setProcessingTime(4)
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(2: JInt, "Hello1"), change = true)))
+      CRow(2: JInt, "Hello1")))
     assertEquals(7, testHarness.numKeyedStateEntries())
     assertEquals(4, testHarness.numProcessingTimeTimers())
 
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = false)))
+      CRow(false, 1: JInt, "aaa")))
     // expired left stream record with key value of 1
     testHarness.setProcessingTime(5)
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi2"), change = true)))
+      CRow(1: JInt, "Hi2")))
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi2"), change = false)))
+      CRow(false, 1: JInt, "Hi2")))
     assertEquals(5, testHarness.numKeyedStateEntries())
     assertEquals(3, testHarness.numProcessingTimeTimers())
 
@@ -1029,11 +1029,11 @@ class JoinHarnessTest extends HarnessTestBase {
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), change = true)))
+      CRow(1: JInt, "aaa", 1: JInt, "Hi1")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), change = false)))
+      CRow(false, 1: JInt, "aaa", 1: JInt, "Hi1")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2: JInt, "bbb", 2: JInt, "Hello1"), change = true)))
+      CRow(2: JInt, "bbb", 2: JInt, "Hello1")))
 
     verify(expectedOutput, result, new RowResultSortComparator())
 
@@ -1066,43 +1066,43 @@ class JoinHarnessTest extends HarnessTestBase {
     // left stream input
     testHarness.setProcessingTime(1)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = true)))
+      CRow(1: JInt, "aaa")))
     assertEquals(1, testHarness.numProcessingTimeTimers())
     assertEquals(2, testHarness.numKeyedStateEntries())
     testHarness.setProcessingTime(2)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = true)))
+      CRow(1: JInt, "aaa")))
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(2: JInt, "bbb"), change = true)))
+      CRow(2: JInt, "bbb")))
     assertEquals(2, testHarness.numProcessingTimeTimers())
     assertEquals(4, testHarness.numKeyedStateEntries())
     testHarness.setProcessingTime(3)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = false)))
+      CRow(false, 1: JInt, "aaa")))
     assertEquals(4, testHarness.numKeyedStateEntries())
     assertEquals(2, testHarness.numProcessingTimeTimers())
 
     // right stream input and output normally
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi1"), change = true)))
+      CRow(1: JInt, "Hi1")))
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi1"), change = false)))
+      CRow(false, 1: JInt, "Hi1")))
     assertEquals(5, testHarness.numKeyedStateEntries())
     assertEquals(3, testHarness.numProcessingTimeTimers())
     testHarness.setProcessingTime(4)
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(2: JInt, "Hello1"), change = true)))
+      CRow(2: JInt, "Hello1")))
     assertEquals(7, testHarness.numKeyedStateEntries())
     assertEquals(4, testHarness.numProcessingTimeTimers())
 
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = false)))
+      CRow(false, 1: JInt, "aaa")))
     // expired left stream record with key value of 1
     testHarness.setProcessingTime(5)
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi2"), change = true)))
+      CRow(1: JInt, "Hi2")))
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi2"), change = false)))
+      CRow(false, 1: JInt, "Hi2")))
     assertEquals(5, testHarness.numKeyedStateEntries())
     assertEquals(3, testHarness.numProcessingTimeTimers())
 
@@ -1121,27 +1121,27 @@ class JoinHarnessTest extends HarnessTestBase {
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = true)))
+      CRow(1: JInt, "aaa", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = true)))
+      CRow(1: JInt, "aaa", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2: JInt, "bbb", null: JInt, null), change = true)))
+      CRow(2: JInt, "bbb", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = false)))
+      CRow(false, 1: JInt, "aaa", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = false)))
+      CRow(false, 1: JInt, "aaa", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), change = true)))
+      CRow(1: JInt, "aaa", 1: JInt, "Hi1")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), change = false)))
+      CRow(false, 1: JInt, "aaa", 1: JInt, "Hi1")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = true)))
+      CRow(1: JInt, "aaa", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2: JInt, "bbb", null: JInt, null), change = false)))
+      CRow(false, 2: JInt, "bbb", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2: JInt, "bbb", 2: JInt, "Hello1"), change = true)))
+      CRow(2: JInt, "bbb", 2: JInt, "Hello1")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = false)))
+      CRow(false, 1: JInt, "aaa", null: JInt, null)))
 
     verify(expectedOutput, result, new RowResultSortComparator())
 
@@ -1174,19 +1174,19 @@ class JoinHarnessTest extends HarnessTestBase {
     // left stream input
     testHarness.setProcessingTime(1)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = true)))
+      CRow(1: JInt, "aaa")))
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = false)))
+      CRow(false, 1: JInt, "aaa")))
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb"), change = true)))
+      CRow(1: JInt, "bbb")))
     assertEquals(1, testHarness.numProcessingTimeTimers())
     // 1 left timer(5), 1 left key(1), 1 join cnt
     assertEquals(3, testHarness.numKeyedStateEntries())
     testHarness.setProcessingTime(2)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = true)))
+      CRow(1: JInt, "aaa")))
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(2: JInt, "bbb"), change = true)))
+      CRow(2: JInt, "bbb")))
     assertEquals(2, testHarness.numProcessingTimeTimers())
     // 2 left timer(5,6), 2 left key(1,2), 2 join cnt
     assertEquals(6, testHarness.numKeyedStateEntries())
@@ -1194,35 +1194,35 @@ class JoinHarnessTest extends HarnessTestBase {
 
     // right stream input and output normally
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi1"), change = true)))
+      CRow(1: JInt, "Hi1")))
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb"), change = false)))
+      CRow(false, 1: JInt, "bbb")))
     // 2 left timer(5,6), 2 left keys(1,2), 2 join cnt, 1 right timer(7), 1 right key(1)
     assertEquals(8, testHarness.numKeyedStateEntries())
     assertEquals(3, testHarness.numProcessingTimeTimers())
     testHarness.setProcessingTime(4)
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(2: JInt, "ccc"), change = true)))
+      CRow(2: JInt, "ccc")))
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(2: JInt, "Hello"), change = true)))
+      CRow(2: JInt, "Hello")))
     // 2 left timer(5,6), 2 left keys(1,2), 2 join cnt, 2 right timer(7,8), 2 right key(1,2)
     assertEquals(10, testHarness.numKeyedStateEntries())
     assertEquals(4, testHarness.numProcessingTimeTimers())
 
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = false)))
+      CRow(false, 1: JInt, "aaa")))
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi2"), change = true)))
+      CRow(1: JInt, "Hi2")))
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi2"), change = false)))
+      CRow(false, 1: JInt, "Hi2")))
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi1"), change = false)))
+      CRow(false, 1: JInt, "Hi1")))
     // expired left stream record with key value of 1
     testHarness.setProcessingTime(5)
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi3"), change = true)))
+      CRow(1: JInt, "Hi3")))
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi3"), change = false)))
+      CRow(false, 1: JInt, "Hi3")))
     // 1 left timer(6), 1 left keys(2), 1 join cnt, 2 right timer(7,8), 1 right key(2)
     assertEquals(6, testHarness.numKeyedStateEntries())
     assertEquals(3, testHarness.numProcessingTimeTimers())
@@ -1242,37 +1242,37 @@ class JoinHarnessTest extends HarnessTestBase {
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = true)))
+      CRow(1: JInt, "aaa", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = false)))
+      CRow(false, 1: JInt, "aaa", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+      CRow(1: JInt, "bbb", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = true)))
+      CRow(1: JInt, "aaa", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2: JInt, "bbb", null: JInt, null), change = true)))
+      CRow(2: JInt, "bbb", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = false)))
+      CRow(false, 1: JInt, "bbb", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = false)))
+      CRow(false, 1: JInt, "aaa", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), change = true)))
+      CRow(1: JInt, "aaa", 1: JInt, "Hi1")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb", 1: JInt, "Hi1"), change = true)))
+      CRow(1: JInt, "bbb", 1: JInt, "Hi1")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2: JInt, "bbb", null: JInt, null), change = false)))
+      CRow(false, 2: JInt, "bbb", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2: JInt, "bbb", 2: JInt, "Hello"), change = true)))
+      CRow(2: JInt, "bbb", 2: JInt, "Hello")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), change = false)))
+      CRow(false, 1: JInt, "aaa", 1: JInt, "Hi1")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb", 1: JInt, "Hi2"), change = true)))
+      CRow(1: JInt, "bbb", 1: JInt, "Hi2")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb", 1: JInt, "Hi2"), change = false)))
+      CRow(false, 1: JInt, "bbb", 1: JInt, "Hi2")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb", 1: JInt, "Hi1"), change = false)))
+      CRow(false, 1: JInt, "bbb", 1: JInt, "Hi1")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+      CRow(1: JInt, "bbb", null: JInt, null)))
     verify(expectedOutput, result, new RowResultSortComparator())
 
     testHarness.close()
@@ -1304,43 +1304,43 @@ class JoinHarnessTest extends HarnessTestBase {
     // right stream input
     testHarness.setProcessingTime(1)
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = true)))
+      CRow(1: JInt, "aaa")))
     assertEquals(1, testHarness.numProcessingTimeTimers())
     assertEquals(2, testHarness.numKeyedStateEntries())
     testHarness.setProcessingTime(2)
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = true)))
+      CRow(1: JInt, "aaa")))
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(2: JInt, "bbb"), change = true)))
+      CRow(2: JInt, "bbb")))
     assertEquals(2, testHarness.numProcessingTimeTimers())
     assertEquals(4, testHarness.numKeyedStateEntries())
     testHarness.setProcessingTime(3)
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = false)))
+      CRow(false, 1: JInt, "aaa")))
     assertEquals(4, testHarness.numKeyedStateEntries())
     assertEquals(2, testHarness.numProcessingTimeTimers())
 
     // left stream input and output normally
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi1"), change = true)))
+      CRow(1: JInt, "Hi1")))
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi1"), change = false)))
+      CRow(false, 1: JInt, "Hi1")))
     assertEquals(5, testHarness.numKeyedStateEntries())
     assertEquals(3, testHarness.numProcessingTimeTimers())
     testHarness.setProcessingTime(4)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(2: JInt, "Hello1"), change = true)))
+      CRow(2: JInt, "Hello1")))
     assertEquals(7, testHarness.numKeyedStateEntries())
     assertEquals(4, testHarness.numProcessingTimeTimers())
 
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = false)))
+      CRow(false, 1: JInt, "aaa")))
     // expired right stream record with key value of 1
     testHarness.setProcessingTime(5)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi2"), change = true)))
+      CRow(1: JInt, "Hi2")))
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi2"), change = false)))
+      CRow(false, 1: JInt, "Hi2")))
     assertEquals(5, testHarness.numKeyedStateEntries())
     assertEquals(3, testHarness.numProcessingTimeTimers())
 
@@ -1359,27 +1359,27 @@ class JoinHarnessTest extends HarnessTestBase {
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = true)))
+      CRow(null: JInt, null, 1: JInt, "aaa")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = true)))
+      CRow(null: JInt, null, 1: JInt, "aaa")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true)))
+      CRow(null: JInt, null, 2: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = false)))
+      CRow(false, null: JInt, null, 1: JInt, "aaa")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = false)))
+      CRow(false, null: JInt, null, 1: JInt, "aaa")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi1", 1: JInt, "aaa"), change = true)))
+      CRow(1: JInt, "Hi1", 1: JInt, "aaa")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi1", 1: JInt, "aaa"), change = false)))
+      CRow(false, 1: JInt, "Hi1", 1: JInt, "aaa")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = true)))
+      CRow(null: JInt, null, 1: JInt, "aaa")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = false)))
+      CRow(false, null: JInt, null, 2: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2: JInt, "Hello1", 2: JInt, "bbb"), change = true)))
+      CRow(2: JInt, "Hello1", 2: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = false)))
+      CRow(false, null: JInt, null, 1: JInt, "aaa")))
 
     verify(expectedOutput, result, new RowResultSortComparator())
 
@@ -1412,19 +1412,19 @@ class JoinHarnessTest extends HarnessTestBase {
     // right stream input
     testHarness.setProcessingTime(1)
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = true)))
+      CRow(1: JInt, "aaa")))
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = false)))
+      CRow(false, 1: JInt, "aaa")))
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb"), change = true)))
+      CRow(1: JInt, "bbb")))
     assertEquals(1, testHarness.numProcessingTimeTimers())
     // 1 right timer(5), 1 right key(1), 1 join cnt
     assertEquals(3, testHarness.numKeyedStateEntries())
     testHarness.setProcessingTime(2)
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = true)))
+      CRow(1: JInt, "aaa")))
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(2: JInt, "bbb"), change = true)))
+      CRow(2: JInt, "bbb")))
     assertEquals(2, testHarness.numProcessingTimeTimers())
     // 2 right timer(5,6), 2 right key(1,2), 2 join cnt
     assertEquals(6, testHarness.numKeyedStateEntries())
@@ -1432,35 +1432,35 @@ class JoinHarnessTest extends HarnessTestBase {
 
     // left stream input and output normally
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi1"), change = true)))
+      CRow(1: JInt, "Hi1")))
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb"), change = false)))
+      CRow(false, 1: JInt, "bbb")))
     // 2 right timer(5,6), 2 right keys(1,2), 2 join cnt, 1 left timer(7), 1 left key(1)
     assertEquals(8, testHarness.numKeyedStateEntries())
     assertEquals(3, testHarness.numProcessingTimeTimers())
     testHarness.setProcessingTime(4)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(2: JInt, "ccc"), change = true)))
+      CRow(2: JInt, "ccc")))
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(2: JInt, "Hello"), change = true)))
+      CRow(2: JInt, "Hello")))
     // 2 right timer(5,6), 2 right keys(1,2), 2 join cnt, 2 left timer(7,8), 2 left key(1,2)
     assertEquals(10, testHarness.numKeyedStateEntries())
     assertEquals(4, testHarness.numProcessingTimeTimers())
 
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = false)))
+      CRow(false, 1: JInt, "aaa")))
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi2"), change = true)))
+      CRow(1: JInt, "Hi2")))
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi2"), change = false)))
+      CRow(false, 1: JInt, "Hi2")))
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi1"), change = false)))
+      CRow(false, 1: JInt, "Hi1")))
     // expired right stream record with key value of 1
     testHarness.setProcessingTime(5)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi3"), change = true)))
+      CRow(1: JInt, "Hi3")))
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi3"), change = false)))
+      CRow(false, 1: JInt, "Hi3")))
     // 1 right timer(6), 1 right keys(2), 1 join cnt, 2 left timer(7,8), 1 left key(2)
     assertEquals(6, testHarness.numKeyedStateEntries())
     assertEquals(3, testHarness.numProcessingTimeTimers())
@@ -1480,37 +1480,37 @@ class JoinHarnessTest extends HarnessTestBase {
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = true)))
+      CRow(null: JInt, null, 1: JInt, "aaa")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = false)))
+      CRow(false, null: JInt, null, 1: JInt, "aaa")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 1: JInt, "bbb"), change = true)))
+      CRow(null: JInt, null, 1: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = true)))
+      CRow(null: JInt, null, 1: JInt, "aaa")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true)))
+      CRow(null: JInt, null, 2: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 1: JInt, "bbb"), change = false)))
+      CRow(false, null: JInt, null, 1: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = false)))
+      CRow(false, null: JInt, null, 1: JInt, "aaa")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi1", 1: JInt, "aaa"), change = true)))
+      CRow(1: JInt, "Hi1", 1: JInt, "aaa")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi1", 1: JInt, "bbb"), change = true)))
+      CRow(1: JInt, "Hi1", 1: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = false)))
+      CRow(false, null: JInt, null, 2: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2: JInt, "Hello", 2: JInt, "bbb"), change = true)))
+      CRow(2: JInt, "Hello", 2: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi1", 1: JInt, "aaa"), change = false)))
+      CRow(false, 1: JInt, "Hi1", 1: JInt, "aaa")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi2", 1: JInt, "bbb"), change = true)))
+      CRow(1: JInt, "Hi2", 1: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi2", 1: JInt, "bbb"), change = false)))
+      CRow(false, 1: JInt, "Hi2", 1: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "Hi1", 1: JInt, "bbb"), change = false)))
+      CRow(false, 1: JInt, "Hi1", 1: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 1: JInt, "bbb"), change = true)))
+      CRow(null: JInt, null, 1: JInt, "bbb")))
     verify(expectedOutput, result, new RowResultSortComparator())
 
     testHarness.close()
@@ -1541,18 +1541,18 @@ class JoinHarnessTest extends HarnessTestBase {
     // left stream input
     testHarness.setProcessingTime(1)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb"), change = true)))
+      CRow(1: JInt, "bbb")))
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "ccc"), change = true)))
+      CRow(1: JInt, "ccc")))
     assertEquals(1, testHarness.numProcessingTimeTimers())
     // 1 left timer(5), 1 left key(1)
     assertEquals(2, testHarness.numKeyedStateEntries())
 
     testHarness.setProcessingTime(2)
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(2: JInt, "bbb"), change = true)))
+      CRow(2: JInt, "bbb")))
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(2: JInt, "ccc"), change = true)))
+      CRow(2: JInt, "ccc")))
     assertEquals(2, testHarness.numProcessingTimeTimers())
     // 1 left timer(5), 1 left key(1)
     // 1 right timer(6), 1 right key(1)
@@ -1560,17 +1560,17 @@ class JoinHarnessTest extends HarnessTestBase {
 
     testHarness.setProcessingTime(3)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(2: JInt, "aaa"), change = true)))
+      CRow(2: JInt, "aaa")))
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(2: JInt, "ddd"), change = true)))
+      CRow(2: JInt, "ddd")))
     assertEquals(3, testHarness.numProcessingTimeTimers())
     // 2 left timer(5,7), 2 left key(1,2)
     // 1 right timer(6), 1 right key(1)
     assertEquals(6, testHarness.numKeyedStateEntries())
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = true)))
+      CRow(1: JInt, "aaa")))
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "ddd"), change = true)))
+      CRow(1: JInt, "ddd")))
     assertEquals(4, testHarness.numProcessingTimeTimers())
     // 2 left timer(5,7), 2 left key(1,2)
     // 2 right timer(6,7), 2 right key(1,2)
@@ -1578,13 +1578,13 @@ class JoinHarnessTest extends HarnessTestBase {
 
     testHarness.setProcessingTime(4)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(2: JInt, "aaa"), change = false)))
+      CRow(false, 2: JInt, "aaa")))
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(2: JInt, "ddd"), change = false)))
+      CRow(false, 2: JInt, "ddd")))
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = false)))
+      CRow(false, 1: JInt, "aaa")))
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "ddd"), change = false)))
+      CRow(false, 1: JInt, "ddd")))
     assertEquals(4, testHarness.numProcessingTimeTimers())
     // 2 left timer(5,7), 1 left key(1)
     // 2 right timer(6,7), 1 right key(2)
@@ -1608,78 +1608,78 @@ class JoinHarnessTest extends HarnessTestBase {
 
     testHarness.setProcessingTime(8)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb"), change = true)))
+      CRow(1: JInt, "bbb")))
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(2: JInt, "bbb"), change = true)))
+      CRow(2: JInt, "bbb")))
 
     val result = testHarness.getOutput
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     // processing time 1
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+      CRow(1: JInt, "bbb", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "ccc", null: JInt, null), change = true)))
+      CRow(1: JInt, "ccc", null: JInt, null)))
     // processing time 2
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true)))
+      CRow(null: JInt, null, 2: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 2: JInt, "ccc"), change = true)))
+      CRow(null: JInt, null, 2: JInt, "ccc")))
     // processing time 3
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = false)))
+      CRow(false, null: JInt, null, 2: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 2: JInt, "ccc"), change = false)))
+      CRow(false, null: JInt, null, 2: JInt, "ccc")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2: JInt, "aaa", 2: JInt, "bbb"), change = true)))
+      CRow(2: JInt, "aaa", 2: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2: JInt, "aaa", 2: JInt, "ccc"), change = true)))
+      CRow(2: JInt, "aaa", 2: JInt, "ccc")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2: JInt, "ddd", 2: JInt, "bbb"), change = true)))
+      CRow(2: JInt, "ddd", 2: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2: JInt, "ddd", 2: JInt, "ccc"), change = true)))
+      CRow(2: JInt, "ddd", 2: JInt, "ccc")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = false)))
+      CRow(false, 1: JInt, "bbb", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "ccc", null: JInt, null), change = false)))
+      CRow(false, 1: JInt, "ccc", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb", 1: JInt, "aaa"), change = true)))
+      CRow(1: JInt, "bbb", 1: JInt, "aaa")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "ccc", 1: JInt, "aaa"), change = true)))
+      CRow(1: JInt, "ccc", 1: JInt, "aaa")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb", 1: JInt, "ddd"), change = true)))
+      CRow(1: JInt, "bbb", 1: JInt, "ddd")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "ccc", 1: JInt, "ddd"), change = true)))
+      CRow(1: JInt, "ccc", 1: JInt, "ddd")))
     // processing time 4
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2: JInt, "aaa", 2: JInt, "bbb"), change = false)))
+      CRow(false, 2: JInt, "aaa", 2: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2: JInt, "aaa", 2: JInt, "ccc"), change = false)))
+      CRow(false, 2: JInt, "aaa", 2: JInt, "ccc")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2: JInt, "ddd", 2: JInt, "bbb"), change = false)))
+      CRow(false, 2: JInt, "ddd", 2: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2: JInt, "ddd", 2: JInt, "ccc"), change = false)))
+      CRow(false, 2: JInt, "ddd", 2: JInt, "ccc")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true)))
+      CRow(null: JInt, null, 2: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 2: JInt, "ccc"), change = true)))
+      CRow(null: JInt, null, 2: JInt, "ccc")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb", 1: JInt, "aaa"), change = false)))
+      CRow(false, 1: JInt, "bbb", 1: JInt, "aaa")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "ccc", 1: JInt, "aaa"), change = false)))
+      CRow(false, 1: JInt, "ccc", 1: JInt, "aaa")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb", 1: JInt, "ddd"), change = false)))
+      CRow(false, 1: JInt, "bbb", 1: JInt, "ddd")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "ccc", 1: JInt, "ddd"), change = false)))
+      CRow(false, 1: JInt, "ccc", 1: JInt, "ddd")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+      CRow(1: JInt, "bbb", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "ccc", null: JInt, null), change = true)))
+      CRow(1: JInt, "ccc", null: JInt, null)))
     // processing time 8
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+      CRow(1: JInt, "bbb", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true)))
+      CRow(null: JInt, null, 2: JInt, "bbb")))
 
     verify(expectedOutput, result, new RowResultSortComparator())
     testHarness.close()
@@ -1710,18 +1710,18 @@ class JoinHarnessTest extends HarnessTestBase {
     // left stream input
     testHarness.setProcessingTime(1)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb"), change = true)))
+      CRow(1: JInt, "bbb")))
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "ccc"), change = true)))
+      CRow(1: JInt, "ccc")))
     assertEquals(1, testHarness.numProcessingTimeTimers())
     // 1 left timer(5), 1 left key(1), 1 left joincnt key(1)
     assertEquals(3, testHarness.numKeyedStateEntries())
 
     testHarness.setProcessingTime(2)
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(2: JInt, "bbb"), change = true)))
+      CRow(2: JInt, "bbb")))
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(2: JInt, "ccc"), change = true)))
+      CRow(2: JInt, "ccc")))
     assertEquals(2, testHarness.numProcessingTimeTimers())
     // 1 left timer(5), 1 left key(1), 1 left joincnt key(1)
     // 1 right timer(6), 1 right key(1), 1 right joincnt key(1)
@@ -1729,17 +1729,17 @@ class JoinHarnessTest extends HarnessTestBase {
 
     testHarness.setProcessingTime(3)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(2: JInt, "aaa"), change = true)))
+      CRow(2: JInt, "aaa")))
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(2: JInt, "ddd"), change = true)))
+      CRow(2: JInt, "ddd")))
     assertEquals(3, testHarness.numProcessingTimeTimers())
     // 2 left timer(5,7), 2 left key(1,2), 2 left joincnt key(1,2)
     // 1 right timer(6), 1 right key(1), 1 right joincnt key(1)
     assertEquals(9, testHarness.numKeyedStateEntries())
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "aaa"), change = true)))
+      CRow(1: JInt, "aaa")))
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "ddd"), change = true)))
+      CRow(1: JInt, "ddd")))
     assertEquals(4, testHarness.numProcessingTimeTimers())
     // 2 left timer(5,7), 2 left key(1,2), 2 left joincnt key(1,2)
     // 2 right timer(6,7), 2 right key(1,2), 2 right joincnt key(1,2)
@@ -1747,9 +1747,9 @@ class JoinHarnessTest extends HarnessTestBase {
 
     testHarness.setProcessingTime(4)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(2: JInt, "aaa"), change = false)))
+      CRow(false, 2: JInt, "aaa")))
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(1: JInt, "ddd"), change = false)))
+      CRow(false, 1: JInt, "ddd")))
     assertEquals(4, testHarness.numProcessingTimeTimers())
     // 2 left timer(5,7), 2 left key(1,2), 2 left joincnt key(1,2)
     // 2 right timer(6,7), 2 right key(1,2), 2 right joincnt key(1,2)
@@ -1773,68 +1773,68 @@ class JoinHarnessTest extends HarnessTestBase {
 
     testHarness.setProcessingTime(8)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb"), change = true)))
+      CRow(1: JInt, "bbb")))
     testHarness.processElement2(new StreamRecord(
-      CRow(Row.of(2: JInt, "bbb"), change = true)))
+      CRow(2: JInt, "bbb")))
 
     val result = testHarness.getOutput
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     // processing time 1
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+      CRow(1: JInt, "bbb", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "ccc", null: JInt, null), change = true)))
+      CRow(1: JInt, "ccc", null: JInt, null)))
     // processing time 2
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true)))
+      CRow(null: JInt, null, 2: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 2: JInt, "ccc"), change = true)))
+      CRow(null: JInt, null, 2: JInt, "ccc")))
     // processing time 3
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = false)))
+      CRow(false, null: JInt, null, 2: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 2: JInt, "ccc"), change = false)))
+      CRow(false, null: JInt, null, 2: JInt, "ccc")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2: JInt, "aaa", 2: JInt, "bbb"), change = true)))
+      CRow(2: JInt, "aaa", 2: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2: JInt, "aaa", 2: JInt, "ccc"), change = true)))
+      CRow(2: JInt, "aaa", 2: JInt, "ccc")))
     // can not find matched row due to NonEquiJoinPred
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2: JInt, "ddd", null: JInt, null), change = true)))
+      CRow(2: JInt, "ddd", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = false)))
+      CRow(false, 1: JInt, "bbb", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "ccc", null: JInt, null), change = false)))
+      CRow(false, 1: JInt, "ccc", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb", 1: JInt, "ddd"), change = true)))
+      CRow(1: JInt, "bbb", 1: JInt, "ddd")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "ccc", 1: JInt, "ddd"), change = true)))
+      CRow(1: JInt, "ccc", 1: JInt, "ddd")))
     // can not find matched row due to NonEquiJoinPred
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = true)))
+      CRow(null: JInt, null, 1: JInt, "aaa")))
     // processing time 4
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2: JInt, "aaa", 2: JInt, "bbb"), change = false)))
+      CRow(false, 2: JInt, "aaa", 2: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2: JInt, "aaa", 2: JInt, "ccc"), change = false)))
+      CRow(false, 2: JInt, "aaa", 2: JInt, "ccc")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true)))
+      CRow(null: JInt, null, 2: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 2: JInt, "ccc"), change = true)))
+      CRow(null: JInt, null, 2: JInt, "ccc")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb", 1: JInt, "ddd"), change = false)))
+      CRow(false, 1: JInt, "bbb", 1: JInt, "ddd")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "ccc", 1: JInt, "ddd"), change = false)))
+      CRow(false, 1: JInt, "ccc", 1: JInt, "ddd")))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+      CRow(1: JInt, "bbb", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "ccc", null: JInt, null), change = true)))
+      CRow(1: JInt, "ccc", null: JInt, null)))
     // processing time 8
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+      CRow(1: JInt, "bbb", null: JInt, null)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true)))
+      CRow(null: JInt, null, 2: JInt, "bbb")))
 
     verify(expectedOutput, result, new RowResultSortComparator())
     testHarness.close()
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
index 2f4bbfa..d6daa9e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
@@ -57,39 +57,39 @@ class NonWindowHarnessTest extends HarnessTestBase {
     // register cleanup timer with 3001
     testHarness.setProcessingTime(1)
 
-    testHarness.processElement(new StreamRecord(CRow(Row.of(1L: JLong, 1: JInt, "aaa"), true), 1))
-    testHarness.processElement(new StreamRecord(CRow(Row.of(2L: JLong, 1: JInt, "bbb"), true), 1))
+    testHarness.processElement(new StreamRecord(CRow(1L: JLong, 1: JInt, "aaa"), 1))
+    testHarness.processElement(new StreamRecord(CRow(2L: JLong, 1: JInt, "bbb"), 1))
     // reuse timer 3001
     testHarness.setProcessingTime(1000)
-    testHarness.processElement(new StreamRecord(CRow(Row.of(3L: JLong, 2: JInt, "aaa"), true), 1))
-    testHarness.processElement(new StreamRecord(CRow(Row.of(4L: JLong, 3: JInt, "aaa"), true), 1))
+    testHarness.processElement(new StreamRecord(CRow(3L: JLong, 2: JInt, "aaa"), 1))
+    testHarness.processElement(new StreamRecord(CRow(4L: JLong, 3: JInt, "aaa"), 1))
 
     // register cleanup timer with 4002
     testHarness.setProcessingTime(1002)
-    testHarness.processElement(new StreamRecord(CRow(Row.of(5L: JLong, 4: JInt, "aaa"), true), 1))
-    testHarness.processElement(new StreamRecord(CRow(Row.of(6L: JLong, 2: JInt, "bbb"), true), 1))
+    testHarness.processElement(new StreamRecord(CRow(5L: JLong, 4: JInt, "aaa"), 1))
+    testHarness.processElement(new StreamRecord(CRow(6L: JLong, 2: JInt, "bbb"), 1))
 
     // trigger cleanup timer and register cleanup timer with 7003
     testHarness.setProcessingTime(4003)
-    testHarness.processElement(new StreamRecord(CRow(Row.of(7L: JLong, 5: JInt, "aaa"), true), 1))
-    testHarness.processElement(new StreamRecord(CRow(Row.of(8L: JLong, 6: JInt, "aaa"), true), 1))
-    testHarness.processElement(new StreamRecord(CRow(Row.of(9L: JLong, 7: JInt, "aaa"), true), 1))
-    testHarness.processElement(new StreamRecord(CRow(Row.of(10L: JLong, 3: JInt, "bbb"), true), 1))
+    testHarness.processElement(new StreamRecord(CRow(7L: JLong, 5: JInt, "aaa"), 1))
+    testHarness.processElement(new StreamRecord(CRow(8L: JLong, 6: JInt, "aaa"), 1))
+    testHarness.processElement(new StreamRecord(CRow(9L: JLong, 7: JInt, "aaa"), 1))
+    testHarness.processElement(new StreamRecord(CRow(10L: JLong, 3: JInt, "bbb"), 1))
 
     val result = testHarness.getOutput
 
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
-    expectedOutput.add(new StreamRecord(CRow(Row.of(1L: JLong, 1: JInt), true), 1))
-    expectedOutput.add(new StreamRecord(CRow(Row.of(2L: JLong, 1: JInt), true), 1))
-    expectedOutput.add(new StreamRecord(CRow(Row.of(3L: JLong, 3: JInt), true), 1))
-    expectedOutput.add(new StreamRecord(CRow(Row.of(4L: JLong, 6: JInt), true), 1))
-    expectedOutput.add(new StreamRecord(CRow(Row.of(5L: JLong, 10: JInt), true), 1))
-    expectedOutput.add(new StreamRecord(CRow(Row.of(6L: JLong, 3: JInt), true), 1))
-    expectedOutput.add(new StreamRecord(CRow(Row.of(7L: JLong, 5: JInt), true), 1))
-    expectedOutput.add(new StreamRecord(CRow(Row.of(8L: JLong, 11: JInt), true), 1))
-    expectedOutput.add(new StreamRecord(CRow(Row.of(9L: JLong, 18: JInt), true), 1))
-    expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, 3: JInt), true), 1))
+    expectedOutput.add(new StreamRecord(CRow(1L: JLong, 1: JInt), 1))
+    expectedOutput.add(new StreamRecord(CRow(2L: JLong, 1: JInt), 1))
+    expectedOutput.add(new StreamRecord(CRow(3L: JLong, 3: JInt), 1))
+    expectedOutput.add(new StreamRecord(CRow(4L: JLong, 6: JInt), 1))
+    expectedOutput.add(new StreamRecord(CRow(5L: JLong, 10: JInt), 1))
+    expectedOutput.add(new StreamRecord(CRow(6L: JLong, 3: JInt), 1))
+    expectedOutput.add(new StreamRecord(CRow(7L: JLong, 5: JInt), 1))
+    expectedOutput.add(new StreamRecord(CRow(8L: JLong, 11: JInt), 1))
+    expectedOutput.add(new StreamRecord(CRow(9L: JLong, 18: JInt), 1))
+    expectedOutput.add(new StreamRecord(CRow(10L: JLong, 3: JInt), 1))
 
     verify(expectedOutput, result, new RowResultSortComparator())
 
@@ -117,38 +117,38 @@ class NonWindowHarnessTest extends HarnessTestBase {
     // register cleanup timer with 3001
     testHarness.setProcessingTime(1)
 
-    testHarness.processElement(new StreamRecord(CRow(Row.of(1L: JLong, 1: JInt, "aaa"), true), 1))
-    testHarness.processElement(new StreamRecord(CRow(Row.of(2L: JLong, 1: JInt, "bbb"), true), 2))
-    testHarness.processElement(new StreamRecord(CRow(Row.of(3L: JLong, 2: JInt, "aaa"), true), 3))
-    testHarness.processElement(new StreamRecord(CRow(Row.of(4L: JLong, 3: JInt, "ccc"), true), 4))
+    testHarness.processElement(new StreamRecord(CRow(1L: JLong, 1: JInt, "aaa"), 1))
+    testHarness.processElement(new StreamRecord(CRow(2L: JLong, 1: JInt, "bbb"), 2))
+    testHarness.processElement(new StreamRecord(CRow(3L: JLong, 2: JInt, "aaa"), 3))
+    testHarness.processElement(new StreamRecord(CRow(4L: JLong, 3: JInt, "ccc"), 4))
 
     // trigger cleanup timer and register cleanup timer with 6002
     testHarness.setProcessingTime(3002)
-    testHarness.processElement(new StreamRecord(CRow(Row.of(5L: JLong, 4: JInt, "aaa"), true), 5))
-    testHarness.processElement(new StreamRecord(CRow(Row.of(6L: JLong, 2: JInt, "bbb"), true), 6))
-    testHarness.processElement(new StreamRecord(CRow(Row.of(7L: JLong, 5: JInt, "aaa"), true), 7))
-    testHarness.processElement(new StreamRecord(CRow(Row.of(8L: JLong, 6: JInt, "eee"), true), 8))
-    testHarness.processElement(new StreamRecord(CRow(Row.of(9L: JLong, 7: JInt, "aaa"), true), 9))
-    testHarness.processElement(new StreamRecord(CRow(Row.of(10L: JLong, 3: JInt, "bbb"), true), 10))
+    testHarness.processElement(new StreamRecord(CRow(5L: JLong, 4: JInt, "aaa"), 5))
+    testHarness.processElement(new StreamRecord(CRow(6L: JLong, 2: JInt, "bbb"), 6))
+    testHarness.processElement(new StreamRecord(CRow(7L: JLong, 5: JInt, "aaa"), 7))
+    testHarness.processElement(new StreamRecord(CRow(8L: JLong, 6: JInt, "eee"), 8))
+    testHarness.processElement(new StreamRecord(CRow(9L: JLong, 7: JInt, "aaa"), 9))
+    testHarness.processElement(new StreamRecord(CRow(10L: JLong, 3: JInt, "bbb"), 10))
 
     val result = testHarness.getOutput
 
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
-    expectedOutput.add(new StreamRecord(CRow(Row.of(1L: JLong, 1: JInt), true), 1))
-    expectedOutput.add(new StreamRecord(CRow(Row.of(2L: JLong, 1: JInt), true), 2))
-    expectedOutput.add(new StreamRecord(CRow(Row.of(3L: JLong, 1: JInt), false), 3))
-    expectedOutput.add(new StreamRecord(CRow(Row.of(3L: JLong, 3: JInt), true), 3))
-    expectedOutput.add(new StreamRecord(CRow(Row.of(4L: JLong, 3: JInt), true), 4))
-    expectedOutput.add(new StreamRecord(CRow(Row.of(5L: JLong, 4: JInt), true), 5))
-    expectedOutput.add(new StreamRecord(CRow(Row.of(6L: JLong, 2: JInt), true), 6))
-    expectedOutput.add(new StreamRecord(CRow(Row.of(7L: JLong, 4: JInt), false), 7))
-    expectedOutput.add(new StreamRecord(CRow(Row.of(7L: JLong, 9: JInt), true), 7))
-    expectedOutput.add(new StreamRecord(CRow(Row.of(8L: JLong, 6: JInt), true), 8))
-    expectedOutput.add(new StreamRecord(CRow(Row.of(9L: JLong, 9: JInt), false), 9))
-    expectedOutput.add(new StreamRecord(CRow(Row.of(9L: JLong, 16: JInt), true), 9))
-    expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, 2: JInt), false), 10))
-    expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, 5: JInt), true), 10))
+    expectedOutput.add(new StreamRecord(CRow(1L: JLong, 1: JInt), 1))
+    expectedOutput.add(new StreamRecord(CRow(2L: JLong, 1: JInt), 2))
+    expectedOutput.add(new StreamRecord(CRow(false, 3L: JLong, 1: JInt), 3))
+    expectedOutput.add(new StreamRecord(CRow(3L: JLong, 3: JInt), 3))
+    expectedOutput.add(new StreamRecord(CRow(4L: JLong, 3: JInt), 4))
+    expectedOutput.add(new StreamRecord(CRow(5L: JLong, 4: JInt), 5))
+    expectedOutput.add(new StreamRecord(CRow(6L: JLong, 2: JInt), 6))
+    expectedOutput.add(new StreamRecord(CRow(false, 7L: JLong, 4: JInt), 7))
+    expectedOutput.add(new StreamRecord(CRow(7L: JLong, 9: JInt), 7))
+    expectedOutput.add(new StreamRecord(CRow(8L: JLong, 6: JInt), 8))
+    expectedOutput.add(new StreamRecord(CRow(false, 9L: JLong, 9: JInt), 9))
+    expectedOutput.add(new StreamRecord(CRow(9L: JLong, 16: JInt), 9))
+    expectedOutput.add(new StreamRecord(CRow(false, 10L: JLong, 2: JInt), 10))
+    expectedOutput.add(new StreamRecord(CRow(10L: JLong, 5: JInt), 10))
 
     verify(expectedOutput, result, new RowResultSortComparator())
 
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index 7ad64c6..63d7b5d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -60,75 +60,75 @@ class OverWindowHarnessTest extends HarnessTestBase{
     testHarness.setProcessingTime(1)
 
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1L: JLong, "aaa", 1L: JLong), change = true)))
+      CRow(1L: JLong, "aaa", 1L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1L: JLong, "bbb", 10L: JLong), change = true)))
+      CRow(1L: JLong, "bbb", 10L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1L: JLong, "aaa", 2L: JLong), change = true)))
+      CRow(1L: JLong, "aaa", 2L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1L: JLong, "aaa", 3L: JLong), change = true)))
+      CRow(1L: JLong, "aaa", 3L: JLong)))
 
     // register cleanup timer with 4100
     testHarness.setProcessingTime(1100)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1L: JLong, "bbb", 20L: JLong), change = true)))
+      CRow(1L: JLong, "bbb", 20L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1L: JLong, "aaa", 4L: JLong), change = true)))
+      CRow(1L: JLong, "aaa", 4L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1L: JLong, "aaa", 5L: JLong), change = true)))
+      CRow(1L: JLong, "aaa", 5L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1L: JLong, "aaa", 6L: JLong), change = true)))
+      CRow(1L: JLong, "aaa", 6L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1L: JLong, "bbb", 30L: JLong), change = true)))
+      CRow(1L: JLong, "bbb", 30L: JLong)))
 
     // register cleanup timer with 6001
     testHarness.setProcessingTime(3001)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2L: JLong, "aaa", 7L: JLong), change = true)))
+      CRow(2L: JLong, "aaa", 7L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2L: JLong, "aaa", 8L: JLong), change = true)))
+      CRow(2L: JLong, "aaa", 8L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2L: JLong, "aaa", 9L: JLong), change = true)))
+      CRow(2L: JLong, "aaa", 9L: JLong)))
 
     // trigger cleanup timer and register cleanup timer with 9002
     testHarness.setProcessingTime(6002)
     testHarness.processElement(new StreamRecord(
-        CRow(Row.of(2L: JLong, "aaa", 10L: JLong), change = true)))
+        CRow(2L: JLong, "aaa", 10L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2L: JLong, "bbb", 40L: JLong), change = true)))
+      CRow(2L: JLong, "bbb", 40L: JLong)))
 
     val result = testHarness.getOutput
 
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
+      CRow(1L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
+      CRow(1L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
+      CRow(1L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1L: JLong, "aaa", 3L: JLong, 2L: JLong, 3L: JLong), change = true)))
+      CRow(1L: JLong, "aaa", 3L: JLong, 2L: JLong, 3L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), change = true)))
+      CRow(1L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1L: JLong, "aaa", 4L: JLong, 3L: JLong, 4L: JLong), change = true)))
+      CRow(1L: JLong, "aaa", 4L: JLong, 3L: JLong, 4L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1L: JLong, "aaa", 5L: JLong, 4L: JLong, 5L: JLong), change = true)))
+      CRow(1L: JLong, "aaa", 5L: JLong, 4L: JLong, 5L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1L: JLong, "aaa", 6L: JLong, 5L: JLong, 6L: JLong), change = true)))
+      CRow(1L: JLong, "aaa", 6L: JLong, 5L: JLong, 6L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1L: JLong, "bbb", 30L: JLong, 20L: JLong, 30L: JLong), change = true)))
+      CRow(1L: JLong, "bbb", 30L: JLong, 20L: JLong, 30L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2L: JLong, "aaa", 7L: JLong, 6L: JLong, 7L: JLong), change = true)))
+      CRow(2L: JLong, "aaa", 7L: JLong, 6L: JLong, 7L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2L: JLong, "aaa", 8L: JLong, 7L: JLong, 8L: JLong), change = true)))
+      CRow(2L: JLong, "aaa", 8L: JLong, 7L: JLong, 8L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2L: JLong, "aaa", 9L: JLong, 8L: JLong, 9L: JLong), change = true)))
+      CRow(2L: JLong, "aaa", 9L: JLong, 8L: JLong, 9L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2L: JLong, "aaa", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
+      CRow(2L: JLong, "aaa", 10L: JLong, 10L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), change = true)))
+      CRow(2L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong)))
 
     verify(expectedOutput, result, new RowResultSortComparator())
 
@@ -160,51 +160,51 @@ class OverWindowHarnessTest extends HarnessTestBase{
     // register cleanup timer with 3003
     testHarness.setProcessingTime(3)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 1L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 1L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "bbb", 10L: JLong), change = true)))
+      CRow(0L: JLong, "bbb", 10L: JLong)))
 
     testHarness.setProcessingTime(4)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 2L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 2L: JLong)))
 
     // trigger cleanup timer and register cleanup timer with 6003
     testHarness.setProcessingTime(3003)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 3L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 3L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "bbb", 20L: JLong), change = true)))
+      CRow(0L: JLong, "bbb", 20L: JLong)))
 
     testHarness.setProcessingTime(5)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 4L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 4L: JLong)))
 
     // register cleanup timer with 9002
     testHarness.setProcessingTime(6002)
 
     testHarness.setProcessingTime(7002)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 5L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 5L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 6L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 6L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "bbb", 30L: JLong), change = true)))
+      CRow(0L: JLong, "bbb", 30L: JLong)))
 
     // register cleanup timer with 14002
     testHarness.setProcessingTime(11002)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 7L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 7L: JLong)))
 
     testHarness.setProcessingTime(11004)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 8L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 8L: JLong)))
 
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 9L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 9L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 10L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 10L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "bbb", 40L: JLong), change = true)))
+      CRow(0L: JLong, "bbb", 40L: JLong)))
 
     testHarness.setProcessingTime(11006)
 
@@ -213,12 +213,12 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     // timer registered for 23000
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "ccc", 10L: JLong), change = true)))
+      CRow(0L: JLong, "ccc", 10L: JLong)))
 
     // update clean-up timer to 25500. Previous timer should not clean up
     testHarness.setProcessingTime(22500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "ccc", 20L: JLong), change = true)))
+      CRow(0L: JLong, "ccc", 20L: JLong)))
 
     // 23000 clean-up timer should fire but not fail with an NPE
     testHarness.setProcessingTime(23001)
@@ -229,37 +229,37 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     // all elements at the same proc timestamp have the same value per key
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
+      CRow(0L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 3L: JLong, 3L: JLong, 4L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 3L: JLong, 3L: JLong, 4L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "bbb", 20L: JLong, 20L: JLong, 20L: JLong), change = true)))
+      CRow(0L: JLong, "bbb", 20L: JLong, 20L: JLong, 20L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 4L: JLong, 4L: JLong, 4L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 4L: JLong, 4L: JLong, 4L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 5L: JLong, 5L: JLong, 6L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 5L: JLong, 5L: JLong, 6L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 6L: JLong, 5L: JLong, 6L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 6L: JLong, 5L: JLong, 6L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "bbb", 30L: JLong, 30L: JLong, 30L: JLong), change = true)))
+      CRow(0L: JLong, "bbb", 30L: JLong, 30L: JLong, 30L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 7L: JLong, 7L: JLong, 7L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 7L: JLong, 7L: JLong, 7L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 8L: JLong, 7L: JLong, 10L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 8L: JLong, 7L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 9L: JLong, 7L: JLong, 10L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 9L: JLong, 7L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 10L: JLong, 7L: JLong, 10L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 10L: JLong, 7L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), change = true)))
+      CRow(0L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "ccc", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
+      CRow(0L: JLong, "ccc", 10L: JLong, 10L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "ccc", 20L: JLong, 10L: JLong, 20L: JLong), change = true)))
+      CRow(0L: JLong, "ccc", 20L: JLong, 10L: JLong, 20L: JLong)))
 
     verify(expectedOutput, result, new RowResultSortComparator())
 
@@ -287,69 +287,69 @@ class OverWindowHarnessTest extends HarnessTestBase{
     testHarness.setProcessingTime(1003)
 
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 1L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 1L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "bbb", 10L: JLong), change = true)))
+      CRow(0L: JLong, "bbb", 10L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 2L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 2L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 3L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 3L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "bbb", 20L: JLong), change = true)))
+      CRow(0L: JLong, "bbb", 20L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 4L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 4L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 5L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 5L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 6L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 6L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "bbb", 30L: JLong), change = true)))
+      CRow(0L: JLong, "bbb", 30L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 7L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 7L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 8L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 8L: JLong)))
 
     // trigger cleanup timer and register cleanup timer with 8003
     testHarness.setProcessingTime(5003)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 9L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 9L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 10L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 10L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(0L: JLong, "bbb", 40L: JLong), change = true)))
+      CRow(0L: JLong, "bbb", 40L: JLong)))
 
     val result = testHarness.getOutput
 
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
+      CRow(0L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), change = true)))
+      CRow(0L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 4L: JLong, 1L: JLong, 4L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 5L: JLong, 1L: JLong, 5L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 5L: JLong, 1L: JLong, 5L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 6L: JLong, 1L: JLong, 6L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), change = true)))
+      CRow(0L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 7L: JLong, 1L: JLong, 7L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 8L: JLong, 1L: JLong, 8L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 9L: JLong, 9L: JLong, 9L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 9L: JLong, 9L: JLong, 9L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "aaa", 10L: JLong, 9L: JLong, 10L: JLong), change = true)))
+      CRow(0L: JLong, "aaa", 10L: JLong, 9L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(0L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), change = true)))
+      CRow(0L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong)))
 
     verify(expectedOutput, result, new RowResultSortComparator())
     testHarness.close()
@@ -380,51 +380,51 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.processWatermark(1)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2L: JLong, "aaa", 1L: JLong), change = true)))
+      CRow(2L: JLong, "aaa", 1L: JLong)))
 
     testHarness.processWatermark(2)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(3L: JLong, "bbb", 10L: JLong), change = true)))
+      CRow(3L: JLong, "bbb", 10L: JLong)))
 
     testHarness.processWatermark(4000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(4001L: JLong, "aaa", 2L: JLong), change = true)))
+      CRow(4001L: JLong, "aaa", 2L: JLong)))
 
     testHarness.processWatermark(4001)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(4002L: JLong, "aaa", 3L: JLong), change = true)))
+      CRow(4002L: JLong, "aaa", 3L: JLong)))
 
     testHarness.processWatermark(4002)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(4003L: JLong, "aaa", 4L: JLong), change = true)))
+      CRow(4003L: JLong, "aaa", 4L: JLong)))
 
     testHarness.processWatermark(4800)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(4801L: JLong, "bbb", 25L: JLong), change = true)))
+      CRow(4801L: JLong, "bbb", 25L: JLong)))
 
     testHarness.processWatermark(6500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "aaa", 5L: JLong), change = true)))
+      CRow(6501L: JLong, "aaa", 5L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "aaa", 6L: JLong), change = true)))
+      CRow(6501L: JLong, "aaa", 6L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "bbb", 30L: JLong), change = true)))
+      CRow(6501L: JLong, "bbb", 30L: JLong)))
 
     testHarness.processWatermark(7000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(7001L: JLong, "aaa", 7L: JLong), change = true)))
+      CRow(7001L: JLong, "aaa", 7L: JLong)))
 
     testHarness.processWatermark(8000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(8001L: JLong, "aaa", 8L: JLong), change = true)))
+      CRow(8001L: JLong, "aaa", 8L: JLong)))
 
     testHarness.processWatermark(12000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "aaa", 9L: JLong), change = true)))
+      CRow(12001L: JLong, "aaa", 9L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "aaa", 10L: JLong), change = true)))
+      CRow(12001L: JLong, "aaa", 10L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "bbb", 40L: JLong), change = true)))
+      CRow(12001L: JLong, "bbb", 40L: JLong)))
 
     testHarness.processWatermark(19000)
 
@@ -434,10 +434,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     // check that state is removed after max retention time
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(20001L: JLong, "ccc", 1L: JLong), change = true))) // clean-up 3000
+      CRow(20001L: JLong, "ccc", 1L: JLong))) // clean-up 3000
     testHarness.setProcessingTime(2500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(20002L: JLong, "ccc", 2L: JLong), change = true))) // clean-up 4500
+      CRow(20002L: JLong, "ccc", 2L: JLong))) // clean-up 4500
     testHarness.processWatermark(20010) // compute output
 
     assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
@@ -449,7 +449,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     // check that state is only removed if all data was processed
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(20011L: JLong, "ccc", 3L: JLong), change = true))) // clean-up 6500
+      CRow(20011L: JLong, "ccc", 3L: JLong))) // clean-up 6500
 
     assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
     testHarness.setProcessingTime(6500) // clean-up attempt but rescheduled to 8500
@@ -469,40 +469,40 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     // all elements at the same row-time have the same value per key
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
+      CRow(2L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(3L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
+      CRow(3L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
+      CRow(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(4002L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), change = true)))
+      CRow(4002L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(4003L: JLong, "aaa", 4L: JLong, 2L: JLong, 4L: JLong), change = true)))
+      CRow(4003L: JLong, "aaa", 4L: JLong, 2L: JLong, 4L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(4801L: JLong, "bbb", 25L: JLong, 25L: JLong, 25L: JLong), change = true)))
+      CRow(4801L: JLong, "bbb", 25L: JLong, 25L: JLong, 25L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "aaa", 5L: JLong, 2L: JLong, 6L: JLong), change = true)))
+      CRow(6501L: JLong, "aaa", 5L: JLong, 2L: JLong, 6L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "aaa", 6L: JLong, 2L: JLong, 6L: JLong), change = true)))
+      CRow(6501L: JLong, "aaa", 6L: JLong, 2L: JLong, 6L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(7001L: JLong, "aaa", 7L: JLong, 2L: JLong, 7L: JLong), change = true)))
+      CRow(7001L: JLong, "aaa", 7L: JLong, 2L: JLong, 7L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(8001L: JLong, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), change = true)))
+      CRow(8001L: JLong, "aaa", 8L: JLong, 2L: JLong, 8L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "bbb", 30L: JLong, 25L: JLong, 30L: JLong), change = true)))
+      CRow(6501L: JLong, "bbb", 30L: JLong, 25L: JLong, 30L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "aaa", 9L: JLong, 8L: JLong, 10L: JLong), change = true)))
+      CRow(12001L: JLong, "aaa", 9L: JLong, 8L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "aaa", 10L: JLong, 8L: JLong, 10L: JLong), change = true)))
+      CRow(12001L: JLong, "aaa", 10L: JLong, 8L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), change = true)))
+      CRow(12001L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong)))
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
+      CRow(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
+      CRow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(20011L: JLong, "ccc", 3L: JLong, 3L: JLong, 3L: JLong), change = true)))
+      CRow(20011L: JLong, "ccc", 3L: JLong, 3L: JLong, 3L: JLong)))
 
     verify(expectedOutput, result, new RowResultSortComparator())
     testHarness.close()
@@ -530,47 +530,47 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.processWatermark(800)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(801L: JLong, "aaa", 1L: JLong), change = true)))
+      CRow(801L: JLong, "aaa", 1L: JLong)))
 
     testHarness.processWatermark(2500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2501L: JLong, "bbb", 10L: JLong), change = true)))
+      CRow(2501L: JLong, "bbb", 10L: JLong)))
 
     testHarness.processWatermark(4000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(4001L: JLong, "aaa", 2L: JLong), change = true)))
+      CRow(4001L: JLong, "aaa", 2L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(4001L: JLong, "aaa", 3L: JLong), change = true)))
+      CRow(4001L: JLong, "aaa", 3L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(4001L: JLong, "bbb", 20L: JLong), change = true)))
+      CRow(4001L: JLong, "bbb", 20L: JLong)))
 
     testHarness.processWatermark(4800)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(4801L: JLong, "aaa", 4L: JLong), change = true)))
+      CRow(4801L: JLong, "aaa", 4L: JLong)))
 
     testHarness.processWatermark(6500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "aaa", 5L: JLong), change = true)))
+      CRow(6501L: JLong, "aaa", 5L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "aaa", 6L: JLong), change = true)))
+      CRow(6501L: JLong, "aaa", 6L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "bbb", 30L: JLong), change = true)))
+      CRow(6501L: JLong, "bbb", 30L: JLong)))
 
     testHarness.processWatermark(7000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(7001L: JLong, "aaa", 7L: JLong), change = true)))
+      CRow(7001L: JLong, "aaa", 7L: JLong)))
 
     testHarness.processWatermark(8000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(8001L: JLong, "aaa", 8L: JLong), change = true)))
+      CRow(8001L: JLong, "aaa", 8L: JLong)))
 
     testHarness.processWatermark(12000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "aaa", 9L: JLong), change = true)))
+      CRow(12001L: JLong, "aaa", 9L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "aaa", 10L: JLong), change = true)))
+      CRow(12001L: JLong, "aaa", 10L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "bbb", 40L: JLong), change = true)))
+      CRow(12001L: JLong, "bbb", 40L: JLong)))
 
     testHarness.processWatermark(19000)
 
@@ -580,10 +580,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     // check that state is removed after max retention time
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(20001L: JLong, "ccc", 1L: JLong), change = true))) // clean-up 3000
+      CRow(20001L: JLong, "ccc", 1L: JLong))) // clean-up 3000
     testHarness.setProcessingTime(2500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(20002L: JLong, "ccc", 2L: JLong), change = true))) // clean-up 4500
+      CRow(20002L: JLong, "ccc", 2L: JLong))) // clean-up 4500
     testHarness.processWatermark(20010) // compute output
 
     assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
@@ -594,7 +594,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     // check that state is only removed if all data was processed
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(20011L: JLong, "ccc", 3L: JLong), change = true))) // clean-up 6500
+      CRow(20011L: JLong, "ccc", 3L: JLong))) // clean-up 6500
 
     assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
     testHarness.setProcessingTime(6500) // clean-up attempt but rescheduled to 8500
@@ -614,40 +614,40 @@ class OverWindowHarnessTest extends HarnessTestBase{
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
+      CRow(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
+      CRow(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
+      CRow(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), change = true)))
+      CRow(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), change = true)))
+      CRow(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(4801L: JLong, "aaa", 4L: JLong, 2L: JLong, 4L: JLong), change = true)))
+      CRow(4801L: JLong, "aaa", 4L: JLong, 2L: JLong, 4L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "aaa", 5L: JLong, 3L: JLong, 5L: JLong), change = true)))
+      CRow(6501L: JLong, "aaa", 5L: JLong, 3L: JLong, 5L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "aaa", 6L: JLong, 4L: JLong, 6L: JLong), change = true)))
+      CRow(6501L: JLong, "aaa", 6L: JLong, 4L: JLong, 6L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), change = true)))
+      CRow(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(7001L: JLong, "aaa", 7L: JLong, 5L: JLong, 7L: JLong), change = true)))
+      CRow(7001L: JLong, "aaa", 7L: JLong, 5L: JLong, 7L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(8001L: JLong, "aaa", 8L: JLong, 6L: JLong, 8L: JLong), change = true)))
+      CRow(8001L: JLong, "aaa", 8L: JLong, 6L: JLong, 8L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "aaa", 9L: JLong, 7L: JLong, 9L: JLong), change = true)))
+      CRow(12001L: JLong, "aaa", 9L: JLong, 7L: JLong, 9L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "aaa", 10L: JLong, 8L: JLong, 10L: JLong), change = true)))
+      CRow(12001L: JLong, "aaa", 10L: JLong, 8L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "bbb", 40L: JLong, 20L: JLong, 40L: JLong), change = true)))
+      CRow(12001L: JLong, "bbb", 40L: JLong, 20L: JLong, 40L: JLong)))
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
+      CRow(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
+      CRow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(20011L: JLong, "ccc", 3L: JLong, 3L: JLong, 3L: JLong), change = true)))
+      CRow(20011L: JLong, "ccc", 3L: JLong, 3L: JLong, 3L: JLong)))
 
     verify(expectedOutput, result, new RowResultSortComparator())
     testHarness.close()
@@ -678,47 +678,47 @@ class OverWindowHarnessTest extends HarnessTestBase{
     testHarness.setProcessingTime(1000)
     testHarness.processWatermark(800)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(801L: JLong, "aaa", 1L: JLong), change = true)))
+      CRow(801L: JLong, "aaa", 1L: JLong)))
 
     testHarness.processWatermark(2500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2501L: JLong, "bbb", 10L: JLong), change = true)))
+      CRow(2501L: JLong, "bbb", 10L: JLong)))
 
     testHarness.processWatermark(4000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(4001L: JLong, "aaa", 2L: JLong), change = true)))
+      CRow(4001L: JLong, "aaa", 2L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(4001L: JLong, "aaa", 3L: JLong), change = true)))
+      CRow(4001L: JLong, "aaa", 3L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(4001L: JLong, "bbb", 20L: JLong), change = true)))
+      CRow(4001L: JLong, "bbb", 20L: JLong)))
 
     testHarness.processWatermark(4800)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(4801L: JLong, "aaa", 4L: JLong), change = true)))
+      CRow(4801L: JLong, "aaa", 4L: JLong)))
 
     testHarness.processWatermark(6500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "aaa", 5L: JLong), change = true)))
+      CRow(6501L: JLong, "aaa", 5L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "aaa", 6L: JLong), change = true)))
+      CRow(6501L: JLong, "aaa", 6L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "bbb", 30L: JLong), change = true)))
+      CRow(6501L: JLong, "bbb", 30L: JLong)))
 
     testHarness.processWatermark(7000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(7001L: JLong, "aaa", 7L: JLong), change = true)))
+      CRow(7001L: JLong, "aaa", 7L: JLong)))
 
     testHarness.processWatermark(8000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(8001L: JLong, "aaa", 8L: JLong), change = true)))
+      CRow(8001L: JLong, "aaa", 8L: JLong)))
 
     testHarness.processWatermark(12000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "aaa", 9L: JLong), change = true)))
+      CRow(12001L: JLong, "aaa", 9L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "aaa", 10L: JLong), change = true)))
+      CRow(12001L: JLong, "aaa", 10L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "bbb", 40L: JLong), change = true)))
+      CRow(12001L: JLong, "bbb", 40L: JLong)))
 
     testHarness.processWatermark(19000)
 
@@ -731,13 +731,13 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.processWatermark(20000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(20000L: JLong, "ccc", 1L: JLong), change = true))) // test for late data
+      CRow(20000L: JLong, "ccc", 1L: JLong))) // test for late data
 
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(20001L: JLong, "ccc", 1L: JLong), change = true))) // clean-up 5000
+      CRow(20001L: JLong, "ccc", 1L: JLong))) // clean-up 5000
     testHarness.setProcessingTime(2500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(20002L: JLong, "ccc", 2L: JLong), change = true))) // clean-up 5000
+      CRow(20002L: JLong, "ccc", 2L: JLong))) // clean-up 5000
 
     assert(testHarness.numKeyedStateEntries() > 0)
     testHarness.setProcessingTime(5000) // does not clean up, because data left. New timer 7000
@@ -755,38 +755,38 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     // all elements at the same row-time have the same value per key
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
+      CRow(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
+      CRow(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), change = true)))
+      CRow(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 3L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), change = true)))
+      CRow(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), change = true)))
+      CRow(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(4801L: JLong, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), change = true)))
+      CRow(4801L: JLong, "aaa", 4L: JLong, 1L: JLong, 4L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), change = true)))
+      CRow(6501L: JLong, "aaa", 5L: JLong, 1L: JLong, 6L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), change = true)))
+      CRow(6501L: JLong, "aaa", 6L: JLong, 1L: JLong, 6L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), change = true)))
+      CRow(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(7001L: JLong, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), change = true)))
+      CRow(7001L: JLong, "aaa", 7L: JLong, 1L: JLong, 7L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(8001L: JLong, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), change = true)))
+      CRow(8001L: JLong, "aaa", 8L: JLong, 1L: JLong, 8L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "aaa", 9L: JLong, 1L: JLong, 10L: JLong), change = true)))
+      CRow(12001L: JLong, "aaa", 9L: JLong, 1L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), change = true)))
+      CRow(12001L: JLong, "aaa", 10L: JLong, 1L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), change = true)))
+      CRow(12001L: JLong, "bbb", 40L: JLong, 10L: JLong, 40L: JLong)))
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
+      CRow(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
+      CRow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
 
     verify(expectedOutput, result, new RowResultSortComparator())
     testHarness.close()
@@ -814,47 +814,47 @@ class OverWindowHarnessTest extends HarnessTestBase{
     testHarness.setProcessingTime(1000)
     testHarness.processWatermark(800)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(801L: JLong, "aaa", 1L: JLong), change = true)))
+      CRow(801L: JLong, "aaa", 1L: JLong)))
 
     testHarness.processWatermark(2500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2501L: JLong, "bbb", 10L: JLong), change = true)))
+      CRow(2501L: JLong, "bbb", 10L: JLong)))
 
     testHarness.processWatermark(4000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(4001L: JLong, "aaa", 2L: JLong), change = true)))
+      CRow(4001L: JLong, "aaa", 2L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(4001L: JLong, "aaa", 3L: JLong), change = true)))
+      CRow(4001L: JLong, "aaa", 3L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(4001L: JLong, "bbb", 20L: JLong), change = true)))
+      CRow(4001L: JLong, "bbb", 20L: JLong)))
 
     testHarness.processWatermark(4800)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(4801L: JLong, "aaa", 4L: JLong), change = true)))
+      CRow(4801L: JLong, "aaa", 4L: JLong)))
 
     testHarness.processWatermark(6500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "aaa", 5L: JLong), change = true)))
+      CRow(6501L: JLong, "aaa", 5L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "aaa", 6L: JLong), change = true)))
+      CRow(6501L: JLong, "aaa", 6L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "bbb", 30L: JLong), change = true)))
+      CRow(6501L: JLong, "bbb", 30L: JLong)))
 
     testHarness.processWatermark(7000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(7001L: JLong, "aaa", 7L: JLong), change = true)))
+      CRow(7001L: JLong, "aaa", 7L: JLong)))
 
     testHarness.processWatermark(8000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(8001L: JLong, "aaa", 8L: JLong), change = true)))
+      CRow(8001L: JLong, "aaa", 8L: JLong)))
 
     testHarness.processWatermark(12000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "aaa", 9L: JLong), change = true)))
+      CRow(12001L: JLong, "aaa", 9L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "aaa", 10L: JLong), change = true)))
+      CRow(12001L: JLong, "aaa", 10L: JLong)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "bbb", 40L: JLong), change = true)))
+      CRow(12001L: JLong, "bbb", 40L: JLong)))
 
     testHarness.processWatermark(19000)
 
@@ -867,13 +867,13 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.processWatermark(20000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(20000L: JLong, "ccc", 2L: JLong), change = true))) // test for late data
+      CRow(20000L: JLong, "ccc", 2L: JLong))) // test for late data
 
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(20001L: JLong, "ccc", 1L: JLong), change = true))) // clean-up 5000
+      CRow(20001L: JLong, "ccc", 1L: JLong))) // clean-up 5000
     testHarness.setProcessingTime(2500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(20002L: JLong, "ccc", 2L: JLong), change = true))) // clean-up 5000
+      CRow(20002L: JLong, "ccc", 2L: JLong))) // clean-up 5000
 
     assert(testHarness.numKeyedStateEntries() > 0)
     testHarness.setProcessingTime(5000) // does not clean up, because data left. New timer 7000
@@ -890,38 +890,38 @@ class OverWindowHarnessTest extends HarnessTestBase{
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
+      CRow(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
+      CRow(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
+      CRow(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), change = true)))
+      CRow(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), change = true)))
+      CRow(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(4801L: JLong, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), change = true)))
+      CRow(4801L: JLong, "aaa", 4L: JLong, 1L: JLong, 4L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "aaa", 5L: JLong, 1L: JLong, 5L: JLong), change = true)))
+      CRow(6501L: JLong, "aaa", 5L: JLong, 1L: JLong, 5L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), change = true)))
+      CRow(6501L: JLong, "aaa", 6L: JLong, 1L: JLong, 6L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), change = true)))
+      CRow(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(7001L: JLong, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), change = true)))
+      CRow(7001L: JLong, "aaa", 7L: JLong, 1L: JLong, 7L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(8001L: JLong, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), change = true)))
+      CRow(8001L: JLong, "aaa", 8L: JLong, 1L: JLong, 8L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "aaa", 9L: JLong, 1L: JLong, 9L: JLong), change = true)))
+      CRow(12001L: JLong, "aaa", 9L: JLong, 1L: JLong, 9L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), change = true)))
+      CRow(12001L: JLong, "aaa", 10L: JLong, 1L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(12001L: JLong, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), change = true)))
+      CRow(12001L: JLong, "bbb", 40L: JLong, 10L: JLong, 40L: JLong)))
 
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
+      CRow(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
+      CRow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
 
     verify(expectedOutput, result, new RowResultSortComparator())
     testHarness.close()