You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/19 20:41:03 UTC

[GitHub] [spark] aokolnychyi commented on a diff in pull request #36995: [SPARK-39607][SQL][DSV2] Distribution and ordering support V2 function in writing

aokolnychyi commented on code in PR #36995:
URL: https://github.com/apache/spark/pull/36995#discussion_r950515089


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala:
##########
@@ -48,6 +48,11 @@ case class DataSourceV2Relation(
 
   import DataSourceV2Implicits._
 
+  lazy val funCatalog: Option[FunctionCatalog] = catalog.flatMap {

Review Comment:
   nit: If you want to, you can probably use `collect` instead of `flatMap` to get rid of the extra layer of option and one branch.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala:
##########
@@ -17,22 +17,33 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.analysis.{AnsiTypeCoercion, TypeCoercion}
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, SortOrder, TransformExpression, V2ExpressionUtils}
 import org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RebalancePartitions, RepartitionByExpression, Sort}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.FunctionCatalog
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction
 import org.apache.spark.sql.connector.distributions._
 import org.apache.spark.sql.connector.write.{RequiresDistributionAndOrdering, Write}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 
 object DistributionAndOrderingUtils {
 
-  def prepareQuery(write: Write, query: LogicalPlan): LogicalPlan = write match {
+  def prepareQuery(

Review Comment:
   I think @sunchao brings a valid point that is easy to overlook. We have to make sure Spark writes to Hive tables in the same way no matter whether the v1 or v2 path is being used.
   
   Would it be correct to say we have this issue because `partitionIdExpression` in `HashPartitioning` is used both for generating bucket IDs in Hive tables as well as for producing partition IDs for writing tasks? Can we use different mechanisms?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala:
##########
@@ -143,4 +150,53 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
     case V2NullOrdering.NULLS_FIRST => NullsFirst
     case V2NullOrdering.NULLS_LAST => NullsLast
   }
+
+  def resolveV2ScalarFunction(

Review Comment:
   Question: do we need explicit `v2` in the method name given that the class name is `V2ExpressionUtils`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org