Posted to commits@iceberg.apache.org by op...@apache.org on 2021/02/02 07:57:39 UTC
[iceberg] branch master updated: Flink: Upgrade version from 1.11.0 to 1.12.1 (#1956)
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c5e6791 Flink: Upgrade version from 1.11.0 to 1.12.1 (#1956)
c5e6791 is described below
commit c5e67915b01c2cbf2eea40a412c2765f0c8278e8
Author: Steven Zhen Wu <st...@gmail.com>
AuthorDate: Mon Feb 1 23:57:24 2021 -0800
Flink: Upgrade version from 1.11.0 to 1.12.1 (#1956)
---
.../org/apache/iceberg/flink/FlinkCatalog.java | 3 +-
.../org/apache/iceberg/flink/actions/Actions.java | 8 ++-
.../org/apache/iceberg/flink/sink/FlinkSink.java | 4 +-
.../apache/iceberg/flink/source/FlinkSource.java | 4 +-
.../FlinkCompatibilityUtil.java} | 40 ++++++++-------
.../apache/iceberg/flink/FlinkCatalogTestBase.java | 8 +++
.../org/apache/iceberg/flink/FlinkTestBase.java | 31 ++++++------
.../apache/iceberg/flink/MiniClusterResource.java | 57 ++++++++++++++++++++++
.../iceberg/flink/TestFlinkCatalogTable.java | 52 +++++++++++---------
.../apache/iceberg/flink/TestFlinkTableSink.java | 14 +++++-
.../apache/iceberg/flink/TestFlinkTableSource.java | 8 +--
.../java/org/apache/iceberg/flink/TestHelpers.java | 6 +--
.../iceberg/flink/sink/TestFlinkIcebergSink.java | 26 ++++++----
.../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 14 +++++-
.../apache/iceberg/flink/source/TestFlinkScan.java | 14 +++++-
.../iceberg/flink/source/TestFlinkScanSql.java | 43 +++++++++++-----
.../iceberg/flink/source/TestStreamScanSql.java | 4 +-
versions.props | 2 +-
18 files changed, 235 insertions(+), 103 deletions(-)
diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 4917e9a..de2fbe0 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -69,6 +69,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -456,7 +457,7 @@ public class FlinkCatalog extends AbstractCatalog {
TableSchema schema = table.getSchema();
schema.getTableColumns().forEach(column -> {
- if (column.isGenerated()) {
+ if (!FlinkCompatibilityUtil.isPhysicalColumn(column)) {
throw new UnsupportedOperationException("Creating table with computed columns is not supported yet.");
}
});
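In Flink 1.12 a computed column is modeled as a non-physical TableColumn, which is what the new FlinkCompatibilityUtil.isPhysicalColumn check detects (the old isGenerated() accessor it replaces was dropped in 1.12). A minimal illustrative sketch of the rejected case, using a hypothetical schema that is not part of the commit:

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;

    TableSchema schema = TableSchema.builder()
        .field("id", DataTypes.BIGINT())               // physical column: accepted
        .field("id_x2", DataTypes.BIGINT(), "id * 2")  // computed column: rejected
        .build();

    schema.getTableColumns().forEach(column -> {
      if (!FlinkCompatibilityUtil.isPhysicalColumn(column)) {
        throw new UnsupportedOperationException(
            "Creating table with computed columns is not supported yet.");
      }
    });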
diff --git a/flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java b/flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java
index 3fc2bb7..98702ce 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java
@@ -19,11 +19,17 @@
package org.apache.iceberg.flink.actions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.iceberg.Table;
public class Actions {
+ public static final Configuration CONFIG = new Configuration()
+ // Disable the classloader check, as Avro may cache classes/objects in its serializers.
+ .set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
+
private StreamExecutionEnvironment env;
private Table table;
@@ -37,7 +43,7 @@ public class Actions {
}
public static Actions forTable(Table table) {
- return new Actions(StreamExecutionEnvironment.getExecutionEnvironment(), table);
+ return new Actions(StreamExecutionEnvironment.getExecutionEnvironment(CONFIG), table);
}
public RewriteDataFilesAction rewriteDataFiles() {
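After this change, Actions.forTable(table) builds its StreamExecutionEnvironment from CONFIG, so standalone maintenance jobs no longer trip Flink 1.12's leaked-classloader check when Avro serializers cache classes. A hedged usage sketch, assuming an already-loaded Iceberg Table named table:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.flink.actions.Actions;

    // The execution environment created under the hood now carries
    // classloader.check-leaked-classloader=false.
    Actions.forTable(table)
        .rewriteDataFiles()
        .execute();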
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index c691e58..f679e23 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -33,7 +33,6 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
-import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
@@ -45,6 +44,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -101,7 +101,7 @@ public class FlinkSink {
DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
DataFormatConverters.RowConverter rowConverter = new DataFormatConverters.RowConverter(fieldDataTypes);
- return builderFor(input, rowConverter::toInternal, RowDataTypeInfo.of(rowType))
+ return builderFor(input, rowConverter::toInternal, FlinkCompatibilityUtil.toTypeInfo(rowType))
.tableSchema(tableSchema);
}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
index c10bdf3..84507c4 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
@@ -31,7 +31,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
@@ -40,6 +39,7 @@ import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.FlinkTableOptions;
import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -204,7 +204,7 @@ public class FlinkSource {
FlinkInputFormat format = buildFormat();
ScanContext context = contextBuilder.build();
- TypeInformation<RowData> typeInfo = RowDataTypeInfo.of(FlinkSchemaUtil.convert(context.project()));
+ TypeInformation<RowData> typeInfo = FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(context.project()));
if (!context.isStreaming()) {
int parallelism = inferParallelism(format, context);
diff --git a/flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java b/flink/src/main/java/org/apache/iceberg/flink/util/FlinkCompatibilityUtil.java
similarity index 51%
copy from flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java
copy to flink/src/main/java/org/apache/iceberg/flink/util/FlinkCompatibilityUtil.java
index 3fc2bb7..274d2a8 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/util/FlinkCompatibilityUtil.java
@@ -17,31 +17,29 @@
* under the License.
*/
-package org.apache.iceberg.flink.actions;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.iceberg.Table;
-
-public class Actions {
-
- private StreamExecutionEnvironment env;
- private Table table;
-
- private Actions(StreamExecutionEnvironment env, Table table) {
- this.env = env;
- this.table = table;
- }
+package org.apache.iceberg.flink.util;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * A small utility class that tries to hide calls to Flink Internal or
+ * PublicEvolving interfaces, since Flink can change those APIs across minor
+ * version releases.
+ */
+public class FlinkCompatibilityUtil {
- public static Actions forTable(StreamExecutionEnvironment env, Table table) {
- return new Actions(env, table);
+ private FlinkCompatibilityUtil() {
}
- public static Actions forTable(Table table) {
- return new Actions(StreamExecutionEnvironment.getExecutionEnvironment(), table);
+ public static TypeInformation<RowData> toTypeInfo(RowType rowType) {
+ return InternalTypeInfo.of(rowType);
}
- public RewriteDataFilesAction rewriteDataFiles() {
- return new RewriteDataFilesAction(env, table);
+ public static boolean isPhysicalColumn(TableColumn column) {
+ return column.isPhysical();
}
-
}
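RowDataTypeInfo was replaced by InternalTypeInfo in Flink 1.12, and this shim gives Iceberg a single place to absorb that kind of API churn. An illustrative before/after for callers, assuming a RowType variable named rowType:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;

    // Flink 1.11: TypeInformation<RowData> typeInfo = RowDataTypeInfo.of(rowType);
    // Flink 1.12, via the shim:
    TypeInformation<RowData> typeInfo = FlinkCompatibilityUtil.toTypeInfo(rowType);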
diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
index 0c588ad..3c5f25e 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.flink;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import org.apache.flink.util.ArrayUtils;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -28,6 +29,7 @@ import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.After;
@@ -119,6 +121,12 @@ public abstract class FlinkCatalogTestBase extends FlinkTestBase {
}
}
+ protected String getFullQualifiedTableName(String tableName) {
+ final List<String> levels = Lists.newArrayList(icebergNamespace.levels());
+ levels.add(tableName);
+ return Joiner.on('.').join(levels);
+ }
+
static String getURI(HiveConf conf) {
return conf.get(HiveConf.ConfVars.METASTOREURIS.varname);
}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
index dabdabe..a39c771 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
@@ -20,12 +20,12 @@
package org.apache.iceberg.flink;
import java.util.List;
-import java.util.concurrent.ExecutionException;
import java.util.stream.IntStream;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
-import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -34,8 +34,17 @@ import org.apache.iceberg.hive.TestHiveMetastore;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
-public abstract class FlinkTestBase extends AbstractTestBase {
+public abstract class FlinkTestBase extends TestBaseUtils {
+
+ @ClassRule
+ public static MiniClusterWithClientResource miniClusterResource =
+ MiniClusterResource.createWithClassloaderCheckDisabled();
+
+ @ClassRule
+ public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
private static TestHiveMetastore metastore = null;
protected static HiveConf hiveConf = null;
@@ -87,25 +96,15 @@ public abstract class FlinkTestBase extends AbstractTestBase {
protected List<Object[]> sql(String query, Object... args) {
TableResult tableResult = exec(query, args);
-
- tableResult.getJobClient().ifPresent(c -> {
- try {
- c.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
- }
- });
-
- List<Object[]> results = Lists.newArrayList();
try (CloseableIterator<Row> iter = tableResult.collect()) {
+ List<Object[]> results = Lists.newArrayList();
while (iter.hasNext()) {
Row row = iter.next();
results.add(IntStream.range(0, row.getArity()).mapToObj(row::getField).toArray(Object[]::new));
}
+ return results;
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new RuntimeException("Failed to collect table result", e);
}
-
- return results;
}
}
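The rewritten sql() helper leans on the fact that TableResult.collect() returns a CloseableIterator that must be closed to release the underlying job; try-with-resources guarantees that even when iteration fails. A distilled sketch of the same pattern, with hypothetical tableEnv and query variables:

    import java.util.List;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CloseableIterator;
    import org.apache.iceberg.relocated.com.google.common.collect.Lists;

    TableResult tableResult = tableEnv.executeSql(query);
    List<Row> rows = Lists.newArrayList();
    try (CloseableIterator<Row> iter = tableResult.collect()) {
      iter.forEachRemaining(rows::add);  // iterator is closed even on failure
    } catch (Exception e) {
      throw new RuntimeException("Failed to collect table result", e);
    }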
diff --git a/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java b/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
new file mode 100644
index 0000000..9dfa1ac
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+public class MiniClusterResource {
+
+ private static final int DEFAULT_TM_NUM = 1;
+ private static final int DEFAULT_PARALLELISM = 4;
+
+ public static final Configuration DISABLE_CLASSLOADER_CHECK_CONFIG = new Configuration()
+ // Disable the classloader check, as Avro may cache classes/objects in its serializers.
+ .set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
+
+ private MiniClusterResource() {
+ }
+
+ /**
+ * Starts a mini cluster with classloader.check-leaked-classloader=false so
+ * that unit tests are not broken by the classloader leak check. Some of our
+ * Iceberg integration tests assert results after the Flink jobs have
+ * finished, which may touch a classloader that the Flink task managers have
+ * already closed; enabling classloader.check-leaked-classloader by default
+ * would therefore fail those tests.
+ */
+ public static MiniClusterWithClientResource createWithClassloaderCheckDisabled() {
+ return new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(DEFAULT_TM_NUM)
+ .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+ .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
+ .build());
+ }
+
+}
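Test classes opt in by declaring the shared cluster as a JUnit @ClassRule, exactly as the updated tests below do; a minimal hypothetical skeleton:

    import org.apache.flink.test.util.MiniClusterWithClientResource;
    import org.apache.iceberg.flink.MiniClusterResource;
    import org.junit.ClassRule;
    import org.junit.Test;

    public class MyFlinkJobTest {
      // One shared mini cluster per test class, leaked-classloader check off.
      @ClassRule
      public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
          MiniClusterResource.createWithClassloaderCheckDisabled();

      @Test
      public void testSomething() {
        // Jobs submitted through StreamExecutionEnvironment run on this cluster.
      }
    }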
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index bf1b091..62efd16 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -23,10 +23,10 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
@@ -45,6 +45,7 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
@@ -52,7 +53,6 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
@@ -79,26 +79,22 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
@Test
public void testGetTable() {
- validationCatalog.createTable(
- TableIdentifier.of(icebergNamespace, "tl"),
- new Schema(
- Types.NestedField.optional(0, "id", Types.LongType.get()),
- Types.NestedField.optional(1, "strV", Types.StringType.get())));
- Assert.assertEquals(
- Arrays.asList(
- TableColumn.of("id", DataTypes.BIGINT()),
- TableColumn.of("strV", DataTypes.STRING())),
- getTableEnv().from("tl").getSchema().getTableColumns());
- Assert.assertTrue(getTableEnv().getCatalog(catalogName).get().tableExists(ObjectPath.fromString("db.tl")));
+ sql("CREATE TABLE tl(id BIGINT, strV STRING)");
+
+ Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, "tl"));
+ Schema iSchema = new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "strV", Types.StringType.get())
+ );
+ Assert.assertEquals("Should load the expected iceberg schema", iSchema.toString(), table.schema().toString());
}
@Test
public void testRenameTable() {
Assume.assumeFalse("HadoopCatalog does not support rename table", isHadoopCatalog);
- validationCatalog.createTable(
- TableIdentifier.of(icebergNamespace, "tl"),
- new Schema(Types.NestedField.optional(0, "id", Types.LongType.get())));
+ final Schema tableSchema = new Schema(Types.NestedField.optional(0, "id", Types.LongType.get()));
+ validationCatalog.createTable(TableIdentifier.of(icebergNamespace, "tl"), tableSchema);
sql("ALTER TABLE tl RENAME TO tl2");
AssertHelpers.assertThrows(
"Should fail if trying to get a nonexistent table",
@@ -106,9 +102,8 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
"Table `tl` was not found.",
() -> getTableEnv().from("tl")
);
- Assert.assertEquals(
- Collections.singletonList(TableColumn.of("id", DataTypes.BIGINT())),
- getTableEnv().from("tl2").getSchema().getTableColumns());
+ Schema actualSchema = FlinkSchemaUtil.convert(getTableEnv().from("tl2").getSchema());
+ Assert.assertEquals(tableSchema.asStruct(), actualSchema.asStruct());
}
@Test
@@ -126,7 +121,6 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
}
- @Ignore("Enable this after upgrade flink to 1.12.0, because it starts to support 'CREATE TABLE IF NOT EXISTS")
@Test
public void testCreateTableIfNotExists() {
sql("CREATE TABLE tl(id BIGINT)");
@@ -136,13 +130,23 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
sql("DROP TABLE tl");
AssertHelpers.assertThrows("Table 'tl' should be dropped",
- NoSuchTableException.class, "Table does not exist: db.tl", () -> table("tl"));
+ NoSuchTableException.class,
+ "Table does not exist: " + getFullQualifiedTableName("tl"),
+ () -> table("tl"));
- sql("CREATE TABLE IF NO EXISTS tl(id BIGINT)");
+ sql("CREATE TABLE IF NOT EXISTS tl(id BIGINT)");
Assert.assertEquals(Maps.newHashMap(), table("tl").properties());
- sql("CREATE TABLE IF NOT EXISTS tl(id BIGINT) WITH ('location'='/tmp/location')");
- Assert.assertEquals("Should still be the old table.", Maps.newHashMap(), table("tl").properties());
+ final String uuid = UUID.randomUUID().toString();
+ final Map<String, String> expectedProperties = ImmutableMap.of("uuid", uuid);
+ table("tl").updateProperties()
+ .set("uuid", uuid)
+ .commit();
+ Assert.assertEquals(expectedProperties, table("tl").properties());
+
+ sql("CREATE TABLE IF NOT EXISTS tl(id BIGINT)");
+ Assert.assertEquals("Should still be the old table.",
+ expectedProperties, table("tl").properties());
}
@Test
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index f064c53..71753fb 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
@@ -46,12 +47,22 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class TestFlinkTableSink extends FlinkCatalogTestBase {
+
+ @ClassRule
+ public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+ MiniClusterResource.createWithClassloaderCheckDisabled();
+
+ @ClassRule
+ public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
private static final String TABLE_NAME = "test_table";
private TableEnvironment tEnv;
private Table icebergTable;
@@ -89,7 +100,8 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
.useBlinkPlanner();
if (isStreamingJob) {
settingsBuilder.inStreamingMode();
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment
+ .getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(400);
env.setMaxParallelism(2);
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
index 229812a..97b9f11 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
@@ -455,14 +455,14 @@ public class TestFlinkTableSource extends FlinkTestBase {
String sqlNotIn = String.format("SELECT * FROM %s WHERE id NOT IN (3,2) ", TABLE_NAME);
String explainNotIn = getTableEnv().explainSql(sqlNotIn);
Object[] expectRecord = new Object[] {1, "iceberg", 10.0};
- String expectedFilter = "ref(name=\"id\") != 3,ref(name=\"id\") != 2";
+ String expectedFilter = "FilterPushDown: ref(name=\"id\") != 2,ref(name=\"id\") != 3";
Assert.assertTrue("Explain should contain the push down filter", explainNotIn.contains(expectedFilter));
List<Object[]> resultNotIn = sql(sqlNotIn);
Assert.assertEquals("Should have 1 record", 1, resultNotIn.size());
Assert.assertArrayEquals("Should produce the expected record", expectRecord, resultNotIn.get(0));
Assert.assertEquals("Should create only one scan", 1, scanEventCount);
- String expectedScan = "(ref(name=\"id\") != 3 and ref(name=\"id\") != 2)";
+ String expectedScan = "(ref(name=\"id\") != 2 and ref(name=\"id\") != 3)";
Assert.assertEquals("Should contain the push down filter", expectedScan, lastScanEvent.filter().toString());
}
@@ -617,12 +617,14 @@ public class TestFlinkTableSource extends FlinkTestBase {
explainNoPushDown = getTableEnv().explainSql(sqlNoPushDown);
Assert.assertFalse("Explain should not contain FilterPushDown",
explainNoPushDown.contains(expectedFilterPushDownExplain));
+
sqlNoPushDown = "SELECT * FROM " + TABLE_NAME + " WHERE data LIKE '%%' ";
resultLike = sql(sqlNoPushDown);
- Assert.assertEquals("Should have 2 records", 2, resultLike.size());
+ Assert.assertEquals("Should have 3 records", 3, resultLike.size());
List<Object[]> expectedRecords = Lists.newArrayList();
expectedRecords.add(new Object[] {1, "iceberg", 10.0});
expectedRecords.add(new Object[] {2, "b", 20.0});
+ expectedRecords.add(new Object[] {3, null, 30.0});
Assert.assertArrayEquals("Should produce the expected record", expectedRecords.toArray(), resultLike.toArray());
Assert.assertEquals("Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
index e4ca091..62865f5 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
@@ -31,7 +31,6 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
@@ -41,7 +40,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
-import org.apache.flink.table.runtime.types.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
@@ -62,9 +61,8 @@ public class TestHelpers {
}
public static RowData copyRowData(RowData from, RowType rowType) {
- ExecutionConfig config = new ExecutionConfig();
TypeSerializer[] fieldSerializers = rowType.getChildren().stream()
- .map((LogicalType type) -> InternalSerializers.create(type, config))
+ .map((LogicalType type) -> InternalSerializers.create(type))
.toArray(TypeSerializer[]::new);
// Use rowType field count to avoid copy metadata column in case of merging position deletes
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index 7982f27..dd79b97 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -35,37 +35,43 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
-import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
-import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Rule;
+import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
-public class TestFlinkIcebergSink extends AbstractTestBase {
+public class TestFlinkIcebergSink {
+
+ @ClassRule
+ public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+ MiniClusterResource.createWithClassloaderCheckDisabled();
+
+ @ClassRule
+ public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
private static final TypeInformation<Row> ROW_TYPE_INFO = new RowTypeInfo(
SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
private static final DataFormatConverters.RowConverter CONVERTER = new DataFormatConverters.RowConverter(
SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
private String tablePath;
private Table table;
private StreamExecutionEnvironment env;
@@ -101,7 +107,7 @@ public class TestFlinkIcebergSink extends AbstractTestBase {
@Before
public void before() throws IOException {
- File folder = tempFolder.newFolder();
+ File folder = TEMPORARY_FOLDER.newFolder();
String warehouse = folder.getAbsolutePath();
tablePath = warehouse.concat("/test");
@@ -110,7 +116,7 @@ public class TestFlinkIcebergSink extends AbstractTestBase {
Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
table = SimpleDataUtil.createTable(tablePath, props, partitioned);
- env = StreamExecutionEnvironment.getExecutionEnvironment()
+ env = StreamExecutionEnvironment.getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
.enableCheckpointing(100)
.setParallelism(parallelism)
.setMaxParallelism(parallelism);
@@ -134,7 +140,7 @@ public class TestFlinkIcebergSink extends AbstractTestBase {
Row.of(3, "foo")
);
DataStream<RowData> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
- .map(CONVERTER::toInternal, RowDataTypeInfo.of(SimpleDataUtil.ROW_TYPE));
+ .map(CONVERTER::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
FlinkSink.forRowData(dataStream)
.table(table)
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 93222dd..fd2a71a 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.iceberg.FileFormat;
@@ -39,6 +40,7 @@ import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableTestBase;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestTableLoader;
import org.apache.iceberg.flink.source.BoundedTestSource;
@@ -49,12 +51,22 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class TestFlinkIcebergSinkV2 extends TableTestBase {
+
+ @ClassRule
+ public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+ MiniClusterResource.createWithClassloaderCheckDisabled();
+
+ @ClassRule
+ public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
private static final int FORMAT_V2 = 2;
private static final TypeInformation<Row> ROW_TYPE_INFO =
new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
@@ -112,7 +124,7 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
.set(TableProperties.DEFAULT_FILE_FORMAT, format.name())
.commit();
- env = StreamExecutionEnvironment.getExecutionEnvironment()
+ env = StreamExecutionEnvironment.getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
.enableCheckpointing(100L)
.setParallelism(parallelism)
.setMaxParallelism(parallelism);
diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
index 6d11389..fcc71d5 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
@@ -37,7 +37,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
@@ -54,6 +54,7 @@ import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.RowDataConverter;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -64,14 +65,23 @@ import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.iceberg.types.Types.NestedField.required;
@RunWith(Parameterized.class)
-public abstract class TestFlinkScan extends AbstractTestBase {
+public abstract class TestFlinkScan {
+
+ @ClassRule
+ public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+ MiniClusterResource.createWithClassloaderCheckDisabled();
+
+ @ClassRule
+ public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
protected static final Schema SCHEMA = new Schema(
required(1, "data", Types.StringType.get()),
diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
index 2594a00..558ea3e 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
@@ -25,8 +25,10 @@ import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
@@ -47,7 +49,7 @@ import org.junit.Test;
*/
public class TestFlinkScanSql extends TestFlinkScan {
- private TableEnvironment tEnv;
+ private volatile TableEnvironment tEnv;
public TestFlinkScanSql(String fileFormat) {
super(fileFormat);
@@ -56,12 +58,23 @@ public class TestFlinkScanSql extends TestFlinkScan {
@Override
public void before() throws IOException {
super.before();
- tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
- tEnv.executeSql(String.format(
- "create catalog iceberg_catalog with ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')",
- warehouse));
- tEnv.executeSql("use catalog iceberg_catalog");
- tEnv.getConfig().getConfiguration().set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
+ sql("create catalog iceberg_catalog with ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')", warehouse);
+ sql("use catalog iceberg_catalog");
+ getTableEnv().getConfig().getConfiguration().set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
+ }
+
+ private TableEnvironment getTableEnv() {
+ if (tEnv == null) {
+ synchronized (this) {
+ if (tEnv == null) {
+ this.tEnv = TableEnvironment.create(EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inBatchMode().build());
+ }
+ }
+ }
+ return tEnv;
}
@Override
@@ -82,8 +95,7 @@ public class TestFlinkScanSql extends TestFlinkScan {
optionStr = String.format("/*+ OPTIONS(%s)*/", optionStr);
}
- String sql = String.format("select %s from t %s %s", select, optionStr, sqlFilter);
- return executeSQL(sql);
+ return sql("select %s from t %s %s", select, optionStr, sqlFilter);
}
@Test
@@ -131,8 +143,7 @@ public class TestFlinkScanSql extends TestFlinkScan {
// Make sure to generate 2 CombinedScanTasks
long maxFileLen = Math.max(dataFile1.fileSizeInBytes(), dataFile2.fileSizeInBytes());
- executeSQL(String
- .format("ALTER TABLE t SET ('read.split.open-file-cost'='1', 'read.split.target-size'='%s')", maxFileLen));
+ sql("ALTER TABLE t SET ('read.split.open-file-cost'='1', 'read.split.target-size'='%s')", maxFileLen);
// 2 splits (max infer is the default value 100 , max > splits num), the parallelism is splits num : 2
parallelism = FlinkSource.forRowData().inferParallelism(flinkInputFormat, scanContext);
@@ -165,8 +176,14 @@ public class TestFlinkScanSql extends TestFlinkScan {
Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
}
- private List<Row> executeSQL(String sql) {
- return Lists.newArrayList(tEnv.executeSql(sql).collect());
+ private List<Row> sql(String query, Object... args) {
+ TableResult tableResult = getTableEnv().executeSql(String.format(query, args));
+ try (CloseableIterator<Row> iter = tableResult.collect()) {
+ List<Row> results = Lists.newArrayList(iter);
+ return results;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to collect table result", e);
+ }
}
private String optionToKv(String key, Object value) {
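The lazily created tEnv above uses double-checked locking, and the new volatile qualifier is what makes the idiom correct: it guarantees that a fully constructed TableEnvironment is visible to other threads when the reference is published. A generic, distilled form of the idiom, illustrative only:

    import java.util.function.Supplier;

    class Lazy<T> {
      private volatile T value;          // volatile ensures safe publication
      private final Supplier<T> supplier;

      Lazy(Supplier<T> supplier) {
        this.supplier = supplier;
      }

      T get() {
        if (value == null) {             // fast path without locking
          synchronized (this) {
            if (value == null) {         // re-check while holding the lock
              value = supplier.get();
            }
          }
        }
        return value;
      }
    }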
diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
index 91ce983..c0dbc10 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
@@ -41,6 +41,7 @@ import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.After;
@@ -68,7 +69,8 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
.useBlinkPlanner()
.inStreamingMode();
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment
+ .getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(400);
diff --git a/versions.props b/versions.props
index 55aa5dd..dc4cadd 100644
--- a/versions.props
+++ b/versions.props
@@ -1,7 +1,7 @@
org.slf4j:* = 1.7.25
org.apache.avro:avro = 1.9.2
org.apache.calcite:* = 1.10.0
-org.apache.flink:* = 1.11.0
+org.apache.flink:* = 1.12.1
org.apache.hadoop:* = 2.7.3
org.apache.hive:hive-metastore = 2.3.8
org.apache.hive:hive-serde = 2.3.8