Posted to reviews@spark.apache.org by "LuciferYang (via GitHub)" <gi...@apache.org> on 2023/03/15 10:22:17 UTC

[GitHub] [spark] LuciferYang opened a new pull request, #40438: [WIP][CONNECT] Catalog

LuciferYang opened a new pull request, #40438:
URL: https://github.com/apache/spark/pull/40438

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
     If benchmark tests were added, please run the benchmarks in GitHub Actions for a consistent environment; instructions are at: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   




[GitHub] [spark] amaliujia commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1140593380


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala:
##########
@@ -129,6 +130,9 @@ object CheckConnectJvmClientCompatibility {
       ProblemFilters.exclude[Problem]("org.sparkproject.*"),
       ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"),
 
+      // Catalog
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.catalog.Catalog.cacheTable"),

Review Comment:
   Sure, we can follow up once that PR is merged. No need to be blocked on that.





[GitHub] [spark] LuciferYang commented on pull request #40438: [SPARK-42806][SPARK-42811][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #40438:
URL: https://github.com/apache/spark/pull/40438#issuecomment-1487969530

   friendly ping @HyukjinKwon @hvanhovell 




[GitHub] [spark] amaliujia commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1140540989


##########
connector/connect/common/src/test/resources/query-tests/explain-results/createTable_with_schema.explain:
##########
@@ -0,0 +1,2 @@
+SubqueryAlias spark_catalog.default.test_parquet_table

Review Comment:
   Do we need to test the `catalog.createTable` API this way?





[GitHub] [spark] hvanhovell commented on a diff in pull request #40438: [SPARK-42806][SPARK-42811][CONNECT] Add `Catalog` support

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1156610416


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
##########
@@ -0,0 +1,678 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table}
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveBooleanEncoder, StringEncoder}
+import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, StorageLevelProtoConverter}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.storage.StorageLevel
+
+class CatalogImpl(sparkSession: SparkSession) extends Catalog {
+
+  /**
+   * Returns the current default database in this session.
+   *
+   * @since 3.5.0
+   */
+  override def currentDatabase: String =
+    sparkSession
+      .newDataset(StringEncoder) { builder =>
+        builder.getCatalogBuilder.getCurrentDatabaseBuilder
+      }
+      .head()
+
+  /**
+   * Sets the current database (namespace) in this session.
+   *
+   * @since 3.5.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def setCurrentDatabase(dbName: String): Unit = {
+    // we assume `dbName` will not include the catalog name. e.g. if you call
+    // `setCurrentDatabase("catalog.db")`, it will search for a database 'catalog.db' in the current
+    // catalog.
+    sparkSession.execute { builder =>
+      builder.getCatalogBuilder.getSetCurrentDatabaseBuilder.setDbName(dbName)
+    }
+  }
+
+  /**
+   * Returns a list of databases (namespaces) available within the current catalog.
+   *
+   * @since 3.5.0
+   */
+  override def listDatabases(): Dataset[Database] = {
+    sparkSession.newDataset(CatalogImpl.databaseEncoder) { builder =>
+      builder.getCatalogBuilder.getListDatabasesBuilder
+    }
+  }
+
+  /**
+   * Returns a list of tables/views in the current database (namespace). This includes all
+   * temporary views.
+   *
+   * @since 3.5.0
+   */
+  override def listTables(): Dataset[Table] = {
+    listTables(currentDatabase)
+  }
+
+  /**
+   * Returns a list of tables/views in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all temporary views.
+   *
+   * @since 3.5.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listTables(dbName: String): Dataset[Table] = {
+    sparkSession.newDataset(CatalogImpl.tableEncoder) { builder =>
+      builder.getCatalogBuilder.getListTablesBuilder.setDbName(dbName)
+    }
+  }
+
+  /**
+   * Returns a list of functions registered in the current database (namespace). This includes all
+   * temporary functions.
+   *
+   * @since 3.5.0
+   */
+  override def listFunctions(): Dataset[Function] = {
+    listFunctions(currentDatabase)
+  }
+
+  /**
+   * Returns a list of functions registered in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all built-in and temporary functions.
+   *
+   * @since 3.5.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listFunctions(dbName: String): Dataset[Function] = {
+    sparkSession.newDataset(CatalogImpl.functionEncoder) { builder =>
+      builder.getCatalogBuilder.getListFunctionsBuilder.setDbName(dbName)
+    }
+  }
+
+  /**
+   * Returns a list of columns for the given table/view or temporary view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.5.0
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  override def listColumns(tableName: String): Dataset[Column] = {
+    sparkSession.newDataset(CatalogImpl.columnEncoder) { builder =>
+      builder.getCatalogBuilder.getListColumnsBuilder.setTableName(tableName)
+    }
+  }
+
+  /**
+   * Returns a list of columns for the given table/view in the specified database under the Hive
+   * Metastore.
+   *
+   * To list columns for table/view in other catalogs, please use `listColumns(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table/view.
+   * @since 3.5.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listColumns(dbName: String, tableName: String): Dataset[Column] = {
+    sparkSession.newDataset(CatalogImpl.columnEncoder) { builder =>
+      builder.getCatalogBuilder.getListColumnsBuilder
+        .setTableName(tableName)
+        .setDbName(dbName)
+    }
+  }
+
+  /**
+   * Get the database (namespace) with the specified name (can be qualified with catalog). This
+   * throws an AnalysisException when the database (namespace) cannot be found.
+   *
+   * @since 3.5.0
+   */
+  override def getDatabase(dbName: String): Database = {
+    sparkSession
+      .newDataset(CatalogImpl.databaseEncoder) { builder =>
+        builder.getCatalogBuilder.getGetDatabaseBuilder.setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the table or view with the specified name. This table can be a temporary view or a
+   * table/view. This throws an AnalysisException when no Table can be found.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.5.0
+   */
+  override def getTable(tableName: String): Table = {
+    sparkSession
+      .newDataset(CatalogImpl.tableEncoder) { builder =>
+        builder.getCatalogBuilder.getGetTableBuilder.setTableName(tableName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the table or view with the specified name in the specified database under the Hive
+   * Metastore. This throws an AnalysisException when no Table can be found.
+   *
+   * To get table/view in other catalogs, please use `getTable(tableName)` with qualified
+   * table/view name instead.
+   *
+   * @since 3.5.0
+   */
+  override def getTable(dbName: String, tableName: String): Table = {
+    sparkSession
+      .newDataset(CatalogImpl.tableEncoder) { builder =>
+        builder.getCatalogBuilder.getGetTableBuilder
+          .setTableName(tableName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the function with the specified name. This function can be a temporary function or a
+   * function. This throws an AnalysisException when the function cannot be found.
+   *
+   * @param functionName
+   *   is either a qualified or unqualified name that designates a function. It follows the same
+   *   resolution rule with SQL: search for built-in/temp functions first then functions in the
+   *   current database (namespace).
+   * @since 3.5.0
+   */
+  override def getFunction(functionName: String): Function = {
+    sparkSession
+      .newDataset(CatalogImpl.functionEncoder) { builder =>
+        builder.getCatalogBuilder.getGetFunctionBuilder.setFunctionName(functionName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the function with the specified name in the specified database under the Hive Metastore.
+   * This throws an AnalysisException when the function cannot be found.
+   *
+   * To get functions in other catalogs, please use `getFunction(functionName)` with qualified
+   * function name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param functionName
+   *   is an unqualified name that designates a function in the specified database
+   * @since 3.5.0
+   */
+  override def getFunction(dbName: String, functionName: String): Function = {
+    sparkSession
+      .newDataset(CatalogImpl.functionEncoder) { builder =>
+        builder.getCatalogBuilder.getGetFunctionBuilder
+          .setFunctionName(functionName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the database (namespace) with the specified name exists (the name can be qualified
+   * with catalog).
+   *
+   * @since 3.5.0
+   */
+  override def databaseExists(dbName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getDatabaseExistsBuilder.setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the table or view with the specified name exists. This can either be a temporary
+   * view or a table/view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.5.0
+   */
+  override def tableExists(tableName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getTableExistsBuilder.setTableName(tableName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the table or view with the specified name exists in the specified database under the
+   * Hive Metastore.
+   *
+   * To check existence of table/view in other catalogs, please use `tableExists(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table.
+   * @since 3.5.0
+   */
+  override def tableExists(dbName: String, tableName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getTableExistsBuilder
+          .setTableName(tableName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the function with the specified name exists. This can either be a temporary function
+   * or a function.
+   *
+   * @param functionName
+   *   is either a qualified or unqualified name that designates a function. It follows the same
+   *   resolution rule with SQL: search for built-in/temp functions first then functions in the
+   *   current database (namespace).
+   * @since 3.5.0
+   */
+  override def functionExists(functionName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getFunctionExistsBuilder.setFunctionName(functionName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the function with the specified name exists in the specified database under the Hive
+   * Metastore.
+   *
+   * To check existence of functions in other catalogs, please use `functionExists(functionName)`
+   * with qualified function name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param functionName
+   *   is an unqualified name that designates a function.
+   * @since 3.5.0
+   */
+  override def functionExists(dbName: String, functionName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getFunctionExistsBuilder
+          .setFunctionName(functionName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Creates a table from the given path and returns the corresponding DataFrame. It will use the
+   * default data source configured by spark.sql.sources.default.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table. If no database
+   *   identifier is provided, it refers to a table in the current database.
+   * @since 3.5.0
+   */
+  override def createTable(tableName: String, path: String): DataFrame = {

Review Comment:
   Also a side effect. This needs to be executed eagerly.
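   A minimal sketch of one way to do that, reusing the builder pattern from the rest of this file; the `newDataFrame` helper, the `getCreateTableBuilder` message and the `path` option key are assumptions, not the final API:

       override def createTable(tableName: String, path: String): DataFrame = {
         // Build the catalog command as a DataFrame, mirroring the newDataset(...) calls above.
         val df = sparkSession.newDataFrame { builder =>
           builder.getCatalogBuilder.getCreateTableBuilder
             .setTableName(tableName)
             .putOptions("path", path) // assumed option key for the data source path
         }
         // Run the side effect eagerly instead of waiting for the first action on `df`.
         df.collect()
         df
       }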





[GitHub] [spark] LuciferYang commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1138912177


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala:
##########
@@ -0,0 +1,628 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Catalog interface for Spark. To access this, use `SparkSession.catalog`.
+ *
+ * @since 3.4.0
+ */
+abstract class Catalog {
+
+  /**
+   * Returns the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  def currentDatabase: String
+
+  /**
+   * Sets the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  def setCurrentDatabase(dbName: String): Unit
+
+  /**
+   * Returns a list of databases (namespaces) available within the current catalog.
+   *
+   * @since 3.4.0
+   */
+  def listDatabases(): Dataset[Database]
+
+  /**
+   * Returns a list of tables/views in the current database (namespace). This includes all
+   * temporary views.
+   *
+   * @since 3.4.0
+   */
+  def listTables(): Dataset[Table]
+
+  /**
+   * Returns a list of tables/views in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all temporary views.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def listTables(dbName: String): Dataset[Table]
+
+  /**
+   * Returns a list of functions registered in the current database (namespace). This includes all
+   * temporary functions.
+   *
+   * @since 3.4.0
+   */
+  def listFunctions(): Dataset[Function]
+
+  /**
+   * Returns a list of functions registered in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all built-in and temporary functions.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def listFunctions(dbName: String): Dataset[Function]
+
+  /**
+   * Returns a list of columns for the given table/view or temporary view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("table does not exist")
+  def listColumns(tableName: String): Dataset[Column]
+
+  /**
+   * Returns a list of columns for the given table/view in the specified database under the Hive
+   * Metastore.
+   *
+   * To list columns for table/view in other catalogs, please use `listColumns(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table/view.
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  def listColumns(dbName: String, tableName: String): Dataset[Column]
+
+  /**
+   * Get the database (namespace) with the specified name (can be qualified with catalog). This
+   * throws an AnalysisException when the database (namespace) cannot be found.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")

Review Comment:
   ok





[GitHub] [spark] LuciferYang commented on a diff in pull request #40438: [WIP][SPARK-42806][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1138348490


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala:
##########
@@ -0,0 +1,628 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Catalog interface for Spark. To access this, use `SparkSession.catalog`.
+ *
+ * @since 3.4.0
+ */
+abstract class Catalog {
+
+  /**
+   * Returns the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  def currentDatabase: String
+
+  /**
+   * Sets the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  def setCurrentDatabase(dbName: String): Unit
+
+  /**
+   * Returns a list of databases (namespaces) available within the current catalog.
+   *
+   * @since 3.4.0
+   */
+  def listDatabases(): Dataset[Database]
+
+  /**
+   * Returns a list of tables/views in the current database (namespace). This includes all
+   * temporary views.
+   *
+   * @since 3.4.0
+   */
+  def listTables(): Dataset[Table]
+
+  /**
+   * Returns a list of tables/views in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all temporary views.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def listTables(dbName: String): Dataset[Table]
+
+  /**
+   * Returns a list of functions registered in the current database (namespace). This includes all
+   * temporary functions.
+   *
+   * @since 3.4.0
+   */
+  def listFunctions(): Dataset[Function]
+
+  /**
+   * Returns a list of functions registered in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all built-in and temporary functions.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def listFunctions(dbName: String): Dataset[Function]
+
+  /**
+   * Returns a list of columns for the given table/view or temporary view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("table does not exist")
+  def listColumns(tableName: String): Dataset[Column]
+
+  /**
+   * Returns a list of columns for the given table/view in the specified database under the Hive
+   * Metastore.
+   *
+   * To list columns for table/view in other catalogs, please use `listColumns(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table/view.
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  def listColumns(dbName: String, tableName: String): Dataset[Column]
+
+  /**
+   * Get the database (namespace) with the specified name (can be qualified with catalog). This
+   * throws an AnalysisException when the database (namespace) cannot be found.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")

Review Comment:
   `@throws[AnalysisException]` cannot be removed; otherwise it will be judged as incompatible.
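   (For comparison, dropping the annotation would mean silencing the checker instead, with an exclusion in CheckConnectJvmClientCompatibility along the lines of the entries quoted elsewhere in this PR; the member name below is only illustrative.)

       // Illustrative alternative only; keeping @throws[AnalysisException] is the simpler fix.
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.catalog.Catalog.getDatabase")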





[GitHub] [spark] LuciferYang commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1140552011


##########
connector/connect/common/src/test/resources/query-tests/explain-results/createTable_with_schema.explain:
##########
@@ -0,0 +1,2 @@
+SubqueryAlias spark_catalog.default.test_parquet_table

Review Comment:
   removed





[GitHub] [spark] hvanhovell commented on pull request #40438: [SPARK-42806][SPARK-42811][CONNECT] Add `Catalog` support

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #40438:
URL: https://github.com/apache/spark/pull/40438#issuecomment-1494305806

   @LuciferYang thanks for doing this.




[GitHub] [spark] LuciferYang commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1147002969


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/catalog/interface.scala:
##########


Review Comment:
   done





[GitHub] [spark] LuciferYang commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1140549152


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala:
##########
@@ -129,6 +130,9 @@ object CheckConnectJvmClientCompatibility {
       ProblemFilters.exclude[Problem]("org.sparkproject.*"),
       ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"),
 
+      // Catalog
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.catalog.Catalog.cacheTable"),

Review Comment:
   Thanks for the information. If that PR is merged first, we can also add the support in this one.





[GitHub] [spark] LuciferYang commented on a diff in pull request #40438: [WIP][SPARK-42806][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1138350222


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala:
##########
@@ -0,0 +1,628 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Catalog interface for Spark. To access this, use `SparkSession.catalog`.
+ *
+ * @since 3.4.0
+ */
+abstract class Catalog {
+
+  /**
+   * Returns the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  def currentDatabase: String
+
+  /**
+   * Sets the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  def setCurrentDatabase(dbName: String): Unit
+
+  /**
+   * Returns a list of databases (namespaces) available within the current catalog.
+   *
+   * @since 3.4.0
+   */
+  def listDatabases(): Dataset[Database]
+
+  /**
+   * Returns a list of tables/views in the current database (namespace). This includes all
+   * temporary views.
+   *
+   * @since 3.4.0
+   */
+  def listTables(): Dataset[Table]
+
+  /**
+   * Returns a list of tables/views in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all temporary views.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def listTables(dbName: String): Dataset[Table]
+
+  /**
+   * Returns a list of functions registered in the current database (namespace). This includes all
+   * temporary functions.
+   *
+   * @since 3.4.0
+   */
+  def listFunctions(): Dataset[Function]
+
+  /**
+   * Returns a list of functions registered in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all built-in and temporary functions.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def listFunctions(dbName: String): Dataset[Function]
+
+  /**
+   * Returns a list of columns for the given table/view or temporary view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("table does not exist")
+  def listColumns(tableName: String): Dataset[Column]
+
+  /**
+   * Returns a list of columns for the given table/view in the specified database under the Hive
+   * Metastore.
+   *
+   * To list columns for table/view in other catalogs, please use `listColumns(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table/view.
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  def listColumns(dbName: String, tableName: String): Dataset[Column]
+
+  /**
+   * Get the database (namespace) with the specified name (can be qualified with catalog). This
+   * throws an AnalysisException when the database (namespace) cannot be found.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def getDatabase(dbName: String): Database
+
+  /**
+   * Get the table or view with the specified name. This table can be a temporary view or a
+   * table/view. This throws an AnalysisException when no Table can be found.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("table does not exist")
+  def getTable(tableName: String): Table
+
+  /**
+   * Get the table or view with the specified name in the specified database under the Hive
+   * Metastore. This throws an AnalysisException when no Table can be found.
+   *
+   * To get table/view in other catalogs, please use `getTable(tableName)` with qualified
+   * table/view name instead.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  def getTable(dbName: String, tableName: String): Table
+
+  /**
+   * Get the function with the specified name. This function can be a temporary function or a
+   * function. This throws an AnalysisException when the function cannot be found.
+   *
+   * @param functionName
+   *   is either a qualified or unqualified name that designates a function. It follows the same
+   *   resolution rule with SQL: search for built-in/temp functions first then functions in the
+   *   current database (namespace).
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("function does not exist")
+  def getFunction(functionName: String): Function
+
+  /**
+   * Get the function with the specified name in the specified database under the Hive Metastore.
+   * This throws an AnalysisException when the function cannot be found.
+   *
+   * To get functions in other catalogs, please use `getFunction(functionName)` with qualified
+   * function name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param functionName
+   *   is an unqualified name that designates a function in the specified database
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database or function does not exist")
+  def getFunction(dbName: String, functionName: String): Function
+
+  /**
+   * Check if the database (namespace) with the specified name exists (the name can be qualified
+   * with catalog).
+   *
+   * @since 3.4.0
+   */
+  def databaseExists(dbName: String): Boolean
+
+  /**
+   * Check if the table or view with the specified name exists. This can either be a temporary
+   * view or a table/view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  def tableExists(tableName: String): Boolean
+
+  /**
+   * Check if the table or view with the specified name exists in the specified database under the
+   * Hive Metastore.
+   *
+   * To check existence of table/view in other catalogs, please use `tableExists(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table.
+   * @since 3.4.0
+   */
+  def tableExists(dbName: String, tableName: String): Boolean
+
+  /**
+   * Check if the function with the specified name exists. This can either be a temporary function
+   * or a function.
+   *
+   * @param functionName
+   *   is either a qualified or unqualified name that designates a function. It follows the same
+   *   resolution rule with SQL: search for built-in/temp functions first then functions in the
+   *   current database (namespace).
+   * @since 3.4.0
+   */
+  def functionExists(functionName: String): Boolean
+
+  /**
+   * Check if the function with the specified name exists in the specified database under the Hive
+   * Metastore.
+   *
+   * To check existence of functions in other catalogs, please use `functionExists(functionName)`
+   * with qualified function name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param functionName
+   *   is an unqualified name that designates a function.
+   * @since 3.4.0
+   */
+  def functionExists(dbName: String, functionName: String): Boolean
+
+  /**
+   * Creates a table from the given path and returns the corresponding DataFrame. It will use the
+   * default data source configured by spark.sql.sources.default.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table. If no database
+   *   identifier is provided, it refers to a table in the current database.
+   * @since 3.4.0
+   */
+  @deprecated("use createTable instead.", "2.2.0")

Review Comment:
   6. Do we still need these `@deprecated` methods?





[GitHub] [spark] HyukjinKwon closed pull request #40438: [SPARK-42806][SPARK-42811][CONNECT] Add `Catalog` support

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon closed pull request #40438: [SPARK-42806][SPARK-42811][CONNECT] Add `Catalog` support
URL: https://github.com/apache/spark/pull/40438




[GitHub] [spark] LuciferYang commented on a diff in pull request #40438: [WIP][SPARK-42806][CONNECT] Catalog

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1136832569


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import io.grpc.StatusRuntimeException
+
+import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+
+class CatalogSuite extends RemoteSparkSession with SQLHelper {
+
+  test("Database APIs") {
+    val currentDb = spark.catalog.currentDatabase
+    assert(currentDb == "default")
+    withTempDatabase { db =>
+      try {
+        spark.catalog.setCurrentDatabase(db)
+        assert(spark.catalog.currentDatabase == db)
+        val dbs = spark.catalog.listDatabases().collect().sortBy(_.name)
+        assert(dbs.length == 2)
+        assert(dbs.map(_.name) sameElements Array(db, currentDb))
+        assert(dbs.map(_.catalog).distinct sameElements Array("spark_catalog"))
+        val database = spark.catalog.getDatabase(db)
+        assert(database.name == db)
+        val message = intercept[StatusRuntimeException] {
+          spark.catalog.getDatabase("notExists")
+        }.getMessage
+        assert(message.contains("SCHEMA_NOT_FOUND"))
+        assert(spark.catalog.databaseExists(db))
+        assert(!spark.catalog.databaseExists("notExists"))
+      } finally {
+        spark.catalog.setCurrentDatabase(currentDb)
+        assert(spark.catalog.currentDatabase == currentDb)
+      }
+    }
+  }
+
+  test("CatalogMetadata APIs") {
+    val currentCatalog = spark.catalog.currentCatalog()
+    assert(currentCatalog == "spark_catalog")
+    try {
+      spark.catalog.setCurrentCatalog("spark_catalog")
+      val message = intercept[StatusRuntimeException] {
+        spark.catalog.setCurrentCatalog("notExists")
+      }.getMessage
+      assert(message.contains("plugin class not found"))
+      val catalogs = spark.catalog.listCatalogs().collect()
+      assert(catalogs.length == 1)
+      assert(catalogs.map(_.name) sameElements Array("spark_catalog"))
+    } finally {
+      spark.catalog.setCurrentCatalog(currentCatalog)
+    }
+  }
+}

Review Comment:
   will add more tests tomorrow





[GitHub] [spark] LuciferYang commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1140542893


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala:
##########
@@ -129,6 +130,9 @@ object CheckConnectJvmClientCompatibility {
       ProblemFilters.exclude[Problem]("org.sparkproject.*"),
       ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"),
 
+      // Catalog
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.catalog.Catalog.cacheTable"),

Review Comment:
   As I said in the PR description, `cacheTable` with `StorageLevel` is not implemented in this PR because a new proto message needs to be added; that will be done in a follow-up.
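   For reference, once that message exists the client side would presumably follow the same pattern as the other catalog commands; a rough sketch, where the `getCacheTableBuilder`/`setStorageLevel` builder names and the converter method are assumptions (only the `StorageLevelProtoConverter` import itself appears in the final CatalogImpl quoted earlier in this thread):

       override def cacheTable(tableName: String, storageLevel: StorageLevel): Unit = {
         sparkSession.execute { builder =>
           builder.getCatalogBuilder.getCacheTableBuilder
             .setTableName(tableName)
             // assumed proto field carrying the storage level
             .setStorageLevel(StorageLevelProtoConverter.toConnectProtoType(storageLevel))
         }
       }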
   
   





[GitHub] [spark] LuciferYang commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1140543879


##########
connector/connect/common/src/test/resources/query-tests/explain-results/createTable_with_schema.explain:
##########
@@ -0,0 +1,2 @@
+SubqueryAlias spark_catalog.default.test_parquet_table

Review Comment:
   If not, let me remove this test case
   
   





[GitHub] [spark] LuciferYang commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1145682342


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala:
##########
@@ -129,6 +130,9 @@ object CheckConnectJvmClientCompatibility {
       ProblemFilters.exclude[Problem]("org.sparkproject.*"),
       ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"),
 
+      // Catalog
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.catalog.Catalog.cacheTable"),

Review Comment:
   Currently, all APIs have been implemented
   
   





[GitHub] [spark] amaliujia commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1140540989


##########
connector/connect/common/src/test/resources/query-tests/explain-results/createTable_with_schema.explain:
##########
@@ -0,0 +1,2 @@
+SubqueryAlias spark_catalog.default.test_parquet_table

Review Comment:
   Do we need to test the `catalog.createTable` API this way? Maybe it would be enough to test it in CatalogSuite?
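   A CatalogSuite-style check could look roughly like the sketch below; the table name and schema are invented for illustration, and cleanup falls back to plain SQL rather than assuming extra test helpers:

       // Needs org.apache.spark.sql.types.StructType in the test file's imports.
       test("createTable API") {
         val tableName = "test_catalog_create_table" // hypothetical table name
         try {
           val schema = new StructType().add("id", "long")
           spark.catalog.createTable(tableName, source = "parquet", schema = schema,
             options = Map.empty[String, String])
           assert(spark.catalog.tableExists(tableName))
         } finally {
           spark.sql(s"DROP TABLE IF EXISTS $tableName")
         }
       }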





[GitHub] [spark] LuciferYang commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1140549152


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala:
##########
@@ -129,6 +130,9 @@ object CheckConnectJvmClientCompatibility {
       ProblemFilters.exclude[Problem]("org.sparkproject.*"),
       ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"),
 
+      // Catalog
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.catalog.Catalog.cacheTable"),

Review Comment:
   Thanks for the information. If that PR is merged first, we can also add the support in this one.
   
   Of course, this is not coupled; it can also be implemented in a follow-up.
   





[GitHub] [spark] LuciferYang commented on a diff in pull request #40438: [WIP][SPARK-42806][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1138342205


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
##########
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table}
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveBooleanEncoder, StringEncoder}
+import org.apache.spark.sql.connect.common.DataTypeProtoConverter
+import org.apache.spark.sql.types.StructType
+
+class CatalogImpl(sparkSession: SparkSession) extends Catalog {
+
+  /**
+   * Returns the current default database in this session.
+   *
+   * @since 3.4.0
+   */
+  override def currentDatabase: String =
+    sparkSession
+      .newDataset(StringEncoder) { builder =>
+        builder.getCatalogBuilder.getCurrentDatabaseBuilder
+      }
+      .head()
+
+  /**
+   * Sets the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def setCurrentDatabase(dbName: String): Unit = {

Review Comment:
   2. There are many functions with a `Unit` return type. Is there a better way to trigger execution than `.count()` or `.collect()`?
   
   





[GitHub] [spark] LuciferYang commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1138885942


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
##########
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table}
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveBooleanEncoder, StringEncoder}
+import org.apache.spark.sql.connect.common.DataTypeProtoConverter
+import org.apache.spark.sql.types.StructType
+
+class CatalogImpl(sparkSession: SparkSession) extends Catalog {
+
+  /**
+   * Returns the current default database in this session.
+   *
+   * @since 3.4.0
+   */
+  override def currentDatabase: String =
+    sparkSession
+      .newDataset(StringEncoder) { builder =>
+        builder.getCatalogBuilder.getCurrentDatabaseBuilder
+      }
+      .head()
+
+  /**
+   * Sets the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def setCurrentDatabase(dbName: String): Unit = {

Review Comment:
   Changed to use `.withResult(_.length)`.
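   
   For context, a rough sketch of that change (assuming the client `Dataset` exposes the `withResult` helper mentioned here; not the exact diff):
   
   ```scala
   override def setCurrentDatabase(dbName: String): Unit = {
     sparkSession
       .newDataFrame { builder =>
         builder.getCatalogBuilder.getSetCurrentDatabaseBuilder.setDbName(dbName)
       }
       // Consume the result to force eager execution on the server,
       // avoiding the extra aggregation implied by `.count()`.
       .withResult(_.length)
   }
   ```
   
   In the revision quoted later in this thread, the same effect is expressed through `sparkSession.execute { builder => ... }` instead.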





[GitHub] [spark] LuciferYang commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1138926388


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
##########
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table}
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveBooleanEncoder, StringEncoder}
+import org.apache.spark.sql.connect.common.DataTypeProtoConverter
+import org.apache.spark.sql.types.StructType
+
+class CatalogImpl(sparkSession: SparkSession) extends Catalog {
+
+  /**
+   * Returns the current default database in this session.
+   *
+   * @since 3.4.0
+   */
+  override def currentDatabase: String =
+    sparkSession
+      .newDataset(StringEncoder) { builder =>
+        builder.getCatalogBuilder.getCurrentDatabaseBuilder
+      }
+      .head()
+
+  /**
+   * Sets the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def setCurrentDatabase(dbName: String): Unit = {
+    // we assume `dbName` will not include the catalog name. e.g. if you call
+    // `setCurrentDatabase("catalog.db")`, it will search for a database 'catalog.db' in the current
+    // catalog.
+    sparkSession
+      .newDataFrame { builder =>
+        builder.getCatalogBuilder.getSetCurrentDatabaseBuilder.setDbName(dbName)
+      }
+      .count()
+  }
+
+  /**
+   * Returns a list of databases (namespaces) available within the current catalog.
+   *
+   * @since 3.4.0
+   */
+  override def listDatabases(): Dataset[Database] = {
+    sparkSession.newDataset(CatalogImpl.databaseEncoder) { builder =>
+      builder.getCatalogBuilder.getListDatabasesBuilder
+    }
+  }
+
+  /**
+   * Returns a list of tables/views in the current database (namespace). This includes all
+   * temporary views.
+   *
+   * @since 3.4.0
+   */
+  override def listTables(): Dataset[Table] = {
+    listTables(currentDatabase)
+  }
+
+  /**
+   * Returns a list of tables/views in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all temporary views.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listTables(dbName: String): Dataset[Table] = {
+    sparkSession.newDataset(CatalogImpl.tableEncoder) { builder =>
+      builder.getCatalogBuilder.getListTablesBuilder.setDbName(dbName)
+    }
+  }
+
+  /**
+   * Returns a list of functions registered in the current database (namespace). This includes all
+   * temporary functions.
+   *
+   * @since 3.4.0
+   */
+  override def listFunctions(): Dataset[Function] = {
+    listFunctions(currentDatabase)
+  }
+
+  /**
+   * Returns a list of functions registered in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all built-in and temporary functions.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listFunctions(dbName: String): Dataset[Function] = {
+    sparkSession.newDataset(CatalogImpl.functionEncoder) { builder =>
+      builder.getCatalogBuilder.getListFunctionsBuilder.setDbName(dbName)
+    }
+  }
+
+  /**
+   * Returns a list of columns for the given table/view or temporary view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  override def listColumns(tableName: String): Dataset[Column] = {
+    sparkSession.newDataset(CatalogImpl.columnEncoder) { builder =>
+      builder.getCatalogBuilder.getListColumnsBuilder.setTableName(tableName)
+    }
+  }
+
+  /**
+   * Returns a list of columns for the given table/view in the specified database under the Hive
+   * Metastore.
+   *
+   * To list columns for table/view in other catalogs, please use `listColumns(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table/view.
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listColumns(dbName: String, tableName: String): Dataset[Column] = {
+    sparkSession.newDataset(CatalogImpl.columnEncoder) { builder =>
+      builder.getCatalogBuilder.getListColumnsBuilder
+        .setTableName(tableName)
+        .setDbName(dbName)
+    }
+  }
+
+  /**
+   * Get the database (namespace) with the specified name (can be qualified with catalog). This
+   * throws an AnalysisException when the database (namespace) cannot be found.
+   *
+   * @since 3.4.0
+   */
+  override def getDatabase(dbName: String): Database = {
+    sparkSession
+      .newDataset(CatalogImpl.databaseEncoder) { builder =>
+        builder.getCatalogBuilder.getGetDatabaseBuilder.setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the table or view with the specified name. This table can be a temporary view or a
+   * table/view. This throws an AnalysisException when no Table can be found.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  override def getTable(tableName: String): Table = {
+    sparkSession
+      .newDataset(CatalogImpl.tableEncoder) { builder =>
+        builder.getCatalogBuilder.getGetTableBuilder.setTableName(tableName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the table or view with the specified name in the specified database under the Hive
+   * Metastore. This throws an AnalysisException when no Table can be found.
+   *
+   * To get table/view in other catalogs, please use `getTable(tableName)` with qualified
+   * table/view name instead.
+   *
+   * @since 3.4.0
+   */
+  override def getTable(dbName: String, tableName: String): Table = {
+    sparkSession
+      .newDataset(CatalogImpl.tableEncoder) { builder =>
+        builder.getCatalogBuilder.getGetTableBuilder
+          .setTableName(tableName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the function with the specified name. This function can be a temporary function or a
+   * function. This throws an AnalysisException when the function cannot be found.
+   *
+   * @param functionName
+   *   is either a qualified or unqualified name that designates a function. It follows the same
+   *   resolution rule with SQL: search for built-in/temp functions first then functions in the
+   *   current database (namespace).
+   * @since 3.4.0
+   */
+  override def getFunction(functionName: String): Function = {
+    sparkSession
+      .newDataset(CatalogImpl.functionEncoder) { builder =>
+        builder.getCatalogBuilder.getGetFunctionBuilder.setFunctionName(functionName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the function with the specified name in the specified database under the Hive Metastore.
+   * This throws an AnalysisException when the function cannot be found.
+   *
+   * To get functions in other catalogs, please use `getFunction(functionName)` with qualified
+   * function name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param functionName
+   *   is an unqualified name that designates a function in the specified database
+   * @since 3.4.0
+   */
+  override def getFunction(dbName: String, functionName: String): Function = {
+    sparkSession
+      .newDataset(CatalogImpl.functionEncoder) { builder =>
+        builder.getCatalogBuilder.getGetFunctionBuilder
+          .setFunctionName(functionName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the database (namespace) with the specified name exists (the name can be qualified
+   * with catalog).
+   *
+   * @since 3.4.0
+   */
+  override def databaseExists(dbName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getDatabaseExistsBuilder.setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the table or view with the specified name exists. This can either be a temporary
+   * view or a table/view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  override def tableExists(tableName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getTableExistsBuilder.setTableName(tableName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the table or view with the specified name exists in the specified database under the
+   * Hive Metastore.
+   *
+   * To check existence of table/view in other catalogs, please use `tableExists(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table.
+   * @since 3.4.0
+   */
+  override def tableExists(dbName: String, tableName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getTableExistsBuilder
+          .setTableName(tableName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the function with the specified name exists. This can either be a temporary function
+   * or a function.
+   *
+   * @param functionName
+   *   is either a qualified or unqualified name that designates a function. It follows the same
+   *   resolution rule with SQL: search for built-in/temp functions first then functions in the
+   *   current database (namespace).
+   * @since 3.4.0
+   */
+  override def functionExists(functionName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getFunctionExistsBuilder.setFunctionName(functionName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the function with the specified name exists in the specified database under the Hive
+   * Metastore.
+   *
+   * To check existence of functions in other catalogs, please use `functionExists(functionName)`
+   * with qualified function name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param functionName
+   *   is an unqualified name that designates a function.
+   * @since 3.4.0
+   */
+  override def functionExists(dbName: String, functionName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getFunctionExistsBuilder
+          .setFunctionName(functionName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Creates a table from the given path and returns the corresponding DataFrame. It will use the
+   * default data source configured by spark.sql.sources.default.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table. If no database
+   *   identifier is provided, it refers to a table in the current database.
+   * @since 3.4.0
+   */
+  override def createTable(tableName: String, path: String): DataFrame = {
+    sparkSession.newDataFrame { builder =>

Review Comment:
   Yeah, the planner side already does this.
   
   





[GitHub] [spark] LuciferYang commented on pull request #40438: [SPARK-42806][SPARK-42811][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #40438:
URL: https://github.com/apache/spark/pull/40438#issuecomment-1493532096

   Thanks @HyukjinKwon @hvanhovell @amaliujia 




[GitHub] [spark] HyukjinKwon commented on pull request #40438: [SPARK-42806][SPARK-42811][CONNECT] Add `Catalog` support

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #40438:
URL: https://github.com/apache/spark/pull/40438#issuecomment-1495119529

   BTW, @hvanhovell please take another look as a post-hoc review. I only checked that the implementation matches the PySpark side, so I might have missed a few things.




[GitHub] [spark] LuciferYang commented on a diff in pull request #40438: [WIP][CONNECT] Catalog

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1136832569


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import io.grpc.StatusRuntimeException
+
+import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+
+class CatalogSuite extends RemoteSparkSession with SQLHelper {
+
+  test("Database APIs") {
+    val currentDb = spark.catalog.currentDatabase
+    assert(currentDb == "default")
+    withTempDatabase { db =>
+      try {
+        spark.catalog.setCurrentDatabase(db)
+        assert(spark.catalog.currentDatabase == db)
+        val dbs = spark.catalog.listDatabases().collect().sortBy(_.name)
+        assert(dbs.length == 2)
+        assert(dbs.map(_.name) sameElements Array(db, currentDb))
+        assert(dbs.map(_.catalog).distinct sameElements Array("spark_catalog"))
+        val database = spark.catalog.getDatabase(db)
+        assert(database.name == db)
+        val message = intercept[StatusRuntimeException] {
+          spark.catalog.getDatabase("notExists")
+        }.getMessage
+        assert(message.contains("SCHEMA_NOT_FOUND"))
+        assert(spark.catalog.databaseExists(db))
+        assert(!spark.catalog.databaseExists("notExists"))
+      } finally {
+        spark.catalog.setCurrentDatabase(currentDb)
+        assert(spark.catalog.currentDatabase == currentDb)
+      }
+    }
+  }
+
+  test("CatalogMetadata APIs") {
+    val currentCatalog = spark.catalog.currentCatalog()
+    assert(currentCatalog == "spark_catalog")
+    try {
+      spark.catalog.setCurrentCatalog("spark_catalog")
+      val message = intercept[StatusRuntimeException] {
+        spark.catalog.setCurrentCatalog("notExists")
+      }.getMessage
+      assert(message.contains("plugin class not found"))
+      val catalogs = spark.catalog.listCatalogs().collect()
+      assert(catalogs.length == 1)
+      assert(catalogs.map(_.name) sameElements Array("spark_catalog"))
+    } finally {
+      spark.catalog.setCurrentCatalog(currentCatalog)
+    }
+  }
+}

Review Comment:
   Need more tests.
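   
   As a rough illustration of the kind of extra coverage meant here (the table name, the `StructType` import, and cleanup via `collect()` are illustrative assumptions, not the tests ultimately added):
   
   ```scala
   import org.apache.spark.sql.types.StructType
   
   test("Cache APIs") {
     val tableName = "cache_test_table" // illustrative
     try {
       spark.catalog.createTable(
         tableName, "parquet", new StructType().add("id", "long"), Map.empty[String, String])
       assert(!spark.catalog.isCached(tableName))
       spark.catalog.cacheTable(tableName)
       assert(spark.catalog.isCached(tableName))
       spark.catalog.uncacheTable(tableName)
       assert(!spark.catalog.isCached(tableName))
     } finally {
       spark.sql(s"DROP TABLE IF EXISTS $tableName").collect()
     }
   }
   ```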





[GitHub] [spark] amaliujia commented on pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on PR #40438:
URL: https://github.com/apache/spark/pull/40438#issuecomment-1474260861

   Overall LGTM




[GitHub] [spark] hvanhovell commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1138481493


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -272,6 +274,14 @@ class SparkSession private[sql] (
    */
   def read: DataFrameReader = new DataFrameReader(this)
 
+  /**
+   * Interface through which the user may create, drop, alter or query underlying databases,
+   * tables, functions etc.
+   *
+   * @since 3.4.0

Review Comment:
   Let's make it 3.5





[GitHub] [spark] hvanhovell commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1146672726


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/catalog/interface.scala:
##########


Review Comment:
   Can you update the binary compatibility checks to include these classes?
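   
   Concretely, the classes in question would be along these lines (a sketch only; how they are wired into the include/exclude rules of `CheckConnectJvmClientCompatibility` is not shown and may differ):
   
   ```scala
   // Sketch: new client-side classes the compatibility check should now cover
   // against their sql/core counterparts (the wiring into the checker is assumed).
   val newlyCoveredCatalogClasses: Seq[String] = Seq(
     "org.apache.spark.sql.catalog.Catalog",
     "org.apache.spark.sql.catalog.CatalogMetadata",
     "org.apache.spark.sql.catalog.Column",
     "org.apache.spark.sql.catalog.Database",
     "org.apache.spark.sql.catalog.Function",
     "org.apache.spark.sql.catalog.Table")
   ```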





[GitHub] [spark] HyukjinKwon commented on pull request #40438: [SPARK-42806][SPARK-42811][CONNECT] Add `Catalog` support

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #40438:
URL: https://github.com/apache/spark/pull/40438#issuecomment-1493471258

   Merged to master.




[GitHub] [spark] hvanhovell commented on a diff in pull request #40438: [SPARK-42806][SPARK-42811][CONNECT] Add `Catalog` support

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1156609934


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
##########
@@ -0,0 +1,678 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table}
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveBooleanEncoder, StringEncoder}
+import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, StorageLevelProtoConverter}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.storage.StorageLevel
+
+class CatalogImpl(sparkSession: SparkSession) extends Catalog {
+
+  /**
+   * Returns the current default database in this session.
+   *
+   * @since 3.5.0
+   */
+  override def currentDatabase: String =
+    sparkSession
+      .newDataset(StringEncoder) { builder =>
+        builder.getCatalogBuilder.getCurrentDatabaseBuilder
+      }
+      .head()
+
+  /**
+   * Sets the current database (namespace) in this session.
+   *
+   * @since 3.5.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def setCurrentDatabase(dbName: String): Unit = {
+    // we assume `dbName` will not include the catalog name. e.g. if you call
+    // `setCurrentDatabase("catalog.db")`, it will search for a database 'catalog.db' in the current
+    // catalog.
+    sparkSession.execute { builder =>
+      builder.getCatalogBuilder.getSetCurrentDatabaseBuilder.setDbName(dbName)
+    }
+  }
+
+  /**
+   * Returns a list of databases (namespaces) available within the current catalog.
+   *
+   * @since 3.5.0
+   */
+  override def listDatabases(): Dataset[Database] = {
+    sparkSession.newDataset(CatalogImpl.databaseEncoder) { builder =>
+      builder.getCatalogBuilder.getListDatabasesBuilder
+    }
+  }
+
+  /**
+   * Returns a list of tables/views in the current database (namespace). This includes all
+   * temporary views.
+   *
+   * @since 3.5.0
+   */
+  override def listTables(): Dataset[Table] = {
+    listTables(currentDatabase)
+  }
+
+  /**
+   * Returns a list of tables/views in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all temporary views.
+   *
+   * @since 3.5.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listTables(dbName: String): Dataset[Table] = {
+    sparkSession.newDataset(CatalogImpl.tableEncoder) { builder =>
+      builder.getCatalogBuilder.getListTablesBuilder.setDbName(dbName)
+    }
+  }
+
+  /**
+   * Returns a list of functions registered in the current database (namespace). This includes all
+   * temporary functions.
+   *
+   * @since 3.5.0
+   */
+  override def listFunctions(): Dataset[Function] = {
+    listFunctions(currentDatabase)
+  }
+
+  /**
+   * Returns a list of functions registered in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all built-in and temporary functions.
+   *
+   * @since 3.5.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listFunctions(dbName: String): Dataset[Function] = {
+    sparkSession.newDataset(CatalogImpl.functionEncoder) { builder =>
+      builder.getCatalogBuilder.getListFunctionsBuilder.setDbName(dbName)
+    }
+  }
+
+  /**
+   * Returns a list of columns for the given table/view or temporary view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.5.0
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  override def listColumns(tableName: String): Dataset[Column] = {
+    sparkSession.newDataset(CatalogImpl.columnEncoder) { builder =>
+      builder.getCatalogBuilder.getListColumnsBuilder.setTableName(tableName)
+    }
+  }
+
+  /**
+   * Returns a list of columns for the given table/view in the specified database under the Hive
+   * Metastore.
+   *
+   * To list columns for table/view in other catalogs, please use `listColumns(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table/view.
+   * @since 3.5.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listColumns(dbName: String, tableName: String): Dataset[Column] = {
+    sparkSession.newDataset(CatalogImpl.columnEncoder) { builder =>
+      builder.getCatalogBuilder.getListColumnsBuilder
+        .setTableName(tableName)
+        .setDbName(dbName)
+    }
+  }
+
+  /**
+   * Get the database (namespace) with the specified name (can be qualified with catalog). This
+   * throws an AnalysisException when the database (namespace) cannot be found.
+   *
+   * @since 3.5.0
+   */
+  override def getDatabase(dbName: String): Database = {
+    sparkSession
+      .newDataset(CatalogImpl.databaseEncoder) { builder =>
+        builder.getCatalogBuilder.getGetDatabaseBuilder.setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the table or view with the specified name. This table can be a temporary view or a
+   * table/view. This throws an AnalysisException when no Table can be found.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.5.0
+   */
+  override def getTable(tableName: String): Table = {
+    sparkSession
+      .newDataset(CatalogImpl.tableEncoder) { builder =>
+        builder.getCatalogBuilder.getGetTableBuilder.setTableName(tableName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the table or view with the specified name in the specified database under the Hive
+   * Metastore. This throws an AnalysisException when no Table can be found.
+   *
+   * To get table/view in other catalogs, please use `getTable(tableName)` with qualified
+   * table/view name instead.
+   *
+   * @since 3.5.0
+   */
+  override def getTable(dbName: String, tableName: String): Table = {
+    sparkSession
+      .newDataset(CatalogImpl.tableEncoder) { builder =>
+        builder.getCatalogBuilder.getGetTableBuilder
+          .setTableName(tableName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the function with the specified name. This function can be a temporary function or a
+   * function. This throws an AnalysisException when the function cannot be found.
+   *
+   * @param functionName
+   *   is either a qualified or unqualified name that designates a function. It follows the same
+   *   resolution rule with SQL: search for built-in/temp functions first then functions in the
+   *   current database (namespace).
+   * @since 3.5.0
+   */
+  override def getFunction(functionName: String): Function = {
+    sparkSession
+      .newDataset(CatalogImpl.functionEncoder) { builder =>
+        builder.getCatalogBuilder.getGetFunctionBuilder.setFunctionName(functionName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the function with the specified name in the specified database under the Hive Metastore.
+   * This throws an AnalysisException when the function cannot be found.
+   *
+   * To get functions in other catalogs, please use `getFunction(functionName)` with qualified
+   * function name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param functionName
+   *   is an unqualified name that designates a function in the specified database
+   * @since 3.5.0
+   */
+  override def getFunction(dbName: String, functionName: String): Function = {
+    sparkSession
+      .newDataset(CatalogImpl.functionEncoder) { builder =>
+        builder.getCatalogBuilder.getGetFunctionBuilder
+          .setFunctionName(functionName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the database (namespace) with the specified name exists (the name can be qualified
+   * with catalog).
+   *
+   * @since 3.5.0
+   */
+  override def databaseExists(dbName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getDatabaseExistsBuilder.setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the table or view with the specified name exists. This can either be a temporary
+   * view or a table/view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.5.0
+   */
+  override def tableExists(tableName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getTableExistsBuilder.setTableName(tableName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the table or view with the specified name exists in the specified database under the
+   * Hive Metastore.
+   *
+   * To check existence of table/view in other catalogs, please use `tableExists(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table.
+   * @since 3.5.0
+   */
+  override def tableExists(dbName: String, tableName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getTableExistsBuilder
+          .setTableName(tableName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the function with the specified name exists. This can either be a temporary function
+   * or a function.
+   *
+   * @param functionName
+   *   is either a qualified or unqualified name that designates a function. It follows the same
+   *   resolution rule with SQL: search for built-in/temp functions first then functions in the
+   *   current database (namespace).
+   * @since 3.5.0
+   */
+  override def functionExists(functionName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getFunctionExistsBuilder.setFunctionName(functionName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the function with the specified name exists in the specified database under the Hive
+   * Metastore.
+   *
+   * To check existence of functions in other catalogs, please use `functionExists(functionName)`
+   * with qualified function name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param functionName
+   *   is an unqualified name that designates a function.
+   * @since 3.5.0
+   */
+  override def functionExists(dbName: String, functionName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getFunctionExistsBuilder
+          .setFunctionName(functionName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Creates a table from the given path and returns the corresponding DataFrame. It will use the
+   * default data source configured by spark.sql.sources.default.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table. If no database
+   *   identifier is provided, it refers to a table in the current database.
+   * @since 3.5.0
+   */
+  override def createTable(tableName: String, path: String): DataFrame = {
+    sparkSession.newDataFrame { builder =>
+      builder.getCatalogBuilder.getCreateTableBuilder
+        .setTableName(tableName)
+        .setSchema(DataTypeProtoConverter.toConnectProtoType(new StructType))
+        .setDescription("")
+        .putOptions("path", path)
+    }
+  }
+
+  /**
+   * Creates a table from the given path based on a data source and returns the corresponding
+   * DataFrame.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table. If no database
+   *   identifier is provided, it refers to a table in the current database.
+   * @since 3.5.0
+   */
+  override def createTable(tableName: String, path: String, source: String): DataFrame = {
+    createTable(tableName, source, Map("path" -> path))
+  }
+
+  /**
+   * (Scala-specific) Creates a table based on the dataset in a data source and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table. If no database
+   *   identifier is provided, it refers to a table in the current database.
+   * @since 3.5.0
+   */
+  override def createTable(
+      tableName: String,
+      source: String,
+      options: Map[String, String]): DataFrame = {
+    createTable(tableName, source, new StructType, options)
+  }
+
+  /**
+   * (Scala-specific) Creates a table based on the dataset in a data source and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table. If no database
+   *   identifier is provided, it refers to a table in the current database.
+   * @since 3.5.0
+   */
+  override def createTable(
+      tableName: String,
+      source: String,
+      description: String,
+      options: Map[String, String]): DataFrame = {
+    createTable(tableName, source, new StructType, description, options)
+  }
+
+  /**
+   * (Scala-specific) Create a table based on the dataset in a data source, a schema and a set of
+   * options. Then, returns the corresponding DataFrame.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table. If no database
+   *   identifier is provided, it refers to a table in the current database.
+   * @since 3.5.0
+   */
+  override def createTable(
+      tableName: String,
+      source: String,
+      schema: StructType,
+      options: Map[String, String]): DataFrame = {
+    createTable(
+      tableName = tableName,
+      source = source,
+      schema = schema,
+      description = "",
+      options = options)
+  }
+
+  /**
+   * (Scala-specific) Create a table based on the dataset in a data source, a schema and a set of
+   * options. Then, returns the corresponding DataFrame.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table. If no database
+   *   identifier is provided, it refers to a table in the current database.
+   * @since 3.5.0
+   */
+  override def createTable(
+      tableName: String,
+      source: String,
+      schema: StructType,
+      description: String,
+      options: Map[String, String]): DataFrame = {
+    sparkSession.newDataFrame { builder =>
+      val createTableBuilder = builder.getCatalogBuilder.getCreateTableBuilder
+        .setTableName(tableName)
+        .setSource(source)
+        .setSchema(DataTypeProtoConverter.toConnectProtoType(schema))
+        .setDescription(description)
+      options.foreach { case (k, v) =>
+        createTableBuilder.putOptions(k, v)
+      }
+    }
+  }
+
+  /**
+   * Drops the local temporary view with the given view name in the catalog. If the view has been
+   * cached before, then it will also be uncached.
+   *
+   * Local temporary view is session-scoped. Its lifetime is the lifetime of the session that
+   * created it, i.e. it will be automatically dropped when the session terminates. It's not tied
+   * to any databases, i.e. we can't use `db1.view1` to reference a local temporary view.
+   *
+   * Note that, the return type of this method was Unit in Spark 2.0, but changed to Boolean in
+   * Spark 2.1.
+   *
+   * @param viewName
+   *   the name of the temporary view to be dropped.
+   * @return
+   *   true if the view is dropped successfully, false otherwise.
+   * @since 3.5.0
+   */
+  override def dropTempView(viewName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getDropTempViewBuilder.setViewName(viewName)
+      }
+      .head()
+  }
+
+  /**
+   * Drops the global temporary view with the given view name in the catalog. If the view has been
+   * cached before, then it will also be uncached.
+   *
+   * Global temporary view is cross-session. Its lifetime is the lifetime of the Spark
+   * application,
+   * i.e. it will be automatically dropped when the application terminates. It's tied to a system
+   * preserved database `global_temp`, and we must use the qualified name to refer a global temp
+   * view, e.g. `SELECT * FROM global_temp.view1`.
+   *
+   * @param viewName
+   *   the unqualified name of the temporary view to be dropped.
+   * @return
+   *   true if the view is dropped successfully, false otherwise.
+   * @since 3.5.0
+   */
+  override def dropGlobalTempView(viewName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getDropGlobalTempViewBuilder.setViewName(viewName)
+      }
+      .head()
+  }
+
+  /**
+   * Recovers all the partitions in the directory of a table and update the catalog. Only works
+   * with a partitioned table, and not a view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table. If no database
+   *   identifier is provided, it refers to a table in the current database.
+   * @since 3.5.0
+   */
+  override def recoverPartitions(tableName: String): Unit = {
+    sparkSession.execute { builder =>
+      builder.getCatalogBuilder.getRecoverPartitionsBuilder.setTableName(tableName)
+    }
+  }
+
+  /**
+   * Returns true if the table is currently cached in-memory.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. If no database
+   *   identifier is provided, it refers to a temporary view or a table/view in the current
+   *   database.
+   * @since 3.5.0
+   */
+  override def isCached(tableName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getIsCachedBuilder.setTableName(tableName)
+      }
+      .head()
+  }
+
+  /**
+   * Caches the specified table in-memory.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. If no database
+   *   identifier is provided, it refers to a temporary view or a table/view in the current
+   *   database.
+   * @since 3.5.0
+   */
+  override def cacheTable(tableName: String): Unit = {
+    sparkSession.execute { builder =>
+      builder.getCatalogBuilder.getCacheTableBuilder.setTableName(tableName)
+    }
+  }
+
+  /**
+   * Caches the specified table or view with the given storage level.
+   *
+   * @group cachemgmt
+   * @since 3.4.0
+   */
+  override def cacheTable(tableName: String, storageLevel: StorageLevel): Unit = {
+    sparkSession.execute { builder =>
+      builder.getCatalogBuilder.getCacheTableBuilder
+        .setTableName(tableName)
+        .setStorageLevel(StorageLevelProtoConverter.toConnectProtoType(storageLevel))
+    }
+  }
+
+  /**
+   * Removes the specified table from the in-memory cache.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. If no database
+   *   identifier is provided, it refers to a temporary view or a table/view in the current
+   *   database.
+   * @since 3.5.0
+   */
+  override def uncacheTable(tableName: String): Unit = {
+    sparkSession.execute { builder =>
+      builder.getCatalogBuilder.getUncacheTableBuilder.setTableName(tableName)
+    }
+  }
+
+  /**
+   * Removes all cached tables from the in-memory cache.
+   *
+   * @since 3.5.0
+   */
+  override def clearCache(): Unit = {
+    sparkSession.execute { builder =>
+      builder.getCatalogBuilder.getClearCacheBuilder
+    }
+  }
+
+  /**
+   * Invalidates and refreshes all the cached data and metadata of the given table. For
+   * performance reasons, Spark SQL or the external data source library it uses might cache
+   * certain metadata about a table, such as the location of blocks. When those change outside of
+   * Spark SQL, users should call this function to invalidate the cache.
+   *
+   * If this table is cached as an InMemoryRelation, drop the original cached version and make the
+   * new version cached lazily.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. If no database
+   *   identifier is provided, it refers to a temporary view or a table/view in the current
+   *   database.
+   * @since 3.5.0
+   */
+  override def refreshTable(tableName: String): Unit = {
+    sparkSession.execute { builder =>
+      builder.getCatalogBuilder.getRefreshTableBuilder.setTableName(tableName)
+    }
+  }
+
+  /**
+   * Invalidates and refreshes all the cached data (and the associated metadata) for any `Dataset`
+   * that contains the given data source path. Path matching is by prefix, i.e. "/" would
+   * invalidate everything that is cached.
+   *
+   * @since 3.5.0
+   */
+  override def refreshByPath(path: String): Unit = {
+    sparkSession.execute { builder =>
+      builder.getCatalogBuilder.getRefreshByPathBuilder.setPath(path)
+    }
+  }
+
+  /**
+   * Returns the current catalog in this session.
+   *
+   * @since 3.5.0
+   */
+  override def currentCatalog(): String = sparkSession
+    .newDataset(StringEncoder) { builder =>
+      builder.getCatalogBuilder.getCurrentCatalogBuilder
+    }
+    .head()
+
+  /**
+   * Sets the current catalog in this session.
+   *
+   * @since 3.5.0
+   */
+  override def setCurrentCatalog(catalogName: String): Unit =
+    sparkSession.execute { builder =>
+      builder.getCatalogBuilder.getSetCurrentCatalogBuilder.setCatalogName(catalogName)
+    }
+
+  /**
+   * Returns a list of catalogs available in this session.
+   *
+   * @since 3.5.0
+   */
+  override def listCatalogs(): Dataset[CatalogMetadata] =
+    sparkSession
+      .newDataset(CatalogImpl.catalogEncoder) { builder =>
+        builder.getCatalogBuilder.getListCatalogsBuilder
+      }
+}
+
+private object CatalogImpl {
+  private val databaseEncoder: AgnosticEncoder[Database] = ScalaReflection
+    .encoderFor(ScalaReflection.localTypeOf[Database])

Review Comment:
   Why doesn't `ScalaReflection.encoderFor[Database]` work?





[GitHub] [spark] hvanhovell commented on a diff in pull request #40438: [SPARK-42806][SPARK-42811][CONNECT] Add `Catalog` support

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1156607869


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
##########
@@ -0,0 +1,678 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table}
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveBooleanEncoder, StringEncoder}
+import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, StorageLevelProtoConverter}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.storage.StorageLevel
+
+class CatalogImpl(sparkSession: SparkSession) extends Catalog {
+
+  /**
+   * Returns the current default database in this session.
+   *
+   * @since 3.5.0
+   */
+  override def currentDatabase: String =
+    sparkSession
+      .newDataset(StringEncoder) { builder =>
+        builder.getCatalogBuilder.getCurrentDatabaseBuilder
+      }
+      .head()
+
+  /**
+   * Sets the current database (namespace) in this session.
+   *
+   * @since 3.5.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def setCurrentDatabase(dbName: String): Unit = {
+    // we assume `dbName` will not include the catalog name. e.g. if you call
+    // `setCurrentDatabase("catalog.db")`, it will search for a database 'catalog.db' in the current
+    // catalog.
+    sparkSession.execute { builder =>
+      builder.getCatalogBuilder.getSetCurrentDatabaseBuilder.setDbName(dbName)
+    }
+  }
+
+  /**
+   * Returns a list of databases (namespaces) available within the current catalog.
+   *
+   * @since 3.5.0
+   */
+  override def listDatabases(): Dataset[Database] = {
+    sparkSession.newDataset(CatalogImpl.databaseEncoder) { builder =>
+      builder.getCatalogBuilder.getListDatabasesBuilder
+    }
+  }
+
+  /**
+   * Returns a list of tables/views in the current database (namespace). This includes all
+   * temporary views.
+   *
+   * @since 3.5.0
+   */
+  override def listTables(): Dataset[Table] = {
+    listTables(currentDatabase)
+  }
+
+  /**
+   * Returns a list of tables/views in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all temporary views.
+   *
+   * @since 3.5.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listTables(dbName: String): Dataset[Table] = {
+    sparkSession.newDataset(CatalogImpl.tableEncoder) { builder =>
+      builder.getCatalogBuilder.getListTablesBuilder.setDbName(dbName)
+    }
+  }
+
+  /**
+   * Returns a list of functions registered in the current database (namespace). This includes all
+   * temporary functions.
+   *
+   * @since 3.5.0
+   */
+  override def listFunctions(): Dataset[Function] = {
+    listFunctions(currentDatabase)
+  }
+
+  /**
+   * Returns a list of functions registered in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all built-in and temporary functions.
+   *
+   * @since 3.5.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listFunctions(dbName: String): Dataset[Function] = {
+    sparkSession.newDataset(CatalogImpl.functionEncoder) { builder =>
+      builder.getCatalogBuilder.getListFunctionsBuilder.setDbName(dbName)
+    }
+  }
+
+  /**
+   * Returns a list of columns for the given table/view or temporary view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.5.0
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  override def listColumns(tableName: String): Dataset[Column] = {
+    sparkSession.newDataset(CatalogImpl.columnEncoder) { builder =>
+      builder.getCatalogBuilder.getListColumnsBuilder.setTableName(tableName)
+    }
+  }
+
+  /**
+   * Returns a list of columns for the given table/view in the specified database under the Hive
+   * Metastore.
+   *
+   * To list columns for table/view in other catalogs, please use `listColumns(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table/view.
+   * @since 3.5.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listColumns(dbName: String, tableName: String): Dataset[Column] = {
+    sparkSession.newDataset(CatalogImpl.columnEncoder) { builder =>
+      builder.getCatalogBuilder.getListColumnsBuilder
+        .setTableName(tableName)
+        .setDbName(dbName)
+    }
+  }
+
+  /**
+   * Get the database (namespace) with the specified name (can be qualified with catalog). This
+   * throws an AnalysisException when the database (namespace) cannot be found.
+   *
+   * @since 3.5.0
+   */
+  override def getDatabase(dbName: String): Database = {
+    sparkSession
+      .newDataset(CatalogImpl.databaseEncoder) { builder =>
+        builder.getCatalogBuilder.getGetDatabaseBuilder.setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the table or view with the specified name. This table can be a temporary view or a
+   * table/view. This throws an AnalysisException when no Table can be found.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.5.0
+   */
+  override def getTable(tableName: String): Table = {
+    sparkSession
+      .newDataset(CatalogImpl.tableEncoder) { builder =>
+        builder.getCatalogBuilder.getGetTableBuilder.setTableName(tableName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the table or view with the specified name in the specified database under the Hive
+   * Metastore. This throws an AnalysisException when no Table can be found.
+   *
+   * To get table/view in other catalogs, please use `getTable(tableName)` with qualified
+   * table/view name instead.
+   *
+   * @since 3.5.0
+   */
+  override def getTable(dbName: String, tableName: String): Table = {
+    sparkSession
+      .newDataset(CatalogImpl.tableEncoder) { builder =>
+        builder.getCatalogBuilder.getGetTableBuilder
+          .setTableName(tableName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the function with the specified name. This function can be a temporary function or a
+   * function. This throws an AnalysisException when the function cannot be found.
+   *
+   * @param functionName
+   *   is either a qualified or unqualified name that designates a function. It follows the same
+   *   resolution rule with SQL: search for built-in/temp functions first then functions in the
+   *   current database (namespace).
+   * @since 3.5.0
+   */
+  override def getFunction(functionName: String): Function = {
+    sparkSession
+      .newDataset(CatalogImpl.functionEncoder) { builder =>
+        builder.getCatalogBuilder.getGetFunctionBuilder.setFunctionName(functionName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the function with the specified name in the specified database under the Hive Metastore.
+   * This throws an AnalysisException when the function cannot be found.
+   *
+   * To get functions in other catalogs, please use `getFunction(functionName)` with qualified
+   * function name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param functionName
+   *   is an unqualified name that designates a function in the specified database
+   * @since 3.5.0
+   */
+  override def getFunction(dbName: String, functionName: String): Function = {
+    sparkSession
+      .newDataset(CatalogImpl.functionEncoder) { builder =>
+        builder.getCatalogBuilder.getGetFunctionBuilder
+          .setFunctionName(functionName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the database (namespace) with the specified name exists (the name can be qualified
+   * with catalog).
+   *
+   * @since 3.5.0
+   */
+  override def databaseExists(dbName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getDatabaseExistsBuilder.setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the table or view with the specified name exists. This can either be a temporary
+   * view or a table/view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.5.0
+   */
+  override def tableExists(tableName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getTableExistsBuilder.setTableName(tableName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the table or view with the specified name exists in the specified database under the
+   * Hive Metastore.
+   *
+   * To check existence of table/view in other catalogs, please use `tableExists(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table.
+   * @since 3.5.0
+   */
+  override def tableExists(dbName: String, tableName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getTableExistsBuilder
+          .setTableName(tableName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the function with the specified name exists. This can either be a temporary function
+   * or a function.
+   *
+   * @param functionName
+   *   is either a qualified or unqualified name that designates a function. It follows the same
+   *   resolution rule with SQL: search for built-in/temp functions first then functions in the
+   *   current database (namespace).
+   * @since 3.5.0
+   */
+  override def functionExists(functionName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getFunctionExistsBuilder.setFunctionName(functionName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the function with the specified name exists in the specified database under the Hive
+   * Metastore.
+   *
+   * To check existence of functions in other catalogs, please use `functionExists(functionName)`
+   * with qualified function name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param functionName
+   *   is an unqualified name that designates a function.
+   * @since 3.5.0
+   */
+  override def functionExists(dbName: String, functionName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getFunctionExistsBuilder
+          .setFunctionName(functionName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Creates a table from the given path and returns the corresponding DataFrame. It will use the
+   * default data source configured by spark.sql.sources.default.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table. If no database
+   *   identifier is provided, it refers to a table in the current database.
+   * @since 3.5.0
+   */
+  override def createTable(tableName: String, path: String): DataFrame = {
+    sparkSession.newDataFrame { builder =>
+      builder.getCatalogBuilder.getCreateTableBuilder
+        .setTableName(tableName)
+        .setSchema(DataTypeProtoConverter.toConnectProtoType(new StructType))
+        .setDescription("")
+        .putOptions("path", path)
+    }
+  }
+
+  /**
+   * Creates a table from the given path based on a data source and returns the corresponding
+   * DataFrame.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table. If no database
+   *   identifier is provided, it refers to a table in the current database.
+   * @since 3.5.0
+   */
+  override def createTable(tableName: String, path: String, source: String): DataFrame = {
+    createTable(tableName, source, Map("path" -> path))
+  }
+
+  /**
+   * (Scala-specific) Creates a table based on the dataset in a data source and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table. If no database
+   *   identifier is provided, it refers to a table in the current database.
+   * @since 3.5.0
+   */
+  override def createTable(
+      tableName: String,
+      source: String,
+      options: Map[String, String]): DataFrame = {
+    createTable(tableName, source, new StructType, options)
+  }
+
+  /**
+   * (Scala-specific) Creates a table based on the dataset in a data source and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table. If no database
+   *   identifier is provided, it refers to a table in the current database.
+   * @since 3.5.0
+   */
+  override def createTable(
+      tableName: String,
+      source: String,
+      description: String,
+      options: Map[String, String]): DataFrame = {
+    createTable(tableName, source, new StructType, description, options)
+  }
+
+  /**
+   * (Scala-specific) Create a table based on the dataset in a data source, a schema and a set of
+   * options. Then, returns the corresponding DataFrame.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table. If no database
+   *   identifier is provided, it refers to a table in the current database.
+   * @since 3.5.0
+   */
+  override def createTable(
+      tableName: String,
+      source: String,
+      schema: StructType,
+      options: Map[String, String]): DataFrame = {
+    createTable(
+      tableName = tableName,
+      source = source,
+      schema = schema,
+      description = "",
+      options = options)
+  }
+
+  /**
+   * (Scala-specific) Create a table based on the dataset in a data source, a schema and a set of
+   * options. Then, returns the corresponding DataFrame.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table. If no database
+   *   identifier is provided, it refers to a table in the current database.
+   * @since 3.5.0
+   */
+  override def createTable(
+      tableName: String,
+      source: String,
+      schema: StructType,
+      description: String,
+      options: Map[String, String]): DataFrame = {
+    sparkSession.newDataFrame { builder =>

Review Comment:
   @LuciferYang this is a side effect. This should be executed eagerly, not lazily. Can you follow up on this?
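
   One possible shape for that follow-up, sketched only as an illustration (the eager `execute` call mirrors the pattern used elsewhere in this file; the `read.table` lookup at the end is an assumption, not code from this PR):

   ```scala
   // Sketch, using the simplest overload: run the CreateTable command eagerly so
   // the side effect happens when createTable() returns, then hand back a lazy
   // reference to the newly created table.
   override def createTable(tableName: String, path: String): DataFrame = {
     sparkSession.execute { builder =>
       builder.getCatalogBuilder.getCreateTableBuilder
         .setTableName(tableName)
         .setSchema(DataTypeProtoConverter.toConnectProtoType(new StructType))
         .setDescription("")
         .putOptions("path", path)
     }
     sparkSession.read.table(tableName) // assumed lookup of the new table
   }
   ```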





[GitHub] [spark] hvanhovell commented on a diff in pull request #40438: [SPARK-42806][SPARK-42811][CONNECT] Add `Catalog` support

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1156608735


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
##########
@@ -0,0 +1,678 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table}
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveBooleanEncoder, StringEncoder}
+import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, StorageLevelProtoConverter}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.storage.StorageLevel
+
+class CatalogImpl(sparkSession: SparkSession) extends Catalog {
+
+  /**
+   * Returns the current default database in this session.
+   *
+   * @since 3.5.0
+   */
+  override def currentDatabase: String =
+    sparkSession
+      .newDataset(StringEncoder) { builder =>
+        builder.getCatalogBuilder.getCurrentDatabaseBuilder
+      }
+      .head()
+
+  /**
+   * Sets the current database (namespace) in this session.
+   *
+   * @since 3.5.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def setCurrentDatabase(dbName: String): Unit = {
+    // we assume `dbName` will not include the catalog name. e.g. if you call
+    // `setCurrentDatabase("catalog.db")`, it will search for a database 'catalog.db' in the current
+    // catalog.
+    sparkSession.execute { builder =>
+      builder.getCatalogBuilder.getSetCurrentDatabaseBuilder.setDbName(dbName)
+    }
+  }
+
+  /**
+   * Returns a list of databases (namespaces) available within the current catalog.
+   *
+   * @since 3.5.0
+   */
+  override def listDatabases(): Dataset[Database] = {
+    sparkSession.newDataset(CatalogImpl.databaseEncoder) { builder =>
+      builder.getCatalogBuilder.getListDatabasesBuilder
+    }
+  }
+
+  /**
+   * Returns a list of tables/views in the current database (namespace). This includes all
+   * temporary views.
+   *
+   * @since 3.5.0
+   */
+  override def listTables(): Dataset[Table] = {
+    listTables(currentDatabase)
+  }
+
+  /**
+   * Returns a list of tables/views in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all temporary views.
+   *
+   * @since 3.5.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listTables(dbName: String): Dataset[Table] = {
+    sparkSession.newDataset(CatalogImpl.tableEncoder) { builder =>
+      builder.getCatalogBuilder.getListTablesBuilder.setDbName(dbName)
+    }
+  }
+
+  /**
+   * Returns a list of functions registered in the current database (namespace). This includes all
+   * temporary functions.
+   *
+   * @since 3.5.0
+   */
+  override def listFunctions(): Dataset[Function] = {
+    listFunctions(currentDatabase)
+  }
+
+  /**
+   * Returns a list of functions registered in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all built-in and temporary functions.
+   *
+   * @since 3.5.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listFunctions(dbName: String): Dataset[Function] = {
+    sparkSession.newDataset(CatalogImpl.functionEncoder) { builder =>
+      builder.getCatalogBuilder.getListFunctionsBuilder.setDbName(dbName)
+    }
+  }
+
+  /**
+   * Returns a list of columns for the given table/view or temporary view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.5.0
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  override def listColumns(tableName: String): Dataset[Column] = {
+    sparkSession.newDataset(CatalogImpl.columnEncoder) { builder =>
+      builder.getCatalogBuilder.getListColumnsBuilder.setTableName(tableName)
+    }
+  }
+
+  /**
+   * Returns a list of columns for the given table/view in the specified database under the Hive
+   * Metastore.
+   *
+   * To list columns for table/view in other catalogs, please use `listColumns(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table/view.
+   * @since 3.5.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listColumns(dbName: String, tableName: String): Dataset[Column] = {
+    sparkSession.newDataset(CatalogImpl.columnEncoder) { builder =>
+      builder.getCatalogBuilder.getListColumnsBuilder
+        .setTableName(tableName)
+        .setDbName(dbName)
+    }
+  }
+
+  /**
+   * Get the database (namespace) with the specified name (can be qualified with catalog). This
+   * throws an AnalysisException when the database (namespace) cannot be found.
+   *
+   * @since 3.5.0
+   */
+  override def getDatabase(dbName: String): Database = {
+    sparkSession
+      .newDataset(CatalogImpl.databaseEncoder) { builder =>
+        builder.getCatalogBuilder.getGetDatabaseBuilder.setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the table or view with the specified name. This table can be a temporary view or a
+   * table/view. This throws an AnalysisException when no Table can be found.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.5.0
+   */
+  override def getTable(tableName: String): Table = {
+    sparkSession
+      .newDataset(CatalogImpl.tableEncoder) { builder =>
+        builder.getCatalogBuilder.getGetTableBuilder.setTableName(tableName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the table or view with the specified name in the specified database under the Hive
+   * Metastore. This throws an AnalysisException when no Table can be found.
+   *
+   * To get table/view in other catalogs, please use `getTable(tableName)` with qualified
+   * table/view name instead.
+   *
+   * @since 3.5.0
+   */
+  override def getTable(dbName: String, tableName: String): Table = {
+    sparkSession
+      .newDataset(CatalogImpl.tableEncoder) { builder =>
+        builder.getCatalogBuilder.getGetTableBuilder
+          .setTableName(tableName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the function with the specified name. This function can be a temporary function or a
+   * function. This throws an AnalysisException when the function cannot be found.
+   *
+   * @param functionName
+   *   is either a qualified or unqualified name that designates a function. It follows the same
+   *   resolution rule with SQL: search for built-in/temp functions first then functions in the
+   *   current database (namespace).
+   * @since 3.5.0
+   */
+  override def getFunction(functionName: String): Function = {
+    sparkSession
+      .newDataset(CatalogImpl.functionEncoder) { builder =>
+        builder.getCatalogBuilder.getGetFunctionBuilder.setFunctionName(functionName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the function with the specified name in the specified database under the Hive Metastore.
+   * This throws an AnalysisException when the function cannot be found.
+   *
+   * To get functions in other catalogs, please use `getFunction(functionName)` with qualified
+   * function name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param functionName
+   *   is an unqualified name that designates a function in the specified database
+   * @since 3.5.0
+   */
+  override def getFunction(dbName: String, functionName: String): Function = {
+    sparkSession

Review Comment:
   Can you create a version of `execute` that can return an actual result? Right now we have the `newDataset() { .. }.head()` pattern everywhere.
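
   Something along these lines, perhaps (just a sketch; the helper name and the `proto.Catalog.Builder` parameter type are assumptions, and it presumes `import org.apache.spark.connect.proto` is in scope in `CatalogImpl`):

   ```scala
   // Hypothetical helper: build a single-row catalog relation and return its head.
   private def executeAndReturn[T](encoder: AgnosticEncoder[T])(
       f: proto.Catalog.Builder => Unit): T = {
     sparkSession
       .newDataset(encoder) { builder => f(builder.getCatalogBuilder) }
       .head()
   }

   // Call sites would then collapse from newDataset(...) { ... }.head() to e.g.:
   override def getDatabase(dbName: String): Database =
     executeAndReturn(CatalogImpl.databaseEncoder) { catalog =>
       catalog.getGetDatabaseBuilder.setDbName(dbName)
     }
   ```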





[GitHub] [spark] amaliujia commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1140546104


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala:
##########
@@ -129,6 +130,9 @@ object CheckConnectJvmClientCompatibility {
       ProblemFilters.exclude[Problem]("org.sparkproject.*"),
       ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"),
 
+      // Catalog
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.catalog.Catalog.cacheTable"),

Review Comment:
   Oops, yes. I thought `StorageLevel` was already handled, but now I remember there is a PR to address that: https://github.com/apache/spark/pull/40015
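
   Once that lands, the client side could presumably end up looking something like the sketch below (purely hypothetical; the `getCacheTableBuilder` and `setStorageLevel` proto methods are guesses, though `StorageLevelProtoConverter` does show up in the imports of the newer `CatalogImpl` revision quoted earlier in this thread):

   ```scala
   // Hypothetical sketch only, not code from this PR.
   override def cacheTable(tableName: String, storageLevel: StorageLevel): Unit = {
     sparkSession.execute { builder =>
       builder.getCatalogBuilder.getCacheTableBuilder
         .setTableName(tableName)
         .setStorageLevel(StorageLevelProtoConverter.toConnectProtoType(storageLevel))
     }
   }
   ```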





[GitHub] [spark] LuciferYang commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1138348490


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala:
##########
@@ -0,0 +1,628 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Catalog interface for Spark. To access this, use `SparkSession.catalog`.
+ *
+ * @since 3.4.0
+ */
+abstract class Catalog {
+
+  /**
+   * Returns the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  def currentDatabase: String
+
+  /**
+   * Sets the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  def setCurrentDatabase(dbName: String): Unit
+
+  /**
+   * Returns a list of databases (namespaces) available within the current catalog.
+   *
+   * @since 3.4.0
+   */
+  def listDatabases(): Dataset[Database]
+
+  /**
+   * Returns a list of tables/views in the current database (namespace). This includes all
+   * temporary views.
+   *
+   * @since 3.4.0
+   */
+  def listTables(): Dataset[Table]
+
+  /**
+   * Returns a list of tables/views in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all temporary views.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def listTables(dbName: String): Dataset[Table]
+
+  /**
+   * Returns a list of functions registered in the current database (namespace). This includes all
+   * temporary functions.
+   *
+   * @since 3.4.0
+   */
+  def listFunctions(): Dataset[Function]
+
+  /**
+   * Returns a list of functions registered in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all built-in and temporary functions.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def listFunctions(dbName: String): Dataset[Function]
+
+  /**
+   * Returns a list of columns for the given table/view or temporary view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("table does not exist")
+  def listColumns(tableName: String): Dataset[Column]
+
+  /**
+   * Returns a list of columns for the given table/view in the specified database under the Hive
+   * Metastore.
+   *
+   * To list columns for table/view in other catalogs, please use `listColumns(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table/view.
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  def listColumns(dbName: String, tableName: String): Dataset[Column]
+
+  /**
+   * Get the database (namespace) with the specified name (can be qualified with catalog). This
+   * throws an AnalysisException when the database (namespace) cannot be found.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")

Review Comment:
   5. `@throws[AnalysisException]` cannot be removed; otherwise the compatibility check will flag it as incompatible.
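
   If it were removed, the compatibility check would presumably need an explicit exclusion instead, in the same style as the existing entries in `CheckConnectJvmClientCompatibility.scala` (hypothetical example):

   ```scala
   // Hypothetical: exclude the affected member from the compatibility check.
   ProblemFilters.exclude[Problem]("org.apache.spark.sql.catalog.Catalog.getDatabase")
   ```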





[GitHub] [spark] hvanhovell commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1138910434


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala:
##########
@@ -0,0 +1,628 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Catalog interface for Spark. To access this, use `SparkSession.catalog`.
+ *
+ * @since 3.4.0
+ */
+abstract class Catalog {
+
+  /**
+   * Returns the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  def currentDatabase: String
+
+  /**
+   * Sets the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  def setCurrentDatabase(dbName: String): Unit
+
+  /**
+   * Returns a list of databases (namespaces) available within the current catalog.
+   *
+   * @since 3.4.0
+   */
+  def listDatabases(): Dataset[Database]
+
+  /**
+   * Returns a list of tables/views in the current database (namespace). This includes all
+   * temporary views.
+   *
+   * @since 3.4.0
+   */
+  def listTables(): Dataset[Table]
+
+  /**
+   * Returns a list of tables/views in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all temporary views.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def listTables(dbName: String): Dataset[Table]
+
+  /**
+   * Returns a list of functions registered in the current database (namespace). This includes all
+   * temporary functions.
+   *
+   * @since 3.4.0
+   */
+  def listFunctions(): Dataset[Function]
+
+  /**
+   * Returns a list of functions registered in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all built-in and temporary functions.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def listFunctions(dbName: String): Dataset[Function]
+
+  /**
+   * Returns a list of columns for the given table/view or temporary view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("table does not exist")
+  def listColumns(tableName: String): Dataset[Column]
+
+  /**
+   * Returns a list of columns for the given table/view in the specified database under the Hive
+   * Metastore.
+   *
+   * To list columns for table/view in other catalogs, please use `listColumns(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table/view.
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  def listColumns(dbName: String, tableName: String): Dataset[Column]
+
+  /**
+   * Get the database (namespace) with the specified name (can be qualified with catalog). This
+   * throws an AnalysisException when the database (namespace) cannot be found.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def getDatabase(dbName: String): Database
+
+  /**
+   * Get the table or view with the specified name. This table can be a temporary view or a
+   * table/view. This throws an AnalysisException when no Table can be found.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("table does not exist")
+  def getTable(tableName: String): Table
+
+  /**
+   * Get the table or view with the specified name in the specified database under the Hive
+   * Metastore. This throws an AnalysisException when no Table can be found.
+   *
+   * To get table/view in other catalogs, please use `getTable(tableName)` with qualified
+   * table/view name instead.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  def getTable(dbName: String, tableName: String): Table
+
+  /**
+   * Get the function with the specified name. This function can be a temporary function or a
+   * function. This throws an AnalysisException when the function cannot be found.
+   *
+   * @param functionName
+   *   is either a qualified or unqualified name that designates a function. It follows the same
+   *   resolution rule with SQL: search for built-in/temp functions first then functions in the
+   *   current database (namespace).
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("function does not exist")
+  def getFunction(functionName: String): Function
+
+  /**
+   * Get the function with the specified name in the specified database under the Hive Metastore.
+   * This throws an AnalysisException when the function cannot be found.
+   *
+   * To get functions in other catalogs, please use `getFunction(functionName)` with qualified
+   * function name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param functionName
+   *   is an unqualified name that designates a function in the specified database
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database or function does not exist")
+  def getFunction(dbName: String, functionName: String): Function
+
+  /**
+   * Check if the database (namespace) with the specified name exists (the name can be qualified
+   * with catalog).
+   *
+   * @since 3.4.0
+   */
+  def databaseExists(dbName: String): Boolean
+
+  /**
+   * Check if the table or view with the specified name exists. This can either be a temporary
+   * view or a table/view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  def tableExists(tableName: String): Boolean
+
+  /**
+   * Check if the table or view with the specified name exists in the specified database under the
+   * Hive Metastore.
+   *
+   * To check existence of table/view in other catalogs, please use `tableExists(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table.
+   * @since 3.4.0
+   */
+  def tableExists(dbName: String, tableName: String): Boolean
+
+  /**
+   * Check if the function with the specified name exists. This can either be a temporary function
+   * or a function.
+   *
+   * @param functionName
+   *   is either a qualified or unqualified name that designates a function. It follows the same
+   *   resolution rule with SQL: search for built-in/temp functions first then functions in the
+   *   current database (namespace).
+   * @since 3.4.0
+   */
+  def functionExists(functionName: String): Boolean
+
+  /**
+   * Check if the function with the specified name exists in the specified database under the Hive
+   * Metastore.
+   *
+   * To check existence of functions in other catalogs, please use `functionExists(functionName)`
+   * with qualified function name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param functionName
+   *   is an unqualified name that designates a function.
+   * @since 3.4.0
+   */
+  def functionExists(dbName: String, functionName: String): Boolean
+
+  /**
+   * Creates a table from the given path and returns the corresponding DataFrame. It will use the
+   * default data source configured by spark.sql.sources.default.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table. If no database
+   *   identifier is provided, it refers to a table in the current database.
+   * @since 3.4.0
+   */
+  @deprecated("use createTable instead.", "2.2.0")

Review Comment:
   Yeah let's keep them for now.
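
   Keeping them should just mean thin delegates in the implementation, e.g. (assuming this is the deprecated `createExternalTable` variant) something like:

   ```scala
   // Sketch: the deprecated overload simply forwards to createTable.
   @deprecated("use createTable instead.", "2.2.0")
   override def createExternalTable(tableName: String, path: String): DataFrame =
     createTable(tableName, path)
   ```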





[GitHub] [spark] hvanhovell commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1138916077


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
##########
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table}
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveBooleanEncoder, StringEncoder}
+import org.apache.spark.sql.connect.common.DataTypeProtoConverter
+import org.apache.spark.sql.types.StructType
+
+class CatalogImpl(sparkSession: SparkSession) extends Catalog {
+
+  /**
+   * Returns the current default database in this session.
+   *
+   * @since 3.4.0
+   */
+  override def currentDatabase: String =
+    sparkSession
+      .newDataset(StringEncoder) { builder =>
+        builder.getCatalogBuilder.getCurrentDatabaseBuilder
+      }
+      .head()
+
+  /**
+   * Sets the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def setCurrentDatabase(dbName: String): Unit = {
+    // we assume `dbName` will not include the catalog name. e.g. if you call
+    // `setCurrentDatabase("catalog.db")`, it will search for a database 'catalog.db' in the current
+    // catalog.
+    sparkSession
+      .newDataFrame { builder =>
+        builder.getCatalogBuilder.getSetCurrentDatabaseBuilder.setDbName(dbName)
+      }
+      .count()
+  }
+
+  /**
+   * Returns a list of databases (namespaces) available within the current catalog.
+   *
+   * @since 3.4.0
+   */
+  override def listDatabases(): Dataset[Database] = {
+    sparkSession.newDataset(CatalogImpl.databaseEncoder) { builder =>
+      builder.getCatalogBuilder.getListDatabasesBuilder
+    }
+  }
+
+  /**
+   * Returns a list of tables/views in the current database (namespace). This includes all
+   * temporary views.
+   *
+   * @since 3.4.0
+   */
+  override def listTables(): Dataset[Table] = {
+    listTables(currentDatabase)
+  }
+
+  /**
+   * Returns a list of tables/views in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all temporary views.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listTables(dbName: String): Dataset[Table] = {
+    sparkSession.newDataset(CatalogImpl.tableEncoder) { builder =>
+      builder.getCatalogBuilder.getListTablesBuilder.setDbName(dbName)
+    }
+  }
+
+  /**
+   * Returns a list of functions registered in the current database (namespace). This includes all
+   * temporary functions.
+   *
+   * @since 3.4.0
+   */
+  override def listFunctions(): Dataset[Function] = {
+    listFunctions(currentDatabase)
+  }
+
+  /**
+   * Returns a list of functions registered in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all built-in and temporary functions.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listFunctions(dbName: String): Dataset[Function] = {
+    sparkSession.newDataset(CatalogImpl.functionEncoder) { builder =>
+      builder.getCatalogBuilder.getListFunctionsBuilder.setDbName(dbName)
+    }
+  }
+
+  /**
+   * Returns a list of columns for the given table/view or temporary view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  override def listColumns(tableName: String): Dataset[Column] = {
+    sparkSession.newDataset(CatalogImpl.columnEncoder) { builder =>
+      builder.getCatalogBuilder.getListColumnsBuilder.setTableName(tableName)
+    }
+  }
+
+  /**
+   * Returns a list of columns for the given table/view in the specified database under the Hive
+   * Metastore.
+   *
+   * To list columns for table/view in other catalogs, please use `listColumns(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table/view.
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listColumns(dbName: String, tableName: String): Dataset[Column] = {
+    sparkSession.newDataset(CatalogImpl.columnEncoder) { builder =>
+      builder.getCatalogBuilder.getListColumnsBuilder
+        .setTableName(tableName)
+        .setDbName(dbName)
+    }
+  }
+
+  /**
+   * Get the database (namespace) with the specified name (can be qualified with catalog). This
+   * throws an AnalysisException when the database (namespace) cannot be found.
+   *
+   * @since 3.4.0
+   */
+  override def getDatabase(dbName: String): Database = {
+    sparkSession
+      .newDataset(CatalogImpl.databaseEncoder) { builder =>
+        builder.getCatalogBuilder.getGetDatabaseBuilder.setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the table or view with the specified name. This table can be a temporary view or a
+   * table/view. This throws an AnalysisException when no Table can be found.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  override def getTable(tableName: String): Table = {
+    sparkSession
+      .newDataset(CatalogImpl.tableEncoder) { builder =>
+        builder.getCatalogBuilder.getGetTableBuilder.setTableName(tableName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the table or view with the specified name in the specified database under the Hive
+   * Metastore. This throws an AnalysisException when no Table can be found.
+   *
+   * To get table/view in other catalogs, please use `getTable(tableName)` with qualified
+   * table/view name instead.
+   *
+   * @since 3.4.0
+   */
+  override def getTable(dbName: String, tableName: String): Table = {
+    sparkSession
+      .newDataset(CatalogImpl.tableEncoder) { builder =>
+        builder.getCatalogBuilder.getGetTableBuilder
+          .setTableName(tableName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the function with the specified name. This function can be a temporary function or a
+   * function. This throws an AnalysisException when the function cannot be found.
+   *
+   * @param functionName
+   *   is either a qualified or unqualified name that designates a function. It follows the same
+   *   resolution rule with SQL: search for built-in/temp functions first then functions in the
+   *   current database (namespace).
+   * @since 3.4.0
+   */
+  override def getFunction(functionName: String): Function = {
+    sparkSession
+      .newDataset(CatalogImpl.functionEncoder) { builder =>
+        builder.getCatalogBuilder.getGetFunctionBuilder.setFunctionName(functionName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the function with the specified name in the specified database under the Hive Metastore.
+   * This throws an AnalysisException when the function cannot be found.
+   *
+   * To get functions in other catalogs, please use `getFunction(functionName)` with qualified
+   * function name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param functionName
+   *   is an unqualified name that designates a function in the specified database
+   * @since 3.4.0
+   */
+  override def getFunction(dbName: String, functionName: String): Function = {
+    sparkSession
+      .newDataset(CatalogImpl.functionEncoder) { builder =>
+        builder.getCatalogBuilder.getGetFunctionBuilder
+          .setFunctionName(functionName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the database (namespace) with the specified name exists (the name can be qualified
+   * with catalog).
+   *
+   * @since 3.4.0
+   */
+  override def databaseExists(dbName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getDatabaseExistsBuilder.setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the table or view with the specified name exists. This can either be a temporary
+   * view or a table/view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  override def tableExists(tableName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getTableExistsBuilder.setTableName(tableName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the table or view with the specified name exists in the specified database under the
+   * Hive Metastore.
+   *
+   * To check existence of table/view in other catalogs, please use `tableExists(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table.
+   * @since 3.4.0
+   */
+  override def tableExists(dbName: String, tableName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getTableExistsBuilder
+          .setTableName(tableName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the function with the specified name exists. This can either be a temporary function
+   * or a function.
+   *
+   * @param functionName
+   *   is either a qualified or unqualified name that designates a function. It follows the same
+   *   resolution rule with SQL: search for built-in/temp functions first then functions in the
+   *   current database (namespace).
+   * @since 3.4.0
+   */
+  override def functionExists(functionName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getFunctionExistsBuilder.setFunctionName(functionName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the function with the specified name exists in the specified database under the Hive
+   * Metastore.
+   *
+   * To check existence of functions in other catalogs, please use `functionExists(functionName)`
+   * with qualified function name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param functionName
+   *   is an unqualified name that designates a function.
+   * @since 3.4.0
+   */
+  override def functionExists(dbName: String, functionName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getFunctionExistsBuilder
+          .setFunctionName(functionName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Creates a table from the given path and returns the corresponding DataFrame. It will use the
+   * default data source configured by spark.sql.sources.default.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table. If no database
+   *   identifier is provided, it refers to a table in the current database.
+   * @since 3.4.0
+   */
+  override def createTable(tableName: String, path: String): DataFrame = {
+    sparkSession.newDataFrame { builder =>

Review Comment:
   This is something that should be done on the planner side.
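
   (The revision quoted earlier in this thread goes that way: the client only ships the path option and leaves the `spark.sql.sources.default` lookup to the server-side planner, roughly:)

   ```scala
   // From the later revision of this diff: no data source is resolved on the client.
   override def createTable(tableName: String, path: String): DataFrame = {
     sparkSession.newDataFrame { builder =>
       builder.getCatalogBuilder.getCreateTableBuilder
         .setTableName(tableName)
         .setSchema(DataTypeProtoConverter.toConnectProtoType(new StructType))
         .setDescription("")
         .putOptions("path", path)
     }
   }
   ```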





[GitHub] [spark] LuciferYang commented on a diff in pull request #40438: [WIP][SPARK-42806][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1138337999


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -272,6 +274,14 @@ class SparkSession private[sql] (
    */
   def read: DataFrameReader = new DataFrameReader(this)
 
+  /**
+   * Interface through which the user may create, drop, alter or query underlying databases,
+   * tables, functions etc.
+   *
+   * @since 3.4.0

Review Comment:
   1. Should this still be `@since 3.4.0`, or `@since 3.5.0`?





[GitHub] [spark] LuciferYang commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1138499467


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -272,6 +274,14 @@ class SparkSession private[sql] (
    */
   def read: DataFrameReader = new DataFrameReader(this)
 
+  /**
+   * Interface through which the user may create, drop, alter or query underlying databases,
+   * tables, functions etc.
+   *
+   * @since 3.4.0

Review Comment:
   ok



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
##########
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table}
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveBooleanEncoder, StringEncoder}
+import org.apache.spark.sql.connect.common.DataTypeProtoConverter
+import org.apache.spark.sql.types.StructType
+
+class CatalogImpl(sparkSession: SparkSession) extends Catalog {
+
+  /**
+   * Returns the current default database in this session.
+   *
+   * @since 3.4.0
+   */
+  override def currentDatabase: String =
+    sparkSession
+      .newDataset(StringEncoder) { builder =>
+        builder.getCatalogBuilder.getCurrentDatabaseBuilder
+      }
+      .head()
+
+  /**
+   * Sets the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def setCurrentDatabase(dbName: String): Unit = {

Review Comment:
   2. There are many functions with a `Unit` return type. Is there a better way to trigger computation than `.count()` or `.collect()`?
   
   





[GitHub] [spark] LuciferYang commented on a diff in pull request #40438: [WIP][SPARK-42806][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1138348490


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala:
##########
@@ -0,0 +1,628 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Catalog interface for Spark. To access this, use `SparkSession.catalog`.
+ *
+ * @since 3.4.0
+ */
+abstract class Catalog {
+
+  /**
+   * Returns the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  def currentDatabase: String
+
+  /**
+   * Sets the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  def setCurrentDatabase(dbName: String): Unit
+
+  /**
+   * Returns a list of databases (namespaces) available within the current catalog.
+   *
+   * @since 3.4.0
+   */
+  def listDatabases(): Dataset[Database]
+
+  /**
+   * Returns a list of tables/views in the current database (namespace). This includes all
+   * temporary views.
+   *
+   * @since 3.4.0
+   */
+  def listTables(): Dataset[Table]
+
+  /**
+   * Returns a list of tables/views in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all temporary views.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def listTables(dbName: String): Dataset[Table]
+
+  /**
+   * Returns a list of functions registered in the current database (namespace). This includes all
+   * temporary functions.
+   *
+   * @since 3.4.0
+   */
+  def listFunctions(): Dataset[Function]
+
+  /**
+   * Returns a list of functions registered in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all built-in and temporary functions.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def listFunctions(dbName: String): Dataset[Function]
+
+  /**
+   * Returns a list of columns for the given table/view or temporary view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("table does not exist")
+  def listColumns(tableName: String): Dataset[Column]
+
+  /**
+   * Returns a list of columns for the given table/view in the specified database under the Hive
+   * Metastore.
+   *
+   * To list columns for table/view in other catalogs, please use `listColumns(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table/view.
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  def listColumns(dbName: String, tableName: String): Dataset[Column]
+
+  /**
+   * Get the database (namespace) with the specified name (can be qualified with catalog). This
+   * throws an AnalysisException when the database (namespace) cannot be found.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")

Review Comment:
   5. `@throws[AnalysisException]` cannot be removed; otherwise the compatibility check will flag it as incompatible.





[GitHub] [spark] LuciferYang commented on pull request #40438: [SPARK-42806][SPARK-42811][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #40438:
URL: https://github.com/apache/spark/pull/40438#issuecomment-1487970449

   rebased




[GitHub] [spark] LuciferYang commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1138911430


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala:
##########
@@ -0,0 +1,628 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Catalog interface for Spark. To access this, use `SparkSession.catalog`.
+ *
+ * @since 3.4.0
+ */
+abstract class Catalog {
+
+  /**
+   * Returns the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  def currentDatabase: String
+
+  /**
+   * Sets the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  def setCurrentDatabase(dbName: String): Unit
+
+  /**
+   * Returns a list of databases (namespaces) available within the current catalog.
+   *
+   * @since 3.4.0
+   */
+  def listDatabases(): Dataset[Database]
+
+  /**
+   * Returns a list of tables/views in the current database (namespace). This includes all
+   * temporary views.
+   *
+   * @since 3.4.0
+   */
+  def listTables(): Dataset[Table]
+
+  /**
+   * Returns a list of tables/views in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all temporary views.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def listTables(dbName: String): Dataset[Table]
+
+  /**
+   * Returns a list of functions registered in the current database (namespace). This includes all
+   * temporary functions.
+   *
+   * @since 3.4.0
+   */
+  def listFunctions(): Dataset[Function]
+
+  /**
+   * Returns a list of functions registered in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all built-in and temporary functions.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def listFunctions(dbName: String): Dataset[Function]
+
+  /**
+   * Returns a list of columns for the given table/view or temporary view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("table does not exist")
+  def listColumns(tableName: String): Dataset[Column]
+
+  /**
+   * Returns a list of columns for the given table/view in the specified database under the Hive
+   * Metastore.
+   *
+   * To list columns for table/view in other catalogs, please use `listColumns(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table/view.
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  def listColumns(dbName: String, tableName: String): Dataset[Column]
+
+  /**
+   * Get the database (namespace) with the specified name (can be qualified with catalog). This
+   * throws an AnalysisException when the database (namespace) cannot be found.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def getDatabase(dbName: String): Database
+
+  /**
+   * Get the table or view with the specified name. This table can be a temporary view or a
+   * table/view. This throws an AnalysisException when no Table can be found.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("table does not exist")
+  def getTable(tableName: String): Table
+
+  /**
+   * Get the table or view with the specified name in the specified database under the Hive
+   * Metastore. This throws an AnalysisException when no Table can be found.
+   *
+   * To get table/view in other catalogs, please use `getTable(tableName)` with qualified
+   * table/view name instead.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  def getTable(dbName: String, tableName: String): Table
+
+  /**
+   * Get the function with the specified name. This function can be a temporary function or a
+   * function. This throws an AnalysisException when the function cannot be found.
+   *
+   * @param functionName
+   *   is either a qualified or unqualified name that designates a function. It follows the same
+   *   resolution rule with SQL: search for built-in/temp functions first then functions in the
+   *   current database (namespace).
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("function does not exist")
+  def getFunction(functionName: String): Function
+
+  /**
+   * Get the function with the specified name in the specified database under the Hive Metastore.
+   * This throws an AnalysisException when the function cannot be found.
+   *
+   * To get functions in other catalogs, please use `getFunction(functionName)` with qualified
+   * function name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param functionName
+   *   is an unqualified name that designates a function in the specified database
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database or function does not exist")
+  def getFunction(dbName: String, functionName: String): Function
+
+  /**
+   * Check if the database (namespace) with the specified name exists (the name can be qualified
+   * with catalog).
+   *
+   * @since 3.4.0
+   */
+  def databaseExists(dbName: String): Boolean
+
+  /**
+   * Check if the table or view with the specified name exists. This can either be a temporary
+   * view or a table/view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  def tableExists(tableName: String): Boolean
+
+  /**
+   * Check if the table or view with the specified name exists in the specified database under the
+   * Hive Metastore.
+   *
+   * To check existence of table/view in other catalogs, please use `tableExists(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table.
+   * @since 3.4.0
+   */
+  def tableExists(dbName: String, tableName: String): Boolean
+
+  /**
+   * Check if the function with the specified name exists. This can either be a temporary function
+   * or a function.
+   *
+   * @param functionName
+   *   is either a qualified or unqualified name that designates a function. It follows the same
+   *   resolution rule with SQL: search for built-in/temp functions first then functions in the
+   *   current database (namespace).
+   * @since 3.4.0
+   */
+  def functionExists(functionName: String): Boolean
+
+  /**
+   * Check if the function with the specified name exists in the specified database under the Hive
+   * Metastore.
+   *
+   * To check existence of functions in other catalogs, please use `functionExists(functionName)`
+   * with qualified function name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param functionName
+   *   is an unqualified name that designates a function.
+   * @since 3.4.0
+   */
+  def functionExists(dbName: String, functionName: String): Boolean
+
+  /**
+   * Creates a table from the given path and returns the corresponding DataFrame. It will use the
+   * default data source configured by spark.sql.sources.default.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table. If no database
+   *   identifier is provided, it refers to a table in the current database.
+   * @since 3.4.0
+   */
+  @deprecated("use createTable instead.", "2.2.0")

Review Comment:
   ok





[GitHub] [spark] hvanhovell commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1138909855


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala:
##########
@@ -0,0 +1,628 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Catalog interface for Spark. To access this, use `SparkSession.catalog`.
+ *
+ * @since 3.4.0
+ */
+abstract class Catalog {
+
+  /**
+   * Returns the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  def currentDatabase: String
+
+  /**
+   * Sets the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  def setCurrentDatabase(dbName: String): Unit
+
+  /**
+   * Returns a list of databases (namespaces) available within the current catalog.
+   *
+   * @since 3.4.0
+   */
+  def listDatabases(): Dataset[Database]
+
+  /**
+   * Returns a list of tables/views in the current database (namespace). This includes all
+   * temporary views.
+   *
+   * @since 3.4.0
+   */
+  def listTables(): Dataset[Table]
+
+  /**
+   * Returns a list of tables/views in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all temporary views.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def listTables(dbName: String): Dataset[Table]
+
+  /**
+   * Returns a list of functions registered in the current database (namespace). This includes all
+   * temporary functions.
+   *
+   * @since 3.4.0
+   */
+  def listFunctions(): Dataset[Function]
+
+  /**
+   * Returns a list of functions registered in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all built-in and temporary functions.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def listFunctions(dbName: String): Dataset[Function]
+
+  /**
+   * Returns a list of columns for the given table/view or temporary view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("table does not exist")
+  def listColumns(tableName: String): Dataset[Column]
+
+  /**
+   * Returns a list of columns for the given table/view in the specified database under the Hive
+   * Metastore.
+   *
+   * To list columns for table/view in other catalogs, please use `listColumns(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table/view.
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  def listColumns(dbName: String, tableName: String): Dataset[Column]
+
+  /**
+   * Get the database (namespace) with the specified name (can be qualified with catalog). This
+   * throws an AnalysisException when the database (namespace) cannot be found.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")

Review Comment:
   Yeah, we will need to do something about this in a bit. It is fine for now.





[GitHub] [spark] LuciferYang commented on a diff in pull request #40438: [WIP][SPARK-42806][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1138345170


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
##########
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table}
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveBooleanEncoder, StringEncoder}
+import org.apache.spark.sql.connect.common.DataTypeProtoConverter
+import org.apache.spark.sql.types.StructType
+
+class CatalogImpl(sparkSession: SparkSession) extends Catalog {
+
+  /**
+   * Returns the current default database in this session.
+   *
+   * @since 3.4.0
+   */
+  override def currentDatabase: String =
+    sparkSession
+      .newDataset(StringEncoder) { builder =>
+        builder.getCatalogBuilder.getCurrentDatabaseBuilder
+      }
+      .head()
+
+  /**
+   * Sets the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def setCurrentDatabase(dbName: String): Unit = {
+    // we assume `dbName` will not include the catalog name. e.g. if you call
+    // `setCurrentDatabase("catalog.db")`, it will search for a database 'catalog.db' in the current
+    // catalog.
+    sparkSession
+      .newDataFrame { builder =>
+        builder.getCatalogBuilder.getSetCurrentDatabaseBuilder.setDbName(dbName)
+      }
+      .count()
+  }
+
+  /**
+   * Returns a list of databases (namespaces) available within the current catalog.
+   *
+   * @since 3.4.0
+   */
+  override def listDatabases(): Dataset[Database] = {
+    sparkSession.newDataset(CatalogImpl.databaseEncoder) { builder =>
+      builder.getCatalogBuilder.getListDatabasesBuilder
+    }
+  }
+
+  /**
+   * Returns a list of tables/views in the current database (namespace). This includes all
+   * temporary views.
+   *
+   * @since 3.4.0
+   */
+  override def listTables(): Dataset[Table] = {
+    listTables(currentDatabase)
+  }
+
+  /**
+   * Returns a list of tables/views in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all temporary views.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listTables(dbName: String): Dataset[Table] = {
+    sparkSession.newDataset(CatalogImpl.tableEncoder) { builder =>
+      builder.getCatalogBuilder.getListTablesBuilder.setDbName(dbName)
+    }
+  }
+
+  /**
+   * Returns a list of functions registered in the current database (namespace). This includes all
+   * temporary functions.
+   *
+   * @since 3.4.0
+   */
+  override def listFunctions(): Dataset[Function] = {
+    listFunctions(currentDatabase)
+  }
+
+  /**
+   * Returns a list of functions registered in the specified database (namespace) (the name can be
+   * qualified with catalog). This includes all built-in and temporary functions.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listFunctions(dbName: String): Dataset[Function] = {
+    sparkSession.newDataset(CatalogImpl.functionEncoder) { builder =>
+      builder.getCatalogBuilder.getListFunctionsBuilder.setDbName(dbName)
+    }
+  }
+
+  /**
+   * Returns a list of columns for the given table/view or temporary view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  override def listColumns(tableName: String): Dataset[Column] = {
+    sparkSession.newDataset(CatalogImpl.columnEncoder) { builder =>
+      builder.getCatalogBuilder.getListColumnsBuilder.setTableName(tableName)
+    }
+  }
+
+  /**
+   * Returns a list of columns for the given table/view in the specified database under the Hive
+   * Metastore.
+   *
+   * To list columns for table/view in other catalogs, please use `listColumns(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table/view.
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def listColumns(dbName: String, tableName: String): Dataset[Column] = {
+    sparkSession.newDataset(CatalogImpl.columnEncoder) { builder =>
+      builder.getCatalogBuilder.getListColumnsBuilder
+        .setTableName(tableName)
+        .setDbName(dbName)
+    }
+  }
+
+  /**
+   * Get the database (namespace) with the specified name (can be qualified with catalog). This
+   * throws an AnalysisException when the database (namespace) cannot be found.
+   *
+   * @since 3.4.0
+   */
+  override def getDatabase(dbName: String): Database = {
+    sparkSession
+      .newDataset(CatalogImpl.databaseEncoder) { builder =>
+        builder.getCatalogBuilder.getGetDatabaseBuilder.setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the table or view with the specified name. This table can be a temporary view or a
+   * table/view. This throws an AnalysisException when no Table can be found.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  override def getTable(tableName: String): Table = {
+    sparkSession
+      .newDataset(CatalogImpl.tableEncoder) { builder =>
+        builder.getCatalogBuilder.getGetTableBuilder.setTableName(tableName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the table or view with the specified name in the specified database under the Hive
+   * Metastore. This throws an AnalysisException when no Table can be found.
+   *
+   * To get table/view in other catalogs, please use `getTable(tableName)` with qualified
+   * table/view name instead.
+   *
+   * @since 3.4.0
+   */
+  override def getTable(dbName: String, tableName: String): Table = {
+    sparkSession
+      .newDataset(CatalogImpl.tableEncoder) { builder =>
+        builder.getCatalogBuilder.getGetTableBuilder
+          .setTableName(tableName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the function with the specified name. This function can be a temporary function or a
+   * function. This throws an AnalysisException when the function cannot be found.
+   *
+   * @param functionName
+   *   is either a qualified or unqualified name that designates a function. It follows the same
+   *   resolution rule with SQL: search for built-in/temp functions first then functions in the
+   *   current database (namespace).
+   * @since 3.4.0
+   */
+  override def getFunction(functionName: String): Function = {
+    sparkSession
+      .newDataset(CatalogImpl.functionEncoder) { builder =>
+        builder.getCatalogBuilder.getGetFunctionBuilder.setFunctionName(functionName)
+      }
+      .head()
+  }
+
+  /**
+   * Get the function with the specified name in the specified database under the Hive Metastore.
+   * This throws an AnalysisException when the function cannot be found.
+   *
+   * To get functions in other catalogs, please use `getFunction(functionName)` with qualified
+   * function name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param functionName
+   *   is an unqualified name that designates a function in the specified database
+   * @since 3.4.0
+   */
+  override def getFunction(dbName: String, functionName: String): Function = {
+    sparkSession
+      .newDataset(CatalogImpl.functionEncoder) { builder =>
+        builder.getCatalogBuilder.getGetFunctionBuilder
+          .setFunctionName(functionName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the database (namespace) with the specified name exists (the name can be qualified
+   * with catalog).
+   *
+   * @since 3.4.0
+   */
+  override def databaseExists(dbName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getDatabaseExistsBuilder.setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the table or view with the specified name exists. This can either be a temporary
+   * view or a table/view.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table/view. It follows the same
+   *   resolution rule with SQL: search for temp views first then table/views in the current
+   *   database (namespace).
+   * @since 3.4.0
+   */
+  override def tableExists(tableName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getTableExistsBuilder.setTableName(tableName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the table or view with the specified name exists in the specified database under the
+   * Hive Metastore.
+   *
+   * To check existence of table/view in other catalogs, please use `tableExists(tableName)` with
+   * qualified table/view name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param tableName
+   *   is an unqualified name that designates a table.
+   * @since 3.4.0
+   */
+  override def tableExists(dbName: String, tableName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getTableExistsBuilder
+          .setTableName(tableName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the function with the specified name exists. This can either be a temporary function
+   * or a function.
+   *
+   * @param functionName
+   *   is either a qualified or unqualified name that designates a function. It follows the same
+   *   resolution rule with SQL: search for built-in/temp functions first then functions in the
+   *   current database (namespace).
+   * @since 3.4.0
+   */
+  override def functionExists(functionName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getFunctionExistsBuilder.setFunctionName(functionName)
+      }
+      .head()
+  }
+
+  /**
+   * Check if the function with the specified name exists in the specified database under the Hive
+   * Metastore.
+   *
+   * To check existence of functions in other catalogs, please use `functionExists(functionName)`
+   * with qualified function name instead.
+   *
+   * @param dbName
+   *   is an unqualified name that designates a database.
+   * @param functionName
+   *   is an unqualified name that designates a function.
+   * @since 3.4.0
+   */
+  override def functionExists(dbName: String, functionName: String): Boolean = {
+    sparkSession
+      .newDataset(PrimitiveBooleanEncoder) { builder =>
+        builder.getCatalogBuilder.getFunctionExistsBuilder
+          .setFunctionName(functionName)
+          .setDbName(dbName)
+      }
+      .head()
+  }
+
+  /**
+   * Creates a table from the given path and returns the corresponding DataFrame. It will use the
+   * default data source configured by spark.sql.sources.default.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table. If no database
+   *   identifier is provided, it refers to a table in the current database.
+   * @since 3.4.0
+   */
+  override def createTable(tableName: String, path: String): DataFrame = {
+    sparkSession.newDataFrame { builder =>

Review Comment:
   https://github.com/apache/spark/blob/3c7ef7d6135a33448e9b08902f4b5582ae2d60c4/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala#L518-L528
   
   There is no way to get the default `dataSourceName` from `sparkSession.sessionState.conf` on the Connect client side. Is the current implementation ok?
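   
   One possible direction, purely as a sketch: leave the source unset in the request and let the server-side planner fall back to `spark.sql.sources.default`, which it can read from its own `SQLConf`. The message and field names below are assumptions for illustration, not the actual proto:
   
   ```scala
   // Hypothetical server-side handling of a create-table catalog message.
   // `createTable` stands for the decoded request; proto3 string getters
   // return "" when the field is unset, so an empty source means "use the default".
   val source =
     if (createTable.getSource.nonEmpty) createTable.getSource
     else session.sessionState.conf.defaultDataSourceName // spark.sql.sources.default
   ```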





[GitHub] [spark] LuciferYang commented on a diff in pull request #40438: [WIP][SPARK-42806][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1138347356


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala:
##########
@@ -0,0 +1,628 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Catalog interface for Spark. To access this, use `SparkSession.catalog`.
+ *
+ * @since 3.4.0
+ */
+abstract class Catalog {

Review Comment:
   4. `Catalog` and `interface.scala` already exist in the sql module; these files are copies of them.
   
   





[GitHub] [spark] LuciferYang commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1139991361


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
##########
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table}
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveBooleanEncoder, StringEncoder}
+import org.apache.spark.sql.connect.common.DataTypeProtoConverter
+import org.apache.spark.sql.types.StructType
+
+class CatalogImpl(sparkSession: SparkSession) extends Catalog {
+
+  /**
+   * Returns the current default database in this session.
+   *
+   * @since 3.4.0
+   */
+  override def currentDatabase: String =
+    sparkSession
+      .newDataset(StringEncoder) { builder =>
+        builder.getCatalogBuilder.getCurrentDatabaseBuilder
+      }
+      .head()
+
+  /**
+   * Sets the current database (namespace) in this session.
+   *
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  override def setCurrentDatabase(dbName: String): Unit = {

Review Comment:
   For the functions with a `Unit` return type, I added a new `execute` function to `SparkSession` as follows:
   
   ```scala
     private[sql] def execute(f: proto.Relation.Builder => Unit): Unit = {
       val builder = proto.Relation.newBuilder()
       f(builder)
       builder.getCommonBuilder.setPlanId(planIdGenerator.getAndIncrement())
       val plan = proto.Plan.newBuilder().setRoot(builder).build()
       client.execute(plan).asScala.foreach(_ => ())
     }
   ```
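   
   A minimal sketch of how a `Unit`-returning method could then use this helper, reusing the builder calls already shown in this diff (assuming `execute` is added with the signature above):
   
   ```scala
   // Sketch only: setCurrentDatabase without the `.count()` workaround.
   override def setCurrentDatabase(dbName: String): Unit = {
     sparkSession.execute { builder =>
       builder.getCatalogBuilder.getSetCurrentDatabaseBuilder.setDbName(dbName)
     }
   }
   ```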
   
   





[GitHub] [spark] amaliujia commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1140539571


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala:
##########
@@ -129,6 +130,9 @@ object CheckConnectJvmClientCompatibility {
       ProblemFilters.exclude[Problem]("org.sparkproject.*"),
       ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"),
 
+      // Catalog
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.catalog.Catalog.cacheTable"),

Review Comment:
   Why is this excluded? What is the incompatibility?





[GitHub] [spark] LuciferYang commented on pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #40438:
URL: https://github.com/apache/spark/pull/40438#issuecomment-1474261809

   Thanks @amaliujia 




[GitHub] [spark] hvanhovell commented on a diff in pull request #40438: [SPARK-42806][CONNECT] Add `Catalog` support

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40438:
URL: https://github.com/apache/spark/pull/40438#discussion_r1146672726


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/catalog/interface.scala:
##########


Review Comment:
   Can you update the binary compatibility checks to include this?


