Posted to commits@inlong.apache.org by zi...@apache.org on 2022/05/17 03:45:57 UTC

[incubator-inlong] branch master updated: [INLONG-4157][Sort] Sort lightweight support load data to Iceberg (#4210)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f52fd5f58 [INLONG-4157][Sort] Sort lightweight support load data to Iceberg (#4210)
f52fd5f58 is described below

commit f52fd5f583d28d775ce502b7658538a0e205804a
Author: thexia <37...@users.noreply.github.com>
AuthorDate: Tue May 17 11:45:52 2022 +0800

    [INLONG-4157][Sort] Sort lightweight support load data to Iceberg (#4210)
---
 .../sort/protocol/node/load/IcebergLoadNode.java   |  97 +++++++++++++
 inlong-sort/sort-connectors/pom.xml                |  42 ++++--
 inlong-sort/sort-single-tenant/pom.xml             |  14 --
 .../flink/parser/IcebergNodeSqlParserTest.java     | 159 +++++++++++++++++++++
 licenses/inlong-sort/LICENSE                       |  36 ++---
 pom.xml                                            |  16 ++-
 6 files changed, 317 insertions(+), 47 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
new file mode 100644
index 000000000..460ee1251
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
@@ -0,0 +1,97 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@JsonTypeName("icebergLoad")
+@Data
+@NoArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+public class IcebergLoadNode extends LoadNode implements Serializable {
+
+    private static final long serialVersionUID = -1L;
+
+    @JsonProperty("tableName")
+    @Nonnull
+    private String tableName;
+
+    @JsonProperty("tableName")
+    @Nonnull
+    private String dbName;
+
+    @JsonCreator
+    public IcebergLoadNode(@JsonProperty("id") String id,
+            @JsonProperty("name") String name,
+            @JsonProperty("fields") List<FieldInfo> fields,
+            @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+            @JsonProperty("filters") List<FilterFunction> filters,
+            @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+            @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
+            @JsonProperty("properties") Map<String, String> properties,
+            @Nonnull @JsonProperty("dbName") String dbName,
+            @Nonnull @JsonProperty("tableName") String tableName) {
+        super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+        this.tableName = Preconditions.checkNotNull(tableName, "table name is null");
+        this.dbName = Preconditions.checkNotNull(dbName, "db name is null");
+    }
+
+    @Override
+    public Map<String, String> tableOptions() {
+        Map<String, String> options = super.tableOptions();
+        options.put("connector", "iceberg");
+        options.put("catalog-database", dbName);
+        options.put("catalog-table", tableName);
+        options.put("default-database", dbName);
+        return options;
+    }
+
+    @Override
+    public String genTableName() {
+        return tableName;
+    }
+
+    @Override
+    public String getPrimaryKey() {
+        return super.getPrimaryKey();
+    }
+
+    @Override
+    public List<FieldInfo> getPartitionFields() {
+        return super.getPartitionFields();
+    }
+
+}
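
For context, a minimal usage sketch of the new class (not part of the commit; the database/table names and catalog properties below are illustrative, and the field/relation lists are elided):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;

    public class IcebergLoadNodeSketch {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("catalog-type", "hive");              // or "hadoop"
            props.put("catalog-name", "hive_prod");
            props.put("uri", "thrift://localhost:9083");

            IcebergLoadNode node = new IcebergLoadNode("iceberg", "iceberg_output",
                    Collections.emptyList(),   // fields (elided in this sketch)
                    Collections.emptyList(),   // fieldRelationShips (elided)
                    null, null, null, props, "inlong", "inlong_iceberg");

            // tableOptions() merges the user-supplied properties with the fixed
            // Iceberg connector options set above: connector, catalog-database,
            // catalog-table, default-database.
            Map<String, String> options = node.tableOptions();
            System.out.println(options);
        }
    }
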
diff --git a/inlong-sort/sort-connectors/pom.xml b/inlong-sort/sort-connectors/pom.xml
index f4713775c..eeb4a41d3 100644
--- a/inlong-sort/sort-connectors/pom.xml
+++ b/inlong-sort/sort-connectors/pom.xml
@@ -84,19 +84,24 @@
             <artifactId>hadoop-common</artifactId>
             <version>${hadoop.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.orc</groupId>
-            <artifactId>orc-core</artifactId>
-            <classifier>nohive</classifier>
-        </dependency>
         <dependency>
             <groupId>org.apache.hive</groupId>
             <artifactId>hive-jdbc</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>parquet-hadoop-bundle</artifactId>
+                    <groupId>org.apache.parquet</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
+        <!--for iceberg-->
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-minicluster</artifactId>
-            <scope>test</scope>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-flink-1.13</artifactId>
         </dependency>
         <!--for doris-->
         <dependency>
@@ -113,6 +118,27 @@
             <groupId>org.apache.pulsar</groupId>
             <artifactId>pulsar-client-all</artifactId>
         </dependency>
+
+        <!--format dependency-->
+        <dependency>
+            <groupId>org.apache.orc</groupId>
+            <artifactId>orc-core</artifactId>
+            <classifier>nohive</classifier>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-hadoop</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+        </dependency>
+        <!--test-->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-minicluster</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-test-utils_${flink.scala.binary.version}</artifactId>
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index 398653e42..cd29ee38b 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -79,20 +79,6 @@
             <artifactId>flink-streaming-java_${flink.scala.binary.version}</artifactId>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.iceberg</groupId>
-            <artifactId>iceberg-flink</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.avro</groupId>
-                    <artifactId>avro</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.iceberg</groupId>
-                    <artifactId>iceberg-hive-metastore</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
 
         <dependency>
             <groupId>org.apache.hadoop</groupId>
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/IcebergNodeSqlParserTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/IcebergNodeSqlParserTest.java
new file mode 100644
index 000000000..dd33d0a02
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/IcebergNodeSqlParserTest.java
@@ -0,0 +1,159 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.singletenant.flink.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.singletenant.flink.parser.result.FlinkSqlParseResult;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Test for Iceberg SQL parser.
+ */
+public class IcebergNodeSqlParserTest extends AbstractTestBase {
+
+    private MySqlExtractNode buildMySQLExtractNode(String id) {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("salary", new FloatFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()),
+                new FieldInfo("event_type", new StringFormatInfo()));
+        // to run the load in append mode, add this config
+        Map<String, String> map = new HashMap<>();
+        map.put("append-mode", "true");
+        return new MySqlExtractNode(id, "mysql_input", fields,
+                null, map, "id",
+                Collections.singletonList("work1"), "localhost", "root", "123456",
+                "inlong", null, null,
+                null, null);
+    }
+
+    private IcebergLoadNode buildIcebergLoadNodeWithHadoopCatalog() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("salary", new StringFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()));
+        List<FieldRelationShip> relations = Arrays
+                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+                                new FieldInfo("id", new LongFormatInfo())),
+                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                                new FieldInfo("name", new StringFormatInfo())),
+                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                                new FieldInfo("age", new IntFormatInfo())),
+                        new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+                                new FieldInfo("ts", new TimestampFormatInfo()))
+                );
+
+        Map<String, String> props = new HashMap<>();
+        props.put("catalog-type", "hadoop");
+        props.put("catalog-name", "hadoop_prod");
+        props.put("warehouse", "hdfs://localhost:9000/iceberg/warehouse");
+        IcebergLoadNode node = new IcebergLoadNode("iceberg", "iceberg_output", fields, relations,
+                null, null, null, props, "inlong", "inlong_iceberg");
+        return node;
+    }
+
+    private IcebergLoadNode buildIcebergLoadNodeWithHiveCatalog() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()));
+        List<FieldRelationShip> relations = Arrays
+                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+                                new FieldInfo("id", new LongFormatInfo())),
+                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                                new FieldInfo("name", new StringFormatInfo())),
+                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                                new FieldInfo("age", new IntFormatInfo())),
+                        new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+                                new FieldInfo("ts", new TimestampFormatInfo()))
+                );
+
+        // set HIVE_CONF_DIR, or set uri and warehouse
+        Map<String, String> props = new HashMap<>();
+        props.put("catalog-type", "hive");
+        props.put("catalog-name", "hive_prod");
+        props.put("catalog-database", "default");
+        props.put("uri", "thrift://localhost:9083");
+        props.put("warehouse", "/hive/warehouse");
+        IcebergLoadNode node = new IcebergLoadNode("iceberg", "iceberg_output", fields, relations,
+                null, null, null, props, "inlong", "inlong_iceberg");
+        return node;
+    }
+
+    /**
+     * Build the node relation between the given extract nodes and load nodes.
+     *
+     * @param inputs extract nodes
+     * @param outputs load nodes
+     * @return node relation
+     */
+    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelationShip(inputIds, outputIds);
+    }
+
+    @Test
+    public void testIceberg() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildMySQLExtractNode("1");
+        Node outputNode = buildIcebergLoadNodeWithHiveCatalog();
+        StreamInfo streamInfo = new StreamInfo("1L", Arrays.asList(inputNode, outputNode),
+                Arrays.asList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("group_id", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        FlinkSqlParseResult result = parser.parse();
+        Assert.assertTrue(!result.getLoadSqls().isEmpty() && !result.getCreateTableSqls().isEmpty());
+    }
+}
diff --git a/licenses/inlong-sort/LICENSE b/licenses/inlong-sort/LICENSE
index 7703d2d96..22c6de5de 100644
--- a/licenses/inlong-sort/LICENSE
+++ b/licenses/inlong-sort/LICENSE
@@ -506,8 +506,8 @@ The text of each license is the standard Apache 2.0 license.
   org.apache.arrow:arrow-vector:0.8.0 - Arrow Vectors (https://arrow.apache.org/), (Apache License, Version 2.0)
   org.apache.yetus:audience-annotations:0.5.0 - Apache Yetus - Audience Annotations (https://yetus.apache.org/), (Apache License, Version 2.0)
   org.apache.calcite.avatica:avatica-core:1.17.0 - Apache Calcite Avatica (https://calcite.apache.org/avatica), (The Apache License, Version 2.0)
-  org.apache.avro:avro:1.11.0 - Apache Avro (https://avro.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.avro:avro-ipc:1.11.0 - Apache Avro IPC (https://avro.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.avro:avro:1.10.1 - Apache Avro (https://avro.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.avro:avro-ipc:1.10.1 - Apache Avro IPC (https://avro.apache.org), (The Apache Software License, Version 2.0)
   org.apache.pulsar:bouncy-castle-bc:2.8.1 - Apache Pulsar :: Bouncy Castle :: BC (https://github.com/apache/pulsar/tree/master/bouncy-castle/bc), (Apache License, Version 2.0)
   com.github.ben-manes.caffeine:caffeine:2.8.4 - Caffeine cache (https://github.com/ben-manes/caffeine), (Apache License, Version 2.0)
   ru.yandex.clickhouse:clickhouse-jdbc:0.3.1 - clickhouse-jdbc (https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-jdbc), (The Apache Software License, Version 2.0) 
@@ -643,9 +643,9 @@ The text of each license is the standard Apache 2.0 license.
   org.apache.orc:orc-core:1.5.6 - ORC Core (https://orc.apache.org/), (Apache License, Version 2.0)
   org.apache.orc:orc-core:1.6.7 - ORC Core (https://orc.apache.org/), (Apache License, Version 2.0)
   org.apache.orc:orc-shims:1.6.7 - ORC Shims (https://orc.apache.org/), (Apache License, Version 2.0)
-  org.apache.parquet:parquet-avro:1.12.0 - Apache Parquet Avro (https://parquet.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.parquet:parquet-common:1.11.1 - Apache Parquet Common (https://parquet.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.parquet:parquet-format-structures:1.11.1 - Apache Parquet Format Structures (https://parquet.apache.org/), (The Apache Software License, Version 2.0)
+  org.apache.parquet:parquet-avro:1.12.2 - Apache Parquet Avro (https://parquet.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.parquet:parquet-common:1.12.2 - Apache Parquet Common (https://parquet.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.parquet:parquet-format-structures:1.12.2 - Apache Parquet Format Structures (https://parquet.apache.org/), (The Apache Software License, Version 2.0)
   org.apache.pulsar:pulsar-client-admin-api:2.8.1 - Pulsar Client Admin :: API (https://github.com/apache/pulsar/tree/v2.8.1), (Apache License, Version 2.0)
   org.apache.pulsar:pulsar-client-all:2.8.1 - Pulsar Client All (https://github.com/apache/pulsar/tree/v2.8.1), (Apache License, Version 2.0)
   org.apache.pulsar:pulsar-client-api:2.8.1 - Pulsar Client :: API (https://github.com/apache/pulsar/tree/v2.8.1), (Apache License, Version 2.0)
@@ -694,15 +694,15 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
   org.apache.hadoop:hadoop-yarn-server-resourcemanager:2.10.1 - Apache Hadoop YARN ResourceManager (https://github.com/apache/hadoop/tree/branch-2.10.1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager), (Apache License, Version 2.0)
   org.apache.hadoop:hadoop-yarn-server-web-proxy:2.10.1 - Apache Hadoop YARN Web Proxy (https://github.com/apache/hadoop/tree/branch-2.10.1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy), (Apache License, Version 2.0)
   com.carrotsearch:hppc:0.7.2 - HPPC Collections (https://github.com/carrotsearch/hppc/tree/0.7.2), (The Apache Software License, Version 2.0)
-  org.apache.iceberg:iceberg-api:0.12.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.iceberg:iceberg-bundled-guava:0.12.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.iceberg:iceberg-common:0.12.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.iceberg:iceberg-core:0.12.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.iceberg:iceberg-data:0.12.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.iceberg:iceberg-flink:0.12.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.iceberg:iceberg-hive-metastore:0.12.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.iceberg:iceberg-orc:0.12.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.iceberg:iceberg-parquet:0.12.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.iceberg:iceberg-api:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.iceberg:iceberg-bundled-guava:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.iceberg:iceberg-common:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.iceberg:iceberg-core:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.iceberg:iceberg-data:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.iceberg:iceberg-flink-1.13:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.iceberg:iceberg-hive-metastore:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.iceberg:iceberg-orc:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.iceberg:iceberg-parquet:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
   org.codehaus.jackson:jackson-core-asl:1.9.13 - Jackson (https://mvnrepository.com/artifact/org.codehaus.jackson/jackson-core-asl/1.9.13), (The Apache Software License, Version 2.0)
   org.mortbay.jetty:jetty:6.1.26 - Jetty Server (https://www.eclipse.org/jetty), (Apache Software License - Version 2.0), (Apache 2.0 and EPL 1.0)
   org.mortbay.jetty:jetty-sslengine:6.1.26 - Jetty SSLEngine (https://www.eclipse.org/jetty), (Apache License Version 2)
@@ -729,10 +729,10 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
   org.apache.thrift:libfb303:0.9.3 - Apache Thrift (https://github.com/apache/thrift/tree/0.9.3), (The Apache Software License, Version 2.0)
   org.apache.thrift:libthrift:0.9.3 - Apache Thrift (https://github.com/apache/thrift/tree/0.9.3), (The Apache Software License, Version 2.0)
   log4j:log4j:1.2.17 - Apache Log4j (http://logging.apache.org/log4j/1.2/), (The Apache Software License, Version 2.0)
-  org.apache.parquet:parquet-column:1.11.1 - Apache Parquet Column (https://parquet.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.parquet:parquet-encoding:1.11.1 - Apache Parquet Encodings (https://parquet.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.parquet:parquet-hadoop:1.11.1 - Apache Parquet Hadoop (https://parquet.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.parquet:parquet-jackson:1.11.1 - Apache Parquet Jackson (https://parquet.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.parquet:parquet-column:1.12.2 - Apache Parquet Column (https://parquet.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.parquet:parquet-encoding:1.12.2 - Apache Parquet Encodings (https://parquet.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.parquet:parquet-hadoop:1.12.2 - Apache Parquet Hadoop (https://parquet.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.parquet:parquet-jackson:1.12.2 - Apache Parquet Jackson (https://parquet.apache.org), (The Apache Software License, Version 2.0)
   org.apache.parquet:parquet-hadoop-bundle:1.8.1 - Apache Parquet Hadoop Bundle (https://parquet.apache.org), (The Apache Software License, Version 2.0)
   org.eclipse.jetty.websocket:websocket-api:9.3.20.v20170531 - Jetty :: Websocket :: API (https://github.com/eclipse/jetty.project/tree/jetty-9.3.20.v20170531/jetty-websocket/websocket-api), (Apache Software License - Version 2.0;  Eclipse Public License - Version 1.0)
   org.eclipse.jetty.websocket:websocket-client:9.3.20.v20170531 - Jetty :: Websocket :: Client (https://github.com/eclipse/jetty.project/tree/jetty-9.3.20.v20170531/jetty-websocket/websocket-client), (Apache Software License - Version 2.0;  Eclipse Public License - Version 1.0)
diff --git a/pom.xml b/pom.xml
index bfac1e7a6..91c393601 100644
--- a/pom.xml
+++ b/pom.xml
@@ -191,7 +191,7 @@
         <pulsar.version>2.8.1</pulsar.version>
         <pulsar.testcontainers.version>1.15.3</pulsar.testcontainers.version>
         <kafka.version>2.4.1</kafka.version>
-        <iceberg.flink.version>0.12.1</iceberg.flink.version>
+        <iceberg.flink.version>0.13.1</iceberg.flink.version>
         <iceberg.version>0.13.1</iceberg.version>
         <flink.version>1.13.5</flink.version>
         <flink.connector.mysql.cdc.version>2.0.2</flink.connector.mysql.cdc.version>
@@ -204,8 +204,9 @@
 
         <curator.version>2.12.0</curator.version>
 
-        <avro.version>1.11.0</avro.version>
+        <avro.version>1.10.1</avro.version>
         <orc.core.version>1.6.7</orc.core.version>
+        <parquet.version>1.12.2</parquet.version>
         <oro.version>2.0.8</oro.version>
         <akka.version>2.5.21</akka.version>
         <antlr.verison>4.0.4</antlr.verison>
@@ -942,17 +943,13 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.iceberg</groupId>
-                <artifactId>iceberg-flink</artifactId>
+                <artifactId>iceberg-flink-1.13</artifactId>
                 <version>${iceberg.flink.version}</version>
                 <exclusions>
                     <exclusion>
                         <groupId>org.apache.avro</groupId>
                         <artifactId>avro</artifactId>
                     </exclusion>
-                    <exclusion>
-                        <groupId>org.apache.iceberg</groupId>
-                        <artifactId>iceberg-hive-metastore</artifactId>
-                    </exclusion>
                 </exclusions>
             </dependency>
 
@@ -1070,6 +1067,11 @@
                 <version>${orc.core.version}</version>
                 <classifier>nohive</classifier>
             </dependency>
+            <dependency>
+                <groupId>org.apache.parquet</groupId>
+                <artifactId>parquet-hadoop</artifactId>
+                <version>${parquet.version}</version>
+            </dependency>
             <dependency>
                 <groupId>com.typesafe.akka</groupId>
                 <artifactId>akka-stream_2.11</artifactId>