Posted to commits@iceberg.apache.org by st...@apache.org on 2023/04/24 18:42:54 UTC

[iceberg] branch master updated: Flink: sync 1.15 with 1.17 for missed backports previously (#7402)

This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a04faa54f Flink: sync 1.15 with 1.17 for missed backports previously (#7402)
3a04faa54f is described below

commit 3a04faa54fbef29602950877d58009d6d4306bc7
Author: Steven Zhen Wu <st...@gmail.com>
AuthorDate: Mon Apr 24 11:42:48 2023 -0700

    Flink: sync 1.15 with 1.17 for missed backports previously (#7402)
---
 .../org/apache/iceberg/flink/CatalogLoader.java    | 12 +++++-----
 .../iceberg/flink/FlinkEnvironmentContext.java     |  1 -
 .../java/org/apache/iceberg/flink/TableLoader.java | 12 +++++-----
 .../iceberg/flink/sink/IcebergFilesCommitter.java  |  3 +++
 .../flink/source/RowDataFileScanTaskReader.java    |  7 +-----
 .../iceberg/flink/source/RowDataRewriter.java      |  4 +++-
 .../reader/AvroGenericRecordReaderFunction.java    | 25 ++------------------
 .../flink/source/reader/RowDataReaderFunction.java | 27 ++++------------------
 .../apache/iceberg/flink/FlinkCatalogTestBase.java |  2 +-
 .../org/apache/iceberg/flink/FlinkTestBase.java    | 13 +++++++++++
 .../apache/iceberg/flink/TestChangeLogTable.java   |  2 +-
 .../apache/iceberg/flink/TestFlinkHiveCatalog.java |  2 +-
 .../flink/source/TestFlinkMergingMetrics.java      |  5 ++--
 .../iceberg/flink/source/reader/ReaderUtil.java    |  4 +++-
 .../source/reader/TestIcebergSourceReader.java     |  4 +++-
 .../source/reader/TestRowDataReaderFunction.java   |  4 +++-
 16 files changed, 53 insertions(+), 74 deletions(-)

diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
index f4acb030b4..18473bf4f1 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
@@ -81,18 +81,18 @@ public interface CatalogLoader extends Serializable, Cloneable {
       this.properties = Maps.newHashMap(properties);
     }
 
-    @Override
-    @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
-    public CatalogLoader clone() {
-      return new HadoopCatalogLoader(catalogName, new Configuration(hadoopConf.get()), properties);
-    }
-
     @Override
     public Catalog loadCatalog() {
       return CatalogUtil.loadCatalog(
           HadoopCatalog.class.getName(), catalogName, properties, hadoopConf.get());
     }
 
+    @Override
+    @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
+    public CatalogLoader clone() {
+      return new HadoopCatalogLoader(catalogName, new Configuration(hadoopConf.get()), properties);
+    }
+
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(this)
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkEnvironmentContext.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkEnvironmentContext.java
index 49602eea45..f35bb577fb 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkEnvironmentContext.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkEnvironmentContext.java
@@ -22,7 +22,6 @@ import org.apache.iceberg.EnvironmentContext;
 import org.apache.iceberg.flink.util.FlinkPackage;
 
 class FlinkEnvironmentContext {
-
   private FlinkEnvironmentContext() {}
 
   public static void init() {
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java
index 6646338b62..f18c5ccda1 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java
@@ -121,12 +121,6 @@ public interface TableLoader extends Closeable, Serializable, Cloneable {
       return catalog.loadTable(TableIdentifier.parse(identifier));
     }
 
-    @Override
-    @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
-    public TableLoader clone() {
-      return new CatalogTableLoader(catalogLoader.clone(), TableIdentifier.parse(identifier));
-    }
-
     @Override
     public void close() throws IOException {
       if (catalog instanceof Closeable) {
@@ -134,6 +128,12 @@ public interface TableLoader extends Closeable, Serializable, Cloneable {
       }
     }
 
+    @Override
+    @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
+    public TableLoader clone() {
+      return new CatalogTableLoader(catalogLoader.clone(), TableIdentifier.parse(identifier));
+    }
+
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(this)
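
Both hunks above are pure method moves: clone() is relocated below loadCatalog() and close() to match the 1.17 file layout, with no behavior change. For context, a minimal sketch of how these cloneable loaders are typically exercised; the warehouse path and table identifier are hypothetical, and a table db.t is assumed to already exist:

    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.flink.CatalogLoader;
    import org.apache.iceberg.flink.TableLoader;

    public class LoaderCloneSketch {
      public static void main(String[] args) throws Exception {
        CatalogLoader catalogLoader =
            CatalogLoader.hadoop(
                "my_catalog",
                new Configuration(),
                Map.of("warehouse", "file:///tmp/warehouse")); // hypothetical path
        TableLoader tableLoader =
            TableLoader.fromCatalog(catalogLoader, TableIdentifier.of("db", "t"));

        // Parallel operators clone the loader so that each task gets its own
        // catalog instance, then open it before loading the table.
        TableLoader perTaskLoader = tableLoader.clone();
        perTaskLoader.open();
        Table table = perTaskLoader.loadTable();
        System.out.println(table.name());
        perTaskLoader.close();
      }
    }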
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 22b4dc9d21..e11975b3ef 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -233,6 +233,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
     // the files,
     // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
     if (checkpointId > maxCommittedCheckpointId) {
+      LOG.info("Checkpoint {} completed. Attempting commit.", checkpointId);
       commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, checkpointId);
       this.maxCommittedCheckpointId = checkpointId;
     } else {
@@ -288,6 +289,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
         commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
       }
       continuousEmptyCheckpoints = 0;
+    } else {
+      LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", checkpointId);
     }
   }
 
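
The two added log lines instrument the committer's existing guard: a commit is attempted only when the completed checkpoint id is newer than the last committed one, and skipped when the checkpoint carried no data or delete files. A simplified, illustrative collapse of that flow (not the real operator; in the actual class the skip log lives in the commit path rather than next to the checkpoint-id check):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class CommitGuardSketch {
      private static final Logger LOG = LoggerFactory.getLogger(CommitGuardSketch.class);
      private long maxCommittedCheckpointId = -1L;

      void notifyCheckpointComplete(long checkpointId, boolean hasPendingFiles) {
        if (checkpointId > maxCommittedCheckpointId) {
          LOG.info("Checkpoint {} completed. Attempting commit.", checkpointId);
          if (hasPendingFiles) {
            // commitUpToCheckpoint(...) would run here in the real operator
            this.maxCommittedCheckpointId = checkpointId;
          } else {
            LOG.info(
                "Skip commit for checkpoint {} due to no data files or delete files.",
                checkpointId);
          }
        }
      }
    }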
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index c2c587267c..88364f4e87 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.flink.source;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.flink.annotation.Internal;
@@ -61,11 +60,6 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
   private final boolean caseSensitive;
   private final FlinkSourceFilter rowFilter;
 
-  public RowDataFileScanTaskReader(
-      Schema tableSchema, Schema projectedSchema, String nameMapping, boolean caseSensitive) {
-    this(tableSchema, projectedSchema, nameMapping, caseSensitive, Collections.emptyList());
-  }
-
   public RowDataFileScanTaskReader(
       Schema tableSchema,
       Schema projectedSchema,
@@ -76,6 +70,7 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
     this.projectedSchema = projectedSchema;
     this.nameMapping = nameMapping;
     this.caseSensitive = caseSensitive;
+
     if (filters != null && !filters.isEmpty()) {
       Expression combinedExpression =
           filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and);
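
With the four-argument convenience constructor gone, callers pass the pushed-down filter list explicitly; Collections.emptyList() reproduces the old no-filter behavior, and a null list is also tolerated by the filters null-check above. A construction sketch (the single-column schema is made up for illustration):

    import java.util.Collections;
    import java.util.List;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
    import org.apache.iceberg.types.Types;

    public class ScanTaskReaderSketch {
      public static void main(String[] args) {
        Schema schema =
            new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));

        // The old 4-arg form, now spelled with an explicit empty filter list.
        RowDataFileScanTaskReader noFilters =
            new RowDataFileScanTaskReader(schema, schema, null, true, Collections.emptyList());

        // The same reader with a pushed-down residual filter.
        List<Expression> filters =
            Collections.singletonList(Expressions.greaterThan("id", 100));
        RowDataFileScanTaskReader withFilters =
            new RowDataFileScanTaskReader(schema, schema, null, true, filters);
      }
    }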
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
index 23665b7c9f..c958604c00 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.flink.source;
 import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -125,7 +126,8 @@ public class RowDataRewriter {
       this.encryptionManager = encryptionManager;
       this.taskWriterFactory = taskWriterFactory;
       this.rowDataReader =
-          new RowDataFileScanTaskReader(schema, schema, nameMapping, caseSensitive);
+          new RowDataFileScanTaskReader(
+              schema, schema, nameMapping, caseSensitive, Collections.emptyList());
     }
 
     @Override
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
index b1ce166748..66e59633ff 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.flink.source.reader;
 
-import java.util.Collections;
 import java.util.List;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.flink.configuration.Configuration;
@@ -57,28 +56,8 @@ public class AvroGenericRecordReaderFunction extends DataIteratorReaderFunction<
         null,
         false,
         table.io(),
-        table.encryption());
-  }
-
-  public AvroGenericRecordReaderFunction(
-      String name,
-      Configuration config,
-      Schema schema,
-      Schema projectedSchema,
-      String nameMapping,
-      boolean caseSensitive,
-      FileIO io,
-      EncryptionManager encryption) {
-    this(
-        name,
-        config,
-        schema,
-        projectedSchema,
-        nameMapping,
-        caseSensitive,
-        io,
-        encryption,
-        Collections.emptyList());
+        table.encryption(),
+        null);
   }
 
   public AvroGenericRecordReaderFunction(
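
The removed eight-argument overload only delegated with an empty filter list, and the convenience path above now passes null instead, which the filters null-check in RowDataFileScanTaskReader accepts. Constructing the reader function with the surviving full constructor looks roughly like this (the Table is assumed to come from a TableLoader, and the argument comments are interpretations, not documentation):

    import java.util.Collections;
    import org.apache.flink.configuration.Configuration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction;

    public class AvroReaderFunctionSketch {
      static AvroGenericRecordReaderFunction create(Table table) {
        return new AvroGenericRecordReaderFunction(
            table.name(),             // reader name
            new Configuration(),      // Flink configuration
            table.schema(),           // table schema
            null,                     // projected schema: null reads the full schema
            null,                     // name mapping
            false,                    // case sensitivity
            table.io(),
            table.encryption(),
            Collections.emptyList()); // no pushed-down filters
      }
    }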
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
index dcf84305f8..5d0a00954e 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.flink.source.reader;
 
-import java.util.Collections;
 import java.util.List;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.data.RowData;
@@ -48,33 +47,15 @@ public class RowDataReaderFunction extends DataIteratorReaderFunction<RowData> {
       String nameMapping,
       boolean caseSensitive,
       FileIO io,
-      EncryptionManager encryption) {
-    this(
-        config,
-        tableSchema,
-        projectedSchema,
-        nameMapping,
-        caseSensitive,
-        io,
-        encryption,
-        Collections.emptyList());
-  }
-
-  public RowDataReaderFunction(
-      ReadableConfig config,
-      Schema schema,
-      Schema project,
-      String nameMapping,
-      boolean caseSensitive,
-      FileIO io,
       EncryptionManager encryption,
       List<Expression> filters) {
     super(
         new ArrayPoolDataIteratorBatcher<>(
             config,
-            new RowDataRecordFactory(FlinkSchemaUtil.convert(readSchema(schema, project)))));
-    this.tableSchema = schema;
-    this.readSchema = readSchema(schema, project);
+            new RowDataRecordFactory(
+                FlinkSchemaUtil.convert(readSchema(tableSchema, projectedSchema)))));
+    this.tableSchema = tableSchema;
+    this.readSchema = readSchema(tableSchema, projectedSchema);
     this.nameMapping = nameMapping;
     this.caseSensitive = caseSensitive;
     this.io = io;
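
Besides dropping the delegating overload, this hunk renames the remaining constructor's parameters (schema and project become tableSchema and projectedSchema) so they match the fields they initialize. Construction now always spells out the filter list, as the test updates further below show; a hedged sketch under the same assumptions as above:

    import java.util.Collections;
    import org.apache.flink.configuration.Configuration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;

    public class RowDataReaderFunctionSketch {
      // The Table is assumed to come from a TableLoader; a null projection
      // appears to fall back to the full table schema via readSchema(...).
      static RowDataReaderFunction create(Table table) {
        return new RowDataReaderFunction(
            new Configuration(),      // implements ReadableConfig
            table.schema(),           // tableSchema
            null,                     // projectedSchema
            null,                     // nameMapping
            true,                     // caseSensitive
            table.io(),
            table.encryption(),
            Collections.emptyList()); // filters
      }
    }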
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
index d4da736dcd..74c5d343e9 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
@@ -65,7 +65,7 @@ public abstract class FlinkCatalogTestBase extends FlinkTestBase {
 
   @After
   public void clean() {
-    sql("DROP CATALOG IF EXISTS %s", catalogName);
+    dropCatalog(catalogName, true);
   }
 
   @Parameterized.Parameters(name = "catalogName = {0} baseNamespace = {1}")
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
index 95471ac882..8076e0ec76 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
@@ -113,4 +113,17 @@ public abstract class FlinkTestBase extends TestBaseUtils {
         .as(message)
         .containsExactlyInAnyOrderElementsOf(expected);
   }
+
+  /**
+   * We cannot drop the currently used catalog after FLINK-29677, so we have to make sure that we
+   * do not use the current catalog before dropping it. This method switches to the
+   * 'default_catalog' and drops the one requested.
+   *
+   * @param catalogName The catalog to drop
+   * @param ifExists Whether to use 'IF EXISTS' when dropping the catalog
+   */
+  protected void dropCatalog(String catalogName, boolean ifExists) {
+    sql("USE CATALOG default_catalog");
+    sql("DROP CATALOG %s %s", ifExists ? "IF EXISTS" : "", catalogName);
+  }
 }
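
FLINK-29677 made DROP CATALOG fail when the target is the session's current catalog, which is why the helper first switches to default_catalog. In a FlinkTestBase subclass the cleanup then reduces to the following (the catalog name is hypothetical):

    // Issues, in order: USE CATALOG default_catalog;
    // DROP CATALOG IF EXISTS my_catalog
    @After
    public void clean() {
      dropCatalog("my_catalog", true);
    }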
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
index 975d77cb35..8e9066e391 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
@@ -99,7 +99,7 @@ public class TestChangeLogTable extends ChangeLogTableTestBase {
   public void clean() {
     sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
     sql("DROP DATABASE IF EXISTS %s", DATABASE_NAME);
-    sql("DROP CATALOG IF EXISTS %s", CATALOG_NAME);
+    dropCatalog(CATALOG_NAME, true);
     BoundedTableFactory.clearDataSets();
   }
 
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
index 6474635663..8f238587d3 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
@@ -100,6 +100,6 @@ public class TestFlinkHiveCatalog extends FlinkTestBase {
 
     sql("DROP TABLE test_table");
     sql("DROP DATABASE test_db");
-    sql("DROP CATALOG test_catalog");
+    dropCatalog("test_catalog", false);
   }
 }
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
index 30ed8a9742..1d52acb2fe 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
@@ -37,11 +37,12 @@ import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
 public class TestFlinkMergingMetrics extends TestMergingMetrics<RowData> {
-  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
 
   @Rule
   public final HadoopTableResource tableResource =
-      new HadoopTableResource(TEMPORARY_FOLDER, "test_db", "test_table", SCHEMA);
+      new HadoopTableResource(TEMP_FOLDER, "test_db", "test_table", SCHEMA);
 
   public TestFlinkMergingMetrics(FileFormat fileFormat) {
     super(fileFormat);
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
index f2e89428a9..f9ceaf8422 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink.source.reader;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.BaseCombinedScanTask;
@@ -83,7 +84,8 @@ public class ReaderUtil {
 
   public static DataIterator<RowData> createDataIterator(CombinedScanTask combinedTask) {
     return new DataIterator<>(
-        new RowDataFileScanTaskReader(TestFixtures.SCHEMA, TestFixtures.SCHEMA, null, true),
+        new RowDataFileScanTaskReader(
+            TestFixtures.SCHEMA, TestFixtures.SCHEMA, null, true, Collections.emptyList()),
         combinedTask,
         new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
         new PlaintextEncryptionManager());
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
index ddc144be88..56af0caf12 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.flink.source.reader;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.configuration.Configuration;
@@ -106,7 +107,8 @@ public class TestIcebergSourceReader {
             null,
             true,
             new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
-            new PlaintextEncryptionManager());
+            new PlaintextEncryptionManager(),
+            Collections.emptyList());
     return new IcebergSourceReader<>(readerMetrics, readerFunction, readerContext);
   }
 }
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java
index aee271a3a7..d063ad7f4a 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.source.reader;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.flink.configuration.Configuration;
@@ -55,7 +56,8 @@ public class TestRowDataReaderFunction extends ReaderFunctionTestBase<RowData> {
         null,
         true,
         new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
-        new PlaintextEncryptionManager());
+        new PlaintextEncryptionManager(),
+        Collections.emptyList());
   }
 
   @Override