Posted to commits@flink.apache.org by cz...@apache.org on 2022/12/27 02:13:13 UTC

[flink-table-store] branch release-0.3 updated: [FLINK-30504] Fix UnsupportedFileSystemSchemeException when writing Table Store on OSS with Spark

This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch release-0.3
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.3 by this push:
     new fe8de4c3 [FLINK-30504] Fix UnsupportedFileSystemSchemeException when writing Table Store on OSS with Spark
fe8de4c3 is described below

commit fe8de4c32c148bb87f5a40649ca2373e88f321d8
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Dec 27 10:09:44 2022 +0800

    [FLINK-30504] Fix UnsupportedFileSystemSchemeException when writing Table Store on OSS with Spark
    
    This closes #444.
    
    (cherry picked from commit 929c1110bdbf521ad0a9eb09a65d33f76a2b5990)
---
 docs/content/docs/filesystems/oss.md               | 18 +++++------
 .../apache/flink/table/store/table/DataTable.java  |  3 --
 .../org/apache/flink/table/store/table/Table.java  |  3 ++
 .../table/store/table/system/OptionsTable.java     |  5 ++++
 .../table/store/table/system/SchemasTable.java     |  5 ++++
 .../table/store/table/system/SnapshotsTable.java   |  5 ++++
 .../flink/table/store/spark/SparkCatalog.java      | 12 ++++----
 .../table/store/spark/SparkReaderFactory.java      | 14 ++++++++-
 .../apache/flink/table/store/spark/SparkScan.java  |  8 +++--
 .../flink/table/store/spark/SparkScanBuilder.java  |  7 +++--
 .../flink/table/store/spark/SparkSource.java       |  4 ++-
 .../apache/flink/table/store/spark/SparkTable.java |  9 ++++--
 .../apache/flink/table/store/spark/SparkWrite.java | 35 ++++++++++++++++++----
 .../flink/table/store/spark/SparkWriteBuilder.java |  8 +++--
 14 files changed, 103 insertions(+), 33 deletions(-)
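
The UnsupportedFileSystemSchemeException appears when the oss:// scheme is not registered on the Spark executors that deserialize the Table Store reader and writer objects: the driver runs FileSystems.initialize, but the executors never did. The commit therefore threads the catalog Configuration into every serializable Spark-side class and re-runs the initialization from a readObject hook. A minimal sketch of that pattern, assuming the project classes shown in the diff below (the class name here is illustrative, not part of the commit):

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.store.filesystem.FileSystems;
import org.apache.flink.table.store.table.Table;

// Sketch only: carry the catalog Configuration along with every object that Spark
// serializes to executors, and re-register the file system after deserialization.
class ExecutorSideFactory implements Serializable {

    private final Table table;        // exposes location() after this commit
    private final Configuration conf; // OSS endpoint and credentials travel with the object

    ExecutorSideFactory(Table table, Configuration conf) {
        this.table = table;
        this.conf = conf;
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        // Runs on the executor: register the oss:// file system before any I/O happens.
        FileSystems.initialize(table.location(), conf);
    }
}
```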

diff --git a/docs/content/docs/filesystems/oss.md b/docs/content/docs/filesystems/oss.md
index 39d54c70..2423ff16 100644
--- a/docs/content/docs/filesystems/oss.md
+++ b/docs/content/docs/filesystems/oss.md
@@ -48,9 +48,9 @@ You can find the shaded jars under
 [Prepare OSS jar](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/oss/#shaded-hadoop-oss-file-system), then configure `flink-conf.yaml` like
 
 ```yaml
-fs.oss.endpoint: oss-cn-hangzhou.aliyun.cs.com
-fs.oss.accessKey: xxx
-fs.oss.accessSecret: yyy
+fs.oss.endpoint: oss-cn-hangzhou.aliyuncs.com
+fs.oss.accessKeyId: xxx
+fs.oss.accessKeySecret: yyy
 ```
 
 {{< /tab >}}
@@ -63,9 +63,9 @@ Place `flink-table-store-oss-{{< version >}}.jar` together with `flink-table-sto
 spark-sql \ 
   --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog \
   --conf spark.sql.catalog.tablestore.warehouse=oss://<bucket-name>/ \
-  --conf spark.sql.catalog.tablestore.fs.oss.endpoint=oss-cn-hangzhou.aliyun.cs.com \
-  --conf spark.sql.catalog.tablestore.fs.oss.accessKey=xxx \
-  --conf spark.sql.catalog.tablestore.fs.oss.accessSecret=yyy
+  --conf spark.sql.catalog.tablestore.fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com \
+  --conf spark.sql.catalog.tablestore.fs.oss.accessKeyId=xxx \
+  --conf spark.sql.catalog.tablestore.fs.oss.accessKeySecret=yyy
 ```
 
 {{< /tab >}}
@@ -75,9 +75,9 @@ spark-sql \
 Place `flink-table-store-oss-{{< version >}}.jar` together with `flink-table-store-hive-connector-{{< version >}}.jar` under Hive's auxlib directory, and start like
 
 ```sql
-SET tablestore.fs.oss.endpoint=oss-cn-hangzhou.aliyun.cs.com;
-SET tablestore.fs.oss.accessKey=xxx;
-SET tablestore.fs.oss.accessSecret=yyy;
+SET tablestore.fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com;
+SET tablestore.fs.oss.accessKeyId=xxx;
+SET tablestore.fs.oss.accessKeySecret=yyy;
 
 CREATE EXTERNAL TABLE external_test_table
 STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'
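
With the corrected endpoint (oss-cn-hangzhou.aliyuncs.com) and key names (fs.oss.accessKeyId, fs.oss.accessKeySecret), a write from the spark-sql session configured above exercises exactly the code path this commit fixes. An illustrative session, not part of the commit; the database and table names are made up and a plain append-only table (no primary key) is assumed:

```sql
-- Run inside the spark-sql session started with the catalog options above.
CREATE TABLE tablestore.default.word_count (
    word STRING,
    cnt  BIGINT
);

-- Before this fix, the INSERT could fail on executors with
-- UnsupportedFileSystemSchemeException, because oss:// was never initialized there.
INSERT INTO tablestore.default.word_count VALUES ('hello', 1), ('world', 2);

SELECT * FROM tablestore.default.word_count;
```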
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/DataTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/DataTable.java
index 60e62685..8a992d01 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/DataTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/DataTable.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.table;
 
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.source.DataTableScan;
@@ -31,7 +30,5 @@ public interface DataTable extends Table {
 
     CoreOptions options();
 
-    Path location();
-
     SnapshotManager snapshotManager();
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/Table.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/Table.java
index 3a6b705d..7227a690 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/Table.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/Table.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.table;
 
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.table.source.TableScan;
 import org.apache.flink.table.types.logical.RowType;
@@ -32,6 +33,8 @@ public interface Table extends Serializable {
 
     RowType rowType();
 
+    Path location();
+
     TableScan newScan();
 
     TableRead newRead();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/OptionsTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/OptionsTable.java
index c5901267..9b4820e3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/OptionsTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/OptionsTable.java
@@ -74,6 +74,11 @@ public class OptionsTable implements Table {
         return TABLE_TYPE;
     }
 
+    @Override
+    public Path location() {
+        return location;
+    }
+
     @Override
     public TableScan newScan() {
         return new OptionsScan();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SchemasTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SchemasTable.java
index 30619b6f..3164caca 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SchemasTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SchemasTable.java
@@ -85,6 +85,11 @@ public class SchemasTable implements Table {
         return TABLE_TYPE;
     }
 
+    @Override
+    public Path location() {
+        return location;
+    }
+
     @Override
     public TableScan newScan() {
         return new SchemasScan();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SnapshotsTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SnapshotsTable.java
index 8890431e..712b634a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SnapshotsTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SnapshotsTable.java
@@ -87,6 +87,11 @@ public class SnapshotsTable implements Table {
         return TABLE_TYPE;
     }
 
+    @Override
+    public Path location() {
+        return location;
+    }
+
     @Override
     public TableScan newScan() {
         return new SnapshotsScan();
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index 9712b709..c6b59b42 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -69,14 +69,14 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
 
     private String name = null;
     private Catalog catalog = null;
+    private Configuration conf = null;
 
     @Override
     public void initialize(String name, CaseInsensitiveStringMap options) {
         this.name = name;
-        Configuration configuration =
-                Configuration.fromMap(SparkCaseSensitiveConverter.convert(options));
-        FileSystems.initialize(CatalogFactory.warehouse(configuration), configuration);
-        this.catalog = CatalogFactory.createCatalog(configuration);
+        conf = Configuration.fromMap(SparkCaseSensitiveConverter.convert(options));
+        FileSystems.initialize(CatalogFactory.warehouse(conf), conf);
+        this.catalog = CatalogFactory.createCatalog(conf);
     }
 
     @Override
@@ -204,7 +204,9 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
         try {
             ObjectPath path = objectPath(ident);
             return new SparkTable(
-                    catalog.getTable(path), Lock.factory(catalog.lockFactory().orElse(null), path));
+                    catalog.getTable(path),
+                    Lock.factory(catalog.lockFactory().orElse(null), path),
+                    conf);
         } catch (Catalog.TableNotExistException e) {
             throw new NoSuchTableException(ident);
         }
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
index 54b119ea..0a1c4b80 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
@@ -17,10 +17,12 @@
 
 package org.apache.flink.table.store.spark;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.filesystem.FileSystems;
 import org.apache.flink.table.store.table.Table;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.utils.TypeUtils;
@@ -45,11 +47,14 @@ public class SparkReaderFactory implements PartitionReaderFactory {
     private final Table table;
     private final int[] projectedFields;
     private final List<Predicate> predicates;
+    private final Configuration conf;
 
-    public SparkReaderFactory(Table table, int[] projectedFields, List<Predicate> predicates) {
+    public SparkReaderFactory(
+            Table table, int[] projectedFields, List<Predicate> predicates, Configuration conf) {
         this.table = table;
         this.projectedFields = projectedFields;
         this.predicates = predicates;
+        this.conf = conf;
     }
 
     private RowType readRowType() {
@@ -96,4 +101,11 @@ public class SparkReaderFactory implements PartitionReaderFactory {
             }
         };
     }
+
+    private void readObject(java.io.ObjectInputStream in)
+            throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        // TODO move file system initialization into common modules
+        FileSystems.initialize(table.location(), conf);
+    }
 }
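
The readObject hook added above relies on standard Java serialization behaviour: whatever the hook does runs at deserialization time, which for Spark means on the executor that received the serialized factory. A stand-alone demo of the mechanism (nothing here is project code; the class and message are purely illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ReadObjectHookDemo implements Serializable {

    private final String location;

    public ReadObjectHookDemo(String location) {
        this.location = location;
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        // In the real factory this is where FileSystems.initialize(table.location(), conf) runs.
        System.out.println("re-initialized file system for " + location);
    }

    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new ReadObjectHookDemo("oss://my-bucket/warehouse/db/tbl"));
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            in.readObject(); // the hook above fires here, proving it runs on deserialization
        }
    }
}
```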
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
index 576373d9..38998790 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.spark;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.table.Table;
 import org.apache.flink.table.store.table.source.Split;
@@ -45,13 +46,16 @@ public class SparkScan implements Scan, SupportsReportStatistics {
     protected final Table table;
     private final List<Predicate> predicates;
     private final int[] projectedFields;
+    private final Configuration conf;
 
     private List<Split> splits;
 
-    public SparkScan(Table table, List<Predicate> predicates, int[] projectedFields) {
+    public SparkScan(
+            Table table, List<Predicate> predicates, int[] projectedFields, Configuration conf) {
         this.table = table;
         this.predicates = predicates;
         this.projectedFields = projectedFields;
+        this.conf = conf;
     }
 
     @Override
@@ -77,7 +81,7 @@ public class SparkScan implements Scan, SupportsReportStatistics {
 
             @Override
             public PartitionReaderFactory createReaderFactory() {
-                return new SparkReaderFactory(table, projectedFields, predicates);
+                return new SparkReaderFactory(table, projectedFields, predicates, conf);
             }
         };
     }
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
index ae7faaea..880c1832 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.spark;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.table.Table;
 
@@ -36,13 +37,15 @@ public class SparkScanBuilder
         implements ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns {
 
     private final Table table;
+    private final Configuration conf;
 
     private List<Predicate> predicates = new ArrayList<>();
     private Filter[] pushedFilters;
     private int[] projectedFields;
 
-    public SparkScanBuilder(Table table) {
+    public SparkScanBuilder(Table table, Configuration conf) {
         this.table = table;
+        this.conf = conf;
     }
 
     @Override
@@ -80,6 +83,6 @@ public class SparkScanBuilder
 
     @Override
     public Scan build() {
-        return new SparkScan(table, predicates, projectedFields);
+        return new SparkScan(table, predicates, projectedFields, conf);
     }
 }
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
index 67e3c7ec..bb14238c 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
@@ -70,7 +70,9 @@ public class SparkSource implements DataSourceRegister, SessionConfigSupport {
                 Configuration.fromMap(SparkCaseSensitiveConverter.convert(options));
         FileSystems.initialize(CoreOptions.path(options), configuration);
         return new SparkTable(
-                FileStoreTableFactory.create(Configuration.fromMap(options)), Lock.emptyFactory());
+                FileStoreTableFactory.create(Configuration.fromMap(options)),
+                Lock.emptyFactory(),
+                configuration);
     }
 
     @Override
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
index 426c7476..0984dba1 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.spark;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.table.SupportsPartition;
@@ -52,16 +53,18 @@ public class SparkTable
 
     private final Table table;
     private final Lock.Factory lockFactory;
+    private final Configuration conf;
 
-    public SparkTable(Table table, Lock.Factory lockFactory) {
+    public SparkTable(Table table, Lock.Factory lockFactory, Configuration conf) {
         this.table = table;
         this.lockFactory = lockFactory;
+        this.conf = conf;
     }
 
     @Override
     public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
         // options is already merged into table
-        return new SparkScanBuilder(table);
+        return new SparkScanBuilder(table, conf);
     }
 
     @Override
@@ -96,7 +99,7 @@ public class SparkTable
 
     @Override
     public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
-        return new SparkWriteBuilder(castToWritable(table), info.queryId(), lockFactory);
+        return new SparkWriteBuilder(castToWritable(table), info.queryId(), lockFactory, conf);
     }
 
     @Override
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java
index 504dfd7e..2f4886b2 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.table.store.spark;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.filesystem.FileSystems;
 import org.apache.flink.table.store.table.SupportsWrite;
 import org.apache.flink.table.store.table.sink.BucketComputer;
 import org.apache.flink.table.store.table.sink.FileCommittable;
@@ -33,6 +35,7 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.connector.write.V1Write;
 import org.apache.spark.sql.sources.InsertableRelation;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -43,11 +46,14 @@ public class SparkWrite implements V1Write {
     private final SupportsWrite table;
     private final String queryId;
     private final Lock.Factory lockFactory;
+    private final Configuration conf;
 
-    public SparkWrite(SupportsWrite table, String queryId, Lock.Factory lockFactory) {
+    public SparkWrite(
+            SupportsWrite table, String queryId, Lock.Factory lockFactory, Configuration conf) {
         this.table = table;
         this.queryId = queryId;
         this.lockFactory = lockFactory;
+        this.conf = conf;
     }
 
     @Override
@@ -60,8 +66,8 @@ public class SparkWrite implements V1Write {
             long identifier = 0;
             List<SerializableCommittable> committables =
                     data.toJavaRDD()
-                            .groupBy(new ComputeBucket(table))
-                            .mapValues(new WriteRecords(table, queryId, identifier))
+                            .groupBy(new ComputeBucket(table, conf))
+                            .mapValues(new WriteRecords(table, queryId, identifier, conf))
                             .values()
                             .reduce(new ListConcat<>());
             try (TableCommit tableCommit =
@@ -81,12 +87,14 @@ public class SparkWrite implements V1Write {
 
         private final SupportsWrite table;
         private final RowType type;
+        private final Configuration conf;
 
         private transient BucketComputer lazyComputer;
 
-        private ComputeBucket(SupportsWrite table) {
+        private ComputeBucket(SupportsWrite table, Configuration conf) {
             this.table = table;
             this.type = table.rowType();
+            this.conf = conf;
         }
 
         private BucketComputer computer() {
@@ -100,6 +108,13 @@ public class SparkWrite implements V1Write {
         public Integer call(Row row) {
             return computer().bucket(new SparkRowData(type, row));
         }
+
+        private void readObject(java.io.ObjectInputStream in)
+                throws IOException, ClassNotFoundException {
+            in.defaultReadObject();
+            // TODO move file system initialization into common modules
+            FileSystems.initialize(table.location(), conf);
+        }
     }
 
     private static class WriteRecords
@@ -109,12 +124,15 @@ public class SparkWrite implements V1Write {
         private final RowType type;
         private final String queryId;
         private final long commitIdentifier;
+        private final Configuration conf;
 
-        private WriteRecords(SupportsWrite table, String queryId, long commitIdentifier) {
+        private WriteRecords(
+                SupportsWrite table, String queryId, long commitIdentifier, Configuration conf) {
             this.table = table;
             this.type = table.rowType();
             this.queryId = queryId;
             this.commitIdentifier = commitIdentifier;
+            this.conf = conf;
         }
 
         @Override
@@ -129,6 +147,13 @@ public class SparkWrite implements V1Write {
                         .collect(Collectors.toList());
             }
         }
+
+        private void readObject(java.io.ObjectInputStream in)
+                throws IOException, ClassNotFoundException {
+            in.defaultReadObject();
+            // TODO move file system initialization into common modules
+            FileSystems.initialize(table.location(), conf);
+        }
     }
 
     private static class ListConcat<T> implements Function2<List<T>, List<T>, List<T>> {
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java
index aef473f0..179ca29e 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.spark;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.table.SupportsWrite;
 
@@ -34,15 +35,18 @@ public class SparkWriteBuilder implements WriteBuilder {
     private final SupportsWrite table;
     private final String queryId;
     private final Lock.Factory lockFactory;
+    private final Configuration conf;
 
-    public SparkWriteBuilder(SupportsWrite table, String queryId, Lock.Factory lockFactory) {
+    public SparkWriteBuilder(
+            SupportsWrite table, String queryId, Lock.Factory lockFactory, Configuration conf) {
         this.table = table;
         this.queryId = queryId;
         this.lockFactory = lockFactory;
+        this.conf = conf;
     }
 
     @Override
     public Write build() {
-        return new SparkWrite(table, queryId, lockFactory);
+        return new SparkWrite(table, queryId, lockFactory, conf);
     }
 }