You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by pv...@apache.org on 2023/01/16 19:06:20 UTC
[iceberg] branch master updated: Flink: Backport: Improve unit tests for sink (#6603)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 37142709ed Flink: Backport: Improve unit tests for sink (#6603)
37142709ed is described below
commit 37142709edd2b1b5da5a0db2036eacd9f608c8b5
Author: pvary <pe...@gmail.com>
AuthorDate: Mon Jan 16 20:06:12 2023 +0100
Flink: Backport: Improve unit tests for sink (#6603)
---
.../java/org/apache/iceberg/flink/SimpleDataUtil.java | 10 ++++++----
.../apache/iceberg/flink/sink/TestFlinkManifest.java | 9 ++++-----
.../iceberg/flink/sink/TestIcebergFilesCommitter.java | 19 ++++++++++---------
.../iceberg/flink/sink/TestIcebergStreamWriter.java | 10 ++++------
.../apache/iceberg/flink/sink/TestTaskWriters.java | 10 ++++------
5 files changed, 28 insertions(+), 30 deletions(-)
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 970feea2ae..e296763508 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -156,13 +156,14 @@ public class SimpleDataUtil {
public static DeleteFile writeEqDeleteFile(
Table table,
FileFormat format,
- String tablePath,
String filename,
FileAppenderFactory<RowData> appenderFactory,
List<RowData> deletes)
throws IOException {
EncryptedOutputFile outputFile =
- table.encryption().encrypt(fromPath(new Path(tablePath, filename), new Configuration()));
+ table
+ .encryption()
+ .encrypt(fromPath(new Path(table.location(), filename), new Configuration()));
EqualityDeleteWriter<RowData> eqWriter =
appenderFactory.newEqDeleteWriter(outputFile, format, null);
@@ -175,13 +176,14 @@ public class SimpleDataUtil {
public static DeleteFile writePosDeleteFile(
Table table,
FileFormat format,
- String tablePath,
String filename,
FileAppenderFactory<RowData> appenderFactory,
List<Pair<CharSequence, Long>> positions)
throws IOException {
EncryptedOutputFile outputFile =
- table.encryption().encrypt(fromPath(new Path(tablePath, filename), new Configuration()));
+ table
+ .encryption()
+ .encrypt(fromPath(new Path(table.location(), filename), new Configuration()));
PositionDeleteWriter<RowData> posWriter =
appenderFactory.newPosDeleteWriter(outputFile, format, null);
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index 5528e71b3d..3680142010 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -57,7 +57,6 @@ public class TestFlinkManifest {
@Rule public TemporaryFolder tempFolder = new TemporaryFolder();
- private String tablePath;
private Table table;
private FileAppenderFactory<RowData> appenderFactory;
private final AtomicInteger fileCount = new AtomicInteger(0);
@@ -67,7 +66,7 @@ public class TestFlinkManifest {
File folder = tempFolder.newFolder();
String warehouse = folder.getAbsolutePath();
- tablePath = warehouse.concat("/test");
+ String tablePath = warehouse.concat("/test");
Assert.assertTrue("Should create the table directory correctly.", new File(tablePath).mkdir());
// Construct the iceberg table.
@@ -260,20 +259,20 @@ public class TestFlinkManifest {
table.schema(),
table.spec(),
CONF,
- tablePath,
+ table.location(),
FileFormat.PARQUET.addExtension(filename),
rows);
}
private DeleteFile writeEqDeleteFile(String filename, List<RowData> deletes) throws IOException {
return SimpleDataUtil.writeEqDeleteFile(
- table, FileFormat.PARQUET, tablePath, filename, appenderFactory, deletes);
+ table, FileFormat.PARQUET, filename, appenderFactory, deletes);
}
private DeleteFile writePosDeleteFile(String filename, List<Pair<CharSequence, Long>> positions)
throws IOException {
return SimpleDataUtil.writePosDeleteFile(
- table, FileFormat.PARQUET, tablePath, filename, appenderFactory, positions);
+ table, FileFormat.PARQUET, filename, appenderFactory, positions);
}
private List<DataFile> generateDataFiles(int fileNum) throws IOException {
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index c4f93f0ec2..66baaeb0e9 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -73,7 +73,6 @@ import org.junit.runners.Parameterized;
public class TestIcebergFilesCommitter extends TableTestBase {
private static final Configuration CONF = new Configuration();
- private String tablePath;
private File flinkManifestFolder;
private final FileFormat format;
@@ -104,8 +103,6 @@ public class TestIcebergFilesCommitter extends TableTestBase {
this.metadataDir = new File(tableDir, "metadata");
Assert.assertTrue(tableDir.delete());
- tablePath = tableDir.getAbsolutePath();
-
// Construct the iceberg table.
table = create(SimpleDataUtil.SCHEMA, PartitionSpec.unpartitioned());
@@ -881,8 +878,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
private DeleteFile writeEqDeleteFile(
FileAppenderFactory<RowData> appenderFactory, String filename, List<RowData> deletes)
throws IOException {
- return SimpleDataUtil.writeEqDeleteFile(
- table, FileFormat.PARQUET, tablePath, filename, appenderFactory, deletes);
+ return SimpleDataUtil.writeEqDeleteFile(table, format, filename, appenderFactory, deletes);
}
private DeleteFile writePosDeleteFile(
@@ -890,8 +886,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
String filename,
List<Pair<CharSequence, Long>> positions)
throws IOException {
- return SimpleDataUtil.writePosDeleteFile(
- table, FileFormat.PARQUET, tablePath, filename, appenderFactory, positions);
+ return SimpleDataUtil.writePosDeleteFile(table, format, filename, appenderFactory, positions);
}
private FileAppenderFactory<RowData> createDeletableAppenderFactory() {
@@ -943,7 +938,13 @@ public class TestIcebergFilesCommitter extends TableTestBase {
private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
return SimpleDataUtil.writeFile(
- table, table.schema(), table.spec(), CONF, tablePath, format.addExtension(filename), rows);
+ table,
+ table.schema(),
+ table.spec(),
+ CONF,
+ table.location(),
+ format.addExtension(filename),
+ rows);
}
private void assertMaxCommittedCheckpointId(JobID jobID, OperatorID operatorID, long expectedId) {
@@ -961,7 +962,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
private OneInputStreamOperatorTestHarness<WriteResult, Void> createStreamSink(JobID jobID)
throws Exception {
- TestOperatorFactory factory = TestOperatorFactory.of(tablePath);
+ TestOperatorFactory factory = TestOperatorFactory.of(table.location());
return new OneInputStreamOperatorTestHarness<>(factory, createEnvironment(jobID));
}
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
index bd959bfb31..06942b8ebe 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
@@ -68,7 +68,6 @@ import org.junit.runners.Parameterized;
public class TestIcebergStreamWriter {
@Rule public TemporaryFolder tempFolder = new TemporaryFolder();
- private String tablePath;
private Table table;
private final FileFormat format;
@@ -94,11 +93,10 @@ public class TestIcebergStreamWriter {
@Before
public void before() throws IOException {
File folder = tempFolder.newFolder();
- tablePath = folder.getAbsolutePath();
// Construct the iceberg table.
Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
- table = SimpleDataUtil.createTable(tablePath, props, partitioned);
+ table = SimpleDataUtil.createTable(folder.getAbsolutePath(), props, partitioned);
}
@Test
@@ -136,7 +134,7 @@ public class TestIcebergStreamWriter {
// Assert the table records.
SimpleDataUtil.assertTableRecords(
- tablePath,
+ table,
Lists.newArrayList(
SimpleDataUtil.createRecord(1, "hello"),
SimpleDataUtil.createRecord(2, "world"),
@@ -192,7 +190,7 @@ public class TestIcebergStreamWriter {
}
private Set<String> scanDataFiles() throws IOException {
- Path dataDir = new Path(tablePath, "data");
+ Path dataDir = new Path(table.location(), "data");
FileSystem fs = FileSystem.get(new Configuration());
if (!fs.exists(dataDir)) {
return ImmutableSet.of();
@@ -302,7 +300,7 @@ public class TestIcebergStreamWriter {
}
// Assert the table records.
- SimpleDataUtil.assertTableRecords(tablePath, records);
+ SimpleDataUtil.assertTableRecords(table, records);
}
@Test
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
index c56a348e74..e428d5a19a 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
@@ -69,7 +69,6 @@ public class TestTaskWriters {
private final FileFormat format;
private final boolean partitioned;
- private String path;
private Table table;
public TestTaskWriters(String format, boolean partitioned) {
@@ -80,11 +79,10 @@ public class TestTaskWriters {
@Before
public void before() throws IOException {
File folder = tempFolder.newFolder();
- path = folder.getAbsolutePath();
// Construct the iceberg table with the specified file format.
Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
- table = SimpleDataUtil.createTable(path, props, partitioned);
+ table = SimpleDataUtil.createTable(folder.getAbsolutePath(), props, partitioned);
}
@Test
@@ -170,7 +168,7 @@ public class TestTaskWriters {
// Assert the data rows.
SimpleDataUtil.assertTableRecords(
- path,
+ table,
Lists.newArrayList(
SimpleDataUtil.createRecord(1, "a"),
SimpleDataUtil.createRecord(2, "b"),
@@ -205,7 +203,7 @@ public class TestTaskWriters {
appendFiles.commit();
// Assert the data rows.
- SimpleDataUtil.assertTableRecords(path, records);
+ SimpleDataUtil.assertTableRecords(table, records);
}
}
@@ -226,7 +224,7 @@ public class TestTaskWriters {
appendFiles.commit();
// Assert the data rows.
- SimpleDataUtil.assertTableRows(path, Lists.newArrayList(rows));
+ SimpleDataUtil.assertTableRows(table, Lists.newArrayList(rows));
}
}