You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/09/21 11:43:43 UTC

[flink] 10/11: [hotfix][table, tests] Add convenient verify methods to HarnessTestBase

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dde089be869c8d2d24f27059530b086326daa5d5
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Jul 20 12:40:36 2018 +0200

    [hotfix][table,tests] Add convenient verify methods to HarnessTestBase
---
 .../table/runtime/harness/HarnessTestBase.scala    |  9 ++++
 .../table/runtime/harness/JoinHarnessTest.scala    | 50 +++++++---------------
 .../runtime/harness/NonWindowHarnessTest.scala     |  4 +-
 .../runtime/harness/OverWindowHarnessTest.scala    | 14 +++---
 4 files changed, 33 insertions(+), 44 deletions(-)

diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
index 58fe9d3..d494c21 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
@@ -34,6 +34,7 @@ import org.apache.flink.table.functions.AggregateFunction
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.functions.aggfunctions.{IntSumWithRetractAggFunction, LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction}
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.getAccumulatorTypeOfAggregateFunction
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, RowResultSortComparatorWithWatermarks}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
 class HarnessTestBase {
@@ -318,6 +319,14 @@ class HarnessTestBase {
     new KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT](operator, keySelector, keyType)
   }
 
+  def verify(expected: JQueue[Object], actual: JQueue[Object]): Unit = {
+    verify(expected, actual, new RowResultSortComparator)
+  }
+
+  def verifyWithWatermarks(expected: JQueue[Object], actual: JQueue[Object]): Unit = {
+    verify(expected, actual, new RowResultSortComparatorWithWatermarks, true)
+  }
+
   def verify(
     expected: JQueue[Object],
     actual: JQueue[Object],
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
index 86133a0..bd19be8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
@@ -231,7 +231,7 @@ class JoinHarnessTest extends HarnessTestBase {
     expectedOutput.add(new StreamRecord(
       CRow(2L: JLong, "2a33", 2L: JLong, "2b33"), 33))
 
-    verify(expectedOutput, result, new RowResultSortComparator())
+    verify(expectedOutput, result)
 
     testHarness.close()
   }
@@ -313,7 +313,7 @@ class JoinHarnessTest extends HarnessTestBase {
     expectedOutput.add(new StreamRecord(
       CRow(1L: JLong, "1a3", 1L: JLong, "1b12"), 12))
 
-    verify(expectedOutput, result, new RowResultSortComparator())
+    verify(expectedOutput, result)
 
     testHarness.close()
   }
@@ -409,11 +409,7 @@ class JoinHarnessTest extends HarnessTestBase {
     expectedOutput.add(new Watermark(41))
 
     val result = testHarness.getOutput
-    verify(
-      expectedOutput,
-      result,
-      new RowResultSortComparatorWithWatermarks(),
-      checkWaterMark = true)
+    verifyWithWatermarks(expectedOutput, result)
     testHarness.close()
   }
 
@@ -491,11 +487,7 @@ class JoinHarnessTest extends HarnessTestBase {
     expectedOutput.add(new Watermark(8))
 
     val result = testHarness.getOutput
-    verify(
-      expectedOutput,
-      result,
-      new RowResultSortComparatorWithWatermarks(),
-      checkWaterMark = true)
+    verifyWithWatermarks(expectedOutput, result)
     testHarness.close()
   }
 
@@ -605,11 +597,7 @@ class JoinHarnessTest extends HarnessTestBase {
 
 
     val result = testHarness.getOutput
-    verify(
-      expectedOutput,
-      result,
-      new RowResultSortComparatorWithWatermarks(),
-      checkWaterMark = true)
+    verifyWithWatermarks(expectedOutput, result)
     testHarness.close()
   }
 
@@ -718,11 +706,7 @@ class JoinHarnessTest extends HarnessTestBase {
     expectedOutput.add(new Watermark(91))
 
     val result = testHarness.getOutput
-    verify(
-      expectedOutput,
-      result,
-      new RowResultSortComparatorWithWatermarks(),
-      checkWaterMark = true)
+    verifyWithWatermarks(expectedOutput, result)
     testHarness.close()
   }
 
@@ -839,11 +823,7 @@ class JoinHarnessTest extends HarnessTestBase {
     expectedOutput.add(new Watermark(91))
 
     val result = testHarness.getOutput
-    verify(
-      expectedOutput,
-      result,
-      new RowResultSortComparatorWithWatermarks(),
-      checkWaterMark = true)
+    verifyWithWatermarks(expectedOutput, result)
     testHarness.close()
   }
 
@@ -944,7 +924,7 @@ class JoinHarnessTest extends HarnessTestBase {
     expectedOutput.add(new StreamRecord(
       CRow(2: JInt, "bbb", 2: JInt, "Hello1")))
 
-    verify(expectedOutput, result, new RowResultSortComparator())
+    verify(expectedOutput, result)
 
     testHarness.close()
   }
@@ -1035,7 +1015,7 @@ class JoinHarnessTest extends HarnessTestBase {
     expectedOutput.add(new StreamRecord(
       CRow(2: JInt, "bbb", 2: JInt, "Hello1")))
 
-    verify(expectedOutput, result, new RowResultSortComparator())
+    verify(expectedOutput, result)
 
     testHarness.close()
   }
@@ -1143,7 +1123,7 @@ class JoinHarnessTest extends HarnessTestBase {
     expectedOutput.add(new StreamRecord(
       CRow(false, 1: JInt, "aaa", null: JInt, null)))
 
-    verify(expectedOutput, result, new RowResultSortComparator())
+    verify(expectedOutput, result)
 
     testHarness.close()
   }
@@ -1273,7 +1253,7 @@ class JoinHarnessTest extends HarnessTestBase {
       CRow(false, 1: JInt, "bbb", 1: JInt, "Hi1")))
     expectedOutput.add(new StreamRecord(
       CRow(1: JInt, "bbb", null: JInt, null)))
-    verify(expectedOutput, result, new RowResultSortComparator())
+    verify(expectedOutput, result)
 
     testHarness.close()
   }
@@ -1381,7 +1361,7 @@ class JoinHarnessTest extends HarnessTestBase {
     expectedOutput.add(new StreamRecord(
       CRow(false, null: JInt, null, 1: JInt, "aaa")))
 
-    verify(expectedOutput, result, new RowResultSortComparator())
+    verify(expectedOutput, result)
 
     testHarness.close()
   }
@@ -1511,7 +1491,7 @@ class JoinHarnessTest extends HarnessTestBase {
       CRow(false, 1: JInt, "Hi1", 1: JInt, "bbb")))
     expectedOutput.add(new StreamRecord(
       CRow(null: JInt, null, 1: JInt, "bbb")))
-    verify(expectedOutput, result, new RowResultSortComparator())
+    verify(expectedOutput, result)
 
     testHarness.close()
   }
@@ -1681,7 +1661,7 @@ class JoinHarnessTest extends HarnessTestBase {
     expectedOutput.add(new StreamRecord(
       CRow(null: JInt, null, 2: JInt, "bbb")))
 
-    verify(expectedOutput, result, new RowResultSortComparator())
+    verify(expectedOutput, result)
     testHarness.close()
   }
 
@@ -1836,7 +1816,7 @@ class JoinHarnessTest extends HarnessTestBase {
     expectedOutput.add(new StreamRecord(
       CRow(null: JInt, null, 2: JInt, "bbb")))
 
-    verify(expectedOutput, result, new RowResultSortComparator())
+    verify(expectedOutput, result)
     testHarness.close()
   }
 }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
index d6daa9e..7c4f543 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
@@ -91,7 +91,7 @@ class NonWindowHarnessTest extends HarnessTestBase {
     expectedOutput.add(new StreamRecord(CRow(9L: JLong, 18: JInt), 1))
     expectedOutput.add(new StreamRecord(CRow(10L: JLong, 3: JInt), 1))
 
-    verify(expectedOutput, result, new RowResultSortComparator())
+    verify(expectedOutput, result)
 
     testHarness.close()
   }
@@ -150,7 +150,7 @@ class NonWindowHarnessTest extends HarnessTestBase {
     expectedOutput.add(new StreamRecord(CRow(false, 10L: JLong, 2: JInt), 10))
     expectedOutput.add(new StreamRecord(CRow(10L: JLong, 5: JInt), 10))
 
-    verify(expectedOutput, result, new RowResultSortComparator())
+    verify(expectedOutput, result)
 
     testHarness.close()
   }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index 63d7b5d..95b13a0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -130,7 +130,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
     expectedOutput.add(new StreamRecord(
       CRow(2L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong)))
 
-    verify(expectedOutput, result, new RowResultSortComparator())
+    verify(expectedOutput, result)
 
     testHarness.close()
   }
@@ -261,7 +261,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
     expectedOutput.add(new StreamRecord(
       CRow(0L: JLong, "ccc", 20L: JLong, 10L: JLong, 20L: JLong)))
 
-    verify(expectedOutput, result, new RowResultSortComparator())
+    verify(expectedOutput, result)
 
     testHarness.close()
   }
@@ -351,7 +351,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
     expectedOutput.add(new StreamRecord(
       CRow(0L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong)))
 
-    verify(expectedOutput, result, new RowResultSortComparator())
+    verify(expectedOutput, result)
     testHarness.close()
   }
 
@@ -504,7 +504,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
     expectedOutput.add(new StreamRecord(
       CRow(20011L: JLong, "ccc", 3L: JLong, 3L: JLong, 3L: JLong)))
 
-    verify(expectedOutput, result, new RowResultSortComparator())
+    verify(expectedOutput, result)
     testHarness.close()
   }
 
@@ -649,7 +649,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
     expectedOutput.add(new StreamRecord(
       CRow(20011L: JLong, "ccc", 3L: JLong, 3L: JLong, 3L: JLong)))
 
-    verify(expectedOutput, result, new RowResultSortComparator())
+    verify(expectedOutput, result)
     testHarness.close()
   }
 
@@ -788,7 +788,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
     expectedOutput.add(new StreamRecord(
       CRow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
 
-    verify(expectedOutput, result, new RowResultSortComparator())
+    verify(expectedOutput, result)
     testHarness.close()
   }
 
@@ -923,7 +923,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
     expectedOutput.add(new StreamRecord(
       CRow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
 
-    verify(expectedOutput, result, new RowResultSortComparator())
+    verify(expectedOutput, result)
     testHarness.close()
   }
 }