Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/06/20 05:58:28 UTC

[GitHub] [spark] huaxingao opened a new pull request, #36918: Support runtime V2 filtering

huaxingao opened a new pull request, #36918:
URL: https://github.com/apache/spark/pull/36918

   ### What changes were proposed in this pull request?
   Use V2 Filter in runtime filtering for V2 Table
   
   
   ### Why are the changes needed?
   We should use V2 Filter in DS V2.
   https://github.com/apache/spark/pull/32921#discussion_r653321905
   
   
   ### Does this PR introduce _any_ user-facing change?
   Yes
   new interface `SupportsRuntimeV2Filtering`
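
   For illustration, a rough connector-side sketch of the new interface (the scan class, the `part` column, and the pruning behavior are assumptions; the interface shape follows the `InMemoryTableWithV2Filter` test table quoted later in this thread):

   ```scala
   import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
   import org.apache.spark.sql.connector.expressions.filter.Predicate
   import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan, SupportsRuntimeV2Filtering}
   import org.apache.spark.sql.types.StructType

   // Hypothetical scan over a source partitioned by a "part" column. Spark asks
   // filterAttributes() which columns runtime predicates may target, calls
   // filter() with the ANDed V2 predicates, then calls toBatch() again.
   class PartitionedScan(schema: StructType, var partitions: Array[InputPartition])
     extends Scan with SupportsRuntimeV2Filtering {

     override def readSchema(): StructType = schema

     override def filterAttributes(): Array[NamedReference] =
       Array(FieldReference("part"))

     override def filter(predicates: Array[Predicate]): Unit = {
       // A real source would prune `partitions` here; this sketch only records
       // that runtime filtering happened.
       predicates.foreach(p => println(s"runtime predicate: $p"))
     }

     override def toBatch: Batch = new Batch {
       override def planInputPartitions(): Array[InputPartition] = partitions
       override def createReaderFactory(): PartitionReaderFactory =
         throw new UnsupportedOperationException("sketch only")
     }
   }
   ```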
   
   ### How was this patch tested?
   new test suite


[GitHub] [spark] cloud-fan commented on a diff in pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36918:
URL: https://github.com/apache/spark/pull/36918#discussion_r931736466


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PredicateUtils.scala:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal.connector
+
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.connector.expressions.{LiteralValue, NamedReference}
+import org.apache.spark.sql.connector.expressions.filter. Predicate
+import org.apache.spark.sql.sources.{Filter, In}
+
+private[sql] object PredicateUtils {
+
+  def toV1(predicate: Predicate): Option[Filter] = {
+    predicate.name() match {
+      // Todo: add conversion for other V2 Predicate

Review Comment:
   ```suggestion
         // TODO: add conversion for other V2 Predicate
   ```



[GitHub] [spark] LorenzoMartini commented on pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
LorenzoMartini commented on PR #36918:
URL: https://github.com/apache/spark/pull/36918#issuecomment-1377123165

   Hi @huaxingao.
   
   We are trying to use Spark DataSource V2 and noticed that the built-in V2 data sources (e.g. the Parquet one, looking at `ParquetScan`) don't support this (neither `SupportsRuntimeFiltering` nor `SupportsRuntimeV2Filtering`) by default, creating a large performance difference between the V1 and V2 data sources out of the box.

   Is there a plan to have them support this? It would be really beneficial for the file scans to be able to do this, and given that they already benefit from some pushdowns, we were wondering why runtime filtering is not implemented. Or maybe I am missing something? In that case it would be great to understand how to have Spark file sources take advantage of DPP (dynamic partition pruning).
   
   Thanks!


[GitHub] [spark] cloud-fan commented on a diff in pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36918:
URL: https://github.com/apache/spark/pull/36918#discussion_r930791167


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsRuntimeFiltering.java:
##########
@@ -57,4 +59,30 @@ public interface SupportsRuntimeFiltering extends Scan {
    * @param filters data source filters used to filter the scan at runtime
    */
   void filter(Filter[] filters);
+
+  /**
+   * Filters this scan using runtime predicates.
+   * <p>
+   * The provided expressions must be interpreted as a set of predicates that are ANDed together.
+   * Implementations may use the predicates to prune initially planned {@link InputPartition}s.
+   * <p>
+   * If the scan also implements {@link SupportsReportPartitioning}, it must preserve
+   * the originally reported partitioning during runtime filtering. While applying runtime
+   * predicates, the scan may detect that some {@link InputPartition}s have no matching data. It
+   * can omit such partitions entirely only if it does not report a specific partitioning.
+   * Otherwise, the scan can replace the initially planned {@link InputPartition}s that have no
+   * matching data with empty {@link InputPartition}s but must preserve the overall number of
+   * partitions.
+   * <p>
+   * Note that Spark will call {@link Scan#toBatch()} again after filtering the scan at runtime.
+   *
+   * @param predicates data source V2 predicates used to filter the scan at runtime
+   */
+  default void filter(Predicate[] predicates) {
+    Filter[] filters = new Filter[predicates.length];
+    for (int i = 0; i < predicates.length; i++) {
+      filters[i] = PredicateUtils.toV1(predicates[i]).get();

Review Comment:
   We should drop the predicates that can't be translated to v1.
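
   A hedged sketch of the fix being asked for, in Scala for brevity (the actual default method is Java, and `PredicateUtils` is `private[sql]`, so this shape only compiles inside the Spark source tree; the helper name is hypothetical):

   ```scala
   import org.apache.spark.sql.connector.expressions.filter.Predicate
   import org.apache.spark.sql.internal.connector.PredicateUtils
   import org.apache.spark.sql.sources.Filter

   // Translate each V2 predicate and silently drop the ones that have no V1
   // equivalent, instead of failing on Option.get.
   def filterViaV1(predicates: Array[Predicate], v1Filter: Array[Filter] => Unit): Unit = {
     val filters: Array[Filter] = predicates.flatMap(PredicateUtils.toV1)
     v1Filter(filters)
   }
   ```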



[GitHub] [spark] huaxingao commented on pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
huaxingao commented on PR #36918:
URL: https://github.com/apache/spark/pull/36918#issuecomment-1189897627

   cc @cloud-fan Could you please take a look when you have a moment? Thanks!


[GitHub] [spark] cloud-fan commented on a diff in pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36918:
URL: https://github.com/apache/spark/pull/36918#discussion_r930795074


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/util/PredicateUtils.scala:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.connector.expressions.{LiteralValue, NamedReference}
+import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.sources.{Filter, In}
+
+private[sql] object PredicateUtils {
+
+  def toV1(predicate: Predicate): Option[Filter] = {
+    predicate.name() match {
+      // Todo: add conversion for other V2 Predicate
+      case "IN" =>

Review Comment:
   shall we at least support binary comparisons?
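
   For illustration, what binary-comparison support could look like, assuming the V2 predicate names `=`, `>`, `>=`, `<`, `<=` and the `<column> <op> <literal>` shape; a sketch, not the code that was eventually merged:

   ```scala
   import org.apache.spark.sql.catalyst.CatalystTypeConverters
   import org.apache.spark.sql.connector.expressions.{LiteralValue, NamedReference}
   import org.apache.spark.sql.connector.expressions.filter.Predicate
   import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual}

   // Translate V2 binary comparisons of the shape <column> <op> <literal>
   // to their V1 counterparts; anything else yields None.
   def binaryToV1(p: Predicate): Option[Filter] = p.children() match {
     case Array(ref: NamedReference, lit: LiteralValue[_]) =>
       val attr = ref.fieldNames.mkString(".")
       val value = CatalystTypeConverters.convertToScala(lit.value, lit.dataType)
       p.name() match {
         case "="  => Some(EqualTo(attr, value))
         case ">"  => Some(GreaterThan(attr, value))
         case ">=" => Some(GreaterThanOrEqual(attr, value))
         case "<"  => Some(LessThan(attr, value))
         case "<=" => Some(LessThanOrEqual(attr, value))
         case _    => None
       }
     case _ => None
   }
   ```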



[GitHub] [spark] cloud-fan commented on a diff in pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36918:
URL: https://github.com/apache/spark/pull/36918#discussion_r930791712


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsRuntimeFiltering.java:
##########
@@ -57,4 +59,30 @@ public interface SupportsRuntimeFiltering extends Scan {
    * @param filters data source filters used to filter the scan at runtime
    */
   void filter(Filter[] filters);
+
+  /**
+   * Filters this scan using runtime predicates.
+   * <p>
+   * The provided expressions must be interpreted as a set of predicates that are ANDed together.
+   * Implementations may use the predicates to prune initially planned {@link InputPartition}s.
+   * <p>
+   * If the scan also implements {@link SupportsReportPartitioning}, it must preserve
+   * the originally reported partitioning during runtime filtering. While applying runtime
+   * predicates, the scan may detect that some {@link InputPartition}s have no matching data. It
+   * can omit such partitions entirely only if it does not report a specific partitioning.
+   * Otherwise, the scan can replace the initially planned {@link InputPartition}s that have no
+   * matching data with empty {@link InputPartition}s but must preserve the overall number of
+   * partitions.
+   * <p>
+   * Note that Spark will call {@link Scan#toBatch()} again after filtering the scan at runtime.
+   *
+   * @param predicates data source V2 predicates used to filter the scan at runtime

Review Comment:
   We can skip the API doc here, as this method is inherited.



[GitHub] [spark] cloud-fan commented on a diff in pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36918:
URL: https://github.com/apache/spark/pull/36918#discussion_r925296507


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -85,6 +85,13 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
         } else {
           None
         }
+      case (resExp, r @ DataSourceV2ScanRelation(_, scan: SupportsRuntimeV2Filtering, _, _, _)) =>

Review Comment:
   I'm not very sure, but is it possible to make `SupportsRuntimeFiltering` extend `SupportsRuntimeV2Filtering`? Inside `SupportsRuntimeFiltering` we would translate the v2 filters to v1. Then Spark only needs to deal with `SupportsRuntimeV2Filtering`.
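
   A sketch of the proposed hierarchy, with Scala traits standing in for the Java interfaces (the `Sketch` suffix marks these as illustrative, not the real API):

   ```scala
   import org.apache.spark.sql.connector.expressions.filter.Predicate
   import org.apache.spark.sql.internal.connector.PredicateUtils
   import org.apache.spark.sql.sources.Filter

   // The V2 interface would be the only one Spark's planner targets.
   trait SupportsRuntimeV2FilteringSketch {
     def filter(predicates: Array[Predicate]): Unit
   }

   // The V1 interface extends it: existing sources keep implementing the V1
   // filter(Filter[]) method, and the default V2 method adapts by translating
   // the predicates and dropping those with no V1 equivalent.
   trait SupportsRuntimeFilteringSketch extends SupportsRuntimeV2FilteringSketch {
     def filter(filters: Array[Filter]): Unit

     override def filter(predicates: Array[Predicate]): Unit =
       filter(predicates.flatMap(PredicateUtils.toV1))
   }
   ```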



[GitHub] [spark] cloud-fan commented on a diff in pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36918:
URL: https://github.com/apache/spark/pull/36918#discussion_r953292454


##########
sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala:
##########
@@ -1805,3 +1805,21 @@ class DynamicPartitionPruningV2SuiteAEOff extends DynamicPartitionPruningV2Suite
 
 class DynamicPartitionPruningV2SuiteAEOn extends DynamicPartitionPruningV2Suite
   with EnableAdaptiveExecutionSuite
+
+abstract class DynamicPartitionPruningV2FilterSuite
+    extends DynamicPartitionPruningDataSourceSuiteBase {

Review Comment:
   Shall we extend `DynamicPartitionPruningV2Suite` here? Then we can drop the `override protected def runAnalyzeColumnCommands: Boolean = false`, and the catalog configs will be overwritten.
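
   Roughly the shape being suggested; the catalog class name and conf key below are assumptions, not taken from the PR:

   ```scala
   import org.apache.spark.SparkConf

   // Inherit DynamicPartitionPruningV2Suite so runAnalyzeColumnCommands and
   // the rest of its setup come for free; only re-point the catalog at the
   // V2-filter test tables.
   abstract class DynamicPartitionPruningV2FilterSuite extends DynamicPartitionPruningV2Suite {
     override protected def sparkConf: SparkConf = super.sparkConf
       .set("spark.sql.catalog.testcat", classOf[InMemoryTableWithV2FilterCatalog].getName)
   }
   ```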



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] huaxingao commented on pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
huaxingao commented on PR #36918:
URL: https://github.com/apache/spark/pull/36918#issuecomment-1198240325

   Thanks @cloud-fan @zinking 


[GitHub] [spark] cloud-fan commented on a diff in pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36918:
URL: https://github.com/apache/spark/pull/36918#discussion_r931782549


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PredicateUtils.scala:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal.connector
+
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.connector.expressions.{LiteralValue, NamedReference}
+import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.sources.{Filter, In}
+
+private[sql] object PredicateUtils {
+
+  def toV1(predicate: Predicate): Option[Filter] = {
+    predicate.name() match {
+      // TODO: add conversion for other V2 Predicate
+      case "IN" if (predicate.children()(0).isInstanceOf[NamedReference]) =>
+        val attribute = predicate.children()(0)
+          .asInstanceOf[NamedReference].fieldNames().mkString(".")

Review Comment:
   ```suggestion
           val attribute = predicate.children()(0).toString
   ```



[GitHub] [spark] cloud-fan commented on a diff in pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36918:
URL: https://github.com/apache/spark/pull/36918#discussion_r930795680


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/util/PredicateUtils.scala:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.connector.expressions.{LiteralValue, NamedReference}
+import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.sources.{Filter, In}
+
+private[sql] object PredicateUtils {
+
+  def toV1(predicate: Predicate): Option[Filter] = {
+    predicate.name() match {
+      // Todo: add conversion for other V2 Predicate
+      case "IN" =>
+        val attribute = predicate.children()(0)
+          .asInstanceOf[NamedReference].fieldNames().mkString(".")
+        val values = predicate.children().drop(1)
+        if (values.length > 0) {
+          val dataType = values(0).asInstanceOf[LiteralValue[_]].dataType
+          assert(values.forall(_.asInstanceOf[LiteralValue[_]].dataType.sameType(dataType)))
+          val inValues = values.map(v =>
+            CatalystTypeConverters.convertToScala(v.asInstanceOf[LiteralValue[_]].value, dataType))
+          Some(In(attribute, inValues))
+        } else {
+          Some(In(attribute, Array.empty[Any]))
+        }
+
+      case _ =>
+        throw QueryExecutionErrors.unsupportedPredicateToFilterConversionError(predicate.name())

Review Comment:
   It's weird to fail here. If we can't translate, just return `None`



[GitHub] [spark] cloud-fan commented on a diff in pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36918:
URL: https://github.com/apache/spark/pull/36918#discussion_r931737305


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PredicateUtils.scala:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal.connector
+
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.connector.expressions.{LiteralValue, NamedReference}
+import org.apache.spark.sql.connector.expressions.filter. Predicate
+import org.apache.spark.sql.sources.{Filter, In}
+
+private[sql] object PredicateUtils {
+
+  def toV1(predicate: Predicate): Option[Filter] = {
+    predicate.name() match {
+      // Todo: add conversion for other V2 Predicate
+      case "IN" =>
+        val attribute = predicate.children()(0)
+          .asInstanceOf[NamedReference].fieldNames().mkString(".")
+        val values = predicate.children().drop(1)
+        if (values.length > 0) {
+          val dataType = values(0).asInstanceOf[LiteralValue[_]].dataType

Review Comment:
   There is no guarantee that the v2 `IN` predicate contains a `NamedReference` as its input and literals as its values. We should check for them specifically and return `None` if they are not.
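
   A sketch of the guarded translation being asked for, returning `None` unless child 0 is a `NamedReference` and all remaining children are literals (helper name hypothetical):

   ```scala
   import org.apache.spark.sql.catalyst.CatalystTypeConverters
   import org.apache.spark.sql.connector.expressions.{LiteralValue, NamedReference}
   import org.apache.spark.sql.connector.expressions.filter.Predicate
   import org.apache.spark.sql.sources.{Filter, In}

   // Translate IN only when child 0 is a NamedReference and every remaining
   // child is a literal; otherwise give up with None.
   def inToV1(predicate: Predicate): Option[Filter] = predicate.children() match {
     case Array(ref: NamedReference, values @ _*)
         if values.forall(_.isInstanceOf[LiteralValue[_]]) =>
       val attr = ref.fieldNames.mkString(".")
       val scalaValues = values.map { case LiteralValue(v, dt) =>
         CatalystTypeConverters.convertToScala(v, dt)
       }
       Some(In(attr, scalaValues.toArray))
     case _ => None
   }
   ```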



[GitHub] [spark] cloud-fan commented on a diff in pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36918:
URL: https://github.com/apache/spark/pull/36918#discussion_r931736745


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PredicateUtils.scala:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal.connector
+
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.connector.expressions.{LiteralValue, NamedReference}
+import org.apache.spark.sql.connector.expressions.filter. Predicate

Review Comment:
   ```suggestion
   import org.apache.spark.sql.connector.expressions.filter.Predicate
   ```



[GitHub] [spark] cloud-fan commented on a diff in pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36918:
URL: https://github.com/apache/spark/pull/36918#discussion_r931782817


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PredicateUtils.scala:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal.connector
+
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.connector.expressions.{LiteralValue, NamedReference}
+import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.sources.{Filter, In}
+
+private[sql] object PredicateUtils {
+
+  def toV1(predicate: Predicate): Option[Filter] = {
+    predicate.name() match {
+      // TODO: add conversion for other V2 Predicate
+      case "IN" if (predicate.children()(0).isInstanceOf[NamedReference]) =>

Review Comment:
   ```suggestion
         case "IN" if predicate.children()(0).isInstanceOf[NamedReference] =>
   ```



[GitHub] [spark] huaxingao commented on a diff in pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
huaxingao commented on code in PR #36918:
URL: https://github.com/apache/spark/pull/36918#discussion_r925186542


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala:
##########
@@ -55,16 +57,27 @@ case class BatchScanExec(
 
   @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
     val dataSourceFilters = runtimeFilters.flatMap {
-      case DynamicPruningExpression(e) => DataSourceStrategy.translateRuntimeFilter(e)
+      case DynamicPruningExpression(e) =>
+        if (scan.isInstanceOf[SupportsRuntimeFiltering]) {

Review Comment:
   Changed. Thanks



[GitHub] [spark] cloud-fan commented on a diff in pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36918:
URL: https://github.com/apache/spark/pull/36918#discussion_r925292702


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala:
##########
@@ -56,16 +58,26 @@ case class BatchScanExec(
 
   @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
     val dataSourceFilters = runtimeFilters.flatMap {
-      case DynamicPruningExpression(e) => DataSourceStrategy.translateRuntimeFilter(e)
+      case DynamicPruningExpression(e) =>
+        scan match {
+          case _: SupportsRuntimeFiltering =>
+            DataSourceStrategy.translateRuntimeFilter(e)
+          case _: SupportsRuntimeV2Filtering =>

Review Comment:
   Shall we make `SupportsRuntimeV2Filtering` take priority over `SupportsRuntimeFiltering`? Also, we need to document the behavior when a source implements both of them.
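
   The priority amounts to matching on the V2 interface first. A sketch (the wrapper function and its `Option[Any]` return type are simplifications, since the two translators return different filter types; both are `protected[sql]`, so this only compiles inside the sql package):

   ```scala
   import org.apache.spark.sql.catalyst.expressions.Expression
   import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeFiltering, SupportsRuntimeV2Filtering}
   import org.apache.spark.sql.execution.datasources.DataSourceStrategy
   import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy

   // Check the V2 interface first so a scan implementing both interfaces
   // takes the V2 path; the V1 branch becomes the fallback.
   def translateRuntime(scan: Scan, e: Expression): Option[Any] = scan match {
     case _: SupportsRuntimeV2Filtering => DataSourceV2Strategy.translateRuntimeFilterV2(e)
     case _: SupportsRuntimeFiltering   => DataSourceStrategy.translateRuntimeFilter(e)
     case _ => None
   }
   ```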



[GitHub] [spark] cloud-fan commented on a diff in pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36918:
URL: https://github.com/apache/spark/pull/36918#discussion_r926452983


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -85,6 +85,13 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
         } else {
           None
         }
+      case (resExp, r @ DataSourceV2ScanRelation(_, scan: SupportsRuntimeV2Filtering, _, _, _)) =>

Review Comment:
   Removing an API is always hard...



[GitHub] [spark] zinking commented on a diff in pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
zinking commented on code in PR #36918:
URL: https://github.com/apache/spark/pull/36918#discussion_r925446962


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2Filter.scala:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.util
+
+import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue, NamedReference, Transform}
+import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.connector.read.{InputPartition, Scan, ScanBuilder, SupportsRuntimeV2Filtering}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class InMemoryTableWithV2Filter(
+    name: String,
+    schema: StructType,
+    partitioning: Array[Transform],
+    properties: util.Map[String, String])
+  extends InMemoryTable(name, schema, partitioning, properties) {
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+    new InMemoryV2FilterScanBuilder(schema)
+  }
+
+  class InMemoryV2FilterScanBuilder(tableSchema: StructType)
+    extends InMemoryScanBuilder(tableSchema) {
+    override def build: Scan =
+      InMemoryV2FilterBatchScan(data.map(_.asInstanceOf[InputPartition]), schema, tableSchema)
+  }
+
+  case class InMemoryV2FilterBatchScan(
+      var _data: Seq[InputPartition],
+      readSchema: StructType,
+      tableSchema: StructType)
+    extends BatchScanBaseClass (_data, readSchema, tableSchema) with SupportsRuntimeV2Filtering {
+
+    override def filterAttributes(): Array[NamedReference] = {
+      val scanFields = readSchema.fields.map(_.name).toSet
+      partitioning.flatMap(_.references)
+        .filter(ref => scanFields.contains(ref.fieldNames.mkString(".")))
+    }
+
+    override def filter(filters: Array[Predicate]): Unit = {
+      if (partitioning.length == 1 && partitioning.head.references().length == 1) {
+        val ref = partitioning.head.references().head
+        filters.foreach {
+          case p : Predicate if p.name().equals("IN") =>

Review Comment:
   Feels like some `unapply` method to extract what you want would be preferable.
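
   A sketch of the kind of extractor meant here: a standalone Scala object can supply `unapply` even for the Java `Predicate` class, though whether it pulls its weight is debated in the reply further down (object name hypothetical):

   ```scala
   import org.apache.spark.sql.connector.expressions.{LiteralValue, NamedReference}
   import org.apache.spark.sql.connector.expressions.filter.Predicate

   // Hypothetical extractor: Predicate is a Java class, so it has no
   // companion unapply, but a standalone Scala object can still provide one.
   object InPredicate {
     def unapply(p: Predicate): Option[(NamedReference, Seq[LiteralValue[_]])] =
       p.children() match {
         case Array(ref: NamedReference, rest @ _*)
             if p.name() == "IN" && rest.forall(_.isInstanceOf[LiteralValue[_]]) =>
           Some((ref, rest.map(_.asInstanceOf[LiteralValue[_]])))
         case _ => None
       }
   }

   // Usage in filter():
   //   filters.foreach {
   //     case InPredicate(ref, values) => // prune partitions by ref/values
   //     case _ =>                       // ignore other predicates
   //   }
   ```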



[GitHub] [spark] huaxingao commented on a diff in pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
huaxingao commented on code in PR #36918:
URL: https://github.com/apache/spark/pull/36918#discussion_r926056639


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -85,6 +85,13 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
         } else {
           None
         }
+      case (resExp, r @ DataSourceV2ScanRelation(_, scan: SupportsRuntimeV2Filtering, _, _, _)) =>

Review Comment:
   I know it looks ugly to have duplicate code for both `SupportsRuntimeFiltering` and `SupportsRuntimeV2Filtering`. I thought we would deprecate and remove `SupportsRuntimeFiltering` in the future?



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2Filter.scala:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.util
+
+import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue, NamedReference, Transform}
+import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.connector.read.{InputPartition, Scan, ScanBuilder, SupportsRuntimeV2Filtering}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class InMemoryTableWithV2Filter(
+    name: String,
+    schema: StructType,
+    partitioning: Array[Transform],
+    properties: util.Map[String, String])
+  extends InMemoryTable(name, schema, partitioning, properties) {
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+    new InMemoryV2FilterScanBuilder(schema)
+  }
+
+  class InMemoryV2FilterScanBuilder(tableSchema: StructType)
+    extends InMemoryScanBuilder(tableSchema) {
+    override def build: Scan =
+      InMemoryV2FilterBatchScan(data.map(_.asInstanceOf[InputPartition]), schema, tableSchema)
+  }
+
+  case class InMemoryV2FilterBatchScan(
+      var _data: Seq[InputPartition],
+      readSchema: StructType,
+      tableSchema: StructType)
+    extends BatchScanBaseClass (_data, readSchema, tableSchema) with SupportsRuntimeV2Filtering {
+
+    override def filterAttributes(): Array[NamedReference] = {
+      val scanFields = readSchema.fields.map(_.name).toSet
+      partitioning.flatMap(_.references)
+        .filter(ref => scanFields.contains(ref.fieldNames.mkString(".")))
+    }
+
+    override def filter(filters: Array[Predicate]): Unit = {
+      if (partitioning.length == 1 && partitioning.head.references().length == 1) {
+        val ref = partitioning.head.references().head
+        filters.foreach {
+          case p : Predicate if p.name().equals("IN") =>

Review Comment:
   `Predicate` is a Java class. I don't think `unapply` can be used here.



[GitHub] [spark] huaxingao commented on a diff in pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
huaxingao commented on code in PR #36918:
URL: https://github.com/apache/spark/pull/36918#discussion_r953384036


##########
sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala:
##########
@@ -1805,3 +1805,21 @@ class DynamicPartitionPruningV2SuiteAEOff extends DynamicPartitionPruningV2Suite
 
 class DynamicPartitionPruningV2SuiteAEOn extends DynamicPartitionPruningV2Suite
   with EnableAdaptiveExecutionSuite
+
+abstract class DynamicPartitionPruningV2FilterSuite
+    extends DynamicPartitionPruningDataSourceSuiteBase {

Review Comment:
   Sounds good. I have a follow-up [here](https://github.com/apache/spark/pull/37643)



[GitHub] [spark] cloud-fan commented on a diff in pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36918:
URL: https://github.com/apache/spark/pull/36918#discussion_r925294799


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala:
##########
@@ -582,6 +583,28 @@ private[sql] object DataSourceV2Strategy {
           throw new IllegalStateException("Failed to rebuild Expression for filter: " + predicate))
     }
   }
+
+  /**
+   * Translates a runtime filter into a data source v2 Predicate.
+   *
+   * Runtime filters usually contain a subquery that must be evaluated before the translation.
+   * If the underlying subquery hasn't completed yet, this method will throw an exception.
+   */
+  protected[sql] def translateRuntimeFilterV2(expr: Expression): Option[Predicate] = expr match {
+    case in @ InSubqueryExec(PushableColumnAndNestedColumn(name), _, _, _, _, _) =>
+      val values = in.values().getOrElse {
+        throw new IllegalStateException(s"Can't translate $in to v2 Predicate, no subquery result")
+      }
+      val literals = values.map { value =>
+        val literal = Literal(value)
+        LiteralValue(literal.value, literal.dataType)

Review Comment:
   We don't need to infer the data type by creating a Catalyst `Literal`. The type must be `in.child.dataType`.
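
   In sketch form (helper name hypothetical):

   ```scala
   import org.apache.spark.sql.connector.expressions.LiteralValue
   import org.apache.spark.sql.types.DataType

   // The subquery result values all share the IN child's data type, so wrap
   // them directly rather than building a catalyst Literal per value just to
   // recover the type.
   def toV2Literals(values: Seq[Any], childDataType: DataType): Seq[LiteralValue[Any]] =
     values.map(v => LiteralValue[Any](v, childDataType))
   ```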



[GitHub] [spark] huaxingao commented on a diff in pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
huaxingao commented on code in PR #36918:
URL: https://github.com/apache/spark/pull/36918#discussion_r926056489


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala:
##########
@@ -56,16 +58,26 @@ case class BatchScanExec(
 
   @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
     val dataSourceFilters = runtimeFilters.flatMap {
-      case DynamicPruningExpression(e) => DataSourceStrategy.translateRuntimeFilter(e)
+      case DynamicPruningExpression(e) =>
+        scan match {
+          case _: SupportsRuntimeFiltering =>
+            DataSourceStrategy.translateRuntimeFilter(e)
+          case _: SupportsRuntimeV2Filtering =>

Review Comment:
   It doesn't seem to me that a data source would implement both `SupportsRuntimeV2Filtering` and `SupportsRuntimeFiltering`? 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala:
##########
@@ -582,6 +583,28 @@ private[sql] object DataSourceV2Strategy {
           throw new IllegalStateException("Failed to rebuild Expression for filter: " + predicate))
     }
   }
+
+  /**
+   * Translates a runtime filter into a data source v2 Predicate.
+   *
+   * Runtime filters usually contain a subquery that must be evaluated before the translation.
+   * If the underlying subquery hasn't completed yet, this method will throw an exception.
+   */
+  protected[sql] def translateRuntimeFilterV2(expr: Expression): Option[Predicate] = expr match {
+    case in @ InSubqueryExec(PushableColumnAndNestedColumn(name), _, _, _, _, _) =>
+      val values = in.values().getOrElse {
+        throw new IllegalStateException(s"Can't translate $in to v2 Predicate, no subquery result")
+      }
+      val literals = values.map { value =>
+        val literal = Literal(value)
+        LiteralValue(literal.value, literal.dataType)

Review Comment:
   Fixed. Thanks



[GitHub] [spark] cloud-fan commented on a diff in pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36918:
URL: https://github.com/apache/spark/pull/36918#discussion_r930794492


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/util/PredicateUtils.scala:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.connector.expressions.{LiteralValue, NamedReference}
+import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.sources.{Filter, In}
+
+private[sql] object PredicateUtils {

Review Comment:
   Since it's data source related and private, how about we put it in package `org.apache.spark.sql.internal.connector`?



[GitHub] [spark] huaxingao commented on pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
huaxingao commented on PR #36918:
URL: https://github.com/apache/spark/pull/36918#issuecomment-1190884681

   The test failure is unrelated. 


[GitHub] [spark] cloud-fan commented on pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on PR #36918:
URL: https://github.com/apache/spark/pull/36918#issuecomment-1198130779

   The GA failure is unrelated. Merging to master, thanks!


[GitHub] [spark] cloud-fan closed pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
cloud-fan closed pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering
URL: https://github.com/apache/spark/pull/36918


[GitHub] [spark] cloud-fan commented on a diff in pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36918:
URL: https://github.com/apache/spark/pull/36918#discussion_r930796831


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/util/PredicateUtils.scala:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.connector.expressions.{LiteralValue, NamedReference}
+import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.sources.{Filter, In}
+
+private[sql] object PredicateUtils {
+
+  def toV1(predicate: Predicate): Option[Filter] = {
+    predicate.name() match {
+      // Todo: add conversion for other V2 Predicate
+      case "IN" =>

Review Comment:
   nvm, it's OK to support `IN` first, as that's the only expression needed by runtime filtering.



[GitHub] [spark] zinking commented on a diff in pull request #36918: [SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering

Posted by GitBox <gi...@apache.org>.
zinking commented on code in PR #36918:
URL: https://github.com/apache/spark/pull/36918#discussion_r902107022


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala:
##########
@@ -55,16 +57,27 @@ case class BatchScanExec(
 
   @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
     val dataSourceFilters = runtimeFilters.flatMap {
-      case DynamicPruningExpression(e) => DataSourceStrategy.translateRuntimeFilter(e)
+      case DynamicPruningExpression(e) =>
+        if (scan.isInstanceOf[SupportsRuntimeFiltering]) {

Review Comment:
   what about  `match` here?


