You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/05/13 16:03:25 UTC
[iceberg] branch master updated: Flink 1.15: Improve unit tests for sink (#4699)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new fbe2a4d47 Flink 1.15: Improve unit tests for sink (#4699)
fbe2a4d47 is described below
commit fbe2a4d47e1c2495650d2d2ae14373f0d2a18d9d
Author: Mingliang Liu <li...@apache.org>
AuthorDate: Fri May 13 09:03:19 2022 -0700
Flink 1.15: Improve unit tests for sink (#4699)
---
.../test/java/org/apache/iceberg/flink/SimpleDataUtil.java | 9 ++++-----
.../org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java | 7 +++----
.../org/apache/iceberg/flink/sink/TestFlinkManifest.java | 9 ++++-----
.../iceberg/flink/sink/TestIcebergFilesCommitter.java | 13 +++++--------
.../apache/iceberg/flink/sink/TestIcebergStreamWriter.java | 11 ++++-------
.../java/org/apache/iceberg/flink/sink/TestTaskWriters.java | 11 ++++-------
6 files changed, 24 insertions(+), 36 deletions(-)
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 38b1cf360..92e935f25 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -147,11 +147,11 @@ public class SimpleDataUtil {
.build();
}
- public static DeleteFile writeEqDeleteFile(Table table, FileFormat format, String tablePath, String filename,
+ public static DeleteFile writeEqDeleteFile(Table table, FileFormat format, String filename,
FileAppenderFactory<RowData> appenderFactory,
List<RowData> deletes) throws IOException {
EncryptedOutputFile outputFile =
- table.encryption().encrypt(fromPath(new Path(tablePath, filename), new Configuration()));
+ table.encryption().encrypt(fromPath(new Path(table.location(), filename), new Configuration()));
EqualityDeleteWriter<RowData> eqWriter = appenderFactory.newEqDeleteWriter(outputFile, format, null);
try (EqualityDeleteWriter<RowData> writer = eqWriter) {
@@ -160,12 +160,11 @@ public class SimpleDataUtil {
return eqWriter.toDeleteFile();
}
- public static DeleteFile writePosDeleteFile(Table table, FileFormat format, String tablePath,
- String filename,
+ public static DeleteFile writePosDeleteFile(Table table, FileFormat format, String filename,
FileAppenderFactory<RowData> appenderFactory,
List<Pair<CharSequence, Long>> positions) throws IOException {
EncryptedOutputFile outputFile =
- table.encryption().encrypt(fromPath(new Path(tablePath, filename), new Configuration()));
+ table.encryption().encrypt(fromPath(new Path(table.location(), filename), new Configuration()));
PositionDeleteWriter<RowData> posWriter = appenderFactory.newPosDeleteWriter(outputFile, format, null);
try (PositionDeleteWriter<RowData> writer = posWriter) {
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index c99322325..70f87f270 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -70,7 +70,6 @@ public class TestFlinkIcebergSink {
private static final DataFormatConverters.RowConverter CONVERTER = new DataFormatConverters.RowConverter(
SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
- private String tablePath;
private Table table;
private StreamExecutionEnvironment env;
private TableLoader tableLoader;
@@ -108,7 +107,7 @@ public class TestFlinkIcebergSink {
File folder = TEMPORARY_FOLDER.newFolder();
String warehouse = folder.getAbsolutePath();
- tablePath = warehouse.concat("/test");
+ String tablePath = warehouse.concat("/test");
Assert.assertTrue("Should create the table path correctly.", new File(tablePath).mkdir());
Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
@@ -150,7 +149,7 @@ public class TestFlinkIcebergSink {
env.execute("Test Iceberg DataStream");
// Assert the iceberg table's records.
- SimpleDataUtil.assertTableRows(tablePath, convertToRowData(rows));
+ SimpleDataUtil.assertTableRows(table, convertToRowData(rows));
}
private List<Row> createRows(String prefix) {
@@ -182,7 +181,7 @@ public class TestFlinkIcebergSink {
// Execute the program.
env.execute("Test Iceberg DataStream.");
- SimpleDataUtil.assertTableRows(tablePath, convertToRowData(rows));
+ SimpleDataUtil.assertTableRows(table, convertToRowData(rows));
}
private int partitionFiles(String partition) throws IOException {
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index 4a47656e8..eccf3da1a 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -59,7 +59,6 @@ public class TestFlinkManifest {
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
- private String tablePath;
private Table table;
private FileAppenderFactory<RowData> appenderFactory;
private final AtomicInteger fileCount = new AtomicInteger(0);
@@ -69,7 +68,7 @@ public class TestFlinkManifest {
File folder = tempFolder.newFolder();
String warehouse = folder.getAbsolutePath();
- tablePath = warehouse.concat("/test");
+ String tablePath = warehouse.concat("/test");
Assert.assertTrue("Should create the table directory correctly.", new File(tablePath).mkdir());
// Construct the iceberg table.
@@ -231,17 +230,17 @@ public class TestFlinkManifest {
private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
return SimpleDataUtil.writeFile(table.schema(), table.spec(), CONF,
- tablePath, FileFormat.PARQUET.addExtension(filename), rows);
+ table.location(), FileFormat.PARQUET.addExtension(filename), rows);
}
private DeleteFile writeEqDeleteFile(String filename, List<RowData> deletes) throws IOException {
- return SimpleDataUtil.writeEqDeleteFile(table, FileFormat.PARQUET, tablePath, filename, appenderFactory, deletes);
+ return SimpleDataUtil.writeEqDeleteFile(table, FileFormat.PARQUET, filename, appenderFactory, deletes);
}
private DeleteFile writePosDeleteFile(String filename, List<Pair<CharSequence, Long>> positions)
throws IOException {
return SimpleDataUtil
- .writePosDeleteFile(table, FileFormat.PARQUET, tablePath, filename, appenderFactory, positions);
+ .writePosDeleteFile(table, FileFormat.PARQUET, filename, appenderFactory, positions);
}
private List<DataFile> generateDataFiles(int fileNum) throws IOException {
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 78ffe72c2..d1fb23720 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -74,7 +74,6 @@ import static org.apache.iceberg.flink.sink.ManifestOutputFileFactory.FLINK_MANI
public class TestIcebergFilesCommitter extends TableTestBase {
private static final Configuration CONF = new Configuration();
- private String tablePath;
private File flinkManifestFolder;
private final FileFormat format;
@@ -105,8 +104,6 @@ public class TestIcebergFilesCommitter extends TableTestBase {
this.metadataDir = new File(tableDir, "metadata");
Assert.assertTrue(tableDir.delete());
- tablePath = tableDir.getAbsolutePath();
-
// Construct the iceberg table.
table = create(SimpleDataUtil.SCHEMA, PartitionSpec.unpartitioned());
@@ -743,14 +740,13 @@ public class TestIcebergFilesCommitter extends TableTestBase {
private DeleteFile writeEqDeleteFile(FileAppenderFactory<RowData> appenderFactory,
String filename, List<RowData> deletes) throws IOException {
- return SimpleDataUtil.writeEqDeleteFile(table, FileFormat.PARQUET, tablePath, filename, appenderFactory, deletes);
+ return SimpleDataUtil.writeEqDeleteFile(table, format, filename, appenderFactory, deletes);
}
private DeleteFile writePosDeleteFile(FileAppenderFactory<RowData> appenderFactory,
String filename,
List<Pair<CharSequence, Long>> positions) throws IOException {
- return SimpleDataUtil.writePosDeleteFile(table, FileFormat.PARQUET, tablePath, filename, appenderFactory,
- positions);
+ return SimpleDataUtil.writePosDeleteFile(table, format, filename, appenderFactory, positions);
}
private FileAppenderFactory<RowData> createDeletableAppenderFactory() {
@@ -778,7 +774,8 @@ public class TestIcebergFilesCommitter extends TableTestBase {
}
private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
- return SimpleDataUtil.writeFile(table.schema(), table.spec(), CONF, tablePath, format.addExtension(filename), rows);
+ return SimpleDataUtil.writeFile(table.schema(), table.spec(), CONF, table.location(),
+ format.addExtension(filename), rows);
}
private void assertMaxCommittedCheckpointId(JobID jobID, long expectedId) {
@@ -794,7 +791,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
private OneInputStreamOperatorTestHarness<WriteResult, Void> createStreamSink(JobID jobID)
throws Exception {
- TestOperatorFactory factory = TestOperatorFactory.of(tablePath);
+ TestOperatorFactory factory = TestOperatorFactory.of(table.location());
return new OneInputStreamOperatorTestHarness<>(factory, createEnvironment(jobID));
}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
index 86c2f6672..433c6a1ec 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
@@ -69,7 +69,6 @@ public class TestIcebergStreamWriter {
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
- private String tablePath;
private Table table;
private final FileFormat format;
@@ -95,11 +94,9 @@ public class TestIcebergStreamWriter {
@Before
public void before() throws IOException {
File folder = tempFolder.newFolder();
- tablePath = folder.getAbsolutePath();
-
// Construct the iceberg table.
Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
- table = SimpleDataUtil.createTable(tablePath, props, partitioned);
+ table = SimpleDataUtil.createTable(folder.getAbsolutePath(), props, partitioned);
}
@Test
@@ -135,7 +132,7 @@ public class TestIcebergStreamWriter {
appendFiles.commit();
// Assert the table records.
- SimpleDataUtil.assertTableRecords(tablePath, Lists.newArrayList(
+ SimpleDataUtil.assertTableRecords(table, Lists.newArrayList(
SimpleDataUtil.createRecord(1, "hello"),
SimpleDataUtil.createRecord(2, "world"),
SimpleDataUtil.createRecord(3, "hello"),
@@ -188,7 +185,7 @@ public class TestIcebergStreamWriter {
}
private Set<String> scanDataFiles() throws IOException {
- Path dataDir = new Path(tablePath, "data");
+ Path dataDir = new Path(table.location(), "data");
FileSystem fs = FileSystem.get(new Configuration());
if (!fs.exists(dataDir)) {
return ImmutableSet.of();
@@ -270,7 +267,7 @@ public class TestIcebergStreamWriter {
}
// Assert the table records.
- SimpleDataUtil.assertTableRecords(tablePath, records);
+ SimpleDataUtil.assertTableRecords(table, records);
}
@Test
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
index 2595b098d..ddeedbb09 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
@@ -72,7 +72,6 @@ public class TestTaskWriters {
private final FileFormat format;
private final boolean partitioned;
- private String path;
private Table table;
public TestTaskWriters(String format, boolean partitioned) {
@@ -83,11 +82,9 @@ public class TestTaskWriters {
@Before
public void before() throws IOException {
File folder = tempFolder.newFolder();
- path = folder.getAbsolutePath();
-
// Construct the iceberg table with the specified file format.
Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
- table = SimpleDataUtil.createTable(path, props, partitioned);
+ table = SimpleDataUtil.createTable(folder.getAbsolutePath(), props, partitioned);
}
@Test
@@ -172,7 +169,7 @@ public class TestTaskWriters {
appendFiles.commit();
// Assert the data rows.
- SimpleDataUtil.assertTableRecords(path, Lists.newArrayList(
+ SimpleDataUtil.assertTableRecords(table, Lists.newArrayList(
SimpleDataUtil.createRecord(1, "a"),
SimpleDataUtil.createRecord(2, "b"),
SimpleDataUtil.createRecord(3, "c"),
@@ -207,7 +204,7 @@ public class TestTaskWriters {
appendFiles.commit();
// Assert the data rows.
- SimpleDataUtil.assertTableRecords(path, records);
+ SimpleDataUtil.assertTableRecords(table, records);
}
}
@@ -228,7 +225,7 @@ public class TestTaskWriters {
appendFiles.commit();
// Assert the data rows.
- SimpleDataUtil.assertTableRows(path, Lists.newArrayList(rows));
+ SimpleDataUtil.assertTableRows(table, Lists.newArrayList(rows));
}
}