Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/03 06:34:37 UTC

[GitHub] [spark] huaxingao opened a new pull request, #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

huaxingao opened a new pull request, #37393:
URL: https://github.com/apache/spark/pull/37393

   
   
   ### What changes were proposed in this pull request?
   Migrate SupportsDelete to use V2 Filter
   
   
   ### Why are the changes needed?
   This is part of the V2 Filter migration work.
   
   
   ### Does this PR introduce _any_ user-facing change?
   Yes, this PR adds `SupportsDeleteV2`.
   
   
   ### How was this patch tested?
   new tests
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] huaxingao commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
huaxingao commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r942670631


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PredicateUtils.scala:
##########
@@ -132,4 +133,16 @@ private[sql] object PredicateUtils {
       case _ => None
     }
   }
+
+  def toV1(
+      predicates: Array[Predicate],
+      skipIfNotConvertible: Boolean): Array[Filter] = {

Review Comment:
   Never mind, I will change it this way: check `if (v1Filters.length < predicates.length)` and throw an exception in `SupportsOverwrite.overwrite`.





[GitHub] [spark] LuciferYang commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r937383095


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java:
##########
@@ -70,6 +77,34 @@ default boolean canDeleteWhere(Filter[] filters) {
    */
   void deleteWhere(Filter[] filters);
 
+  default boolean canDeleteWhere(Predicate[] predicates) {
+    List<Filter> filterList = new ArrayList();

Review Comment:
   `new ArrayList<>()`





[GitHub] [spark] beliefer commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r937269735


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDeleteV2.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.filter.AlwaysTrue;
+import org.apache.spark.sql.connector.expressions.filter.Predicate;
+
+/**
+ * A mix-in interface for {@link Table} delete support. Data sources can implement this
+ * interface to provide the ability to delete data from tables that matches filter expressions.
+ *
+ * @since 3.4.0
+ */
+@Evolving
+public interface SupportsDeleteV2 extends TruncatableTable {
+
+  /**
+   * Checks whether it is possible to delete data from a data source table that matches filter
+   * expressions.
+   * <p>
+   * Rows should be deleted from the data source iff all of the filter expressions match.
+   * That is, the expressions must be interpreted as a set of filters that are ANDed together.
+   * <p>
+   * Spark will call this method at planning time to check whether {@link #deleteWhere(Predicate[])}
+   * would reject the delete operation because it requires significant effort. If this method
+   * returns false, Spark will not call {@link #deleteWhere(Predicate[])} and will try to rewrite
+   * the delete operation and produce row-level changes if the data source table supports deleting
+   * individual records.
+   *
+   * @param predicates V2 filter expressions, used to select rows to delete when all expressions
+   *                  match
+   * @return true if the delete operation can be performed
+   *
+   * @since 3.4.0
+   */
+  default boolean canDeleteWhere(Predicate[] predicates) {
+    return true;
+  }
+
+  /**
+   * Delete data from a data source table that matches filter expressions. Note that this method
+   * will be invoked only if {@link #canDeleteWhere(Predicate[])} returns true.
+   * <p>
+   * Rows are deleted from the data source iff all of the filter expressions match. That is, the
+   * expressions must be interpreted as a set of filters that are ANDed together.
+   * <p>
+   * Implementations may reject a delete operation if the delete isn't possible without significant
+   * effort. For example, partitioned data sources may reject deletes that do not filter by
+   * partition columns because the filter may require rewriting files without deleted records.
+   * To reject a delete implementations should throw {@link IllegalArgumentException} with a clear
+   * error message that identifies which expression was rejected.
+   *
+   * @param predicates predicate expressions, used to select rows to delete when all expressions
+   *                  match
+   * @throws IllegalArgumentException If the delete is rejected due to required effort
+   */
+  void deleteWhere(Predicate[] predicates);
+
+  @Override
+  default boolean truncateTable() {
+    Predicate[] filters = new Predicate[] { new AlwaysTrue() };

Review Comment:
   I don't think we need `new AlwaysTrue()` here.
   That means we need to adjust `canDeleteWhere` of `InMemoryTable`.
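
A small sketch of why the `AlwaysTrue` predicate may be unnecessary, assuming `deleteWhere` treats its predicates as ANDed together (as the Javadoc states), so that an empty array matches every row. `RowPredicate` and `TruncateSketch` are hypothetical stand-ins for illustration, not Spark classes:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a V2 predicate evaluated against a single row value.
interface RowPredicate {
  boolean test(int row);
}

final class TruncateSketch {
  final List<Integer> rows = new ArrayList<>(Arrays.asList(1, 2, 3));

  // Deletes every row for which all predicates match (predicates are ANDed).
  void deleteWhere(RowPredicate[] predicates) {
    rows.removeIf(row -> {
      for (RowPredicate p : predicates) {
        if (!p.test(row)) {
          return false;
        }
      }
      return true; // also reached when the array is empty
    });
  }

  // Truncate expressed with an explicit always-true predicate.
  void truncateWithAlwaysTrue() {
    deleteWhere(new RowPredicate[] { row -> true });
  }

  // Truncate expressed with no predicates at all.
  void truncateWithNoPredicates() {
    deleteWhere(new RowPredicate[0]);
  }
}
```

In this sketch both variants empty the table. Whether a real table behaves the same way depends on how its `canDeleteWhere` and `deleteWhere` handle an empty array, which is why the test table would need adjusting.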





[GitHub] [spark] LuciferYang commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r942055045


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PredicateUtils.scala:
##########
@@ -132,4 +133,18 @@ private[sql] object PredicateUtils {
       case _ => None
     }
   }
+
+  def toV1(
+      predicates: Array[Predicate],
+      skipIfNotConvertible: Boolean): Array[Filter] = {
+    predicates.flatMap { predicate =>
+      val filter = toV1(predicate)
+      if (filter.isEmpty) {
+        if (!skipIfNotConvertible) {
+          throw QueryCompilationErrors.unsupportedPredicateToFilterConversionError(predicate.name())
+        }
+      }
+      filter
+    }.toArray

Review Comment:
   Yes, I double-checked that Scala 2.13 doesn't need this `toArray`. Moreover, the redundant `toArray` hurts performance on Scala 2.13, because `ArrayOps.toArray` always makes a memory copy even if the element type has not changed, whereas Scala 2.12 returns the original array directly when the element type is unchanged.





[GitHub] [spark] LuciferYang commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r941553431


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PredicateUtils.scala:
##########
@@ -132,4 +133,18 @@ private[sql] object PredicateUtils {
       case _ => None
     }
   }
+
+  def toV1(
+      predicates: Array[Predicate],
+      skipIfNotConvertible: Boolean): Array[Filter] = {
+    predicates.flatMap { predicate =>
+      val filter = toV1(predicate)
+      if (filter.isEmpty) {
+        if (!skipIfNotConvertible) {

Review Comment:
   Should we merge these two `if`s into `if (filter.isEmpty && !skipIfNotConvertible)`?
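
For illustration, here is a minimal Java rendering of the merged condition. The `String`-based stand-ins, the `MergedConditionSketch` class, and the `IllegalArgumentException` are assumptions made for this sketch; the actual helper is the Scala `toV1` in the diff above, which throws via `QueryCompilationErrors`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class MergedConditionSketch {
  // Hypothetical single-item conversion: only names starting with "ok" convert.
  static Optional<String> toV1(String predicateName) {
    return predicateName.startsWith("ok")
        ? Optional.of("v1:" + predicateName)
        : Optional.empty();
  }

  // Converts all items; unconvertible ones are either skipped or rejected.
  static String[] toV1(String[] predicateNames, boolean skipIfNotConvertible) {
    List<String> filters = new ArrayList<>();
    for (String name : predicateNames) {
      Optional<String> filter = toV1(name);
      // One merged condition instead of two nested ifs.
      if (!filter.isPresent() && !skipIfNotConvertible) {
        throw new IllegalArgumentException("Cannot convert predicate: " + name);
      }
      filter.ifPresent(filters::add);
    }
    return filters.toArray(new String[0]);
  }
}
```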





[GitHub] [spark] cloud-fan commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r942669719


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PredicateUtils.scala:
##########
@@ -132,4 +133,16 @@ private[sql] object PredicateUtils {
       case _ => None
     }
   }
+
+  def toV1(
+      predicates: Array[Predicate],
+      skipIfNotConvertible: Boolean): Array[Filter] = {

Review Comment:
   Shall we add `canOverwrite`?





[GitHub] [spark] huaxingao commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
huaxingao commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r941881351


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PredicateUtils.scala:
##########
@@ -132,4 +133,18 @@ private[sql] object PredicateUtils {
       case _ => None
     }
   }
+
+  def toV1(
+      predicates: Array[Predicate],
+      skipIfNotConvertible: Boolean): Array[Filter] = {
+    predicates.flatMap { predicate =>
+      val filter = toV1(predicate)
+      if (filter.isEmpty) {
+        if (!skipIfNotConvertible) {
+          throw QueryCompilationErrors.unsupportedPredicateToFilterConversionError(predicate.name())
+        }
+      }
+      filter
+    }.toArray

Review Comment:
   I thought Scala 2.13 needed this `toArray`, but that turned out not to be the case. I have removed it.





[GitHub] [spark] huaxingao commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
huaxingao commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r938441639


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java:
##########
@@ -70,6 +77,34 @@ default boolean canDeleteWhere(Filter[] filters) {
    */
   void deleteWhere(Filter[] filters);
 
+  default boolean canDeleteWhere(Predicate[] predicates) {
+    List<Filter> filterList = new ArrayList();
+    for (int i = 0; i < predicates.length; i++) {

Review Comment:
   Thanks for the comments. I added a `toV1` method to the util class that takes an array of V2 filters (`Array[Predicate]`), which eliminates the duplicated code.





[GitHub] [spark] huaxingao commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
huaxingao commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r940968890


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2Filter.scala:
##########
@@ -30,7 +30,17 @@ class InMemoryTableWithV2Filter(
     schema: StructType,
     partitioning: Array[Transform],
     properties: util.Map[String, String])
-  extends InMemoryTable(name, schema, partitioning, properties) {
+  extends InMemoryBaseTable(name, schema, partitioning, properties) with SupportsDeleteV2 {
+
+  override def canDeleteWhere(filters: Array[Predicate]): Boolean = {
+    InMemoryTableWithV2Filter.supportsFilters(filters)
+  }
+
+  override def deleteWhere(filters: Array[Predicate]): Unit = dataMap.synchronized {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
+    dataMap --= InMemoryTableWithV2Filter
+      .filtersToKeys(dataMap.keys, partCols.map(_.toSeq.quoted), filters)

Review Comment:
   Importing the class vs. explicitly specifying the class doesn't seem to make much difference to me. I prefer to keep this as is. 





[GitHub] [spark] LuciferYang commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r941252008


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PredicateUtils.scala:
##########
@@ -132,4 +133,18 @@ private[sql] object PredicateUtils {
       case _ => None
     }
   }
+
+  def toV1(
+      predicates: Array[Predicate],
+      skipIfNotConvertible: Boolean): Array[Filter] = {
+    predicates.flatMap { predicate =>
+      val filter = toV1(predicate)
+      if (filter.isEmpty) {
+        if (!skipIfNotConvertible) {
+          throw QueryCompilationErrors.unsupportedPredicateToFilterConversionError(predicate.name())
+        }
+      }
+      filter
+    }.toArray

Review Comment:
   This `.toArray` seems a little redundant, or is it for Scala 2.13 compatibility?
   
   





[GitHub] [spark] LuciferYang commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r937401762


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java:
##########
@@ -70,6 +77,34 @@ default boolean canDeleteWhere(Filter[] filters) {
    */
   void deleteWhere(Filter[] filters);
 
+  default boolean canDeleteWhere(Predicate[] predicates) {
+    List<Filter> filterList = new ArrayList();
+    for (int i = 0; i < predicates.length; i++) {
+      Option filter = PredicateUtils.toV1(predicates[i]);
+      if (filter.nonEmpty()) {
+        filterList.add((Filter)filter.get());
+      }
+    }
+
+    Filter[] filters = new Filter[filterList.size()];
+    filterList.toArray(filters);
+    return this.canDeleteWhere(filters);
+  }
+
+  default void deleteWhere(Predicate[] predicates) {
+    List<Filter> filterList = new ArrayList();

Review Comment:
   Lines 95-104 look like they duplicate the code in `canDeleteWhere`?





[GitHub] [spark] LuciferYang commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r937396523


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java:
##########
@@ -70,6 +77,34 @@ default boolean canDeleteWhere(Filter[] filters) {
    */
   void deleteWhere(Filter[] filters);
 
+  default boolean canDeleteWhere(Predicate[] predicates) {
+    List<Filter> filterList = new ArrayList();
+    for (int i = 0; i < predicates.length; i++) {

Review Comment:
   > I suggest using Java 8 lambda expression.
   
   +1, provided there is no strong performance requirement, because `Arrays.stream` is much slower than the current implementation. I have tested this before.
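
To make the trade-off concrete, the sketch below writes the same convert-and-collect step both ways. `ConversionStyles` and its integer-parsing `convert` are hypothetical stand-ins for `PredicateUtils.toV1`; the sketch only shows that the two forms produce the same result and does not measure performance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

final class ConversionStyles {
  // Hypothetical converter: succeeds only for strings that parse as integers.
  static Optional<Integer> convert(String s) {
    try {
      return Optional.of(Integer.parseInt(s));
    } catch (NumberFormatException e) {
      return Optional.empty();
    }
  }

  // Index-based loop, in the style of the code under review.
  static Integer[] withLoop(String[] inputs) {
    List<Integer> out = new ArrayList<>();
    for (int i = 0; i < inputs.length; i++) {
      Optional<Integer> converted = convert(inputs[i]);
      if (converted.isPresent()) {
        out.add(converted.get());
      }
    }
    return out.toArray(new Integer[0]);
  }

  // Java 8 stream pipeline, as suggested in the quoted comment.
  static Integer[] withStream(String[] inputs) {
    return Arrays.stream(inputs)
        .map(ConversionStyles::convert)
        .filter(Optional::isPresent)
        .map(Optional::get)
        .toArray(Integer[]::new);
  }
}
```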





[GitHub] [spark] cloud-fan commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r942633040


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PredicateUtils.scala:
##########
@@ -132,4 +133,16 @@ private[sql] object PredicateUtils {
       case _ => None
     }
   }
+
+  def toV1(
+      predicates: Array[Predicate],
+      skipIfNotConvertible: Boolean): Array[Filter] = {

Review Comment:
   Thinking about this more, I think we can just have
   ```
   def toV1(predicates: Array[Predicate]): Array[Filter] = ...
   ```
   In `SupportsDelete`, we can check the number of returned V1 filters:
   ```
     default boolean canDeleteWhere(Predicate[] predicates) {
       Filter[] v1Filters = PredicateUtils.toV1(predicates);
       if (v1Filters.length < predicates.length) return false;
       return this.canDeleteWhere(v1Filters);
     }
   
   
     default void deleteWhere(Predicate[] predicates) {
       this.deleteWhere(PredicateUtils.toV1(predicates));
     }
   ```
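
A self-contained sketch of this idea, using hypothetical stand-in types (`SketchPredicate`, `SketchFilter`, `SketchPredicateUtils`, `SketchSupportsDelete`) rather than the real Spark classes. Only `=` predicates are assumed to be convertible here:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a V2 predicate.
final class SketchPredicate {
  final String name;
  final String column;
  final Object value;

  SketchPredicate(String name, String column, Object value) {
    this.name = name;
    this.column = column;
    this.value = value;
  }
}

// Hypothetical stand-in for a V1 equality filter.
final class SketchFilter {
  final String column;
  final Object value;

  SketchFilter(String column, Object value) {
    this.column = column;
    this.value = value;
  }
}

final class SketchPredicateUtils {
  // Converts one predicate; in this sketch only "=" has a V1 form.
  static Optional<SketchFilter> toV1(SketchPredicate p) {
    return "=".equals(p.name)
        ? Optional.of(new SketchFilter(p.column, p.value))
        : Optional.empty();
  }

  // Converts an array, dropping predicates that have no V1 form.
  static SketchFilter[] toV1(SketchPredicate[] predicates) {
    List<SketchFilter> out = new ArrayList<>();
    for (SketchPredicate p : predicates) {
      toV1(p).ifPresent(out::add);
    }
    return out.toArray(new SketchFilter[0]);
  }
}

interface SketchSupportsDelete {
  boolean canDeleteWhere(SketchFilter[] filters);

  // A shorter result means some predicate was dropped, so refuse the delete.
  default boolean canDeleteWhere(SketchPredicate[] predicates) {
    SketchFilter[] v1Filters = SketchPredicateUtils.toV1(predicates);
    if (v1Filters.length < predicates.length) {
      return false;
    }
    return canDeleteWhere(v1Filters);
  }
}
```

Comparing lengths keeps the conversion helper free of a flag parameter: the caller decides what a dropped predicate means.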





[GitHub] [spark] huaxingao commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
huaxingao commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r942676428


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PredicateUtils.scala:
##########
@@ -132,4 +133,16 @@ private[sql] object PredicateUtils {
       case _ => None
     }
   }
+
+  def toV1(
+      predicates: Array[Predicate],
+      skipIfNotConvertible: Boolean): Array[Filter] = {

Review Comment:
   Replied without seeing your previous message.
   
   Will add `canOverwrite`





[GitHub] [spark] beliefer commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r937269735


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDeleteV2.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.filter.AlwaysTrue;
+import org.apache.spark.sql.connector.expressions.filter.Predicate;
+
+/**
+ * A mix-in interface for {@link Table} delete support. Data sources can implement this
+ * interface to provide the ability to delete data from tables that matches filter expressions.
+ *
+ * @since 3.4.0
+ */
+@Evolving
+public interface SupportsDeleteV2 extends TruncatableTable {
+
+  /**
+   * Checks whether it is possible to delete data from a data source table that matches filter
+   * expressions.
+   * <p>
+   * Rows should be deleted from the data source iff all of the filter expressions match.
+   * That is, the expressions must be interpreted as a set of filters that are ANDed together.
+   * <p>
+   * Spark will call this method at planning time to check whether {@link #deleteWhere(Predicate[])}
+   * would reject the delete operation because it requires significant effort. If this method
+   * returns false, Spark will not call {@link #deleteWhere(Predicate[])} and will try to rewrite
+   * the delete operation and produce row-level changes if the data source table supports deleting
+   * individual records.
+   *
+   * @param predicates V2 filter expressions, used to select rows to delete when all expressions
+   *                  match
+   * @return true if the delete operation can be performed
+   *
+   * @since 3.4.0
+   */
+  default boolean canDeleteWhere(Predicate[] predicates) {
+    return true;
+  }
+
+  /**
+   * Delete data from a data source table that matches filter expressions. Note that this method
+   * will be invoked only if {@link #canDeleteWhere(Predicate[])} returns true.
+   * <p>
+   * Rows are deleted from the data source iff all of the filter expressions match. That is, the
+   * expressions must be interpreted as a set of filters that are ANDed together.
+   * <p>
+   * Implementations may reject a delete operation if the delete isn't possible without significant
+   * effort. For example, partitioned data sources may reject deletes that do not filter by
+   * partition columns because the filter may require rewriting files without deleted records.
+   * To reject a delete implementations should throw {@link IllegalArgumentException} with a clear
+   * error message that identifies which expression was rejected.
+   *
+   * @param predicates predicate expressions, used to select rows to delete when all expressions
+   *                  match
+   * @throws IllegalArgumentException If the delete is rejected due to required effort
+   */
+  void deleteWhere(Predicate[] predicates);
+
+  @Override
+  default boolean truncateTable() {
+    Predicate[] filters = new Predicate[] { new AlwaysTrue() };

Review Comment:
   I don't think `new AlwaysTrue()` is needed here.
   That means we need to adjust `canDeleteWhere` of `InMemoryTableWithV2Filter`.





[GitHub] [spark] huaxingao commented on pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
huaxingao commented on PR #37393:
URL: https://github.com/apache/spark/pull/37393#issuecomment-1204679988

   cc @cloud-fan 




[GitHub] [spark] beliefer commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r937307297


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDeleteV2.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.filter.AlwaysTrue;
+import org.apache.spark.sql.connector.expressions.filter.Predicate;
+
+/**
+ * A mix-in interface for {@link Table} delete support. Data sources can implement this
+ * interface to provide the ability to delete data from tables that matches filter expressions.
+ *
+ * @since 3.4.0
+ */
+@Evolving
+public interface SupportsDeleteV2 extends TruncatableTable {
+
+  /**
+   * Checks whether it is possible to delete data from a data source table that matches filter
+   * expressions.
+   * <p>
+   * Rows should be deleted from the data source iff all of the filter expressions match.
+   * That is, the expressions must be interpreted as a set of filters that are ANDed together.
+   * <p>
+   * Spark will call this method at planning time to check whether {@link #deleteWhere(Predicate[])}
+   * would reject the delete operation because it requires significant effort. If this method
+   * returns false, Spark will not call {@link #deleteWhere(Predicate[])} and will try to rewrite
+   * the delete operation and produce row-level changes if the data source table supports deleting
+   * individual records.
+   *
+   * @param predicates V2 filter expressions, used to select rows to delete when all expressions
+   *                  match
+   * @return true if the delete operation can be performed
+   *
+   * @since 3.4.0
+   */
+  default boolean canDeleteWhere(Predicate[] predicates) {
+    return true;
+  }
+
+  /**
+   * Delete data from a data source table that matches filter expressions. Note that this method
+   * will be invoked only if {@link #canDeleteWhere(Predicate[])} returns true.
+   * <p>
+   * Rows are deleted from the data source iff all of the filter expressions match. That is, the
+   * expressions must be interpreted as a set of filters that are ANDed together.
+   * <p>
+   * Implementations may reject a delete operation if the delete isn't possible without significant
+   * effort. For example, partitioned data sources may reject deletes that do not filter by
+   * partition columns because the filter may require rewriting files without deleted records.
+   * To reject a delete implementations should throw {@link IllegalArgumentException} with a clear
+   * error message that identifies which expression was rejected.
+   *
+   * @param predicates predicate expressions, used to select rows to delete when all expressions
+   *                  match
+   * @throws IllegalArgumentException If the delete is rejected due to required effort
+   */
+  void deleteWhere(Predicate[] predicates);
+
+  @Override
+  default boolean truncateTable() {
+    Predicate[] filters = new Predicate[] { new AlwaysTrue() };

Review Comment:
   Understood. It seems we can simplify them.





[GitHub] [spark] LuciferYang commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r937426216


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala:
##########
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.time.{Instant, ZoneId}
+import java.time.temporal.ChronoUnit
+import java.util
+import java.util.OptionalLong
+
+import scala.collection.mutable
+
+import org.scalatest.Assertions._
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils}
+import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
+import org.apache.spark.sql.connector.expressions._
+import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
+import org.apache.spark.sql.connector.read._
+import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.connector.write._
+import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
+import org.apache.spark.sql.internal.connector.SupportsStreamingUpdateAsAppend
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * A simple in-memory table. Rows are stored as a buffered group produced by each output task.
+ */
+class InMemoryBaseTable(
+    val name: String,
+    val schema: StructType,
+    override val partitioning: Array[Transform],
+    override val properties: util.Map[String, String],
+    val distribution: Distribution = Distributions.unspecified(),
+    val ordering: Array[SortOrder] = Array.empty,
+    val numPartitions: Option[Int] = None,
+    val isDistributionStrictlyRequired: Boolean = true)
+  extends Table with SupportsRead with SupportsWrite with SupportsMetadataColumns {
+
+  protected object PartitionKeyColumn extends MetadataColumn {
+    override def name: String = "_partition"
+    override def dataType: DataType = StringType
+    override def comment: String = "Partition key used to store the row"
+  }
+
+  private object IndexColumn extends MetadataColumn {
+    override def name: String = "index"
+    override def dataType: DataType = IntegerType
+    override def comment: String = "Metadata column used to conflict with a data column"
+  }
+
+  // purposely exposes a metadata column that conflicts with a data column in some tests
+  override val metadataColumns: Array[MetadataColumn] = Array(IndexColumn, PartitionKeyColumn)
+  private val metadataColumnNames = metadataColumns.map(_.name).toSet -- schema.map(_.name)
+
+  private val allowUnsupportedTransforms =
+    properties.getOrDefault("allow-unsupported-transforms", "false").toBoolean
+
+  partitioning.foreach {
+    case _: IdentityTransform =>
+    case _: YearsTransform =>
+    case _: MonthsTransform =>
+    case _: DaysTransform =>
+    case _: HoursTransform =>
+    case _: BucketTransform =>
+    case _: SortedBucketTransform =>
+    case t if !allowUnsupportedTransforms =>
+      throw new IllegalArgumentException(s"Transform $t is not a supported transform")
+  }
+
+  // The key `Seq[Any]` is the partition values.
+  val dataMap: mutable.Map[Seq[Any], BufferedRows] = mutable.Map.empty
+
+  def data: Array[BufferedRows] = dataMap.values.toArray
+
+  def rows: Seq[InternalRow] = dataMap.values.flatMap(_.rows).toSeq
+
+  val partCols: Array[Array[String]] = partitioning.flatMap(_.references).map { ref =>
+    schema.findNestedField(ref.fieldNames(), includeCollections = false) match {
+      case Some(_) => ref.fieldNames()
+      case None => throw new IllegalArgumentException(s"${ref.describe()} does not exist.")
+    }
+  }
+
+  private val UTC = ZoneId.of("UTC")
+  private val EPOCH_LOCAL_DATE = Instant.EPOCH.atZone(UTC).toLocalDate
+
+  protected def getKey(row: InternalRow): Seq[Any] = {
+    getKey(row, schema)
+  }
+
+  protected def getKey(row: InternalRow, rowSchema: StructType): Seq[Any] = {
+    @scala.annotation.tailrec
+    def extractor(
+        fieldNames: Array[String],
+        schema: StructType,
+        row: InternalRow): (Any, DataType) = {
+      val index = schema.fieldIndex(fieldNames(0))
+      val value = row.toSeq(schema).apply(index)
+      if (fieldNames.length > 1) {
+        (value, schema(index).dataType) match {
+          case (row: InternalRow, nestedSchema: StructType) =>
+            extractor(fieldNames.drop(1), nestedSchema, row)
+          case (_, dataType) =>
+            throw new IllegalArgumentException(s"Unsupported type, ${dataType.simpleString}")
+        }
+      } else {
+        (value, schema(index).dataType)
+      }
+    }
+
+    val cleanedSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(rowSchema)
+    partitioning.map {
+      case IdentityTransform(ref) =>
+        extractor(ref.fieldNames, cleanedSchema, row)._1
+      case YearsTransform(ref) =>
+        extractor(ref.fieldNames, cleanedSchema, row) match {
+          case (days: Int, DateType) =>
+            ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, DateTimeUtils.daysToLocalDate(days))
+          case (micros: Long, TimestampType) =>
+            val localDate = DateTimeUtils.microsToInstant(micros).atZone(UTC).toLocalDate
+            ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate)
+          case (v, t) =>
+            throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)")
+        }
+      case MonthsTransform(ref) =>
+        extractor(ref.fieldNames, cleanedSchema, row) match {
+          case (days: Int, DateType) =>
+            ChronoUnit.MONTHS.between(EPOCH_LOCAL_DATE, DateTimeUtils.daysToLocalDate(days))
+          case (micros: Long, TimestampType) =>
+            val localDate = DateTimeUtils.microsToInstant(micros).atZone(UTC).toLocalDate
+            ChronoUnit.MONTHS.between(EPOCH_LOCAL_DATE, localDate)
+          case (v, t) =>
+            throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)")
+        }
+      case DaysTransform(ref) =>
+        extractor(ref.fieldNames, cleanedSchema, row) match {
+          case (days, DateType) =>
+            days
+          case (micros: Long, TimestampType) =>
+            ChronoUnit.DAYS.between(Instant.EPOCH, DateTimeUtils.microsToInstant(micros))
+          case (v, t) =>
+            throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)")
+        }
+      case HoursTransform(ref) =>
+        extractor(ref.fieldNames, cleanedSchema, row) match {
+          case (micros: Long, TimestampType) =>
+            ChronoUnit.HOURS.between(Instant.EPOCH, DateTimeUtils.microsToInstant(micros))
+          case (v, t) =>
+            throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)")
+        }
+      case BucketTransform(numBuckets, cols, _) =>
+        val valueTypePairs = cols.map(col => extractor(col.fieldNames, cleanedSchema, row))
+        var valueHashCode = 0
+        valueTypePairs.foreach( pair =>
+          if ( pair._1 != null) valueHashCode += pair._1.hashCode()
+        )
+        var dataTypeHashCode = 0
+        valueTypePairs.foreach(dataTypeHashCode += _._2.hashCode())
+        ((valueHashCode + 31 * dataTypeHashCode) & Integer.MAX_VALUE) % numBuckets
+    }
+  }
+
+  protected def addPartitionKey(key: Seq[Any]): Unit = {}
+
+  protected def renamePartitionKey(
+      partitionSchema: StructType,
+      from: Seq[Any],
+      to: Seq[Any]): Boolean = {
+    val rows = dataMap.remove(from).getOrElse(new BufferedRows(from))
+    val newRows = new BufferedRows(to)
+    rows.rows.foreach { r =>
+      val newRow = new GenericInternalRow(r.numFields)
+      for (i <- 0 until r.numFields) newRow.update(i, r.get(i, schema(i).dataType))
+      for (i <- 0 until partitionSchema.length) {
+        val j = schema.fieldIndex(partitionSchema(i).name)
+        newRow.update(j, to(i))
+      }
+      newRows.withRow(newRow)
+    }
+    dataMap.put(to, newRows).foreach { _ =>
+      throw new IllegalStateException(
+        s"The ${to.mkString("[", ", ", "]")} partition exists already")
+    }
+    true
+  }
+
+  protected def removePartitionKey(key: Seq[Any]): Unit = dataMap.synchronized {
+    dataMap.remove(key)
+  }
+
+  protected def createPartitionKey(key: Seq[Any]): Unit = dataMap.synchronized {
+    if (!dataMap.contains(key)) {
+      val emptyRows = new BufferedRows(key)
+      val rows = if (key.length == schema.length) {
+        emptyRows.withRow(InternalRow.fromSeq(key))
+      } else emptyRows
+      dataMap.put(key, rows)
+    }
+  }
+
+  protected def clearPartition(key: Seq[Any]): Unit = dataMap.synchronized {
+    assert(dataMap.contains(key))
+    dataMap(key).clear()
+  }
+
+  def withData(data: Array[BufferedRows]): InMemoryBaseTable = {
+    withData(data, schema)
+  }
+
+  def withData(
+      data: Array[BufferedRows],
+      writeSchema: StructType): InMemoryBaseTable = dataMap.synchronized {
+    data.foreach(_.rows.foreach { row =>
+      val key = getKey(row, writeSchema)
+      dataMap += dataMap.get(key)
+        .map(key -> _.withRow(row))
+        .getOrElse(key -> new BufferedRows(key).withRow(row))
+      addPartitionKey(key)
+    })
+    this
+  }
+
+  override def capabilities: util.Set[TableCapability] = util.EnumSet.of(
+    TableCapability.BATCH_READ,
+    TableCapability.BATCH_WRITE,
+    TableCapability.STREAMING_WRITE,
+    TableCapability.OVERWRITE_BY_FILTER,
+    TableCapability.OVERWRITE_DYNAMIC,
+    TableCapability.TRUNCATE)
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+    new InMemoryScanBuilder(schema)
+  }
+
+  class InMemoryScanBuilder(tableSchema: StructType) extends ScanBuilder
+      with SupportsPushDownRequiredColumns {
+    private var schema: StructType = tableSchema
+
+    override def build: Scan =
+      new InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]), schema, tableSchema)

Review Comment:
   The `new` here is redundant.





[GitHub] [spark] huaxingao commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
huaxingao commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r942657907


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PredicateUtils.scala:
##########
@@ -132,4 +133,16 @@ private[sql] object PredicateUtils {
       case _ => None
     }
   }
+
+  def toV1(
+      predicates: Array[Predicate],
+      skipIfNotConvertible: Boolean): Array[Filter] = {

Review Comment:
   I have actually thought about this. It works fine for `SupportsDelete`, but for `SupportsOverwrite` we would have to throw an exception later on, because there is no `canOverwrite` API.
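
   A minimal sketch of that trade-off, not the code in this PR. Plain strings stand in for `Predicate` and `Filter`, the `v1:` prefix that marks a predicate as convertible is purely an illustrative assumption, and throwing when `skipIfNotConvertible` is false is also assumed. The delete path can decline at planning time through `canDeleteWhere`, whereas the overwrite path has no such hook and can only compare lengths and throw.

```java
import java.util.ArrayList;
import java.util.List;

public class SkipOrThrowSketch {

  // Hypothetical stand-in for a batch toV1: predicates starting with "v1:"
  // are treated as convertible, everything else is not.
  static String[] toV1(String[] predicates, boolean skipIfNotConvertible) {
    List<String> filters = new ArrayList<>();
    for (String p : predicates) {
      if (p.startsWith("v1:")) {
        filters.add(p.substring(3));
      } else if (!skipIfNotConvertible) {
        // Assumed behavior when skipping is disabled.
        throw new IllegalArgumentException("Cannot convert predicate: " + p);
      }
      // Otherwise the non-convertible predicate is silently skipped.
    }
    return filters.toArray(new String[0]);
  }

  // Delete path: a planning-time check exists, so it can simply return false.
  static boolean canDeleteWhere(String[] predicates) {
    return toV1(predicates, true).length == predicates.length;
  }

  // Overwrite path: no planning-time check, so the mismatch surfaces as an exception.
  static String[] overwrite(String[] predicates) {
    String[] v1Filters = toV1(predicates, true);
    if (v1Filters.length < predicates.length) {
      throw new IllegalArgumentException(
          "Some predicates could not be converted to V1 filters");
    }
    return v1Filters;
  }
}
```

   Under these assumptions, `canDeleteWhere` returns false for a predicate without the `v1:` marker, while `overwrite` throws `IllegalArgumentException` for the same input.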





[GitHub] [spark] beliefer commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r937393530


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java:
##########
@@ -70,6 +77,34 @@ default boolean canDeleteWhere(Filter[] filters) {
    */
   void deleteWhere(Filter[] filters);
 
+  default boolean canDeleteWhere(Predicate[] predicates) {
+    List<Filter> filterList = new ArrayList();
+    for (int i = 0; i < predicates.length; i++) {

Review Comment:
   I suggest using a Java 8 lambda expression here.
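
   For illustration, a stream-based version of such a loop could look roughly like the sketch below. It is not the PR's code: plain strings stand in for `Predicate` and `Filter`, and the single-predicate `toV1` helper with its `v1:` convertibility marker is a hypothetical stand-in for `PredicateUtils.toV1`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class PredicateConversionSketch {

  // Hypothetical stand-in for a single-predicate conversion that may fail:
  // only predicates starting with "v1:" are treated as convertible.
  static Optional<String> toV1(String predicate) {
    return predicate.startsWith("v1:")
        ? Optional.of(predicate.substring(3))
        : Optional.empty();
  }

  // Replaces an index-based for loop that appends to a list with a
  // Java 8 stream pipeline; non-convertible predicates are dropped.
  static List<String> convertAll(String[] predicates) {
    return Arrays.stream(predicates)
        .map(PredicateConversionSketch::toV1)
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Collectors.toList());
  }
}
```

   The pipeline keeps the input order of the predicates, so the resulting list matches what the loop would have built.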





[GitHub] [spark] beliefer commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r939767014


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDeleteV2.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.filter.AlwaysTrue;
+import org.apache.spark.sql.connector.expressions.filter.Predicate;
+
+/**
+ * A mix-in interface for {@link Table} delete support. Data sources can implement this
+ * interface to provide the ability to delete data from tables that matches filter expressions.
+ *
+ * @since 3.4.0
+ */
+@Evolving
+public interface SupportsDeleteV2 extends TruncatableTable {
+
+  /**
+   * Checks whether it is possible to delete data from a data source table that matches filter
+   * expressions.
+   * <p>
+   * Rows should be deleted from the data source iff all of the filter expressions match.
+   * That is, the expressions must be interpreted as a set of filters that are ANDed together.
+   * <p>
+   * Spark will call this method at planning time to check whether {@link #deleteWhere(Predicate[])}
+   * would reject the delete operation because it requires significant effort. If this method
+   * returns false, Spark will not call {@link #deleteWhere(Predicate[])} and will try to rewrite
+   * the delete operation and produce row-level changes if the data source table supports deleting
+   * individual records.
+   *
+   * @param predicates V2 filter expressions, used to select rows to delete when all expressions
+   *                  match
+   * @return true if the delete operation can be performed
+   *
+   * @since 3.4.0
+   */
+  default boolean canDeleteWhere(Predicate[] predicates) {
+    return true;
+  }
+
+  /**
+   * Delete data from a data source table that matches filter expressions. Note that this method
+   * will be invoked only if {@link #canDeleteWhere(Predicate[])} returns true.
+   * <p>
+   * Rows are deleted from the data source iff all of the filter expressions match. That is, the
+   * expressions must be interpreted as a set of filters that are ANDed together.
+   * <p>
+   * Implementations may reject a delete operation if the delete isn't possible without significant
+   * effort. For example, partitioned data sources may reject deletes that do not filter by
+   * partition columns because the filter may require rewriting files without deleted records.
+   * To reject a delete implementations should throw {@link IllegalArgumentException} with a clear
+   * error message that identifies which expression was rejected.
+   *
+   * @param predicates predicate expressions, used to select rows to delete when all expressions
+   *                  match
+   * @throws IllegalArgumentException If the delete is rejected due to required effort
+   */
+  void deleteWhere(Predicate[] predicates);
+
+  @Override
+  default boolean truncateTable() {
+    Predicate[] filters = new Predicate[] { new AlwaysTrue() };
+    boolean canDelete = canDeleteWhere(filters);
+    if (canDelete) {
+      deleteWhere(filters);

Review Comment:
   ```suggestion
       Predicate[] predicates = new Predicate[] { new AlwaysTrue() };
       boolean canDelete = canDeleteWhere(predicates);
       if (canDelete) {
         deleteWhere(predicates);
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PredicateUtils.scala:
##########
@@ -20,41 +20,28 @@ package org.apache.spark.sql.internal.connector
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.connector.expressions.{LiteralValue, NamedReference}
 import org.apache.spark.sql.connector.expressions.filter.{And => V2And, Not => V2Not, Or => V2Or, Predicate}
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.sources.{AlwaysFalse, AlwaysTrue, And, EqualNullSafe, EqualTo, Filter, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Not, Or, StringContains, StringEndsWith, StringStartsWith}
 import org.apache.spark.sql.types.StringType
 
 private[sql] object PredicateUtils {
 
   def toV1(predicate: Predicate): Option[Filter] = {
 
-    def isValidBinaryPredicate(): Boolean = {
-      if (predicate.children().length == 2 &&
-        predicate.children()(0).isInstanceOf[NamedReference] &&
-        predicate.children()(1).isInstanceOf[LiteralValue[_]]) {
-        true
-      } else {
-        false
-      }
-    }
-
     predicate.name() match {
-      case "IN" if predicate.children()(0).isInstanceOf[NamedReference] =>
+      case "IN" if isValidPredicate(predicate) =>
         val attribute = predicate.children()(0).toString
         val values = predicate.children().drop(1)
         if (values.length > 0) {
-          if (!values.forall(_.isInstanceOf[LiteralValue[_]])) return None
           val dataType = values(0).asInstanceOf[LiteralValue[_]].dataType
-          if (!values.forall(_.asInstanceOf[LiteralValue[_]].dataType.sameType(dataType))) {
-            return None
-          }
           val inValues = values.map(v =>
             CatalystTypeConverters.convertToScala(v.asInstanceOf[LiteralValue[_]].value, dataType))
           Some(In(attribute, inValues))
         } else {
           Some(In(attribute, Array.empty[Any]))
         }
 
-      case "=" | "<=>" | ">" | "<" | ">=" | "<=" if isValidBinaryPredicate =>

Review Comment:
   The original code seems to look better. The refactored version has a duplicated match.





[GitHub] [spark] huaxingao commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
huaxingao commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r942083085


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala:
##########
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.time.{Instant, ZoneId}
+import java.time.temporal.ChronoUnit
+import java.util
+import java.util.OptionalLong
+
+import scala.collection.mutable
+
+import org.scalatest.Assertions._
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils}
+import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
+import org.apache.spark.sql.connector.expressions._
+import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
+import org.apache.spark.sql.connector.read._
+import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.connector.write._
+import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
+import org.apache.spark.sql.internal.connector.SupportsStreamingUpdateAsAppend
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * A simple in-memory table. Rows are stored as a buffered group produced by each output task.
+ */
+class InMemoryBaseTable(
+    val name: String,
+    val schema: StructType,
+    override val partitioning: Array[Transform],
+    override val properties: util.Map[String, String],
+    val distribution: Distribution = Distributions.unspecified(),
+    val ordering: Array[SortOrder] = Array.empty,
+    val numPartitions: Option[Int] = None,
+    val isDistributionStrictlyRequired: Boolean = true)
+  extends Table with SupportsRead with SupportsWrite with SupportsMetadataColumns {
+
+  protected object PartitionKeyColumn extends MetadataColumn {
+    override def name: String = "_partition"
+    override def dataType: DataType = StringType
+    override def comment: String = "Partition key used to store the row"
+  }
+
+  private object IndexColumn extends MetadataColumn {
+    override def name: String = "index"
+    override def dataType: DataType = IntegerType
+    override def comment: String = "Metadata column used to conflict with a data column"
+  }
+
+  // purposely exposes a metadata column that conflicts with a data column in some tests
+  override val metadataColumns: Array[MetadataColumn] = Array(IndexColumn, PartitionKeyColumn)
+  private val metadataColumnNames = metadataColumns.map(_.name).toSet -- schema.map(_.name)
+
+  private val allowUnsupportedTransforms =
+    properties.getOrDefault("allow-unsupported-transforms", "false").toBoolean
+
+  partitioning.foreach {
+    case _: IdentityTransform =>
+    case _: YearsTransform =>
+    case _: MonthsTransform =>
+    case _: DaysTransform =>
+    case _: HoursTransform =>
+    case _: BucketTransform =>
+    case _: SortedBucketTransform =>
+    case t if !allowUnsupportedTransforms =>
+      throw new IllegalArgumentException(s"Transform $t is not a supported transform")
+  }
+
+  // The key `Seq[Any]` is the partition values.
+  val dataMap: mutable.Map[Seq[Any], BufferedRows] = mutable.Map.empty
+
+  def data: Array[BufferedRows] = dataMap.values.toArray
+
+  def rows: Seq[InternalRow] = dataMap.values.flatMap(_.rows).toSeq
+
+  val partCols: Array[Array[String]] = partitioning.flatMap(_.references).map { ref =>
+    schema.findNestedField(ref.fieldNames(), includeCollections = false) match {
+      case Some(_) => ref.fieldNames()
+      case None => throw new IllegalArgumentException(s"${ref.describe()} does not exist.")
+    }
+  }
+
+  private val UTC = ZoneId.of("UTC")
+  private val EPOCH_LOCAL_DATE = Instant.EPOCH.atZone(UTC).toLocalDate
+
+  protected def getKey(row: InternalRow): Seq[Any] = {
+    getKey(row, schema)
+  }
+
+  protected def getKey(row: InternalRow, rowSchema: StructType): Seq[Any] = {
+    @scala.annotation.tailrec
+    def extractor(
+        fieldNames: Array[String],
+        schema: StructType,
+        row: InternalRow): (Any, DataType) = {
+      val index = schema.fieldIndex(fieldNames(0))
+      val value = row.toSeq(schema).apply(index)
+      if (fieldNames.length > 1) {
+        (value, schema(index).dataType) match {
+          case (row: InternalRow, nestedSchema: StructType) =>
+            extractor(fieldNames.drop(1), nestedSchema, row)
+          case (_, dataType) =>
+            throw new IllegalArgumentException(s"Unsupported type, ${dataType.simpleString}")
+        }
+      } else {
+        (value, schema(index).dataType)
+      }
+    }
+
+    val cleanedSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(rowSchema)
+    partitioning.map {
+      case IdentityTransform(ref) =>
+        extractor(ref.fieldNames, cleanedSchema, row)._1
+      case YearsTransform(ref) =>
+        extractor(ref.fieldNames, cleanedSchema, row) match {
+          case (days: Int, DateType) =>
+            ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, DateTimeUtils.daysToLocalDate(days))
+          case (micros: Long, TimestampType) =>
+            val localDate = DateTimeUtils.microsToInstant(micros).atZone(UTC).toLocalDate
+            ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate)
+          case (v, t) =>
+            throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)")
+        }
+      case MonthsTransform(ref) =>
+        extractor(ref.fieldNames, cleanedSchema, row) match {
+          case (days: Int, DateType) =>
+            ChronoUnit.MONTHS.between(EPOCH_LOCAL_DATE, DateTimeUtils.daysToLocalDate(days))
+          case (micros: Long, TimestampType) =>
+            val localDate = DateTimeUtils.microsToInstant(micros).atZone(UTC).toLocalDate
+            ChronoUnit.MONTHS.between(EPOCH_LOCAL_DATE, localDate)
+          case (v, t) =>
+            throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)")
+        }
+      case DaysTransform(ref) =>
+        extractor(ref.fieldNames, cleanedSchema, row) match {
+          case (days, DateType) =>
+            days
+          case (micros: Long, TimestampType) =>
+            ChronoUnit.DAYS.between(Instant.EPOCH, DateTimeUtils.microsToInstant(micros))
+          case (v, t) =>
+            throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)")
+        }
+      case HoursTransform(ref) =>
+        extractor(ref.fieldNames, cleanedSchema, row) match {
+          case (micros: Long, TimestampType) =>
+            ChronoUnit.HOURS.between(Instant.EPOCH, DateTimeUtils.microsToInstant(micros))
+          case (v, t) =>
+            throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)")
+        }
+      case BucketTransform(numBuckets, cols, _) =>
+        val valueTypePairs = cols.map(col => extractor(col.fieldNames, cleanedSchema, row))
+        var valueHashCode = 0
+        valueTypePairs.foreach( pair =>
+          if ( pair._1 != null) valueHashCode += pair._1.hashCode()
+        )
+        var dataTypeHashCode = 0
+        valueTypePairs.foreach(dataTypeHashCode += _._2.hashCode())
+        ((valueHashCode + 31 * dataTypeHashCode) & Integer.MAX_VALUE) % numBuckets
+    }
+  }
+
+  protected def addPartitionKey(key: Seq[Any]): Unit = {}
+
+  protected def renamePartitionKey(
+      partitionSchema: StructType,
+      from: Seq[Any],
+      to: Seq[Any]): Boolean = {
+    val rows = dataMap.remove(from).getOrElse(new BufferedRows(from))
+    val newRows = new BufferedRows(to)
+    rows.rows.foreach { r =>
+      val newRow = new GenericInternalRow(r.numFields)
+      for (i <- 0 until r.numFields) newRow.update(i, r.get(i, schema(i).dataType))
+      for (i <- 0 until partitionSchema.length) {
+        val j = schema.fieldIndex(partitionSchema(i).name)
+        newRow.update(j, to(i))
+      }
+      newRows.withRow(newRow)
+    }
+    dataMap.put(to, newRows).foreach { _ =>
+      throw new IllegalStateException(
+        s"The ${to.mkString("[", ", ", "]")} partition exists already")
+    }
+    true
+  }
+
+  protected def removePartitionKey(key: Seq[Any]): Unit = dataMap.synchronized {
+    dataMap.remove(key)
+  }
+
+  protected def createPartitionKey(key: Seq[Any]): Unit = dataMap.synchronized {
+    if (!dataMap.contains(key)) {
+      val emptyRows = new BufferedRows(key)
+      val rows = if (key.length == schema.length) {
+        emptyRows.withRow(InternalRow.fromSeq(key))
+      } else emptyRows
+      dataMap.put(key, rows)
+    }
+  }
+
+  protected def clearPartition(key: Seq[Any]): Unit = dataMap.synchronized {
+    assert(dataMap.contains(key))
+    dataMap(key).clear()
+  }
+
+  def withData(data: Array[BufferedRows]): InMemoryBaseTable = {
+    withData(data, schema)
+  }
+
+  def withData(
+      data: Array[BufferedRows],
+      writeSchema: StructType): InMemoryBaseTable = dataMap.synchronized {
+    data.foreach(_.rows.foreach { row =>
+      val key = getKey(row, writeSchema)
+      dataMap += dataMap.get(key)
+        .map(key -> _.withRow(row))
+        .getOrElse(key -> new BufferedRows(key).withRow(row))
+      addPartitionKey(key)
+    })
+    this
+  }
+
+  override def capabilities: util.Set[TableCapability] = util.EnumSet.of(
+    TableCapability.BATCH_READ,
+    TableCapability.BATCH_WRITE,
+    TableCapability.STREAMING_WRITE,
+    TableCapability.OVERWRITE_BY_FILTER,
+    TableCapability.OVERWRITE_DYNAMIC,
+    TableCapability.TRUNCATE)
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+    new InMemoryScanBuilder(schema)
+  }
+
+  class InMemoryScanBuilder(tableSchema: StructType) extends ScanBuilder
+      with SupportsPushDownRequiredColumns {
+    private var schema: StructType = tableSchema
+
+    override def build: Scan =
+      InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]), schema, tableSchema)
+
+    override def pruneColumns(requiredSchema: StructType): Unit = {
+      val schemaNames = metadataColumnNames ++ tableSchema.map(_.name)
+      schema = StructType(requiredSchema.filter(f => schemaNames.contains(f.name)))
+    }
+  }
+
+  case class InMemoryStats(sizeInBytes: OptionalLong, numRows: OptionalLong) extends Statistics
+
+  abstract class BatchScanBaseClass(
+      var data: Seq[InputPartition],
+      readSchema: StructType,
+      tableSchema: StructType)
+    extends Scan with Batch with SupportsReportStatistics with SupportsReportPartitioning {
+
+    override def toBatch: Batch = this
+
+    override def estimateStatistics(): Statistics = {
+      if (data.isEmpty) {
+        return InMemoryStats(OptionalLong.of(0L), OptionalLong.of(0L))
+      }
+
+      val inputPartitions = data.map(_.asInstanceOf[BufferedRows])
+      val numRows = inputPartitions.map(_.rows.size).sum
+      // we assume an average object header is 12 bytes
+      val objectHeaderSizeInBytes = 12L
+      val rowSizeInBytes = objectHeaderSizeInBytes + schema.defaultSize
+      val sizeInBytes = numRows * rowSizeInBytes
+      InMemoryStats(OptionalLong.of(sizeInBytes), OptionalLong.of(numRows))
+    }
+
+    override def outputPartitioning(): Partitioning = {
+      if (InMemoryBaseTable.this.partitioning.nonEmpty) {
+        new KeyGroupedPartitioning(
+          InMemoryBaseTable.this.partitioning.map(_.asInstanceOf[Expression]),
+          data.size)
+      } else {
+        new UnknownPartitioning(data.size)
+      }
+    }
+
+    override def planInputPartitions(): Array[InputPartition] = data.toArray
+
+    override def createReaderFactory(): PartitionReaderFactory = {
+      val metadataColumns = readSchema.map(_.name).filter(metadataColumnNames.contains)
+      val nonMetadataColumns = readSchema.filterNot(f => metadataColumns.contains(f.name))
+      new BufferedRowsReaderFactory(metadataColumns, nonMetadataColumns, tableSchema)
+    }
+  }
+
+  case class InMemoryBatchScan(
+      var _data: Seq[InputPartition],
+      readSchema: StructType,
+      tableSchema: StructType)
+    extends BatchScanBaseClass(_data, readSchema, tableSchema) with SupportsRuntimeFiltering {
+
+    override def filterAttributes(): Array[NamedReference] = {
+      val scanFields = readSchema.fields.map(_.name).toSet
+      partitioning.flatMap(_.references)
+        .filter(ref => scanFields.contains(ref.fieldNames.mkString(".")))
+    }
+
+    override def filter(filters: Array[Filter]): Unit = {
+      if (partitioning.length == 1 && partitioning.head.references().length == 1) {
+        val ref = partitioning.head.references().head
+        filters.foreach {
+          case In(attrName, values) if attrName == ref.toString =>
+            val matchingKeys = values.map(_.toString).toSet
+            data = data.filter(partition => {
+              val key = partition.asInstanceOf[BufferedRows].keyString
+              matchingKeys.contains(key)
+            })
+
+          case _ => // skip
+        }
+      }
+    }
+  }
+
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    InMemoryBaseTable.maybeSimulateFailedTableWrite(new CaseInsensitiveStringMap(properties))
+    InMemoryBaseTable.maybeSimulateFailedTableWrite(info.options)
+
+    new WriteBuilder with SupportsTruncate with SupportsOverwrite
+      with SupportsDynamicOverwrite with SupportsStreamingUpdateAsAppend {
+
+      private var writer: BatchWrite = Append
+      private var streamingWriter: StreamingWrite = StreamingAppend
+
+      override def truncate(): WriteBuilder = {
+        assert(writer == Append)
+        writer = TruncateAndAppend
+        streamingWriter = StreamingTruncateAndAppend
+        this
+      }
+
+      override def overwrite(filters: Array[Filter]): WriteBuilder = {
+        assert(writer == Append)
+        writer = new Overwrite(filters)
+        streamingWriter = new StreamingNotSupportedOperation(
+          s"overwrite (${filters.mkString("filters(", ", ", ")")})")
+        this
+      }
+
+      override def overwriteDynamicPartitions(): WriteBuilder = {
+        assert(writer == Append)
+        writer = DynamicOverwrite
+        streamingWriter = new StreamingNotSupportedOperation("overwriteDynamicPartitions")
+        this
+      }
+
+      override def build(): Write = new Write with RequiresDistributionAndOrdering {
+        override def requiredDistribution: Distribution = distribution
+
+        override def distributionStrictlyRequired: Boolean = isDistributionStrictlyRequired
+
+        override def requiredOrdering: Array[SortOrder] = ordering
+
+        override def requiredNumPartitions(): Int = {
+          numPartitions.getOrElse(0)
+        }
+
+        override def toBatch: BatchWrite = writer
+
+        override def toStreaming: StreamingWrite = streamingWriter match {
+          case exc: StreamingNotSupportedOperation => exc.throwsException()
+          case s => s
+        }
+
+        override def supportedCustomMetrics(): Array[CustomMetric] = {
+          Array(new InMemorySimpleCustomMetric)
+        }
+      }
+    }
+  }
+
+  protected abstract class TestBatchWrite extends BatchWrite {
+    override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
+      BufferedRowsWriterFactory
+    }
+
+    override def abort(messages: Array[WriterCommitMessage]): Unit = {}
+  }
+
+  private object Append extends TestBatchWrite {
+    override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
+      withData(messages.map(_.asInstanceOf[BufferedRows]))
+    }
+  }
+
+  private object DynamicOverwrite extends TestBatchWrite {
+    override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
+      val newData = messages.map(_.asInstanceOf[BufferedRows])
+      dataMap --= newData.flatMap(_.rows.map(getKey))
+      withData(newData)
+    }
+  }
+
+  private class Overwrite(filters: Array[Filter]) extends TestBatchWrite {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
+    override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
+      val deleteKeys = InMemoryBaseTable.filtersToKeys(
+        dataMap.keys, partCols.map(_.toSeq.quoted), filters)
+      dataMap --= deleteKeys
+      withData(messages.map(_.asInstanceOf[BufferedRows]))
+    }
+  }
+
+  private object TruncateAndAppend extends TestBatchWrite {
+    override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
+      dataMap.clear
+      withData(messages.map(_.asInstanceOf[BufferedRows]))
+    }
+  }
+
+  private abstract class TestStreamingWrite extends StreamingWrite {
+    def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory = {
+      BufferedRowsWriterFactory
+    }
+
+    def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+  }
+
+  private class StreamingNotSupportedOperation(operation: String) extends TestStreamingWrite {
+    override def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory =
+      throwsException()
+
+    override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit =
+      throwsException()
+
+    override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit =
+      throwsException()
+
+    def throwsException[T](): T = throw new IllegalStateException("The operation " +
+      s"${operation} isn't supported for streaming query.")
+  }
+
+  private object StreamingAppend extends TestStreamingWrite {
+    override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
+      dataMap.synchronized {
+        withData(messages.map(_.asInstanceOf[BufferedRows]))
+      }
+    }
+  }
+
+  private object StreamingTruncateAndAppend extends TestStreamingWrite {
+    override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
+      dataMap.synchronized {
+        dataMap.clear
+        withData(messages.map(_.asInstanceOf[BufferedRows]))
+      }
+    }
+  }
+}
+
+object InMemoryBaseTable {
+  val SIMULATE_FAILED_WRITE_OPTION = "spark.sql.test.simulateFailedWrite"
+
+  def filtersToKeys(
+      keys: Iterable[Seq[Any]],
+      partitionNames: Seq[String],
+      filters: Array[Filter]): Iterable[Seq[Any]] = {

Review Comment:
   This method is currently used by `class Overwrite(filters: Array[Filter])` at line 422, so I will leave it in this base class for now. When I work on my next PR, `SupportsOverWriteV2`, I will clean up this base class and move this method and all the `Filter`-related methods to `InMemoryTable`.
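
For readers following along, here is a minimal sketch of what a `filtersToKeys`-style helper does. It is not the method from this PR: `SimpleFilter` and `SimpleEqualTo` are hypothetical stand-ins for the V1 `Filter` classes, and only equality filters are modeled.

```scala
// Hypothetical stand-ins for org.apache.spark.sql.sources.Filter; only equality is modeled.
sealed trait SimpleFilter
final case class SimpleEqualTo(attribute: String, value: Any) extends SimpleFilter

object FiltersToKeysSketch {
  // Returns the partition keys whose values satisfy every filter.
  def filtersToKeys(
      keys: Iterable[Seq[Any]],
      partitionNames: Seq[String],
      filters: Array[SimpleFilter]): Iterable[Seq[Any]] = {
    keys.filter { partValues =>
      filters.forall {
        case SimpleEqualTo(attr, value) =>
          val idx = partitionNames.indexOf(attr)
          // A filter on a non-partition column cannot be answered from the key alone.
          require(idx >= 0, s"$attr is not a partition column")
          partValues(idx) == value
      }
    }
  }
}
```

The returned keys are what a caller such as `Overwrite.commit` would then remove from `dataMap` before appending the new rows.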





[GitHub] [spark] LuciferYang commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r942055045


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PredicateUtils.scala:
##########
@@ -132,4 +133,18 @@ private[sql] object PredicateUtils {
       case _ => None
     }
   }
+
+  def toV1(
+      predicates: Array[Predicate],
+      skipIfNotConvertible: Boolean): Array[Filter] = {
+    predicates.flatMap { predicate =>
+      val filter = toV1(predicate)
+      if (filter.isEmpty) {
+        if (!skipIfNotConvertible) {
+          throw QueryCompilationErrors.unsupportedPredicateToFilterConversionError(predicate.name())
+        }
+      }
+      filter
+    }.toArray

Review Comment:
   Yes, I double-checked that Scala 2.13 doesn't need this `toArray`. Moreover, the redundant `toArray` hurts performance on Scala 2.13, because `ArrayOps.toArray` always copies the array even when the element type is unchanged, whereas Scala 2.12 returns the original array directly in that case.
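
A minimal sketch of the point above, assuming a hypothetical `toV1` stand-in that works on strings and integers rather than the real `Predicate` and `Filter` types: `flatMap` on an `Array` already produces an `Array`, so the trailing `toArray` adds nothing.

```scala
object ToArraySketch {
  // Hypothetical stand-in for PredicateUtils.toV1(predicate): Option[Filter].
  def toV1(predicate: String): Option[Int] =
    if (predicate.nonEmpty && predicate.forall(_.isDigit)) Some(predicate.toInt) else None

  // flatMap on an Array already yields an Array, so no trailing toArray is needed.
  def convert(predicates: Array[String]): Array[Int] =
    predicates.flatMap(p => toV1(p))

  def main(args: Array[String]): Unit = {
    val converted = convert(Array("1", "x", "3"))
    println(converted.mkString(", "))
    // Reference equality shows whether toArray copied the array;
    // the result depends on the Scala version, as described above.
    println(converted.toArray eq converted)
  }
}
```

The second `println` deliberately has no expected value in the comment, since it is exactly the behavior that differs between Scala 2.12 and 2.13.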





[GitHub] [spark] huaxingao commented on pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
huaxingao commented on PR #37393:
URL: https://github.com/apache/spark/pull/37393#issuecomment-1209990638

   @cloud-fan Could you check one more time, please?
   
   The test failure is unrelated to this change.
   
   




[GitHub] [spark] huaxingao commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
huaxingao commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r941881487


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PredicateUtils.scala:
##########
@@ -132,4 +133,18 @@ private[sql] object PredicateUtils {
       case _ => None
     }
   }
+
+  def toV1(
+      predicates: Array[Predicate],
+      skipIfNotConvertible: Boolean): Array[Filter] = {
+    predicates.flatMap { predicate =>
+      val filter = toV1(predicate)
+      if (filter.isEmpty) {
+        if (!skipIfNotConvertible) {

Review Comment:
   Fixed. Thanks





[GitHub] [spark] beliefer commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r940853812


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2Filter.scala:
##########
@@ -30,7 +30,17 @@ class InMemoryTableWithV2Filter(
     schema: StructType,
     partitioning: Array[Transform],
     properties: util.Map[String, String])
-  extends InMemoryTable(name, schema, partitioning, properties) {
+  extends InMemoryBaseTable(name, schema, partitioning, properties) with SupportsDeleteV2 {
+
+  override def canDeleteWhere(filters: Array[Predicate]): Boolean = {
+    InMemoryTableWithV2Filter.supportsFilters(filters)
+  }
+
+  override def deleteWhere(filters: Array[Predicate]): Unit = dataMap.synchronized {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
+    dataMap --= InMemoryTableWithV2Filter
+      .filtersToKeys(dataMap.keys, partCols.map(_.toSeq.quoted), filters)

Review Comment:
   `import ...InMemoryTableWithV2Filter._`
   Then we can simplify the code.
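
A rough sketch of the suggested simplification, using a hypothetical `TableSketch` class and companion object in place of `InMemoryTableWithV2Filter`, with filters reduced to plain strings. Importing the companion's members lets the methods call the helpers without the object prefix.

```scala
import scala.collection.mutable

// Hypothetical companion object standing in for InMemoryTableWithV2Filter's helpers.
object TableSketch {
  def supportsFilters(filters: Array[String]): Boolean = filters.forall(_.nonEmpty)

  // Keeps the keys that contain every filter value (a toy matching rule).
  def filtersToKeys(keys: Iterable[Seq[Any]], filters: Array[String]): Iterable[Seq[Any]] =
    keys.filter(key => filters.forall(f => key.map(_.toString).contains(f)))
}

class TableSketch {
  import TableSketch._ // brings supportsFilters and filtersToKeys into scope

  val dataMap: mutable.Map[Seq[Any], String] = mutable.Map.empty

  def canDeleteWhere(filters: Array[String]): Boolean = supportsFilters(filters)

  def deleteWhere(filters: Array[String]): Unit = dataMap.synchronized {
    // Materialize the keys first so the map is not filtered while being modified.
    dataMap --= filtersToKeys(dataMap.keys.toList, filters)
  }
}
```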



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala:
##########
@@ -17,218 +17,42 @@
 
 package org.apache.spark.sql.connector.catalog
 
-import java.time.{Instant, ZoneId}
-import java.time.temporal.ChronoUnit
 import java.util
-import java.util.OptionalLong
 
-import scala.collection.mutable
-
-import org.scalatest.Assertions._
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}
-import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils}
 import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
-import org.apache.spark.sql.connector.expressions._
-import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
-import org.apache.spark.sql.connector.read._
-import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning, UnknownPartitioning}
-import org.apache.spark.sql.connector.write._
-import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
-import org.apache.spark.sql.internal.connector.SupportsStreamingUpdateAsAppend
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.sql.connector.expressions.{SortOrder, Transform}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
 
 /**
  * A simple in-memory table. Rows are stored as a buffered group produced by each output task.
  */
 class InMemoryTable(
-    val name: String,
-    val schema: StructType,
+    val _name: String,
+    val _schema: StructType,

Review Comment:
   Why change the name?





[GitHub] [spark] cloud-fan commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r939825908


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PredicateUtils.scala:
##########
@@ -20,41 +20,28 @@ package org.apache.spark.sql.internal.connector
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.connector.expressions.{LiteralValue, NamedReference}
 import org.apache.spark.sql.connector.expressions.filter.{And => V2And, Not => V2Not, Or => V2Or, Predicate}
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.sources.{AlwaysFalse, AlwaysTrue, And, EqualNullSafe, EqualTo, Filter, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Not, Or, StringContains, StringEndsWith, StringStartsWith}
 import org.apache.spark.sql.types.StringType
 
 private[sql] object PredicateUtils {
 
   def toV1(predicate: Predicate): Option[Filter] = {
 
-    def isValidBinaryPredicate(): Boolean = {
-      if (predicate.children().length == 2 &&
-        predicate.children()(0).isInstanceOf[NamedReference] &&
-        predicate.children()(1).isInstanceOf[LiteralValue[_]]) {
-        true
-      } else {
-        false
-      }
-    }
-
     predicate.name() match {
-      case "IN" if predicate.children()(0).isInstanceOf[NamedReference] =>
+      case "IN" if isValidPredicate(predicate) =>
         val attribute = predicate.children()(0).toString
         val values = predicate.children().drop(1)
         if (values.length > 0) {
-          if (!values.forall(_.isInstanceOf[LiteralValue[_]])) return None
           val dataType = values(0).asInstanceOf[LiteralValue[_]].dataType
-          if (!values.forall(_.asInstanceOf[LiteralValue[_]].dataType.sameType(dataType))) {
-            return None
-          }
           val inValues = values.map(v =>
             CatalystTypeConverters.convertToScala(v.asInstanceOf[LiteralValue[_]].value, dataType))
           Some(In(attribute, inValues))
         } else {
           Some(In(attribute, Array.empty[Any]))
         }
 
-      case "=" | "<=>" | ">" | "<" | ">=" | "<=" if isValidBinaryPredicate =>

Review Comment:
   +1





[GitHub] [spark] LuciferYang commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r937384579


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java:
##########
@@ -70,6 +77,34 @@ default boolean canDeleteWhere(Filter[] filters) {
    */
   void deleteWhere(Filter[] filters);
 
+  default boolean canDeleteWhere(Predicate[] predicates) {
+    List<Filter> filterList = new ArrayList();
+    for (int i = 0; i < predicates.length; i++) {
+      Option filter = PredicateUtils.toV1(predicates[i]);

Review Comment:
   `Option<Filter>`? That would avoid the forced type conversion on line 85.





[GitHub] [spark] LuciferYang commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r937427781


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala:
##########
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.time.{Instant, ZoneId}
+import java.time.temporal.ChronoUnit
+import java.util
+import java.util.OptionalLong
+
+import scala.collection.mutable
+
+import org.scalatest.Assertions._
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils}
+import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
+import org.apache.spark.sql.connector.expressions._
+import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
+import org.apache.spark.sql.connector.read._
+import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.connector.write._
+import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
+import org.apache.spark.sql.internal.connector.SupportsStreamingUpdateAsAppend
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * A simple in-memory table. Rows are stored as a buffered group produced by each output task.
+ */
+class InMemoryBaseTable(
+    val name: String,
+    val schema: StructType,
+    override val partitioning: Array[Transform],
+    override val properties: util.Map[String, String],
+    val distribution: Distribution = Distributions.unspecified(),
+    val ordering: Array[SortOrder] = Array.empty,
+    val numPartitions: Option[Int] = None,
+    val isDistributionStrictlyRequired: Boolean = true)
+  extends Table with SupportsRead with SupportsWrite with SupportsMetadataColumns {
+
+  protected object PartitionKeyColumn extends MetadataColumn {
+    override def name: String = "_partition"
+    override def dataType: DataType = StringType
+    override def comment: String = "Partition key used to store the row"
+  }
+
+  private object IndexColumn extends MetadataColumn {
+    override def name: String = "index"
+    override def dataType: DataType = IntegerType
+    override def comment: String = "Metadata column used to conflict with a data column"
+  }
+
+  // purposely exposes a metadata column that conflicts with a data column in some tests
+  override val metadataColumns: Array[MetadataColumn] = Array(IndexColumn, PartitionKeyColumn)
+  private val metadataColumnNames = metadataColumns.map(_.name).toSet -- schema.map(_.name)
+
+  private val allowUnsupportedTransforms =
+    properties.getOrDefault("allow-unsupported-transforms", "false").toBoolean
+
+  partitioning.foreach {
+    case _: IdentityTransform =>
+    case _: YearsTransform =>
+    case _: MonthsTransform =>
+    case _: DaysTransform =>
+    case _: HoursTransform =>
+    case _: BucketTransform =>
+    case _: SortedBucketTransform =>
+    case t if !allowUnsupportedTransforms =>
+      throw new IllegalArgumentException(s"Transform $t is not a supported transform")
+  }
+
+  // The key `Seq[Any]` is the partition values.
+  val dataMap: mutable.Map[Seq[Any], BufferedRows] = mutable.Map.empty
+
+  def data: Array[BufferedRows] = dataMap.values.toArray
+
+  def rows: Seq[InternalRow] = dataMap.values.flatMap(_.rows).toSeq
+
+  val partCols: Array[Array[String]] = partitioning.flatMap(_.references).map { ref =>
+    schema.findNestedField(ref.fieldNames(), includeCollections = false) match {
+      case Some(_) => ref.fieldNames()
+      case None => throw new IllegalArgumentException(s"${ref.describe()} does not exist.")
+    }
+  }
+
+  private val UTC = ZoneId.of("UTC")
+  private val EPOCH_LOCAL_DATE = Instant.EPOCH.atZone(UTC).toLocalDate
+
+  protected def getKey(row: InternalRow): Seq[Any] = {
+    getKey(row, schema)
+  }
+
+  protected def getKey(row: InternalRow, rowSchema: StructType): Seq[Any] = {
+    @scala.annotation.tailrec
+    def extractor(
+        fieldNames: Array[String],
+        schema: StructType,
+        row: InternalRow): (Any, DataType) = {
+      val index = schema.fieldIndex(fieldNames(0))
+      val value = row.toSeq(schema).apply(index)
+      if (fieldNames.length > 1) {
+        (value, schema(index).dataType) match {
+          case (row: InternalRow, nestedSchema: StructType) =>
+            extractor(fieldNames.drop(1), nestedSchema, row)
+          case (_, dataType) =>
+            throw new IllegalArgumentException(s"Unsupported type, ${dataType.simpleString}")
+        }
+      } else {
+        (value, schema(index).dataType)
+      }
+    }
+
+    val cleanedSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(rowSchema)
+    partitioning.map {
+      case IdentityTransform(ref) =>
+        extractor(ref.fieldNames, cleanedSchema, row)._1
+      case YearsTransform(ref) =>
+        extractor(ref.fieldNames, cleanedSchema, row) match {
+          case (days: Int, DateType) =>
+            ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, DateTimeUtils.daysToLocalDate(days))
+          case (micros: Long, TimestampType) =>
+            val localDate = DateTimeUtils.microsToInstant(micros).atZone(UTC).toLocalDate
+            ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate)
+          case (v, t) =>
+            throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)")
+        }
+      case MonthsTransform(ref) =>
+        extractor(ref.fieldNames, cleanedSchema, row) match {
+          case (days: Int, DateType) =>
+            ChronoUnit.MONTHS.between(EPOCH_LOCAL_DATE, DateTimeUtils.daysToLocalDate(days))
+          case (micros: Long, TimestampType) =>
+            val localDate = DateTimeUtils.microsToInstant(micros).atZone(UTC).toLocalDate
+            ChronoUnit.MONTHS.between(EPOCH_LOCAL_DATE, localDate)
+          case (v, t) =>
+            throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)")
+        }
+      case DaysTransform(ref) =>
+        extractor(ref.fieldNames, cleanedSchema, row) match {
+          case (days, DateType) =>
+            days
+          case (micros: Long, TimestampType) =>
+            ChronoUnit.DAYS.between(Instant.EPOCH, DateTimeUtils.microsToInstant(micros))
+          case (v, t) =>
+            throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)")
+        }
+      case HoursTransform(ref) =>
+        extractor(ref.fieldNames, cleanedSchema, row) match {
+          case (micros: Long, TimestampType) =>
+            ChronoUnit.HOURS.between(Instant.EPOCH, DateTimeUtils.microsToInstant(micros))
+          case (v, t) =>
+            throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)")
+        }
+      case BucketTransform(numBuckets, cols, _) =>
+        val valueTypePairs = cols.map(col => extractor(col.fieldNames, cleanedSchema, row))
+        var valueHashCode = 0
+        valueTypePairs.foreach( pair =>
+          if ( pair._1 != null) valueHashCode += pair._1.hashCode()
+        )
+        var dataTypeHashCode = 0
+        valueTypePairs.foreach(dataTypeHashCode += _._2.hashCode())
+        ((valueHashCode + 31 * dataTypeHashCode) & Integer.MAX_VALUE) % numBuckets
+    }
+  }
+
+  protected def addPartitionKey(key: Seq[Any]): Unit = {}
+
+  protected def renamePartitionKey(
+      partitionSchema: StructType,
+      from: Seq[Any],
+      to: Seq[Any]): Boolean = {
+    val rows = dataMap.remove(from).getOrElse(new BufferedRows(from))
+    val newRows = new BufferedRows(to)
+    rows.rows.foreach { r =>
+      val newRow = new GenericInternalRow(r.numFields)
+      for (i <- 0 until r.numFields) newRow.update(i, r.get(i, schema(i).dataType))
+      for (i <- 0 until partitionSchema.length) {
+        val j = schema.fieldIndex(partitionSchema(i).name)
+        newRow.update(j, to(i))
+      }
+      newRows.withRow(newRow)
+    }
+    dataMap.put(to, newRows).foreach { _ =>
+      throw new IllegalStateException(
+        s"The ${to.mkString("[", ", ", "]")} partition exists already")
+    }
+    true
+  }
+
+  protected def removePartitionKey(key: Seq[Any]): Unit = dataMap.synchronized {
+    dataMap.remove(key)
+  }
+
+  protected def createPartitionKey(key: Seq[Any]): Unit = dataMap.synchronized {
+    if (!dataMap.contains(key)) {
+      val emptyRows = new BufferedRows(key)
+      val rows = if (key.length == schema.length) {
+        emptyRows.withRow(InternalRow.fromSeq(key))
+      } else emptyRows
+      dataMap.put(key, rows)
+    }
+  }
+
+  protected def clearPartition(key: Seq[Any]): Unit = dataMap.synchronized {
+    assert(dataMap.contains(key))
+    dataMap(key).clear()
+  }
+
+  def withData(data: Array[BufferedRows]): InMemoryBaseTable = {
+    withData(data, schema)
+  }
+
+  def withData(
+      data: Array[BufferedRows],
+      writeSchema: StructType): InMemoryBaseTable = dataMap.synchronized {
+    data.foreach(_.rows.foreach { row =>
+      val key = getKey(row, writeSchema)
+      dataMap += dataMap.get(key)
+        .map(key -> _.withRow(row))
+        .getOrElse(key -> new BufferedRows(key).withRow(row))
+      addPartitionKey(key)
+    })
+    this
+  }
+
+  override def capabilities: util.Set[TableCapability] = util.EnumSet.of(
+    TableCapability.BATCH_READ,
+    TableCapability.BATCH_WRITE,
+    TableCapability.STREAMING_WRITE,
+    TableCapability.OVERWRITE_BY_FILTER,
+    TableCapability.OVERWRITE_DYNAMIC,
+    TableCapability.TRUNCATE)
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+    new InMemoryScanBuilder(schema)
+  }
+
+  class InMemoryScanBuilder(tableSchema: StructType) extends ScanBuilder
+      with SupportsPushDownRequiredColumns {
+    private var schema: StructType = tableSchema
+
+    override def build: Scan =
+      new InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]), schema, tableSchema)
+
+    override def pruneColumns(requiredSchema: StructType): Unit = {
+      val schemaNames = metadataColumnNames ++ tableSchema.map(_.name)
+      schema = StructType(requiredSchema.filter(f => schemaNames.contains(f.name)))
+    }
+  }
+
+  case class InMemoryStats(sizeInBytes: OptionalLong, numRows: OptionalLong) extends Statistics
+
+  abstract class BatchScanBaseClass(
+      var data: Seq[InputPartition],
+      readSchema: StructType,
+      tableSchema: StructType)
+    extends Scan with Batch with SupportsReportStatistics with SupportsReportPartitioning {
+
+    override def toBatch: Batch = this
+
+    override def estimateStatistics(): Statistics = {
+      if (data.isEmpty) {
+        return InMemoryStats(OptionalLong.of(0L), OptionalLong.of(0L))
+      }
+
+      val inputPartitions = data.map(_.asInstanceOf[BufferedRows])
+      val numRows = inputPartitions.map(_.rows.size).sum
+      // we assume an average object header is 12 bytes
+      val objectHeaderSizeInBytes = 12L
+      val rowSizeInBytes = objectHeaderSizeInBytes + schema.defaultSize
+      val sizeInBytes = numRows * rowSizeInBytes
+      InMemoryStats(OptionalLong.of(sizeInBytes), OptionalLong.of(numRows))
+    }
+
+    override def outputPartitioning(): Partitioning = {
+      if (InMemoryBaseTable.this.partitioning.nonEmpty) {
+        new KeyGroupedPartitioning(
+          InMemoryBaseTable.this.partitioning.map(_.asInstanceOf[Expression]),
+          data.size)
+      } else {
+        new UnknownPartitioning(data.size)
+      }
+    }
+
+    override def planInputPartitions(): Array[InputPartition] = data.toArray
+
+    override def createReaderFactory(): PartitionReaderFactory = {
+      val metadataColumns = readSchema.map(_.name).filter(metadataColumnNames.contains)
+      val nonMetadataColumns = readSchema.filterNot(f => metadataColumns.contains(f.name))
+      new BufferedRowsReaderFactory(metadataColumns, nonMetadataColumns, tableSchema)
+    }
+  }
+
+  case class InMemoryBatchScan(
+      var _data: Seq[InputPartition],
+      readSchema: StructType,
+      tableSchema: StructType)
+    extends BatchScanBaseClass (_data, readSchema, tableSchema) with SupportsRuntimeFiltering {
+
+    override def filterAttributes(): Array[NamedReference] = {
+      val scanFields = readSchema.fields.map(_.name).toSet
+      partitioning.flatMap(_.references)
+        .filter(ref => scanFields.contains(ref.fieldNames.mkString(".")))
+    }
+
+    override def filter(filters: Array[Filter]): Unit = {
+      if (partitioning.length == 1 && partitioning.head.references().length == 1) {
+        val ref = partitioning.head.references().head
+        filters.foreach {
+          case In(attrName, values) if attrName == ref.toString =>
+            val matchingKeys = values.map(_.toString).toSet
+            data = data.filter(partition => {
+              val key = partition.asInstanceOf[BufferedRows].keyString
+              matchingKeys.contains(key)
+            })
+
+          case _ => // skip
+        }
+      }
+    }
+  }
+
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    InMemoryBaseTable.maybeSimulateFailedTableWrite(new CaseInsensitiveStringMap(properties))
+    InMemoryBaseTable.maybeSimulateFailedTableWrite(info.options)
+
+    new WriteBuilder with SupportsTruncate with SupportsOverwrite
+      with SupportsDynamicOverwrite with SupportsStreamingUpdateAsAppend {
+
+      private var writer: BatchWrite = Append
+      private var streamingWriter: StreamingWrite = StreamingAppend
+
+      override def truncate(): WriteBuilder = {
+        assert(writer == Append)
+        writer = TruncateAndAppend
+        streamingWriter = StreamingTruncateAndAppend
+        this
+      }
+
+      override def overwrite(filters: Array[Filter]): WriteBuilder = {
+        assert(writer == Append)
+        writer = new Overwrite(filters)
+        streamingWriter = new StreamingNotSupportedOperation(
+          s"overwrite (${filters.mkString("filters(", ", ", ")")})")
+        this
+      }
+
+      override def overwriteDynamicPartitions(): WriteBuilder = {
+        assert(writer == Append)
+        writer = DynamicOverwrite
+        streamingWriter = new StreamingNotSupportedOperation("overwriteDynamicPartitions")
+        this
+      }
+
+      override def build(): Write = new Write with RequiresDistributionAndOrdering {
+        override def requiredDistribution: Distribution = distribution
+
+        override def distributionStrictlyRequired = isDistributionStrictlyRequired

Review Comment:
   This should have an explicit type annotation because it is a public method.





[GitHub] [spark] beliefer commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r937269735


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDeleteV2.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.filter.AlwaysTrue;
+import org.apache.spark.sql.connector.expressions.filter.Predicate;
+
+/**
+ * A mix-in interface for {@link Table} delete support. Data sources can implement this
+ * interface to provide the ability to delete data from tables that matches filter expressions.
+ *
+ * @since 3.4.0
+ */
+@Evolving
+public interface SupportsDeleteV2 extends TruncatableTable {
+
+  /**
+   * Checks whether it is possible to delete data from a data source table that matches filter
+   * expressions.
+   * <p>
+   * Rows should be deleted from the data source iff all of the filter expressions match.
+   * That is, the expressions must be interpreted as a set of filters that are ANDed together.
+   * <p>
+   * Spark will call this method at planning time to check whether {@link #deleteWhere(Predicate[])}
+   * would reject the delete operation because it requires significant effort. If this method
+   * returns false, Spark will not call {@link #deleteWhere(Predicate[])} and will try to rewrite
+   * the delete operation and produce row-level changes if the data source table supports deleting
+   * individual records.
+   *
+   * @param predicates V2 filter expressions, used to select rows to delete when all expressions
+   *                  match
+   * @return true if the delete operation can be performed
+   *
+   * @since 3.4.0
+   */
+  default boolean canDeleteWhere(Predicate[] predicates) {
+    return true;
+  }
+
+  /**
+   * Delete data from a data source table that matches filter expressions. Note that this method
+   * will be invoked only if {@link #canDeleteWhere(Predicate[])} returns true.
+   * <p>
+   * Rows are deleted from the data source iff all of the filter expressions match. That is, the
+   * expressions must be interpreted as a set of filters that are ANDed together.
+   * <p>
+   * Implementations may reject a delete operation if the delete isn't possible without significant
+   * effort. For example, partitioned data sources may reject deletes that do not filter by
+   * partition columns because the filter may require rewriting files without deleted records.
+   * To reject a delete implementations should throw {@link IllegalArgumentException} with a clear
+   * error message that identifies which expression was rejected.
+   *
+   * @param predicates predicate expressions, used to select rows to delete when all expressions
+   *                  match
+   * @throws IllegalArgumentException If the delete is rejected due to required effort
+   */
+  void deleteWhere(Predicate[] predicates);
+
+  @Override
+  default boolean truncateTable() {
+    Predicate[] filters = new Predicate[] { new AlwaysTrue() };

Review Comment:
   I don't think `new AlwaysTrue()` is needed here.
   If we remove it, we need to adjust `canDeleteWhere` of `InMemoryTable`.





[GitHub] [spark] LuciferYang commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r937385387


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java:
##########
@@ -70,6 +77,34 @@ default boolean canDeleteWhere(Filter[] filters) {
    */
   void deleteWhere(Filter[] filters);
 
+  default boolean canDeleteWhere(Predicate[] predicates) {
+    List<Filter> filterList = new ArrayList();
+    for (int i = 0; i < predicates.length; i++) {

Review Comment:
   I would prefer an enhanced `for` loop here.





[GitHub] [spark] LuciferYang commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r937396523


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java:
##########
@@ -70,6 +77,34 @@ default boolean canDeleteWhere(Filter[] filters) {
    */
   void deleteWhere(Filter[] filters);
 
+  default boolean canDeleteWhere(Predicate[] predicates) {
+    List<Filter> filterList = new ArrayList();
+    for (int i = 0; i < predicates.length; i++) {

Review Comment:
   > I suggest using Java 8 lambda expression.
   
   +1, as long as there is no strong performance requirement.
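
For reference, the two loop styles suggested in this thread could look roughly like the sketch below. It is self-contained and purely illustrative: `V2Predicate`, `V1Filter`, and `convert` are hypothetical stand-ins, not the Spark `Predicate`/`Filter` classes or the conversion code in this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-ins for the V2 predicate and V1 filter types (not Spark classes).
final class V2Predicate {
  final String name;
  V2Predicate(String name) { this.name = name; }
}

final class V1Filter {
  final String name;
  V1Filter(String name) { this.name = name; }
}

final class LoopStyles {
  // Hypothetical one-to-one conversion, used only for illustration.
  static V1Filter convert(V2Predicate p) {
    return new V1Filter(p.name);
  }

  // Enhanced for loop: no index bookkeeping, and the list is parameterized.
  static List<V1Filter> withEnhancedFor(V2Predicate[] predicates) {
    List<V1Filter> filters = new ArrayList<>(predicates.length);
    for (V2Predicate p : predicates) {
      filters.add(convert(p));
    }
    return filters;
  }

  // Java 8 stream with a method reference: same result, possibly with a
  // little more overhead on hot paths.
  static List<V1Filter> withStream(V2Predicate[] predicates) {
    return Arrays.stream(predicates)
        .map(LoopStyles::convert)
        .collect(Collectors.toList());
  }
}
```

Both methods return the converted filters in input order, so the choice between them is mostly about readability versus per-call overhead.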





[GitHub] [spark] cloud-fan commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r939826754


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PredicateUtils.scala:
##########
@@ -132,4 +117,66 @@ private[sql] object PredicateUtils {
       case _ => None
     }
   }
+
+  def toV1(
+      predicates: Array[Predicate],
+      throwExceptionIfNotConvertible: Boolean): Array[Filter] = {

Review Comment:
   `skipIfNotConvertible` looks shorter.





[GitHub] [spark] cloud-fan commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r942670128


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PredicateUtils.scala:
##########
@@ -132,4 +133,16 @@ private[sql] object PredicateUtils {
       case _ => None
     }
   }
+
+  def toV1(
+      predicates: Array[Predicate],
+      skipIfNotConvertible: Boolean): Array[Filter] = {

Review Comment:
   At least we can still throw an error if we want to, by checking the number of returned v1 filters.
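
The idea of detecting dropped predicates by comparing lengths can be sketched as follows. Everything in this block is a hypothetical stand-in: the string-based `toV1` helper, the `udf:` marker for unconvertible predicates, and the `overwrite` method do not mirror the real `PredicateUtils.toV1` or `SupportsOverwrite.overwrite` signatures.

```java
import java.util.ArrayList;
import java.util.List;

final class LenientConversionSketch {
  // Hypothetical converter: predicates starting with "udf:" have no V1 equivalent.
  // With skipIfNotConvertible = true they are silently dropped; otherwise it throws.
  static String[] toV1(String[] predicates, boolean skipIfNotConvertible) {
    List<String> filters = new ArrayList<>();
    for (String p : predicates) {
      if (!p.startsWith("udf:")) {
        filters.add("v1(" + p + ")");
      } else if (!skipIfNotConvertible) {
        throw new IllegalArgumentException("Cannot convert predicate: " + p);
      }
    }
    return filters.toArray(new String[0]);
  }

  // A caller that must not lose any predicate converts leniently,
  // then compares the lengths of the input and output arrays.
  static String[] overwrite(String[] predicates) {
    String[] v1Filters = toV1(predicates, true);
    if (v1Filters.length < predicates.length) {
      throw new IllegalArgumentException(
          "Only " + v1Filters.length + " of " + predicates.length
              + " predicates are convertible");
    }
    return v1Filters;
  }
}
```

With this shape the converter itself stays lenient, and each caller decides whether a shorter result is acceptable.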





[GitHub] [spark] huaxingao commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
huaxingao commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r937288540


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDeleteV2.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.filter.AlwaysTrue;
+import org.apache.spark.sql.connector.expressions.filter.Predicate;
+
+/**
+ * A mix-in interface for {@link Table} delete support. Data sources can implement this
+ * interface to provide the ability to delete data from tables that matches filter expressions.
+ *
+ * @since 3.4.0
+ */
+@Evolving
+public interface SupportsDeleteV2 extends TruncatableTable {
+
+  /**
+   * Checks whether it is possible to delete data from a data source table that matches filter
+   * expressions.
+   * <p>
+   * Rows should be deleted from the data source iff all of the filter expressions match.
+   * That is, the expressions must be interpreted as a set of filters that are ANDed together.
+   * <p>
+   * Spark will call this method at planning time to check whether {@link #deleteWhere(Predicate[])}
+   * would reject the delete operation because it requires significant effort. If this method
+   * returns false, Spark will not call {@link #deleteWhere(Predicate[])} and will try to rewrite
+   * the delete operation and produce row-level changes if the data source table supports deleting
+   * individual records.
+   *
+   * @param predicates V2 filter expressions, used to select rows to delete when all expressions
+   *                  match
+   * @return true if the delete operation can be performed
+   *
+   * @since 3.4.0
+   */
+  default boolean canDeleteWhere(Predicate[] predicates) {
+    return true;
+  }
+
+  /**
+   * Delete data from a data source table that matches filter expressions. Note that this method
+   * will be invoked only if {@link #canDeleteWhere(Predicate[])} returns true.
+   * <p>
+   * Rows are deleted from the data source iff all of the filter expressions match. That is, the
+   * expressions must be interpreted as a set of filters that are ANDed together.
+   * <p>
+   * Implementations may reject a delete operation if the delete isn't possible without significant
+   * effort. For example, partitioned data sources may reject deletes that do not filter by
+   * partition columns because the filter may require rewriting files without deleted records.
+   * To reject a delete implementations should throw {@link IllegalArgumentException} with a clear
+   * error message that identifies which expression was rejected.
+   *
+   * @param predicates predicate expressions, used to select rows to delete when all expressions
+   *                  match
+   * @throws IllegalArgumentException If the delete is rejected due to required effort
+   */
+  void deleteWhere(Predicate[] predicates);
+
+  @Override
+  default boolean truncateTable() {
+    Predicate[] filters = new Predicate[] { new AlwaysTrue() };

Review Comment:
   I am trying to implement this exactly the same way as `SupportsDelete.truncateTable`
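
For context, a truncate built on top of a predicate-based delete can be sketched as below. This is an illustration under assumptions, not the code in this PR: `DeletableSketch`, `ALWAYS_TRUE`, and `ListTable` are hypothetical stand-ins, and the body of `truncateTable` is an assumed reading of the "delete where true" pattern rather than a copy of the Spark source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for a table with predicate-based deletes (not the Spark interface).
interface DeletableSketch {
  // Stand-in for an always-true predicate: it matches every row.
  Predicate<Integer> ALWAYS_TRUE = row -> true;

  default boolean canDeleteWhere(List<Predicate<Integer>> predicates) {
    return true;
  }

  void deleteWhere(List<Predicate<Integer>> predicates);

  // Assumed pattern: truncate is "delete where true", attempted only if the
  // source reports that it can perform that delete.
  default boolean truncateTable() {
    List<Predicate<Integer>> filters = new ArrayList<>();
    filters.add(ALWAYS_TRUE);
    boolean canDelete = canDeleteWhere(filters);
    if (canDelete) {
      deleteWhere(filters);
    }
    return canDelete;
  }
}

// Toy in-memory implementation: a row is removed only if every predicate matches it.
final class ListTable implements DeletableSketch {
  final List<Integer> rows = new ArrayList<>(Arrays.asList(1, 2, 3));

  @Override
  public void deleteWhere(List<Predicate<Integer>> predicates) {
    rows.removeIf(row -> predicates.stream().allMatch(p -> p.test(row)));
  }
}
```

In this sketch an implementation that special-cases the always-true predicate in `canDeleteWhere` can accept or reject truncation independently of ordinary deletes.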





[GitHub] [spark] beliefer commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r937392105


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java:
##########
@@ -70,6 +77,34 @@ default boolean canDeleteWhere(Filter[] filters) {
    */
   void deleteWhere(Filter[] filters);
 
+  default boolean canDeleteWhere(Predicate[] predicates) {
+    List<Filter> filterList = new ArrayList();
+    for (int i = 0; i < predicates.length; i++) {

Review Comment:
   +1





[GitHub] [spark] cloud-fan commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r938782700


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala:
##########
@@ -277,14 +277,14 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
           // correctness depends on removing all matching data.
           val filters = DataSourceStrategy.normalizeExprs(Seq(condition), output)
               .flatMap(splitConjunctivePredicates(_).map {
-                f => DataSourceStrategy.translateFilter(f, true).getOrElse(
+                f => DataSourceV2Strategy.translateFilterV2(f).getOrElse(
                   throw QueryCompilationErrors.cannotTranslateExpressionToSourceFilterError(f))

Review Comment:
   `SupportsDelete` is different from runtime filtering. For runtime filtering, we push as many predicates as possible, but that is only a performance improvement. `SupportsDelete` must take all predicates; otherwise the source cannot delete exactly the matching data, which leads to wrong query results later. We need to follow the same behavior as in https://github.com/apache/spark/pull/37393/files#diff-dc485c81773a73a5a462994af50e17a5043a8d66c47399cf29b0a3cb56c85591R80
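
A toy illustration of why a delete needs every conjunct, whereas a scan-side filter can tolerate missing ones. The row values and predicates are made up for the example, and nothing here is Spark API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class ConjunctSketch {
  // Returns the rows that remain after deleting every row matched by ALL predicates.
  static List<Integer> deleteWhere(List<Integer> rows, List<Predicate<Integer>> predicates) {
    List<Integer> remaining = new ArrayList<>(rows);
    remaining.removeIf(row -> predicates.stream().allMatch(p -> p.test(row)));
    return remaining;
  }

  // Runs the delete with both conjuncts, or with the second one dropped.
  static List<Integer> demo(boolean dropSecondConjunct) {
    List<Integer> rows = Arrays.asList(1, 2, 3, 4, 5, 6);
    List<Predicate<Integer>> predicates = new ArrayList<>();
    predicates.add(row -> row > 2);          // first conjunct
    if (!dropSecondConjunct) {
      predicates.add(row -> row % 2 == 0);   // second conjunct
    }
    return deleteWhere(rows, predicates);
  }
}
```

With both conjuncts, `demo(false)` deletes rows 4 and 6 and leaves 1, 2, 3, 5. If the second conjunct is lost in translation, `demo(true)` also removes 3 and 5, so the table no longer matches what the query asked for. A scan that loses the same conjunct only returns extra rows, which Spark can still filter afterwards.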





[GitHub] [spark] cloud-fan commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r938778661


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java:
##########
@@ -28,7 +30,7 @@
  * @since 3.0.0
  */
 @Evolving
-public interface SupportsDelete extends TruncatableTable {
+public interface SupportsDelete extends TruncatableTable, SupportsDeleteV2 {

Review Comment:
   ```suggestion
   public interface SupportsDelete extends SupportsDeleteV2 {
   ```
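
A minimal sketch of the inheritance shape behind this suggestion, assuming the new interface itself already extends the truncate interface, so the older interface only needs to extend the new one. All types below (`TruncatableSketch`, `DeleteV2Sketch`, `DeleteV1Sketch`, `V1Filter`, `V2Predicate`, `LegacyTableSketch`) are simplified, hypothetical stand-ins and not Spark's real interfaces.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical value types standing in for V1 filters and V2 predicates.
final class V1Filter {
  final String sql;
  V1Filter(String sql) { this.sql = sql; }
}

final class V2Predicate {
  final String sql;
  V2Predicate(String sql) { this.sql = sql; }
}

interface TruncatableSketch {
  boolean truncate();
}

// Newer interface: deletes by V2 predicates and inherits truncate support.
interface DeleteV2Sketch extends TruncatableSketch {
  void deleteWhere(V2Predicate[] predicates);

  @Override
  default boolean truncate() {
    deleteWhere(new V2Predicate[0]); // no predicates: delete everything
    return true;
  }
}

// Older interface: extending only the V2 interface is enough, because
// truncate support arrives transitively through it.
interface DeleteV1Sketch extends DeleteV2Sketch {
  void deleteWhere(V1Filter[] filters);

  @Override
  default void deleteWhere(V2Predicate[] predicates) {
    // Toy conversion (assumption): every predicate is convertible one-to-one.
    V1Filter[] filters = new V1Filter[predicates.length];
    for (int i = 0; i < predicates.length; i++) {
      filters[i] = new V1Filter(predicates[i].sql);
    }
    deleteWhere(filters);
  }
}

// An existing V1 implementation keeps working without changes.
final class LegacyTableSketch implements DeleteV1Sketch {
  final List<String> applied = new ArrayList<>();

  @Override
  public void deleteWhere(V1Filter[] filters) {
    for (V1Filter f : filters) {
      applied.add(f.sql);
    }
  }
}
```

A caller holding a `LegacyTableSketch` can pass `V2Predicate` values to `deleteWhere`; the default method converts them and delegates to the V1 overload. The table is also usable as a `TruncatableSketch` without naming that interface in its own `extends` clause.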





[GitHub] [spark] huaxingao closed pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
huaxingao closed pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete
URL: https://github.com/apache/spark/pull/37393




[GitHub] [spark] huaxingao commented on pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
huaxingao commented on PR #37393:
URL: https://github.com/apache/spark/pull/37393#issuecomment-1212110609

   Merged to master. Thanks @cloud-fan @beliefer @LuciferYang for reviewing!




[GitHub] [spark] beliefer commented on a diff in pull request #37393: [SPARK-39966][SQL] Use V2 Filter in SupportsDelete

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #37393:
URL: https://github.com/apache/spark/pull/37393#discussion_r940803579


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala:
##########
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.time.{Instant, ZoneId}
+import java.time.temporal.ChronoUnit
+import java.util
+import java.util.OptionalLong
+
+import scala.collection.mutable
+
+import org.scalatest.Assertions._
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils}
+import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
+import org.apache.spark.sql.connector.expressions._
+import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
+import org.apache.spark.sql.connector.read._
+import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.connector.write._
+import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
+import org.apache.spark.sql.internal.connector.SupportsStreamingUpdateAsAppend
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * A simple in-memory table. Rows are stored as a buffered group produced by each output task.
+ */
+class InMemoryBaseTable(
+    val name: String,
+    val schema: StructType,
+    override val partitioning: Array[Transform],
+    override val properties: util.Map[String, String],
+    val distribution: Distribution = Distributions.unspecified(),
+    val ordering: Array[SortOrder] = Array.empty,
+    val numPartitions: Option[Int] = None,
+    val isDistributionStrictlyRequired: Boolean = true)
+  extends Table with SupportsRead with SupportsWrite with SupportsMetadataColumns {
+
+  protected object PartitionKeyColumn extends MetadataColumn {
+    override def name: String = "_partition"
+    override def dataType: DataType = StringType
+    override def comment: String = "Partition key used to store the row"
+  }
+
+  private object IndexColumn extends MetadataColumn {
+    override def name: String = "index"
+    override def dataType: DataType = IntegerType
+    override def comment: String = "Metadata column used to conflict with a data column"
+  }
+
+  // purposely exposes a metadata column that conflicts with a data column in some tests
+  override val metadataColumns: Array[MetadataColumn] = Array(IndexColumn, PartitionKeyColumn)
+  private val metadataColumnNames = metadataColumns.map(_.name).toSet -- schema.map(_.name)
+
+  private val allowUnsupportedTransforms =
+    properties.getOrDefault("allow-unsupported-transforms", "false").toBoolean
+
+  partitioning.foreach {
+    case _: IdentityTransform =>
+    case _: YearsTransform =>
+    case _: MonthsTransform =>
+    case _: DaysTransform =>
+    case _: HoursTransform =>
+    case _: BucketTransform =>
+    case _: SortedBucketTransform =>
+    case t if !allowUnsupportedTransforms =>
+      throw new IllegalArgumentException(s"Transform $t is not a supported transform")
+  }
+
+  // The key `Seq[Any]` is the partition values.
+  val dataMap: mutable.Map[Seq[Any], BufferedRows] = mutable.Map.empty
+
+  def data: Array[BufferedRows] = dataMap.values.toArray
+
+  def rows: Seq[InternalRow] = dataMap.values.flatMap(_.rows).toSeq
+
+  val partCols: Array[Array[String]] = partitioning.flatMap(_.references).map { ref =>
+    schema.findNestedField(ref.fieldNames(), includeCollections = false) match {
+      case Some(_) => ref.fieldNames()
+      case None => throw new IllegalArgumentException(s"${ref.describe()} does not exist.")
+    }
+  }
+
+  private val UTC = ZoneId.of("UTC")
+  private val EPOCH_LOCAL_DATE = Instant.EPOCH.atZone(UTC).toLocalDate
+
+  protected def getKey(row: InternalRow): Seq[Any] = {
+    getKey(row, schema)
+  }
+
+  protected def getKey(row: InternalRow, rowSchema: StructType): Seq[Any] = {
+    @scala.annotation.tailrec
+    def extractor(
+        fieldNames: Array[String],
+        schema: StructType,
+        row: InternalRow): (Any, DataType) = {
+      val index = schema.fieldIndex(fieldNames(0))
+      val value = row.toSeq(schema).apply(index)
+      if (fieldNames.length > 1) {
+        (value, schema(index).dataType) match {
+          case (row: InternalRow, nestedSchema: StructType) =>
+            extractor(fieldNames.drop(1), nestedSchema, row)
+          case (_, dataType) =>
+            throw new IllegalArgumentException(s"Unsupported type, ${dataType.simpleString}")
+        }
+      } else {
+        (value, schema(index).dataType)
+      }
+    }
+
+    val cleanedSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(rowSchema)
+    partitioning.map {
+      case IdentityTransform(ref) =>
+        extractor(ref.fieldNames, cleanedSchema, row)._1
+      case YearsTransform(ref) =>
+        extractor(ref.fieldNames, cleanedSchema, row) match {
+          case (days: Int, DateType) =>
+            ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, DateTimeUtils.daysToLocalDate(days))
+          case (micros: Long, TimestampType) =>
+            val localDate = DateTimeUtils.microsToInstant(micros).atZone(UTC).toLocalDate
+            ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate)
+          case (v, t) =>
+            throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)")
+        }
+      case MonthsTransform(ref) =>
+        extractor(ref.fieldNames, cleanedSchema, row) match {
+          case (days: Int, DateType) =>
+            ChronoUnit.MONTHS.between(EPOCH_LOCAL_DATE, DateTimeUtils.daysToLocalDate(days))
+          case (micros: Long, TimestampType) =>
+            val localDate = DateTimeUtils.microsToInstant(micros).atZone(UTC).toLocalDate
+            ChronoUnit.MONTHS.between(EPOCH_LOCAL_DATE, localDate)
+          case (v, t) =>
+            throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)")
+        }
+      case DaysTransform(ref) =>
+        extractor(ref.fieldNames, cleanedSchema, row) match {
+          case (days, DateType) =>
+            days
+          case (micros: Long, TimestampType) =>
+            ChronoUnit.DAYS.between(Instant.EPOCH, DateTimeUtils.microsToInstant(micros))
+          case (v, t) =>
+            throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)")
+        }
+      case HoursTransform(ref) =>
+        extractor(ref.fieldNames, cleanedSchema, row) match {
+          case (micros: Long, TimestampType) =>
+            ChronoUnit.HOURS.between(Instant.EPOCH, DateTimeUtils.microsToInstant(micros))
+          case (v, t) =>
+            throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)")
+        }
+      case BucketTransform(numBuckets, cols, _) =>
+        val valueTypePairs = cols.map(col => extractor(col.fieldNames, cleanedSchema, row))
+        var valueHashCode = 0
+        valueTypePairs.foreach( pair =>
+          if ( pair._1 != null) valueHashCode += pair._1.hashCode()
+        )
+        var dataTypeHashCode = 0
+        valueTypePairs.foreach(dataTypeHashCode += _._2.hashCode())
+        ((valueHashCode + 31 * dataTypeHashCode) & Integer.MAX_VALUE) % numBuckets
+    }
+  }
+
+  protected def addPartitionKey(key: Seq[Any]): Unit = {}
+
+  protected def renamePartitionKey(
+      partitionSchema: StructType,
+      from: Seq[Any],
+      to: Seq[Any]): Boolean = {
+    val rows = dataMap.remove(from).getOrElse(new BufferedRows(from))
+    val newRows = new BufferedRows(to)
+    rows.rows.foreach { r =>
+      val newRow = new GenericInternalRow(r.numFields)
+      for (i <- 0 until r.numFields) newRow.update(i, r.get(i, schema(i).dataType))
+      for (i <- 0 until partitionSchema.length) {
+        val j = schema.fieldIndex(partitionSchema(i).name)
+        newRow.update(j, to(i))
+      }
+      newRows.withRow(newRow)
+    }
+    dataMap.put(to, newRows).foreach { _ =>
+      throw new IllegalStateException(
+        s"The ${to.mkString("[", ", ", "]")} partition exists already")
+    }
+    true
+  }
+
+  protected def removePartitionKey(key: Seq[Any]): Unit = dataMap.synchronized {
+    dataMap.remove(key)
+  }
+
+  protected def createPartitionKey(key: Seq[Any]): Unit = dataMap.synchronized {
+    if (!dataMap.contains(key)) {
+      val emptyRows = new BufferedRows(key)
+      val rows = if (key.length == schema.length) {
+        emptyRows.withRow(InternalRow.fromSeq(key))
+      } else emptyRows
+      dataMap.put(key, rows)
+    }
+  }
+
+  protected def clearPartition(key: Seq[Any]): Unit = dataMap.synchronized {
+    assert(dataMap.contains(key))
+    dataMap(key).clear()
+  }
+
+  def withData(data: Array[BufferedRows]): InMemoryBaseTable = {
+    withData(data, schema)
+  }
+
+  def withData(
+      data: Array[BufferedRows],
+      writeSchema: StructType): InMemoryBaseTable = dataMap.synchronized {
+    data.foreach(_.rows.foreach { row =>
+      val key = getKey(row, writeSchema)
+      dataMap += dataMap.get(key)
+        .map(key -> _.withRow(row))
+        .getOrElse(key -> new BufferedRows(key).withRow(row))
+      addPartitionKey(key)
+    })
+    this
+  }
+
+  override def capabilities: util.Set[TableCapability] = util.EnumSet.of(
+    TableCapability.BATCH_READ,
+    TableCapability.BATCH_WRITE,
+    TableCapability.STREAMING_WRITE,
+    TableCapability.OVERWRITE_BY_FILTER,
+    TableCapability.OVERWRITE_DYNAMIC,
+    TableCapability.TRUNCATE)
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+    new InMemoryScanBuilder(schema)
+  }
+
+  class InMemoryScanBuilder(tableSchema: StructType) extends ScanBuilder
+      with SupportsPushDownRequiredColumns {
+    private var schema: StructType = tableSchema
+
+    override def build: Scan =
+      InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]), schema, tableSchema)
+
+    override def pruneColumns(requiredSchema: StructType): Unit = {
+      val schemaNames = metadataColumnNames ++ tableSchema.map(_.name)
+      schema = StructType(requiredSchema.filter(f => schemaNames.contains(f.name)))
+    }
+  }
+
+  case class InMemoryStats(sizeInBytes: OptionalLong, numRows: OptionalLong) extends Statistics
+
+  abstract class BatchScanBaseClass(
+      var data: Seq[InputPartition],
+      readSchema: StructType,
+      tableSchema: StructType)
+    extends Scan with Batch with SupportsReportStatistics with SupportsReportPartitioning {
+
+    override def toBatch: Batch = this
+
+    override def estimateStatistics(): Statistics = {
+      if (data.isEmpty) {
+        return InMemoryStats(OptionalLong.of(0L), OptionalLong.of(0L))
+      }
+
+      val inputPartitions = data.map(_.asInstanceOf[BufferedRows])
+      val numRows = inputPartitions.map(_.rows.size).sum
+      // we assume an average object header is 12 bytes
+      val objectHeaderSizeInBytes = 12L
+      val rowSizeInBytes = objectHeaderSizeInBytes + schema.defaultSize
+      val sizeInBytes = numRows * rowSizeInBytes
+      InMemoryStats(OptionalLong.of(sizeInBytes), OptionalLong.of(numRows))
+    }
+
+    override def outputPartitioning(): Partitioning = {
+      if (InMemoryBaseTable.this.partitioning.nonEmpty) {
+        new KeyGroupedPartitioning(
+          InMemoryBaseTable.this.partitioning.map(_.asInstanceOf[Expression]),
+          data.size)
+      } else {
+        new UnknownPartitioning(data.size)
+      }
+    }
+
+    override def planInputPartitions(): Array[InputPartition] = data.toArray
+
+    override def createReaderFactory(): PartitionReaderFactory = {
+      val metadataColumns = readSchema.map(_.name).filter(metadataColumnNames.contains)
+      val nonMetadataColumns = readSchema.filterNot(f => metadataColumns.contains(f.name))
+      new BufferedRowsReaderFactory(metadataColumns, nonMetadataColumns, tableSchema)
+    }
+  }
+
+  case class InMemoryBatchScan(
+      var _data: Seq[InputPartition],
+      readSchema: StructType,
+      tableSchema: StructType)
+    extends BatchScanBaseClass (_data, readSchema, tableSchema) with SupportsRuntimeFiltering {

Review Comment:
   ```suggestion
       extends BatchScanBaseClass(_data, readSchema, tableSchema) with SupportsRuntimeFiltering {
   ```


