Posted to reviews@spark.apache.org by "dtenedor (via GitHub)" <gi...@apache.org> on 2023/02/14 01:35:14 UTC

[GitHub] [spark] dtenedor commented on a diff in pull request #39942: [SPARK-42398][SQL] Refine default column value framework

dtenedor commented on code in PR #39942:
URL: https://github.com/apache/spark/pull/39942#discussion_r1105182862


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import javax.annotation.Nullable;
+
+import org.apache.spark.sql.connector.expressions.Literal;
+import org.apache.spark.sql.internal.connector.ColumnImpl;
+import org.apache.spark.sql.types.DataType;
+
+public interface Column {

Review Comment:
   it could help to have a high-level comment describing what this interface represents and who is expected to implement it?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -1429,4 +1435,97 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
       case _ =>
     }
   }
+
+  def checkDefaultValuesInPlan(plan: LogicalPlan, isForV1: Boolean = false): Unit = {

Review Comment:
   This helper could go in the `ResolveDefaultColumns` object in sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -1429,4 +1435,97 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
       case _ =>
     }
   }
+
+  def checkDefaultValuesInPlan(plan: LogicalPlan, isForV1: Boolean = false): Unit = {
+    // Do not check anything if the children are not resolved yet.
+    if (!plan.childrenResolved) return
+    plan match {
+      case AlterColumn(t: ResolvedTable, col: ResolvedFieldName, _, _, _, _,
+          Some(default: DefaultValueExpression)) =>
+        checkTableProvider(t.catalog, t.name, getTableProviderFromProp(t.table.properties()))
+        checkDefaultValue(default, t.name, col.name, col.field.dataType, isForV1)
+
+      case cmd: V2CreateTablePlan if cmd.columns.exists(_.defaultValue.isDefined) =>
+        val ident = cmd.name.asInstanceOf[ResolvedIdentifier]

Review Comment:
   should we bind this in the pattern match instead of using `asInstanceOf`? Same on L1459 below.
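
   For illustration, binding through a nested match could look roughly like this (a sketch only; the `internalError` fallback is one possible choice, not part of this PR):

   ```scala
   case cmd: V2CreateTablePlan if cmd.columns.exists(_.defaultValue.isDefined) =>
     // Matching on the type binds `ident` without a cast, so an unexpected
     // plan shape fails the match instead of throwing ClassCastException.
     cmd.name match {
       case ident: ResolvedIdentifier =>
         checkTableProvider(ident.catalog, ident.name, cmd.tableSpec.provider)
         // ... per-column default value checks as in the diff ...
       case other =>
         throw SparkException.internalError(s"Unexpected name in create table plan: $other")
     }
   ```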



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -1429,4 +1435,97 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
       case _ =>
     }
   }
+
+  def checkDefaultValuesInPlan(plan: LogicalPlan, isForV1: Boolean = false): Unit = {
+    // Do not check anything if the children are not resolved yet.
+    if (!plan.childrenResolved) return

Review Comment:
   optional: instead of the `return`, we could just add a `case _ if !plan.childrenResolved => return` below. This might be generally safer in the vicinity of Catalyst rules which accept partial functions as arguments.
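
   For example (a minimal sketch of the same method with a guard case; the body can even stay empty since the method returns Unit):

   ```scala
   plan match {
     // Children not resolved yet: nothing to check. As a guard case this
     // stays safe even if the body is later lifted into a partial function.
     case _ if !plan.childrenResolved =>

     case AlterColumn(t: ResolvedTable, col: ResolvedFieldName, _, _, _, _,
         Some(default: DefaultValueExpression)) =>
       checkTableProvider(t.catalog, t.name, getTableProviderFromProp(t.table.properties()))
       checkDefaultValue(default, t.name, col.name, col.field.dataType, isForV1)

     // ... remaining cases unchanged ...
     case _ =>
   }
   ```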



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala:
##########
@@ -133,32 +135,33 @@ private[sql] object CatalogV2Util {
    */
   def applySchemaChanges(
       schema: StructType,
-      changes: Seq[TableChange],
-      tableProvider: Option[String],
-      statementType: String): StructType = {
+      changes: Seq[TableChange]): StructType = {
     changes.foldLeft(schema) { (schema, change) =>
       change match {
         case add: AddColumn =>
           add.fieldNames match {
             case Array(name) =>
               val field = StructField(name, add.dataType, nullable = add.isNullable)
-              val fieldWithDefault: StructField =
-                Option(add.defaultValue).map(field.withCurrentDefaultValue).getOrElse(field)
+              val fieldWithDefault: StructField = Option(add.defaultValue).map { default =>
+                val defaultSQL = getDefaultValueSQL(default)
+                field.withCurrentDefaultValue(defaultSQL).withExistenceDefaultValue(defaultSQL)
+              }.getOrElse(field)
               val fieldWithComment: StructField =
                 Option(add.comment).map(fieldWithDefault.withComment).getOrElse(fieldWithDefault)
-              addField(schema, fieldWithComment, add.position(), tableProvider, statementType, true)
+              addField(schema, fieldWithComment, add.position())
             case names =>
               replace(schema, names.init, parent => parent.dataType match {
                 case parentType: StructType =>
                   val field = StructField(names.last, add.dataType, nullable = add.isNullable)
-                  val fieldWithDefault: StructField =
-                    Option(add.defaultValue).map(field.withCurrentDefaultValue).getOrElse(field)
+                  val fieldWithDefault: StructField = Option(add.defaultValue).map { default =>

Review Comment:
   this is duplicated with L145-148 above; should we dedup it into one place?
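
   Something like a small private helper could hold the shared logic (a sketch; the helper name is hypothetical):

   ```scala
   // Applies the ADD COLUMN default value, if any, as both the current and
   // the existence default of the new field.
   private def fieldWithDefaultValue(field: StructField, add: AddColumn): StructField = {
     Option(add.defaultValue).map { default =>
       val defaultSQL = getDefaultValueSQL(default)
       field.withCurrentDefaultValue(defaultSQL).withExistenceDefaultValue(defaultSQL)
     }.getOrElse(field)
   }
   ```

   Both branches would then reduce to `fieldWithDefaultValue(field, add)`.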



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -3977,26 +3954,31 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
     }
   }
 
+  override def visitDefaultExpression(ctx: DefaultExpressionContext): Expression = withOrigin(ctx) {
+    expression(ctx.expression())
+  }
+
   /**
    * Parse new column info from ADD COLUMN into a QualifiedColType.
    */
   override def visitQualifiedColTypeWithPosition(
       ctx: QualifiedColTypeWithPositionContext): QualifiedColType = withOrigin(ctx) {
     val name = typedVisit[Seq[String]](ctx.name)
-    // Add the 'DEFAULT expression' clause in the column definition, if any, to the column metadata.
     val defaultExpr = Option(ctx.defaultExpression()).map(visitDefaultExpression)
     if (defaultExpr.isDefined && !conf.getConf(SQLConf.ENABLE_DEFAULT_COLUMNS)) {
       throw QueryParsingErrors.defaultColumnNotEnabledError(ctx)
     }
     QualifiedColType(
-      path = if (name.length > 1) Some(UnresolvedFieldName(name.init)) else None,
-      colName = name.last,
-      dataType = typedVisit[DataType](ctx.dataType),
-      nullable = ctx.NULL == null,
-      comment = Option(ctx.commentSpec()).map(visitCommentSpec),
+      path = if (name.length > 1) UnresolvedFieldName(name.init) else RootTableSchema,

Review Comment:
   Sounds good; it is convenient if `QualifiedColType` is an expression.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala:
##########
@@ -431,4 +436,45 @@ private[sql] object CatalogV2Util {
       .getOrElse(catalogManager.v2SessionCatalog)
       .asTableCatalog
   }
+
+  def v2ColumnsToStructType(columns: Array[Column]): StructType = {
+    StructType(columns.map(v2ColumnToStructField))
+  }
+
+  def v2ColumnToStructField(col: Column): StructField = {
+    val metadata = Option(col.metadataInJSON()).map(Metadata.fromJson).getOrElse(Metadata.empty)
+    var f = StructField(col.name(), col.dataType(), col.nullable(), metadata)
+    Option(col.comment()).foreach { comment =>
+      f = f.withComment(comment)
+    }
+    Option(col.defaultValue()).foreach { default =>
+      val defaultSQL = CatalogV2Util.getDefaultValueSQL(default)
+      f = f.withExistenceDefaultValue(defaultSQL).withCurrentDefaultValue(defaultSQL)
+    }
+    f
+  }
+
+  def structTypeToV2Columns(schema: StructType): Array[Column] = {
+    schema.fields.map { f =>
+      // TODO: remove this when our testing v2 tables no longer implement and rely on the
+      //       deprecated `Table.schema`.
+      val defaultValue = f.getCurrentDefaultValue().map { sql =>
+        val e = CatalystSqlParser.parseExpression(sql)
+        if (!e.resolved || !e.foldable) {
+          // This should not happen as we do validation before setting column default values, but
+          // in case something goes wrong, we treat it as no default value.
+          null

Review Comment:
   we should throw an exception here instead of returning NULL, since that would produce an incorrect result.
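
   i.e. roughly (a sketch; `SparkException.internalError` is just one plausible way to fail fast here):

   ```scala
   val defaultValue = f.getCurrentDefaultValue().map { sql =>
     val e = CatalystSqlParser.parseExpression(sql)
     if (!e.resolved || !e.foldable) {
       // Validation should have prevented this state; failing fast beats
       // silently dropping the default and returning wrong results later.
       throw SparkException.internalError(
         s"Failed to parse the column default value SQL as a constant: $sql")
     }
     // ... build the v2 default value from `e` as in the diff ...
   }
   ```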



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -4077,19 +4061,18 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
     } else {
       None
     }
-    val setDefaultExpression: Option[String] =
-      if (action.defaultExpression != null) {
-        Option(action.defaultExpression()).map(visitDefaultExpression)
-      } else if (action.dropDefault != null) {
-        Some("")
-      } else {
-        None
-      }
-    if (setDefaultExpression.isDefined && !conf.getConf(SQLConf.ENABLE_DEFAULT_COLUMNS)) {
+    val defaultExpression: Option[Expression] = if (action.defaultExpression != null) {
+      Some(visitDefaultExpression(action.defaultExpression))
+    } else if (action.dropDefault != null) {
+      Some(DropDefaultColumnValue)

Review Comment:
   Yes, this is more explicit.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -1429,4 +1435,97 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
       case _ =>
     }
   }
+
+  def checkDefaultValuesInPlan(plan: LogicalPlan, isForV1: Boolean = false): Unit = {
+    // Do not check anything if the children are not resolved yet.
+    if (!plan.childrenResolved) return
+    plan match {
+      case AlterColumn(t: ResolvedTable, col: ResolvedFieldName, _, _, _, _,
+          Some(default: DefaultValueExpression)) =>
+        checkTableProvider(t.catalog, t.name, getTableProviderFromProp(t.table.properties()))
+        checkDefaultValue(default, t.name, col.name, col.field.dataType, isForV1)
+
+      case cmd: V2CreateTablePlan if cmd.columns.exists(_.defaultValue.isDefined) =>
+        val ident = cmd.name.asInstanceOf[ResolvedIdentifier]
+        checkTableProvider(ident.catalog, ident.name, cmd.tableSpec.provider)
+        cmd.columns.filter(_.defaultValue.isDefined).foreach { col =>
+          val Column(name, dataType, _, _, Some(default), _) = col
+          // CREATE/REPLACE TABLE only has top-level columns
+          val colName = Seq(name)
+          checkDefaultValue(default, ident.name, colName, dataType, isForV1)
+        }
+
+      case cmd: AlterTableCommand =>
+        val table = cmd.table.asInstanceOf[ResolvedTable]
+        cmd.transformExpressionsDown {
+          case q @ QualifiedColType(path, Column(name, dataType, _, _, Some(default), _), _)
+              if path.resolved =>
+            checkTableProvider(
+              table.catalog, table.name, getTableProviderFromProp(table.table.properties()))
+            checkDefaultValue(
+              default,
+              table.name,
+              path.name :+ name,
+              dataType,
+              isForV1)
+            q
+        }
+
+      case _ =>
+    }
+  }
+
+  private def getTableProviderFromProp(props: JMap[String, String]): Option[String] = {
+    Option(props.get(TableCatalog.PROP_PROVIDER))
+  }
+
+  private def checkTableProvider(
+      catalog: CatalogPlugin,
+      tableName: String,
+      provider: Option[String]): Unit = {
+    // We only need to check table provider for the session catalog. Other custom v2 catalogs
+    // can check table providers in their implementations of createTable, alterTable, etc.
+    if (!CatalogV2Util.isSessionCatalog(catalog)) return
+    val conf = SQLConf.get
+    val allowedProviders: Array[String] = conf.getConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS)
+      .toLowerCase().split(",").map(_.trim)
+    if (!allowedProviders.contains(provider.getOrElse(conf.defaultDataSourceName).toLowerCase())) {
+      throw QueryCompilationErrors.defaultReferencesNotAllowedInDataSource(tableName)
+    }
+  }
+
+  private def checkDefaultValue(
+      default: DefaultValueExpression,
+      tblName: String,
+      colName: Seq[String],
+      targetType: DataType,
+      isForV1: Boolean): Unit = {
+    if (default.resolved) {
+      if (!default.child.foldable) {
+        throw QueryCompilationErrors.notConstantDefaultValueError(
+          tblName, colName, default.originalSQL)
+      } else if (!Cast.canUpCast(default.child.dataType, targetType)) {

Review Comment:
   no need for `else` since we throw an error in the `if` block above.
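
   i.e. the branches flatten to (sketch based on the quoted diff):

   ```scala
   if (!default.child.foldable) {
     throw QueryCompilationErrors.notConstantDefaultValueError(
       tblName, colName, default.originalSQL)
   }
   if (!Cast.canUpCast(default.child.dataType, targetType)) {
     throw QueryCompilationErrors.incompatibleTypeDefaultValueError(
       tblName, colName, targetType, default.child, default.originalSQL)
   }
   // otherwise: OK, nothing more to check
   ```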



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -3091,12 +3091,11 @@ object SQLConf {
     buildConf("spark.sql.defaultColumn.allowedProviders")
       .internal()
       .doc("List of table providers wherein SQL commands are permitted to assign DEFAULT column " +
-        "values. Comma-separated list, whitespace ignored, case-insensitive. If an asterisk " +
-        "appears after any table provider in this list, any command may assign DEFAULT column " +
-        "except `ALTER TABLE ... ADD COLUMN`. Otherwise, if no asterisk appears, all commands " +

Review Comment:
   The argument makes sense in spirit, but at least one data source has already used this functionality of the default column providers, which would be broken by this change. Let's talk offline.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -1429,4 +1435,97 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
       case _ =>
     }
   }
+
+  def checkDefaultValuesInPlan(plan: LogicalPlan, isForV1: Boolean = false): Unit = {
+    // Do not check anything if the children are not resolved yet.
+    if (!plan.childrenResolved) return
+    plan match {
+      case AlterColumn(t: ResolvedTable, col: ResolvedFieldName, _, _, _, _,
+          Some(default: DefaultValueExpression)) =>
+        checkTableProvider(t.catalog, t.name, getTableProviderFromProp(t.table.properties()))
+        checkDefaultValue(default, t.name, col.name, col.field.dataType, isForV1)
+
+      case cmd: V2CreateTablePlan if cmd.columns.exists(_.defaultValue.isDefined) =>
+        val ident = cmd.name.asInstanceOf[ResolvedIdentifier]
+        checkTableProvider(ident.catalog, ident.name, cmd.tableSpec.provider)
+        cmd.columns.filter(_.defaultValue.isDefined).foreach { col =>
+          val Column(name, dataType, _, _, Some(default), _) = col
+          // CREATE/REPLACE TABLE only has top-level columns
+          val colName = Seq(name)
+          checkDefaultValue(default, ident.name, colName, dataType, isForV1)
+        }
+
+      case cmd: AlterTableCommand =>
+        val table = cmd.table.asInstanceOf[ResolvedTable]
+        cmd.transformExpressionsDown {
+          case q @ QualifiedColType(path, Column(name, dataType, _, _, Some(default), _), _)
+              if path.resolved =>
+            checkTableProvider(
+              table.catalog, table.name, getTableProviderFromProp(table.table.properties()))
+            checkDefaultValue(
+              default,
+              table.name,
+              path.name :+ name,
+              dataType,
+              isForV1)
+            q
+        }
+
+      case _ =>
+    }
+  }
+
+  private def getTableProviderFromProp(props: JMap[String, String]): Option[String] = {
+    Option(props.get(TableCatalog.PROP_PROVIDER))
+  }
+
+  private def checkTableProvider(
+      catalog: CatalogPlugin,
+      tableName: String,
+      provider: Option[String]): Unit = {
+    // We only need to check table provider for the session catalog. Other custom v2 catalogs
+    // can check table providers in their implementations of createTable, alterTable, etc.
+    if (!CatalogV2Util.isSessionCatalog(catalog)) return

Review Comment:
   same as above, we should probably reverse the logic and put the code below in the `if` block body instead.
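
   A sketch of the reversed structure (same names as the diff):

   ```scala
   private def checkTableProvider(
       catalog: CatalogPlugin,
       tableName: String,
       provider: Option[String]): Unit = {
     // Only the session catalog needs this check; custom v2 catalogs can
     // validate providers in their own createTable/alterTable implementations.
     if (CatalogV2Util.isSessionCatalog(catalog)) {
       val conf = SQLConf.get
       val allowedProviders = conf.getConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS)
         .toLowerCase().split(",").map(_.trim)
       if (!allowedProviders.contains(
           provider.getOrElse(conf.defaultDataSourceName).toLowerCase())) {
         throw QueryCompilationErrors.defaultReferencesNotAllowedInDataSource(tableName)
       }
     }
   }
   ```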



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -1429,4 +1435,97 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
       case _ =>
     }
   }
+
+  def checkDefaultValuesInPlan(plan: LogicalPlan, isForV1: Boolean = false): Unit = {
+    // Do not check anything if the children are not resolved yet.
+    if (!plan.childrenResolved) return
+    plan match {
+      case AlterColumn(t: ResolvedTable, col: ResolvedFieldName, _, _, _, _,
+          Some(default: DefaultValueExpression)) =>
+        checkTableProvider(t.catalog, t.name, getTableProviderFromProp(t.table.properties()))
+        checkDefaultValue(default, t.name, col.name, col.field.dataType, isForV1)
+
+      case cmd: V2CreateTablePlan if cmd.columns.exists(_.defaultValue.isDefined) =>
+        val ident = cmd.name.asInstanceOf[ResolvedIdentifier]
+        checkTableProvider(ident.catalog, ident.name, cmd.tableSpec.provider)
+        cmd.columns.filter(_.defaultValue.isDefined).foreach { col =>
+          val Column(name, dataType, _, _, Some(default), _) = col
+          // CREATE/REPLACE TABLE only has top-level columns
+          val colName = Seq(name)
+          checkDefaultValue(default, ident.name, colName, dataType, isForV1)
+        }
+
+      case cmd: AlterTableCommand =>
+        val table = cmd.table.asInstanceOf[ResolvedTable]
+        cmd.transformExpressionsDown {
+          case q @ QualifiedColType(path, Column(name, dataType, _, _, Some(default), _), _)
+              if path.resolved =>
+            checkTableProvider(
+              table.catalog, table.name, getTableProviderFromProp(table.table.properties()))
+            checkDefaultValue(
+              default,
+              table.name,
+              path.name :+ name,
+              dataType,
+              isForV1)
+            q
+        }
+
+      case _ =>
+    }
+  }
+
+  private def getTableProviderFromProp(props: JMap[String, String]): Option[String] = {
+    Option(props.get(TableCatalog.PROP_PROVIDER))
+  }
+
+  private def checkTableProvider(
+      catalog: CatalogPlugin,
+      tableName: String,
+      provider: Option[String]): Unit = {
+    // We only need to check table provider for the session catalog. Other custom v2 catalogs
+    // can check table providers in their implementations of createTable, alterTable, etc.
+    if (!CatalogV2Util.isSessionCatalog(catalog)) return
+    val conf = SQLConf.get
+    val allowedProviders: Array[String] = conf.getConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS)
+      .toLowerCase().split(",").map(_.trim)
+    if (!allowedProviders.contains(provider.getOrElse(conf.defaultDataSourceName).toLowerCase())) {
+      throw QueryCompilationErrors.defaultReferencesNotAllowedInDataSource(tableName)
+    }
+  }
+
+  private def checkDefaultValue(
+      default: DefaultValueExpression,
+      tblName: String,
+      colName: Seq[String],
+      targetType: DataType,
+      isForV1: Boolean): Unit = {
+    if (default.resolved) {
+      if (!default.child.foldable) {
+        throw QueryCompilationErrors.notConstantDefaultValueError(
+          tblName, colName, default.originalSQL)
+      } else if (!Cast.canUpCast(default.child.dataType, targetType)) {
+        throw QueryCompilationErrors.incompatibleTypeDefaultValueError(
+          tblName, colName, targetType, default.child, default.originalSQL)
+      } else {
+        // OK.
+      }
+    } else {
+      // Ideally we should let the rest of `CheckAnalysis` to report errors about why the default
+      // expression is unresolved. But we should report a better error here if the default
+      // expression references columns or contains subquery expressions, which means it's not a
+      // constant for sure.
+      if (default.references.nonEmpty || default.containsPattern(PLAN_EXPRESSION)) {

Review Comment:
   you can combine this with the `else` above as `else if`.
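
   i.e. the control flow collapses to (structural sketch only):

   ```scala
   if (default.resolved) {
     // the foldable / canUpCast checks from above
   } else if (default.references.nonEmpty || default.containsPattern(PLAN_EXPRESSION)) {
     // definitely not a constant: raise the targeted error here
   } else {
     // fall through: let the rest of `CheckAnalysis` report why the
     // default expression is unresolved
   }
   ```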



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -1429,4 +1435,97 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
       case _ =>
     }
   }
+
+  def checkDefaultValuesInPlan(plan: LogicalPlan, isForV1: Boolean = false): Unit = {
+    // Do not check anything if the children are not resolved yet.
+    if (!plan.childrenResolved) return
+    plan match {
+      case AlterColumn(t: ResolvedTable, col: ResolvedFieldName, _, _, _, _,
+          Some(default: DefaultValueExpression)) =>
+        checkTableProvider(t.catalog, t.name, getTableProviderFromProp(t.table.properties()))
+        checkDefaultValue(default, t.name, col.name, col.field.dataType, isForV1)
+
+      case cmd: V2CreateTablePlan if cmd.columns.exists(_.defaultValue.isDefined) =>
+        val ident = cmd.name.asInstanceOf[ResolvedIdentifier]
+        checkTableProvider(ident.catalog, ident.name, cmd.tableSpec.provider)
+        cmd.columns.filter(_.defaultValue.isDefined).foreach { col =>
+          val Column(name, dataType, _, _, Some(default), _) = col
+          // CREATE/REPLACE TABLE only has top-level columns
+          val colName = Seq(name)
+          checkDefaultValue(default, ident.name, colName, dataType, isForV1)
+        }
+
+      case cmd: AlterTableCommand =>
+        val table = cmd.table.asInstanceOf[ResolvedTable]
+        cmd.transformExpressionsDown {
+          case q @ QualifiedColType(path, Column(name, dataType, _, _, Some(default), _), _)
+              if path.resolved =>
+            checkTableProvider(
+              table.catalog, table.name, getTableProviderFromProp(table.table.properties()))
+            checkDefaultValue(
+              default,
+              table.name,
+              path.name :+ name,
+              dataType,
+              isForV1)
+            q
+        }
+
+      case _ =>
+    }
+  }
+
+  private def getTableProviderFromProp(props: JMap[String, String]): Option[String] = {
+    Option(props.get(TableCatalog.PROP_PROVIDER))
+  }
+
+  private def checkTableProvider(
+      catalog: CatalogPlugin,
+      tableName: String,
+      provider: Option[String]): Unit = {
+    // We only need to check table provider for the session catalog. Other custom v2 catalogs
+    // can check table providers in their implementations of createTable, alterTable, etc.
+    if (!CatalogV2Util.isSessionCatalog(catalog)) return
+    val conf = SQLConf.get
+    val allowedProviders: Array[String] = conf.getConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS)
+      .toLowerCase().split(",").map(_.trim)
+    if (!allowedProviders.contains(provider.getOrElse(conf.defaultDataSourceName).toLowerCase())) {
+      throw QueryCompilationErrors.defaultReferencesNotAllowedInDataSource(tableName)
+    }
+  }
+
+  private def checkDefaultValue(
+      default: DefaultValueExpression,
+      tblName: String,
+      colName: Seq[String],
+      targetType: DataType,
+      isForV1: Boolean): Unit = {
+    if (default.resolved) {
+      if (!default.child.foldable) {
+        throw QueryCompilationErrors.notConstantDefaultValueError(
+          tblName, colName, default.originalSQL)
+      } else if (!Cast.canUpCast(default.child.dataType, targetType)) {
+        throw QueryCompilationErrors.incompatibleTypeDefaultValueError(
+          tblName, colName, targetType, default.child, default.originalSQL)
+      } else {

Review Comment:
   same here.


