You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/09/21 11:43:42 UTC
[flink] 09/11: [hotfix][table] Add convenient constructors for CRow
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 61c1161860e6f500a5c11b08bc4f17231a2b79e0
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Jul 20 12:14:44 2018 +0200
[hotfix][table] Add convenient constructors for CRow
---
.../apache/flink/table/runtime/types/CRow.scala | 8 +-
.../table/runtime/harness/JoinHarnessTest.scala | 648 ++++++++++-----------
.../runtime/harness/NonWindowHarnessTest.scala | 88 +--
.../runtime/harness/OverWindowHarnessTest.scala | 444 +++++++-------
4 files changed, 596 insertions(+), 592 deletions(-)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala
index 25ec8c4..7e6f9b2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala
@@ -49,7 +49,11 @@ object CRow {
new CRow()
}
- def apply(row: Row, change: Boolean): CRow = {
- new CRow(row, change)
+ def apply(values: Any*): CRow = {
+ new CRow(Row.of(values.map(_.asInstanceOf[Object]): _*), true)
+ }
+
+ def apply(change: Boolean, values: Any*): CRow = {
+ new CRow(Row.of(values.map(_.asInstanceOf[Object]): _*), change)
}
}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
index c499a9d..86133a0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
@@ -167,28 +167,28 @@ class JoinHarnessTest extends HarnessTestBase {
testHarness.setProcessingTime(1)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1L: JLong, "1a1"), change = true), 1))
+ CRow(1L: JLong, "1a1"), 1))
assertEquals(1, testHarness.numProcessingTimeTimers())
testHarness.setProcessingTime(2)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(2L: JLong, "2a2"), change = true), 2))
+ CRow(2L: JLong, "2a2"), 2))
// timers for key = 1 and key = 2
assertEquals(2, testHarness.numProcessingTimeTimers())
testHarness.setProcessingTime(3)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1L: JLong, "1a3"), change = true), 3))
+ CRow(1L: JLong, "1a3"), 3))
assertEquals(4, testHarness.numKeyedStateEntries())
// The number of timers won't increase.
assertEquals(2, testHarness.numProcessingTimeTimers())
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1L: JLong, "1b3"), change = true), 3))
+ CRow(1L: JLong, "1b3"), 3))
testHarness.setProcessingTime(4)
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(2L: JLong, "2b4"), change = true), 4))
+ CRow(2L: JLong, "2b4"), 4))
// The number of states should be doubled.
assertEquals(8, testHarness.numKeyedStateEntries())
@@ -198,38 +198,38 @@ class JoinHarnessTest extends HarnessTestBase {
// The left row (key = 1) with timestamp = 1 will be eagerly removed here.
testHarness.setProcessingTime(13)
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1L: JLong, "1b13"), change = true), 13))
+ CRow(1L: JLong, "1b13"), 13))
// Test for +20 boundary (13 + 20 = 33).
testHarness.setProcessingTime(33)
assertEquals(4, testHarness.numKeyedStateEntries())
assertEquals(2, testHarness.numProcessingTimeTimers())
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1L: JLong, "1a33"), change = true), 33))
+ CRow(1L: JLong, "1a33"), 33))
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(2L: JLong, "2a33"), change = true), 33))
+ CRow(2L: JLong, "2a33"), 33))
// The left row (key = 2) with timestamp = 2 will be eagerly removed here.
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(2L: JLong, "2b33"), change = true), 33))
+ CRow(2L: JLong, "2b33"), 33))
val result = testHarness.getOutput
val expectedOutput = new ConcurrentLinkedQueue[Object]()
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1L: JLong, "1a1", 1L: JLong, "1b3"), change = true), 3))
+ CRow(1L: JLong, "1a1", 1L: JLong, "1b3"), 3))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1L: JLong, "1a3", 1L: JLong, "1b3"), change = true), 3))
+ CRow(1L: JLong, "1a3", 1L: JLong, "1b3"), 3))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2L: JLong, "2a2", 2L: JLong, "2b4"), change = true), 4))
+ CRow(2L: JLong, "2a2", 2L: JLong, "2b4"), 4))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1L: JLong, "1a3", 1L: JLong, "1b13"), change = true), 13))
+ CRow(1L: JLong, "1a3", 1L: JLong, "1b13"), 13))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1L: JLong, "1a33", 1L: JLong, "1b13"), change = true), 33))
+ CRow(1L: JLong, "1a33", 1L: JLong, "1b13"), 33))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2L: JLong, "2a33", 2L: JLong, "2b33"), change = true), 33))
+ CRow(2L: JLong, "2a33", 2L: JLong, "2b33"), 33))
verify(expectedOutput, result, new RowResultSortComparator())
@@ -257,19 +257,19 @@ class JoinHarnessTest extends HarnessTestBase {
testHarness.setProcessingTime(1)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1L: JLong, "1a1"), change = true), 1))
+ CRow(1L: JLong, "1a1"), 1))
testHarness.setProcessingTime(2)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(2L: JLong, "2a2"), change = true), 2))
+ CRow(2L: JLong, "2a2"), 2))
testHarness.setProcessingTime(3)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1L: JLong, "1a3"), change = true), 3))
+ CRow(1L: JLong, "1a3"), 3))
assertEquals(4, testHarness.numKeyedStateEntries())
assertEquals(2, testHarness.numProcessingTimeTimers())
// All the right rows will not be cached.
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1L: JLong, "1b3"), change = true), 3))
+ CRow(1L: JLong, "1b3"), 3))
assertEquals(4, testHarness.numKeyedStateEntries())
assertEquals(2, testHarness.numProcessingTimeTimers())
@@ -278,14 +278,14 @@ class JoinHarnessTest extends HarnessTestBase {
// Meets a.proctime <= b.proctime - 5.
// This row will only be joined without being cached (7 >= 7 - 5).
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(2L: JLong, "2b7"), change = true), 7))
+ CRow(2L: JLong, "2b7"), 7))
assertEquals(4, testHarness.numKeyedStateEntries())
assertEquals(2, testHarness.numProcessingTimeTimers())
testHarness.setProcessingTime(12)
// The left row (key = 1) with timestamp = 1 will be eagerly removed here.
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1L: JLong, "1b12"), change = true), 12))
+ CRow(1L: JLong, "1b12"), 12))
// We add a delay (relativeWindowSize / 2) for cleaning up state.
// No timers will be triggered here.
@@ -309,9 +309,9 @@ class JoinHarnessTest extends HarnessTestBase {
val expectedOutput = new ConcurrentLinkedQueue[Object]()
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2L: JLong, "2a2", 2L: JLong, "2b7"), change = true), 7))
+ CRow(2L: JLong, "2a2", 2L: JLong, "2b7"), 7))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1L: JLong, "1a3", 1L: JLong, "1b12"), change = true), 12))
+ CRow(1L: JLong, "1a3", 1L: JLong, "1b12"), 12))
verify(expectedOutput, result, new RowResultSortComparator())
@@ -344,24 +344,24 @@ class JoinHarnessTest extends HarnessTestBase {
// Test late data.
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(1L: JLong, "k1"), change = true), 0))
+ CRow(1L: JLong, "k1"), 0))
// Though (1L, "k1") is actually late, it will also be cached.
assertEquals(1, testHarness.numEventTimeTimers())
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(2L: JLong, "k1"), change = true), 0))
+ CRow(2L: JLong, "k1"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(2L: JLong, "k1"), change = true), 0))
+ CRow(2L: JLong, "k1"), 0))
assertEquals(2, testHarness.numEventTimeTimers())
assertEquals(4, testHarness.numKeyedStateEntries())
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(5L: JLong, "k1"), change = true), 0))
+ CRow(5L: JLong, "k1"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(15L: JLong, "k1"), change = true), 0))
+ CRow(15L: JLong, "k1"), 0))
testHarness.processWatermark1(new Watermark(20))
testHarness.processWatermark2(new Watermark(20))
@@ -369,7 +369,7 @@ class JoinHarnessTest extends HarnessTestBase {
assertEquals(4, testHarness.numKeyedStateEntries())
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(35L: JLong, "k1"), change = true), 0))
+ CRow(35L: JLong, "k1"), 0))
// The right rows with timestamp = 2 and 5 will be removed here.
// The left rows with timestamp = 2 and 15 will be removed here.
@@ -377,9 +377,9 @@ class JoinHarnessTest extends HarnessTestBase {
testHarness.processWatermark2(new Watermark(38))
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(40L: JLong, "k2"), change = true), 0))
+ CRow(40L: JLong, "k2"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(39L: JLong, "k2"), change = true), 0))
+ CRow(39L: JLong, "k2"), 0))
assertEquals(6, testHarness.numKeyedStateEntries())
@@ -393,19 +393,19 @@ class JoinHarnessTest extends HarnessTestBase {
expectedOutput.add(new Watermark(-19))
// This result is produced by the late row (1, "k1").
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1L: JLong, "k1", 2L: JLong, "k1"), change = true), 0))
+ CRow(1L: JLong, "k1", 2L: JLong, "k1"), 0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2L: JLong, "k1", 2L: JLong, "k1"), change = true), 0))
+ CRow(2L: JLong, "k1", 2L: JLong, "k1"), 0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(5L: JLong, "k1", 2L: JLong, "k1"), change = true), 0))
+ CRow(5L: JLong, "k1", 2L: JLong, "k1"), 0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(5L: JLong, "k1", 15L: JLong, "k1"), change = true), 0))
+ CRow(5L: JLong, "k1", 15L: JLong, "k1"), 0))
expectedOutput.add(new Watermark(0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(35L: JLong, "k1", 15L: JLong, "k1"), change = true), 0))
+ CRow(35L: JLong, "k1", 15L: JLong, "k1"), 0))
expectedOutput.add(new Watermark(18))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(40L: JLong, "k2", 39L: JLong, "k2"), change = true), 0))
+ CRow(40L: JLong, "k2", 39L: JLong, "k2"), 0))
expectedOutput.add(new Watermark(41))
val result = testHarness.getOutput
@@ -443,7 +443,7 @@ class JoinHarnessTest extends HarnessTestBase {
// This row will not be cached.
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(2L: JLong, "k1"), change = true), 0))
+ CRow(2L: JLong, "k1"), 0))
assertEquals(0, testHarness.numKeyedStateEntries())
@@ -451,19 +451,19 @@ class JoinHarnessTest extends HarnessTestBase {
testHarness.processWatermark2(new Watermark(2))
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(3L: JLong, "k1"), change = true), 0))
+ CRow(3L: JLong, "k1"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(3L: JLong, "k1"), change = true), 0))
+ CRow(3L: JLong, "k1"), 0))
// Test for -10 boundary (13 - 10 = 3).
// This row from the right stream will be cached.
// The clean time for the left stream is 13 - 7 + 1 - 1 = 8
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(13L: JLong, "k1"), change = true), 0))
+ CRow(13L: JLong, "k1"), 0))
// Test for -7 boundary (13 - 7 = 6).
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(6L: JLong, "k1"), change = true), 0))
+ CRow(6L: JLong, "k1"), 0))
assertEquals(4, testHarness.numKeyedStateEntries())
@@ -484,9 +484,9 @@ class JoinHarnessTest extends HarnessTestBase {
expectedOutput.add(new Watermark(-9))
expectedOutput.add(new Watermark(-8))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(3L: JLong, "k1", 13L: JLong, "k1"), change = true), 0))
+ CRow(3L: JLong, "k1", 13L: JLong, "k1"), 0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(6L: JLong, "k1", 13L: JLong, "k1"), change = true), 0))
+ CRow(6L: JLong, "k1", 13L: JLong, "k1"), 0))
expectedOutput.add(new Watermark(0))
expectedOutput.add(new Watermark(8))
@@ -521,9 +521,9 @@ class JoinHarnessTest extends HarnessTestBase {
testHarness.open()
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(1L: JLong, "k1"), change = true), 0))
+ CRow(1L: JLong, "k1"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(1L: JLong, "k2"), change = true), 0))
+ CRow(1L: JLong, "k2"), 0))
assertEquals(2, testHarness.numEventTimeTimers())
assertEquals(4, testHarness.numKeyedStateEntries())
@@ -543,9 +543,9 @@ class JoinHarnessTest extends HarnessTestBase {
assertEquals(0, testHarness.numKeyedStateEntries())
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(2L: JLong, "k1"), change = true), 0))
+ CRow(2L: JLong, "k1"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(2L: JLong, "k2"), change = true), 0))
+ CRow(2L: JLong, "k2"), 0))
// The late rows with timestamp = 2 will not be cached, but a null padding result for the left
// row will be emitted.
@@ -554,26 +554,26 @@ class JoinHarnessTest extends HarnessTestBase {
// Make sure the common (inner) join can be performed.
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(19L: JLong, "k1"), change = true), 0))
+ CRow(19L: JLong, "k1"), 0))
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(20L: JLong, "k1"), change = true), 0))
+ CRow(20L: JLong, "k1"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(26L: JLong, "k1"), change = true), 0))
+ CRow(26L: JLong, "k1"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(25L: JLong, "k1"), change = true), 0))
+ CRow(25L: JLong, "k1"), 0))
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(21L: JLong, "k1"), change = true), 0))
+ CRow(21L: JLong, "k1"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(39L: JLong, "k2"), change = true), 0))
+ CRow(39L: JLong, "k2"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(40L: JLong, "k2"), change = true), 0))
+ CRow(40L: JLong, "k2"), 0))
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(50L: JLong, "k2"), change = true), 0))
+ CRow(50L: JLong, "k2"), 0))
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(49L: JLong, "k2"), change = true), 0))
+ CRow(49L: JLong, "k2"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(41L: JLong, "k2"), change = true), 0))
+ CRow(41L: JLong, "k2"), 0))
testHarness.processWatermark1(new Watermark(100))
testHarness.processWatermark2(new Watermark(100))
@@ -581,26 +581,26 @@ class JoinHarnessTest extends HarnessTestBase {
val expectedOutput = new ConcurrentLinkedQueue[Object]()
// The timestamp 14 is set with the triggered timer.
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1L: JLong, "k1", null: JLong, null: String), change = true), 14))
+ CRow(1L: JLong, "k1", null: JLong, null: String), 14))
expectedOutput.add(new Watermark(5))
expectedOutput.add(new Watermark(9))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2L: JLong, "k1", null: JLong, null: String), change = true), 0))
+ CRow(2L: JLong, "k1", null: JLong, null: String), 0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(20L: JLong, "k1", 25L: JLong, "k1": String), change = true), 0))
+ CRow(20L: JLong, "k1", 25L: JLong, "k1": String), 0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(21L: JLong, "k1", 25L: JLong, "k1": String), change = true), 0))
+ CRow(21L: JLong, "k1", 25L: JLong, "k1": String), 0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(21L: JLong, "k1", 26L: JLong, "k1": String), change = true), 0))
+ CRow(21L: JLong, "k1", 26L: JLong, "k1": String), 0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(49L: JLong, "k2", 40L: JLong, "k2": String), change = true), 0))
+ CRow(49L: JLong, "k2", 40L: JLong, "k2": String), 0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(49L: JLong, "k2", 41L: JLong, "k2": String), change = true), 0))
+ CRow(49L: JLong, "k2", 41L: JLong, "k2": String), 0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(50L: JLong, "k2", 41L: JLong, "k2": String), change = true), 0))
+ CRow(50L: JLong, "k2", 41L: JLong, "k2": String), 0))
// The timestamp 32 is set with the triggered timer.
expectedOutput.add(new StreamRecord(
- CRow(Row.of(19L: JLong, "k1", null: JLong, null: String), change = true), 32))
+ CRow(19L: JLong, "k1", null: JLong, null: String), 32))
expectedOutput.add(new Watermark(91))
@@ -635,9 +635,9 @@ class JoinHarnessTest extends HarnessTestBase {
testHarness.open()
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(1L: JLong, "k1"), change = true), 0))
+ CRow(1L: JLong, "k1"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(1L: JLong, "k2"), change = true), 0))
+ CRow(1L: JLong, "k2"), 0))
assertEquals(2, testHarness.numEventTimeTimers())
assertEquals(4, testHarness.numKeyedStateEntries())
@@ -657,9 +657,9 @@ class JoinHarnessTest extends HarnessTestBase {
assertEquals(0, testHarness.numKeyedStateEntries())
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(2L: JLong, "k1"), change = true), 0))
+ CRow(2L: JLong, "k1"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(2L: JLong, "k2"), change = true), 0))
+ CRow(2L: JLong, "k2"), 0))
// The late rows with timestamp = 2 will not be cached, but a null padding result for the right
// row will be emitted.
@@ -668,26 +668,26 @@ class JoinHarnessTest extends HarnessTestBase {
// Make sure the common (inner) join can be performed.
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(19L: JLong, "k1"), change = true), 0))
+ CRow(19L: JLong, "k1"), 0))
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(20L: JLong, "k1"), change = true), 0))
+ CRow(20L: JLong, "k1"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(26L: JLong, "k1"), change = true), 0))
+ CRow(26L: JLong, "k1"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(25L: JLong, "k1"), change = true), 0))
+ CRow(25L: JLong, "k1"), 0))
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(21L: JLong, "k1"), change = true), 0))
+ CRow(21L: JLong, "k1"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(39L: JLong, "k2"), change = true), 0))
+ CRow(39L: JLong, "k2"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(40L: JLong, "k2"), change = true), 0))
+ CRow(40L: JLong, "k2"), 0))
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(50L: JLong, "k2"), change = true), 0))
+ CRow(50L: JLong, "k2"), 0))
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(49L: JLong, "k2"), change = true), 0))
+ CRow(49L: JLong, "k2"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(41L: JLong, "k2"), change = true), 0))
+ CRow(41L: JLong, "k2"), 0))
testHarness.processWatermark1(new Watermark(100))
testHarness.processWatermark2(new Watermark(100))
@@ -696,25 +696,25 @@ class JoinHarnessTest extends HarnessTestBase {
expectedOutput.add(new Watermark(5))
// The timestamp 18 is set with the triggered timer.
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JLong, null: String, 1L: JLong, "k2": String), change = true), 18))
+ CRow(null: JLong, null: String, 1L: JLong, "k2": String), 18))
expectedOutput.add(new Watermark(9))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JLong, null: String, 2L: JLong, "k2": String), change = true), 0))
+ CRow(null: JLong, null: String, 2L: JLong, "k2": String), 0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(20L: JLong, "k1", 25L: JLong, "k1": String), change = true), 0))
+ CRow(20L: JLong, "k1", 25L: JLong, "k1": String), 0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(21L: JLong, "k1", 25L: JLong, "k1": String), change = true), 0))
+ CRow(21L: JLong, "k1", 25L: JLong, "k1": String), 0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(21L: JLong, "k1", 26L: JLong, "k1": String), change = true), 0))
+ CRow(21L: JLong, "k1", 26L: JLong, "k1": String), 0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(49L: JLong, "k2", 40L: JLong, "k2": String), change = true), 0))
+ CRow(49L: JLong, "k2", 40L: JLong, "k2": String), 0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(49L: JLong, "k2", 41L: JLong, "k2": String), change = true), 0))
+ CRow(49L: JLong, "k2", 41L: JLong, "k2": String), 0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(50L: JLong, "k2", 41L: JLong, "k2": String), change = true), 0))
+ CRow(50L: JLong, "k2", 41L: JLong, "k2": String), 0))
// The timestamp 56 is set with the triggered timer.
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JLong, null: String, 39L: JLong, "k2": String), change = true), 56))
+ CRow(null: JLong, null: String, 39L: JLong, "k2": String), 56))
expectedOutput.add(new Watermark(91))
val result = testHarness.getOutput
@@ -748,9 +748,9 @@ class JoinHarnessTest extends HarnessTestBase {
testHarness.open()
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(1L: JLong, "k1"), change = true), 0))
+ CRow(1L: JLong, "k1"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(1L: JLong, "k2"), change = true), 0))
+ CRow(1L: JLong, "k2"), 0))
assertEquals(2, testHarness.numEventTimeTimers())
assertEquals(4, testHarness.numKeyedStateEntries())
@@ -770,9 +770,9 @@ class JoinHarnessTest extends HarnessTestBase {
assertEquals(0, testHarness.numKeyedStateEntries())
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(2L: JLong, "k1"), change = true), 0))
+ CRow(2L: JLong, "k1"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(2L: JLong, "k2"), change = true), 0))
+ CRow(2L: JLong, "k2"), 0))
// The late rows with timestamp = 2 will not be cached, but a null padding result for the right
// row will be emitted.
@@ -781,26 +781,26 @@ class JoinHarnessTest extends HarnessTestBase {
// Make sure the common (inner) join can be performed.
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(19L: JLong, "k1"), change = true), 0))
+ CRow(19L: JLong, "k1"), 0))
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(20L: JLong, "k1"), change = true), 0))
+ CRow(20L: JLong, "k1"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(26L: JLong, "k1"), change = true), 0))
+ CRow(26L: JLong, "k1"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(25L: JLong, "k1"), change = true), 0))
+ CRow(25L: JLong, "k1"), 0))
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(21L: JLong, "k1"), change = true), 0))
+ CRow(21L: JLong, "k1"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(39L: JLong, "k2"), change = true), 0))
+ CRow(39L: JLong, "k2"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(40L: JLong, "k2"), change = true), 0))
+ CRow(40L: JLong, "k2"), 0))
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(50L: JLong, "k2"), change = true), 0))
+ CRow(50L: JLong, "k2"), 0))
testHarness.processElement1(new StreamRecord[CRow](
- CRow(Row.of(49L: JLong, "k2"), change = true), 0))
+ CRow(49L: JLong, "k2"), 0))
testHarness.processElement2(new StreamRecord[CRow](
- CRow(Row.of(41L: JLong, "k2"), change = true), 0))
+ CRow(41L: JLong, "k2"), 0))
testHarness.processWatermark1(new Watermark(100))
testHarness.processWatermark2(new Watermark(100))
@@ -808,34 +808,34 @@ class JoinHarnessTest extends HarnessTestBase {
val expectedOutput = new ConcurrentLinkedQueue[Object]()
// The timestamp 14 is set with the triggered timer.
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1L: JLong, "k1", null: JLong, null: String), change = true), 14))
+ CRow(1L: JLong, "k1", null: JLong, null: String), 14))
expectedOutput.add(new Watermark(5))
// The timestamp 18 is set with the triggered timer.
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JLong, null: String, 1L: JLong, "k2": String), change = true), 18))
+ CRow(null: JLong, null: String, 1L: JLong, "k2": String), 18))
expectedOutput.add(new Watermark(9))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2L: JLong, "k1", null: JLong, null: String), change = true), 0))
+ CRow(2L: JLong, "k1", null: JLong, null: String), 0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JLong, null: String, 2L: JLong, "k2": String), change = true), 0))
+ CRow(null: JLong, null: String, 2L: JLong, "k2": String), 0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(20L: JLong, "k1", 25L: JLong, "k1": String), change = true), 0))
+ CRow(20L: JLong, "k1", 25L: JLong, "k1": String), 0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(21L: JLong, "k1", 25L: JLong, "k1": String), change = true), 0))
+ CRow(21L: JLong, "k1", 25L: JLong, "k1": String), 0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(21L: JLong, "k1", 26L: JLong, "k1": String), change = true), 0))
+ CRow(21L: JLong, "k1", 26L: JLong, "k1": String), 0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(49L: JLong, "k2", 40L: JLong, "k2": String), change = true), 0))
+ CRow(49L: JLong, "k2", 40L: JLong, "k2": String), 0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(49L: JLong, "k2", 41L: JLong, "k2": String), change = true), 0))
+ CRow(49L: JLong, "k2", 41L: JLong, "k2": String), 0))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(50L: JLong, "k2", 41L: JLong, "k2": String), change = true), 0))
+ CRow(50L: JLong, "k2", 41L: JLong, "k2": String), 0))
// The timestamp 32 is set with the triggered timer.
expectedOutput.add(new StreamRecord(
- CRow(Row.of(19L: JLong, "k1", null: JLong, null: String), change = true), 32))
+ CRow(19L: JLong, "k1", null: JLong, null: String), 32))
// The timestamp 56 is set with the triggered timer.
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JLong, null: String, 39L: JLong, "k2": String), change = true), 56))
+ CRow(null: JLong, null: String, 39L: JLong, "k2": String), 56))
expectedOutput.add(new Watermark(91))
val result = testHarness.getOutput
@@ -880,37 +880,37 @@ class JoinHarnessTest extends HarnessTestBase {
// left stream input
testHarness.setProcessingTime(1)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = true)))
+ CRow(1: JInt, "aaa")))
assertEquals(1, testHarness.numProcessingTimeTimers())
assertEquals(2, testHarness.numKeyedStateEntries())
testHarness.setProcessingTime(2)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = true)))
+ CRow(1: JInt, "aaa")))
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(2: JInt, "bbb"), change = true)))
+ CRow(2: JInt, "bbb")))
assertEquals(2, testHarness.numProcessingTimeTimers())
assertEquals(4, testHarness.numKeyedStateEntries())
testHarness.setProcessingTime(3)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = true)))
+ CRow(1: JInt, "aaa")))
assertEquals(4, testHarness.numKeyedStateEntries())
assertEquals(2, testHarness.numProcessingTimeTimers())
// right stream input and output normally
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi1"), change = true)))
+ CRow(1: JInt, "Hi1")))
assertEquals(6, testHarness.numKeyedStateEntries())
assertEquals(3, testHarness.numProcessingTimeTimers())
testHarness.setProcessingTime(4)
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(2: JInt, "Hello1"), change = true)))
+ CRow(2: JInt, "Hello1")))
assertEquals(8, testHarness.numKeyedStateEntries())
assertEquals(4, testHarness.numProcessingTimeTimers())
// expired left stream record with key value of 1
testHarness.setProcessingTime(5)
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi2"), change = true)))
+ CRow(1: JInt, "Hi2")))
assertEquals(6, testHarness.numKeyedStateEntries())
assertEquals(3, testHarness.numProcessingTimeTimers())
@@ -936,13 +936,13 @@ class JoinHarnessTest extends HarnessTestBase {
val expectedOutput = new ConcurrentLinkedQueue[Object]()
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), change = true)))
+ CRow(1: JInt, "aaa", 1: JInt, "Hi1")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), change = true)))
+ CRow(1: JInt, "aaa", 1: JInt, "Hi1")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), change = true)))
+ CRow(1: JInt, "aaa", 1: JInt, "Hi1")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "bbb", 2: JInt, "Hello1"), change = true)))
+ CRow(2: JInt, "bbb", 2: JInt, "Hello1")))
verify(expectedOutput, result, new RowResultSortComparator())
@@ -974,43 +974,43 @@ class JoinHarnessTest extends HarnessTestBase {
// left stream input
testHarness.setProcessingTime(1)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = true)))
+ CRow(1: JInt, "aaa")))
assertEquals(1, testHarness.numProcessingTimeTimers())
assertEquals(2, testHarness.numKeyedStateEntries())
testHarness.setProcessingTime(2)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = true)))
+ CRow(1: JInt, "aaa")))
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(2: JInt, "bbb"), change = true)))
+ CRow(2: JInt, "bbb")))
assertEquals(2, testHarness.numProcessingTimeTimers())
assertEquals(4, testHarness.numKeyedStateEntries())
testHarness.setProcessingTime(3)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = false)))
+ CRow(false, 1: JInt, "aaa")))
assertEquals(4, testHarness.numKeyedStateEntries())
assertEquals(2, testHarness.numProcessingTimeTimers())
// right stream input and output normally
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi1"), change = true)))
+ CRow(1: JInt, "Hi1")))
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi1"), change = false)))
+ CRow(false, 1: JInt, "Hi1")))
assertEquals(5, testHarness.numKeyedStateEntries())
assertEquals(3, testHarness.numProcessingTimeTimers())
testHarness.setProcessingTime(4)
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(2: JInt, "Hello1"), change = true)))
+ CRow(2: JInt, "Hello1")))
assertEquals(7, testHarness.numKeyedStateEntries())
assertEquals(4, testHarness.numProcessingTimeTimers())
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = false)))
+ CRow(false, 1: JInt, "aaa")))
// expired left stream record with key value of 1
testHarness.setProcessingTime(5)
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi2"), change = true)))
+ CRow(1: JInt, "Hi2")))
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi2"), change = false)))
+ CRow(false, 1: JInt, "Hi2")))
assertEquals(5, testHarness.numKeyedStateEntries())
assertEquals(3, testHarness.numProcessingTimeTimers())
@@ -1029,11 +1029,11 @@ class JoinHarnessTest extends HarnessTestBase {
val expectedOutput = new ConcurrentLinkedQueue[Object]()
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), change = true)))
+ CRow(1: JInt, "aaa", 1: JInt, "Hi1")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), change = false)))
+ CRow(false, 1: JInt, "aaa", 1: JInt, "Hi1")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "bbb", 2: JInt, "Hello1"), change = true)))
+ CRow(2: JInt, "bbb", 2: JInt, "Hello1")))
verify(expectedOutput, result, new RowResultSortComparator())
@@ -1066,43 +1066,43 @@ class JoinHarnessTest extends HarnessTestBase {
// left stream input
testHarness.setProcessingTime(1)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = true)))
+ CRow(1: JInt, "aaa")))
assertEquals(1, testHarness.numProcessingTimeTimers())
assertEquals(2, testHarness.numKeyedStateEntries())
testHarness.setProcessingTime(2)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = true)))
+ CRow(1: JInt, "aaa")))
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(2: JInt, "bbb"), change = true)))
+ CRow(2: JInt, "bbb")))
assertEquals(2, testHarness.numProcessingTimeTimers())
assertEquals(4, testHarness.numKeyedStateEntries())
testHarness.setProcessingTime(3)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = false)))
+ CRow(false, 1: JInt, "aaa")))
assertEquals(4, testHarness.numKeyedStateEntries())
assertEquals(2, testHarness.numProcessingTimeTimers())
// right stream input and output normally
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi1"), change = true)))
+ CRow(1: JInt, "Hi1")))
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi1"), change = false)))
+ CRow(false, 1: JInt, "Hi1")))
assertEquals(5, testHarness.numKeyedStateEntries())
assertEquals(3, testHarness.numProcessingTimeTimers())
testHarness.setProcessingTime(4)
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(2: JInt, "Hello1"), change = true)))
+ CRow(2: JInt, "Hello1")))
assertEquals(7, testHarness.numKeyedStateEntries())
assertEquals(4, testHarness.numProcessingTimeTimers())
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = false)))
+ CRow(false, 1: JInt, "aaa")))
// expired left stream record with key value of 1
testHarness.setProcessingTime(5)
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi2"), change = true)))
+ CRow(1: JInt, "Hi2")))
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi2"), change = false)))
+ CRow(false, 1: JInt, "Hi2")))
assertEquals(5, testHarness.numKeyedStateEntries())
assertEquals(3, testHarness.numProcessingTimeTimers())
@@ -1121,27 +1121,27 @@ class JoinHarnessTest extends HarnessTestBase {
val expectedOutput = new ConcurrentLinkedQueue[Object]()
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = true)))
+ CRow(1: JInt, "aaa", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = true)))
+ CRow(1: JInt, "aaa", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "bbb", null: JInt, null), change = true)))
+ CRow(2: JInt, "bbb", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = false)))
+ CRow(false, 1: JInt, "aaa", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = false)))
+ CRow(false, 1: JInt, "aaa", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), change = true)))
+ CRow(1: JInt, "aaa", 1: JInt, "Hi1")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), change = false)))
+ CRow(false, 1: JInt, "aaa", 1: JInt, "Hi1")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = true)))
+ CRow(1: JInt, "aaa", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "bbb", null: JInt, null), change = false)))
+ CRow(false, 2: JInt, "bbb", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "bbb", 2: JInt, "Hello1"), change = true)))
+ CRow(2: JInt, "bbb", 2: JInt, "Hello1")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = false)))
+ CRow(false, 1: JInt, "aaa", null: JInt, null)))
verify(expectedOutput, result, new RowResultSortComparator())
@@ -1174,19 +1174,19 @@ class JoinHarnessTest extends HarnessTestBase {
// left stream input
testHarness.setProcessingTime(1)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = true)))
+ CRow(1: JInt, "aaa")))
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = false)))
+ CRow(false, 1: JInt, "aaa")))
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb"), change = true)))
+ CRow(1: JInt, "bbb")))
assertEquals(1, testHarness.numProcessingTimeTimers())
// 1 left timer(5), 1 left key(1), 1 join cnt
assertEquals(3, testHarness.numKeyedStateEntries())
testHarness.setProcessingTime(2)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = true)))
+ CRow(1: JInt, "aaa")))
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(2: JInt, "bbb"), change = true)))
+ CRow(2: JInt, "bbb")))
assertEquals(2, testHarness.numProcessingTimeTimers())
// 2 left timer(5,6), 2 left key(1,2), 2 join cnt
assertEquals(6, testHarness.numKeyedStateEntries())
@@ -1194,35 +1194,35 @@ class JoinHarnessTest extends HarnessTestBase {
// right stream input and output normally
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi1"), change = true)))
+ CRow(1: JInt, "Hi1")))
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb"), change = false)))
+ CRow(false, 1: JInt, "bbb")))
// 2 left timer(5,6), 2 left keys(1,2), 2 join cnt, 1 right timer(7), 1 right key(1)
assertEquals(8, testHarness.numKeyedStateEntries())
assertEquals(3, testHarness.numProcessingTimeTimers())
testHarness.setProcessingTime(4)
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(2: JInt, "ccc"), change = true)))
+ CRow(2: JInt, "ccc")))
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(2: JInt, "Hello"), change = true)))
+ CRow(2: JInt, "Hello")))
// 2 left timer(5,6), 2 left keys(1,2), 2 join cnt, 2 right timer(7,8), 2 right key(1,2)
assertEquals(10, testHarness.numKeyedStateEntries())
assertEquals(4, testHarness.numProcessingTimeTimers())
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = false)))
+ CRow(false, 1: JInt, "aaa")))
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi2"), change = true)))
+ CRow(1: JInt, "Hi2")))
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi2"), change = false)))
+ CRow(false, 1: JInt, "Hi2")))
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi1"), change = false)))
+ CRow(false, 1: JInt, "Hi1")))
// expired left stream record with key value of 1
testHarness.setProcessingTime(5)
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi3"), change = true)))
+ CRow(1: JInt, "Hi3")))
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi3"), change = false)))
+ CRow(false, 1: JInt, "Hi3")))
// 1 left timer(6), 1 left keys(2), 1 join cnt, 2 right timer(7,8), 1 right key(2)
assertEquals(6, testHarness.numKeyedStateEntries())
assertEquals(3, testHarness.numProcessingTimeTimers())
@@ -1242,37 +1242,37 @@ class JoinHarnessTest extends HarnessTestBase {
val expectedOutput = new ConcurrentLinkedQueue[Object]()
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = true)))
+ CRow(1: JInt, "aaa", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = false)))
+ CRow(false, 1: JInt, "aaa", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+ CRow(1: JInt, "bbb", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = true)))
+ CRow(1: JInt, "aaa", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "bbb", null: JInt, null), change = true)))
+ CRow(2: JInt, "bbb", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = false)))
+ CRow(false, 1: JInt, "bbb", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa", null: JInt, null), change = false)))
+ CRow(false, 1: JInt, "aaa", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), change = true)))
+ CRow(1: JInt, "aaa", 1: JInt, "Hi1")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb", 1: JInt, "Hi1"), change = true)))
+ CRow(1: JInt, "bbb", 1: JInt, "Hi1")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "bbb", null: JInt, null), change = false)))
+ CRow(false, 2: JInt, "bbb", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "bbb", 2: JInt, "Hello"), change = true)))
+ CRow(2: JInt, "bbb", 2: JInt, "Hello")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), change = false)))
+ CRow(false, 1: JInt, "aaa", 1: JInt, "Hi1")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb", 1: JInt, "Hi2"), change = true)))
+ CRow(1: JInt, "bbb", 1: JInt, "Hi2")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb", 1: JInt, "Hi2"), change = false)))
+ CRow(false, 1: JInt, "bbb", 1: JInt, "Hi2")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb", 1: JInt, "Hi1"), change = false)))
+ CRow(false, 1: JInt, "bbb", 1: JInt, "Hi1")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+ CRow(1: JInt, "bbb", null: JInt, null)))
verify(expectedOutput, result, new RowResultSortComparator())
testHarness.close()
@@ -1304,43 +1304,43 @@ class JoinHarnessTest extends HarnessTestBase {
// right stream input
testHarness.setProcessingTime(1)
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = true)))
+ CRow(1: JInt, "aaa")))
assertEquals(1, testHarness.numProcessingTimeTimers())
assertEquals(2, testHarness.numKeyedStateEntries())
testHarness.setProcessingTime(2)
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = true)))
+ CRow(1: JInt, "aaa")))
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(2: JInt, "bbb"), change = true)))
+ CRow(2: JInt, "bbb")))
assertEquals(2, testHarness.numProcessingTimeTimers())
assertEquals(4, testHarness.numKeyedStateEntries())
testHarness.setProcessingTime(3)
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = false)))
+ CRow(false, 1: JInt, "aaa")))
assertEquals(4, testHarness.numKeyedStateEntries())
assertEquals(2, testHarness.numProcessingTimeTimers())
// left stream input and output normally
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi1"), change = true)))
+ CRow(1: JInt, "Hi1")))
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi1"), change = false)))
+ CRow(false, 1: JInt, "Hi1")))
assertEquals(5, testHarness.numKeyedStateEntries())
assertEquals(3, testHarness.numProcessingTimeTimers())
testHarness.setProcessingTime(4)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(2: JInt, "Hello1"), change = true)))
+ CRow(2: JInt, "Hello1")))
assertEquals(7, testHarness.numKeyedStateEntries())
assertEquals(4, testHarness.numProcessingTimeTimers())
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = false)))
+ CRow(false, 1: JInt, "aaa")))
// expired right stream record with key value of 1
testHarness.setProcessingTime(5)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi2"), change = true)))
+ CRow(1: JInt, "Hi2")))
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi2"), change = false)))
+ CRow(false, 1: JInt, "Hi2")))
assertEquals(5, testHarness.numKeyedStateEntries())
assertEquals(3, testHarness.numProcessingTimeTimers())
@@ -1359,27 +1359,27 @@ class JoinHarnessTest extends HarnessTestBase {
val expectedOutput = new ConcurrentLinkedQueue[Object]()
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = true)))
+ CRow(null: JInt, null, 1: JInt, "aaa")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = true)))
+ CRow(null: JInt, null, 1: JInt, "aaa")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true)))
+ CRow(null: JInt, null, 2: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = false)))
+ CRow(false, null: JInt, null, 1: JInt, "aaa")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = false)))
+ CRow(false, null: JInt, null, 1: JInt, "aaa")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi1", 1: JInt, "aaa"), change = true)))
+ CRow(1: JInt, "Hi1", 1: JInt, "aaa")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi1", 1: JInt, "aaa"), change = false)))
+ CRow(false, 1: JInt, "Hi1", 1: JInt, "aaa")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = true)))
+ CRow(null: JInt, null, 1: JInt, "aaa")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = false)))
+ CRow(false, null: JInt, null, 2: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "Hello1", 2: JInt, "bbb"), change = true)))
+ CRow(2: JInt, "Hello1", 2: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = false)))
+ CRow(false, null: JInt, null, 1: JInt, "aaa")))
verify(expectedOutput, result, new RowResultSortComparator())
@@ -1412,19 +1412,19 @@ class JoinHarnessTest extends HarnessTestBase {
// right stream input
testHarness.setProcessingTime(1)
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = true)))
+ CRow(1: JInt, "aaa")))
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = false)))
+ CRow(false, 1: JInt, "aaa")))
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb"), change = true)))
+ CRow(1: JInt, "bbb")))
assertEquals(1, testHarness.numProcessingTimeTimers())
// 1 right timer(5), 1 right key(1), 1 join cnt
assertEquals(3, testHarness.numKeyedStateEntries())
testHarness.setProcessingTime(2)
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = true)))
+ CRow(1: JInt, "aaa")))
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(2: JInt, "bbb"), change = true)))
+ CRow(2: JInt, "bbb")))
assertEquals(2, testHarness.numProcessingTimeTimers())
// 2 right timer(5,6), 2 right key(1,2), 2 join cnt
assertEquals(6, testHarness.numKeyedStateEntries())
@@ -1432,35 +1432,35 @@ class JoinHarnessTest extends HarnessTestBase {
// left stream input and output normally
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi1"), change = true)))
+ CRow(1: JInt, "Hi1")))
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb"), change = false)))
+ CRow(false, 1: JInt, "bbb")))
// 2 right timer(5,6), 2 right keys(1,2), 2 join cnt, 1 left timer(7), 1 left key(1)
assertEquals(8, testHarness.numKeyedStateEntries())
assertEquals(3, testHarness.numProcessingTimeTimers())
testHarness.setProcessingTime(4)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(2: JInt, "ccc"), change = true)))
+ CRow(2: JInt, "ccc")))
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(2: JInt, "Hello"), change = true)))
+ CRow(2: JInt, "Hello")))
// 2 right timer(5,6), 2 right keys(1,2), 2 join cnt, 2 left timer(7,8), 2 left key(1,2)
assertEquals(10, testHarness.numKeyedStateEntries())
assertEquals(4, testHarness.numProcessingTimeTimers())
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = false)))
+ CRow(false, 1: JInt, "aaa")))
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi2"), change = true)))
+ CRow(1: JInt, "Hi2")))
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi2"), change = false)))
+ CRow(false, 1: JInt, "Hi2")))
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi1"), change = false)))
+ CRow(false, 1: JInt, "Hi1")))
// expired right stream record with key value of 1
testHarness.setProcessingTime(5)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi3"), change = true)))
+ CRow(1: JInt, "Hi3")))
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi3"), change = false)))
+ CRow(false, 1: JInt, "Hi3")))
// 1 right timer(6), 1 right keys(2), 1 join cnt, 2 left timer(7,8), 1 left key(2)
assertEquals(6, testHarness.numKeyedStateEntries())
assertEquals(3, testHarness.numProcessingTimeTimers())
@@ -1480,37 +1480,37 @@ class JoinHarnessTest extends HarnessTestBase {
val expectedOutput = new ConcurrentLinkedQueue[Object]()
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = true)))
+ CRow(null: JInt, null, 1: JInt, "aaa")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = false)))
+ CRow(false, null: JInt, null, 1: JInt, "aaa")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 1: JInt, "bbb"), change = true)))
+ CRow(null: JInt, null, 1: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = true)))
+ CRow(null: JInt, null, 1: JInt, "aaa")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true)))
+ CRow(null: JInt, null, 2: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 1: JInt, "bbb"), change = false)))
+ CRow(false, null: JInt, null, 1: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = false)))
+ CRow(false, null: JInt, null, 1: JInt, "aaa")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi1", 1: JInt, "aaa"), change = true)))
+ CRow(1: JInt, "Hi1", 1: JInt, "aaa")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi1", 1: JInt, "bbb"), change = true)))
+ CRow(1: JInt, "Hi1", 1: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = false)))
+ CRow(false, null: JInt, null, 2: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "Hello", 2: JInt, "bbb"), change = true)))
+ CRow(2: JInt, "Hello", 2: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi1", 1: JInt, "aaa"), change = false)))
+ CRow(false, 1: JInt, "Hi1", 1: JInt, "aaa")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi2", 1: JInt, "bbb"), change = true)))
+ CRow(1: JInt, "Hi2", 1: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi2", 1: JInt, "bbb"), change = false)))
+ CRow(false, 1: JInt, "Hi2", 1: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "Hi1", 1: JInt, "bbb"), change = false)))
+ CRow(false, 1: JInt, "Hi1", 1: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 1: JInt, "bbb"), change = true)))
+ CRow(null: JInt, null, 1: JInt, "bbb")))
verify(expectedOutput, result, new RowResultSortComparator())
testHarness.close()
@@ -1541,18 +1541,18 @@ class JoinHarnessTest extends HarnessTestBase {
// left stream input
testHarness.setProcessingTime(1)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb"), change = true)))
+ CRow(1: JInt, "bbb")))
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "ccc"), change = true)))
+ CRow(1: JInt, "ccc")))
assertEquals(1, testHarness.numProcessingTimeTimers())
// 1 left timer(5), 1 left key(1)
assertEquals(2, testHarness.numKeyedStateEntries())
testHarness.setProcessingTime(2)
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(2: JInt, "bbb"), change = true)))
+ CRow(2: JInt, "bbb")))
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(2: JInt, "ccc"), change = true)))
+ CRow(2: JInt, "ccc")))
assertEquals(2, testHarness.numProcessingTimeTimers())
// 1 left timer(5), 1 left key(1)
// 1 right timer(6), 1 right key(1)
@@ -1560,17 +1560,17 @@ class JoinHarnessTest extends HarnessTestBase {
testHarness.setProcessingTime(3)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(2: JInt, "aaa"), change = true)))
+ CRow(2: JInt, "aaa")))
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(2: JInt, "ddd"), change = true)))
+ CRow(2: JInt, "ddd")))
assertEquals(3, testHarness.numProcessingTimeTimers())
// 2 left timer(5,7), 2 left key(1,2)
// 1 right timer(6), 1 right key(1)
assertEquals(6, testHarness.numKeyedStateEntries())
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = true)))
+ CRow(1: JInt, "aaa")))
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "ddd"), change = true)))
+ CRow(1: JInt, "ddd")))
assertEquals(4, testHarness.numProcessingTimeTimers())
// 2 left timer(5,7), 2 left key(1,2)
// 2 right timer(6,7), 2 right key(1,2)
@@ -1578,13 +1578,13 @@ class JoinHarnessTest extends HarnessTestBase {
testHarness.setProcessingTime(4)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(2: JInt, "aaa"), change = false)))
+ CRow(false, 2: JInt, "aaa")))
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(2: JInt, "ddd"), change = false)))
+ CRow(false, 2: JInt, "ddd")))
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = false)))
+ CRow(false, 1: JInt, "aaa")))
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "ddd"), change = false)))
+ CRow(false, 1: JInt, "ddd")))
assertEquals(4, testHarness.numProcessingTimeTimers())
// 2 left timer(5,7), 1 left key(1)
// 2 right timer(6,7), 1 right key(2)
@@ -1608,78 +1608,78 @@ class JoinHarnessTest extends HarnessTestBase {
testHarness.setProcessingTime(8)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb"), change = true)))
+ CRow(1: JInt, "bbb")))
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(2: JInt, "bbb"), change = true)))
+ CRow(2: JInt, "bbb")))
val result = testHarness.getOutput
val expectedOutput = new ConcurrentLinkedQueue[Object]()
// processing time 1
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+ CRow(1: JInt, "bbb", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "ccc", null: JInt, null), change = true)))
+ CRow(1: JInt, "ccc", null: JInt, null)))
// processing time 2
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true)))
+ CRow(null: JInt, null, 2: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 2: JInt, "ccc"), change = true)))
+ CRow(null: JInt, null, 2: JInt, "ccc")))
// processing time 3
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = false)))
+ CRow(false, null: JInt, null, 2: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 2: JInt, "ccc"), change = false)))
+ CRow(false, null: JInt, null, 2: JInt, "ccc")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "aaa", 2: JInt, "bbb"), change = true)))
+ CRow(2: JInt, "aaa", 2: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "aaa", 2: JInt, "ccc"), change = true)))
+ CRow(2: JInt, "aaa", 2: JInt, "ccc")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "ddd", 2: JInt, "bbb"), change = true)))
+ CRow(2: JInt, "ddd", 2: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "ddd", 2: JInt, "ccc"), change = true)))
+ CRow(2: JInt, "ddd", 2: JInt, "ccc")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = false)))
+ CRow(false, 1: JInt, "bbb", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "ccc", null: JInt, null), change = false)))
+ CRow(false, 1: JInt, "ccc", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb", 1: JInt, "aaa"), change = true)))
+ CRow(1: JInt, "bbb", 1: JInt, "aaa")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "ccc", 1: JInt, "aaa"), change = true)))
+ CRow(1: JInt, "ccc", 1: JInt, "aaa")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb", 1: JInt, "ddd"), change = true)))
+ CRow(1: JInt, "bbb", 1: JInt, "ddd")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "ccc", 1: JInt, "ddd"), change = true)))
+ CRow(1: JInt, "ccc", 1: JInt, "ddd")))
// processing time 4
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "aaa", 2: JInt, "bbb"), change = false)))
+ CRow(false, 2: JInt, "aaa", 2: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "aaa", 2: JInt, "ccc"), change = false)))
+ CRow(false, 2: JInt, "aaa", 2: JInt, "ccc")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "ddd", 2: JInt, "bbb"), change = false)))
+ CRow(false, 2: JInt, "ddd", 2: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "ddd", 2: JInt, "ccc"), change = false)))
+ CRow(false, 2: JInt, "ddd", 2: JInt, "ccc")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true)))
+ CRow(null: JInt, null, 2: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 2: JInt, "ccc"), change = true)))
+ CRow(null: JInt, null, 2: JInt, "ccc")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb", 1: JInt, "aaa"), change = false)))
+ CRow(false, 1: JInt, "bbb", 1: JInt, "aaa")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "ccc", 1: JInt, "aaa"), change = false)))
+ CRow(false, 1: JInt, "ccc", 1: JInt, "aaa")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb", 1: JInt, "ddd"), change = false)))
+ CRow(false, 1: JInt, "bbb", 1: JInt, "ddd")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "ccc", 1: JInt, "ddd"), change = false)))
+ CRow(false, 1: JInt, "ccc", 1: JInt, "ddd")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+ CRow(1: JInt, "bbb", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "ccc", null: JInt, null), change = true)))
+ CRow(1: JInt, "ccc", null: JInt, null)))
// processing time 8
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+ CRow(1: JInt, "bbb", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true)))
+ CRow(null: JInt, null, 2: JInt, "bbb")))
verify(expectedOutput, result, new RowResultSortComparator())
testHarness.close()
@@ -1710,18 +1710,18 @@ class JoinHarnessTest extends HarnessTestBase {
// left stream input
testHarness.setProcessingTime(1)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb"), change = true)))
+ CRow(1: JInt, "bbb")))
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "ccc"), change = true)))
+ CRow(1: JInt, "ccc")))
assertEquals(1, testHarness.numProcessingTimeTimers())
// 1 left timer(5), 1 left key(1), 1 left joincnt key(1)
assertEquals(3, testHarness.numKeyedStateEntries())
testHarness.setProcessingTime(2)
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(2: JInt, "bbb"), change = true)))
+ CRow(2: JInt, "bbb")))
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(2: JInt, "ccc"), change = true)))
+ CRow(2: JInt, "ccc")))
assertEquals(2, testHarness.numProcessingTimeTimers())
// 1 left timer(5), 1 left key(1), 1 left joincnt key(1)
// 1 right timer(6), 1 right key(1), 1 right joincnt key(1)
@@ -1729,17 +1729,17 @@ class JoinHarnessTest extends HarnessTestBase {
testHarness.setProcessingTime(3)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(2: JInt, "aaa"), change = true)))
+ CRow(2: JInt, "aaa")))
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(2: JInt, "ddd"), change = true)))
+ CRow(2: JInt, "ddd")))
assertEquals(3, testHarness.numProcessingTimeTimers())
// 2 left timer(5,7), 2 left key(1,2), 2 left joincnt key(1,2)
// 1 right timer(6), 1 right key(1), 1 right joincnt key(1)
assertEquals(9, testHarness.numKeyedStateEntries())
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "aaa"), change = true)))
+ CRow(1: JInt, "aaa")))
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "ddd"), change = true)))
+ CRow(1: JInt, "ddd")))
assertEquals(4, testHarness.numProcessingTimeTimers())
// 2 left timer(5,7), 2 left key(1,2), 2 left joincnt key(1,2)
// 2 right timer(6,7), 2 right key(1,2), 2 right joincnt key(1,2)
@@ -1747,9 +1747,9 @@ class JoinHarnessTest extends HarnessTestBase {
testHarness.setProcessingTime(4)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(2: JInt, "aaa"), change = false)))
+ CRow(false, 2: JInt, "aaa")))
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(1: JInt, "ddd"), change = false)))
+ CRow(false, 1: JInt, "ddd")))
assertEquals(4, testHarness.numProcessingTimeTimers())
// 2 left timer(5,7), 2 left key(1,2), 2 left joincnt key(1,2)
// 2 right timer(6,7), 2 right key(1,2), 2 right joincnt key(1,2)
@@ -1773,68 +1773,68 @@ class JoinHarnessTest extends HarnessTestBase {
testHarness.setProcessingTime(8)
testHarness.processElement1(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb"), change = true)))
+ CRow(1: JInt, "bbb")))
testHarness.processElement2(new StreamRecord(
- CRow(Row.of(2: JInt, "bbb"), change = true)))
+ CRow(2: JInt, "bbb")))
val result = testHarness.getOutput
val expectedOutput = new ConcurrentLinkedQueue[Object]()
// processing time 1
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+ CRow(1: JInt, "bbb", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "ccc", null: JInt, null), change = true)))
+ CRow(1: JInt, "ccc", null: JInt, null)))
// processing time 2
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true)))
+ CRow(null: JInt, null, 2: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 2: JInt, "ccc"), change = true)))
+ CRow(null: JInt, null, 2: JInt, "ccc")))
// processing time 3
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = false)))
+ CRow(false, null: JInt, null, 2: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 2: JInt, "ccc"), change = false)))
+ CRow(false, null: JInt, null, 2: JInt, "ccc")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "aaa", 2: JInt, "bbb"), change = true)))
+ CRow(2: JInt, "aaa", 2: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "aaa", 2: JInt, "ccc"), change = true)))
+ CRow(2: JInt, "aaa", 2: JInt, "ccc")))
// can not find matched row due to NonEquiJoinPred
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "ddd", null: JInt, null), change = true)))
+ CRow(2: JInt, "ddd", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = false)))
+ CRow(false, 1: JInt, "bbb", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "ccc", null: JInt, null), change = false)))
+ CRow(false, 1: JInt, "ccc", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb", 1: JInt, "ddd"), change = true)))
+ CRow(1: JInt, "bbb", 1: JInt, "ddd")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "ccc", 1: JInt, "ddd"), change = true)))
+ CRow(1: JInt, "ccc", 1: JInt, "ddd")))
// can not find matched row due to NonEquiJoinPred
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 1: JInt, "aaa"), change = true)))
+ CRow(null: JInt, null, 1: JInt, "aaa")))
// processing time 4
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "aaa", 2: JInt, "bbb"), change = false)))
+ CRow(false, 2: JInt, "aaa", 2: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2: JInt, "aaa", 2: JInt, "ccc"), change = false)))
+ CRow(false, 2: JInt, "aaa", 2: JInt, "ccc")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true)))
+ CRow(null: JInt, null, 2: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 2: JInt, "ccc"), change = true)))
+ CRow(null: JInt, null, 2: JInt, "ccc")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb", 1: JInt, "ddd"), change = false)))
+ CRow(false, 1: JInt, "bbb", 1: JInt, "ddd")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "ccc", 1: JInt, "ddd"), change = false)))
+ CRow(false, 1: JInt, "ccc", 1: JInt, "ddd")))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+ CRow(1: JInt, "bbb", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "ccc", null: JInt, null), change = true)))
+ CRow(1: JInt, "ccc", null: JInt, null)))
// processing time 8
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1: JInt, "bbb", null: JInt, null), change = true)))
+ CRow(1: JInt, "bbb", null: JInt, null)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(null: JInt, null, 2: JInt, "bbb"), change = true)))
+ CRow(null: JInt, null, 2: JInt, "bbb")))
verify(expectedOutput, result, new RowResultSortComparator())
testHarness.close()
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
index 2f4bbfa..d6daa9e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
@@ -57,39 +57,39 @@ class NonWindowHarnessTest extends HarnessTestBase {
// register cleanup timer with 3001
testHarness.setProcessingTime(1)
- testHarness.processElement(new StreamRecord(CRow(Row.of(1L: JLong, 1: JInt, "aaa"), true), 1))
- testHarness.processElement(new StreamRecord(CRow(Row.of(2L: JLong, 1: JInt, "bbb"), true), 1))
+ testHarness.processElement(new StreamRecord(CRow(1L: JLong, 1: JInt, "aaa"), 1))
+ testHarness.processElement(new StreamRecord(CRow(2L: JLong, 1: JInt, "bbb"), 1))
// reuse timer 3001
testHarness.setProcessingTime(1000)
- testHarness.processElement(new StreamRecord(CRow(Row.of(3L: JLong, 2: JInt, "aaa"), true), 1))
- testHarness.processElement(new StreamRecord(CRow(Row.of(4L: JLong, 3: JInt, "aaa"), true), 1))
+ testHarness.processElement(new StreamRecord(CRow(3L: JLong, 2: JInt, "aaa"), 1))
+ testHarness.processElement(new StreamRecord(CRow(4L: JLong, 3: JInt, "aaa"), 1))
// register cleanup timer with 4002
testHarness.setProcessingTime(1002)
- testHarness.processElement(new StreamRecord(CRow(Row.of(5L: JLong, 4: JInt, "aaa"), true), 1))
- testHarness.processElement(new StreamRecord(CRow(Row.of(6L: JLong, 2: JInt, "bbb"), true), 1))
+ testHarness.processElement(new StreamRecord(CRow(5L: JLong, 4: JInt, "aaa"), 1))
+ testHarness.processElement(new StreamRecord(CRow(6L: JLong, 2: JInt, "bbb"), 1))
// trigger cleanup timer and register cleanup timer with 7003
testHarness.setProcessingTime(4003)
- testHarness.processElement(new StreamRecord(CRow(Row.of(7L: JLong, 5: JInt, "aaa"), true), 1))
- testHarness.processElement(new StreamRecord(CRow(Row.of(8L: JLong, 6: JInt, "aaa"), true), 1))
- testHarness.processElement(new StreamRecord(CRow(Row.of(9L: JLong, 7: JInt, "aaa"), true), 1))
- testHarness.processElement(new StreamRecord(CRow(Row.of(10L: JLong, 3: JInt, "bbb"), true), 1))
+ testHarness.processElement(new StreamRecord(CRow(7L: JLong, 5: JInt, "aaa"), 1))
+ testHarness.processElement(new StreamRecord(CRow(8L: JLong, 6: JInt, "aaa"), 1))
+ testHarness.processElement(new StreamRecord(CRow(9L: JLong, 7: JInt, "aaa"), 1))
+ testHarness.processElement(new StreamRecord(CRow(10L: JLong, 3: JInt, "bbb"), 1))
val result = testHarness.getOutput
val expectedOutput = new ConcurrentLinkedQueue[Object]()
- expectedOutput.add(new StreamRecord(CRow(Row.of(1L: JLong, 1: JInt), true), 1))
- expectedOutput.add(new StreamRecord(CRow(Row.of(2L: JLong, 1: JInt), true), 1))
- expectedOutput.add(new StreamRecord(CRow(Row.of(3L: JLong, 3: JInt), true), 1))
- expectedOutput.add(new StreamRecord(CRow(Row.of(4L: JLong, 6: JInt), true), 1))
- expectedOutput.add(new StreamRecord(CRow(Row.of(5L: JLong, 10: JInt), true), 1))
- expectedOutput.add(new StreamRecord(CRow(Row.of(6L: JLong, 3: JInt), true), 1))
- expectedOutput.add(new StreamRecord(CRow(Row.of(7L: JLong, 5: JInt), true), 1))
- expectedOutput.add(new StreamRecord(CRow(Row.of(8L: JLong, 11: JInt), true), 1))
- expectedOutput.add(new StreamRecord(CRow(Row.of(9L: JLong, 18: JInt), true), 1))
- expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, 3: JInt), true), 1))
+ expectedOutput.add(new StreamRecord(CRow(1L: JLong, 1: JInt), 1))
+ expectedOutput.add(new StreamRecord(CRow(2L: JLong, 1: JInt), 1))
+ expectedOutput.add(new StreamRecord(CRow(3L: JLong, 3: JInt), 1))
+ expectedOutput.add(new StreamRecord(CRow(4L: JLong, 6: JInt), 1))
+ expectedOutput.add(new StreamRecord(CRow(5L: JLong, 10: JInt), 1))
+ expectedOutput.add(new StreamRecord(CRow(6L: JLong, 3: JInt), 1))
+ expectedOutput.add(new StreamRecord(CRow(7L: JLong, 5: JInt), 1))
+ expectedOutput.add(new StreamRecord(CRow(8L: JLong, 11: JInt), 1))
+ expectedOutput.add(new StreamRecord(CRow(9L: JLong, 18: JInt), 1))
+ expectedOutput.add(new StreamRecord(CRow(10L: JLong, 3: JInt), 1))
verify(expectedOutput, result, new RowResultSortComparator())
@@ -117,38 +117,38 @@ class NonWindowHarnessTest extends HarnessTestBase {
// register cleanup timer with 3001
testHarness.setProcessingTime(1)
- testHarness.processElement(new StreamRecord(CRow(Row.of(1L: JLong, 1: JInt, "aaa"), true), 1))
- testHarness.processElement(new StreamRecord(CRow(Row.of(2L: JLong, 1: JInt, "bbb"), true), 2))
- testHarness.processElement(new StreamRecord(CRow(Row.of(3L: JLong, 2: JInt, "aaa"), true), 3))
- testHarness.processElement(new StreamRecord(CRow(Row.of(4L: JLong, 3: JInt, "ccc"), true), 4))
+ testHarness.processElement(new StreamRecord(CRow(1L: JLong, 1: JInt, "aaa"), 1))
+ testHarness.processElement(new StreamRecord(CRow(2L: JLong, 1: JInt, "bbb"), 2))
+ testHarness.processElement(new StreamRecord(CRow(3L: JLong, 2: JInt, "aaa"), 3))
+ testHarness.processElement(new StreamRecord(CRow(4L: JLong, 3: JInt, "ccc"), 4))
// trigger cleanup timer and register cleanup timer with 6002
testHarness.setProcessingTime(3002)
- testHarness.processElement(new StreamRecord(CRow(Row.of(5L: JLong, 4: JInt, "aaa"), true), 5))
- testHarness.processElement(new StreamRecord(CRow(Row.of(6L: JLong, 2: JInt, "bbb"), true), 6))
- testHarness.processElement(new StreamRecord(CRow(Row.of(7L: JLong, 5: JInt, "aaa"), true), 7))
- testHarness.processElement(new StreamRecord(CRow(Row.of(8L: JLong, 6: JInt, "eee"), true), 8))
- testHarness.processElement(new StreamRecord(CRow(Row.of(9L: JLong, 7: JInt, "aaa"), true), 9))
- testHarness.processElement(new StreamRecord(CRow(Row.of(10L: JLong, 3: JInt, "bbb"), true), 10))
+ testHarness.processElement(new StreamRecord(CRow(5L: JLong, 4: JInt, "aaa"), 5))
+ testHarness.processElement(new StreamRecord(CRow(6L: JLong, 2: JInt, "bbb"), 6))
+ testHarness.processElement(new StreamRecord(CRow(7L: JLong, 5: JInt, "aaa"), 7))
+ testHarness.processElement(new StreamRecord(CRow(8L: JLong, 6: JInt, "eee"), 8))
+ testHarness.processElement(new StreamRecord(CRow(9L: JLong, 7: JInt, "aaa"), 9))
+ testHarness.processElement(new StreamRecord(CRow(10L: JLong, 3: JInt, "bbb"), 10))
val result = testHarness.getOutput
val expectedOutput = new ConcurrentLinkedQueue[Object]()
- expectedOutput.add(new StreamRecord(CRow(Row.of(1L: JLong, 1: JInt), true), 1))
- expectedOutput.add(new StreamRecord(CRow(Row.of(2L: JLong, 1: JInt), true), 2))
- expectedOutput.add(new StreamRecord(CRow(Row.of(3L: JLong, 1: JInt), false), 3))
- expectedOutput.add(new StreamRecord(CRow(Row.of(3L: JLong, 3: JInt), true), 3))
- expectedOutput.add(new StreamRecord(CRow(Row.of(4L: JLong, 3: JInt), true), 4))
- expectedOutput.add(new StreamRecord(CRow(Row.of(5L: JLong, 4: JInt), true), 5))
- expectedOutput.add(new StreamRecord(CRow(Row.of(6L: JLong, 2: JInt), true), 6))
- expectedOutput.add(new StreamRecord(CRow(Row.of(7L: JLong, 4: JInt), false), 7))
- expectedOutput.add(new StreamRecord(CRow(Row.of(7L: JLong, 9: JInt), true), 7))
- expectedOutput.add(new StreamRecord(CRow(Row.of(8L: JLong, 6: JInt), true), 8))
- expectedOutput.add(new StreamRecord(CRow(Row.of(9L: JLong, 9: JInt), false), 9))
- expectedOutput.add(new StreamRecord(CRow(Row.of(9L: JLong, 16: JInt), true), 9))
- expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, 2: JInt), false), 10))
- expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, 5: JInt), true), 10))
+ expectedOutput.add(new StreamRecord(CRow(1L: JLong, 1: JInt), 1))
+ expectedOutput.add(new StreamRecord(CRow(2L: JLong, 1: JInt), 2))
+ expectedOutput.add(new StreamRecord(CRow(false, 3L: JLong, 1: JInt), 3))
+ expectedOutput.add(new StreamRecord(CRow(3L: JLong, 3: JInt), 3))
+ expectedOutput.add(new StreamRecord(CRow(4L: JLong, 3: JInt), 4))
+ expectedOutput.add(new StreamRecord(CRow(5L: JLong, 4: JInt), 5))
+ expectedOutput.add(new StreamRecord(CRow(6L: JLong, 2: JInt), 6))
+ expectedOutput.add(new StreamRecord(CRow(false, 7L: JLong, 4: JInt), 7))
+ expectedOutput.add(new StreamRecord(CRow(7L: JLong, 9: JInt), 7))
+ expectedOutput.add(new StreamRecord(CRow(8L: JLong, 6: JInt), 8))
+ expectedOutput.add(new StreamRecord(CRow(false, 9L: JLong, 9: JInt), 9))
+ expectedOutput.add(new StreamRecord(CRow(9L: JLong, 16: JInt), 9))
+ expectedOutput.add(new StreamRecord(CRow(false, 10L: JLong, 2: JInt), 10))
+ expectedOutput.add(new StreamRecord(CRow(10L: JLong, 5: JInt), 10))
verify(expectedOutput, result, new RowResultSortComparator())
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index 7ad64c6..63d7b5d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -60,75 +60,75 @@ class OverWindowHarnessTest extends HarnessTestBase{
testHarness.setProcessingTime(1)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(1L: JLong, "aaa", 1L: JLong), change = true)))
+ CRow(1L: JLong, "aaa", 1L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(1L: JLong, "bbb", 10L: JLong), change = true)))
+ CRow(1L: JLong, "bbb", 10L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(1L: JLong, "aaa", 2L: JLong), change = true)))
+ CRow(1L: JLong, "aaa", 2L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(1L: JLong, "aaa", 3L: JLong), change = true)))
+ CRow(1L: JLong, "aaa", 3L: JLong)))
// register cleanup timer with 4100
testHarness.setProcessingTime(1100)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(1L: JLong, "bbb", 20L: JLong), change = true)))
+ CRow(1L: JLong, "bbb", 20L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(1L: JLong, "aaa", 4L: JLong), change = true)))
+ CRow(1L: JLong, "aaa", 4L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(1L: JLong, "aaa", 5L: JLong), change = true)))
+ CRow(1L: JLong, "aaa", 5L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(1L: JLong, "aaa", 6L: JLong), change = true)))
+ CRow(1L: JLong, "aaa", 6L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(1L: JLong, "bbb", 30L: JLong), change = true)))
+ CRow(1L: JLong, "bbb", 30L: JLong)))
// register cleanup timer with 6001
testHarness.setProcessingTime(3001)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(2L: JLong, "aaa", 7L: JLong), change = true)))
+ CRow(2L: JLong, "aaa", 7L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(2L: JLong, "aaa", 8L: JLong), change = true)))
+ CRow(2L: JLong, "aaa", 8L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(2L: JLong, "aaa", 9L: JLong), change = true)))
+ CRow(2L: JLong, "aaa", 9L: JLong)))
// trigger cleanup timer and register cleanup timer with 9002
testHarness.setProcessingTime(6002)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(2L: JLong, "aaa", 10L: JLong), change = true)))
+ CRow(2L: JLong, "aaa", 10L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(2L: JLong, "bbb", 40L: JLong), change = true)))
+ CRow(2L: JLong, "bbb", 40L: JLong)))
val result = testHarness.getOutput
val expectedOutput = new ConcurrentLinkedQueue[Object]()
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
+ CRow(1L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
+ CRow(1L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
+ CRow(1L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1L: JLong, "aaa", 3L: JLong, 2L: JLong, 3L: JLong), change = true)))
+ CRow(1L: JLong, "aaa", 3L: JLong, 2L: JLong, 3L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), change = true)))
+ CRow(1L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1L: JLong, "aaa", 4L: JLong, 3L: JLong, 4L: JLong), change = true)))
+ CRow(1L: JLong, "aaa", 4L: JLong, 3L: JLong, 4L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1L: JLong, "aaa", 5L: JLong, 4L: JLong, 5L: JLong), change = true)))
+ CRow(1L: JLong, "aaa", 5L: JLong, 4L: JLong, 5L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1L: JLong, "aaa", 6L: JLong, 5L: JLong, 6L: JLong), change = true)))
+ CRow(1L: JLong, "aaa", 6L: JLong, 5L: JLong, 6L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(1L: JLong, "bbb", 30L: JLong, 20L: JLong, 30L: JLong), change = true)))
+ CRow(1L: JLong, "bbb", 30L: JLong, 20L: JLong, 30L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2L: JLong, "aaa", 7L: JLong, 6L: JLong, 7L: JLong), change = true)))
+ CRow(2L: JLong, "aaa", 7L: JLong, 6L: JLong, 7L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2L: JLong, "aaa", 8L: JLong, 7L: JLong, 8L: JLong), change = true)))
+ CRow(2L: JLong, "aaa", 8L: JLong, 7L: JLong, 8L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2L: JLong, "aaa", 9L: JLong, 8L: JLong, 9L: JLong), change = true)))
+ CRow(2L: JLong, "aaa", 9L: JLong, 8L: JLong, 9L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2L: JLong, "aaa", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
+ CRow(2L: JLong, "aaa", 10L: JLong, 10L: JLong, 10L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), change = true)))
+ CRow(2L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong)))
verify(expectedOutput, result, new RowResultSortComparator())
@@ -160,51 +160,51 @@ class OverWindowHarnessTest extends HarnessTestBase{
// register cleanup timer with 3003
testHarness.setProcessingTime(3)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 1L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 1L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "bbb", 10L: JLong), change = true)))
+ CRow(0L: JLong, "bbb", 10L: JLong)))
testHarness.setProcessingTime(4)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 2L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 2L: JLong)))
// trigger cleanup timer and register cleanup timer with 6003
testHarness.setProcessingTime(3003)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 3L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 3L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "bbb", 20L: JLong), change = true)))
+ CRow(0L: JLong, "bbb", 20L: JLong)))
testHarness.setProcessingTime(5)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 4L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 4L: JLong)))
// register cleanup timer with 9002
testHarness.setProcessingTime(6002)
testHarness.setProcessingTime(7002)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 5L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 5L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 6L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 6L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "bbb", 30L: JLong), change = true)))
+ CRow(0L: JLong, "bbb", 30L: JLong)))
// register cleanup timer with 14002
testHarness.setProcessingTime(11002)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 7L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 7L: JLong)))
testHarness.setProcessingTime(11004)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 8L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 8L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 9L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 9L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 10L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 10L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "bbb", 40L: JLong), change = true)))
+ CRow(0L: JLong, "bbb", 40L: JLong)))
testHarness.setProcessingTime(11006)
@@ -213,12 +213,12 @@ class OverWindowHarnessTest extends HarnessTestBase{
// timer registered for 23000
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "ccc", 10L: JLong), change = true)))
+ CRow(0L: JLong, "ccc", 10L: JLong)))
// update clean-up timer to 25500. Previous timer should not clean up
testHarness.setProcessingTime(22500)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "ccc", 20L: JLong), change = true)))
+ CRow(0L: JLong, "ccc", 20L: JLong)))
// 23000 clean-up timer should fire but not fail with an NPE
testHarness.setProcessingTime(23001)
@@ -229,37 +229,37 @@ class OverWindowHarnessTest extends HarnessTestBase{
// all elements at the same proc timestamp have the same value per key
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
+ CRow(0L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 3L: JLong, 3L: JLong, 4L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 3L: JLong, 3L: JLong, 4L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "bbb", 20L: JLong, 20L: JLong, 20L: JLong), change = true)))
+ CRow(0L: JLong, "bbb", 20L: JLong, 20L: JLong, 20L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 4L: JLong, 4L: JLong, 4L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 4L: JLong, 4L: JLong, 4L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 5L: JLong, 5L: JLong, 6L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 5L: JLong, 5L: JLong, 6L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 6L: JLong, 5L: JLong, 6L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 6L: JLong, 5L: JLong, 6L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "bbb", 30L: JLong, 30L: JLong, 30L: JLong), change = true)))
+ CRow(0L: JLong, "bbb", 30L: JLong, 30L: JLong, 30L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 7L: JLong, 7L: JLong, 7L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 7L: JLong, 7L: JLong, 7L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 8L: JLong, 7L: JLong, 10L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 8L: JLong, 7L: JLong, 10L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 9L: JLong, 7L: JLong, 10L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 9L: JLong, 7L: JLong, 10L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 10L: JLong, 7L: JLong, 10L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 10L: JLong, 7L: JLong, 10L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), change = true)))
+ CRow(0L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "ccc", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
+ CRow(0L: JLong, "ccc", 10L: JLong, 10L: JLong, 10L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "ccc", 20L: JLong, 10L: JLong, 20L: JLong), change = true)))
+ CRow(0L: JLong, "ccc", 20L: JLong, 10L: JLong, 20L: JLong)))
verify(expectedOutput, result, new RowResultSortComparator())
@@ -287,69 +287,69 @@ class OverWindowHarnessTest extends HarnessTestBase{
testHarness.setProcessingTime(1003)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 1L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 1L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "bbb", 10L: JLong), change = true)))
+ CRow(0L: JLong, "bbb", 10L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 2L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 2L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 3L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 3L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "bbb", 20L: JLong), change = true)))
+ CRow(0L: JLong, "bbb", 20L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 4L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 4L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 5L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 5L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 6L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 6L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "bbb", 30L: JLong), change = true)))
+ CRow(0L: JLong, "bbb", 30L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 7L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 7L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 8L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 8L: JLong)))
// trigger cleanup timer and register cleanup timer with 8003
testHarness.setProcessingTime(5003)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 9L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 9L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 10L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 10L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(0L: JLong, "bbb", 40L: JLong), change = true)))
+ CRow(0L: JLong, "bbb", 40L: JLong)))
val result = testHarness.getOutput
val expectedOutput = new ConcurrentLinkedQueue[Object]()
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
+ CRow(0L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), change = true)))
+ CRow(0L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 4L: JLong, 1L: JLong, 4L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 5L: JLong, 1L: JLong, 5L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 5L: JLong, 1L: JLong, 5L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 6L: JLong, 1L: JLong, 6L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), change = true)))
+ CRow(0L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 7L: JLong, 1L: JLong, 7L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 8L: JLong, 1L: JLong, 8L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 9L: JLong, 9L: JLong, 9L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 9L: JLong, 9L: JLong, 9L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "aaa", 10L: JLong, 9L: JLong, 10L: JLong), change = true)))
+ CRow(0L: JLong, "aaa", 10L: JLong, 9L: JLong, 10L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(0L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), change = true)))
+ CRow(0L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong)))
verify(expectedOutput, result, new RowResultSortComparator())
testHarness.close()
@@ -380,51 +380,51 @@ class OverWindowHarnessTest extends HarnessTestBase{
testHarness.processWatermark(1)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(2L: JLong, "aaa", 1L: JLong), change = true)))
+ CRow(2L: JLong, "aaa", 1L: JLong)))
testHarness.processWatermark(2)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(3L: JLong, "bbb", 10L: JLong), change = true)))
+ CRow(3L: JLong, "bbb", 10L: JLong)))
testHarness.processWatermark(4000)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(4001L: JLong, "aaa", 2L: JLong), change = true)))
+ CRow(4001L: JLong, "aaa", 2L: JLong)))
testHarness.processWatermark(4001)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(4002L: JLong, "aaa", 3L: JLong), change = true)))
+ CRow(4002L: JLong, "aaa", 3L: JLong)))
testHarness.processWatermark(4002)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(4003L: JLong, "aaa", 4L: JLong), change = true)))
+ CRow(4003L: JLong, "aaa", 4L: JLong)))
testHarness.processWatermark(4800)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(4801L: JLong, "bbb", 25L: JLong), change = true)))
+ CRow(4801L: JLong, "bbb", 25L: JLong)))
testHarness.processWatermark(6500)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(6501L: JLong, "aaa", 5L: JLong), change = true)))
+ CRow(6501L: JLong, "aaa", 5L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(6501L: JLong, "aaa", 6L: JLong), change = true)))
+ CRow(6501L: JLong, "aaa", 6L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(6501L: JLong, "bbb", 30L: JLong), change = true)))
+ CRow(6501L: JLong, "bbb", 30L: JLong)))
testHarness.processWatermark(7000)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(7001L: JLong, "aaa", 7L: JLong), change = true)))
+ CRow(7001L: JLong, "aaa", 7L: JLong)))
testHarness.processWatermark(8000)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(8001L: JLong, "aaa", 8L: JLong), change = true)))
+ CRow(8001L: JLong, "aaa", 8L: JLong)))
testHarness.processWatermark(12000)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(12001L: JLong, "aaa", 9L: JLong), change = true)))
+ CRow(12001L: JLong, "aaa", 9L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(12001L: JLong, "aaa", 10L: JLong), change = true)))
+ CRow(12001L: JLong, "aaa", 10L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(12001L: JLong, "bbb", 40L: JLong), change = true)))
+ CRow(12001L: JLong, "bbb", 40L: JLong)))
testHarness.processWatermark(19000)
@@ -434,10 +434,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
// check that state is removed after max retention time
testHarness.processElement(new StreamRecord(
- CRow(Row.of(20001L: JLong, "ccc", 1L: JLong), change = true))) // clean-up 3000
+ CRow(20001L: JLong, "ccc", 1L: JLong))) // clean-up 3000
testHarness.setProcessingTime(2500)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(20002L: JLong, "ccc", 2L: JLong), change = true))) // clean-up 4500
+ CRow(20002L: JLong, "ccc", 2L: JLong))) // clean-up 4500
testHarness.processWatermark(20010) // compute output
assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
@@ -449,7 +449,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
// check that state is only removed if all data was processed
testHarness.processElement(new StreamRecord(
- CRow(Row.of(20011L: JLong, "ccc", 3L: JLong), change = true))) // clean-up 6500
+ CRow(20011L: JLong, "ccc", 3L: JLong))) // clean-up 6500
assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
testHarness.setProcessingTime(6500) // clean-up attempt but rescheduled to 8500
@@ -469,40 +469,40 @@ class OverWindowHarnessTest extends HarnessTestBase{
// all elements at the same row-time have the same value per key
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
+ CRow(2L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(3L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
+ CRow(3L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
+ CRow(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(4002L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), change = true)))
+ CRow(4002L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(4003L: JLong, "aaa", 4L: JLong, 2L: JLong, 4L: JLong), change = true)))
+ CRow(4003L: JLong, "aaa", 4L: JLong, 2L: JLong, 4L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(4801L: JLong, "bbb", 25L: JLong, 25L: JLong, 25L: JLong), change = true)))
+ CRow(4801L: JLong, "bbb", 25L: JLong, 25L: JLong, 25L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(6501L: JLong, "aaa", 5L: JLong, 2L: JLong, 6L: JLong), change = true)))
+ CRow(6501L: JLong, "aaa", 5L: JLong, 2L: JLong, 6L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(6501L: JLong, "aaa", 6L: JLong, 2L: JLong, 6L: JLong), change = true)))
+ CRow(6501L: JLong, "aaa", 6L: JLong, 2L: JLong, 6L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(7001L: JLong, "aaa", 7L: JLong, 2L: JLong, 7L: JLong), change = true)))
+ CRow(7001L: JLong, "aaa", 7L: JLong, 2L: JLong, 7L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(8001L: JLong, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), change = true)))
+ CRow(8001L: JLong, "aaa", 8L: JLong, 2L: JLong, 8L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(6501L: JLong, "bbb", 30L: JLong, 25L: JLong, 30L: JLong), change = true)))
+ CRow(6501L: JLong, "bbb", 30L: JLong, 25L: JLong, 30L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(12001L: JLong, "aaa", 9L: JLong, 8L: JLong, 10L: JLong), change = true)))
+ CRow(12001L: JLong, "aaa", 9L: JLong, 8L: JLong, 10L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(12001L: JLong, "aaa", 10L: JLong, 8L: JLong, 10L: JLong), change = true)))
+ CRow(12001L: JLong, "aaa", 10L: JLong, 8L: JLong, 10L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(12001L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), change = true)))
+ CRow(12001L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
+ CRow(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
+ CRow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(20011L: JLong, "ccc", 3L: JLong, 3L: JLong, 3L: JLong), change = true)))
+ CRow(20011L: JLong, "ccc", 3L: JLong, 3L: JLong, 3L: JLong)))
verify(expectedOutput, result, new RowResultSortComparator())
testHarness.close()
@@ -530,47 +530,47 @@ class OverWindowHarnessTest extends HarnessTestBase{
testHarness.processWatermark(800)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(801L: JLong, "aaa", 1L: JLong), change = true)))
+ CRow(801L: JLong, "aaa", 1L: JLong)))
testHarness.processWatermark(2500)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(2501L: JLong, "bbb", 10L: JLong), change = true)))
+ CRow(2501L: JLong, "bbb", 10L: JLong)))
testHarness.processWatermark(4000)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(4001L: JLong, "aaa", 2L: JLong), change = true)))
+ CRow(4001L: JLong, "aaa", 2L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(4001L: JLong, "aaa", 3L: JLong), change = true)))
+ CRow(4001L: JLong, "aaa", 3L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(4001L: JLong, "bbb", 20L: JLong), change = true)))
+ CRow(4001L: JLong, "bbb", 20L: JLong)))
testHarness.processWatermark(4800)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(4801L: JLong, "aaa", 4L: JLong), change = true)))
+ CRow(4801L: JLong, "aaa", 4L: JLong)))
testHarness.processWatermark(6500)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(6501L: JLong, "aaa", 5L: JLong), change = true)))
+ CRow(6501L: JLong, "aaa", 5L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(6501L: JLong, "aaa", 6L: JLong), change = true)))
+ CRow(6501L: JLong, "aaa", 6L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(6501L: JLong, "bbb", 30L: JLong), change = true)))
+ CRow(6501L: JLong, "bbb", 30L: JLong)))
testHarness.processWatermark(7000)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(7001L: JLong, "aaa", 7L: JLong), change = true)))
+ CRow(7001L: JLong, "aaa", 7L: JLong)))
testHarness.processWatermark(8000)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(8001L: JLong, "aaa", 8L: JLong), change = true)))
+ CRow(8001L: JLong, "aaa", 8L: JLong)))
testHarness.processWatermark(12000)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(12001L: JLong, "aaa", 9L: JLong), change = true)))
+ CRow(12001L: JLong, "aaa", 9L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(12001L: JLong, "aaa", 10L: JLong), change = true)))
+ CRow(12001L: JLong, "aaa", 10L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(12001L: JLong, "bbb", 40L: JLong), change = true)))
+ CRow(12001L: JLong, "bbb", 40L: JLong)))
testHarness.processWatermark(19000)
@@ -580,10 +580,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
// check that state is removed after max retention time
testHarness.processElement(new StreamRecord(
- CRow(Row.of(20001L: JLong, "ccc", 1L: JLong), change = true))) // clean-up 3000
+ CRow(20001L: JLong, "ccc", 1L: JLong))) // clean-up 3000
testHarness.setProcessingTime(2500)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(20002L: JLong, "ccc", 2L: JLong), change = true))) // clean-up 4500
+ CRow(20002L: JLong, "ccc", 2L: JLong))) // clean-up 4500
testHarness.processWatermark(20010) // compute output
assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
@@ -594,7 +594,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
// check that state is only removed if all data was processed
testHarness.processElement(new StreamRecord(
- CRow(Row.of(20011L: JLong, "ccc", 3L: JLong), change = true))) // clean-up 6500
+ CRow(20011L: JLong, "ccc", 3L: JLong))) // clean-up 6500
assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
testHarness.setProcessingTime(6500) // clean-up attempt but rescheduled to 8500
@@ -614,40 +614,40 @@ class OverWindowHarnessTest extends HarnessTestBase{
val expectedOutput = new ConcurrentLinkedQueue[Object]()
expectedOutput.add(new StreamRecord(
- CRow(Row.of(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
+ CRow(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
+ CRow(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
+ CRow(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), change = true)))
+ CRow(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), change = true)))
+ CRow(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(4801L: JLong, "aaa", 4L: JLong, 2L: JLong, 4L: JLong), change = true)))
+ CRow(4801L: JLong, "aaa", 4L: JLong, 2L: JLong, 4L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(6501L: JLong, "aaa", 5L: JLong, 3L: JLong, 5L: JLong), change = true)))
+ CRow(6501L: JLong, "aaa", 5L: JLong, 3L: JLong, 5L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(6501L: JLong, "aaa", 6L: JLong, 4L: JLong, 6L: JLong), change = true)))
+ CRow(6501L: JLong, "aaa", 6L: JLong, 4L: JLong, 6L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), change = true)))
+ CRow(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(7001L: JLong, "aaa", 7L: JLong, 5L: JLong, 7L: JLong), change = true)))
+ CRow(7001L: JLong, "aaa", 7L: JLong, 5L: JLong, 7L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(8001L: JLong, "aaa", 8L: JLong, 6L: JLong, 8L: JLong), change = true)))
+ CRow(8001L: JLong, "aaa", 8L: JLong, 6L: JLong, 8L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(12001L: JLong, "aaa", 9L: JLong, 7L: JLong, 9L: JLong), change = true)))
+ CRow(12001L: JLong, "aaa", 9L: JLong, 7L: JLong, 9L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(12001L: JLong, "aaa", 10L: JLong, 8L: JLong, 10L: JLong), change = true)))
+ CRow(12001L: JLong, "aaa", 10L: JLong, 8L: JLong, 10L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(12001L: JLong, "bbb", 40L: JLong, 20L: JLong, 40L: JLong), change = true)))
+ CRow(12001L: JLong, "bbb", 40L: JLong, 20L: JLong, 40L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
+ CRow(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
+ CRow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(20011L: JLong, "ccc", 3L: JLong, 3L: JLong, 3L: JLong), change = true)))
+ CRow(20011L: JLong, "ccc", 3L: JLong, 3L: JLong, 3L: JLong)))
verify(expectedOutput, result, new RowResultSortComparator())
testHarness.close()
@@ -678,47 +678,47 @@ class OverWindowHarnessTest extends HarnessTestBase{
testHarness.setProcessingTime(1000)
testHarness.processWatermark(800)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(801L: JLong, "aaa", 1L: JLong), change = true)))
+ CRow(801L: JLong, "aaa", 1L: JLong)))
testHarness.processWatermark(2500)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(2501L: JLong, "bbb", 10L: JLong), change = true)))
+ CRow(2501L: JLong, "bbb", 10L: JLong)))
testHarness.processWatermark(4000)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(4001L: JLong, "aaa", 2L: JLong), change = true)))
+ CRow(4001L: JLong, "aaa", 2L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(4001L: JLong, "aaa", 3L: JLong), change = true)))
+ CRow(4001L: JLong, "aaa", 3L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(4001L: JLong, "bbb", 20L: JLong), change = true)))
+ CRow(4001L: JLong, "bbb", 20L: JLong)))
testHarness.processWatermark(4800)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(4801L: JLong, "aaa", 4L: JLong), change = true)))
+ CRow(4801L: JLong, "aaa", 4L: JLong)))
testHarness.processWatermark(6500)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(6501L: JLong, "aaa", 5L: JLong), change = true)))
+ CRow(6501L: JLong, "aaa", 5L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(6501L: JLong, "aaa", 6L: JLong), change = true)))
+ CRow(6501L: JLong, "aaa", 6L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(6501L: JLong, "bbb", 30L: JLong), change = true)))
+ CRow(6501L: JLong, "bbb", 30L: JLong)))
testHarness.processWatermark(7000)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(7001L: JLong, "aaa", 7L: JLong), change = true)))
+ CRow(7001L: JLong, "aaa", 7L: JLong)))
testHarness.processWatermark(8000)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(8001L: JLong, "aaa", 8L: JLong), change = true)))
+ CRow(8001L: JLong, "aaa", 8L: JLong)))
testHarness.processWatermark(12000)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(12001L: JLong, "aaa", 9L: JLong), change = true)))
+ CRow(12001L: JLong, "aaa", 9L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(12001L: JLong, "aaa", 10L: JLong), change = true)))
+ CRow(12001L: JLong, "aaa", 10L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(12001L: JLong, "bbb", 40L: JLong), change = true)))
+ CRow(12001L: JLong, "bbb", 40L: JLong)))
testHarness.processWatermark(19000)
@@ -731,13 +731,13 @@ class OverWindowHarnessTest extends HarnessTestBase{
testHarness.processWatermark(20000)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(20000L: JLong, "ccc", 1L: JLong), change = true))) // test for late data
+ CRow(20000L: JLong, "ccc", 1L: JLong))) // test for late data
testHarness.processElement(new StreamRecord(
- CRow(Row.of(20001L: JLong, "ccc", 1L: JLong), change = true))) // clean-up 5000
+ CRow(20001L: JLong, "ccc", 1L: JLong))) // clean-up 5000
testHarness.setProcessingTime(2500)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(20002L: JLong, "ccc", 2L: JLong), change = true))) // clean-up 5000
+ CRow(20002L: JLong, "ccc", 2L: JLong))) // clean-up 5000
assert(testHarness.numKeyedStateEntries() > 0)
testHarness.setProcessingTime(5000) // does not clean up, because data left. New timer 7000
@@ -755,38 +755,38 @@ class OverWindowHarnessTest extends HarnessTestBase{
// all elements at the same row-time have the same value per key
expectedOutput.add(new StreamRecord(
- CRow(Row.of(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
+ CRow(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
+ CRow(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), change = true)))
+ CRow(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 3L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), change = true)))
+ CRow(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), change = true)))
+ CRow(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(4801L: JLong, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), change = true)))
+ CRow(4801L: JLong, "aaa", 4L: JLong, 1L: JLong, 4L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(6501L: JLong, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), change = true)))
+ CRow(6501L: JLong, "aaa", 5L: JLong, 1L: JLong, 6L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(6501L: JLong, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), change = true)))
+ CRow(6501L: JLong, "aaa", 6L: JLong, 1L: JLong, 6L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), change = true)))
+ CRow(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(7001L: JLong, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), change = true)))
+ CRow(7001L: JLong, "aaa", 7L: JLong, 1L: JLong, 7L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(8001L: JLong, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), change = true)))
+ CRow(8001L: JLong, "aaa", 8L: JLong, 1L: JLong, 8L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(12001L: JLong, "aaa", 9L: JLong, 1L: JLong, 10L: JLong), change = true)))
+ CRow(12001L: JLong, "aaa", 9L: JLong, 1L: JLong, 10L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(12001L: JLong, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), change = true)))
+ CRow(12001L: JLong, "aaa", 10L: JLong, 1L: JLong, 10L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(12001L: JLong, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), change = true)))
+ CRow(12001L: JLong, "bbb", 40L: JLong, 10L: JLong, 40L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
+ CRow(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
+ CRow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
verify(expectedOutput, result, new RowResultSortComparator())
testHarness.close()
@@ -814,47 +814,47 @@ class OverWindowHarnessTest extends HarnessTestBase{
testHarness.setProcessingTime(1000)
testHarness.processWatermark(800)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(801L: JLong, "aaa", 1L: JLong), change = true)))
+ CRow(801L: JLong, "aaa", 1L: JLong)))
testHarness.processWatermark(2500)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(2501L: JLong, "bbb", 10L: JLong), change = true)))
+ CRow(2501L: JLong, "bbb", 10L: JLong)))
testHarness.processWatermark(4000)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(4001L: JLong, "aaa", 2L: JLong), change = true)))
+ CRow(4001L: JLong, "aaa", 2L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(4001L: JLong, "aaa", 3L: JLong), change = true)))
+ CRow(4001L: JLong, "aaa", 3L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(4001L: JLong, "bbb", 20L: JLong), change = true)))
+ CRow(4001L: JLong, "bbb", 20L: JLong)))
testHarness.processWatermark(4800)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(4801L: JLong, "aaa", 4L: JLong), change = true)))
+ CRow(4801L: JLong, "aaa", 4L: JLong)))
testHarness.processWatermark(6500)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(6501L: JLong, "aaa", 5L: JLong), change = true)))
+ CRow(6501L: JLong, "aaa", 5L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(6501L: JLong, "aaa", 6L: JLong), change = true)))
+ CRow(6501L: JLong, "aaa", 6L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(6501L: JLong, "bbb", 30L: JLong), change = true)))
+ CRow(6501L: JLong, "bbb", 30L: JLong)))
testHarness.processWatermark(7000)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(7001L: JLong, "aaa", 7L: JLong), change = true)))
+ CRow(7001L: JLong, "aaa", 7L: JLong)))
testHarness.processWatermark(8000)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(8001L: JLong, "aaa", 8L: JLong), change = true)))
+ CRow(8001L: JLong, "aaa", 8L: JLong)))
testHarness.processWatermark(12000)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(12001L: JLong, "aaa", 9L: JLong), change = true)))
+ CRow(12001L: JLong, "aaa", 9L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(12001L: JLong, "aaa", 10L: JLong), change = true)))
+ CRow(12001L: JLong, "aaa", 10L: JLong)))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(12001L: JLong, "bbb", 40L: JLong), change = true)))
+ CRow(12001L: JLong, "bbb", 40L: JLong)))
testHarness.processWatermark(19000)
@@ -867,13 +867,13 @@ class OverWindowHarnessTest extends HarnessTestBase{
testHarness.processWatermark(20000)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(20000L: JLong, "ccc", 2L: JLong), change = true))) // test for late data
+ CRow(20000L: JLong, "ccc", 2L: JLong))) // test for late data
testHarness.processElement(new StreamRecord(
- CRow(Row.of(20001L: JLong, "ccc", 1L: JLong), change = true))) // clean-up 5000
+ CRow(20001L: JLong, "ccc", 1L: JLong))) // clean-up 5000
testHarness.setProcessingTime(2500)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(20002L: JLong, "ccc", 2L: JLong), change = true))) // clean-up 5000
+ CRow(20002L: JLong, "ccc", 2L: JLong))) // clean-up 5000
assert(testHarness.numKeyedStateEntries() > 0)
testHarness.setProcessingTime(5000) // does not clean up, because data left. New timer 7000
@@ -890,38 +890,38 @@ class OverWindowHarnessTest extends HarnessTestBase{
val expectedOutput = new ConcurrentLinkedQueue[Object]()
expectedOutput.add(new StreamRecord(
- CRow(Row.of(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
+ CRow(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
+ CRow(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
+ CRow(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), change = true)))
+ CRow(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), change = true)))
+ CRow(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(4801L: JLong, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), change = true)))
+ CRow(4801L: JLong, "aaa", 4L: JLong, 1L: JLong, 4L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(6501L: JLong, "aaa", 5L: JLong, 1L: JLong, 5L: JLong), change = true)))
+ CRow(6501L: JLong, "aaa", 5L: JLong, 1L: JLong, 5L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(6501L: JLong, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), change = true)))
+ CRow(6501L: JLong, "aaa", 6L: JLong, 1L: JLong, 6L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), change = true)))
+ CRow(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(7001L: JLong, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), change = true)))
+ CRow(7001L: JLong, "aaa", 7L: JLong, 1L: JLong, 7L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(8001L: JLong, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), change = true)))
+ CRow(8001L: JLong, "aaa", 8L: JLong, 1L: JLong, 8L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(12001L: JLong, "aaa", 9L: JLong, 1L: JLong, 9L: JLong), change = true)))
+ CRow(12001L: JLong, "aaa", 9L: JLong, 1L: JLong, 9L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(12001L: JLong, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), change = true)))
+ CRow(12001L: JLong, "aaa", 10L: JLong, 1L: JLong, 10L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(12001L: JLong, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), change = true)))
+ CRow(12001L: JLong, "bbb", 40L: JLong, 10L: JLong, 40L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), change = true)))
+ CRow(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong)))
expectedOutput.add(new StreamRecord(
- CRow(Row.of(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), change = true)))
+ CRow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
verify(expectedOutput, result, new RowResultSortComparator())
testHarness.close()