Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/30 02:09:08 UTC

[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #181: [FLINK-28067] Introduce HiveCatalog

JingsongLi commented on code in PR #181:
URL: https://github.com/apache/flink-table-store/pull/181#discussion_r910543240


##########
flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java:
##########
@@ -99,6 +92,54 @@ public void testReadExternalTable() throws Exception {
         }
     }
 
+    @Test
+    public void testFlinkWriteAndHiveRead() throws Exception {
+        String sql =
+                String.join(
+                        "\n",
+                        "CREATE CATALOG my_hive WITH (",

Review Comment:
   What dependencies does the Hive catalog need?
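
   The hunk above is truncated at the `WITH (` line. Below is a minimal sketch of how such a catalog DDL could continue, assuming the factory exposes `catalog-type`, `uri`, and `warehouse` options; the option keys, metastore endpoint, and warehouse path are illustrative placeholders, not taken from the PR:

       String sql =
               String.join(
                       "\n",
                       "CREATE CATALOG my_hive WITH (",
                       "  'type' = 'table-store',",
                       "  'catalog-type' = 'hive',",
                       // keys and values below are placeholders for illustration only
                       "  'uri' = 'thrift://hive-metastore:9083',",
                       "  'warehouse' = 'hdfs://namenode:8020/warehouse'",
                       ")");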



##########
flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java:
##########
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.hive;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.store.file.catalog.Catalog;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.thrift.TException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** A catalog implementation for Hive. */
+public class HiveCatalog implements Catalog {
+
+    // we don't include flink-table-store-hive-mr as dependencies because it depends on hive-exec
+    private static final String INPUT_FORMAT_CLASS_NAME =
+            "org.apache.flink.table.store.mapred.TableStoreInputFormat";
+    private static final String OUTPUT_FORMAT_CLASS_NAME =
+            "org.apache.flink.table.store.mapred.TableStoreOutputFormat";
+    private static final String SERDE_CLASS_NAME =
+            "org.apache.flink.table.store.hive.TableStoreSerDe";
+    private static final String STORAGE_HANDLER_CLASS_NAME =
+            "org.apache.flink.table.store.hive.TableStoreHiveStorageHandler";
+
+    private final HiveConf hiveConf;
+    private final IMetaStoreClient client;
+
+    public HiveCatalog(String thriftUri, String warehousePath) {
+        Configuration conf = new Configuration();
+        conf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUri);
+        conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehousePath);
+        this.hiveConf = new HiveConf(conf, HiveConf.class);
+        try {
+            IMetaStoreClient client =
+                    RetryingMetaStoreClient.getProxy(
+                            hiveConf, tbl -> null, HiveMetaStoreClient.class.getName());
+            this.client =
+                    StringUtils.isNullOrWhitespaceOnly(thriftUri)
+                            ? client
+                            : HiveMetaStoreClient.newSynchronizedClient(client);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public List<String> listDatabases() {
+        try {
+            return client.getAllDatabases();
+        } catch (TException e) {
+            throw new RuntimeException("Failed to list all databases", e);
+        }
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) {
+        try {
+            client.getDatabase(databaseName);
+            return true;
+        } catch (NoSuchObjectException e) {
+            return false;
+        } catch (TException e) {
+            throw new RuntimeException(
+                    "Failed to determine if database " + databaseName + " exists", e);
+        }
+    }
+
+    @Override
+    public void createDatabase(String name, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException {
+        try {
+            client.createDatabase(convertToDatabase(name));
+        } catch (AlreadyExistsException e) {
+            if (!ignoreIfExists) {
+                throw new DatabaseAlreadyExistException(name, e);
+            }
+        } catch (TException e) {
+            throw new RuntimeException("Failed to create database " + name, e);
+        }
+    }
+
+    @Override
+    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException {
+        try {
+            if (!cascade && client.getAllTables(name).size() > 0) {
+                throw new DatabaseNotEmptyException(name);
+            }
+            client.dropDatabase(name, true, false, true);
+        } catch (NoSuchObjectException | UnknownDBException e) {
+            if (!ignoreIfNotExists) {
+                throw new DatabaseNotExistException(name, e);
+            }
+        } catch (TException e) {
+            throw new RuntimeException("Failed to drop database " + name, e);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName) throws DatabaseNotExistException {
+        try {
+            return client.getAllTables(databaseName);
+        } catch (UnknownDBException e) {
+            throw new DatabaseNotExistException(databaseName, e);
+        } catch (TException e) {
+            throw new RuntimeException("Failed to list all tables in database " + databaseName, e);
+        }
+    }
+
+    @Override
+    public Path getTableLocation(ObjectPath tablePath) {
+        return new Path(
+                getDatabaseLocation(tablePath.getDatabaseName()), tablePath.getObjectName());
+    }
+
+    @Override
+    public TableSchema getTable(ObjectPath tablePath) throws TableNotExistException {
+        if (isTableStoreTableNotExisted(tablePath)) {
+            throw new TableNotExistException(tablePath);
+        }
+        Path tableLocation = getTableLocation(tablePath);
+        return new SchemaManager(tableLocation)
+                .latest()
+                .orElseThrow(
+                        () -> new RuntimeException("There is no table stored in " + tableLocation));
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) {
+        try {
+            client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+            return true;
+        } catch (NoSuchObjectException e) {
+            return false;
+        } catch (TException e) {
+            throw new RuntimeException(
+                    "Failed to determine if table " + tablePath.getFullName() + " exists", e);
+        }
+    }
+
+    @Override
+    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException {
+        if (isTableStoreTableNotExisted(tablePath)) {
+            if (ignoreIfNotExists) {
+                return;
+            } else {
+                throw new TableNotExistException(tablePath);
+            }
+        }
+
+        try {
+            client.dropTable(
+                    tablePath.getDatabaseName(), tablePath.getObjectName(), true, false, true);
+        } catch (TException e) {
+            throw new RuntimeException("Failed to drop table " + tablePath.getFullName(), e);
+        }
+    }
+
+    @Override
+    public void createTable(ObjectPath tablePath, UpdateSchema updateSchema, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException {
+        String databaseName = tablePath.getDatabaseName();
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(databaseName);
+        }
+        if (tableExists(tablePath)) {
+            if (ignoreIfExists) {
+                return;
+            } else {
+                throw new TableAlreadyExistException(tablePath);
+            }
+        }
+
+        // first commit changes to underlying files
+        // if changes on Hive fails there is no harm to perform the same changes to files again
+        TableSchema schema = commitToUnderlyingFiles(tablePath, updateSchema);
+        Table table = newHmsTable(tablePath);
+        updateHmsTable(table, tablePath, schema);
+        try {
+            client.createTable(table);
+        } catch (TException e) {
+            throw new RuntimeException("Failed to create table " + tablePath.getFullName(), e);
+        }
+    }
+
+    @Override
+    public void alterTable(
+            ObjectPath tablePath, UpdateSchema updateSchema, boolean ignoreIfNotExists)
+            throws TableNotExistException {
+        if (isTableStoreTableNotExisted(tablePath)) {
+            if (ignoreIfNotExists) {
+                return;
+            } else {
+                throw new TableNotExistException(tablePath);
+            }
+        }
+
+        // first commit changes to underlying files
+        // if changes on Hive fails there is no harm to perform the same changes to files again
+        TableSchema schema = commitToUnderlyingFiles(tablePath, updateSchema);
+        try {
+            Table table = client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+            updateHmsTable(table, tablePath, schema);
+            client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table);
+        } catch (TException e) {
+            throw new RuntimeException("Failed to alter table " + tablePath.getFullName(), e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        client.close();
+    }
+
+    private String getWarehouseRoot() {
+        return hiveConf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
+    }
+
+    private Path getDatabaseLocation(String name) {

Review Comment:
   Can we have an `AbstractCatalog`? I can see the path logic is the same.
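
   A minimal sketch of what such a base class could look like, assuming the `Catalog` interface declares `getTableLocation` and each implementation can expose its warehouse root; the method names and the `<database>.db` layout are assumptions, not taken from the PR:

       import org.apache.flink.core.fs.Path;
       import org.apache.flink.table.catalog.ObjectPath;
       import org.apache.flink.table.store.file.catalog.Catalog;

       /** Sketch of a base class holding the path logic shared by catalog implementations. */
       public abstract class AbstractCatalog implements Catalog {

           /** Warehouse root path; each implementation obtains it differently. */
           protected abstract String warehouseRoot();

           protected Path getDatabaseLocation(String databaseName) {
               // Assumed layout: <warehouse>/<database>.db, mirroring the Hive convention.
               return new Path(warehouseRoot(), databaseName + ".db");
           }

           @Override
           public Path getTableLocation(ObjectPath tablePath) {
               return new Path(
                       getDatabaseLocation(tablePath.getDatabaseName()), tablePath.getObjectName());
           }
       }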



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java:
##########
@@ -21,66 +21,82 @@
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.store.file.catalog.Catalog;
+import org.apache.flink.table.store.file.catalog.CatalogFactory;
+import org.apache.flink.util.Preconditions;
 
-import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ServiceLoader;
 import java.util.Set;
-
-import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
+import java.util.stream.Collectors;
 
 /** Factory for {@link FlinkCatalog}. */
-public class FlinkCatalogFactory implements CatalogFactory {
+public class FlinkCatalogFactory implements org.apache.flink.table.factories.CatalogFactory {
 
     public static final String IDENTIFIER = "table-store";
 
+    public static final ConfigOption<String> CATALOG_TYPE =
+            ConfigOptions.key("catalog-type").stringType().noDefaultValue();
     public static final ConfigOption<String> DEFAULT_DATABASE =
             ConfigOptions.key("default-database").stringType().defaultValue("default");
 
-    public static final ConfigOption<String> WAREHOUSE =
-            ConfigOptions.key("warehouse")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The warehouse root path of catalog.");
-
     @Override
     public String factoryIdentifier() {
         return IDENTIFIER;
     }
 
     @Override
     public Set<ConfigOption<?>> requiredOptions() {
-        Set<ConfigOption<?>> options = new HashSet<>();
-        options.add(WAREHOUSE);
-        return options;
+        return Collections.emptySet();
     }
 
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
-        Set<ConfigOption<?>> options = new HashSet<>();
-        options.add(DEFAULT_DATABASE);
-        options.add(PROPERTY_VERSION);
-        return options;
+        return Collections.emptySet();
     }
 
     @Override
     public FlinkCatalog createCatalog(Context context) {
         FactoryUtil.CatalogFactoryHelper helper =
                 FactoryUtil.createCatalogFactoryHelper(this, context);
-        helper.validate();
         ReadableConfig options = helper.getOptions();
-        return createCatalog(
-                new Path(options.get(WAREHOUSE)), context.getName(), options.get(DEFAULT_DATABASE));
+        return createCatalog(context.getName(), options);
     }
 
-    public static FlinkCatalog createCatalog(Path warehouse, String catalogName) {
-        return createCatalog(warehouse, catalogName, DEFAULT_DATABASE.defaultValue());
-    }
+    public static FlinkCatalog createCatalog(String catalogName, ReadableConfig options) {
+        // manual validation
+        // because different catalog types may have different options
+        // we can't list them all in the optionalOptions() method
+        String catalogType =
+                Preconditions.checkNotNull(
+                        options.get(CATALOG_TYPE), "Table store catalog type must be set");

Review Comment:
   The default could be `filesystem`.
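
   A minimal sketch of how the option could declare that default; the `filesystem` identifier and the description text are assumptions based on this comment, not taken from the PR:

       public static final ConfigOption<String> CATALOG_TYPE =
               ConfigOptions.key("catalog-type")
                       .stringType()
                       // assumed default so no extra option is needed when there is no Hive metastore
                       .defaultValue("filesystem")
                       .withDescription("Type of the table store catalog, e.g. filesystem or hive.");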



##########
flink-table-store-hive/flink-table-store-hive-mr/pom.xml:
##########
@@ -0,0 +1,491 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-table-store-hive</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.2-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-table-store-hive-mr</artifactId>

Review Comment:
   `flink-table-store-hive-connector`?



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java:
##########
@@ -21,66 +21,82 @@
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.store.file.catalog.Catalog;
+import org.apache.flink.table.store.file.catalog.CatalogFactory;
+import org.apache.flink.util.Preconditions;
 
-import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ServiceLoader;
 import java.util.Set;
-
-import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
+import java.util.stream.Collectors;
 
 /** Factory for {@link FlinkCatalog}. */
-public class FlinkCatalogFactory implements CatalogFactory {
+public class FlinkCatalogFactory implements org.apache.flink.table.factories.CatalogFactory {
 
     public static final String IDENTIFIER = "table-store";
 
+    public static final ConfigOption<String> CATALOG_TYPE =

Review Comment:
   We have already created a catalog in Flink and specified its catalog type as `table-store`, so this key feels a bit difficult to understand.
   Can we consider renaming it to:
   - metastore.type: hive
   - metastore.uri: hive_uri
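
   A minimal sketch of the renamed options this comment proposes; the key names follow the suggestion, while the default value and descriptions are assumptions:

       public static final ConfigOption<String> METASTORE_TYPE =
               ConfigOptions.key("metastore.type")
                       .stringType()
                       .defaultValue("filesystem") // assumed default, per the comment above
                       .withDescription(
                               "Metastore backing the table store catalog, e.g. filesystem or hive.");

       public static final ConfigOption<String> METASTORE_URI =
               ConfigOptions.key("metastore.uri")
                       .stringType()
                       .noDefaultValue()
                       .withDescription("URI of the metastore, e.g. a Hive metastore thrift URI.");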



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org