You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/05/02 23:38:06 UTC

[iceberg] branch master updated: Flink 1.13: Backport fix for order dependent flink table source tests (#4682)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c82cf91da Flink 1.13: Backport fix for order dependent flink table source tests (#4682)
c82cf91da is described below

commit c82cf91da559829ea6429c26db329771667fb646
Author: Kyle Bendickson <kj...@gmail.com>
AuthorDate: Mon May 2 16:38:00 2022 -0700

    Flink 1.13: Backport fix for order dependent flink table source tests (#4682)
---
 .../org/apache/iceberg/flink/FlinkTestBase.java    | 14 ++++++++++++
 .../apache/iceberg/flink/TestFlinkTableSource.java | 26 +++++++++++-----------
 2 files changed, 27 insertions(+), 13 deletions(-)

diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
index ea77366c9..8f5e50802 100644
--- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.hive.TestHiveMetastore;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -103,4 +104,17 @@ public abstract class FlinkTestBase extends TestBaseUtils {
       throw new RuntimeException("Failed to collect table result", e);
     }
   }
+
+  protected void assertSameElements(Iterable<Row> expected, Iterable<Row> actual) {
+    Assertions.assertThat(actual)
+        .isNotNull()
+        .containsExactlyInAnyOrderElementsOf(expected);
+  }
+
+  protected void assertSameElements(String message, Iterable<Row> expected, Iterable<Row> actual) {
+    Assertions.assertThat(actual)
+        .isNotNull()
+        .as(message)
+        .containsExactlyInAnyOrderElementsOf(expected);
+  }
 }
diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
index 11a6b6781..06596d159 100644
--- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
@@ -119,7 +119,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
         Row.of(2, "b", 20.0),
         Row.of(3, null, 30.0)
     );
-    Assert.assertEquals("Should produce the expected records", expectedList, resultExceed);
+    assertSameElements(expectedList, resultExceed);
 
     String sqlMixed = String.format("SELECT * FROM %s WHERE id = 1 LIMIT 2", TABLE_NAME);
     List<Row> mixedResult = sql(sqlMixed);
@@ -136,7 +136,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
         Row.of(2, "b", 20.0),
         Row.of(3, null, 30.0)
     );
-    Assert.assertArrayEquals("Should produce the expected record", expectedRecords.toArray(), result.toArray());
+    assertSameElements(expectedRecords, result);
     Assert.assertEquals("Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
   }
 
@@ -187,7 +187,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
         Row.of(2, "b", 20.0),
         Row.of(3, null, 30.0)
     );
-    Assert.assertEquals("Should produce the expected record", expectedNE, resultNE);
+    assertSameElements(expectedNE, resultNE);
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
   }
@@ -226,7 +226,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
         Row.of(1, "iceberg", 10.0),
         Row.of(2, "b", 20.0)
     );
-    Assert.assertEquals("Should produce the expected record", expectedOR, resultOr);
+    assertSameElements(expectedOR, resultOr);
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
@@ -244,7 +244,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
         Row.of(2, "b", 20.0),
         Row.of(3, null, 30.0)
     );
-    Assert.assertEquals("Should produce the expected record", expectedGT, resultGT);
+    assertSameElements(expectedGT, resultGT);
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
@@ -271,7 +271,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
         Row.of(1, "iceberg", 10.0),
         Row.of(2, "b", 20.0)
     );
-    Assert.assertEquals("Should produce the expected record", expectedGT, resultGT);
+    assertSameElements(expectedGT, resultGT);
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
@@ -289,7 +289,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
         Row.of(2, "b", 20.0),
         Row.of(3, null, 30.0)
     );
-    Assert.assertEquals("Should produce the expected record", expectedGTE, resultGTE);
+    assertSameElements(expectedGTE, resultGTE);
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
@@ -316,7 +316,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
         Row.of(1, "iceberg", 10.0),
         Row.of(2, "b", 20.0)
     );
-    Assert.assertEquals("Should produce the expected record", expectedGTE, resultGTE);
+    assertSameElements(expectedGTE, resultGTE);
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
@@ -403,7 +403,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
         Row.of(1, "iceberg", 10.0),
         Row.of(2, "b", 20.0)
     );
-    Assert.assertEquals("Should produce the expected record", expectedIN, resultIN);
+    assertSameElements(expectedIN, resultIN);
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
   }
@@ -450,7 +450,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
         Row.of(1, "iceberg", 10.0),
         Row.of(2, "b", 20.0)
     );
-    Assert.assertEquals("Should produce the expected record", expected, resultNotNull);
+    assertSameElements(expected, resultNotNull);
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
@@ -493,7 +493,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
         Row.of(1, "iceberg", 10.0),
         Row.of(2, "b", 20.0)
     );
-    Assert.assertEquals("Should produce the expected record", expectedBetween, resultBetween);
+    assertSameElements(expectedBetween, resultBetween);
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     String expected = "(ref(name=\"id\") >= 1 and ref(name=\"id\") <= 2)";
@@ -554,7 +554,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
         Row.of(2, "b", 20.0),
         Row.of(3, null, 30.0)
     );
-    Assert.assertEquals("Should produce the expected record", expectedRecords, resultLike);
+    assertSameElements(expectedRecords, resultLike);
     Assert.assertEquals("Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
 
     sqlNoPushDown = "SELECT * FROM  " + TABLE_NAME + "  WHERE data LIKE 'iceber_' ";
@@ -579,7 +579,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
         Row.of(2, "b", 20.0),
         Row.of(3, null, 30.0)
     );
-    Assert.assertArrayEquals("Should produce the expected record", expectedRecords.toArray(), result.toArray());
+    assertSameElements(expectedRecords, result);
     Assert.assertEquals("Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
   }