You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by pv...@apache.org on 2023/01/16 19:06:20 UTC

[iceberg] branch master updated: Flink: Backport: Improve unit tests for sink (#6603)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 37142709ed Flink: Backport: Improve unit tests for sink (#6603)
37142709ed is described below

commit 37142709edd2b1b5da5a0db2036eacd9f608c8b5
Author: pvary <pe...@gmail.com>
AuthorDate: Mon Jan 16 20:06:12 2023 +0100

    Flink: Backport: Improve unit tests for sink (#6603)
---
 .../java/org/apache/iceberg/flink/SimpleDataUtil.java | 10 ++++++----
 .../apache/iceberg/flink/sink/TestFlinkManifest.java  |  9 ++++-----
 .../iceberg/flink/sink/TestIcebergFilesCommitter.java | 19 ++++++++++---------
 .../iceberg/flink/sink/TestIcebergStreamWriter.java   | 10 ++++------
 .../apache/iceberg/flink/sink/TestTaskWriters.java    | 10 ++++------
 5 files changed, 28 insertions(+), 30 deletions(-)

diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 970feea2ae..e296763508 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -156,13 +156,14 @@ public class SimpleDataUtil {
   public static DeleteFile writeEqDeleteFile(
       Table table,
       FileFormat format,
-      String tablePath,
       String filename,
       FileAppenderFactory<RowData> appenderFactory,
       List<RowData> deletes)
       throws IOException {
     EncryptedOutputFile outputFile =
-        table.encryption().encrypt(fromPath(new Path(tablePath, filename), new Configuration()));
+        table
+            .encryption()
+            .encrypt(fromPath(new Path(table.location(), filename), new Configuration()));
 
     EqualityDeleteWriter<RowData> eqWriter =
         appenderFactory.newEqDeleteWriter(outputFile, format, null);
@@ -175,13 +176,14 @@ public class SimpleDataUtil {
   public static DeleteFile writePosDeleteFile(
       Table table,
       FileFormat format,
-      String tablePath,
       String filename,
       FileAppenderFactory<RowData> appenderFactory,
       List<Pair<CharSequence, Long>> positions)
       throws IOException {
     EncryptedOutputFile outputFile =
-        table.encryption().encrypt(fromPath(new Path(tablePath, filename), new Configuration()));
+        table
+            .encryption()
+            .encrypt(fromPath(new Path(table.location(), filename), new Configuration()));
 
     PositionDeleteWriter<RowData> posWriter =
         appenderFactory.newPosDeleteWriter(outputFile, format, null);
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index 5528e71b3d..3680142010 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -57,7 +57,6 @@ public class TestFlinkManifest {
 
   @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
 
-  private String tablePath;
   private Table table;
   private FileAppenderFactory<RowData> appenderFactory;
   private final AtomicInteger fileCount = new AtomicInteger(0);
@@ -67,7 +66,7 @@ public class TestFlinkManifest {
     File folder = tempFolder.newFolder();
     String warehouse = folder.getAbsolutePath();
 
-    tablePath = warehouse.concat("/test");
+    String tablePath = warehouse.concat("/test");
     Assert.assertTrue("Should create the table directory correctly.", new File(tablePath).mkdir());
 
     // Construct the iceberg table.
@@ -260,20 +259,20 @@ public class TestFlinkManifest {
         table.schema(),
         table.spec(),
         CONF,
-        tablePath,
+        table.location(),
         FileFormat.PARQUET.addExtension(filename),
         rows);
   }
 
   private DeleteFile writeEqDeleteFile(String filename, List<RowData> deletes) throws IOException {
     return SimpleDataUtil.writeEqDeleteFile(
-        table, FileFormat.PARQUET, tablePath, filename, appenderFactory, deletes);
+        table, FileFormat.PARQUET, filename, appenderFactory, deletes);
   }
 
   private DeleteFile writePosDeleteFile(String filename, List<Pair<CharSequence, Long>> positions)
       throws IOException {
     return SimpleDataUtil.writePosDeleteFile(
-        table, FileFormat.PARQUET, tablePath, filename, appenderFactory, positions);
+        table, FileFormat.PARQUET, filename, appenderFactory, positions);
   }
 
   private List<DataFile> generateDataFiles(int fileNum) throws IOException {
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index c4f93f0ec2..66baaeb0e9 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -73,7 +73,6 @@ import org.junit.runners.Parameterized;
 public class TestIcebergFilesCommitter extends TableTestBase {
   private static final Configuration CONF = new Configuration();
 
-  private String tablePath;
   private File flinkManifestFolder;
 
   private final FileFormat format;
@@ -104,8 +103,6 @@ public class TestIcebergFilesCommitter extends TableTestBase {
     this.metadataDir = new File(tableDir, "metadata");
     Assert.assertTrue(tableDir.delete());
 
-    tablePath = tableDir.getAbsolutePath();
-
     // Construct the iceberg table.
     table = create(SimpleDataUtil.SCHEMA, PartitionSpec.unpartitioned());
 
@@ -881,8 +878,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
   private DeleteFile writeEqDeleteFile(
       FileAppenderFactory<RowData> appenderFactory, String filename, List<RowData> deletes)
       throws IOException {
-    return SimpleDataUtil.writeEqDeleteFile(
-        table, FileFormat.PARQUET, tablePath, filename, appenderFactory, deletes);
+    return SimpleDataUtil.writeEqDeleteFile(table, format, filename, appenderFactory, deletes);
   }
 
   private DeleteFile writePosDeleteFile(
@@ -890,8 +886,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       String filename,
       List<Pair<CharSequence, Long>> positions)
       throws IOException {
-    return SimpleDataUtil.writePosDeleteFile(
-        table, FileFormat.PARQUET, tablePath, filename, appenderFactory, positions);
+    return SimpleDataUtil.writePosDeleteFile(table, format, filename, appenderFactory, positions);
   }
 
   private FileAppenderFactory<RowData> createDeletableAppenderFactory() {
@@ -943,7 +938,13 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
   private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
     return SimpleDataUtil.writeFile(
-        table, table.schema(), table.spec(), CONF, tablePath, format.addExtension(filename), rows);
+        table,
+        table.schema(),
+        table.spec(),
+        CONF,
+        table.location(),
+        format.addExtension(filename),
+        rows);
   }
 
   private void assertMaxCommittedCheckpointId(JobID jobID, OperatorID operatorID, long expectedId) {
@@ -961,7 +962,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
   private OneInputStreamOperatorTestHarness<WriteResult, Void> createStreamSink(JobID jobID)
       throws Exception {
-    TestOperatorFactory factory = TestOperatorFactory.of(tablePath);
+    TestOperatorFactory factory = TestOperatorFactory.of(table.location());
     return new OneInputStreamOperatorTestHarness<>(factory, createEnvironment(jobID));
   }
 
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
index bd959bfb31..06942b8ebe 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
@@ -68,7 +68,6 @@ import org.junit.runners.Parameterized;
 public class TestIcebergStreamWriter {
   @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
 
-  private String tablePath;
   private Table table;
 
   private final FileFormat format;
@@ -94,11 +93,10 @@ public class TestIcebergStreamWriter {
   @Before
   public void before() throws IOException {
     File folder = tempFolder.newFolder();
-    tablePath = folder.getAbsolutePath();
 
     // Construct the iceberg table.
     Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
-    table = SimpleDataUtil.createTable(tablePath, props, partitioned);
+    table = SimpleDataUtil.createTable(folder.getAbsolutePath(), props, partitioned);
   }
 
   @Test
@@ -136,7 +134,7 @@ public class TestIcebergStreamWriter {
 
       // Assert the table records.
       SimpleDataUtil.assertTableRecords(
-          tablePath,
+          table,
           Lists.newArrayList(
               SimpleDataUtil.createRecord(1, "hello"),
               SimpleDataUtil.createRecord(2, "world"),
@@ -192,7 +190,7 @@ public class TestIcebergStreamWriter {
   }
 
   private Set<String> scanDataFiles() throws IOException {
-    Path dataDir = new Path(tablePath, "data");
+    Path dataDir = new Path(table.location(), "data");
     FileSystem fs = FileSystem.get(new Configuration());
     if (!fs.exists(dataDir)) {
       return ImmutableSet.of();
@@ -302,7 +300,7 @@ public class TestIcebergStreamWriter {
     }
 
     // Assert the table records.
-    SimpleDataUtil.assertTableRecords(tablePath, records);
+    SimpleDataUtil.assertTableRecords(table, records);
   }
 
   @Test
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
index c56a348e74..e428d5a19a 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
@@ -69,7 +69,6 @@ public class TestTaskWriters {
   private final FileFormat format;
   private final boolean partitioned;
 
-  private String path;
   private Table table;
 
   public TestTaskWriters(String format, boolean partitioned) {
@@ -80,11 +79,10 @@ public class TestTaskWriters {
   @Before
   public void before() throws IOException {
     File folder = tempFolder.newFolder();
-    path = folder.getAbsolutePath();
 
     // Construct the iceberg table with the specified file format.
     Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
-    table = SimpleDataUtil.createTable(path, props, partitioned);
+    table = SimpleDataUtil.createTable(folder.getAbsolutePath(), props, partitioned);
   }
 
   @Test
@@ -170,7 +168,7 @@ public class TestTaskWriters {
 
       // Assert the data rows.
       SimpleDataUtil.assertTableRecords(
-          path,
+          table,
           Lists.newArrayList(
               SimpleDataUtil.createRecord(1, "a"),
               SimpleDataUtil.createRecord(2, "b"),
@@ -205,7 +203,7 @@ public class TestTaskWriters {
       appendFiles.commit();
 
       // Assert the data rows.
-      SimpleDataUtil.assertTableRecords(path, records);
+      SimpleDataUtil.assertTableRecords(table, records);
     }
   }
 
@@ -226,7 +224,7 @@ public class TestTaskWriters {
       appendFiles.commit();
 
       // Assert the data rows.
-      SimpleDataUtil.assertTableRows(path, Lists.newArrayList(rows));
+      SimpleDataUtil.assertTableRows(table, Lists.newArrayList(rows));
     }
   }