Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/05 12:06:30 UTC

[flink-table-store] branch release-0.3 updated: [FLINK-30555] Hive cluster can not read oss/s3 tables

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.3
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.3 by this push:
     new 1574b224 [FLINK-30555] Hive cluster can not read oss/s3 tables
1574b224 is described below

commit 1574b224df0e88f82f5e374507938f8e38905791
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu Jan 5 20:05:41 2023 +0800

    [FLINK-30555] Hive cluster can not read oss/s3 tables
    
    This closes #456
---
 docs/content/docs/filesystems/oss.md               | 10 +++++--
 docs/content/docs/filesystems/s3.md                | 10 +++++--
 .../flink/table/store/TableStoreJobConf.java       | 34 ++++++++++++++++++++-
 .../apache/flink/table/store/hive/HiveSchema.java  | 35 ++--------------------
 .../store/hive/TableStoreHiveStorageHandler.java   |  2 +-
 .../table/store/mapred/TableStoreInputFormat.java  |  3 ++
 6 files changed, 54 insertions(+), 40 deletions(-)
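
For context, judging from the diff below: previously only the table location was propagated into the MapReduce job properties, so oss/s3 options set in the Hive session (the `tablestore.*` keys) never reached the input format running on the cluster. The commit ships those keys along. A minimal standalone sketch of the extraction step, mirroring the loop in `TableStoreJobConf.extractCatalogConfig` further down (conf values are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

public class ExtractSketch {
    public static void main(String[] args) {
        // Simulate Hive session conf entries set via `SET tablestore.*`.
        Configuration hiveConf = new Configuration(false);
        hiveConf.set("tablestore.fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
        hiveConf.set("hive.exec.scratchdir", "/tmp/hive"); // unrelated key, must be ignored

        // Keep only keys with the "tablestore." prefix, with the prefix stripped.
        Map<String, String> catalogConf = new HashMap<>();
        for (Map.Entry<String, String> entry : hiveConf) { // Configuration is Iterable
            String name = entry.getKey();
            if (name.startsWith("tablestore.")) {
                catalogConf.put(name.substring("tablestore.".length()), hiveConf.get(name));
            }
        }
        // Prints {fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com}
        System.out.println(catalogConf);
    }
}
```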

diff --git a/docs/content/docs/filesystems/oss.md b/docs/content/docs/filesystems/oss.md
index 2423ff16..1e0bbc0c 100644
--- a/docs/content/docs/filesystems/oss.md
+++ b/docs/content/docs/filesystems/oss.md
@@ -72,16 +72,20 @@ spark-sql \
 
 {{< tab "Hive" >}}
 
+NOTE: You need to ensure that the Hive metastore can access `oss`.
+
 Place `flink-table-store-oss-{{< version >}}.jar` together with `flink-table-store-hive-connector-{{< version >}}.jar` under Hive's auxlib directory, then start Hive with settings like:
 
 ```sql
 SET tablestore.fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com;
 SET tablestore.fs.oss.accessKeyId=xxx;
 SET tablestore.fs.oss.accessKeySecret=yyy;
+```
 
-CREATE EXTERNAL TABLE external_test_table
-STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'
-LOCATION 'oss://<bucket-name>/<db-name>.db/<table-name>';
+Then read the table from the Hive metastore. The table can be created by Flink or Spark; see [Catalog with Hive Metastore]({{< ref "docs/how-to/creating-catalogs" >}}).
+```sql
+SELECT * FROM test_table;
+SELECT COUNT(1) FROM test_table;
 ```
 
 {{< /tab >}}
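
Since the new docs point readers to creating the table from Flink, a hedged sketch of that step may help. The catalog options follow the Table Store Hive-metastore catalog documentation; catalog name, metastore URI, warehouse path, and the table schema are placeholders:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateOssTableSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Register a Table Store catalog backed by the Hive metastore,
        // with the warehouse on oss (URI and bucket are placeholders).
        tEnv.executeSql(
                "CREATE CATALOG my_catalog WITH (\n"
                        + "  'type' = 'table-store',\n"
                        + "  'metastore' = 'hive',\n"
                        + "  'uri' = 'thrift://<hive-metastore-host>:9083',\n"
                        + "  'warehouse' = 'oss://<bucket-name>/<warehouse-path>'\n"
                        + ")");
        tEnv.executeSql("USE CATALOG my_catalog");
        // Hive can then query this table as shown in the docs above.
        tEnv.executeSql(
                "CREATE TABLE test_table (a INT, b STRING, PRIMARY KEY (a) NOT ENFORCED)");
    }
}
```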
diff --git a/docs/content/docs/filesystems/s3.md b/docs/content/docs/filesystems/s3.md
index 61505598..02179960 100644
--- a/docs/content/docs/filesystems/s3.md
+++ b/docs/content/docs/filesystems/s3.md
@@ -72,16 +72,20 @@ spark-sql \
 
 {{< tab "Hive" >}}
 
+NOTE: You need to ensure that the Hive metastore can access `s3`.
+
 Place `flink-table-store-s3-{{< version >}}.jar` together with `flink-table-store-hive-connector-{{< version >}}.jar` under Hive's auxlib directory, then start Hive with settings like:
 
 ```sql
 SET tablestore.s3.endpoint=your-endpoint-hostname;
 SET tablestore.s3.access-key=xxx;
 SET tablestore.s3.secret-key=yyy;
+```
 
-CREATE EXTERNAL TABLE external_test_table
-STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'
-LOCATION 's3://<bucket>/<endpoint>';
+Then read the table from the Hive metastore. The table can be created by Flink or Spark; see [Catalog with Hive Metastore]({{< ref "docs/how-to/creating-catalogs" >}}).
+```sql
+SELECT * FROM test_table;
+SELECT COUNT(1) FROM test_table;
 ```
 
 {{< /tab >}}
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
index db3f6821..cf69b566 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
@@ -18,9 +18,13 @@
 
 package org.apache.flink.table.store;
 
+import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.mapred.JobConf;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
@@ -31,6 +35,9 @@ import java.util.Properties;
 public class TableStoreJobConf {
 
     private static final String INTERNAL_LOCATION = "table-store.internal.location";
+    private static final String INTERNAL_CATALOG_CONFIG = "table-store.catalog.config";
+
+    private static final String TABLE_STORE_PREFIX = "tablestore.";
 
     private final JobConf jobConf;
 
@@ -38,7 +45,11 @@ public class TableStoreJobConf {
         this.jobConf = jobConf;
     }
 
-    public static void configureInputJobProperties(Properties properties, Map<String, String> map) {
+    public static void configureInputJobProperties(
+            Configuration configuration, Properties properties, Map<String, String> map) {
+        map.put(
+                INTERNAL_CATALOG_CONFIG,
+                JsonSerdeUtil.toJson(extractCatalogConfig(configuration).toMap()));
         map.put(
                 INTERNAL_LOCATION,
                 properties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION));
@@ -47,4 +58,25 @@ public class TableStoreJobConf {
     public String getLocation() {
         return jobConf.get(INTERNAL_LOCATION);
     }
+
+    @SuppressWarnings("unchecked")
+    public org.apache.flink.configuration.Configuration getCatalogConfig() {
+        return org.apache.flink.configuration.Configuration.fromMap(
+                JsonSerdeUtil.fromJson(jobConf.get(INTERNAL_CATALOG_CONFIG), Map.class));
+    }
+
+    /** Extract table store catalog conf from Hive conf. */
+    public static org.apache.flink.configuration.Configuration extractCatalogConfig(
+            Configuration hiveConf) {
+        Map<String, String> configMap = new HashMap<>();
+        for (Map.Entry<String, String> entry : hiveConf) {
+            String name = entry.getKey();
+            if (name.startsWith(TABLE_STORE_PREFIX)) {
+                String value = hiveConf.get(name);
+                name = name.substring(TABLE_STORE_PREFIX.length());
+                configMap.put(name, value);
+            }
+        }
+        return org.apache.flink.configuration.Configuration.fromMap(configMap);
+    }
 }
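
The serialize/deserialize pair above can be exercised in isolation. A minimal round-trip sketch using the same calls as the diff; the key literal is spelled out and the endpoint value is hypothetical, purely for illustration:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.store.file.utils.JsonSerdeUtil;

import org.apache.hadoop.mapred.JobConf;

public class CatalogConfigRoundTrip {
    private static final String INTERNAL_CATALOG_CONFIG = "table-store.catalog.config";

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // Write side (configureInputJobProperties): serialize the extracted
        // catalog options into the job properties as JSON.
        Map<String, String> extracted = new HashMap<>();
        extracted.put("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com"); // hypothetical value
        JobConf jobConf = new JobConf();
        jobConf.set(INTERNAL_CATALOG_CONFIG, JsonSerdeUtil.toJson(extracted));

        // Read side (getCatalogConfig): deserialize back into a Flink Configuration.
        Configuration catalogConfig =
                Configuration.fromMap(
                        JsonSerdeUtil.fromJson(jobConf.get(INTERNAL_CATALOG_CONFIG), Map.class));
        System.out.println(catalogConfig.toMap()); // {fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com}
    }
}
```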
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
index 6b5ab2f5..d44856cd 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
@@ -36,18 +36,16 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.store.TableStoreJobConf.extractCatalogConfig;
+
 /** Column names, types and comments of a Hive table. */
 public class HiveSchema {
 
-    private static final String TABLE_STORE_PREFIX = "tablestore.";
-
     private final TableSchema tableSchema;
 
     private HiveSchema(TableSchema tableSchema) {
@@ -81,10 +79,7 @@ public class HiveSchema {
         }
         Path path = new Path(location);
         if (configuration != null) {
-            org.apache.flink.configuration.Configuration flinkConf =
-                    org.apache.flink.configuration.Configuration.fromMap(
-                            getPropsWithPrefix(configuration, TABLE_STORE_PREFIX));
-            FileSystems.initialize(path, flinkConf);
+            FileSystems.initialize(path, extractCatalogConfig(configuration));
         }
         TableSchema tableSchema =
                 new SchemaManager(path)
@@ -117,30 +112,6 @@ public class HiveSchema {
         return new HiveSchema(tableSchema);
     }
 
-    /**
-     * Constructs a mapping of configuration and includes all properties that start with the
-     * specified configuration prefix. Property names in the mapping are trimmed to remove the
-     * configuration prefix.
-     *
-     * <p>Note: this is directly copied from {@link Configuration} to make E2E test happy, since
-     * this method is introduced since 2.8 but we are using a hive container with hadoop-2.7.4.
-     *
-     * @param confPrefix configuration prefix
-     * @return mapping of configuration properties with prefix stripped
-     */
-    private static Map<String, String> getPropsWithPrefix(Configuration conf, String confPrefix) {
-        Map<String, String> configMap = new HashMap<>();
-        for (Map.Entry<String, String> entry : conf) {
-            String name = entry.getKey();
-            if (name.startsWith(confPrefix)) {
-                String value = conf.get(name);
-                name = name.substring(confPrefix.length());
-                configMap.put(name, value);
-            }
-        }
-        return configMap;
-    }
-
     private static void checkSchemaMatched(
             List<String> names, List<TypeInfo> typeInfos, TableSchema tableSchema) {
         List<String> ddlNames = new ArrayList<>(names);
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
index 533e79bc..fa725d80 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
@@ -74,7 +74,7 @@ public class TableStoreHiveStorageHandler
     @Override
     public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> map) {
         Properties properties = tableDesc.getProperties();
-        TableStoreJobConf.configureInputJobProperties(properties, map);
+        TableStoreJobConf.configureInputJobProperties(conf, properties, map);
     }
 
     public void configureInputJobCredentials(TableDesc tableDesc, Map<String, String> map) {}
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
index 34cd506c..3954ec65 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
@@ -19,12 +19,14 @@
 package org.apache.flink.table.store.mapred;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.RowDataContainer;
 import org.apache.flink.table.store.SearchArgumentToPredicateConverter;
 import org.apache.flink.table.store.TableStoreJobConf;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.filesystem.FileSystems;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.FileStoreTableFactory;
 import org.apache.flink.table.store.table.source.DataTableScan;
@@ -77,6 +79,7 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer
         TableStoreJobConf wrapper = new TableStoreJobConf(jobConf);
         Configuration conf = new Configuration();
         conf.set(CoreOptions.PATH, wrapper.getLocation());
+        FileSystems.initialize(new Path(wrapper.getLocation()), wrapper.getCatalogConfig());
         return FileStoreTableFactory.create(conf);
     }
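
Putting the read side together, the annotated restatement below adds only comments to the method patched above; nothing beyond what the diff shows:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.TableStoreJobConf;
import org.apache.flink.table.store.filesystem.FileSystems;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.FileStoreTableFactory;

import org.apache.hadoop.mapred.JobConf;

class ReadPathSketch {
    static FileStoreTable createFileStoreTable(JobConf jobConf) {
        TableStoreJobConf wrapper = new TableStoreJobConf(jobConf);
        Configuration conf = new Configuration();
        conf.set(CoreOptions.PATH, wrapper.getLocation());
        // New in this commit: register the FileSystem for the table path's
        // scheme (oss://, s3://, ...) with the catalog options recovered from
        // the JobConf, before the factory reads the schema from that path.
        FileSystems.initialize(new Path(wrapper.getLocation()), wrapper.getCatalogConfig());
        return FileStoreTableFactory.create(conf);
    }
}
```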