Posted to commits@iceberg.apache.org by st...@apache.org on 2023/04/24 18:42:54 UTC
[iceberg] branch master updated: Flink: sync 1.15 with 1.17 for missed backports previously (#7402)
This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 3a04faa54f Flink: sync 1.15 with 1.17 for missed backports previously (#7402)
3a04faa54f is described below
commit 3a04faa54fbef29602950877d58009d6d4306bc7
Author: Steven Zhen Wu <st...@gmail.com>
AuthorDate: Mon Apr 24 11:42:48 2023 -0700
Flink: sync 1.15 with 1.17 for missed backports previously (#7402)
---
.../org/apache/iceberg/flink/CatalogLoader.java | 12 +++++-----
.../iceberg/flink/FlinkEnvironmentContext.java | 1 -
.../java/org/apache/iceberg/flink/TableLoader.java | 12 +++++-----
.../iceberg/flink/sink/IcebergFilesCommitter.java | 3 +++
.../flink/source/RowDataFileScanTaskReader.java | 7 +-----
.../iceberg/flink/source/RowDataRewriter.java | 4 +++-
.../reader/AvroGenericRecordReaderFunction.java | 25 ++------------------
.../flink/source/reader/RowDataReaderFunction.java | 27 ++++------------------
.../apache/iceberg/flink/FlinkCatalogTestBase.java | 2 +-
.../org/apache/iceberg/flink/FlinkTestBase.java | 13 +++++++++++
.../apache/iceberg/flink/TestChangeLogTable.java | 2 +-
.../apache/iceberg/flink/TestFlinkHiveCatalog.java | 2 +-
.../flink/source/TestFlinkMergingMetrics.java | 5 ++--
.../iceberg/flink/source/reader/ReaderUtil.java | 4 +++-
.../source/reader/TestIcebergSourceReader.java | 4 +++-
.../source/reader/TestRowDataReaderFunction.java | 4 +++-
16 files changed, 53 insertions(+), 74 deletions(-)
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
index f4acb030b4..18473bf4f1 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
@@ -81,18 +81,18 @@ public interface CatalogLoader extends Serializable, Cloneable {
this.properties = Maps.newHashMap(properties);
}
- @Override
- @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
- public CatalogLoader clone() {
- return new HadoopCatalogLoader(catalogName, new Configuration(hadoopConf.get()), properties);
- }
-
@Override
public Catalog loadCatalog() {
return CatalogUtil.loadCatalog(
HadoopCatalog.class.getName(), catalogName, properties, hadoopConf.get());
}
+ @Override
+ @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
+ public CatalogLoader clone() {
+ return new HadoopCatalogLoader(catalogName, new Configuration(hadoopConf.get()), properties);
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkEnvironmentContext.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkEnvironmentContext.java
index 49602eea45..f35bb577fb 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkEnvironmentContext.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkEnvironmentContext.java
@@ -22,7 +22,6 @@ import org.apache.iceberg.EnvironmentContext;
import org.apache.iceberg.flink.util.FlinkPackage;
class FlinkEnvironmentContext {
-
private FlinkEnvironmentContext() {}
public static void init() {
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java
index 6646338b62..f18c5ccda1 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java
@@ -121,12 +121,6 @@ public interface TableLoader extends Closeable, Serializable, Cloneable {
return catalog.loadTable(TableIdentifier.parse(identifier));
}
- @Override
- @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
- public TableLoader clone() {
- return new CatalogTableLoader(catalogLoader.clone(), TableIdentifier.parse(identifier));
- }
-
@Override
public void close() throws IOException {
if (catalog instanceof Closeable) {
@@ -134,6 +128,12 @@ public interface TableLoader extends Closeable, Serializable, Cloneable {
}
}
+ @Override
+ @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
+ public TableLoader clone() {
+ return new CatalogTableLoader(catalogLoader.clone(), TableIdentifier.parse(identifier));
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
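Both hunks above are pure method moves: clone() is relocated below loadCatalog()/loadTable() so the 1.15 sources match the 1.17 module line for line, with no behavior change. The clone() pattern itself is what lets Flink hand each parallel operator an isolated loader; note that it deep-copies the Hadoop Configuration rather than sharing it. A minimal sketch of how a caller might rely on this, assuming an illustrative Hadoop catalog, warehouse path, and table name:

    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.flink.CatalogLoader;
    import org.apache.iceberg.flink.TableLoader;

    public class LoaderCloneSketch {
      public static void main(String[] args) throws Exception {
        // Catalog name, warehouse path, and table identifier are illustrative.
        Map<String, String> props = Map.of("warehouse", "file:///tmp/warehouse");
        CatalogLoader catalogLoader =
            CatalogLoader.hadoop("my_catalog", new Configuration(), props);
        TableLoader tableLoader =
            TableLoader.fromCatalog(catalogLoader, TableIdentifier.of("db", "tbl"));

        // clone() returns an independent loader backed by its own copied
        // Configuration, so parallel operators never share mutable state.
        try (TableLoader perTaskLoader = tableLoader.clone()) {
          perTaskLoader.open();
        }
      }
    }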
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 22b4dc9d21..e11975b3ef 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -233,6 +233,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
// the files,
// Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
if (checkpointId > maxCommittedCheckpointId) {
+ LOG.info("Checkpoint {} completed. Attempting commit.", checkpointId);
commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, checkpointId);
this.maxCommittedCheckpointId = checkpointId;
} else {
@@ -288,6 +289,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
}
continuousEmptyCheckpoints = 0;
+ } else {
+ LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", checkpointId);
}
}
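The two added LOG.info lines make both branches of the commit decision visible: one fires when a completed checkpoint triggers a commit, the other when a checkpoint is skipped because it produced no data or delete files. The guard they instrument is the monotonic max-committed-checkpoint-id check, which prevents late or duplicate checkpoint notifications from re-committing. A self-contained sketch of that idea, with illustrative names that are not Iceberg's API:

    // Minimal sketch of a monotonic checkpoint guard; not the actual committer.
    public class CheckpointCommitGuard {
      private long maxCommittedCheckpointId = -1L;

      /** Returns true exactly when this checkpoint id should trigger a commit. */
      public synchronized boolean shouldCommit(long checkpointId) {
        if (checkpointId > maxCommittedCheckpointId) {
          maxCommittedCheckpointId = checkpointId;
          return true;  // newest checkpoint seen so far: commit up to it
        }
        return false;   // late or duplicate notification: skip, never re-commit
      }

      public static void main(String[] args) {
        CheckpointCommitGuard guard = new CheckpointCommitGuard();
        System.out.println(guard.shouldCommit(1)); // true
        System.out.println(guard.shouldCommit(1)); // false (duplicate)
        System.out.println(guard.shouldCommit(3)); // true
        System.out.println(guard.shouldCommit(2)); // false (covered by 3)
      }
    }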
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index c2c587267c..88364f4e87 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.flink.source;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
@@ -61,11 +60,6 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
private final boolean caseSensitive;
private final FlinkSourceFilter rowFilter;
- public RowDataFileScanTaskReader(
- Schema tableSchema, Schema projectedSchema, String nameMapping, boolean caseSensitive) {
- this(tableSchema, projectedSchema, nameMapping, caseSensitive, Collections.emptyList());
- }
-
public RowDataFileScanTaskReader(
Schema tableSchema,
Schema projectedSchema,
@@ -76,6 +70,7 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
this.projectedSchema = projectedSchema;
this.nameMapping = nameMapping;
this.caseSensitive = caseSensitive;
+
if (filters != null && !filters.isEmpty()) {
Expression combinedExpression =
filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and);
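This hunk removes the four-argument convenience constructor, so every caller now passes the filter list explicitly; RowDataRewriter and the test utilities later in this diff supply Collections.emptyList(). The surviving constructor reduces the residual filters into a single expression, using alwaysTrue() as the identity so an empty list degrades to a no-op filter. That combining step in isolation, with illustrative column names and literals:

    import java.util.List;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.Expressions;

    public class FilterCombineSketch {
      public static void main(String[] args) {
        // Column names and values are illustrative assumptions.
        List<Expression> filters =
            List.of(Expressions.greaterThan("id", 100L), Expressions.equal("region", "EU"));

        // Same reduce as in the constructor above: AND everything together,
        // starting from alwaysTrue() so zero filters means "keep all rows".
        Expression combined =
            filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and);
        System.out.println(combined);
      }
    }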
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
index 23665b7c9f..c958604c00 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.flink.source;
import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.RichMapFunction;
@@ -125,7 +126,8 @@ public class RowDataRewriter {
this.encryptionManager = encryptionManager;
this.taskWriterFactory = taskWriterFactory;
this.rowDataReader =
- new RowDataFileScanTaskReader(schema, schema, nameMapping, caseSensitive);
+ new RowDataFileScanTaskReader(
+ schema, schema, nameMapping, caseSensitive, Collections.emptyList());
}
@Override
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
index b1ce166748..66e59633ff 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.flink.source.reader;
-import java.util.Collections;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.configuration.Configuration;
@@ -57,28 +56,8 @@ public class AvroGenericRecordReaderFunction extends DataIteratorReaderFunction<
null,
false,
table.io(),
- table.encryption());
- }
-
- public AvroGenericRecordReaderFunction(
- String name,
- Configuration config,
- Schema schema,
- Schema projectedSchema,
- String nameMapping,
- boolean caseSensitive,
- FileIO io,
- EncryptionManager encryption) {
- this(
- name,
- config,
- schema,
- projectedSchema,
- nameMapping,
- caseSensitive,
- io,
- encryption,
- Collections.emptyList());
+ table.encryption(),
+ null);
}
public AvroGenericRecordReaderFunction(
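With the intermediate overload collapsed, this table-based factory now passes null for the filter list, while the updated tests below pass Collections.emptyList(); both spellings work because the downstream RowDataFileScanTaskReader guards with (filters != null && !filters.isEmpty()). A tiny sketch of that guard's semantics, where the helper name is illustrative:

    import java.util.Collections;
    import java.util.List;

    public class FilterGuardSketch {
      // Mirrors the reader's guard: null and empty both mean "no row filter".
      static boolean hasFilters(List<?> filters) {
        return filters != null && !filters.isEmpty();
      }

      public static void main(String[] args) {
        System.out.println(hasFilters(null));                     // false
        System.out.println(hasFilters(Collections.emptyList()));  // false
        System.out.println(hasFilters(List.of("id > 0")));        // true
      }
    }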
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
index dcf84305f8..5d0a00954e 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.flink.source.reader;
-import java.util.Collections;
import java.util.List;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.data.RowData;
@@ -48,33 +47,15 @@ public class RowDataReaderFunction extends DataIteratorReaderFunction<RowData> {
String nameMapping,
boolean caseSensitive,
FileIO io,
- EncryptionManager encryption) {
- this(
- config,
- tableSchema,
- projectedSchema,
- nameMapping,
- caseSensitive,
- io,
- encryption,
- Collections.emptyList());
- }
-
- public RowDataReaderFunction(
- ReadableConfig config,
- Schema schema,
- Schema project,
- String nameMapping,
- boolean caseSensitive,
- FileIO io,
EncryptionManager encryption,
List<Expression> filters) {
super(
new ArrayPoolDataIteratorBatcher<>(
config,
- new RowDataRecordFactory(FlinkSchemaUtil.convert(readSchema(schema, project)))));
- this.tableSchema = schema;
- this.readSchema = readSchema(schema, project);
+ new RowDataRecordFactory(
+ FlinkSchemaUtil.convert(readSchema(tableSchema, projectedSchema)))));
+ this.tableSchema = tableSchema;
+ this.readSchema = readSchema(tableSchema, projectedSchema);
this.nameMapping = nameMapping;
this.caseSensitive = caseSensitive;
this.io = io;
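Besides dropping the no-filters overload, this hunk renames the remaining constructor's schema/project parameters to tableSchema/projectedSchema so the field assignments read unambiguously. Constructing the function against the new signature then looks like the test updates at the end of this diff; a sketch with an illustrative one-column schema:

    import java.util.Collections;
    import org.apache.flink.configuration.Configuration;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.encryption.PlaintextEncryptionManager;
    import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
    import org.apache.iceberg.hadoop.HadoopFileIO;
    import org.apache.iceberg.types.Types;

    public class ReaderFunctionSketch {
      public static void main(String[] args) {
        // Illustrative schema; any Iceberg Schema works here.
        Schema schema =
            new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));

        RowDataReaderFunction readerFunction =
            new RowDataReaderFunction(
                new Configuration(),      // Flink Configuration implements ReadableConfig
                schema,                   // table schema
                schema,                   // projected schema
                null,                     // name mapping
                true,                     // case sensitive
                new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
                new PlaintextEncryptionManager(),
                Collections.emptyList()); // filters: none
      }
    }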
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
index d4da736dcd..74c5d343e9 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
@@ -65,7 +65,7 @@ public abstract class FlinkCatalogTestBase extends FlinkTestBase {
@After
public void clean() {
- sql("DROP CATALOG IF EXISTS %s", catalogName);
+ dropCatalog(catalogName, true);
}
@Parameterized.Parameters(name = "catalogName = {0} baseNamespace = {1}")
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
index 95471ac882..8076e0ec76 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
@@ -113,4 +113,17 @@ public abstract class FlinkTestBase extends TestBaseUtils {
.as(message)
.containsExactlyInAnyOrderElementsOf(expected);
}
+
+ /**
+ * We cannot drop the currently used catalog after FLINK-29677, so we have to make sure the
+ * catalog being dropped is not the session's current one. This method switches to the built-in
+ * 'default_catalog' and then drops the requested catalog.
+ *
+ * @param catalogName The catalog to drop
+ * @param ifExists Whether to add 'IF EXISTS' to the DROP CATALOG statement
+ */
+ protected void dropCatalog(String catalogName, boolean ifExists) {
+ sql("USE CATALOG default_catalog");
+ sql("DROP CATALOG %s %s", ifExists ? "IF EXISTS" : "", catalogName);
+ }
}
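The helper exists because FLINK-29677 made DROP CATALOG fail when it targets the session's current catalog, so tests first switch to Flink's built-in default_catalog. The test-class hunks around this one (FlinkCatalogTestBase above, TestChangeLogTable and TestFlinkHiveCatalog below) all swap their raw DROP CATALOG statements for this method. In a subclass teardown it reads like this sketch, where the catalog name is an illustrative assumption:

    import org.junit.After;

    public class MyCatalogTest extends FlinkTestBase {
      private static final String CATALOG = "my_catalog"; // illustrative name

      @After
      public void clean() {
        // Runs: USE CATALOG default_catalog, then DROP CATALOG IF EXISTS my_catalog
        dropCatalog(CATALOG, true);
      }
    }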
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
index 975d77cb35..8e9066e391 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
@@ -99,7 +99,7 @@ public class TestChangeLogTable extends ChangeLogTableTestBase {
public void clean() {
sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
sql("DROP DATABASE IF EXISTS %s", DATABASE_NAME);
- sql("DROP CATALOG IF EXISTS %s", CATALOG_NAME);
+ dropCatalog(CATALOG_NAME, true);
BoundedTableFactory.clearDataSets();
}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
index 6474635663..8f238587d3 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
@@ -100,6 +100,6 @@ public class TestFlinkHiveCatalog extends FlinkTestBase {
sql("DROP TABLE test_table");
sql("DROP DATABASE test_db");
- sql("DROP CATALOG test_catalog");
+ dropCatalog("test_catalog", false);
}
}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
index 30ed8a9742..1d52acb2fe 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
@@ -37,11 +37,12 @@ import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
public class TestFlinkMergingMetrics extends TestMergingMetrics<RowData> {
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
@Rule
public final HadoopTableResource tableResource =
- new HadoopTableResource(TEMPORARY_FOLDER, "test_db", "test_table", SCHEMA);
+ new HadoopTableResource(TEMP_FOLDER, "test_db", "test_table", SCHEMA);
public TestFlinkMergingMetrics(FileFormat fileFormat) {
super(fileFormat);
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
index f2e89428a9..f9ceaf8422 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink.source.reader;
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.BaseCombinedScanTask;
@@ -83,7 +84,8 @@ public class ReaderUtil {
public static DataIterator<RowData> createDataIterator(CombinedScanTask combinedTask) {
return new DataIterator<>(
- new RowDataFileScanTaskReader(TestFixtures.SCHEMA, TestFixtures.SCHEMA, null, true),
+ new RowDataFileScanTaskReader(
+ TestFixtures.SCHEMA, TestFixtures.SCHEMA, null, true, Collections.emptyList()),
combinedTask,
new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
new PlaintextEncryptionManager());
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
index ddc144be88..56af0caf12 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.flink.source.reader;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
@@ -106,7 +107,8 @@ public class TestIcebergSourceReader {
null,
true,
new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
- new PlaintextEncryptionManager());
+ new PlaintextEncryptionManager(),
+ Collections.emptyList());
return new IcebergSourceReader<>(readerMetrics, readerFunction, readerContext);
}
}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java
index aee271a3a7..d063ad7f4a 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.source.reader;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
@@ -55,7 +56,8 @@ public class TestRowDataReaderFunction extends ReaderFunctionTestBase<RowData> {
null,
true,
new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
- new PlaintextEncryptionManager());
+ new PlaintextEncryptionManager(),
+ Collections.emptyList());
}
@Override