Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/29 03:38:16 UTC

[flink-table-store] branch master updated: [FLINK-29964] Support Spark/Hive with OSS

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 18cdaee7 [FLINK-29964] Support Spark/Hive with OSS
18cdaee7 is described below

commit 18cdaee7cf6eb3fadeeb958ecb3b3c4a7bd57440
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Tue Nov 29 11:38:11 2022 +0800

    [FLINK-29964] Support Spark/Hive with OSS
    
    This closes #369
---
 docs/content/docs/filesystem/_index.md             | 26 +++++++
 docs/content/docs/filesystem/overview.md           | 91 ++++++++++++++++++++++
 .../flink-table-store-fs-oss-hadoop/pom.xml        | 78 +++++++++++++++++++
 flink-table-store-filesystem/pom.xml               | 38 +++++++++
 .../apache/flink/table/store/hive/HiveCatalog.java |  1 -
 .../apache/flink/table/store/hive/HiveSchema.java  | 42 +++++++++-
 .../flink/table/store/hive/TableStoreSerDe.java    |  2 +-
 .../table/store/hive/HiveTableSchemaTest.java      | 13 ++--
 .../store/spark/SparkCaseSensitiveConverter.java   | 61 +++++++++++++++
 .../flink/table/store/spark/SparkCatalog.java      |  8 +-
 .../flink/table/store/spark/SparkSource.java       | 21 ++++-
 .../store/spark/SparkCaseSensitiveConverter.java   | 62 +++++++++++++++
 .../flink/table/store/spark/SparkSource.java       | 23 ++++--
 pom.xml                                            |  1 +
 14 files changed, 449 insertions(+), 18 deletions(-)

diff --git a/docs/content/docs/filesystem/_index.md b/docs/content/docs/filesystem/_index.md
new file mode 100644
index 00000000..f678eaa6
--- /dev/null
+++ b/docs/content/docs/filesystem/_index.md
@@ -0,0 +1,26 @@
+---
+title: FileSystem
+icon: <i class="fa fa-sliders title maindish" aria-hidden="true"></i>
+bold: true
+sectionBreak: true
+bookCollapseSection: true
+weight: 4
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
diff --git a/docs/content/docs/filesystem/overview.md b/docs/content/docs/filesystem/overview.md
new file mode 100644
index 00000000..b584fbe7
--- /dev/null
+++ b/docs/content/docs/filesystem/overview.md
@@ -0,0 +1,91 @@
+---
+title: "Overview"
+weight: 1
+type: docs
+aliases:
+- /filesystem/overview.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Overview
+
+# File Systems for Unified Engine
+
+Apache Flink Table Store uses the same pluggable file systems as Apache Flink. Users can follow the [standard plugin mechanism](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/) to configure the
+plugin structure when using Flink as the compute engine. However, for other engines such as Spark or Hive, the opt jars provided by Flink may cause class conflicts and cannot be used directly.
+Since resolving class conflicts is inconvenient for users, Flink Table Store provides self-contained, engine-unified pluggable FileSystem jars so that users
+can query tables from the Spark/Hive side.
+
+## Supported FileSystems
+
+| FileSystem        | URI Scheme       | Pluggable | Description                     |
+|:------------------|:-----------------|-----------|:--------------------------------|
+| Local File System | file://          | N         | Built-in Support                |
+| Aliyun OSS        | oss://           | Y         | Tested on Spark 3.3 and Hive 3.1 |
+
+## Build
+After [building Flink Table Store]({{< ref "docs/engines/build" >}}) from source, you can find the shaded jars under
+`./flink-table-store-filesystem/flink-table-store-filesystem-${filesystem}/target/flink-table-store-filesystem-${filesystem}-{{< version >}}.jar`.
+
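+For reference, a minimal build sketch (assuming the standard Maven setup described in the linked build guide):
+
+```shell
+mvn clean install -DskipTests
+```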
+
+## Common Configurations
+After building, users need to pick the required file system jar and configure the required file system parameters by adding the command/configuration prefix `tablestore`.
+
+For example, suppose a user wants to set up a Flink job that uses OSS as the underlying file system, and then read the tables from the Spark/Hive side.
+
+- On the Flink side, configure `flink-conf.yaml` as follows:
+    ```yaml
+    fs.oss.endpoint: oss-cn-hangzhou.aliyuncs.com
+    fs.oss.accessKeyId: xxx
+    fs.oss.accessKeySecret: yyy
+    ```
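+
+    A Table Store catalog backed by OSS can then be created from Flink SQL. A minimal sketch (the bucket and warehouse path are placeholders):
+    ```sql
+    CREATE CATALOG my_catalog WITH (
+        'type' = 'table-store',
+        'warehouse' = 'oss://<bucket-name>/warehouse'
+    );
+    ```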
+
+- On the Spark side, place `flink-table-store-filesystem-oss-{{< version >}}.jar` together with `flink-table-store-spark-{{< version >}}.jar` under Spark's jars directory, and start as follows:
+  - Spark Shell
+    ```shell
+    spark-shell \
+      --conf spark.datasource.tablestore.fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com \
+      --conf spark.datasource.tablestore.fs.oss.accessKeyId=xxx \
+      --conf spark.datasource.tablestore.fs.oss.accessKeySecret=yyy
+    ```
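+
+    Once started, tables can be read through the `tablestore` format. A minimal sketch (the table path is a placeholder):
+    ```scala
+    val df = spark.read.format("tablestore")
+      .option("path", "oss://<bucket-name>/<db-name>.db/<table-name>")
+      .load()
+    df.show()
+    ```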
+  - Spark SQL
+  
+    ```shell
+    spark-sql \
+      --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog \
+      --conf spark.sql.catalog.tablestore.warehouse=oss://<bucket-name>/ \
+      --conf spark.sql.catalog.tablestore.fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com \
+      --conf spark.sql.catalog.tablestore.fs.oss.accessKeyId=xxx \
+      --conf spark.sql.catalog.tablestore.fs.oss.accessKeySecret=yyy
+    ```
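+
+    Tables can then be queried through the configured catalog. A minimal sketch (database and table names are placeholders):
+    ```sql
+    SELECT * FROM tablestore.<db-name>.<table-name> LIMIT 10;
+    ```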
+- On the Hive side, place `flink-table-store-filesystem-oss-{{< version >}}.jar` together with `flink-table-store-hive-connector-{{< version >}}.jar` under Hive's auxlib directory, and start as follows:
+  - Hive Catalog
+    ```sql
+    SET tablestore.fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com;
+    SET tablestore.fs.oss.accessKeyId=xxx;
+    SET tablestore.fs.oss.accessKeySecret=yyy;
+
+    CREATE EXTERNAL TABLE external_test_table
+    STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'
+    LOCATION 'oss://<bucket-name>/<db-name>.db/<table-name>';
+    ```
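+
+    The external table can then be queried directly:
+    ```sql
+    SELECT * FROM external_test_table;
+    ```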
+
+
+
diff --git a/flink-table-store-filesystem/flink-table-store-fs-oss-hadoop/pom.xml b/flink-table-store-filesystem/flink-table-store-fs-oss-hadoop/pom.xml
new file mode 100644
index 00000000..f5801981
--- /dev/null
+++ b/flink-table-store-filesystem/flink-table-store-fs-oss-hadoop/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-table-store-filesystem</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.3-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-table-store-fs-oss-hadoop</artifactId>
+    <name>Flink Table Store : FileSystem : OSS FS</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-oss-fs-hadoop</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-oss</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+                            <shadedArtifactAttached>false</shadedArtifactAttached>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.commons.lang3</pattern>
+                                    <shadedPattern>org.apache.flink.table.store.shaded.org.apache.commons.lang3</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.google.common.base</pattern>
+                                    <shadedPattern>org.apache.flink.table.store.shaded.com.google.common.base</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.google.common.cache</pattern>
+                                    <shadedPattern>org.apache.flink.table.store.shaded.com.google.common.cache</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/flink-table-store-filesystem/pom.xml b/flink-table-store-filesystem/pom.xml
new file mode 100644
index 00000000..dcf3dfa8
--- /dev/null
+++ b/flink-table-store-filesystem/pom.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-table-store-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.3-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-table-store-filesystem</artifactId>
+    <name>Flink Table Store : FileSystem</name>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>flink-table-store-fs-oss-hadoop</module>
+    </modules>
+
+</project>
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
index 7c8f1b23..8451628a 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
@@ -150,7 +150,6 @@ public class HiveCatalog extends AbstractCatalog {
     public List<String> listTables(String databaseName) throws DatabaseNotExistException {
         try {
             return client.getAllTables(databaseName).stream()
-                    .parallel()
                     .filter(
                             tableName ->
                                     tableStoreTableExists(
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
index f482ac34..8a26404d 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
@@ -18,21 +18,28 @@
 
 package org.apache.flink.table.store.hive;
 
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.table.store.file.schema.DataField;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.types.logical.LogicalType;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.stream.Collectors;
@@ -40,6 +47,8 @@ import java.util.stream.Collectors;
 /** Column names, types and comments of a Hive table. */
 public class HiveSchema {
 
+    private static final String TABLE_STORE_PREFIX = "tablestore.";
+
     private final TableSchema tableSchema;
 
     private HiveSchema(TableSchema tableSchema) {
@@ -61,7 +70,7 @@ public class HiveSchema {
     }
 
     /** Extract {@link HiveSchema} from Hive serde properties. */
-    public static HiveSchema extract(Properties properties) {
+    public static HiveSchema extract(@Nullable Configuration configuration, Properties properties) {
         String location = properties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION);
         if (location == null) {
             String tableName = properties.getProperty(hive_metastoreConstants.META_TABLE_NAME);
@@ -71,6 +80,13 @@ public class HiveSchema {
                             + ". Currently Flink table store only supports external table for Hive "
                             + "so location property must be set.");
         }
+        if (configuration != null) {
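+            // Strip the "tablestore." prefix so that e.g. "tablestore.fs.oss.endpoint"
+            // becomes the Flink option "fs.oss.endpoint", then initialize the FileSystem
+            // with the plugin manager so pluggable file systems (like OSS) are discovered.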
+            org.apache.flink.configuration.Configuration flinkConf =
+                    org.apache.flink.configuration.Configuration.fromMap(
+                            getPropsWithPrefix(configuration, TABLE_STORE_PREFIX));
+            FileSystem.initialize(
+                    flinkConf, PluginUtils.createPluginManagerFromRootFolder(flinkConf));
+        }
         TableSchema tableSchema =
                 new SchemaManager(new Path(location))
                         .latest()
@@ -102,6 +118,30 @@ public class HiveSchema {
         return new HiveSchema(tableSchema);
     }
 
+    /**
+     * Constructs a mapping that includes all configuration properties starting with the
+     * specified configuration prefix. Property names in the mapping are trimmed to remove the
+     * prefix.
+     *
+     * <p>Note: this is copied directly from {@link Configuration} to keep the E2E tests happy,
+     * since the method was only introduced in Hadoop 2.8 while the tests use a Hive container
+     * with Hadoop 2.7.4.
+     *
+     * @param conf the Hadoop configuration to scan
+     * @param confPrefix configuration prefix
+     * @return mapping of configuration properties with prefix stripped
+     */
+    private static Map<String, String> getPropsWithPrefix(Configuration conf, String confPrefix) {
+        Map<String, String> configMap = new HashMap<>();
+        for (Map.Entry<String, String> entry : conf) {
+            String name = entry.getKey();
+            if (name.startsWith(confPrefix)) {
+                String value = conf.get(name);
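+                // strip the prefix, e.g. "tablestore.fs.oss.endpoint" -> "fs.oss.endpoint"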
+                name = name.substring(confPrefix.length());
+                configMap.put(name, value);
+            }
+        }
+        return configMap;
+    }
+
     private static void checkSchemaMatched(
             List<String> names, List<TypeInfo> typeInfos, TableSchema tableSchema) {
         List<String> ddlNames = new ArrayList<>(names);
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java
index 253ed4e2..0b79c5c3 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java
@@ -44,7 +44,7 @@ public class TableStoreSerDe extends AbstractSerDe {
     @Override
     public void initialize(@Nullable Configuration configuration, Properties properties)
             throws SerDeException {
-        HiveSchema schema = HiveSchema.extract(properties);
+        HiveSchema schema = HiveSchema.extract(configuration, properties);
         inspector =
                 new TableStoreRowDataObjectInspector(
                         schema.fieldNames(), schema.fieldTypes(), schema.fieldComments());
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/HiveTableSchemaTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/HiveTableSchemaTest.java
index 0b1864a0..889f11af 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/HiveTableSchemaTest.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/HiveTableSchemaTest.java
@@ -69,7 +69,7 @@ public class HiveTableSchemaTest {
                                 TypeInfoFactory.getDecimalTypeInfo(5, 3).getTypeName())));
         properties.setProperty("location", tempDir.toString());
 
-        HiveSchema schema = HiveSchema.extract(properties);
+        HiveSchema schema = HiveSchema.extract(null, properties);
         assertThat(schema.fieldNames()).isEqualTo(Arrays.asList("a", "b", "c"));
         assertThat(schema.fieldTypes())
                 .isEqualTo(
@@ -90,7 +90,7 @@ public class HiveTableSchemaTest {
         properties.setProperty("columns.types", "");
         properties.setProperty("location", tempDir.toString());
 
-        HiveSchema schema = HiveSchema.extract(properties);
+        HiveSchema schema = HiveSchema.extract(null, properties);
         assertThat(schema.fieldNames()).isEqualTo(Arrays.asList("a", "b", "c"));
         assertThat(schema.fieldTypes())
                 .isEqualTo(
@@ -133,7 +133,8 @@ public class HiveTableSchemaTest {
                         "Hive DDL          : c decimal(6,3)",
                         "Table Store Schema: c decimal(5,3)");
         IllegalArgumentException exception =
-                assertThrows(IllegalArgumentException.class, () -> HiveSchema.extract(properties));
+                assertThrows(
+                        IllegalArgumentException.class, () -> HiveSchema.extract(null, properties));
         assertThat(exception).hasMessageContaining(expected);
     }
 
@@ -161,7 +162,8 @@ public class HiveTableSchemaTest {
                         "Hive DDL          : null",
                         "Table Store Schema: c decimal(5,3)");
         IllegalArgumentException exception =
-                assertThrows(IllegalArgumentException.class, () -> HiveSchema.extract(properties));
+                assertThrows(
+                        IllegalArgumentException.class, () -> HiveSchema.extract(null, properties));
         assertThat(exception).hasMessageContaining(expected);
     }
 
@@ -198,7 +200,8 @@ public class HiveTableSchemaTest {
                         "Hive DDL          : e string",
                         "Table Store Schema: null");
         IllegalArgumentException exception =
-                assertThrows(IllegalArgumentException.class, () -> HiveSchema.extract(properties));
+                assertThrows(
+                        IllegalArgumentException.class, () -> HiveSchema.extract(null, properties));
         assertThat(exception).hasMessageContaining(expected);
     }
 
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java
new file mode 100644
index 00000000..562834f9
--- /dev/null
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This util converts lowercase keys back to case-sensitive keys. The reason is that during {@link
+ * SparkCatalog} initialization, {@link org.apache.spark.sql.connector.catalog.Catalogs} puts all
+ * configuration into a {@link org.apache.spark.sql.util.CaseInsensitiveStringMap}. However, {@code
+ * org.apache.hadoop.fs.aliyun.oss.Constants} maintains case-sensitive keys to initialize the
+ * Hadoop OSS FileSystem, so the keys need to be converted back.
+ */
+public class SparkCaseSensitiveConverter {
+
+    private static final Set<String> CASE_SENSITIVE_KEYS = new HashSet<>();
+
+    // OSS access verification
+    private static final String ACCESS_KEY_ID = "fs.oss.accessKeyId";
+    private static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
+    private static final String SECURITY_TOKEN = "fs.oss.securityToken";
+
+    static {
+        CASE_SENSITIVE_KEYS.add(ACCESS_KEY_ID);
+        CASE_SENSITIVE_KEYS.add(ACCESS_KEY_SECRET);
+        CASE_SENSITIVE_KEYS.add(SECURITY_TOKEN);
+    }
+
+    public static Map<String, String> convert(Map<String, String> caseInsensitiveOptions) {
+        Map<String, String> options = new HashMap<>(caseInsensitiveOptions);
+        CASE_SENSITIVE_KEYS.forEach(
+                key -> {
+                    String lowercaseKey = key.toLowerCase();
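+                    // restore canonical casing, e.g. "fs.oss.accesskeyid" -> "fs.oss.accessKeyId"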
+                    if (caseInsensitiveOptions.containsKey(lowercaseKey)) {
+                        options.put(key, options.remove(lowercaseKey));
+                    }
+                });
+        return options;
+    }
+
+    private SparkCaseSensitiveConverter() {}
+}
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index d4db28db..0c9bef86 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -19,6 +19,8 @@
 package org.apache.flink.table.store.spark;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.store.file.catalog.Catalog;
 import org.apache.flink.table.store.file.catalog.CatalogFactory;
@@ -69,7 +71,11 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
     @Override
     public void initialize(String name, CaseInsensitiveStringMap options) {
         this.name = name;
-        this.catalog = CatalogFactory.createCatalog(Configuration.fromMap(options));
+        Configuration configuration =
+                Configuration.fromMap(SparkCaseSensitiveConverter.convert(options));
+        FileSystem.initialize(
+                configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
+        this.catalog = CatalogFactory.createCatalog(configuration);
     }
 
     @Override
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
index 87887a2b..8f60fde8 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
@@ -19,10 +19,12 @@
 package org.apache.flink.table.store.spark;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.table.store.table.FileStoreTableFactory;
 
+import org.apache.spark.sql.connector.catalog.SessionConfigSupport;
 import org.apache.spark.sql.connector.catalog.Table;
-import org.apache.spark.sql.connector.catalog.TableProvider;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.sources.DataSourceRegister;
 import org.apache.spark.sql.types.StructType;
@@ -31,12 +33,14 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import java.util.Map;
 
 /** The spark source for table store. */
-public class SparkSource implements DataSourceRegister, TableProvider {
+public class SparkSource implements DataSourceRegister, SessionConfigSupport {
+
+    /** Do not use 'table-store' here; '-' is not allowed in SQL. */
+    private static final String SHORT_NAME = "tablestore";
 
     @Override
     public String shortName() {
-        // Not use 'table-store' here, the '-' is not allowed in SQL
-        return "tablestore";
+        return SHORT_NAME;
     }
 
     @Override
@@ -61,6 +65,15 @@ public class SparkSource implements DataSourceRegister, TableProvider {
     @Override
     public Table getTable(
             StructType schema, Transform[] partitioning, Map<String, String> options) {
+        Configuration configuration =
+                Configuration.fromMap(SparkCaseSensitiveConverter.convert(options));
+        FileSystem.initialize(
+                configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
-        return new SparkTable(FileStoreTableFactory.create(Configuration.fromMap(options)));
+        return new SparkTable(FileStoreTableFactory.create(configuration));
     }
+
+    @Override
+    public String keyPrefix() {
+        return SHORT_NAME;
+    }
 }
diff --git a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java
new file mode 100644
index 00000000..e662430d
--- /dev/null
+++ b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkCaseSensitiveConverter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This util converts lowercase keys back to case-sensitive keys. The reason is that during {@link
+ * SparkSource} initialization, {@link
+ * org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils} puts all configuration into a
+ * {@link org.apache.spark.sql.catalyst.util.CaseInsensitiveMap}. However, {@code
+ * org.apache.hadoop.fs.aliyun.oss.Constants} maintains case-sensitive keys to initialize the
+ * Hadoop OSS FileSystem, so the keys need to be converted back.
+ */
+public class SparkCaseSensitiveConverter {
+
+    private static final Set<String> CASE_SENSITIVE_KEYS = new HashSet<>();
+
+    // OSS access verification
+    private static final String ACCESS_KEY_ID = "fs.oss.accessKeyId";
+    private static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
+    private static final String SECURITY_TOKEN = "fs.oss.securityToken";
+
+    static {
+        CASE_SENSITIVE_KEYS.add(ACCESS_KEY_ID);
+        CASE_SENSITIVE_KEYS.add(ACCESS_KEY_SECRET);
+        CASE_SENSITIVE_KEYS.add(SECURITY_TOKEN);
+    }
+
+    public static Map<String, String> convert(DataSourceOptions options) {
+        Map<String, String> newOptions = new HashMap<>(options.asMap());
+        CASE_SENSITIVE_KEYS.forEach(
+                key -> {
+                    String lowercaseKey = key.toLowerCase();
+                    if (newOptions.containsKey(lowercaseKey)) {
+                        newOptions.put(key, newOptions.remove(lowercaseKey));
+                    }
+                });
+        return newOptions;
+    }
+}
diff --git a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
index 0f8dd8f4..2050b63a 100644
--- a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
+++ b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
@@ -19,21 +19,26 @@
 package org.apache.flink.table.store.spark;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.table.store.table.FileStoreTableFactory;
 
 import org.apache.spark.sql.sources.DataSourceRegister;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.SessionConfigSupport;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.types.StructType;
 
 /** The Spark source for table store. */
-public class SparkSource implements DataSourceRegister, ReadSupport {
+public class SparkSource implements DataSourceRegister, ReadSupport, SessionConfigSupport {
+
+    /** Do not use 'table-store' here; '-' is not allowed in SQL. */
+    private static final String SHORT_NAME = "tablestore";
 
     @Override
     public String shortName() {
-        // Not use 'table-store' here, the '-' is not allowed in SQL
-        return "tablestore";
+        return SHORT_NAME;
     }
 
     @Override
@@ -43,7 +48,15 @@ public class SparkSource implements DataSourceRegister, ReadSupport {
 
     @Override
     public DataSourceReader createReader(DataSourceOptions options) {
-        return new SparkDataSourceReader(
-                FileStoreTableFactory.create(Configuration.fromMap(options.asMap())));
+        Configuration configuration =
+                Configuration.fromMap(SparkCaseSensitiveConverter.convert(options));
+        FileSystem.initialize(
+                configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
+        return new SparkDataSourceReader(FileStoreTableFactory.create(configuration));
+    }
+
+    @Override
+    public String keyPrefix() {
+        return SHORT_NAME;
     }
 }
diff --git a/pom.xml b/pom.xml
index b04f1dfc..f067f1be 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,6 +61,7 @@ under the License.
         <module>flink-table-store-dist</module>
         <module>flink-table-store-docs</module>
         <module>flink-table-store-e2e-tests</module>
+        <module>flink-table-store-filesystem</module>
         <module>flink-table-store-format</module>
         <module>flink-table-store-shade</module>
         <module>flink-table-store-hive</module>