You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "szehon-ho (via GitHub)" <gi...@apache.org> on 2023/04/07 06:37:18 UTC

[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6760: Spark 3.3: use a deterministic where condition to make rewrite_data_files…

szehon-ho commented on code in PR #6760:
URL: https://github.com/apache/iceberg/pull/6760#discussion_r1160474216


##########
spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala:
##########
@@ -36,15 +35,30 @@ object SparkExpressionConverter {
     SparkFilters.convert(DataSourceStrategy.translateFilter(sparkExpression, supportNestedPredicatePushdown = true).get)
   }
 
-  @throws[AnalysisException]
-  def collectResolvedSparkExpression(session: SparkSession, tableName: String, where: String): Expression = {
+  def collectDeterministicSparkExpression(session: SparkSession,
+                                          tableName: String, where: String): Boolean = {
+    // used only to check if a deterministic expression is true or false
+    val tableAttrs = session.table(tableName).queryExecution.analyzed.output
+    val firstColumnName = tableAttrs.head.name
+    val anotherWhere = s"$firstColumnName is not null and $where"
+    val unresolvedExpression = session.sessionState.sqlParser.parseExpression(anotherWhere)
+    val filter = Filter(unresolvedExpression, DummyRelation(tableAttrs))
+    val optimizedLogicalPlan = session.sessionState.executePlan(filter).optimizedPlan
+    val option = optimizedLogicalPlan.collectFirst {
+      case filter: Filter => Some(filter.condition)
+    }.getOrElse(Option.empty)
+    if (option.isDefined) true else false
+  }
+
+  def collectResolvedSparkExpressionOption(session: SparkSession,
+                                           tableName: String, where: String): Option[Expression] = {
     val tableAttrs = session.table(tableName).queryExecution.analyzed.output
     val unresolvedExpression = session.sessionState.sqlParser.parseExpression(where)
     val filter = Filter(unresolvedExpression, DummyRelation(tableAttrs))
     val optimizedLogicalPlan = session.sessionState.executePlan(filter).optimizedPlan
     optimizedLogicalPlan.collectFirst {
-      case filter: Filter => filter.condition
-    }.getOrElse(throw new AnalysisException("Failed to find filter expression"))
+      case filter: Filter => Some(filter.condition)

Review Comment:
   I think @aokolnychyi's comment is the right way:
   
   We just need to modify the method `collectResolvedIcebergExpression` and add the extra Scala pattern-matching cases that Anton showed.
   
   Then there is no need to modify the outer method: the conversion will automatically translate the Spark true/false literals to the Iceberg true/false expressions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org