You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/15 07:03:03 UTC
[flink-table-store] branch master updated: [FLINK-28560] Support Spark 3.3 profile for SparkSource
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 15ddb221 [FLINK-28560] Support Spark 3.3 profile for SparkSource
15ddb221 is described below
commit 15ddb22174fb94bd36b176f0eb487f5a1b39d443
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Fri Jul 15 15:02:59 2022 +0800
[FLINK-28560] Support Spark 3.3 profile for SparkSource
This closes #217
---
docs/content/docs/engines/overview.md | 1 +
flink-table-store-spark/pom.xml | 6 ++++
.../flink/table/store/spark/SparkCatalog.java | 28 +++++++++++++++++-
.../flink/table/store/spark/SparkTypeTest.java | 34 +++++++++++-----------
4 files changed, 51 insertions(+), 18 deletions(-)
diff --git a/docs/content/docs/engines/overview.md b/docs/content/docs/engines/overview.md
index dc4a3f1c..0bbc14ad 100644
--- a/docs/content/docs/engines/overview.md
+++ b/docs/content/docs/engines/overview.md
@@ -41,5 +41,6 @@ Apache Hive and Apache Spark.
| Spark | 3.0 | read | Projection, Filter |
| Spark | 3.1 | read | Projection, Filter |
| Spark | 3.2 | read | Projection, Filter |
+| Spark | 3.3 | read | Projection, Filter |
| Trino | 358 | read | Projection, Filter |
| Trino | 388 | read | Projection, Filter |
\ No newline at end of file
diff --git a/flink-table-store-spark/pom.xml b/flink-table-store-spark/pom.xml
index 66609af6..1a29f56a 100644
--- a/flink-table-store-spark/pom.xml
+++ b/flink-table-store-spark/pom.xml
@@ -76,6 +76,12 @@ under the License.
<!-- Activate these profiles with -Pspark-x.x to build and test against different Spark versions -->
<profiles>
+ <profile>
+ <id>spark-3.3</id>
+ <properties>
+ <spark.version>3.3.0</spark.version>
+ </properties>
+ </profile>
<profile>
<id>spark-3.2</id>
<properties>
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index b53cb33f..65f1387b 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -185,8 +185,34 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
throw new UnsupportedOperationException("Alter namespace in Spark is not supported yet.");
}
- @Override
+ /**
+ * Drop a namespace from the catalog, recursively dropping all objects within the namespace.
+ * This interface implementation only supports Spark 3.0, 3.1 and 3.2.
+ *
+ * <p>If the catalog implementation does not support this operation, it may throw {@link
+ * UnsupportedOperationException}.
+ *
+ * @param namespace a multi-part namespace
+ * @return true if the namespace was dropped
+ * @throws UnsupportedOperationException If drop is not a supported operation
+ */
public boolean dropNamespace(String[] namespace) {
+ return dropNamespace(namespace, true);
+ }
+
+ /**
+ * Drop a namespace from the catalog with cascade mode, recursively dropping all objects within
+ * the namespace if cascade is true. This interface implementation supports Spark 3.3+.
+ *
+ * <p>If the catalog implementation does not support this operation, it may throw {@link
+ * UnsupportedOperationException}.
+ *
+ * @param namespace a multi-part namespace
+ * @param cascade When true, deletes all objects under the namespace
+ * @return true if the namespace was dropped
+ * @throws UnsupportedOperationException If drop is not a supported operation
+ */
+ public boolean dropNamespace(String[] namespace, boolean cascade) {
throw new UnsupportedOperationException("Drop namespace in Spark is not supported yet.");
}
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
index abae4e2d..8ccea4f8 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
@@ -73,29 +73,29 @@ public class SparkTypeTest {
String nestedRowMapType =
"StructField(locations,MapType("
+ "StringType,"
- + "StructType(StructField(posX,DoubleType,false), StructField(posY,DoubleType,false)),true),true)";
+ + "StructType(StructField(posX,DoubleType,false),StructField(posY,DoubleType,false)),true),true)";
String expected =
"StructType("
- + "StructField(id,IntegerType,false), "
- + "StructField(name,StringType,true), "
- + "StructField(salary,DoubleType,false), "
+ + "StructField(id,IntegerType,false),"
+ + "StructField(name,StringType,true),"
+ + "StructField(salary,DoubleType,false),"
+ nestedRowMapType
- + ", "
- + "StructField(strArray,ArrayType(StringType,true),true), "
- + "StructField(intArray,ArrayType(IntegerType,true),true), "
- + "StructField(boolean,BooleanType,true), "
- + "StructField(tinyint,ByteType,true), "
- + "StructField(smallint,ShortType,true), "
- + "StructField(bigint,LongType,true), "
- + "StructField(bytes,BinaryType,true), "
- + "StructField(timestamp,TimestampType,true), "
- + "StructField(date,DateType,true), "
- + "StructField(decimal,DecimalType(2,2),true), "
- + "StructField(decimal2,DecimalType(38,2),true), "
+ + ","
+ + "StructField(strArray,ArrayType(StringType,true),true),"
+ + "StructField(intArray,ArrayType(IntegerType,true),true),"
+ + "StructField(boolean,BooleanType,true),"
+ + "StructField(tinyint,ByteType,true),"
+ + "StructField(smallint,ShortType,true),"
+ + "StructField(bigint,LongType,true),"
+ + "StructField(bytes,BinaryType,true),"
+ + "StructField(timestamp,TimestampType,true),"
+ + "StructField(date,DateType,true),"
+ + "StructField(decimal,DecimalType(2,2),true),"
+ + "StructField(decimal2,DecimalType(38,2),true),"
+ "StructField(decimal3,DecimalType(10,1),true))";
StructType sparkType = fromFlinkRowType(ALL_TYPES);
- assertThat(sparkType.toString()).isEqualTo(expected);
+ assertThat(sparkType.toString().replace(", ", ",")).isEqualTo(expected);
assertThat(toFlinkType(sparkType)).isEqualTo(ALL_TYPES);
}