You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/10/14 01:57:49 UTC

[hudi] branch master updated: [HUDI-5010] Fix flink hive catalog external config not work (#6923)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bec05ff4c5 [HUDI-5010] Fix flink hive catalog external config not work (#6923)
bec05ff4c5 is described below

commit bec05ff4c54f22d4e143c008ce38e5a16205fc9e
Author: 吴祥平 <40...@qq.com>
AuthorDate: Fri Oct 14 09:57:41 2022 +0800

    [HUDI-5010] Fix flink hive catalog external config not work (#6923)
    
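    * Fix the Flink Hive catalog ignoring its external-table option: the factory now passes the full catalog options to HoodieHiveCatalog, and TABLE_EXTERNAL is read from those catalog options instead of from each table's own options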
---
 .../hudi/table/catalog/HoodieCatalogFactory.java   |  4 +--
 .../hudi/table/catalog/HoodieHiveCatalog.java      | 16 +++++------
 .../hudi/table/catalog/HoodieCatalogTestUtils.java | 10 +++++--
 .../table/catalog/TestHoodieCatalogFactory.java    |  1 +
 .../hudi/table/catalog/TestHoodieHiveCatalog.java  | 31 +++++++++-------------
 5 files changed, 30 insertions(+), 32 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java
index ef33b1c991..436b836eff 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java
@@ -56,9 +56,7 @@ public class HoodieCatalogFactory implements CatalogFactory {
       case "hms":
         return new HoodieHiveCatalog(
             context.getName(),
-            helper.getOptions().get(CatalogOptions.CATALOG_PATH),
-            helper.getOptions().get(CatalogOptions.DEFAULT_DATABASE),
-            helper.getOptions().get(CatalogOptions.HIVE_CONF_DIR));
+            (Configuration) helper.getOptions());
       case "dfs":
         return new HoodieCatalog(
             context.getName(),
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 89172f22f5..01b73b8605 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -101,7 +101,6 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
 import static org.apache.hudi.configuration.FlinkOptions.PATH;
-import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DB;
 import static org.apache.hudi.table.catalog.TableOptionProperties.COMMENT;
 import static org.apache.hudi.table.catalog.TableOptionProperties.PK_CONSTRAINT_NAME;
 import static org.apache.hudi.table.catalog.TableOptionProperties.SPARK_SOURCE_PROVIDER;
@@ -117,21 +116,22 @@ public class HoodieHiveCatalog extends AbstractCatalog {
 
   // optional catalog base path: used for db/table path inference.
   private final String catalogPath;
+  private final boolean external;
 
-  public HoodieHiveCatalog(String catalogName, String catalogPath, String defaultDatabase, String hiveConfDir) {
-    this(catalogName, catalogPath, defaultDatabase, HoodieCatalogUtil.createHiveConf(hiveConfDir), false);
+  public HoodieHiveCatalog(String catalogName, Configuration options) {
+    this(catalogName, options, HoodieCatalogUtil.createHiveConf(options.getString(CatalogOptions.HIVE_CONF_DIR)), false);
   }
 
   public HoodieHiveCatalog(
       String catalogName,
-      String catalogPath,
-      String defaultDatabase,
+      Configuration options,
       HiveConf hiveConf,
       boolean allowEmbedded) {
-    super(catalogName, defaultDatabase == null ? DEFAULT_DB : defaultDatabase);
+    super(catalogName, options.getString(CatalogOptions.DEFAULT_DATABASE));
     // fallback to hive.metastore.warehouse.dir if catalog path is not specified
-    this.catalogPath = catalogPath == null ? hiveConf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE) : catalogPath;
     this.hiveConf = hiveConf;
+    this.catalogPath = options.getString(CatalogOptions.CATALOG_PATH, hiveConf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE));
+    this.external = options.getBoolean(CatalogOptions.TABLE_EXTERNAL);
     if (!allowEmbedded) {
       checkArgument(
           !HoodieCatalogUtil.isEmbeddedMetastore(this.hiveConf),
@@ -512,7 +512,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
 
     Map<String, String> properties = new HashMap<>(table.getOptions());
 
-    if (Boolean.parseBoolean(table.getOptions().get(CatalogOptions.TABLE_EXTERNAL.key()))) {
+    if (external) {
       hiveTable.setTableType(TableType.EXTERNAL_TABLE.toString());
       properties.put("EXTERNAL", "TRUE");
     }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/HoodieCatalogTestUtils.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/HoodieCatalogTestUtils.java
index 04bae73768..c98b4ac0da 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/HoodieCatalogTestUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/HoodieCatalogTestUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.catalog;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.hadoop.hive.conf.HiveConf;
 
@@ -42,10 +43,15 @@ public class HoodieCatalogTestUtils {
   }
 
   public static HoodieHiveCatalog createHiveCatalog(String name) {
+    return createHiveCatalog(name, false);
+  }
+
+  public static HoodieHiveCatalog createHiveCatalog(String name, boolean external) {
+    Configuration options = new Configuration();
+    options.setBoolean(CatalogOptions.TABLE_EXTERNAL, external);
     return new HoodieHiveCatalog(
         name,
-        null,
-        null,
+        options,
         createHiveConf(),
         true);
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalogFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalogFactory.java
index 961a340e19..6e7ee2e8f8 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalogFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalogFactory.java
@@ -78,6 +78,7 @@ public class TestHoodieCatalogFactory {
     options.put(CommonCatalogOptions.CATALOG_TYPE.key(), HoodieCatalogFactory.IDENTIFIER);
     options.put(CatalogOptions.HIVE_CONF_DIR.key(), CONF_DIR.getPath());
     options.put(CatalogOptions.MODE.key(), "hms");
+    options.put(CatalogOptions.TABLE_EXTERNAL.key(), "false");
 
     final Catalog actualCatalog =
         FactoryUtil.createCatalog(
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
index 014933f39e..ffae71d6b2 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.catalog;
 
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieCatalogException;
@@ -35,7 +36,6 @@ import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.junit.jupiter.api.AfterAll;
@@ -44,7 +44,6 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
-import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -55,7 +54,6 @@ import java.util.stream.Collectors;
 
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -149,28 +147,23 @@ public class TestHoodieHiveCatalog {
     assertEquals("id", table2.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()));
   }
 
-  @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  public void testCreateExternalTable(boolean isExternal) throws TableAlreadyExistException, DatabaseNotExistException, TableNotExistException, IOException {
+  @Test
+  public void testCreateExternalTable() throws TableAlreadyExistException, DatabaseNotExistException, TableNotExistException, IOException {
+    HoodieHiveCatalog catalog = HoodieCatalogTestUtils.createHiveCatalog("myCatalog", true);
+    catalog.open();
     Map<String, String> originOptions = new HashMap<>();
     originOptions.put(FactoryUtil.CONNECTOR.key(), "hudi");
-    originOptions.put(CatalogOptions.TABLE_EXTERNAL.key(), String.valueOf(isExternal));
     CatalogTable table =
         new CatalogTableImpl(schema, originOptions, "hudi table");
-    hoodieCatalog.createTable(tablePath, table, false);
-    Table table1 = hoodieCatalog.getHiveTable(tablePath);
-    if (isExternal) {
-      assertTrue(Boolean.parseBoolean(table1.getParameters().get(CatalogOptions.TABLE_EXTERNAL.key())));
-      assertEquals("EXTERNAL_TABLE", table1.getTableType());
-    } else {
-      assertFalse(Boolean.parseBoolean(table1.getParameters().get(CatalogOptions.TABLE_EXTERNAL.key())));
-      assertEquals("MANAGED_TABLE", table1.getTableType());
-    }
+    catalog.createTable(tablePath, table, false);
+    Table table1 = catalog.getHiveTable(tablePath);
+    assertTrue(Boolean.parseBoolean(table1.getParameters().get("EXTERNAL")));
+    assertEquals("EXTERNAL_TABLE", table1.getTableType());
 
-    hoodieCatalog.dropTable(tablePath, false);
+    catalog.dropTable(tablePath, false);
     Path path = new Path(table1.getParameters().get(FlinkOptions.PATH.key()));
-    boolean exists = StreamerUtil.fileExists(FileSystem.getLocal(new Configuration()), path);
-    assertTrue(isExternal && exists || !isExternal && !exists);
+    boolean created = StreamerUtil.fileExists(FSUtils.getFs(path, new Configuration()), path);
+    assertTrue(created, "Table should have been created");
   }
 
   @Test