You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/09/21 11:43:43 UTC
[flink] 10/11: [hotfix][table,
tests] Add convienient verify methods to HarnessTestBase
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit dde089be869c8d2d24f27059530b086326daa5d5
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Jul 20 12:40:36 2018 +0200
[hotfix][table,tests] Add convienient verify methods to HarnessTestBase
---
.../table/runtime/harness/HarnessTestBase.scala | 9 ++++
.../table/runtime/harness/JoinHarnessTest.scala | 50 +++++++---------------
.../runtime/harness/NonWindowHarnessTest.scala | 4 +-
.../runtime/harness/OverWindowHarnessTest.scala | 14 +++---
4 files changed, 33 insertions(+), 44 deletions(-)
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
index 58fe9d3..d494c21 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
@@ -34,6 +34,7 @@ import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
import org.apache.flink.table.functions.aggfunctions.{IntSumWithRetractAggFunction, LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction}
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.getAccumulatorTypeOfAggregateFunction
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, RowResultSortComparatorWithWatermarks}
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
class HarnessTestBase {
@@ -318,6 +319,14 @@ class HarnessTestBase {
new KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT](operator, keySelector, keyType)
}
+ def verify(expected: JQueue[Object], actual: JQueue[Object]): Unit = {
+ verify(expected, actual, new RowResultSortComparator)
+ }
+
+ def verifyWithWatermarks(expected: JQueue[Object], actual: JQueue[Object]): Unit = {
+ verify(expected, actual, new RowResultSortComparatorWithWatermarks, true)
+ }
+
def verify(
expected: JQueue[Object],
actual: JQueue[Object],
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
index 86133a0..bd19be8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
@@ -231,7 +231,7 @@ class JoinHarnessTest extends HarnessTestBase {
expectedOutput.add(new StreamRecord(
CRow(2L: JLong, "2a33", 2L: JLong, "2b33"), 33))
- verify(expectedOutput, result, new RowResultSortComparator())
+ verify(expectedOutput, result)
testHarness.close()
}
@@ -313,7 +313,7 @@ class JoinHarnessTest extends HarnessTestBase {
expectedOutput.add(new StreamRecord(
CRow(1L: JLong, "1a3", 1L: JLong, "1b12"), 12))
- verify(expectedOutput, result, new RowResultSortComparator())
+ verify(expectedOutput, result)
testHarness.close()
}
@@ -409,11 +409,7 @@ class JoinHarnessTest extends HarnessTestBase {
expectedOutput.add(new Watermark(41))
val result = testHarness.getOutput
- verify(
- expectedOutput,
- result,
- new RowResultSortComparatorWithWatermarks(),
- checkWaterMark = true)
+ verifyWithWatermarks(expectedOutput, result)
testHarness.close()
}
@@ -491,11 +487,7 @@ class JoinHarnessTest extends HarnessTestBase {
expectedOutput.add(new Watermark(8))
val result = testHarness.getOutput
- verify(
- expectedOutput,
- result,
- new RowResultSortComparatorWithWatermarks(),
- checkWaterMark = true)
+ verifyWithWatermarks(expectedOutput, result)
testHarness.close()
}
@@ -605,11 +597,7 @@ class JoinHarnessTest extends HarnessTestBase {
val result = testHarness.getOutput
- verify(
- expectedOutput,
- result,
- new RowResultSortComparatorWithWatermarks(),
- checkWaterMark = true)
+ verifyWithWatermarks(expectedOutput, result)
testHarness.close()
}
@@ -718,11 +706,7 @@ class JoinHarnessTest extends HarnessTestBase {
expectedOutput.add(new Watermark(91))
val result = testHarness.getOutput
- verify(
- expectedOutput,
- result,
- new RowResultSortComparatorWithWatermarks(),
- checkWaterMark = true)
+ verifyWithWatermarks(expectedOutput, result)
testHarness.close()
}
@@ -839,11 +823,7 @@ class JoinHarnessTest extends HarnessTestBase {
expectedOutput.add(new Watermark(91))
val result = testHarness.getOutput
- verify(
- expectedOutput,
- result,
- new RowResultSortComparatorWithWatermarks(),
- checkWaterMark = true)
+ verifyWithWatermarks(expectedOutput, result)
testHarness.close()
}
@@ -944,7 +924,7 @@ class JoinHarnessTest extends HarnessTestBase {
expectedOutput.add(new StreamRecord(
CRow(2: JInt, "bbb", 2: JInt, "Hello1")))
- verify(expectedOutput, result, new RowResultSortComparator())
+ verify(expectedOutput, result)
testHarness.close()
}
@@ -1035,7 +1015,7 @@ class JoinHarnessTest extends HarnessTestBase {
expectedOutput.add(new StreamRecord(
CRow(2: JInt, "bbb", 2: JInt, "Hello1")))
- verify(expectedOutput, result, new RowResultSortComparator())
+ verify(expectedOutput, result)
testHarness.close()
}
@@ -1143,7 +1123,7 @@ class JoinHarnessTest extends HarnessTestBase {
expectedOutput.add(new StreamRecord(
CRow(false, 1: JInt, "aaa", null: JInt, null)))
- verify(expectedOutput, result, new RowResultSortComparator())
+ verify(expectedOutput, result)
testHarness.close()
}
@@ -1273,7 +1253,7 @@ class JoinHarnessTest extends HarnessTestBase {
CRow(false, 1: JInt, "bbb", 1: JInt, "Hi1")))
expectedOutput.add(new StreamRecord(
CRow(1: JInt, "bbb", null: JInt, null)))
- verify(expectedOutput, result, new RowResultSortComparator())
+ verify(expectedOutput, result)
testHarness.close()
}
@@ -1381,7 +1361,7 @@ class JoinHarnessTest extends HarnessTestBase {
expectedOutput.add(new StreamRecord(
CRow(false, null: JInt, null, 1: JInt, "aaa")))
- verify(expectedOutput, result, new RowResultSortComparator())
+ verify(expectedOutput, result)
testHarness.close()
}
@@ -1511,7 +1491,7 @@ class JoinHarnessTest extends HarnessTestBase {
CRow(false, 1: JInt, "Hi1", 1: JInt, "bbb")))
expectedOutput.add(new StreamRecord(
CRow(null: JInt, null, 1: JInt, "bbb")))
- verify(expectedOutput, result, new RowResultSortComparator())
+ verify(expectedOutput, result)
testHarness.close()
}
@@ -1681,7 +1661,7 @@ class JoinHarnessTest extends HarnessTestBase {
expectedOutput.add(new StreamRecord(
CRow(null: JInt, null, 2: JInt, "bbb")))
- verify(expectedOutput, result, new RowResultSortComparator())
+ verify(expectedOutput, result)
testHarness.close()
}
@@ -1836,7 +1816,7 @@ class JoinHarnessTest extends HarnessTestBase {
expectedOutput.add(new StreamRecord(
CRow(null: JInt, null, 2: JInt, "bbb")))
- verify(expectedOutput, result, new RowResultSortComparator())
+ verify(expectedOutput, result)
testHarness.close()
}
}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
index d6daa9e..7c4f543 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
@@ -91,7 +91,7 @@ class NonWindowHarnessTest extends HarnessTestBase {
expectedOutput.add(new StreamRecord(CRow(9L: JLong, 18: JInt), 1))
expectedOutput.add(new StreamRecord(CRow(10L: JLong, 3: JInt), 1))
- verify(expectedOutput, result, new RowResultSortComparator())
+ verify(expectedOutput, result)
testHarness.close()
}
@@ -150,7 +150,7 @@ class NonWindowHarnessTest extends HarnessTestBase {
expectedOutput.add(new StreamRecord(CRow(false, 10L: JLong, 2: JInt), 10))
expectedOutput.add(new StreamRecord(CRow(10L: JLong, 5: JInt), 10))
- verify(expectedOutput, result, new RowResultSortComparator())
+ verify(expectedOutput, result)
testHarness.close()
}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index 63d7b5d..95b13a0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -130,7 +130,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
expectedOutput.add(new StreamRecord(
CRow(2L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong)))
- verify(expectedOutput, result, new RowResultSortComparator())
+ verify(expectedOutput, result)
testHarness.close()
}
@@ -261,7 +261,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
expectedOutput.add(new StreamRecord(
CRow(0L: JLong, "ccc", 20L: JLong, 10L: JLong, 20L: JLong)))
- verify(expectedOutput, result, new RowResultSortComparator())
+ verify(expectedOutput, result)
testHarness.close()
}
@@ -351,7 +351,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
expectedOutput.add(new StreamRecord(
CRow(0L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong)))
- verify(expectedOutput, result, new RowResultSortComparator())
+ verify(expectedOutput, result)
testHarness.close()
}
@@ -504,7 +504,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
expectedOutput.add(new StreamRecord(
CRow(20011L: JLong, "ccc", 3L: JLong, 3L: JLong, 3L: JLong)))
- verify(expectedOutput, result, new RowResultSortComparator())
+ verify(expectedOutput, result)
testHarness.close()
}
@@ -649,7 +649,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
expectedOutput.add(new StreamRecord(
CRow(20011L: JLong, "ccc", 3L: JLong, 3L: JLong, 3L: JLong)))
- verify(expectedOutput, result, new RowResultSortComparator())
+ verify(expectedOutput, result)
testHarness.close()
}
@@ -788,7 +788,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
expectedOutput.add(new StreamRecord(
CRow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
- verify(expectedOutput, result, new RowResultSortComparator())
+ verify(expectedOutput, result)
testHarness.close()
}
@@ -923,7 +923,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
expectedOutput.add(new StreamRecord(
CRow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
- verify(expectedOutput, result, new RowResultSortComparator())
+ verify(expectedOutput, result)
testHarness.close()
}
}