You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/15 07:03:03 UTC

[flink-table-store] branch master updated: [FLINK-28560] Support Spark 3.3 profile for SparkSource

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 15ddb221 [FLINK-28560] Support Spark 3.3 profile for SparkSource
15ddb221 is described below

commit 15ddb22174fb94bd36b176f0eb487f5a1b39d443
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Fri Jul 15 15:02:59 2022 +0800

    [FLINK-28560] Support Spark 3.3 profile for SparkSource
    
    This closes #217
---
 docs/content/docs/engines/overview.md              |  1 +
 flink-table-store-spark/pom.xml                    |  6 ++++
 .../flink/table/store/spark/SparkCatalog.java      | 28 +++++++++++++++++-
 .../flink/table/store/spark/SparkTypeTest.java     | 34 +++++++++++-----------
 4 files changed, 51 insertions(+), 18 deletions(-)

diff --git a/docs/content/docs/engines/overview.md b/docs/content/docs/engines/overview.md
index dc4a3f1c..0bbc14ad 100644
--- a/docs/content/docs/engines/overview.md
+++ b/docs/content/docs/engines/overview.md
@@ -41,5 +41,6 @@ Apache Hive and Apache Spark.
 | Spark     | 3.0      | read                                                 | Projection, Filter |
 | Spark     | 3.1      | read                                                 | Projection, Filter |
 | Spark     | 3.2      | read                                                 | Projection, Filter |
+| Spark     | 3.3      | read                                                 | Projection, Filter |
 | Trino     | 358      | read                                                 | Projection, Filter |
 | Trino     | 388      | read                                                 | Projection, Filter |
\ No newline at end of file
diff --git a/flink-table-store-spark/pom.xml b/flink-table-store-spark/pom.xml
index 66609af6..1a29f56a 100644
--- a/flink-table-store-spark/pom.xml
+++ b/flink-table-store-spark/pom.xml
@@ -76,6 +76,12 @@ under the License.
 
     <!-- Activate these profiles with -Pspark-x.x to build and test against different Spark versions -->
     <profiles>
+        <profile>
+            <id>spark-3.3</id>
+            <properties>
+                <spark.version>3.3.0</spark.version>
+            </properties>
+        </profile>
         <profile>
             <id>spark-3.2</id>
             <properties>
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index b53cb33f..65f1387b 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -185,8 +185,34 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
         throw new UnsupportedOperationException("Alter namespace in Spark is not supported yet.");
     }
 
-    @Override
+    /**
+     * Drop a namespace from the catalog, recursively dropping all objects within the namespace.
+     * This interface implementation only supports the Spark 3.0, 3.1 and 3.2.
+     *
+     * <p>If the catalog implementation does not support this operation, it may throw {@link
+     * UnsupportedOperationException}.
+     *
+     * @param namespace a multi-part namespace
+     * @return true if the namespace was dropped
+     * @throws UnsupportedOperationException If drop is not a supported operation
+     */
     public boolean dropNamespace(String[] namespace) {
+        return dropNamespace(namespace, true);
+    }
+
+    /**
+     * Drop a namespace from the catalog with cascade mode, recursively dropping all objects within
+     * the namespace if cascade is true. This interface implementation supports the Spark 3.3+.
+     *
+     * <p>If the catalog implementation does not support this operation, it may throw {@link
+     * UnsupportedOperationException}.
+     *
+     * @param namespace a multi-part namespace
+     * @param cascade When true, deletes all objects under the namespace
+     * @return true if the namespace was dropped
+     * @throws UnsupportedOperationException If drop is not a supported operation
+     */
+    public boolean dropNamespace(String[] namespace, boolean cascade) {
         throw new UnsupportedOperationException("Drop namespace in Spark is not supported yet.");
     }
 
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
index abae4e2d..8ccea4f8 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
@@ -73,29 +73,29 @@ public class SparkTypeTest {
         String nestedRowMapType =
                 "StructField(locations,MapType("
                         + "StringType,"
-                        + "StructType(StructField(posX,DoubleType,false), StructField(posY,DoubleType,false)),true),true)";
+                        + "StructType(StructField(posX,DoubleType,false),StructField(posY,DoubleType,false)),true),true)";
         String expected =
                 "StructType("
-                        + "StructField(id,IntegerType,false), "
-                        + "StructField(name,StringType,true), "
-                        + "StructField(salary,DoubleType,false), "
+                        + "StructField(id,IntegerType,false),"
+                        + "StructField(name,StringType,true),"
+                        + "StructField(salary,DoubleType,false),"
                         + nestedRowMapType
-                        + ", "
-                        + "StructField(strArray,ArrayType(StringType,true),true), "
-                        + "StructField(intArray,ArrayType(IntegerType,true),true), "
-                        + "StructField(boolean,BooleanType,true), "
-                        + "StructField(tinyint,ByteType,true), "
-                        + "StructField(smallint,ShortType,true), "
-                        + "StructField(bigint,LongType,true), "
-                        + "StructField(bytes,BinaryType,true), "
-                        + "StructField(timestamp,TimestampType,true), "
-                        + "StructField(date,DateType,true), "
-                        + "StructField(decimal,DecimalType(2,2),true), "
-                        + "StructField(decimal2,DecimalType(38,2),true), "
+                        + ","
+                        + "StructField(strArray,ArrayType(StringType,true),true),"
+                        + "StructField(intArray,ArrayType(IntegerType,true),true),"
+                        + "StructField(boolean,BooleanType,true),"
+                        + "StructField(tinyint,ByteType,true),"
+                        + "StructField(smallint,ShortType,true),"
+                        + "StructField(bigint,LongType,true),"
+                        + "StructField(bytes,BinaryType,true),"
+                        + "StructField(timestamp,TimestampType,true),"
+                        + "StructField(date,DateType,true),"
+                        + "StructField(decimal,DecimalType(2,2),true),"
+                        + "StructField(decimal2,DecimalType(38,2),true),"
                         + "StructField(decimal3,DecimalType(10,1),true))";
 
         StructType sparkType = fromFlinkRowType(ALL_TYPES);
-        assertThat(sparkType.toString()).isEqualTo(expected);
+        assertThat(sparkType.toString().replace(", ", ",")).isEqualTo(expected);
 
         assertThat(toFlinkType(sparkType)).isEqualTo(ALL_TYPES);
     }