You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/12/04 08:18:00 UTC

(incubator-paimon) branch master updated: [spark] Richer spark scan description (#2444)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2378319cd [spark] Richer spark scan description (#2444)
2378319cd is described below

commit 2378319cde54fdb76ae7e2b4f341394ebb6046a6
Author: Zouxxyy <zo...@alibaba-inc.com>
AuthorDate: Mon Dec 4 16:17:55 2023 +0800

    [spark] Richer spark scan description (#2444)
---
 .../scala/org/apache/paimon/spark/PaimonScan.scala   |  4 ++--
 .../java/org/apache/paimon/spark/SparkTable.java     |  4 ----
 .../org/apache/paimon/spark/PaimonBaseScan.scala     |  6 ++----
 .../apache/paimon/spark/PaimonBaseScanBuilder.scala  | 20 +++++++++-----------
 .../scala/org/apache/paimon/spark/PaimonScan.scala   |  4 ++--
 .../org/apache/paimon/spark/PaimonScanBuilder.scala  | 10 +++++++---
 6 files changed, 22 insertions(+), 26 deletions(-)

diff --git a/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index f4cc95b63..01aabe3ca 100644
--- a/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -20,5 +20,5 @@ package org.apache.paimon.spark
 import org.apache.paimon.table.Table
 import org.apache.paimon.table.source.ReadBuilder
 
-case class PaimonScan(table: Table, readBuilder: ReadBuilder)
-  extends PaimonBaseScan(table, readBuilder)
+case class PaimonScan(table: Table, readBuilder: ReadBuilder, desc: String)
+  extends PaimonBaseScan(table, readBuilder, desc)
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
index 4a35f759b..f74754ff7 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
@@ -20,7 +20,6 @@ package org.apache.paimon.spark;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
@@ -37,8 +36,6 @@ import org.apache.spark.sql.connector.write.WriteBuilder;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-import javax.annotation.Nullable;
-
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -53,7 +50,6 @@ public class SparkTable
                 PaimonPartitionManagement {
 
     private final Table table;
-    @Nullable protected Predicate deletePredicate;
 
     public SparkTable(Table table) {
         this.table = table;
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index 556d1bd8b..8eb732a99 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -29,15 +29,13 @@ import java.util.OptionalLong
 
 import scala.collection.JavaConverters._
 
-abstract class PaimonBaseScan(table: Table, readBuilder: ReadBuilder)
+abstract class PaimonBaseScan(table: Table, readBuilder: ReadBuilder, desc: String)
   extends Scan
   with SupportsReportStatistics {
 
   protected var splits: Array[Split] = _
 
-  override def description(): String = {
-    s"paimon(${readBuilder.tableName()})"
-  }
+  override def description(): String = desc
 
   override def readSchema(): StructType = {
     SparkTypeUtils.fromPaimonRowType(readBuilder.readType())
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index 831b848bc..0a29f9d76 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -40,7 +40,7 @@ abstract class PaimonBaseScanBuilder(table: Table)
 
   protected var projectedIndexes: Option[Array[Int]] = None
 
-  protected def getReadBuilder(): ReadBuilder = {
+  protected def getReadBuilder: ReadBuilder = {
     val readBuilder = table.newReadBuilder()
     projectedIndexes.foreach(readBuilder.withProjection)
     predicates.foreach(readBuilder.withFilter)
@@ -48,8 +48,15 @@ abstract class PaimonBaseScanBuilder(table: Table)
     readBuilder
   }
 
+  protected def getDescription: String = {
+    val description = s"PaimonTable: [${table.name()}]"
+    description + pushed
+      .map(filters => s" PushedFilters: [${filters.mkString(", ")}]")
+      .getOrElse("")
+  }
+
   override def build(): Scan = {
-    new PaimonScan(table, getReadBuilder());
+    PaimonScan(table, getReadBuilder, getDescription)
   }
 
   /**
@@ -85,15 +92,6 @@ abstract class PaimonBaseScanBuilder(table: Table)
     postScan.toArray
   }
 
-  /**
-   * Returns the filters that are pushed to the data source via {@link # pushFilters ( Filter [ ]
-   * )}. <p> There are 3 kinds of filters: <ol> <li>pushable filters which don't need to be
-   * evaluated again after scanning.</li> <li>pushable filters which still need to be evaluated
-   * after scanning, e.g. parquet row group filter.</li> <li>non-pushable filters.</li> </ol> <p>
-   * Both case 1 and 2 should be considered as pushed filters and should be returned by this method.
-   * <p> It's possible that there is no filters in the query and {@link # pushFilters ( Filter [ ]
-   * )} is never called, empty array should be returned for this case.
-   */
   override def pushedFilters(): Array[Filter] = {
     pushed.getOrElse(Array.empty)
   }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 9e76aecbc..a51d09a7d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -27,8 +27,8 @@ import org.apache.spark.sql.sources.{Filter, In}
 
 import scala.collection.JavaConverters._
 
-case class PaimonScan(table: Table, readBuilder: ReadBuilder)
-  extends PaimonBaseScan(table, readBuilder)
+case class PaimonScan(table: Table, readBuilder: ReadBuilder, desc: String)
+  extends PaimonBaseScan(table, readBuilder, desc)
   with SupportsRuntimeFiltering {
 
   override def filterAttributes(): Array[NamedReference] = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 55f24d93a..6c9071ed7 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -20,7 +20,7 @@ package org.apache.paimon.spark
 import org.apache.paimon.table.{AppendOnlyFileStoreTable, Table}
 import org.apache.paimon.table.source.ReadBuilder
 
-import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownLimit}
+import org.apache.spark.sql.connector.read.SupportsPushDownLimit
 
 class PaimonScanBuilder(table: Table)
   extends PaimonBaseScanBuilder(table)
@@ -28,12 +28,16 @@ class PaimonScanBuilder(table: Table)
 
   private var pushDownLimit: Option[Int] = None
 
-  override protected def getReadBuilder(): ReadBuilder = {
-    val readBuilder = super.getReadBuilder()
+  override protected def getReadBuilder: ReadBuilder = {
+    val readBuilder = super.getReadBuilder
     pushDownLimit.foreach(readBuilder.withLimit)
     readBuilder
   }
 
+  override protected def getDescription: String = {
+    super.getDescription + pushDownLimit.map(limit => s" Limit: [$limit]").getOrElse("")
+  }
+
   override def pushLimit(limit: Int): Boolean = {
     if (table.isInstanceOf[AppendOnlyFileStoreTable]) {
       pushDownLimit = Some(limit)