You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/05/10 02:13:45 UTC

[iceberg] branch master updated: Flink: Backport flaky test fix for limit 1 query in TestFlinkTableSource (#4724)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new a8f9f2997 Flink: Backport flaky test fix for limit 1 query in TestFlinkTableSource (#4724)
a8f9f2997 is described below

commit a8f9f29976ab6fa1c734cb04c67a4184911a88cd
Author: Kyle Bendickson <kj...@gmail.com>
AuthorDate: Mon May 9 19:13:41 2022 -0700

    Flink: Backport flaky test fix for limit 1 query in TestFlinkTableSource (#4724)
---
 .../org/apache/iceberg/flink/TestFlinkTableSource.java  | 17 ++++++++++-------
 .../org/apache/iceberg/flink/TestFlinkTableSource.java  | 17 ++++++++++-------
 2 files changed, 20 insertions(+), 14 deletions(-)

diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
index 06596d159..fe9c9d832 100644
--- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.events.Listeners;
 import org.apache.iceberg.events.ScanEvent;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -98,13 +99,6 @@ public class TestFlinkTableSource extends FlinkTestBase {
 
   @Test
   public void testLimitPushDown() {
-    String querySql = String.format("SELECT * FROM %s LIMIT 1", TABLE_NAME);
-    String explain = getTableEnv().explainSql(querySql);
-    String expectedExplain = "limit=[1]";
-    Assert.assertTrue("Explain should contain LimitPushDown", explain.contains(expectedExplain));
-    List<Row> result = sql(querySql);
-    Assert.assertEquals("Should have 1 record", 1, result.size());
-    Assert.assertEquals("Should produce the expected records", Row.of(1, "iceberg", 10.0), result.get(0));
 
     AssertHelpers.assertThrows("Invalid limit number: -1 ", SqlParserException.class,
         () -> sql("SELECT * FROM %s LIMIT -1", TABLE_NAME));
@@ -121,6 +115,15 @@ public class TestFlinkTableSource extends FlinkTestBase {
     );
     assertSameElements(expectedList, resultExceed);
 
+    String querySql = String.format("SELECT * FROM %s LIMIT 1", TABLE_NAME);
+    String explain = getTableEnv().explainSql(querySql);
+    String expectedExplain = "limit=[1]";
+    Assert.assertTrue("Explain should contain LimitPushDown", explain.contains(expectedExplain));
+    List<Row> result = sql(querySql);
+    Assert.assertEquals("Should have 1 record", 1, result.size());
+    Assertions.assertThat(result)
+        .containsAnyElementsOf(expectedList);
+
     String sqlMixed = String.format("SELECT * FROM %s WHERE id = 1 LIMIT 2", TABLE_NAME);
     List<Row> mixedResult = sql(sqlMixed);
     Assert.assertEquals("Should have 1 record", 1, mixedResult.size());
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
index 06596d159..fe9c9d832 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.events.Listeners;
 import org.apache.iceberg.events.ScanEvent;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -98,13 +99,6 @@ public class TestFlinkTableSource extends FlinkTestBase {
 
   @Test
   public void testLimitPushDown() {
-    String querySql = String.format("SELECT * FROM %s LIMIT 1", TABLE_NAME);
-    String explain = getTableEnv().explainSql(querySql);
-    String expectedExplain = "limit=[1]";
-    Assert.assertTrue("Explain should contain LimitPushDown", explain.contains(expectedExplain));
-    List<Row> result = sql(querySql);
-    Assert.assertEquals("Should have 1 record", 1, result.size());
-    Assert.assertEquals("Should produce the expected records", Row.of(1, "iceberg", 10.0), result.get(0));
 
     AssertHelpers.assertThrows("Invalid limit number: -1 ", SqlParserException.class,
         () -> sql("SELECT * FROM %s LIMIT -1", TABLE_NAME));
@@ -121,6 +115,15 @@ public class TestFlinkTableSource extends FlinkTestBase {
     );
     assertSameElements(expectedList, resultExceed);
 
+    String querySql = String.format("SELECT * FROM %s LIMIT 1", TABLE_NAME);
+    String explain = getTableEnv().explainSql(querySql);
+    String expectedExplain = "limit=[1]";
+    Assert.assertTrue("Explain should contain LimitPushDown", explain.contains(expectedExplain));
+    List<Row> result = sql(querySql);
+    Assert.assertEquals("Should have 1 record", 1, result.size());
+    Assertions.assertThat(result)
+        .containsAnyElementsOf(expectedList);
+
     String sqlMixed = String.format("SELECT * FROM %s WHERE id = 1 LIMIT 2", TABLE_NAME);
     List<Row> mixedResult = sql(sqlMixed);
     Assert.assertEquals("Should have 1 record", 1, mixedResult.size());