You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by op...@apache.org on 2021/04/01 06:26:25 UTC

[iceberg] branch master updated: Tests: Get row collection from flink sql query. (#2386)

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 988a33c  Tests: Get row collection from flink sql query. (#2386)
988a33c is described below

commit 988a33cb58981c3fabb221f6b49ed9dd176c9abd
Author: openinx <op...@gmail.com>
AuthorDate: Thu Apr 1 14:26:14 2021 +0800

    Tests: Get row collection from flink sql query. (#2386)
---
 .../org/apache/iceberg/flink/FlinkTestBase.java    |  10 +-
 .../iceberg/flink/TestFlinkCatalogDatabase.java    |  17 +-
 .../apache/iceberg/flink/TestFlinkTableSource.java | 247 +++++++++++----------
 3 files changed, 137 insertions(+), 137 deletions(-)

diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
index 7e663b2..2810326 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
@@ -20,7 +20,6 @@
 package org.apache.iceberg.flink;
 
 import java.util.List;
-import java.util.stream.IntStream;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
@@ -96,15 +95,10 @@ public abstract class FlinkTestBase extends TestBaseUtils {
     return exec(getTableEnv(), query, args);
   }
 
-  protected List<Object[]> sql(String query, Object... args) {
+  protected List<Row> sql(String query, Object... args) {
     TableResult tableResult = exec(query, args);
     try (CloseableIterator<Row> iter = tableResult.collect()) {
-      List<Object[]> results = Lists.newArrayList();
-      while (iter.hasNext()) {
-        Row row = iter.next();
-        results.add(IntStream.range(0, row.getArity()).mapToObj(row::getField).toArray(Object[]::new));
-      }
-      return results;
+      return Lists.newArrayList(iter);
     } catch (Exception e) {
       throw new RuntimeException("Failed to collect table result", e);
     }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
index ced4e66..180a2bc 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
@@ -22,7 +22,9 @@ package org.apache.iceberg.flink;
 import java.io.File;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.types.Row;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.Namespace;
@@ -139,9 +141,9 @@ public class TestFlinkCatalogDatabase extends FlinkCatalogTestBase {
         TableIdentifier.of(icebergNamespace, "tl"),
         new Schema(Types.NestedField.optional(0, "id", Types.LongType.get())));
 
-    List<Object[]> tables = sql("SHOW TABLES");
+    List<Row> tables = sql("SHOW TABLES");
     Assert.assertEquals("Only 1 table", 1, tables.size());
-    Assert.assertEquals("Table name should match", "tl", tables.get(0)[0]);
+    Assert.assertEquals("Table name should match", "tl", tables.get(0).getField(0));
   }
 
   @Test
@@ -155,12 +157,13 @@ public class TestFlinkCatalogDatabase extends FlinkCatalogTestBase {
 
     Assert.assertTrue("Namespace should exist", validationNamespaceCatalog.namespaceExists(icebergNamespace));
 
-    List<Object[]> databases = sql("SHOW DATABASES");
+    List<Row> databases = sql("SHOW DATABASES");
 
     if (isHadoopCatalog) {
       Assert.assertEquals("Should have 2 database", 2, databases.size());
       Assert.assertEquals("Should have db and default database",
-          Sets.newHashSet("default", "db"), Sets.newHashSet(databases.get(0)[0], databases.get(1)[0]));
+          Sets.newHashSet("default", "db"),
+          Sets.newHashSet(databases.get(0).getField(0), databases.get(1).getField(0)));
 
       if (!baseNamespace.isEmpty()) {
         // test namespace not belongs to this catalog
@@ -168,12 +171,14 @@ public class TestFlinkCatalogDatabase extends FlinkCatalogTestBase {
         databases = sql("SHOW DATABASES");
         Assert.assertEquals("Should have 2 database", 2, databases.size());
         Assert.assertEquals("Should have db and default database",
-            Sets.newHashSet("default", "db"), Sets.newHashSet(databases.get(0)[0], databases.get(1)[0]));
+            Sets.newHashSet("default", "db"),
+            Sets.newHashSet(databases.get(0).getField(0), databases.get(1).getField(0)));
       }
     } else {
       // If there are multiple classes extends FlinkTestBase, TestHiveMetastore may loose the creation for default
       // database. See HiveMetaStore.HMSHandler.init.
-      Assert.assertTrue("Should have db database", databases.stream().anyMatch(d -> d[0].equals("db")));
+      Assert.assertTrue("Should have db database",
+          databases.stream().anyMatch(d -> Objects.equals(d.getField(0), "db")));
     }
   }
 
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
index 62e51b2..11a6b67 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.table.api.SqlParserException;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.events.Listeners;
@@ -100,11 +101,10 @@ public class TestFlinkTableSource extends FlinkTestBase {
     String querySql = String.format("SELECT * FROM %s LIMIT 1", TABLE_NAME);
     String explain = getTableEnv().explainSql(querySql);
     String expectedExplain = "limit=[1]";
-    Object[] expectRecord = new Object[] {1, "iceberg", 10.0};
     Assert.assertTrue("Explain should contain LimitPushDown", explain.contains(expectedExplain));
-    List<Object[]> result = sql(querySql);
+    List<Row> result = sql(querySql);
     Assert.assertEquals("Should have 1 record", 1, result.size());
-    Assert.assertArrayEquals("Should produce the expected records", expectRecord, result.get(0));
+    Assert.assertEquals("Should produce the expected records", Row.of(1, "iceberg", 10.0), result.get(0));
 
     AssertHelpers.assertThrows("Invalid limit number: -1 ", SqlParserException.class,
         () -> sql("SELECT * FROM %s LIMIT -1", TABLE_NAME));
@@ -112,28 +112,30 @@ public class TestFlinkTableSource extends FlinkTestBase {
     Assert.assertEquals("Should have 0 record", 0, sql("SELECT * FROM %s LIMIT 0", TABLE_NAME).size());
 
     String sqlLimitExceed = String.format("SELECT * FROM %s LIMIT 4", TABLE_NAME);
-    List<Object[]> resultExceed = sql(sqlLimitExceed);
+    List<Row> resultExceed = sql(sqlLimitExceed);
     Assert.assertEquals("Should have 3 records", 3, resultExceed.size());
-    List<Object[]> expectedList = Lists.newArrayList();
-    expectedList.add(new Object[] {1, "iceberg", 10.0});
-    expectedList.add(new Object[] {2, "b", 20.0});
-    expectedList.add(new Object[] {3, null, 30.0});
-    Assert.assertArrayEquals("Should produce the expected records", expectedList.toArray(), resultExceed.toArray());
+    List<Row> expectedList = Lists.newArrayList(
+        Row.of(1, "iceberg", 10.0),
+        Row.of(2, "b", 20.0),
+        Row.of(3, null, 30.0)
+    );
+    Assert.assertEquals("Should produce the expected records", expectedList, resultExceed);
 
     String sqlMixed = String.format("SELECT * FROM %s WHERE id = 1 LIMIT 2", TABLE_NAME);
-    List<Object[]> mixedResult = sql(sqlMixed);
+    List<Row> mixedResult = sql(sqlMixed);
     Assert.assertEquals("Should have 1 record", 1, mixedResult.size());
-    Assert.assertArrayEquals("Should produce the expected records", expectRecord, mixedResult.get(0));
+    Assert.assertEquals("Should produce the expected records", Row.of(1, "iceberg", 10.0), mixedResult.get(0));
   }
 
   @Test
   public void testNoFilterPushDown() {
     String sql = String.format("SELECT * FROM %s ", TABLE_NAME);
-    List<Object[]> result = sql(sql);
-    List<Object[]> expectedRecords = Lists.newArrayList();
-    expectedRecords.add(new Object[] {1, "iceberg", 10.0});
-    expectedRecords.add(new Object[] {2, "b", 20.0});
-    expectedRecords.add(new Object[] {3, null, 30.0});
+    List<Row> result = sql(sql);
+    List<Row> expectedRecords = Lists.newArrayList(
+        Row.of(1, "iceberg", 10.0),
+        Row.of(2, "b", 20.0),
+        Row.of(3, null, 30.0)
+    );
     Assert.assertArrayEquals("Should produce the expected record", expectedRecords.toArray(), result.toArray());
     Assert.assertEquals("Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
   }
@@ -142,11 +144,10 @@ public class TestFlinkTableSource extends FlinkTestBase {
   public void testFilterPushDownEqual() {
     String sqlLiteralRight = String.format("SELECT * FROM %s WHERE id = 1 ", TABLE_NAME);
     String expectedFilter = "ref(name=\"id\") == 1";
-    Object[] expectRecord = new Object[] {1, "iceberg", 10.0};
 
-    List<Object[]> result = sql(sqlLiteralRight);
+    List<Row> result = sql(sqlLiteralRight);
     Assert.assertEquals("Should have 1 record", 1, result.size());
-    Assert.assertArrayEquals("Should produce the expected record", expectRecord, result.get(0));
+    Assert.assertEquals("Should produce the expected record", Row.of(1, "iceberg", 10.0), result.get(0));
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
@@ -156,7 +157,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
   public void testFilterPushDownEqualNull() {
     String sqlEqualNull = String.format("SELECT * FROM %s WHERE data = NULL ", TABLE_NAME);
 
-    List<Object[]> result = sql(sqlEqualNull);
+    List<Row> result = sql(sqlEqualNull);
     Assert.assertEquals("Should have 0 record", 0, result.size());
     Assert.assertNull("Should not push down a filter", lastScanEvent);
   }
@@ -165,11 +166,10 @@ public class TestFlinkTableSource extends FlinkTestBase {
   public void testFilterPushDownEqualLiteralOnLeft() {
     String sqlLiteralLeft = String.format("SELECT * FROM %s WHERE 1 = id ", TABLE_NAME);
     String expectedFilter = "ref(name=\"id\") == 1";
-    Object[] expectRecord = new Object[] {1, "iceberg", 10.0};
 
-    List<Object[]> resultLeft = sql(sqlLiteralLeft);
+    List<Row> resultLeft = sql(sqlLiteralLeft);
     Assert.assertEquals("Should have 1 record", 1, resultLeft.size());
-    Assert.assertArrayEquals("Should produce the expected record", expectRecord, resultLeft.get(0));
+    Assert.assertEquals("Should produce the expected record", Row.of(1, "iceberg", 10.0), resultLeft.get(0));
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
@@ -180,13 +180,14 @@ public class TestFlinkTableSource extends FlinkTestBase {
     String sqlNE = String.format("SELECT * FROM %s WHERE id <> 1 ", TABLE_NAME);
     String expectedFilter = "ref(name=\"id\") != 1";
 
-    List<Object[]> resultNE = sql(sqlNE);
+    List<Row> resultNE = sql(sqlNE);
     Assert.assertEquals("Should have 2 records", 2, resultNE.size());
 
-    List<Object[]> expectedNE = Lists.newArrayList();
-    expectedNE.add(new Object[] {2, "b", 20.0});
-    expectedNE.add(new Object[] {3, null, 30.0});
-    Assert.assertArrayEquals("Should produce the expected record", expectedNE.toArray(), resultNE.toArray());
+    List<Row> expectedNE = Lists.newArrayList(
+        Row.of(2, "b", 20.0),
+        Row.of(3, null, 30.0)
+    );
+    Assert.assertEquals("Should produce the expected record", expectedNE, resultNE);
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
   }
@@ -195,7 +196,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
   public void testFilterPushDownNoEqualNull() {
     String sqlNotEqualNull = String.format("SELECT * FROM %s WHERE data <> NULL ", TABLE_NAME);
 
-    List<Object[]> resultNE = sql(sqlNotEqualNull);
+    List<Row> resultNE = sql(sqlNotEqualNull);
     Assert.assertEquals("Should have 0 records", 0, resultNE.size());
     Assert.assertNull("Should not push down a filter", lastScanEvent);
   }
@@ -203,11 +204,10 @@ public class TestFlinkTableSource extends FlinkTestBase {
   @Test
   public void testFilterPushDownAnd() {
     String sqlAnd = String.format("SELECT * FROM %s WHERE id = 1 AND data = 'iceberg' ", TABLE_NAME);
-    Object[] expectRecord = new Object[] {1, "iceberg", 10.0};
 
-    List<Object[]> resultAnd = sql(sqlAnd);
+    List<Row> resultAnd = sql(sqlAnd);
     Assert.assertEquals("Should have 1 record", 1, resultAnd.size());
-    Assert.assertArrayEquals("Should produce the expected record", expectRecord, resultAnd.get(0));
+    Assert.assertEquals("Should produce the expected record", Row.of(1, "iceberg", 10.0), resultAnd.get(0));
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     String expected = "(ref(name=\"id\") == 1 and ref(name=\"data\") == \"iceberg\")";
@@ -219,13 +219,14 @@ public class TestFlinkTableSource extends FlinkTestBase {
     String sqlOr = String.format("SELECT * FROM %s WHERE id = 1 OR data = 'b' ", TABLE_NAME);
     String expectedFilter = "(ref(name=\"id\") == 1 or ref(name=\"data\") == \"b\")";
 
-    List<Object[]> resultOr = sql(sqlOr);
+    List<Row> resultOr = sql(sqlOr);
     Assert.assertEquals("Should have 2 record", 2, resultOr.size());
 
-    List<Object[]> expectedOR = Lists.newArrayList();
-    expectedOR.add(new Object[] {1, "iceberg", 10.0});
-    expectedOR.add(new Object[] {2, "b", 20.0});
-    Assert.assertArrayEquals("Should produce the expected record", expectedOR.toArray(), resultOr.toArray());
+    List<Row> expectedOR = Lists.newArrayList(
+        Row.of(1, "iceberg", 10.0),
+        Row.of(2, "b", 20.0)
+    );
+    Assert.assertEquals("Should produce the expected record", expectedOR, resultOr);
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
@@ -236,13 +237,14 @@ public class TestFlinkTableSource extends FlinkTestBase {
     String sqlGT = String.format("SELECT * FROM %s WHERE id > 1 ", TABLE_NAME);
     String expectedFilter = "ref(name=\"id\") > 1";
 
-    List<Object[]> resultGT = sql(sqlGT);
+    List<Row> resultGT = sql(sqlGT);
     Assert.assertEquals("Should have 2 record", 2, resultGT.size());
 
-    List<Object[]> expectedGT = Lists.newArrayList();
-    expectedGT.add(new Object[] {2, "b", 20.0});
-    expectedGT.add(new Object[] {3, null, 30.0});
-    Assert.assertArrayEquals("Should produce the expected record", expectedGT.toArray(), resultGT.toArray());
+    List<Row> expectedGT = Lists.newArrayList(
+        Row.of(2, "b", 20.0),
+        Row.of(3, null, 30.0)
+    );
+    Assert.assertEquals("Should produce the expected record", expectedGT, resultGT);
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
@@ -252,7 +254,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
   public void testFilterPushDownGreaterThanNull() {
     String sqlGT = String.format("SELECT * FROM %s WHERE data > null ", TABLE_NAME);
 
-    List<Object[]> resultGT = sql(sqlGT);
+    List<Row> resultGT = sql(sqlGT);
     Assert.assertEquals("Should have 0 record", 0, resultGT.size());
     Assert.assertNull("Should not push down a filter", lastScanEvent);
   }
@@ -262,13 +264,14 @@ public class TestFlinkTableSource extends FlinkTestBase {
     String sqlGT = String.format("SELECT * FROM %s WHERE 3 > id ", TABLE_NAME);
     String expectedFilter = "ref(name=\"id\") < 3";
 
-    List<Object[]> resultGT = sql(sqlGT);
+    List<Row> resultGT = sql(sqlGT);
     Assert.assertEquals("Should have 2 records", 2, resultGT.size());
 
-    List<Object[]> expectedGT = Lists.newArrayList();
-    expectedGT.add(new Object[] {1, "iceberg", 10.0});
-    expectedGT.add(new Object[] {2, "b", 20.0});
-    Assert.assertArrayEquals("Should produce the expected record", expectedGT.toArray(), resultGT.toArray());
+    List<Row> expectedGT = Lists.newArrayList(
+        Row.of(1, "iceberg", 10.0),
+        Row.of(2, "b", 20.0)
+    );
+    Assert.assertEquals("Should produce the expected record", expectedGT, resultGT);
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
@@ -279,13 +282,14 @@ public class TestFlinkTableSource extends FlinkTestBase {
     String sqlGTE = String.format("SELECT * FROM %s WHERE id >= 2 ", TABLE_NAME);
     String expectedFilter = "ref(name=\"id\") >= 2";
 
-    List<Object[]> resultGTE = sql(sqlGTE);
+    List<Row> resultGTE = sql(sqlGTE);
     Assert.assertEquals("Should have 2 records", 2, resultGTE.size());
 
-    List<Object[]> expectedGTE = Lists.newArrayList();
-    expectedGTE.add(new Object[] {2, "b", 20.0});
-    expectedGTE.add(new Object[] {3, null, 30.0});
-    Assert.assertArrayEquals("Should produce the expected record", expectedGTE.toArray(), resultGTE.toArray());
+    List<Row> expectedGTE = Lists.newArrayList(
+        Row.of(2, "b", 20.0),
+        Row.of(3, null, 30.0)
+    );
+    Assert.assertEquals("Should produce the expected record", expectedGTE, resultGTE);
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
@@ -295,7 +299,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
   public void testFilterPushDownGreaterThanEqualNull() {
     String sqlGTE = String.format("SELECT * FROM %s WHERE data >= null ", TABLE_NAME);
 
-    List<Object[]> resultGT = sql(sqlGTE);
+    List<Row> resultGT = sql(sqlGTE);
     Assert.assertEquals("Should have 0 record", 0, resultGT.size());
     Assert.assertNull("Should not push down a filter", lastScanEvent);
   }
@@ -305,13 +309,14 @@ public class TestFlinkTableSource extends FlinkTestBase {
     String sqlGTE = String.format("SELECT * FROM %s WHERE 2 >= id ", TABLE_NAME);
     String expectedFilter = "ref(name=\"id\") <= 2";
 
-    List<Object[]> resultGTE = sql(sqlGTE);
+    List<Row> resultGTE = sql(sqlGTE);
     Assert.assertEquals("Should have 2 records", 2, resultGTE.size());
 
-    List<Object[]> expectedGTE = Lists.newArrayList();
-    expectedGTE.add(new Object[] {1, "iceberg", 10.0});
-    expectedGTE.add(new Object[] {2, "b", 20.0});
-    Assert.assertArrayEquals("Should produce the expected record", expectedGTE.toArray(), resultGTE.toArray());
+    List<Row> expectedGTE = Lists.newArrayList(
+        Row.of(1, "iceberg", 10.0),
+        Row.of(2, "b", 20.0)
+    );
+    Assert.assertEquals("Should produce the expected record", expectedGTE, resultGTE);
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
@@ -321,11 +326,10 @@ public class TestFlinkTableSource extends FlinkTestBase {
   public void testFilterPushDownLessThan() {
     String sqlLT = String.format("SELECT * FROM %s WHERE id < 2 ", TABLE_NAME);
     String expectedFilter = "ref(name=\"id\") < 2";
-    Object[] expectRecord = new Object[] {1, "iceberg", 10.0};
 
-    List<Object[]> resultLT = sql(sqlLT);
+    List<Row> resultLT = sql(sqlLT);
     Assert.assertEquals("Should have 1 record", 1, resultLT.size());
-    Assert.assertArrayEquals("Should produce the expected record", expectRecord, resultLT.get(0));
+    Assert.assertEquals("Should produce the expected record", Row.of(1, "iceberg", 10.0), resultLT.get(0));
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
@@ -335,7 +339,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
   public void testFilterPushDownLessThanNull() {
     String sqlLT = String.format("SELECT * FROM %s WHERE data < null ", TABLE_NAME);
 
-    List<Object[]> resultGT = sql(sqlLT);
+    List<Row> resultGT = sql(sqlLT);
     Assert.assertEquals("Should have 0 record", 0, resultGT.size());
     Assert.assertNull("Should not push down a filter", lastScanEvent);
   }
@@ -343,12 +347,11 @@ public class TestFlinkTableSource extends FlinkTestBase {
   @Test
   public void testFilterPushDownLessThanLiteralOnLeft() {
     String sqlLT = String.format("SELECT * FROM %s WHERE 2 < id ", TABLE_NAME);
-    Object[] expectRecord = new Object[] {3, null, 30.0};
     String expectedFilter = "ref(name=\"id\") > 2";
 
-    List<Object[]> resultLT = sql(sqlLT);
+    List<Row> resultLT = sql(sqlLT);
     Assert.assertEquals("Should have 1 record", 1, resultLT.size());
-    Assert.assertArrayEquals("Should produce the expected record", expectRecord, resultLT.get(0));
+    Assert.assertEquals("Should produce the expected record", Row.of(3, null, 30.0), resultLT.get(0));
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
@@ -357,12 +360,11 @@ public class TestFlinkTableSource extends FlinkTestBase {
   @Test
   public void testFilterPushDownLessThanEqual() {
     String sqlLTE = String.format("SELECT * FROM %s WHERE id <= 1 ", TABLE_NAME);
-    Object[] expectRecord = new Object[] {1, "iceberg", 10.0};
     String expectedFilter = "ref(name=\"id\") <= 1";
 
-    List<Object[]> resultLTE = sql(sqlLTE);
+    List<Row> resultLTE = sql(sqlLTE);
     Assert.assertEquals("Should have 1 record", 1, resultLTE.size());
-    Assert.assertArrayEquals("Should produce the expected record", expectRecord, resultLTE.get(0));
+    Assert.assertEquals("Should produce the expected record", Row.of(1, "iceberg", 10.0), resultLTE.get(0));
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
@@ -372,7 +374,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
   public void testFilterPushDownLessThanEqualNull() {
     String sqlLTE = String.format("SELECT * FROM %s WHERE data <= null ", TABLE_NAME);
 
-    List<Object[]> resultGT = sql(sqlLTE);
+    List<Row> resultGT = sql(sqlLTE);
     Assert.assertEquals("Should have 0 record", 0, resultGT.size());
     Assert.assertNull("Should not push down a filter", lastScanEvent);
   }
@@ -380,12 +382,11 @@ public class TestFlinkTableSource extends FlinkTestBase {
   @Test
   public void testFilterPushDownLessThanEqualLiteralOnLeft() {
     String sqlLTE = String.format("SELECT * FROM %s WHERE 3 <= id  ", TABLE_NAME);
-    Object[] expectRecord = new Object[] {3, null, 30.0};
     String expectedFilter = "ref(name=\"id\") >= 3";
 
-    List<Object[]> resultLTE = sql(sqlLTE);
+    List<Row> resultLTE = sql(sqlLTE);
     Assert.assertEquals("Should have 1 record", 1, resultLTE.size());
-    Assert.assertArrayEquals("Should produce the expected record", expectRecord, resultLTE.get(0));
+    Assert.assertEquals("Should produce the expected record", Row.of(3, null, 30.0), resultLTE.get(0));
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
@@ -395,13 +396,14 @@ public class TestFlinkTableSource extends FlinkTestBase {
   public void testFilterPushDownIn() {
     String sqlIN = String.format("SELECT * FROM %s WHERE id IN (1,2) ", TABLE_NAME);
     String expectedFilter = "(ref(name=\"id\") == 1 or ref(name=\"id\") == 2)";
-    List<Object[]> resultIN = sql(sqlIN);
+    List<Row> resultIN = sql(sqlIN);
     Assert.assertEquals("Should have 2 records", 2, resultIN.size());
 
-    List<Object[]> expectedIN = Lists.newArrayList();
-    expectedIN.add(new Object[] {1, "iceberg", 10.0});
-    expectedIN.add(new Object[] {2, "b", 20.0});
-    Assert.assertArrayEquals("Should produce the expected record", expectedIN.toArray(), resultIN.toArray());
+    List<Row> expectedIN = Lists.newArrayList(
+        Row.of(1, "iceberg", 10.0),
+        Row.of(2, "b", 20.0)
+    );
+    Assert.assertEquals("Should produce the expected record", expectedIN, resultIN);
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
   }
@@ -409,22 +411,20 @@ public class TestFlinkTableSource extends FlinkTestBase {
   @Test
   public void testFilterPushDownInNull() {
     String sqlInNull = String.format("SELECT * FROM %s WHERE data IN ('iceberg',NULL) ", TABLE_NAME);
-    Object[] expectRecord = new Object[] {1, "iceberg", 10.0};
 
-    List<Object[]> result = sql(sqlInNull);
+    List<Row> result = sql(sqlInNull);
     Assert.assertEquals("Should have 1 record", 1, result.size());
-    Assert.assertArrayEquals("Should produce the expected record", expectRecord, result.get(0));
+    Assert.assertEquals("Should produce the expected record", Row.of(1, "iceberg", 10.0), result.get(0));
     Assert.assertEquals("Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
   }
 
   @Test
   public void testFilterPushDownNotIn() {
     String sqlNotIn = String.format("SELECT * FROM %s WHERE id NOT IN (3,2) ", TABLE_NAME);
-    Object[] expectRecord = new Object[] {1, "iceberg", 10.0};
 
-    List<Object[]> resultNotIn = sql(sqlNotIn);
+    List<Row> resultNotIn = sql(sqlNotIn);
     Assert.assertEquals("Should have 1 record", 1, resultNotIn.size());
-    Assert.assertArrayEquals("Should produce the expected record", expectRecord, resultNotIn.get(0));
+    Assert.assertEquals("Should produce the expected record", Row.of(1, "iceberg", 10.0), resultNotIn.get(0));
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     String expectedScan = "(ref(name=\"id\") != 2 and ref(name=\"id\") != 3)";
     Assert.assertEquals("Should contain the push down filter", expectedScan, lastScanEvent.filter().toString());
@@ -433,7 +433,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
   @Test
   public void testFilterPushDownNotInNull() {
     String sqlNotInNull = String.format("SELECT * FROM %s WHERE id NOT IN (1,2,NULL) ", TABLE_NAME);
-    List<Object[]> resultGT = sql(sqlNotInNull);
+    List<Row> resultGT = sql(sqlNotInNull);
     Assert.assertEquals("Should have 0 record", 0, resultGT.size());
     Assert.assertEquals("Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
   }
@@ -443,13 +443,14 @@ public class TestFlinkTableSource extends FlinkTestBase {
     String sqlNotNull = String.format("SELECT * FROM %s WHERE data IS NOT NULL", TABLE_NAME);
     String expectedFilter = "not_null(ref(name=\"data\"))";
 
-    List<Object[]> resultNotNull = sql(sqlNotNull);
+    List<Row> resultNotNull = sql(sqlNotNull);
     Assert.assertEquals("Should have 2 record", 2, resultNotNull.size());
 
-    List<Object[]> expected = Lists.newArrayList();
-    expected.add(new Object[] {1, "iceberg", 10.0});
-    expected.add(new Object[] {2, "b", 20.0});
-    Assert.assertArrayEquals("Should produce the expected record", expected.toArray(), resultNotNull.toArray());
+    List<Row> expected = Lists.newArrayList(
+        Row.of(1, "iceberg", 10.0),
+        Row.of(2, "b", 20.0)
+    );
+    Assert.assertEquals("Should produce the expected record", expected, resultNotNull);
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
@@ -458,12 +459,11 @@ public class TestFlinkTableSource extends FlinkTestBase {
   @Test
   public void testFilterPushDownIsNull() {
     String sqlNull = String.format("SELECT * FROM %s WHERE data IS  NULL", TABLE_NAME);
-    Object[] expectRecord = new Object[] {3, null, 30.0};
     String expectedFilter = "is_null(ref(name=\"data\"))";
 
-    List<Object[]> resultNull = sql(sqlNull);
+    List<Row> resultNull = sql(sqlNull);
     Assert.assertEquals("Should have 1 record", 1, resultNull.size());
-    Assert.assertArrayEquals("Should produce the expected record", expectRecord, resultNull.get(0));
+    Assert.assertEquals("Should produce the expected record", Row.of(3, null, 30.0), resultNull.get(0));
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
@@ -472,11 +472,10 @@ public class TestFlinkTableSource extends FlinkTestBase {
   @Test
   public void testFilterPushDownNot() {
     String sqlNot = String.format("SELECT * FROM %s WHERE NOT (id = 1 OR id = 2 ) ", TABLE_NAME);
-    Object[] expectRecord = new Object[] {3, null, 30.0};
 
-    List<Object[]> resultNot = sql(sqlNot);
+    List<Row> resultNot = sql(sqlNot);
     Assert.assertEquals("Should have 1 record", 1, resultNot.size());
-    Assert.assertArrayEquals("Should produce the expected record", expectRecord, resultNot.get(0));
+    Assert.assertEquals("Should produce the expected record", Row.of(3, null, 30.0), resultNot.get(0));
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     String expectedFilter = "(ref(name=\"id\") != 1 and ref(name=\"id\") != 2)";
@@ -487,13 +486,14 @@ public class TestFlinkTableSource extends FlinkTestBase {
   public void testFilterPushDownBetween() {
     String sqlBetween = String.format("SELECT * FROM %s WHERE id BETWEEN 1 AND 2 ", TABLE_NAME);
 
-    List<Object[]> resultBetween = sql(sqlBetween);
+    List<Row> resultBetween = sql(sqlBetween);
     Assert.assertEquals("Should have 2 record", 2, resultBetween.size());
 
-    List<Object[]> expectedBetween = Lists.newArrayList();
-    expectedBetween.add(new Object[] {1, "iceberg", 10.0});
-    expectedBetween.add(new Object[] {2, "b", 20.0});
-    Assert.assertArrayEquals("Should produce the expected record", expectedBetween.toArray(), resultBetween.toArray());
+    List<Row> expectedBetween = Lists.newArrayList(
+        Row.of(1, "iceberg", 10.0),
+        Row.of(2, "b", 20.0)
+    );
+    Assert.assertEquals("Should produce the expected record", expectedBetween, resultBetween);
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     String expected = "(ref(name=\"id\") >= 1 and ref(name=\"id\") <= 2)";
@@ -503,12 +503,11 @@ public class TestFlinkTableSource extends FlinkTestBase {
   @Test
   public void testFilterPushDownNotBetween() {
     String sqlNotBetween = String.format("SELECT * FROM %s WHERE id  NOT BETWEEN 2 AND 3 ", TABLE_NAME);
-    Object[] expectRecord = new Object[] {1, "iceberg", 10.0};
     String expectedFilter = "(ref(name=\"id\") < 2 or ref(name=\"id\") > 3)";
 
-    List<Object[]> resultNotBetween = sql(sqlNotBetween);
+    List<Row> resultNotBetween = sql(sqlNotBetween);
     Assert.assertEquals("Should have 1 record", 1, resultNotBetween.size());
-    Assert.assertArrayEquals("Should produce the expected record", expectRecord, resultNotBetween.get(0));
+    Assert.assertEquals("Should produce the expected record", Row.of(1, "iceberg", 10.0), resultNotBetween.get(0));
 
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
@@ -516,68 +515,70 @@ public class TestFlinkTableSource extends FlinkTestBase {
 
   @Test
   public void testFilterPushDownLike() {
-    Object[] expectRecord = new Object[] {1, "iceberg", 10.0};
     String expectedFilter = "ref(name=\"data\") startsWith \"\"ice\"\"";
 
     String sqlLike = "SELECT * FROM " + TABLE_NAME + " WHERE data LIKE 'ice%%' ";
-    List<Object[]> resultLike = sql(sqlLike);
+    List<Row> resultLike = sql(sqlLike);
     Assert.assertEquals("Should have 1 record", 1, resultLike.size());
-    Assert.assertArrayEquals("The like result should produce the expected record", expectRecord, resultLike.get(0));
+    Assert.assertEquals("The like result should produce the expected record",
+        Row.of(1, "iceberg", 10.0), resultLike.get(0));
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals("Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
   }
 
   @Test
   public void testFilterNotPushDownLike() {
-    Object[] expectRecord = new Object[] {1, "iceberg", 10.0};
+    Row expectRecord = Row.of(1, "iceberg", 10.0);
     String sqlNoPushDown = "SELECT * FROM " + TABLE_NAME + " WHERE data LIKE '%%i' ";
-    List<Object[]> resultLike = sql(sqlNoPushDown);
+    List<Row> resultLike = sql(sqlNoPushDown);
     Assert.assertEquals("Should have 1 record", 0, resultLike.size());
     Assert.assertEquals("Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
 
     sqlNoPushDown = "SELECT * FROM " + TABLE_NAME + " WHERE data LIKE '%%i%%' ";
     resultLike = sql(sqlNoPushDown);
     Assert.assertEquals("Should have 1 record", 1, resultLike.size());
-    Assert.assertArrayEquals("Should produce the expected record", expectRecord, resultLike.get(0));
+    Assert.assertEquals("Should produce the expected record", expectRecord, resultLike.get(0));
     Assert.assertEquals("Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
 
     sqlNoPushDown = "SELECT * FROM  " + TABLE_NAME + "  WHERE data LIKE '%%ice%%g' ";
     resultLike = sql(sqlNoPushDown);
     Assert.assertEquals("Should have 1 record", 1, resultLike.size());
-    Assert.assertArrayEquals("Should produce the expected record", expectRecord, resultLike.get(0));
+    Assert.assertEquals("Should produce the expected record", expectRecord, resultLike.get(0));
     Assert.assertEquals("Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
 
     sqlNoPushDown = "SELECT * FROM  " + TABLE_NAME + "  WHERE data LIKE '%%' ";
     resultLike = sql(sqlNoPushDown);
     Assert.assertEquals("Should have 3 records", 3, resultLike.size());
-    List<Object[]> expectedRecords = Lists.newArrayList();
-    expectedRecords.add(new Object[] {1, "iceberg", 10.0});
-    expectedRecords.add(new Object[] {2, "b", 20.0});
-    expectedRecords.add(new Object[] {3, null, 30.0});
-    Assert.assertArrayEquals("Should produce the expected record", expectedRecords.toArray(), resultLike.toArray());
+    List<Row> expectedRecords = Lists.newArrayList(
+        Row.of(1, "iceberg", 10.0),
+        Row.of(2, "b", 20.0),
+        Row.of(3, null, 30.0)
+    );
+    Assert.assertEquals("Should produce the expected record", expectedRecords, resultLike);
     Assert.assertEquals("Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
 
     sqlNoPushDown = "SELECT * FROM  " + TABLE_NAME + "  WHERE data LIKE 'iceber_' ";
     resultLike = sql(sqlNoPushDown);
     Assert.assertEquals("Should have 1 record", 1, resultLike.size());
-    Assert.assertArrayEquals("Should produce the expected record", expectRecord, resultLike.get(0));
+    Assert.assertEquals("Should produce the expected record", expectRecord, resultLike.get(0));
     Assert.assertEquals("Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
 
     sqlNoPushDown = "SELECT * FROM  " + TABLE_NAME + "  WHERE data LIKE 'i%%g' ";
     resultLike = sql(sqlNoPushDown);
     Assert.assertEquals("Should have 1 record", 1, resultLike.size());
-    Assert.assertArrayEquals("Should produce the expected record", expectRecord, resultLike.get(0));
+    Assert.assertEquals("Should produce the expected record", expectRecord, resultLike.get(0));
     Assert.assertEquals("Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
   }
 
   @Test
   public void testFilterPushDown2Literal() {
     String sql2Literal = String.format("SELECT * FROM %s WHERE 1 > 0 ", TABLE_NAME);
-    List<Object[]> result = sql(sql2Literal);
-    List<Object[]> expectedRecords = Lists.newArrayList();
-    expectedRecords.add(new Object[] {1, "iceberg", 10.0});
-    expectedRecords.add(new Object[] {2, "b", 20.0});
-    expectedRecords.add(new Object[] {3, null, 30.0});
+    List<Row> result = sql(sql2Literal);
+    List<Row> expectedRecords = Lists.newArrayList(
+        Row.of(1, "iceberg", 10.0),
+        Row.of(2, "b", 20.0),
+        Row.of(3, null, 30.0)
+    );
     Assert.assertArrayEquals("Should produce the expected record", expectedRecords.toArray(), result.toArray());
     Assert.assertEquals("Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
   }