You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/05/13 16:03:25 UTC

[iceberg] branch master updated: Flink 1.15: Improve unit tests for sink (#4699)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new fbe2a4d47 Flink 1.15: Improve unit tests for sink (#4699)
fbe2a4d47 is described below

commit fbe2a4d47e1c2495650d2d2ae14373f0d2a18d9d
Author: Mingliang Liu <li...@apache.org>
AuthorDate: Fri May 13 09:03:19 2022 -0700

    Flink 1.15: Improve unit tests for sink (#4699)
---
 .../test/java/org/apache/iceberg/flink/SimpleDataUtil.java  |  9 ++++-----
 .../org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java |  7 +++----
 .../org/apache/iceberg/flink/sink/TestFlinkManifest.java    |  9 ++++-----
 .../iceberg/flink/sink/TestIcebergFilesCommitter.java       | 13 +++++--------
 .../apache/iceberg/flink/sink/TestIcebergStreamWriter.java  | 11 ++++-------
 .../java/org/apache/iceberg/flink/sink/TestTaskWriters.java | 11 ++++-------
 6 files changed, 24 insertions(+), 36 deletions(-)

diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 38b1cf360..92e935f25 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -147,11 +147,11 @@ public class SimpleDataUtil {
         .build();
   }
 
-  public static DeleteFile writeEqDeleteFile(Table table, FileFormat format, String tablePath, String filename,
+  public static DeleteFile writeEqDeleteFile(Table table, FileFormat format, String filename,
                                              FileAppenderFactory<RowData> appenderFactory,
                                              List<RowData> deletes) throws IOException {
     EncryptedOutputFile outputFile =
-        table.encryption().encrypt(fromPath(new Path(tablePath, filename), new Configuration()));
+        table.encryption().encrypt(fromPath(new Path(table.location(), filename), new Configuration()));
 
     EqualityDeleteWriter<RowData> eqWriter = appenderFactory.newEqDeleteWriter(outputFile, format, null);
     try (EqualityDeleteWriter<RowData> writer = eqWriter) {
@@ -160,12 +160,11 @@ public class SimpleDataUtil {
     return eqWriter.toDeleteFile();
   }
 
-  public static DeleteFile writePosDeleteFile(Table table, FileFormat format, String tablePath,
-                                              String filename,
+  public static DeleteFile writePosDeleteFile(Table table, FileFormat format, String filename,
                                               FileAppenderFactory<RowData> appenderFactory,
                                               List<Pair<CharSequence, Long>> positions) throws IOException {
     EncryptedOutputFile outputFile =
-        table.encryption().encrypt(fromPath(new Path(tablePath, filename), new Configuration()));
+        table.encryption().encrypt(fromPath(new Path(table.location(), filename), new Configuration()));
 
     PositionDeleteWriter<RowData> posWriter = appenderFactory.newPosDeleteWriter(outputFile, format, null);
     try (PositionDeleteWriter<RowData> writer = posWriter) {
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index c99322325..70f87f270 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -70,7 +70,6 @@ public class TestFlinkIcebergSink {
   private static final DataFormatConverters.RowConverter CONVERTER = new DataFormatConverters.RowConverter(
       SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
 
-  private String tablePath;
   private Table table;
   private StreamExecutionEnvironment env;
   private TableLoader tableLoader;
@@ -108,7 +107,7 @@ public class TestFlinkIcebergSink {
     File folder = TEMPORARY_FOLDER.newFolder();
     String warehouse = folder.getAbsolutePath();
 
-    tablePath = warehouse.concat("/test");
+    String tablePath = warehouse.concat("/test");
     Assert.assertTrue("Should create the table path correctly.", new File(tablePath).mkdir());
 
     Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
@@ -150,7 +149,7 @@ public class TestFlinkIcebergSink {
     env.execute("Test Iceberg DataStream");
 
     // Assert the iceberg table's records.
-    SimpleDataUtil.assertTableRows(tablePath, convertToRowData(rows));
+    SimpleDataUtil.assertTableRows(table, convertToRowData(rows));
   }
 
   private List<Row> createRows(String prefix) {
@@ -182,7 +181,7 @@ public class TestFlinkIcebergSink {
     // Execute the program.
     env.execute("Test Iceberg DataStream.");
 
-    SimpleDataUtil.assertTableRows(tablePath, convertToRowData(rows));
+    SimpleDataUtil.assertTableRows(table, convertToRowData(rows));
   }
 
   private int partitionFiles(String partition) throws IOException {
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index 4a47656e8..eccf3da1a 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -59,7 +59,6 @@ public class TestFlinkManifest {
   @Rule
   public TemporaryFolder tempFolder = new TemporaryFolder();
 
-  private String tablePath;
   private Table table;
   private FileAppenderFactory<RowData> appenderFactory;
   private final AtomicInteger fileCount = new AtomicInteger(0);
@@ -69,7 +68,7 @@ public class TestFlinkManifest {
     File folder = tempFolder.newFolder();
     String warehouse = folder.getAbsolutePath();
 
-    tablePath = warehouse.concat("/test");
+    String tablePath = warehouse.concat("/test");
     Assert.assertTrue("Should create the table directory correctly.", new File(tablePath).mkdir());
 
     // Construct the iceberg table.
@@ -231,17 +230,17 @@ public class TestFlinkManifest {
 
   private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
     return SimpleDataUtil.writeFile(table.schema(), table.spec(), CONF,
-        tablePath, FileFormat.PARQUET.addExtension(filename), rows);
+        table.location(), FileFormat.PARQUET.addExtension(filename), rows);
   }
 
   private DeleteFile writeEqDeleteFile(String filename, List<RowData> deletes) throws IOException {
-    return SimpleDataUtil.writeEqDeleteFile(table, FileFormat.PARQUET, tablePath, filename, appenderFactory, deletes);
+    return SimpleDataUtil.writeEqDeleteFile(table, FileFormat.PARQUET, filename, appenderFactory, deletes);
   }
 
   private DeleteFile writePosDeleteFile(String filename, List<Pair<CharSequence, Long>> positions)
       throws IOException {
     return SimpleDataUtil
-        .writePosDeleteFile(table, FileFormat.PARQUET, tablePath, filename, appenderFactory, positions);
+        .writePosDeleteFile(table, FileFormat.PARQUET, filename, appenderFactory, positions);
   }
 
   private List<DataFile> generateDataFiles(int fileNum) throws IOException {
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 78ffe72c2..d1fb23720 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -74,7 +74,6 @@ import static org.apache.iceberg.flink.sink.ManifestOutputFileFactory.FLINK_MANI
 public class TestIcebergFilesCommitter extends TableTestBase {
   private static final Configuration CONF = new Configuration();
 
-  private String tablePath;
   private File flinkManifestFolder;
 
   private final FileFormat format;
@@ -105,8 +104,6 @@ public class TestIcebergFilesCommitter extends TableTestBase {
     this.metadataDir = new File(tableDir, "metadata");
     Assert.assertTrue(tableDir.delete());
 
-    tablePath = tableDir.getAbsolutePath();
-
     // Construct the iceberg table.
     table = create(SimpleDataUtil.SCHEMA, PartitionSpec.unpartitioned());
 
@@ -743,14 +740,13 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
   private DeleteFile writeEqDeleteFile(FileAppenderFactory<RowData> appenderFactory,
                                        String filename, List<RowData> deletes) throws IOException {
-    return SimpleDataUtil.writeEqDeleteFile(table, FileFormat.PARQUET, tablePath, filename, appenderFactory, deletes);
+    return SimpleDataUtil.writeEqDeleteFile(table, format, filename, appenderFactory, deletes);
   }
 
   private DeleteFile writePosDeleteFile(FileAppenderFactory<RowData> appenderFactory,
                                         String filename,
                                         List<Pair<CharSequence, Long>> positions) throws IOException {
-    return SimpleDataUtil.writePosDeleteFile(table, FileFormat.PARQUET, tablePath, filename, appenderFactory,
-        positions);
+    return SimpleDataUtil.writePosDeleteFile(table, format, filename, appenderFactory, positions);
   }
 
   private FileAppenderFactory<RowData> createDeletableAppenderFactory() {
@@ -778,7 +774,8 @@ public class TestIcebergFilesCommitter extends TableTestBase {
   }
 
   private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
-    return SimpleDataUtil.writeFile(table.schema(), table.spec(), CONF, tablePath, format.addExtension(filename), rows);
+    return SimpleDataUtil.writeFile(table.schema(), table.spec(), CONF, table.location(),
+        format.addExtension(filename), rows);
   }
 
   private void assertMaxCommittedCheckpointId(JobID jobID, long expectedId) {
@@ -794,7 +791,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
   private OneInputStreamOperatorTestHarness<WriteResult, Void> createStreamSink(JobID jobID)
       throws Exception {
-    TestOperatorFactory factory = TestOperatorFactory.of(tablePath);
+    TestOperatorFactory factory = TestOperatorFactory.of(table.location());
     return new OneInputStreamOperatorTestHarness<>(factory, createEnvironment(jobID));
   }
 
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
index 86c2f6672..433c6a1ec 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
@@ -69,7 +69,6 @@ public class TestIcebergStreamWriter {
   @Rule
   public TemporaryFolder tempFolder = new TemporaryFolder();
 
-  private String tablePath;
   private Table table;
 
   private final FileFormat format;
@@ -95,11 +94,9 @@ public class TestIcebergStreamWriter {
   @Before
   public void before() throws IOException {
     File folder = tempFolder.newFolder();
-    tablePath = folder.getAbsolutePath();
-
     // Construct the iceberg table.
     Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
-    table = SimpleDataUtil.createTable(tablePath, props, partitioned);
+    table = SimpleDataUtil.createTable(folder.getAbsolutePath(), props, partitioned);
   }
 
   @Test
@@ -135,7 +132,7 @@ public class TestIcebergStreamWriter {
       appendFiles.commit();
 
       // Assert the table records.
-      SimpleDataUtil.assertTableRecords(tablePath, Lists.newArrayList(
+      SimpleDataUtil.assertTableRecords(table, Lists.newArrayList(
           SimpleDataUtil.createRecord(1, "hello"),
           SimpleDataUtil.createRecord(2, "world"),
           SimpleDataUtil.createRecord(3, "hello"),
@@ -188,7 +185,7 @@ public class TestIcebergStreamWriter {
   }
 
   private Set<String> scanDataFiles() throws IOException {
-    Path dataDir = new Path(tablePath, "data");
+    Path dataDir = new Path(table.location(), "data");
     FileSystem fs = FileSystem.get(new Configuration());
     if (!fs.exists(dataDir)) {
       return ImmutableSet.of();
@@ -270,7 +267,7 @@ public class TestIcebergStreamWriter {
     }
 
     // Assert the table records.
-    SimpleDataUtil.assertTableRecords(tablePath, records);
+    SimpleDataUtil.assertTableRecords(table, records);
   }
 
   @Test
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
index 2595b098d..ddeedbb09 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
@@ -72,7 +72,6 @@ public class TestTaskWriters {
   private final FileFormat format;
   private final boolean partitioned;
 
-  private String path;
   private Table table;
 
   public TestTaskWriters(String format, boolean partitioned) {
@@ -83,11 +82,9 @@ public class TestTaskWriters {
   @Before
   public void before() throws IOException {
     File folder = tempFolder.newFolder();
-    path = folder.getAbsolutePath();
-
     // Construct the iceberg table with the specified file format.
     Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
-    table = SimpleDataUtil.createTable(path, props, partitioned);
+    table = SimpleDataUtil.createTable(folder.getAbsolutePath(), props, partitioned);
   }
 
   @Test
@@ -172,7 +169,7 @@ public class TestTaskWriters {
       appendFiles.commit();
 
       // Assert the data rows.
-      SimpleDataUtil.assertTableRecords(path, Lists.newArrayList(
+      SimpleDataUtil.assertTableRecords(table, Lists.newArrayList(
           SimpleDataUtil.createRecord(1, "a"),
           SimpleDataUtil.createRecord(2, "b"),
           SimpleDataUtil.createRecord(3, "c"),
@@ -207,7 +204,7 @@ public class TestTaskWriters {
       appendFiles.commit();
 
       // Assert the data rows.
-      SimpleDataUtil.assertTableRecords(path, records);
+      SimpleDataUtil.assertTableRecords(table, records);
     }
   }
 
@@ -228,7 +225,7 @@ public class TestTaskWriters {
       appendFiles.commit();
 
       // Assert the data rows.
-      SimpleDataUtil.assertTableRows(path, Lists.newArrayList(rows));
+      SimpleDataUtil.assertTableRows(table, Lists.newArrayList(rows));
     }
   }