Posted to commits@flink.apache.org by cz...@apache.org on 2022/12/27 02:13:13 UTC
[flink-table-store] branch release-0.3 updated: [FLINK-30504] Fix UnsupportedFileSystemSchemeException when writing Table Store on OSS with Spark
This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch release-0.3
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.3 by this push:
new fe8de4c3 [FLINK-30504] Fix UnsupportedFileSystemSchemeException when writing Table Store on OSS with Spark
fe8de4c3 is described below
commit fe8de4c32c148bb87f5a40649ca2373e88f321d8
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Dec 27 10:09:44 2022 +0800
[FLINK-30504] Fix UnsupportedFileSystemSchemeException when writing Table Store on OSS with Spark
This closes #444.
(cherry picked from commit 929c1110bdbf521ad0a9eb09a65d33f76a2b5990)
---
docs/content/docs/filesystems/oss.md | 18 +++++------
.../apache/flink/table/store/table/DataTable.java | 3 --
.../org/apache/flink/table/store/table/Table.java | 3 ++
.../table/store/table/system/OptionsTable.java | 5 ++++
.../table/store/table/system/SchemasTable.java | 5 ++++
.../table/store/table/system/SnapshotsTable.java | 5 ++++
.../flink/table/store/spark/SparkCatalog.java | 12 ++++----
.../table/store/spark/SparkReaderFactory.java | 14 ++++++++-
.../apache/flink/table/store/spark/SparkScan.java | 8 +++--
.../flink/table/store/spark/SparkScanBuilder.java | 7 +++--
.../flink/table/store/spark/SparkSource.java | 4 ++-
.../apache/flink/table/store/spark/SparkTable.java | 9 ++++--
.../apache/flink/table/store/spark/SparkWrite.java | 35 ++++++++++++++++++----
.../flink/table/store/spark/SparkWriteBuilder.java | 8 +++--
14 files changed, 103 insertions(+), 33 deletions(-)
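The UnsupportedFileSystemSchemeException arose because Spark serializes the read/write task components on the driver and deserializes them on executors, where the driver-side FileSystems.initialize call never ran, so the oss:// scheme was unknown there. The fix below threads the catalog Configuration through the Spark classes and re-runs the initialization inside readObject. A minimal standalone sketch of that pattern, assuming a hypothetical FileSystemStub in place of the real FileSystems.initialize (which needs the full Table Store classpath):

```java
import java.io.*;

// Sketch of the pattern used by SparkReaderFactory, ComputeBucket and
// WriteRecords in this commit: re-initialize per-JVM state inside
// readObject so it also runs on executors after deserialization.
public class TaskComponent implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String warehousePath;                    // stands in for table.location()
    private final java.util.HashMap<String, String> conf;  // stands in for Configuration

    public TaskComponent(String warehousePath, java.util.HashMap<String, String> conf) {
        this.warehousePath = warehousePath;
        this.conf = conf;
    }

    // Invoked by Java serialization on the receiving JVM (the Spark executor).
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();                        // restore the fields first
        FileSystemStub.initialize(warehousePath, conf); // then register the scheme in this JVM
    }

    // Hypothetical stand-in; the real code calls FileSystems.initialize.
    static class FileSystemStub {
        static void initialize(String path, java.util.Map<String, String> conf) {
            System.out.println("initialized file system for " + path);
        }
    }

    public static void main(String[] args) throws Exception {
        // Round-trip through serialization to show the hook firing.
        TaskComponent c = new TaskComponent("oss://bucket/warehouse", new java.util.HashMap<>());
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(c);
        oos.close();
        new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray())).readObject();
    }
}
```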
diff --git a/docs/content/docs/filesystems/oss.md b/docs/content/docs/filesystems/oss.md
index 39d54c70..2423ff16 100644
--- a/docs/content/docs/filesystems/oss.md
+++ b/docs/content/docs/filesystems/oss.md
@@ -48,9 +48,9 @@ You can find the shaded jars under
[Prepare OSS jar](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/oss/#shaded-hadoop-oss-file-system), then configure `flink-conf.yaml` like
```yaml
-fs.oss.endpoint: oss-cn-hangzhou.aliyun.cs.com
-fs.oss.accessKey: xxx
-fs.oss.accessSecret: yyy
+fs.oss.endpoint: oss-cn-hangzhou.aliyuncs.com
+fs.oss.accessKeyId: xxx
+fs.oss.accessKeySecret: yyy
```
{{< /tab >}}
@@ -63,9 +63,9 @@ Place `flink-table-store-oss-{{< version >}}.jar` together with `flink-table-sto
spark-sql \
--conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog \
--conf spark.sql.catalog.tablestore.warehouse=oss://<bucket-name>/ \
- --conf spark.sql.catalog.tablestore.fs.oss.endpoint=oss-cn-hangzhou.aliyun.cs.com \
- --conf spark.sql.catalog.tablestore.fs.oss.accessKey=xxx \
- --conf spark.sql.catalog.tablestore.fs.oss.accessSecret=yyy
+ --conf spark.sql.catalog.tablestore.fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com \
+ --conf spark.sql.catalog.tablestore.fs.oss.accessKeyId=xxx \
+ --conf spark.sql.catalog.tablestore.fs.oss.accessKeySecret=yyy
```
{{< /tab >}}
@@ -75,9 +75,9 @@ spark-sql \
Place `flink-table-store-oss-{{< version >}}.jar` together with `flink-table-store-hive-connector-{{< version >}}.jar` under Hive's auxlib directory, and start like
```sql
-SET tablestore.fs.oss.endpoint=oss-cn-hangzhou.aliyun.cs.com;
-SET tablestore.fs.oss.accessKey=xxx;
-SET tablestore.fs.oss.accessSecret=yyy;
+SET tablestore.fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com;
+SET tablestore.fs.oss.accessKeyId=xxx;
+SET tablestore.fs.oss.accessKeySecret=yyy;
CREATE EXTERNAL TABLE external_test_table
STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'
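Note what the documentation hunks above correct: the endpoint domain is oss-cn-hangzhou.aliyuncs.com (the old text had a typo, aliyun.cs.com), and the credential keys are fs.oss.accessKeyId and fs.oss.accessKeySecret, the names the shaded Hadoop OSS file system actually reads. The same keys can also be set programmatically on the Configuration that is later handed to FileSystems.initialize; a small sketch with placeholder endpoint and credentials:

```java
import org.apache.flink.configuration.Configuration;

public class OssConfExample {
    public static void main(String[] args) {
        // The corrected OSS keys, matching the docs change above.
        // Endpoint and credentials are placeholders.
        Configuration conf = new Configuration();
        conf.setString("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
        conf.setString("fs.oss.accessKeyId", "xxx");
        conf.setString("fs.oss.accessKeySecret", "yyy");
        System.out.println(conf);
    }
}
```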
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/DataTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/DataTable.java
index 60e62685..8a992d01 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/DataTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/DataTable.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.table;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.source.DataTableScan;
@@ -31,7 +30,5 @@ public interface DataTable extends Table {
CoreOptions options();
- Path location();
-
SnapshotManager snapshotManager();
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/Table.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/Table.java
index 3a6b705d..7227a690 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/Table.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/Table.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.table;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
import org.apache.flink.table.types.logical.RowType;
@@ -32,6 +33,8 @@ public interface Table extends Serializable {
RowType rowType();
+ Path location();
+
TableScan newScan();
TableRead newRead();
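The two core hunks above move location() from DataTable up to the base Table interface, so the system tables (and the Spark task code changed below) can ask any table for its path. An abridged view of the interface after the patch, listing only the members visible in this diff:

```java
import java.io.Serializable;

import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
import org.apache.flink.table.types.logical.RowType;

public interface Table extends Serializable {

    RowType rowType();

    // Moved up from DataTable so that OptionsTable, SchemasTable and
    // SnapshotsTable can expose their location as well.
    Path location();

    TableScan newScan();

    TableRead newRead();
}
```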
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/OptionsTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/OptionsTable.java
index c5901267..9b4820e3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/OptionsTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/OptionsTable.java
@@ -74,6 +74,11 @@ public class OptionsTable implements Table {
return TABLE_TYPE;
}
+ @Override
+ public Path location() {
+ return location;
+ }
+
@Override
public TableScan newScan() {
return new OptionsScan();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SchemasTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SchemasTable.java
index 30619b6f..3164caca 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SchemasTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SchemasTable.java
@@ -85,6 +85,11 @@ public class SchemasTable implements Table {
return TABLE_TYPE;
}
+ @Override
+ public Path location() {
+ return location;
+ }
+
@Override
public TableScan newScan() {
return new SchemasScan();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SnapshotsTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SnapshotsTable.java
index 8890431e..712b634a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SnapshotsTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SnapshotsTable.java
@@ -87,6 +87,11 @@ public class SnapshotsTable implements Table {
return TABLE_TYPE;
}
+ @Override
+ public Path location() {
+ return location;
+ }
+
@Override
public TableScan newScan() {
return new SnapshotsScan();
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index 9712b709..c6b59b42 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -69,14 +69,14 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
private String name = null;
private Catalog catalog = null;
+ private Configuration conf = null;
@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
this.name = name;
- Configuration configuration =
- Configuration.fromMap(SparkCaseSensitiveConverter.convert(options));
- FileSystems.initialize(CatalogFactory.warehouse(configuration), configuration);
- this.catalog = CatalogFactory.createCatalog(configuration);
+ conf = Configuration.fromMap(SparkCaseSensitiveConverter.convert(options));
+ FileSystems.initialize(CatalogFactory.warehouse(conf), conf);
+ this.catalog = CatalogFactory.createCatalog(conf);
}
@Override
@@ -204,7 +204,9 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
try {
ObjectPath path = objectPath(ident);
return new SparkTable(
- catalog.getTable(path), Lock.factory(catalog.lockFactory().orElse(null), path));
+ catalog.getTable(path),
+ Lock.factory(catalog.lockFactory().orElse(null), path),
+ conf);
} catch (Catalog.TableNotExistException e) {
throw new NoSuchTableException(ident);
}
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
index 54b119ea..0a1c4b80 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
@@ -17,10 +17,12 @@
package org.apache.flink.table.store.spark;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.filesystem.FileSystems;
import org.apache.flink.table.store.table.Table;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.utils.TypeUtils;
@@ -45,11 +47,14 @@ public class SparkReaderFactory implements PartitionReaderFactory {
private final Table table;
private final int[] projectedFields;
private final List<Predicate> predicates;
+ private final Configuration conf;
- public SparkReaderFactory(Table table, int[] projectedFields, List<Predicate> predicates) {
+ public SparkReaderFactory(
+ Table table, int[] projectedFields, List<Predicate> predicates, Configuration conf) {
this.table = table;
this.projectedFields = projectedFields;
this.predicates = predicates;
+ this.conf = conf;
}
private RowType readRowType() {
@@ -96,4 +101,11 @@ public class SparkReaderFactory implements PartitionReaderFactory {
}
};
}
+
+ private void readObject(java.io.ObjectInputStream in)
+ throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ // TODO move file system initialization into common modules
+ FileSystems.initialize(table.location(), conf);
+ }
}
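For clarity, the hook added above, annotated: SparkReaderFactory is built on the driver but runs on executors, so the deserialization callback is the earliest executor-side point to register the oss:// scheme. The signature is fixed by the Java serialization spec (the method must be private, otherwise ObjectInputStream silently ignores it), and defaultReadObject() has to run first so the table and conf fields are populated:

```java
private void readObject(java.io.ObjectInputStream in)
        throws IOException, ClassNotFoundException {
    in.defaultReadObject();                          // restore table, conf, ...
    // TODO move file system initialization into common modules
    FileSystems.initialize(table.location(), conf);  // register schemes in this JVM
}
```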
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
index 576373d9..38998790 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.spark;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.table.Table;
import org.apache.flink.table.store.table.source.Split;
@@ -45,13 +46,16 @@ public class SparkScan implements Scan, SupportsReportStatistics {
protected final Table table;
private final List<Predicate> predicates;
private final int[] projectedFields;
+ private final Configuration conf;
private List<Split> splits;
- public SparkScan(Table table, List<Predicate> predicates, int[] projectedFields) {
+ public SparkScan(
+ Table table, List<Predicate> predicates, int[] projectedFields, Configuration conf) {
this.table = table;
this.predicates = predicates;
this.projectedFields = projectedFields;
+ this.conf = conf;
}
@Override
@@ -77,7 +81,7 @@ public class SparkScan implements Scan, SupportsReportStatistics {
@Override
public PartitionReaderFactory createReaderFactory() {
- return new SparkReaderFactory(table, projectedFields, predicates);
+ return new SparkReaderFactory(table, projectedFields, predicates, conf);
}
};
}
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
index ae7faaea..880c1832 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.spark;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.table.Table;
@@ -36,13 +37,15 @@ public class SparkScanBuilder
implements ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns {
private final Table table;
+ private final Configuration conf;
private List<Predicate> predicates = new ArrayList<>();
private Filter[] pushedFilters;
private int[] projectedFields;
- public SparkScanBuilder(Table table) {
+ public SparkScanBuilder(Table table, Configuration conf) {
this.table = table;
+ this.conf = conf;
}
@Override
@@ -80,6 +83,6 @@ public class SparkScanBuilder
@Override
public Scan build() {
- return new SparkScan(table, predicates, projectedFields);
+ return new SparkScan(table, predicates, projectedFields, conf);
}
}
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
index 67e3c7ec..bb14238c 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
@@ -70,7 +70,9 @@ public class SparkSource implements DataSourceRegister, SessionConfigSupport {
Configuration.fromMap(SparkCaseSensitiveConverter.convert(options));
FileSystems.initialize(CoreOptions.path(options), configuration);
return new SparkTable(
- FileStoreTableFactory.create(Configuration.fromMap(options)), Lock.emptyFactory());
+ FileStoreTableFactory.create(Configuration.fromMap(options)),
+ Lock.emptyFactory(),
+ configuration);
}
@Override
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
index 426c7476..0984dba1 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.spark;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.table.SupportsPartition;
@@ -52,16 +53,18 @@ public class SparkTable
private final Table table;
private final Lock.Factory lockFactory;
+ private final Configuration conf;
- public SparkTable(Table table, Lock.Factory lockFactory) {
+ public SparkTable(Table table, Lock.Factory lockFactory, Configuration conf) {
this.table = table;
this.lockFactory = lockFactory;
+ this.conf = conf;
}
@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
// options is already merged into table
- return new SparkScanBuilder(table);
+ return new SparkScanBuilder(table, conf);
}
@Override
@@ -96,7 +99,7 @@ public class SparkTable
@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
- return new SparkWriteBuilder(castToWritable(table), info.queryId(), lockFactory);
+ return new SparkWriteBuilder(castToWritable(table), info.queryId(), lockFactory, conf);
}
@Override
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java
index 504dfd7e..2f4886b2 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java
@@ -18,7 +18,9 @@
package org.apache.flink.table.store.spark;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.filesystem.FileSystems;
import org.apache.flink.table.store.table.SupportsWrite;
import org.apache.flink.table.store.table.sink.BucketComputer;
import org.apache.flink.table.store.table.sink.FileCommittable;
@@ -33,6 +35,7 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.connector.write.V1Write;
import org.apache.spark.sql.sources.InsertableRelation;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -43,11 +46,14 @@ public class SparkWrite implements V1Write {
private final SupportsWrite table;
private final String queryId;
private final Lock.Factory lockFactory;
+ private final Configuration conf;
- public SparkWrite(SupportsWrite table, String queryId, Lock.Factory lockFactory) {
+ public SparkWrite(
+ SupportsWrite table, String queryId, Lock.Factory lockFactory, Configuration conf) {
this.table = table;
this.queryId = queryId;
this.lockFactory = lockFactory;
+ this.conf = conf;
}
@Override
@@ -60,8 +66,8 @@ public class SparkWrite implements V1Write {
long identifier = 0;
List<SerializableCommittable> committables =
data.toJavaRDD()
- .groupBy(new ComputeBucket(table))
- .mapValues(new WriteRecords(table, queryId, identifier))
+ .groupBy(new ComputeBucket(table, conf))
+ .mapValues(new WriteRecords(table, queryId, identifier, conf))
.values()
.reduce(new ListConcat<>());
try (TableCommit tableCommit =
@@ -81,12 +87,14 @@ public class SparkWrite implements V1Write {
private final SupportsWrite table;
private final RowType type;
+ private final Configuration conf;
private transient BucketComputer lazyComputer;
- private ComputeBucket(SupportsWrite table) {
+ private ComputeBucket(SupportsWrite table, Configuration conf) {
this.table = table;
this.type = table.rowType();
+ this.conf = conf;
}
private BucketComputer computer() {
@@ -100,6 +108,13 @@ public class SparkWrite implements V1Write {
public Integer call(Row row) {
return computer().bucket(new SparkRowData(type, row));
}
+
+ private void readObject(java.io.ObjectInputStream in)
+ throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ // TODO move file system initialization into common modules
+ FileSystems.initialize(table.location(), conf);
+ }
}
private static class WriteRecords
@@ -109,12 +124,15 @@ public class SparkWrite implements V1Write {
private final RowType type;
private final String queryId;
private final long commitIdentifier;
+ private final Configuration conf;
- private WriteRecords(SupportsWrite table, String queryId, long commitIdentifier) {
+ private WriteRecords(
+ SupportsWrite table, String queryId, long commitIdentifier, Configuration conf) {
this.table = table;
this.type = table.rowType();
this.queryId = queryId;
this.commitIdentifier = commitIdentifier;
+ this.conf = conf;
}
@Override
@@ -129,6 +147,13 @@ public class SparkWrite implements V1Write {
.collect(Collectors.toList());
}
}
+
+ private void readObject(java.io.ObjectInputStream in)
+ throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ // TODO move file system initialization into common modules
+ FileSystems.initialize(table.location(), conf);
+ }
}
private static class ListConcat<T> implements Function2<List<T>, List<T>, List<T>> {
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java
index aef473f0..179ca29e 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.spark;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.table.SupportsWrite;
@@ -34,15 +35,18 @@ public class SparkWriteBuilder implements WriteBuilder {
private final SupportsWrite table;
private final String queryId;
private final Lock.Factory lockFactory;
+ private final Configuration conf;
- public SparkWriteBuilder(SupportsWrite table, String queryId, Lock.Factory lockFactory) {
+ public SparkWriteBuilder(
+ SupportsWrite table, String queryId, Lock.Factory lockFactory, Configuration conf) {
this.table = table;
this.queryId = queryId;
this.lockFactory = lockFactory;
+ this.conf = conf;
}
@Override
public Write build() {
- return new SparkWrite(table, queryId, lockFactory);
+ return new SparkWrite(table, queryId, lockFactory, conf);
}
}