You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/12/04 08:18:00 UTC
(incubator-paimon) branch master updated: [spark] Richer spark scan description (#2444)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2378319cd [spark] Richer spark scan description (#2444)
2378319cd is described below
commit 2378319cde54fdb76ae7e2b4f341394ebb6046a6
Author: Zouxxyy <zo...@alibaba-inc.com>
AuthorDate: Mon Dec 4 16:17:55 2023 +0800
[spark] Richer spark scan description (#2444)
---
.../scala/org/apache/paimon/spark/PaimonScan.scala | 4 ++--
.../java/org/apache/paimon/spark/SparkTable.java | 4 ----
.../org/apache/paimon/spark/PaimonBaseScan.scala | 6 ++----
.../apache/paimon/spark/PaimonBaseScanBuilder.scala | 20 +++++++++-----------
.../scala/org/apache/paimon/spark/PaimonScan.scala | 4 ++--
.../org/apache/paimon/spark/PaimonScanBuilder.scala | 10 +++++++---
6 files changed, 22 insertions(+), 26 deletions(-)
diff --git a/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index f4cc95b63..01aabe3ca 100644
--- a/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -20,5 +20,5 @@ package org.apache.paimon.spark
import org.apache.paimon.table.Table
import org.apache.paimon.table.source.ReadBuilder
-case class PaimonScan(table: Table, readBuilder: ReadBuilder)
- extends PaimonBaseScan(table, readBuilder)
+case class PaimonScan(table: Table, readBuilder: ReadBuilder, desc: String)
+ extends PaimonBaseScan(table, readBuilder, desc)
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
index 4a35f759b..f74754ff7 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
@@ -20,7 +20,6 @@ package org.apache.paimon.spark;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
@@ -37,8 +36,6 @@ import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-import javax.annotation.Nullable;
-
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -53,7 +50,6 @@ public class SparkTable
PaimonPartitionManagement {
private final Table table;
- @Nullable protected Predicate deletePredicate;
public SparkTable(Table table) {
this.table = table;
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index 556d1bd8b..8eb732a99 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -29,15 +29,13 @@ import java.util.OptionalLong
import scala.collection.JavaConverters._
-abstract class PaimonBaseScan(table: Table, readBuilder: ReadBuilder)
+abstract class PaimonBaseScan(table: Table, readBuilder: ReadBuilder, desc: String)
extends Scan
with SupportsReportStatistics {
protected var splits: Array[Split] = _
- override def description(): String = {
- s"paimon(${readBuilder.tableName()})"
- }
+ override def description(): String = desc
override def readSchema(): StructType = {
SparkTypeUtils.fromPaimonRowType(readBuilder.readType())
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index 831b848bc..0a29f9d76 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -40,7 +40,7 @@ abstract class PaimonBaseScanBuilder(table: Table)
protected var projectedIndexes: Option[Array[Int]] = None
- protected def getReadBuilder(): ReadBuilder = {
+ protected def getReadBuilder: ReadBuilder = {
val readBuilder = table.newReadBuilder()
projectedIndexes.foreach(readBuilder.withProjection)
predicates.foreach(readBuilder.withFilter)
@@ -48,8 +48,15 @@ abstract class PaimonBaseScanBuilder(table: Table)
readBuilder
}
+ protected def getDescription: String = {
+ val description = s"PaimonTable: [${table.name()}]"
+ description + pushed
+ .map(filters => s" PushedFilters: [${filters.mkString(", ")}]")
+ .getOrElse("")
+ }
+
override def build(): Scan = {
- new PaimonScan(table, getReadBuilder());
+ PaimonScan(table, getReadBuilder, getDescription)
}
/**
@@ -85,15 +92,6 @@ abstract class PaimonBaseScanBuilder(table: Table)
postScan.toArray
}
- /**
- * Returns the filters that are pushed to the data source via {@link # pushFilters ( Filter [ ]
- * )}. <p> There are 3 kinds of filters: <ol> <li>pushable filters which don't need to be
- * evaluated again after scanning.</li> <li>pushable filters which still need to be evaluated
- * after scanning, e.g. parquet row group filter.</li> <li>non-pushable filters.</li> </ol> <p>
- * Both case 1 and 2 should be considered as pushed filters and should be returned by this method.
- * <p> It's possible that there is no filters in the query and {@link # pushFilters ( Filter [ ]
- * )} is never called, empty array should be returned for this case.
- */
override def pushedFilters(): Array[Filter] = {
pushed.getOrElse(Array.empty)
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 9e76aecbc..a51d09a7d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -27,8 +27,8 @@ import org.apache.spark.sql.sources.{Filter, In}
import scala.collection.JavaConverters._
-case class PaimonScan(table: Table, readBuilder: ReadBuilder)
- extends PaimonBaseScan(table, readBuilder)
+case class PaimonScan(table: Table, readBuilder: ReadBuilder, desc: String)
+ extends PaimonBaseScan(table, readBuilder, desc)
with SupportsRuntimeFiltering {
override def filterAttributes(): Array[NamedReference] = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 55f24d93a..6c9071ed7 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -20,7 +20,7 @@ package org.apache.paimon.spark
import org.apache.paimon.table.{AppendOnlyFileStoreTable, Table}
import org.apache.paimon.table.source.ReadBuilder
-import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownLimit}
+import org.apache.spark.sql.connector.read.SupportsPushDownLimit
class PaimonScanBuilder(table: Table)
extends PaimonBaseScanBuilder(table)
@@ -28,12 +28,16 @@ class PaimonScanBuilder(table: Table)
private var pushDownLimit: Option[Int] = None
- override protected def getReadBuilder(): ReadBuilder = {
- val readBuilder = super.getReadBuilder()
+ override protected def getReadBuilder: ReadBuilder = {
+ val readBuilder = super.getReadBuilder
pushDownLimit.foreach(readBuilder.withLimit)
readBuilder
}
+ override protected def getDescription: String = {
+ super.getDescription + pushDownLimit.map(limit => s" Limit: [$limit]").getOrElse("")
+ }
+
override def pushLimit(limit: Int): Boolean = {
if (table.isInstanceOf[AppendOnlyFileStoreTable]) {
pushDownLimit = Some(limit)