You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/10/14 01:57:49 UTC
[hudi] branch master updated: [HUDI-5010] Fix flink hive catalog external config not work (#6923)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bec05ff4c5 [HUDI-5010] Fix flink hive catalog external config not work (#6923)
bec05ff4c5 is described below
commit bec05ff4c54f22d4e143c008ce38e5a16205fc9e
Author: 吴祥平 <40...@qq.com>
AuthorDate: Fri Oct 14 09:57:41 2022 +0800
[HUDI-5010] Fix flink hive catalog external config not work (#6923)
* Fix the Flink catalog external config not taking effect
---
.../hudi/table/catalog/HoodieCatalogFactory.java | 4 +--
.../hudi/table/catalog/HoodieHiveCatalog.java | 16 +++++------
.../hudi/table/catalog/HoodieCatalogTestUtils.java | 10 +++++--
.../table/catalog/TestHoodieCatalogFactory.java | 1 +
.../hudi/table/catalog/TestHoodieHiveCatalog.java | 31 +++++++++-------------
5 files changed, 30 insertions(+), 32 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java
index ef33b1c991..436b836eff 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java
@@ -56,9 +56,7 @@ public class HoodieCatalogFactory implements CatalogFactory {
case "hms":
return new HoodieHiveCatalog(
context.getName(),
- helper.getOptions().get(CatalogOptions.CATALOG_PATH),
- helper.getOptions().get(CatalogOptions.DEFAULT_DATABASE),
- helper.getOptions().get(CatalogOptions.HIVE_CONF_DIR));
+ (Configuration) helper.getOptions());
case "dfs":
return new HoodieCatalog(
context.getName(),
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 89172f22f5..01b73b8605 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -101,7 +101,6 @@ import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
import static org.apache.hudi.configuration.FlinkOptions.PATH;
-import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DB;
import static org.apache.hudi.table.catalog.TableOptionProperties.COMMENT;
import static org.apache.hudi.table.catalog.TableOptionProperties.PK_CONSTRAINT_NAME;
import static org.apache.hudi.table.catalog.TableOptionProperties.SPARK_SOURCE_PROVIDER;
@@ -117,21 +116,22 @@ public class HoodieHiveCatalog extends AbstractCatalog {
// optional catalog base path: used for db/table path inference.
private final String catalogPath;
+ private final boolean external;
- public HoodieHiveCatalog(String catalogName, String catalogPath, String defaultDatabase, String hiveConfDir) {
- this(catalogName, catalogPath, defaultDatabase, HoodieCatalogUtil.createHiveConf(hiveConfDir), false);
+ public HoodieHiveCatalog(String catalogName, Configuration options) {
+ this(catalogName, options, HoodieCatalogUtil.createHiveConf(options.getString(CatalogOptions.HIVE_CONF_DIR)), false);
}
public HoodieHiveCatalog(
String catalogName,
- String catalogPath,
- String defaultDatabase,
+ Configuration options,
HiveConf hiveConf,
boolean allowEmbedded) {
- super(catalogName, defaultDatabase == null ? DEFAULT_DB : defaultDatabase);
+ super(catalogName, options.getString(CatalogOptions.DEFAULT_DATABASE));
// fallback to hive.metastore.warehouse.dir if catalog path is not specified
- this.catalogPath = catalogPath == null ? hiveConf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE) : catalogPath;
this.hiveConf = hiveConf;
+ this.catalogPath = options.getString(CatalogOptions.CATALOG_PATH, hiveConf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE));
+ this.external = options.getBoolean(CatalogOptions.TABLE_EXTERNAL);
if (!allowEmbedded) {
checkArgument(
!HoodieCatalogUtil.isEmbeddedMetastore(this.hiveConf),
@@ -512,7 +512,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
Map<String, String> properties = new HashMap<>(table.getOptions());
- if (Boolean.parseBoolean(table.getOptions().get(CatalogOptions.TABLE_EXTERNAL.key()))) {
+ if (external) {
hiveTable.setTableType(TableType.EXTERNAL_TABLE.toString());
properties.put("EXTERNAL", "TRUE");
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/HoodieCatalogTestUtils.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/HoodieCatalogTestUtils.java
index 04bae73768..c98b4ac0da 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/HoodieCatalogTestUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/HoodieCatalogTestUtils.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table.catalog;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -42,10 +43,15 @@ public class HoodieCatalogTestUtils {
}
public static HoodieHiveCatalog createHiveCatalog(String name) {
+ return createHiveCatalog(name, false);
+ }
+
+ public static HoodieHiveCatalog createHiveCatalog(String name, boolean external) {
+ Configuration options = new Configuration();
+ options.setBoolean(CatalogOptions.TABLE_EXTERNAL, external);
return new HoodieHiveCatalog(
name,
- null,
- null,
+ options,
createHiveConf(),
true);
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalogFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalogFactory.java
index 961a340e19..6e7ee2e8f8 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalogFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalogFactory.java
@@ -78,6 +78,7 @@ public class TestHoodieCatalogFactory {
options.put(CommonCatalogOptions.CATALOG_TYPE.key(), HoodieCatalogFactory.IDENTIFIER);
options.put(CatalogOptions.HIVE_CONF_DIR.key(), CONF_DIR.getPath());
options.put(CatalogOptions.MODE.key(), "hms");
+ options.put(CatalogOptions.TABLE_EXTERNAL.key(), "false");
final Catalog actualCatalog =
FactoryUtil.createCatalog(
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
index 014933f39e..ffae71d6b2 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table.catalog;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieCatalogException;
@@ -35,7 +36,6 @@ import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.Table;
import org.junit.jupiter.api.AfterAll;
@@ -44,7 +44,6 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
-import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.Collections;
@@ -55,7 +54,6 @@ import java.util.stream.Collectors;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -149,28 +147,23 @@ public class TestHoodieHiveCatalog {
assertEquals("id", table2.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()));
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testCreateExternalTable(boolean isExternal) throws TableAlreadyExistException, DatabaseNotExistException, TableNotExistException, IOException {
+ @Test
+ public void testCreateExternalTable() throws TableAlreadyExistException, DatabaseNotExistException, TableNotExistException, IOException {
+ HoodieHiveCatalog catalog = HoodieCatalogTestUtils.createHiveCatalog("myCatalog", true);
+ catalog.open();
Map<String, String> originOptions = new HashMap<>();
originOptions.put(FactoryUtil.CONNECTOR.key(), "hudi");
- originOptions.put(CatalogOptions.TABLE_EXTERNAL.key(), String.valueOf(isExternal));
CatalogTable table =
new CatalogTableImpl(schema, originOptions, "hudi table");
- hoodieCatalog.createTable(tablePath, table, false);
- Table table1 = hoodieCatalog.getHiveTable(tablePath);
- if (isExternal) {
- assertTrue(Boolean.parseBoolean(table1.getParameters().get(CatalogOptions.TABLE_EXTERNAL.key())));
- assertEquals("EXTERNAL_TABLE", table1.getTableType());
- } else {
- assertFalse(Boolean.parseBoolean(table1.getParameters().get(CatalogOptions.TABLE_EXTERNAL.key())));
- assertEquals("MANAGED_TABLE", table1.getTableType());
- }
+ catalog.createTable(tablePath, table, false);
+ Table table1 = catalog.getHiveTable(tablePath);
+ assertTrue(Boolean.parseBoolean(table1.getParameters().get("EXTERNAL")));
+ assertEquals("EXTERNAL_TABLE", table1.getTableType());
- hoodieCatalog.dropTable(tablePath, false);
+ catalog.dropTable(tablePath, false);
Path path = new Path(table1.getParameters().get(FlinkOptions.PATH.key()));
- boolean exists = StreamerUtil.fileExists(FileSystem.getLocal(new Configuration()), path);
- assertTrue(isExternal && exists || !isExternal && !exists);
+ boolean created = StreamerUtil.fileExists(FSUtils.getFs(path, new Configuration()), path);
+ assertTrue(created, "Table should have been created");
}
@Test