You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by op...@apache.org on 2021/02/02 07:57:39 UTC

[iceberg] branch master updated: Flink: Upgrade version from 1.11.0 to 1.12.1 (#1956)

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c5e6791  Flink: Upgrade version from 1.11.0 to 1.12.1 (#1956)
c5e6791 is described below

commit c5e67915b01c2cbf2eea40a412c2765f0c8278e8
Author: Steven Zhen Wu <st...@gmail.com>
AuthorDate: Mon Feb 1 23:57:24 2021 -0800

    Flink: Upgrade version from 1.11.0 to 1.12.1 (#1956)
---
 .../org/apache/iceberg/flink/FlinkCatalog.java     |  3 +-
 .../org/apache/iceberg/flink/actions/Actions.java  |  8 ++-
 .../org/apache/iceberg/flink/sink/FlinkSink.java   |  4 +-
 .../apache/iceberg/flink/source/FlinkSource.java   |  4 +-
 .../FlinkCompatibilityUtil.java}                   | 40 ++++++++-------
 .../apache/iceberg/flink/FlinkCatalogTestBase.java |  8 +++
 .../org/apache/iceberg/flink/FlinkTestBase.java    | 31 ++++++------
 .../apache/iceberg/flink/MiniClusterResource.java  | 57 ++++++++++++++++++++++
 .../iceberg/flink/TestFlinkCatalogTable.java       | 52 +++++++++++---------
 .../apache/iceberg/flink/TestFlinkTableSink.java   | 14 +++++-
 .../apache/iceberg/flink/TestFlinkTableSource.java |  8 +--
 .../java/org/apache/iceberg/flink/TestHelpers.java |  6 +--
 .../iceberg/flink/sink/TestFlinkIcebergSink.java   | 26 ++++++----
 .../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 14 +++++-
 .../apache/iceberg/flink/source/TestFlinkScan.java | 14 +++++-
 .../iceberg/flink/source/TestFlinkScanSql.java     | 43 +++++++++++-----
 .../iceberg/flink/source/TestStreamScanSql.java    |  4 +-
 versions.props                                     |  2 +-
 18 files changed, 235 insertions(+), 103 deletions(-)

diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 4917e9a..de2fbe0 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -69,6 +69,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -456,7 +457,7 @@ public class FlinkCatalog extends AbstractCatalog {
 
     TableSchema schema = table.getSchema();
     schema.getTableColumns().forEach(column -> {
-      if (column.isGenerated()) {
+      if (!FlinkCompatibilityUtil.isPhysicalColumn(column)) {
         throw new UnsupportedOperationException("Creating table with computed columns is not supported yet.");
       }
     });
diff --git a/flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java b/flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java
index 3fc2bb7..98702ce 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java
@@ -19,11 +19,17 @@
 
 package org.apache.iceberg.flink.actions;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.iceberg.Table;
 
 public class Actions {
 
+  public static final Configuration CONFIG = new Configuration()
+      // disable classloader check as Avro may cache class/object in the serializers.
+      .set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
+
   private StreamExecutionEnvironment env;
   private Table table;
 
@@ -37,7 +43,7 @@ public class Actions {
   }
 
   public static Actions forTable(Table table) {
-    return new Actions(StreamExecutionEnvironment.getExecutionEnvironment(), table);
+    return new Actions(StreamExecutionEnvironment.getExecutionEnvironment(CONFIG), table);
   }
 
   public RewriteDataFilesAction rewriteDataFiles() {
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index c691e58..f679e23 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -33,7 +33,6 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.util.DataFormatConverters;
-import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
@@ -45,6 +44,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -101,7 +101,7 @@ public class FlinkSink {
     DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
 
     DataFormatConverters.RowConverter rowConverter = new DataFormatConverters.RowConverter(fieldDataTypes);
-    return builderFor(input, rowConverter::toInternal, RowDataTypeInfo.of(rowType))
+    return builderFor(input, rowConverter::toInternal, FlinkCompatibilityUtil.toTypeInfo(rowType))
         .tableSchema(tableSchema);
   }
 
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
index c10bdf3..84507c4 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
@@ -31,7 +31,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
@@ -40,6 +39,7 @@ import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.FlinkTableOptions;
 import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
@@ -204,7 +204,7 @@ public class FlinkSource {
       FlinkInputFormat format = buildFormat();
 
       ScanContext context = contextBuilder.build();
-      TypeInformation<RowData> typeInfo = RowDataTypeInfo.of(FlinkSchemaUtil.convert(context.project()));
+      TypeInformation<RowData> typeInfo = FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(context.project()));
 
       if (!context.isStreaming()) {
         int parallelism = inferParallelism(format, context);
diff --git a/flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java b/flink/src/main/java/org/apache/iceberg/flink/util/FlinkCompatibilityUtil.java
similarity index 51%
copy from flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java
copy to flink/src/main/java/org/apache/iceberg/flink/util/FlinkCompatibilityUtil.java
index 3fc2bb7..274d2a8 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/util/FlinkCompatibilityUtil.java
@@ -17,31 +17,29 @@
  * under the License.
  */
 
-package org.apache.iceberg.flink.actions;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.iceberg.Table;
-
-public class Actions {
-
-  private StreamExecutionEnvironment env;
-  private Table table;
-
-  private Actions(StreamExecutionEnvironment env, Table table) {
-    this.env = env;
-    this.table = table;
-  }
+package org.apache.iceberg.flink.util;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * This is a small util class that tries to hide calls to Flink
+ * Internal or PublicEvolving interfaces, as Flink can change
+ * those APIs during minor version releases.
+ */
+public class FlinkCompatibilityUtil {
 
-  public static Actions forTable(StreamExecutionEnvironment env, Table table) {
-    return new Actions(env, table);
+  private FlinkCompatibilityUtil() {
   }
 
-  public static Actions forTable(Table table) {
-    return new Actions(StreamExecutionEnvironment.getExecutionEnvironment(), table);
+  public static TypeInformation<RowData> toTypeInfo(RowType rowType) {
+    return InternalTypeInfo.of(rowType);
   }
 
-  public RewriteDataFilesAction rewriteDataFiles() {
-    return new RewriteDataFilesAction(env, table);
+  public static boolean isPhysicalColumn(TableColumn column) {
+    return column.isPhysical();
   }
-
 }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
index 0c588ad..3c5f25e 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.flink;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import org.apache.flink.util.ArrayUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -28,6 +29,7 @@ import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.junit.After;
@@ -119,6 +121,12 @@ public abstract class FlinkCatalogTestBase extends FlinkTestBase {
     }
   }
 
+  protected String getFullQualifiedTableName(String tableName) {
+    final List<String> levels = Lists.newArrayList(icebergNamespace.levels());
+    levels.add(tableName);
+    return Joiner.on('.').join(levels);
+  }
+
   static String getURI(HiveConf conf) {
     return conf.get(HiveConf.ConfVars.METASTOREURIS.varname);
   }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
index dabdabe..a39c771 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
@@ -20,12 +20,12 @@
 package org.apache.iceberg.flink;
 
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 import java.util.stream.IntStream;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
-import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -34,8 +34,17 @@ import org.apache.iceberg.hive.TestHiveMetastore;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
 
-public abstract class FlinkTestBase extends AbstractTestBase {
+public abstract class FlinkTestBase extends TestBaseUtils {
+
+  @ClassRule
+  public static MiniClusterWithClientResource miniClusterResource =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
   private static TestHiveMetastore metastore = null;
   protected static HiveConf hiveConf = null;
@@ -87,25 +96,15 @@ public abstract class FlinkTestBase extends AbstractTestBase {
 
   protected List<Object[]> sql(String query, Object... args) {
     TableResult tableResult = exec(query, args);
-
-    tableResult.getJobClient().ifPresent(c -> {
-      try {
-        c.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
-      } catch (InterruptedException | ExecutionException e) {
-        throw new RuntimeException(e);
-      }
-    });
-
-    List<Object[]> results = Lists.newArrayList();
     try (CloseableIterator<Row> iter = tableResult.collect()) {
+      List<Object[]> results = Lists.newArrayList();
       while (iter.hasNext()) {
         Row row = iter.next();
         results.add(IntStream.range(0, row.getArity()).mapToObj(row::getField).toArray(Object[]::new));
       }
+      return results;
     } catch (Exception e) {
-      throw new RuntimeException(e);
+      throw new RuntimeException("Failed to collect table result", e);
     }
-
-    return results;
   }
 }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java b/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
new file mode 100644
index 0000000..9dfa1ac
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+public class MiniClusterResource {
+
+  private static final int DEFAULT_TM_NUM = 1;
+  private static final int DEFAULT_PARALLELISM = 4;
+
+  public static final Configuration DISABLE_CLASSLOADER_CHECK_CONFIG = new Configuration()
+      // disable classloader check as Avro may cache class/object in the serializers.
+      .set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
+
+  private MiniClusterResource() {
+
+  }
+
+  /**
+   * It will start a mini cluster with classloader.check-leaked-classloader=false,
+   * so that we won't break the unit tests because of the class loader leak issue.
+   * In our Iceberg integration tests, some tests assert the results after the
+   * Flink jobs have finished, so we may actually access a class loader that has
+   * already been closed by the Flink task managers if we enable the switch
+   * classloader.check-leaked-classloader by default.
+   */
+  public static MiniClusterWithClientResource createWithClassloaderCheckDisabled() {
+    return new MiniClusterWithClientResource(
+        new MiniClusterResourceConfiguration.Builder()
+            .setNumberTaskManagers(DEFAULT_TM_NUM)
+            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+            .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
+            .build());
+  }
+
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index bf1b091..62efd16 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -23,10 +23,10 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableColumn;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -45,6 +45,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
@@ -52,7 +53,6 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
@@ -79,26 +79,22 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
 
   @Test
   public void testGetTable() {
-    validationCatalog.createTable(
-        TableIdentifier.of(icebergNamespace, "tl"),
-        new Schema(
-            Types.NestedField.optional(0, "id", Types.LongType.get()),
-            Types.NestedField.optional(1, "strV", Types.StringType.get())));
-    Assert.assertEquals(
-        Arrays.asList(
-            TableColumn.of("id", DataTypes.BIGINT()),
-            TableColumn.of("strV", DataTypes.STRING())),
-        getTableEnv().from("tl").getSchema().getTableColumns());
-    Assert.assertTrue(getTableEnv().getCatalog(catalogName).get().tableExists(ObjectPath.fromString("db.tl")));
+    sql("CREATE TABLE tl(id BIGINT, strV STRING)");
+
+    Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, "tl"));
+    Schema iSchema = new Schema(
+        Types.NestedField.optional(1, "id", Types.LongType.get()),
+        Types.NestedField.optional(2, "strV", Types.StringType.get())
+    );
+    Assert.assertEquals("Should load the expected iceberg schema", iSchema.toString(), table.schema().toString());
   }
 
   @Test
   public void testRenameTable() {
     Assume.assumeFalse("HadoopCatalog does not support rename table", isHadoopCatalog);
 
-    validationCatalog.createTable(
-        TableIdentifier.of(icebergNamespace, "tl"),
-        new Schema(Types.NestedField.optional(0, "id", Types.LongType.get())));
+    final Schema tableSchema = new Schema(Types.NestedField.optional(0, "id", Types.LongType.get()));
+    validationCatalog.createTable(TableIdentifier.of(icebergNamespace, "tl"), tableSchema);
     sql("ALTER TABLE tl RENAME TO tl2");
     AssertHelpers.assertThrows(
         "Should fail if trying to get a nonexistent table",
@@ -106,9 +102,8 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
         "Table `tl` was not found.",
         () -> getTableEnv().from("tl")
     );
-    Assert.assertEquals(
-        Collections.singletonList(TableColumn.of("id", DataTypes.BIGINT())),
-        getTableEnv().from("tl2").getSchema().getTableColumns());
+    Schema actualSchema = FlinkSchemaUtil.convert(getTableEnv().from("tl2").getSchema());
+    Assert.assertEquals(tableSchema.asStruct(), actualSchema.asStruct());
   }
 
   @Test
@@ -126,7 +121,6 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
     Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
   }
 
-  @Ignore("Enable this after upgrade flink to 1.12.0, because it starts to support 'CREATE TABLE IF NOT EXISTS")
   @Test
   public void testCreateTableIfNotExists() {
     sql("CREATE TABLE tl(id BIGINT)");
@@ -136,13 +130,23 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
 
     sql("DROP TABLE tl");
     AssertHelpers.assertThrows("Table 'tl' should be dropped",
-        NoSuchTableException.class, "Table does not exist: db.tl", () -> table("tl"));
+        NoSuchTableException.class,
+        "Table does not exist: " + getFullQualifiedTableName("tl"),
+        () -> table("tl"));
 
-    sql("CREATE TABLE IF NO EXISTS tl(id BIGINT)");
+    sql("CREATE TABLE IF NOT EXISTS tl(id BIGINT)");
     Assert.assertEquals(Maps.newHashMap(), table("tl").properties());
 
-    sql("CREATE TABLE IF NOT EXISTS tl(id BIGINT) WITH ('location'='/tmp/location')");
-    Assert.assertEquals("Should still be the old table.", Maps.newHashMap(), table("tl").properties());
+    final String uuid = UUID.randomUUID().toString();
+    final Map<String, String> expectedProperties = ImmutableMap.of("uuid", uuid);
+    table("tl").updateProperties()
+        .set("uuid", uuid)
+        .commit();
+    Assert.assertEquals(expectedProperties, table("tl").properties());
+
+    sql("CREATE TABLE IF NOT EXISTS tl(id BIGINT)");
+    Assert.assertEquals("Should still be the old table.",
+        expectedProperties, table("tl").properties());
   }
 
   @Test
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index f064c53..71753fb 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Expressions;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
@@ -46,12 +47,22 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
 public class TestFlinkTableSink extends FlinkCatalogTestBase {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
   private static final String TABLE_NAME = "test_table";
   private TableEnvironment tEnv;
   private Table icebergTable;
@@ -89,7 +100,8 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
             .useBlinkPlanner();
         if (isStreamingJob) {
           settingsBuilder.inStreamingMode();
-          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+          StreamExecutionEnvironment env = StreamExecutionEnvironment
+              .getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
           env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
           env.enableCheckpointing(400);
           env.setMaxParallelism(2);
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
index 229812a..97b9f11 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
@@ -455,14 +455,14 @@ public class TestFlinkTableSource extends FlinkTestBase {
     String sqlNotIn = String.format("SELECT * FROM %s WHERE id NOT IN (3,2) ", TABLE_NAME);
     String explainNotIn = getTableEnv().explainSql(sqlNotIn);
     Object[] expectRecord = new Object[] {1, "iceberg", 10.0};
-    String expectedFilter = "ref(name=\"id\") != 3,ref(name=\"id\") != 2";
+    String expectedFilter = "FilterPushDown: ref(name=\"id\") != 2,ref(name=\"id\") != 3";
     Assert.assertTrue("Explain should contain the push down filter", explainNotIn.contains(expectedFilter));
 
     List<Object[]> resultNotIn = sql(sqlNotIn);
     Assert.assertEquals("Should have 1 record", 1, resultNotIn.size());
     Assert.assertArrayEquals("Should produce the expected record", expectRecord, resultNotIn.get(0));
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
-    String expectedScan = "(ref(name=\"id\") != 3 and ref(name=\"id\") != 2)";
+    String expectedScan = "(ref(name=\"id\") != 2 and ref(name=\"id\") != 3)";
     Assert.assertEquals("Should contain the push down filter", expectedScan, lastScanEvent.filter().toString());
   }
 
@@ -617,12 +617,14 @@ public class TestFlinkTableSource extends FlinkTestBase {
     explainNoPushDown = getTableEnv().explainSql(sqlNoPushDown);
     Assert.assertFalse("Explain should not contain FilterPushDown",
         explainNoPushDown.contains(expectedFilterPushDownExplain));
+
     sqlNoPushDown = "SELECT * FROM  " + TABLE_NAME + "  WHERE data LIKE '%%' ";
     resultLike = sql(sqlNoPushDown);
-    Assert.assertEquals("Should have 2 records", 2, resultLike.size());
+    Assert.assertEquals("Should have 3 records", 3, resultLike.size());
     List<Object[]> expectedRecords = Lists.newArrayList();
     expectedRecords.add(new Object[] {1, "iceberg", 10.0});
     expectedRecords.add(new Object[] {2, "b", 20.0});
+    expectedRecords.add(new Object[] {3, null, 30.0});
     Assert.assertArrayEquals("Should produce the expected record", expectedRecords.toArray(), resultLike.toArray());
     Assert.assertEquals("Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
 
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
index e4ca091..62865f5 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
@@ -31,7 +31,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.DecimalData;
@@ -41,7 +40,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.data.conversion.DataStructureConverter;
 import org.apache.flink.table.data.conversion.DataStructureConverters;
-import org.apache.flink.table.runtime.types.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
@@ -62,9 +61,8 @@ public class TestHelpers {
   }
 
   public static RowData copyRowData(RowData from, RowType rowType) {
-    ExecutionConfig config = new ExecutionConfig();
     TypeSerializer[] fieldSerializers = rowType.getChildren().stream()
-        .map((LogicalType type) -> InternalSerializers.create(type, config))
+        .map((LogicalType type) -> InternalSerializers.create(type))
         .toArray(TypeSerializer[]::new);
 
     // Use rowType field count to avoid copy metadata column in case of merging position deletes
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index 7982f27..dd79b97 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -35,37 +35,43 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.util.DataFormatConverters;
-import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
-import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.MiniClusterResource;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
-public class TestFlinkIcebergSink extends AbstractTestBase {
+public class TestFlinkIcebergSink {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
   private static final TypeInformation<Row> ROW_TYPE_INFO = new RowTypeInfo(
       SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
   private static final DataFormatConverters.RowConverter CONVERTER = new DataFormatConverters.RowConverter(
       SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
 
-  @Rule
-  public TemporaryFolder tempFolder = new TemporaryFolder();
-
   private String tablePath;
   private Table table;
   private StreamExecutionEnvironment env;
@@ -101,7 +107,7 @@ public class TestFlinkIcebergSink extends AbstractTestBase {
 
   @Before
   public void before() throws IOException {
-    File folder = tempFolder.newFolder();
+    File folder = TEMPORARY_FOLDER.newFolder();
     String warehouse = folder.getAbsolutePath();
 
     tablePath = warehouse.concat("/test");
@@ -110,7 +116,7 @@ public class TestFlinkIcebergSink extends AbstractTestBase {
     Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
     table = SimpleDataUtil.createTable(tablePath, props, partitioned);
 
-    env = StreamExecutionEnvironment.getExecutionEnvironment()
+    env = StreamExecutionEnvironment.getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
         .enableCheckpointing(100)
         .setParallelism(parallelism)
         .setMaxParallelism(parallelism);
@@ -134,7 +140,7 @@ public class TestFlinkIcebergSink extends AbstractTestBase {
         Row.of(3, "foo")
     );
     DataStream<RowData> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
-        .map(CONVERTER::toInternal, RowDataTypeInfo.of(SimpleDataUtil.ROW_TYPE));
+        .map(CONVERTER::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
 
     FlinkSink.forRowData(dataStream)
         .table(table)
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 93222dd..fd2a71a 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.iceberg.FileFormat;
@@ -39,6 +40,7 @@ import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableTestBase;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.MiniClusterResource;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TestTableLoader;
 import org.apache.iceberg.flink.source.BoundedTestSource;
@@ -49,12 +51,22 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.StructLikeSet;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
 public class TestFlinkIcebergSinkV2 extends TableTestBase {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
   private static final int FORMAT_V2 = 2;
   private static final TypeInformation<Row> ROW_TYPE_INFO =
       new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
@@ -112,7 +124,7 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
         .set(TableProperties.DEFAULT_FILE_FORMAT, format.name())
         .commit();
 
-    env = StreamExecutionEnvironment.getExecutionEnvironment()
+    env = StreamExecutionEnvironment.getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
         .enableCheckpointing(100L)
         .setParallelism(parallelism)
         .setMaxParallelism(parallelism);
diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
index 6d11389..fcc71d5 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
@@ -37,7 +37,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.conversion.DataStructureConverter;
 import org.apache.flink.table.data.conversion.DataStructureConverters;
 import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AppendFiles;
@@ -54,6 +54,7 @@ import org.apache.iceberg.data.Record;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.MiniClusterResource;
 import org.apache.iceberg.flink.RowDataConverter;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -64,14 +65,23 @@ import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.types.Types.NestedField.required;
 
 @RunWith(Parameterized.class)
-public abstract class TestFlinkScan extends AbstractTestBase {
+public abstract class TestFlinkScan {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
   protected static final Schema SCHEMA = new Schema(
           required(1, "data", Types.StringType.get()),
diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
index 2594a00..558ea3e 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
@@ -25,8 +25,10 @@ import java.util.Map;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TestHelpers;
@@ -47,7 +49,7 @@ import org.junit.Test;
  */
 public class TestFlinkScanSql extends TestFlinkScan {
 
-  private TableEnvironment tEnv;
+  private volatile TableEnvironment tEnv;
 
   public TestFlinkScanSql(String fileFormat) {
     super(fileFormat);
@@ -56,12 +58,23 @@ public class TestFlinkScanSql extends TestFlinkScan {
   @Override
   public void before() throws IOException {
     super.before();
-    tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
-    tEnv.executeSql(String.format(
-        "create catalog iceberg_catalog with ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')",
-        warehouse));
-    tEnv.executeSql("use catalog iceberg_catalog");
-    tEnv.getConfig().getConfiguration().set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
+    sql("create catalog iceberg_catalog with ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')", warehouse);
+    sql("use catalog iceberg_catalog");
+    getTableEnv().getConfig().getConfiguration().set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
+  }
+
+  private TableEnvironment getTableEnv() {
+    if (tEnv == null) {
+      synchronized (this) {
+        if (tEnv == null) {
+          this.tEnv = TableEnvironment.create(EnvironmentSettings
+              .newInstance()
+              .useBlinkPlanner()
+              .inBatchMode().build());
+        }
+      }
+    }
+    return tEnv;
   }
 
   @Override
@@ -82,8 +95,7 @@ public class TestFlinkScanSql extends TestFlinkScan {
       optionStr = String.format("/*+ OPTIONS(%s)*/", optionStr);
     }
 
-    String sql = String.format("select %s from t %s %s", select, optionStr, sqlFilter);
-    return executeSQL(sql);
+    return sql("select %s from t %s %s", select, optionStr, sqlFilter);
   }
 
   @Test
@@ -131,8 +143,7 @@ public class TestFlinkScanSql extends TestFlinkScan {
 
     // Make sure to generate 2 CombinedScanTasks
     long maxFileLen = Math.max(dataFile1.fileSizeInBytes(), dataFile2.fileSizeInBytes());
-    executeSQL(String
-        .format("ALTER TABLE t SET ('read.split.open-file-cost'='1', 'read.split.target-size'='%s')", maxFileLen));
+    sql("ALTER TABLE t SET ('read.split.open-file-cost'='1', 'read.split.target-size'='%s')", maxFileLen);
 
     // 2 splits (max infer is the default value 100 , max > splits num), the parallelism is splits num : 2
     parallelism = FlinkSource.forRowData().inferParallelism(flinkInputFormat, scanContext);
@@ -165,8 +176,14 @@ public class TestFlinkScanSql extends TestFlinkScan {
     Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
   }
 
-  private List<Row> executeSQL(String sql) {
-    return Lists.newArrayList(tEnv.executeSql(sql).collect());
+  private List<Row> sql(String query, Object... args) {
+    TableResult tableResult = getTableEnv().executeSql(String.format(query, args));
+    try (CloseableIterator<Row> iter = tableResult.collect()) {
+      List<Row> results = Lists.newArrayList(iter);
+      return results;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to collect table result", e);
+    }
   }
 
   private String optionToKv(String key, Object value) {
diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
index 91ce983..c0dbc10 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
@@ -41,6 +41,7 @@ import org.apache.iceberg.data.GenericAppenderHelper;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.MiniClusterResource;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.After;
@@ -68,7 +69,8 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
               .useBlinkPlanner()
               .inStreamingMode();
 
-          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+          StreamExecutionEnvironment env = StreamExecutionEnvironment
+              .getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
           env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
           env.enableCheckpointing(400);
 
diff --git a/versions.props b/versions.props
index 55aa5dd..dc4cadd 100644
--- a/versions.props
+++ b/versions.props
@@ -1,7 +1,7 @@
 org.slf4j:* = 1.7.25
 org.apache.avro:avro = 1.9.2
 org.apache.calcite:* = 1.10.0
-org.apache.flink:* = 1.11.0
+org.apache.flink:* = 1.12.1
 org.apache.hadoop:* = 2.7.3
 org.apache.hive:hive-metastore = 2.3.8
 org.apache.hive:hive-serde = 2.3.8