You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ho...@apache.org on 2022/11/16 17:05:35 UTC

[spark] branch master updated: [SPARK-39799][SQL] DataSourceV2: View catalog interface

This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d12abff7e83 [SPARK-39799][SQL] DataSourceV2: View catalog interface
d12abff7e83 is described below

commit d12abff7e83705fb0e187a79f86b45f99e4b7abb
Author: John Zhuge <jz...@apache.org>
AuthorDate: Wed Nov 16 09:05:14 2022 -0800

    [SPARK-39799][SQL] DataSourceV2: View catalog interface
    
    ### What changes were proposed in this pull request?
    
    ViewCatalog API described in [SPIP](https://docs.google.com/document/d/1XOxFtloiMuW24iqJ-zJnDzHl2KMxipTjJoxleJFz66A/edit?usp=sharing).
    
    ### Why are the changes needed?
    
    First step towards DataSourceV2 view support.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    N/A
    
    Closes #37556 from jzhuge/SPARK-39799.
    
    Authored-by: John Zhuge <jz...@apache.org>
    Signed-off-by: Holden Karau <hk...@netflix.com>
---
 core/src/main/resources/error/error-classes.json   |  15 ++
 .../apache/spark/sql/connector/catalog/View.java   |  74 ++++++++
 .../spark/sql/connector/catalog/ViewCatalog.java   | 188 +++++++++++++++++++++
 .../spark/sql/connector/catalog/ViewChange.java    |  79 +++++++++
 .../catalyst/analysis/AlreadyExistException.scala  |  10 ++
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   2 +-
 .../catalyst/analysis/NoSuchItemException.scala    |   8 +
 7 files changed, 375 insertions(+), 1 deletion(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index d5d6e938ad1..c96cb11874c 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -1428,6 +1428,21 @@
       "3. set \"spark.sql.legacy.allowUntypedScalaUDF\" to \"true\" and use this API with caution"
     ]
   },
+  "VIEW_ALREADY_EXISTS" : {
+    "message" : [
+      "Cannot create view <relationName> because it already exists.",
+      "Choose a different name, drop or replace the existing object, or add the IF NOT EXISTS clause to tolerate pre-existing objects."
+    ],
+    "sqlState" : "42000"
+  },
+  "VIEW_NOT_FOUND" : {
+    "message" : [
+      "The view <relationName> cannot be found. Verify the spelling and correctness of the schema and catalog.",
+      "If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.",
+      "To tolerate the error on drop use DROP VIEW IF EXISTS."
+    ],
+    "sqlState" : "42000"
+  },
   "_LEGACY_ERROR_TEMP_0001" : {
     "message" : [
       "Invalid InsertIntoContext"
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/View.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/View.java
new file mode 100644
index 00000000000..a4dc5f2f2d2
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/View.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * An interface representing a persisted view, as returned by {@link ViewCatalog}.
+ */
+@DeveloperApi
+public interface View {
+  /**
+   * The name that identifies this view.
+   */
+  String name();
+
+  /**
+   * The SQL text of the view query.
+   */
+  String query();
+
+  /**
+   * The current catalog at the time the view was created.
+   */
+  String currentCatalog();
+
+  /**
+   * The current namespace at the time the view was created.
+   */
+  String[] currentNamespace();
+
+  /**
+   * The view schema captured at creation time, after applying column aliases.
+   */
+  StructType schema();
+
+  /**
+   * The output column names of the query that creates this view.
+   */
+  String[] queryColumnNames();
+
+  /**
+   * The column aliases declared for the view.
+   */
+  String[] columnAliases();
+
+  /**
+   * The column comments declared for the view.
+   */
+  String[] columnComments();
+
+  /**
+   * The view properties; reserved keys are listed in {@link ViewCatalog#RESERVED_PROPERTIES}.
+   */
+  Map<String, String> properties();
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java
new file mode 100644
index 00000000000..eb67b990486
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Catalog methods for working with views.
+ */
+@DeveloperApi
+public interface ViewCatalog extends CatalogPlugin {
+
+  /**
+   * A reserved property to specify the description of the view.
+   */
+  String PROP_COMMENT = "comment";
+
+  /**
+   * A reserved property to specify the owner of the view.
+   */
+  String PROP_OWNER = "owner";
+
+  /**
+   * A reserved property to specify the software version used to create the view.
+   */
+  String PROP_CREATE_ENGINE_VERSION = "create_engine_version";
+
+  /**
+   * A reserved property to specify the software version used to change the view.
+   */
+  String PROP_ENGINE_VERSION = "engine_version";
+
+  /**
+   * All reserved properties of the view. The list is read-only.
+   */
+  List<String> RESERVED_PROPERTIES = java.util.Collections.unmodifiableList(Arrays.asList(
+      PROP_COMMENT,
+      PROP_OWNER,
+      PROP_CREATE_ENGINE_VERSION,
+      PROP_ENGINE_VERSION));
+
+  /**
+   * List the views in a namespace from the catalog.
+   * <p>
+   * If the catalog supports tables, this must return identifiers for only views and not tables.
+   *
+   * @param namespace a multi-part namespace
+   * @return an array of Identifiers for views
+   * @throws NoSuchNamespaceException If the namespace does not exist (optional).
+   */
+  Identifier[] listViews(String... namespace) throws NoSuchNamespaceException;
+
+  /**
+   * Load view metadata by {@link Identifier ident} from the catalog.
+   * <p>
+   * If the catalog supports tables and contains a table for the identifier and not a view,
+   * this must throw {@link NoSuchViewException}.
+   *
+   * @param ident a view identifier
+   * @return the view description
+   * @throws NoSuchViewException If the view doesn't exist or is a table
+   */
+  View loadView(Identifier ident) throws NoSuchViewException;
+
+  /**
+   * Invalidate cached view metadata for an {@link Identifier identifier}.
+   * <p>
+   * If the view is already loaded or cached, drop cached data. If the view does not exist or is
+   * not cached, do nothing. Calling this method should not query remote services.
+   *
+   * @param ident a view identifier
+   */
+  default void invalidateView(Identifier ident) {
+  }
+
+  /**
+   * Test whether a view exists using an {@link Identifier identifier} from the catalog.
+   * <p>
+   * If the catalog supports tables and contains a table for the identifier and not a view,
+   * this must return false.
+   *
+   * @param ident a view identifier
+   * @return true if the view exists, false otherwise
+   */
+  default boolean viewExists(Identifier ident) {
+    try {
+      return loadView(ident) != null;
+    } catch (NoSuchViewException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Create a view in the catalog.
+   *
+   * @param ident a view identifier
+   * @param sql the SQL text that defines the view
+   * @param currentCatalog the current catalog
+   * @param currentNamespace the current namespace
+   * @param schema the view query output schema
+   * @param queryColumnNames the query column names
+   * @param columnAliases the column aliases
+   * @param columnComments the column comments
+   * @param properties the view properties
+   * @return the view created
+   * @throws ViewAlreadyExistsException If a view or table already exists for the identifier
+   * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
+   */
+  View createView(
+      Identifier ident,
+      String sql,
+      String currentCatalog,
+      String[] currentNamespace,
+      StructType schema,
+      String[] queryColumnNames,
+      String[] columnAliases,
+      String[] columnComments,
+      Map<String, String> properties) throws ViewAlreadyExistsException, NoSuchNamespaceException;
+
+  /**
+   * Apply {@link ViewChange changes} to a view in the catalog.
+   * <p>
+   * Implementations may reject the requested changes. If any change is rejected, none of the
+   * changes should be applied to the view.
+   *
+   * @param ident a view identifier
+   * @param changes an array of changes to apply to the view
+   * @return the view altered
+   * @throws NoSuchViewException If the view doesn't exist or is a table.
+   * @throws IllegalArgumentException If any change is rejected by the implementation.
+   */
+  View alterView(Identifier ident, ViewChange... changes)
+      throws NoSuchViewException, IllegalArgumentException;
+
+  /**
+   * Drop a view in the catalog.
+   * <p>
+   * If the catalog supports tables and contains a table for the identifier and not a view, this
+   * must not drop the table and must return false.
+   *
+   * @param ident a view identifier
+   * @return true if a view was deleted, false if no view exists for the identifier
+   */
+  boolean dropView(Identifier ident);
+
+  /**
+   * Rename a view in the catalog.
+   * <p>
+   * If the catalog supports tables and contains a table with the old identifier, this throws
+   * {@link NoSuchViewException}. Additionally, if it contains a table with the new identifier,
+   * this throws {@link ViewAlreadyExistsException}.
+   * <p>
+   * If the catalog does not support view renames between namespaces, it throws
+   * {@link UnsupportedOperationException}.
+   *
+   * @param oldIdent the view identifier of the existing view to rename
+   * @param newIdent the new view identifier of the view
+   * @throws NoSuchViewException If the view to rename doesn't exist or is a table
+   * @throws ViewAlreadyExistsException If the new view name already exists or is a table
+   * @throws UnsupportedOperationException If the namespaces of old and new identifiers do not
+   * match (optional)
+   */
+  void renameView(Identifier oldIdent, Identifier newIdent)
+      throws NoSuchViewException, ViewAlreadyExistsException;
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewChange.java
new file mode 100644
index 00000000000..c94933beed7
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewChange.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.DeveloperApi;
+
+/**
+ * ViewChange subclasses represent requested changes to a view.
+ * These are passed to {@link ViewCatalog#alterView}.
+ */
+@DeveloperApi
+public interface ViewChange {
+
+  /**
+   * Create a ViewChange for setting a view property.
+   *
+   * @param property the property name
+   * @param value the new property value
+   * @return a ViewChange
+   */
+  static ViewChange setProperty(String property, String value) {
+    return new SetProperty(property, value);
+  }
+
+  /**
+   * Create a ViewChange for removing a view property.
+   *
+   * @param property the property name
+   * @return a ViewChange
+   */
+  static ViewChange removeProperty(String property) {
+    return new RemoveProperty(property);
+  }
+
+  final class SetProperty implements ViewChange {
+    private final String property;
+    private final String value;
+
+    private SetProperty(String property, String value) {
+      this.property = property;
+      this.value = value;
+    }
+
+    public String property() {
+      return property;
+    }
+
+    public String value() {
+      return value;
+    }
+  }
+
+  final class RemoveProperty implements ViewChange {
+    private final String property;
+
+    private RemoveProperty(String property) {
+      this.property = property;
+    }
+
+    public String property() {
+      return property;
+    }
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
index a047b187dbf..1b5dca840d6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
@@ -21,6 +21,8 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.util.{quoteIdentifier, quoteNameParts }
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -71,6 +73,14 @@ class TempTableAlreadyExistsException(errorClass: String, messageParameters: Map
   }
 }
 
+class ViewAlreadyExistsException(errorClass: String, messageParameters: Map[String, String])
+  extends AnalysisException(errorClass, messageParameters) {
+  /** Builds a VIEW_ALREADY_EXISTS error that names the quoted view identifier. */
+  def this(ident: Identifier) = this(
+    errorClass = "VIEW_ALREADY_EXISTS",
+    messageParameters = Map("relationName" -> ident.quoted))
+}
+
 class PartitionAlreadyExistsException(errorClass: String, messageParameters: Map[String, String])
   extends AnalysisException(errorClass, messageParameters) {
   def this(db: String, table: String, spec: TablePartitionSpec) = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 40338a40e99..b84a03e77d6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.trees.CurrentOrigin.withOrigin
 import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.catalyst.util.{toPrettySQL, CharVarcharUtils, StringUtils}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
-import org.apache.spark.sql.connector.catalog._
+import org.apache.spark.sql.connector.catalog.{View => _, _}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.connector.catalog.TableChange.{After, ColumnPosition}
 import org.apache.spark.sql.connector.catalog.functions.{AggregateFunction => V2AggregateFunction, ScalarFunction, UnboundFunction}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
index e8f8556fec9..f6624126e94 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
@@ -62,6 +62,14 @@ class NoSuchTableException(errorClass: String, messageParameters: Map[String, St
   }
 }
 
+class NoSuchViewException(errorClass: String, messageParameters: Map[String, String])
+  extends AnalysisException(errorClass, messageParameters) {
+  /** Builds a VIEW_NOT_FOUND error that names the quoted view identifier. */
+  def this(ident: Identifier) = this(
+    errorClass = "VIEW_NOT_FOUND",
+    messageParameters = Map("relationName" -> ident.quoted))
+}
+
 class NoSuchPartitionException(errorClass: String, messageParameters: Map[String, String])
   extends AnalysisException(errorClass, messageParameters) {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org