Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/29 03:38:16 UTC
[flink-table-store] branch master updated: [FLINK-29964] Support Spark/Hive with OSS
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 18cdaee7 [FLINK-29964] Support Spark/Hive with OSS
18cdaee7 is described below
commit 18cdaee7cf6eb3fadeeb958ecb3b3c4a7bd57440
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Tue Nov 29 11:38:11 2022 +0800
[FLINK-29964] Support Spark/Hive with OSS
This closes #369
---
docs/content/docs/filesystem/_index.md | 26 +++++++
docs/content/docs/filesystem/overview.md | 91 ++++++++++++++++++++++
.../flink-table-store-fs-oss-hadoop/pom.xml | 78 +++++++++++++++++++
flink-table-store-filesystem/pom.xml | 38 +++++++++
.../apache/flink/table/store/hive/HiveCatalog.java | 1 -
.../apache/flink/table/store/hive/HiveSchema.java | 42 +++++++++-
.../flink/table/store/hive/TableStoreSerDe.java | 2 +-
.../table/store/hive/HiveTableSchemaTest.java | 13 ++--
.../store/spark/SparkCaseSensitiveConverter.java | 61 +++++++++++++++
.../flink/table/store/spark/SparkCatalog.java | 8 +-
.../flink/table/store/spark/SparkSource.java | 21 ++++-
.../store/spark/SparkCaseSensitiveConverter.java | 62 +++++++++++++++
.../flink/table/store/spark/SparkSource.java | 23 ++++--
pom.xml | 1 +
14 files changed, 449 insertions(+), 18 deletions(-)
diff --git a/docs/content/docs/filesystem/_index.md b/docs/content/docs/filesystem/_index.md
new file mode 100644
index 00000000..f678eaa6
--- /dev/null
+++ b/docs/content/docs/filesystem/_index.md
@@ -0,0 +1,26 @@
+---
+title: FileSystem
+icon: <i class="fa fa-sliders title maindish" aria-hidden="true"></i>
+bold: true
+sectionBreak: true
+bookCollapseSection: true
+weight: 4
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
diff --git a/docs/content/docs/filesystem/overview.md b/docs/content/docs/filesystem/overview.md
new file mode 100644
index 00000000..b584fbe7
--- /dev/null
+++ b/docs/content/docs/filesystem/overview.md
@@ -0,0 +1,91 @@
+---
+title: "Overview"
+weight: 1
+type: docs
+aliases:
+- /filesystem/overview.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Overview
+
+## File Systems for Unified Engine
+
+Apache Flink Table Store utilizes the same pluggable file systems as Apache Flink. When using Flink as the compute engine, users can follow the [standard plugin mechanism](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/) to configure the
+plugin structure. For other engines such as Spark or Hive, however, the opt jars provided by Flink may cause class conflicts and cannot be used directly.
+Since resolving class conflicts is inconvenient for users, Flink Table Store provides self-contained, engine-unified pluggable file system jars
+that let users query tables from the Spark/Hive side.
+
+## Supported FileSystem
+
+| FileSystem | URI Scheme | Pluggable | Description |
+|:------------------|:-----------------|-----------|:--------------------------------|
+| Local File System | file:// | N | Built-in Support |
+| Aliyun OSS | oss:// | Y | Tested on Spark 3.3 and Hive 3.1 |
+
+## Build
+After you [Build Flink Table Store]({{< ref "docs/engines/build" >}}) from source, you can find the shaded jars under
+`./flink-table-store-filesystem/flink-table-store-fs-${filesystem}-hadoop/target/flink-table-store-fs-${filesystem}-hadoop-{{< version >}}.jar`, e.g. `flink-table-store-fs-oss-hadoop-{{< version >}}.jar` for OSS.
+
+
+## Common Configurations
+After building, users need to pick the required file system jar and configure the file system parameters, adding the `tablestore` prefix that the query engine expects (as the examples below show).
+
+For example, suppose users set up a Flink job with OSS as the underlying file system and then want to read the tables from the Spark/Hive side.
+
+- On Flink side, configure `flink-conf.yaml` like this:
+ ```yaml
+ fs.oss.endpoint: oss-cn-hangzhou.aliyuncs.com
+ fs.oss.accessKeyId: xxx
+ fs.oss.accessKeySecret: yyy
+ ```
+
+- On Spark side, place `flink-table-store-fs-oss-hadoop-{{< version >}}.jar` together with `flink-table-store-spark-{{< version >}}.jar` under Spark's jars directory, and start as follows:
+ - Spark Shell
+ ```shell
+ spark-shell \
+ --conf spark.datasource.tablestore.fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com \
+ --conf spark.datasource.tablestore.fs.oss.accessKeyId=xxx \
+ --conf spark.datasource.tablestore.fs.oss.accessKeySecret=yyy
+ ```
+ - Spark SQL
+
+ ```shell
+ spark-sql \
+ --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog \
+ --conf spark.sql.catalog.tablestore.warehouse=oss://<bucket-name>/ \
+ --conf spark.sql.catalog.tablestore.fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com \
+ --conf spark.sql.catalog.tablestore.fs.oss.accessKeyId=xxx \
+ --conf spark.sql.catalog.tablestore.fs.oss.accessKeySecret=yyy
+ ```
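+
+  Once started, tables under the configured warehouse can be queried through the `tablestore` catalog. A minimal, hypothetical example (`default.my_table` stands in for an existing table):
+  ```sql
+  SELECT * FROM tablestore.default.my_table;
+  ```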
+- On Hive side, place `flink-table-store-fs-oss-hadoop-{{< version >}}.jar` together with `flink-table-store-hive-connector-{{< version >}}.jar` under Hive's auxlib directory, and start as follows:
+ - Hive Catalog
+ ```sql
+ SET tablestore.fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com;
+ SET tablestore.fs.oss.accessKeyId=xxx;
+ SET tablestore.fs.oss.accessKeySecret=yyy;
+
+ CREATE EXTERNAL TABLE external_test_table
+ STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'
+ LOCATION 'oss://<bucket-name>/<db-name>.db/<table-name>';
+ ```
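+
+  After the external table is created, it can be queried like any other Hive table. A minimal example, assuming data has already been written to the table from the Flink side:
+  ```sql
+  SELECT * FROM external_test_table;
+  ```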
+
+
+
diff --git a/flink-table-store-filesystem/flink-table-store-fs-oss-hadoop/pom.xml b/flink-table-store-filesystem/flink-table-store-fs-oss-hadoop/pom.xml
new file mode 100644
index 00000000..f5801981
--- /dev/null
+++ b/flink-table-store-filesystem/flink-table-store-fs-oss-hadoop/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flink-table-store-filesystem</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>0.3-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-table-store-fs-oss-hadoop</artifactId>
+ <name>Flink Table Store : FileSystem : OSS FS</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-oss-fs-hadoop</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-oss</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.commons.lang3</pattern>
+ <shadedPattern>org.apache.flink.table.store.shaded.org.apache.commons.lang3</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google.common.base</pattern>
+ <shadedPattern>org.apache.flink.table.store.shaded.com.google.common.base</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google.common.cache</pattern>
+ <shadedPattern>org.apache.flink.table.store.shaded.com.google.common.cache</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/flink-table-store-filesystem/pom.xml b/flink-table-store-filesystem/pom.xml
new file mode 100644
index 00000000..dcf3dfa8
--- /dev/null
+++ b/flink-table-store-filesystem/pom.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flink-table-store-parent</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>0.3-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-table-store-filesystem</artifactId>
+ <name>Flink Table Store : FileSystem</name>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>flink-table-store-fs-oss-hadoop</module>
+ </modules>
+
+</project>
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
index 7c8f1b23..8451628a 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
@@ -150,7 +150,6 @@ public class HiveCatalog extends AbstractCatalog {
public List<String> listTables(String databaseName) throws DatabaseNotExistException {
try {
return client.getAllTables(databaseName).stream()
- .parallel()
.filter(
tableName ->
tableStoreTableExists(
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
index f482ac34..8a26404d 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
@@ -18,21 +18,28 @@
package org.apache.flink.table.store.hive;
+import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.table.store.file.schema.DataField;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
@@ -40,6 +47,8 @@ import java.util.stream.Collectors;
/** Column names, types and comments of a Hive table. */
public class HiveSchema {
+ private static final String TABLE_STORE_PREFIX = "tablestore.";
+
private final TableSchema tableSchema;
private HiveSchema(TableSchema tableSchema) {
@@ -61,7 +70,7 @@ public class HiveSchema {
}
/** Extract {@link HiveSchema} from Hive serde properties. */
- public static HiveSchema extract(Properties properties) {
+ public static HiveSchema extract(@Nullable Configuration configuration, Properties properties) {
String location = properties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION);
if (location == null) {
String tableName = properties.getProperty(hive_metastoreConstants.META_TABLE_NAME);
@@ -71,6 +80,13 @@ public class HiveSchema {
+ ". Currently Flink table store only supports external table for Hive "
+ "so location property must be set.");
}
+ if (configuration != null) {
+ org.apache.flink.configuration.Configuration flinkConf =
+ org.apache.flink.configuration.Configuration.fromMap(
+ getPropsWithPrefix(configuration, TABLE_STORE_PREFIX));
+ FileSystem.initialize(
+ flinkConf, PluginUtils.createPluginManagerFromRootFolder(flinkConf));
+ }
TableSchema tableSchema =
new SchemaManager(new Path(location))
.latest()
@@ -102,6 +118,30 @@ public class HiveSchema {
return new HiveSchema(tableSchema);
}
+ /**
+ * Constructs a mapping of configuration and includes all properties that start with the
+ * specified configuration prefix. Property names in the mapping are trimmed to remove the
+ * configuration prefix.
+ *
+ * <p>Note: this is copied directly from {@link Configuration} to keep the E2E tests happy, since
+ * the method was only introduced in Hadoop 2.8 while we are using a Hive container with Hadoop 2.7.4.
+ *
+ * @param confPrefix configuration prefix
+ * @return mapping of configuration properties with prefix stripped
+ */
+ private static Map<String, String> getPropsWithPrefix(Configuration conf, String confPrefix) {
+ Map<String, String> configMap = new HashMap<>();
+ for (Map.Entry<String, String> entry : conf) {
+ String name = entry.getKey();
+ if (name.startsWith(confPrefix)) {
+ String value = conf.get(name);
+ name = name.substring(confPrefix.length());
+ configMap.put(name, value);
+ }
+ }
+ return configMap;
+ }
+
private static void checkSchemaMatched(
List<String> names, List<TypeInfo> typeInfos, TableSchema tableSchema) {
List<String> ddlNames = new ArrayList<>(names);
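
To make the prefix handling concrete: getPropsWithPrefix keeps only the Hive session settings that start with "tablestore." and strips that prefix before the resulting map is handed to Flink's FileSystem.initialize. Below is a self-contained sketch of the same logic over a plain java.util.Map, with hypothetical values; the PrefixDemo class is purely illustrative, and the real method iterates a Hadoop Configuration instead:

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative only; mirrors the prefix-stripping behavior of getPropsWithPrefix.
    public class PrefixDemo {
        static Map<String, String> stripPrefix(Map<String, String> conf, String prefix) {
            Map<String, String> out = new HashMap<>();
            for (Map.Entry<String, String> e : conf.entrySet()) {
                // Keep only keys carrying the prefix, with the prefix removed.
                if (e.getKey().startsWith(prefix)) {
                    out.put(e.getKey().substring(prefix.length()), e.getValue());
                }
            }
            return out;
        }

        public static void main(String[] args) {
            Map<String, String> hiveConf = new HashMap<>();
            hiveConf.put("tablestore.fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
            hiveConf.put("hive.exec.scratchdir", "/tmp/hive"); // no prefix, dropped
            // Prints {fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com}
            System.out.println(stripPrefix(hiveConf, "tablestore."));
        }
    }
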
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java
index 253ed4e2..0b79c5c3 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java
@@ -44,7 +44,7 @@ public class TableStoreSerDe extends AbstractSerDe {
@Override
public void initialize(@Nullable Configuration configuration, Properties properties)
throws SerDeException {
- HiveSchema schema = HiveSchema.extract(properties);
+ HiveSchema schema = HiveSchema.extract(configuration, properties);
inspector =
new TableStoreRowDataObjectInspector(
schema.fieldNames(), schema.fieldTypes(), schema.fieldComments());
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/HiveTableSchemaTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/HiveTableSchemaTest.java
index 0b1864a0..889f11af 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/HiveTableSchemaTest.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/HiveTableSchemaTest.java
@@ -69,7 +69,7 @@ public class HiveTableSchemaTest {
TypeInfoFactory.getDecimalTypeInfo(5, 3).getTypeName())));
properties.setProperty("location", tempDir.toString());
- HiveSchema schema = HiveSchema.extract(properties);
+ HiveSchema schema = HiveSchema.extract(null, properties);
assertThat(schema.fieldNames()).isEqualTo(Arrays.asList("a", "b", "c"));
assertThat(schema.fieldTypes())
.isEqualTo(
@@ -90,7 +90,7 @@ public class HiveTableSchemaTest {
properties.setProperty("columns.types", "");
properties.setProperty("location", tempDir.toString());
- HiveSchema schema = HiveSchema.extract(properties);
+ HiveSchema schema = HiveSchema.extract(null, properties);
assertThat(schema.fieldNames()).isEqualTo(Arrays.asList("a", "b", "c"));
assertThat(schema.fieldTypes())
.isEqualTo(
@@ -133,7 +133,8 @@ public class HiveTableSchemaTest {
"Hive DDL : c decimal(6,3)",
"Table Store Schema: c decimal(5,3)");
IllegalArgumentException exception =
- assertThrows(IllegalArgumentException.class, () -> HiveSchema.extract(properties));
+ assertThrows(
+ IllegalArgumentException.class, () -> HiveSchema.extract(null, properties));
assertThat(exception).hasMessageContaining(expected);
}
@@ -161,7 +162,8 @@ public class HiveTableSchemaTest {
"Hive DDL : null",
"Table Store Schema: c decimal(5,3)");
IllegalArgumentException exception =
- assertThrows(IllegalArgumentException.class, () -> HiveSchema.extract(properties));
+ assertThrows(
+ IllegalArgumentException.class, () -> HiveSchema.extract(null, properties));
assertThat(exception).hasMessageContaining(expected);
}
@@ -198,7 +200,8 @@ public class HiveTableSchemaTest {
"Hive DDL : e string",
"Table Store Schema: null");
IllegalArgumentException exception =
- assertThrows(IllegalArgumentException.class, () -> HiveSchema.extract(properties));
+ assertThrows(
+ IllegalArgumentException.class, () -> HiveSchema.extract(null, properties));
assertThat(exception).hasMessageContaining(expected);
}
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java
new file mode 100644
index 00000000..562834f9
--- /dev/null
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This util converts lowercase keys back to their case-sensitive form. The reason is that during
+ * {@link SparkCatalog} initialization, {@link org.apache.spark.sql.connector.catalog.Catalogs}
+ * puts all configuration into a {@link org.apache.spark.sql.util.CaseInsensitiveStringMap}.
+ * However, {@code org.apache.hadoop.fs.aliyun.oss.Constants} maintains case-sensitive keys to
+ * initialize the Hadoop OSS FileSystem, so the keys need to be converted back.
+ */
+public class SparkCaseSensitiveConverter {
+
+ private static final Set<String> CASE_SENSITIVE_KEYS = new HashSet<>();
+
+ // OSS access verification
+ private static final String ACCESS_KEY_ID = "fs.oss.accessKeyId";
+ private static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
+ private static final String SECURITY_TOKEN = "fs.oss.securityToken";
+
+ static {
+ CASE_SENSITIVE_KEYS.add(ACCESS_KEY_ID);
+ CASE_SENSITIVE_KEYS.add(ACCESS_KEY_SECRET);
+ CASE_SENSITIVE_KEYS.add(SECURITY_TOKEN);
+ }
+
+ public static Map<String, String> convert(Map<String, String> caseInsensitiveOptions) {
+ Map<String, String> options = new HashMap<>(caseInsensitiveOptions);
+ CASE_SENSITIVE_KEYS.forEach(
+ key -> {
+ String lowercaseKey = key.toLowerCase();
+ if (caseInsensitiveOptions.containsKey(lowercaseKey)) {
+ options.put(key, options.remove(lowercaseKey));
+ }
+ });
+ return options;
+ }
+
+ private SparkCaseSensitiveConverter() {}
+}
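
The conversion is a plain key rename: for each known case-sensitive key, if its lowercase form is present in the incoming options, the entry is re-inserted under the original casing. A minimal usage sketch with hypothetical values; the ConvertDemo class is illustrative and assumes the converter above is on the classpath:

    import org.apache.flink.table.store.spark.SparkCaseSensitiveConverter;

    import java.util.HashMap;
    import java.util.Map;

    public class ConvertDemo {
        public static void main(String[] args) {
            // Spark lower-cases option keys, so the OSS keys arrive like this:
            Map<String, String> fromSpark = new HashMap<>();
            fromSpark.put("fs.oss.accesskeyid", "xxx");
            fromSpark.put("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
            Map<String, String> restored = SparkCaseSensitiveConverter.convert(fromSpark);
            // "fs.oss.accesskeyid" is restored to "fs.oss.accessKeyId";
            // keys with no case-sensitive counterpart pass through unchanged.
            System.out.println(restored);
        }
    }
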
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index d4db28db..0c9bef86 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -19,6 +19,8 @@
package org.apache.flink.table.store.spark;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.store.file.catalog.Catalog;
import org.apache.flink.table.store.file.catalog.CatalogFactory;
@@ -69,7 +71,11 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
this.name = name;
- this.catalog = CatalogFactory.createCatalog(Configuration.fromMap(options));
+ Configuration configuration =
+ Configuration.fromMap(SparkCaseSensitiveConverter.convert(options));
+ FileSystem.initialize(
+ configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
+ this.catalog = CatalogFactory.createCatalog(configuration);
}
@Override
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
index 87887a2b..8f60fde8 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
@@ -19,10 +19,12 @@
package org.apache.flink.table.store.spark;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.spark.sql.connector.catalog.SessionConfigSupport;
import org.apache.spark.sql.connector.catalog.Table;
-import org.apache.spark.sql.connector.catalog.TableProvider;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.types.StructType;
@@ -31,12 +33,14 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import java.util.Map;
/** The spark source for table store. */
-public class SparkSource implements DataSourceRegister, TableProvider {
+public class SparkSource implements DataSourceRegister, SessionConfigSupport {
+
+ /** Do not use 'table-store' here; '-' is not allowed in SQL. */
+ private static final String SHORT_NAME = "tablestore";
@Override
public String shortName() {
- // Not use 'table-store' here, the '-' is not allowed in SQL
- return "tablestore";
+ return SHORT_NAME;
}
@Override
@@ -61,6 +65,15 @@ public class SparkSource implements DataSourceRegister, TableProvider {
@Override
public Table getTable(
StructType schema, Transform[] partitioning, Map<String, String> options) {
+ Configuration configuration =
+ Configuration.fromMap(SparkCaseSensitiveConverter.convert(options));
+ FileSystem.initialize(
+ configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
- return new SparkTable(FileStoreTableFactory.create(Configuration.fromMap(options)));
+ return new SparkTable(FileStoreTableFactory.create(configuration));
}
+
+ @Override
+ public String keyPrefix() {
+ return SHORT_NAME;
+ }
}
diff --git a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java
new file mode 100644
index 00000000..e662430d
--- /dev/null
+++ b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This util converts lowercase keys back to their case-sensitive form. The reason is that during
+ * {@link SparkSource} initialization, {@link
+ * org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils} puts all configuration into a
+ * {@link org.apache.spark.sql.catalyst.util.CaseInsensitiveMap}. However, {@code
+ * org.apache.hadoop.fs.aliyun.oss.Constants} maintains case-sensitive keys to initialize the
+ * Hadoop OSS FileSystem, so the keys need to be converted back.
+ */
+public class SparkCaseSensitiveConverter {
+
+ private static final Set<String> CASE_SENSITIVE_KEYS = new HashSet<>();
+
+ // OSS access verification
+ private static final String ACCESS_KEY_ID = "fs.oss.accessKeyId";
+ private static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
+ private static final String SECURITY_TOKEN = "fs.oss.securityToken";
+
+ static {
+ CASE_SENSITIVE_KEYS.add(ACCESS_KEY_ID);
+ CASE_SENSITIVE_KEYS.add(ACCESS_KEY_SECRET);
+ CASE_SENSITIVE_KEYS.add(SECURITY_TOKEN);
+ }
+
+ public static Map<String, String> convert(DataSourceOptions options) {
+ Map<String, String> newOptions = new HashMap<>(options.asMap());
+ CASE_SENSITIVE_KEYS.forEach(
+ key -> {
+ String lowercaseKey = key.toLowerCase();
+ if (newOptions.containsKey(lowercaseKey)) {
+ newOptions.put(key, newOptions.remove(lowercaseKey));
+ }
+ });
+ return newOptions;
+ }
+}
diff --git a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
index 0f8dd8f4..2050b63a 100644
--- a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
+++ b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
@@ -19,21 +19,26 @@
package org.apache.flink.table.store.spark;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.table.store.table.FileStoreTableFactory;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.SessionConfigSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.types.StructType;
/** The Spark source for table store. */
-public class SparkSource implements DataSourceRegister, ReadSupport {
+public class SparkSource implements DataSourceRegister, ReadSupport, SessionConfigSupport {
+
+ /** Do not use 'table-store' here; '-' is not allowed in SQL. */
+ private static final String SHORT_NAME = "tablestore";
@Override
public String shortName() {
- // Not use 'table-store' here, the '-' is not allowed in SQL
- return "tablestore";
+ return SHORT_NAME;
}
@Override
@@ -43,7 +48,15 @@ public class SparkSource implements DataSourceRegister, ReadSupport {
@Override
public DataSourceReader createReader(DataSourceOptions options) {
- return new SparkDataSourceReader(
- FileStoreTableFactory.create(Configuration.fromMap(options.asMap())));
+ Configuration configuration =
+ Configuration.fromMap(SparkCaseSensitiveConverter.convert(options));
+ FileSystem.initialize(
+ configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
+ return new SparkDataSourceReader(FileStoreTableFactory.create(configuration));
+ }
+
+ @Override
+ public String keyPrefix() {
+ return SHORT_NAME;
}
}
diff --git a/pom.xml b/pom.xml
index b04f1dfc..f067f1be 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,6 +61,7 @@ under the License.
<module>flink-table-store-dist</module>
<module>flink-table-store-docs</module>
<module>flink-table-store-e2e-tests</module>
+ <module>flink-table-store-filesystem</module>
<module>flink-table-store-format</module>
<module>flink-table-store-shade</module>
<module>flink-table-store-hive</module>