You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/05 12:06:30 UTC
[flink-table-store] branch release-0.3 updated: [FLINK-30555] Hive cluster can not read oss/s3 tables
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.3
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.3 by this push:
new 1574b224 [FLINK-30555] Hive cluster can not read oss/s3 tables
1574b224 is described below
commit 1574b224df0e88f82f5e374507938f8e38905791
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu Jan 5 20:05:41 2023 +0800
[FLINK-30555] Hive cluster can not read oss/s3 tables
This closes #456
---
docs/content/docs/filesystems/oss.md | 10 +++++--
docs/content/docs/filesystems/s3.md | 10 +++++--
.../flink/table/store/TableStoreJobConf.java | 34 ++++++++++++++++++++-
.../apache/flink/table/store/hive/HiveSchema.java | 35 ++--------------------
.../store/hive/TableStoreHiveStorageHandler.java | 2 +-
.../table/store/mapred/TableStoreInputFormat.java | 3 ++
6 files changed, 54 insertions(+), 40 deletions(-)
diff --git a/docs/content/docs/filesystems/oss.md b/docs/content/docs/filesystems/oss.md
index 2423ff16..1e0bbc0c 100644
--- a/docs/content/docs/filesystems/oss.md
+++ b/docs/content/docs/filesystems/oss.md
@@ -72,16 +72,20 @@ spark-sql \
{{< tab "Hive" >}}
+NOTE: You need to ensure that Hive metastore can access `oss`.
+
Place `flink-table-store-oss-{{< version >}}.jar` together with `flink-table-store-hive-connector-{{< version >}}.jar` under Hive's auxlib directory, and start like
```sql
SET tablestore.fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com;
SET tablestore.fs.oss.accessKeyId=xxx;
SET tablestore.fs.oss.accessKeySecret=yyy;
+```
-CREATE EXTERNAL TABLE external_test_table
-STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'
-LOCATION 'oss://<bucket-name>/<db-name>.db/<table-name>';
+Then read the table from Hive metastore. The table can be created by Flink or Spark; see [Catalog with Hive Metastore]({{< ref "docs/how-to/creating-catalogs" >}})
+```sql
+SELECT * FROM test_table;
+SELECT COUNT(1) FROM test_table;
```
{{< /tab >}}
diff --git a/docs/content/docs/filesystems/s3.md b/docs/content/docs/filesystems/s3.md
index 61505598..02179960 100644
--- a/docs/content/docs/filesystems/s3.md
+++ b/docs/content/docs/filesystems/s3.md
@@ -72,16 +72,20 @@ spark-sql \
{{< tab "Hive" >}}
+NOTE: You need to ensure that Hive metastore can access `s3`.
+
Place `flink-table-store-s3-{{< version >}}.jar` together with `flink-table-store-hive-connector-{{< version >}}.jar` under Hive's auxlib directory, and start like
```sql
SET tablestore.s3.endpoint=your-endpoint-hostname;
SET tablestore.s3.access-key=xxx;
SET tablestore.s3.secret-key=yyy;
+```
-CREATE EXTERNAL TABLE external_test_table
-STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'
-LOCATION 's3://<bucket>/<endpoint>';
+Then read the table from Hive metastore. The table can be created by Flink or Spark; see [Catalog with Hive Metastore]({{< ref "docs/how-to/creating-catalogs" >}})
+```sql
+SELECT * FROM test_table;
+SELECT COUNT(1) FROM test_table;
```
{{< /tab >}}
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
index db3f6821..cf69b566 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
@@ -18,9 +18,13 @@
package org.apache.flink.table.store;
+import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.mapred.JobConf;
+import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -31,6 +35,9 @@ import java.util.Properties;
public class TableStoreJobConf {
private static final String INTERNAL_LOCATION = "table-store.internal.location";
+ private static final String INTERNAL_CATALOG_CONFIG = "table-store.catalog.config";
+
+ private static final String TABLE_STORE_PREFIX = "tablestore.";
private final JobConf jobConf;
@@ -38,7 +45,11 @@ public class TableStoreJobConf {
this.jobConf = jobConf;
}
- public static void configureInputJobProperties(Properties properties, Map<String, String> map) {
+ public static void configureInputJobProperties(
+ Configuration configuration, Properties properties, Map<String, String> map) {
+ map.put(
+ INTERNAL_CATALOG_CONFIG,
+ JsonSerdeUtil.toJson(extractCatalogConfig(configuration).toMap()));
map.put(
INTERNAL_LOCATION,
properties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION));
@@ -47,4 +58,25 @@ public class TableStoreJobConf {
public String getLocation() {
return jobConf.get(INTERNAL_LOCATION);
}
+
+ @SuppressWarnings("unchecked")
+ public org.apache.flink.configuration.Configuration getCatalogConfig() {
+ return org.apache.flink.configuration.Configuration.fromMap(
+ JsonSerdeUtil.fromJson(jobConf.get(INTERNAL_CATALOG_CONFIG), Map.class));
+ }
+
+ /** Extract table store catalog conf from Hive conf. */
+ public static org.apache.flink.configuration.Configuration extractCatalogConfig(
+ Configuration hiveConf) {
+ Map<String, String> configMap = new HashMap<>();
+ for (Map.Entry<String, String> entry : hiveConf) {
+ String name = entry.getKey();
+ if (name.startsWith(TABLE_STORE_PREFIX)) {
+ String value = hiveConf.get(name);
+ name = name.substring(TABLE_STORE_PREFIX.length());
+ configMap.put(name, value);
+ }
+ }
+ return org.apache.flink.configuration.Configuration.fromMap(configMap);
+ }
}
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
index 6b5ab2f5..d44856cd 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
@@ -36,18 +36,16 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
+import static org.apache.flink.table.store.TableStoreJobConf.extractCatalogConfig;
+
/** Column names, types and comments of a Hive table. */
public class HiveSchema {
- private static final String TABLE_STORE_PREFIX = "tablestore.";
-
private final TableSchema tableSchema;
private HiveSchema(TableSchema tableSchema) {
@@ -81,10 +79,7 @@ public class HiveSchema {
}
Path path = new Path(location);
if (configuration != null) {
- org.apache.flink.configuration.Configuration flinkConf =
- org.apache.flink.configuration.Configuration.fromMap(
- getPropsWithPrefix(configuration, TABLE_STORE_PREFIX));
- FileSystems.initialize(path, flinkConf);
+ FileSystems.initialize(path, extractCatalogConfig(configuration));
}
TableSchema tableSchema =
new SchemaManager(path)
@@ -117,30 +112,6 @@ public class HiveSchema {
return new HiveSchema(tableSchema);
}
- /**
- * Constructs a mapping of configuration and includes all properties that start with the
- * specified configuration prefix. Property names in the mapping are trimmed to remove the
- * configuration prefix.
- *
- * <p>Note: this is directly copied from {@link Configuration} to make E2E test happy, since
- * this method is introduced since 2.8 but we are using a hive container with hadoop-2.7.4.
- *
- * @param confPrefix configuration prefix
- * @return mapping of configuration properties with prefix stripped
- */
- private static Map<String, String> getPropsWithPrefix(Configuration conf, String confPrefix) {
- Map<String, String> configMap = new HashMap<>();
- for (Map.Entry<String, String> entry : conf) {
- String name = entry.getKey();
- if (name.startsWith(confPrefix)) {
- String value = conf.get(name);
- name = name.substring(confPrefix.length());
- configMap.put(name, value);
- }
- }
- return configMap;
- }
-
private static void checkSchemaMatched(
List<String> names, List<TypeInfo> typeInfos, TableSchema tableSchema) {
List<String> ddlNames = new ArrayList<>(names);
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
index 533e79bc..fa725d80 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
@@ -74,7 +74,7 @@ public class TableStoreHiveStorageHandler
@Override
public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> map) {
Properties properties = tableDesc.getProperties();
- TableStoreJobConf.configureInputJobProperties(properties, map);
+ TableStoreJobConf.configureInputJobProperties(conf, properties, map);
}
public void configureInputJobCredentials(TableDesc tableDesc, Map<String, String> map) {}
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
index 34cd506c..3954ec65 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
@@ -19,12 +19,14 @@
package org.apache.flink.table.store.mapred;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.RowDataContainer;
import org.apache.flink.table.store.SearchArgumentToPredicateConverter;
import org.apache.flink.table.store.TableStoreJobConf;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.filesystem.FileSystems;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.FileStoreTableFactory;
import org.apache.flink.table.store.table.source.DataTableScan;
@@ -77,6 +79,7 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer
TableStoreJobConf wrapper = new TableStoreJobConf(jobConf);
Configuration conf = new Configuration();
conf.set(CoreOptions.PATH, wrapper.getLocation());
+ FileSystems.initialize(new Path(wrapper.getLocation()), wrapper.getCatalogConfig());
return FileStoreTableFactory.create(conf);
}