Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/03/29 13:01:26 UTC

[GitHub] [iceberg] openinx commented on a change in pull request #2217: Flink : migrate hive table to iceberg table

openinx commented on a change in pull request #2217:
URL: https://github.com/apache/iceberg/pull/2217#discussion_r603259159



##########
File path: flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java
##########
@@ -50,4 +53,34 @@ public RewriteDataFilesAction rewriteDataFiles() {
     return new RewriteDataFilesAction(env, table);
   }
 
+  /**
+   * Migrate an existing hive table into an iceberg table.
+   *
+   * @param flinkHiveCatalog       the Flink HiveCatalog from which the source hive table metadata is read
+   * @param hiveSourceDatabaseName source hive database name
+   * @param hiveSourceTableName    source hive table name
+   * @param icebergCatalog         target iceberg catalog
+   * @param baseNamespace          target iceberg Namespace
+   * @param icebergDbName          target iceberg database name
+   * @param icebergTableName       target iceberg table name
+   * @return the MigrateAction
+   */
+  public static MigrateAction migrateHive2Iceberg(HiveCatalog flinkHiveCatalog, String hiveSourceDatabaseName,
+                                                  String hiveSourceTableName, Catalog icebergCatalog,
+                                                  Namespace baseNamespace,
+                                                  String icebergDbName, String icebergTableName) {
+    return migrateHive2Iceberg(StreamExecutionEnvironment.getExecutionEnvironment(), flinkHiveCatalog,
+        hiveSourceDatabaseName,
+        hiveSourceTableName, icebergCatalog, baseNamespace,
+        icebergDbName, icebergTableName);
+  }
+
+  public static MigrateAction migrateHive2Iceberg(StreamExecutionEnvironment env, HiveCatalog flinkHiveCatalog,
+                                                  String hiveSourceDatabaseName, String hiveSourceTableName,
+                                                  Catalog icebergCatalog, Namespace baseNamespace,
+                                                  String icebergDbName, String icebergTableName) {

Review comment:
       Here we pass the `namespace`, `database`, and `table`, so how about just using a `TableIdentifier`?
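       A minimal sketch of what that could look like, assuming a matching `MigrateAction` constructor (this is not the signature currently in the PR):

       ```java
       // Hypothetical factory signature that collapses namespace/database/table into a single TableIdentifier.
       public static MigrateAction migrateHive2Iceberg(StreamExecutionEnvironment env, HiveCatalog flinkHiveCatalog,
                                                       String hiveSourceDatabaseName, String hiveSourceTableName,
                                                       Catalog icebergCatalog, TableIdentifier icebergIdentifier) {
         // The identifier already carries both the namespace and the table name,
         // e.g. TableIdentifier.of(Namespace.of("base_ns", "iceberg_db"), "iceberg_table").
         return new MigrateAction(env, flinkHiveCatalog, hiveSourceDatabaseName, hiveSourceTableName,
             icebergCatalog, icebergIdentifier);
       }
       ```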

##########
File path: flink/src/main/java/org/apache/iceberg/flink/actions/MigrateAction.java
##########
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.actions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.util.ArrayUtils;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.Action;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.FlinkCatalogFactory;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MigrateAction implements Action<List<ManifestFile>> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MigrateAction.class);
+
+  private static final String PARQUET_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
+  private static final String ORC_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
+  private static final PathFilter HIDDEN_PATH_FILTER =
+      p -> !p.getName().startsWith("_") && !p.getName().startsWith(".");
+  private static final String ICEBERG_METADATA_FOLDER = "metadata";
+
+  private final StreamExecutionEnvironment env;
+  private final HiveCatalog flinkHiveCatalog; // the HiveCatalog of flink
+  private final String hiveSourceDatabaseName;
+  private final String hiveSourceTableName;
+  private final Catalog icebergCatalog;
+  private final Namespace baseNamespace;
+  private final String icebergDatabaseName;
+  private final String icebergTableName;
+  private int maxParallelism;
+
+  public MigrateAction(StreamExecutionEnvironment env, HiveCatalog flinkHiveCatalog, String hiveSourceDatabaseName,
+                       String hiveSourceTableName, Catalog icebergCatalog, Namespace baseNamespace,
+                       String icebergDatabaseName,
+                       String icebergTableName) {
+    this.env = env;
+    this.flinkHiveCatalog = flinkHiveCatalog;
+    this.hiveSourceDatabaseName = hiveSourceDatabaseName;
+    this.hiveSourceTableName = hiveSourceTableName;
+    this.icebergCatalog = icebergCatalog;
+    this.baseNamespace = baseNamespace;
+    this.icebergDatabaseName = icebergDatabaseName;
+    this.icebergTableName = icebergTableName;
+    this.maxParallelism = env.getParallelism();
+  }
+
+  @Override
+  public List<ManifestFile> execute() {
+    flinkHiveCatalog.open();
+    // hive source table
+    ObjectPath tableSource = new ObjectPath(hiveSourceDatabaseName, hiveSourceTableName);
+    org.apache.hadoop.hive.metastore.api.Table hiveTable;
+    try {
+      hiveTable = flinkHiveCatalog.getHiveTable(tableSource);
+    } catch (TableNotExistException e) {
+      throw new RuntimeException(String.format("The source table %s does not exist!", hiveSourceTableName), e);
+    }
+
+    List<FieldSchema> fieldSchemaList = hiveTable.getSd().getCols();
+    List<FieldSchema> partitionList = hiveTable.getPartitionKeys();
+    fieldSchemaList.addAll(partitionList);
+    Schema icebergSchema = HiveSchemaUtil.convert(fieldSchemaList);
+    PartitionSpec spec = toPartitionSpec(partitionList, icebergSchema);
+
+    FileFormat fileFormat = getHiveFileFormat(hiveTable);
+
+    TableIdentifier identifier = TableIdentifier.of(toNamespace(), icebergTableName);
+    String hiveLocation = hiveTable.getSd().getLocation();
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name());
+    if (!baseNamespace.isEmpty()) {
+      properties.put(FlinkCatalogFactory.BASE_NAMESPACE, baseNamespace.toString());
+    }
+
+    Table icebergTable;
+    if (icebergCatalog instanceof HadoopCatalog) {

Review comment:
       @zhangjun0x01 Is it possible to use the `FlinkCatalog` to create the table? Then we wouldn't need different code paths to create the iceberg table; I'm concerned that more Catalog implementations will be introduced in the future.
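       For example, if the table can always be created through the generic `Catalog` interface (rather than `FlinkCatalog` specifically), the `instanceof HadoopCatalog` branch goes away; a sketch under that assumption, reusing the `identifier`, `icebergSchema`, `spec`, and `properties` variables from the diff above:

       ```java
       // Single code path: any Catalog implementation (HadoopCatalog, HiveCatalog, or a future one)
       // creates the table through the same interface method.
       icebergTable = icebergCatalog.createTable(identifier, icebergSchema, spec, properties);
       ```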

##########
File path: flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java
##########
@@ -50,4 +53,34 @@ public RewriteDataFilesAction rewriteDataFiles() {
     return new RewriteDataFilesAction(env, table);
   }
 
+  /**
+   * Migrate an existing hive table into an iceberg table.
+   *
+   * @param flinkHiveCatalog       the Flink HiveCatalog from which the source hive table metadata is read
+   * @param hiveSourceDatabaseName source hive database name
+   * @param hiveSourceTableName    source hive table name
+   * @param icebergCatalog         target iceberg catalog
+   * @param baseNamespace          target iceberg Namespace
+   * @param icebergDbName          target iceberg database name
+   * @param icebergTableName       target iceberg table name
+   * @return the MigrateAction
+   */
+  public static MigrateAction migrateHive2Iceberg(HiveCatalog flinkHiveCatalog, String hiveSourceDatabaseName,
+                                                  String hiveSourceTableName, Catalog icebergCatalog,
+                                                  Namespace baseNamespace,
+                                                  String icebergDbName, String icebergTableName) {
+    return migrateHive2Iceberg(StreamExecutionEnvironment.getExecutionEnvironment(), flinkHiveCatalog,
+        hiveSourceDatabaseName,
+        hiveSourceTableName, icebergCatalog, baseNamespace,
+        icebergDbName, icebergTableName);
+  }
+
+  public static MigrateAction migrateHive2Iceberg(StreamExecutionEnvironment env, HiveCatalog flinkHiveCatalog,
+                                                  String hiveSourceDatabaseName, String hiveSourceTableName,

Review comment:
       Those `database` and `table` parameters could be simplified into a single `TableIdentifier`.
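       A hypothetical call site, assuming the factory accepted a `TableIdentifier` (the current PR does not):

       ```java
       // Build the target identifier once and pass it through instead of separate namespace/database/table strings.
       TableIdentifier target = TableIdentifier.of(Namespace.of("base_ns", "iceberg_db"), "iceberg_table");
       MigrateAction action = Actions.migrateHive2Iceberg(
           StreamExecutionEnvironment.getExecutionEnvironment(), flinkHiveCatalog,
           "hive_db", "hive_table", icebergCatalog, target);
       ```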

##########
File path: flink/src/main/java/org/apache/iceberg/flink/actions/MigrateAction.java
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.actions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.util.ArrayUtils;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.Action;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.FlinkCatalogFactory;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Migrate an existing hive table to an iceberg table.
+ * <p>
+ * Only hive tables stored as parquet or orc are supported for now; they can be migrated to an iceberg table backed
+ * by a {@link HadoopCatalog} or a {@link HiveCatalog}.
+ * <p>
+ * The migration leaves the data files of the original hive table unchanged: a new iceberg table is created that
+ * references the existing hive data files, and new iceberg metadata is generated for them.
+ * <p>
+ * To prevent data written to hive during the migration from being missed, stop the job that writes to the hive
+ * table first, run the migration, and then switch the reading and writing logic to the iceberg table. If the
+ * migration fails, the iceberg table and its metadata are cleaned up; if that cleanup also fails, the iceberg table
+ * and manifests may need to be removed manually.
+ */
+public class MigrateAction implements Action<List<ManifestFile>> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MigrateAction.class);
+
+  private static final String PARQUET_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
+  private static final String ORC_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
+
+  private static final String ICEBERG_METADATA_FOLDER = "metadata";
+
+  private final StreamExecutionEnvironment env;
+  private final HiveCatalog flinkHiveCatalog; // the HiveCatalog of flink
+  private final String hiveSourceDatabaseName;
+  private final String hiveSourceTableName;
+  private final Catalog icebergCatalog;
+  private final Namespace baseNamespace;
+  private final String icebergDatabaseName;
+  private final String icebergTableName;
+  private int maxParallelism;
+
+  public MigrateAction(StreamExecutionEnvironment env, HiveCatalog flinkHiveCatalog, String hiveSourceDatabaseName,
+                       String hiveSourceTableName, Catalog icebergCatalog, Namespace baseNamespace,
+                       String icebergDatabaseName,
+                       String icebergTableName) {
+    this.env = env;
+    this.flinkHiveCatalog = flinkHiveCatalog;
+    this.hiveSourceDatabaseName = hiveSourceDatabaseName;
+    this.hiveSourceTableName = hiveSourceTableName;
+    this.icebergCatalog = icebergCatalog;
+    this.baseNamespace = baseNamespace;
+    this.icebergDatabaseName = icebergDatabaseName;
+    this.icebergTableName = icebergTableName;
+    this.maxParallelism = env.getParallelism();
+  }
+
+  @Override
+  public List<ManifestFile> execute() {
+    flinkHiveCatalog.open();
+    // hive source table
+    ObjectPath tableSource = new ObjectPath(hiveSourceDatabaseName, hiveSourceTableName);
+    org.apache.hadoop.hive.metastore.api.Table hiveTable;
+    try {
+      hiveTable = flinkHiveCatalog.getHiveTable(tableSource);
+    } catch (TableNotExistException e) {
+      throw new RuntimeException(String.format("The source table %s does not exist!", hiveSourceTableName), e);
+    }
+
+    List<FieldSchema> fieldSchemaList = hiveTable.getSd().getCols();
+    List<FieldSchema> partitionList = hiveTable.getPartitionKeys();
+    fieldSchemaList.addAll(partitionList);
+    Schema icebergSchema = HiveSchemaUtil.convert(fieldSchemaList);
+    PartitionSpec spec = HiveSchemaUtil.spec(icebergSchema, partitionList);
+
+    FileFormat fileFormat = getHiveFileFormat(hiveTable);
+
+    Namespace namespace = Namespace.of(ArrayUtils.concat(baseNamespace.levels(), new String[] {icebergDatabaseName}));

Review comment:
       `ArrayUtils` is an `Internal` API in Flink, so I would suggest not using it because it may change in the next release.
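       A sketch of building the namespace with only the JDK, assuming the `baseNamespace` and `icebergDatabaseName` fields from the diff above:

       ```java
       // Append the database name to the base namespace levels without Flink's @Internal ArrayUtils.
       String[] levels = Arrays.copyOf(baseNamespace.levels(), baseNamespace.levels().length + 1);
       levels[levels.length - 1] = icebergDatabaseName;
       Namespace namespace = Namespace.of(levels);
       ```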



