Posted to reviews@spark.apache.org by "jaceklaskowski (via GitHub)" <gi...@apache.org> on 2023/04/03 09:05:05 UTC

[GitHub] [spark] jaceklaskowski commented on a diff in pull request #40638: [SPARK-42774][SQL]Expose VectorTypes API for DataSourceV2 Batch Scans

jaceklaskowski commented on code in PR #40638:
URL: https://github.com/apache/spark/pull/40638#discussion_r1155668281


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReaderFactory.java:
##########
@@ -65,4 +66,12 @@ default PartitionReader<ColumnarBatch> createColumnarReader(InputPartition parti
   default boolean supportColumnarReads(InputPartition partition) {
     return false;
   }
+
+  /**
+   * Returns exact java types of the columns that are output in columnar processing mode.

Review Comment:
   nit: Use `@return` instead?
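For reference, a minimal sketch of the Javadoc using `@return` in place of the "Returns ..." sentence. `VectorTypesProvider` is a hypothetical stand-in interface, not Spark's `PartitionReaderFactory`, and the tag wording simply reuses the sentence from the diff:

```java
import java.util.Optional;

// Hypothetical interface, used only to show the Javadoc shape.
interface VectorTypesProvider {
  /**
   * @return the exact Java types of the columns that are output in columnar processing mode
   */
  default Optional<Iterable<String>> getVectorTypes() {
    // Default: no vector types reported.
    return Optional.empty();
  }
}

class ReturnTagDemo {
  public static void main(String[] args) {
    VectorTypesProvider provider = new VectorTypesProvider() {};
    System.out.println(provider.getVectorTypes().isPresent()); // prints false
  }
}
```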



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala:
##########
@@ -62,6 +64,10 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
     redact(result)
   }
 
+  override def vectorTypes: Option[Seq[String]] = {
+    Option(readerFactory.getVectorTypes.get.asScala.toSeq)

Review Comment:
   Just curious: can `getVectorTypes.get` throw an exception here, since the default implementation returns `Optional.empty`? (`Optional.get` on an empty `Optional` throws `NoSuchElementException`.)
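A small standalone illustration of the concern, using plain `java.util.Optional` and no Spark classes (`OptionalGetDemo` and its methods are hypothetical). Calling `get()` on the default empty value throws, while mapping over the `Optional` keeps the "absent" case empty; presumably the same idea would apply to the Scala override above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

class OptionalGetDemo {
  // Mirrors the default getVectorTypes(): nothing reported.
  static Optional<Iterable<String>> defaultVectorTypes() {
    return Optional.empty();
  }

  // Unsafe: Optional.get() throws NoSuchElementException when the Optional is empty.
  static List<String> viaGet(Optional<Iterable<String>> types) {
    List<String> out = new ArrayList<>();
    types.get().forEach(out::add);
    return out;
  }

  // Safe: convert the value only if present; an empty input stays empty.
  static Optional<List<String>> viaMap(Optional<Iterable<String>> types) {
    return types.map(it -> {
      List<String> out = new ArrayList<>();
      it.forEach(out::add);
      return out;
    });
  }

  public static void main(String[] args) {
    System.out.println(viaMap(defaultVectorTypes())); // prints Optional.empty
    try {
      viaGet(defaultVectorTypes());
    } catch (NoSuchElementException e) {
      System.out.println("get() on an empty Optional threw " + e.getClass().getSimpleName());
    }
  }
}
```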



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala:
##########
@@ -79,6 +84,18 @@ case class OrcPartitionReaderFactory(
     }
   }
 
+  override def getVectorTypes: Optional[java.lang.Iterable[String]] = {
+
+    val vectorTypes: Iterable[String] = readDataSchema.fields.map(_.dataType match {

Review Comment:
   nit: Remove the `Iterable[String]` type annotation (not needed since `vectorTypes` is local).
   
   Can you also move `.getName` (from all the `classOf`s) into a separate `.map`? That means less repetition, and the main idea of this code (the type-to-class conversion) becomes easier to follow, with `getName` being a mere add-on.
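Roughly what I mean, sketched with Java streams rather than Scala. The enum `DataKind` and the classes `AtomicVector`/`StructVector` are hypothetical stand-ins for Spark data types and column vector classes, not the actual ORC mapping: the first `map` does the conversion, the second only looks up names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class SeparateGetNameDemo {
  // Hypothetical stand-ins: a tiny "data type" enum and two vector classes.
  enum DataKind { ATOMIC, STRUCT }
  static final class AtomicVector {}
  static final class StructVector {}

  // The actual conversion: data type -> vector class (no getName here).
  static Class<?> vectorClassFor(DataKind kind) {
    return kind == DataKind.STRUCT ? StructVector.class : AtomicVector.class;
  }

  static List<String> vectorTypes(List<DataKind> schema) {
    return schema.stream()
        .map(SeparateGetNameDemo::vectorClassFor) // conversion step
        .map(Class::getName)                      // name lookup, written once
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(vectorTypes(Arrays.asList(DataKind.ATOMIC, DataKind.STRUCT)));
  }
}
```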



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala:
##########
@@ -108,6 +112,17 @@ case class ParquetPartitionReaderFactory(
     supportsColumnar
   }
 
+  override def getVectorTypes: Optional[java.lang.Iterable[String]] = {
+    val data: Iterable[String] = Iterable.fill(readDataSchema.fields.length)(

Review Comment:
   1. Remove the type declaration. Not needed.
   2. Why is it `data` (not `vectorTypes` as previously)?



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala:
##########
@@ -905,6 +908,13 @@ object ColumnarReaderFactory extends PartitionReaderFactory {
 
   override def supportColumnarReads(partition: InputPartition): Boolean = true
 
+  override def getVectorTypes: Optional[java.lang.Iterable[String]] = {
+    val data: Iterable[String] = Iterable.fill(2)(
+        classOf[OnHeapColumnVector].getName
+    )

Review Comment:
   Oh, that's also very nice (re my previous comment to use `Collections.nCopies`).
   
   Can you rename `data`?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala:
##########
@@ -108,6 +112,17 @@ case class ParquetPartitionReaderFactory(
     supportsColumnar
   }
 
+  override def getVectorTypes: Optional[java.lang.Iterable[String]] = {
+    val data: Iterable[String] = Iterable.fill(readDataSchema.fields.length)(
+      if (!enableOffHeapColumnVector) {
+        classOf[OnHeapColumnVector].getName
+      } else {
+        classOf[OffHeapColumnVector].getName

Review Comment:
   Can you move `.getName` to a separate `.map`?
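For example, a sketch assuming (as in the diff) that every column uses the same vector class: pick the class once, repeat it with `Collections.nCopies`, then apply `getName` in its own `map`. `OnHeapVector`/`OffHeapVector` are hypothetical stand-ins for `OnHeapColumnVector`/`OffHeapColumnVector`.

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

class ParquetVectorTypesSketch {
  // Hypothetical stand-ins for Spark's OnHeapColumnVector / OffHeapColumnVector.
  static final class OnHeapVector {}
  static final class OffHeapVector {}

  static List<String> vectorTypes(int numFields, boolean enableOffHeapColumnVector) {
    // Choose the vector class once; every column shares it.
    Class<?> vectorClass = enableOffHeapColumnVector ? OffHeapVector.class : OnHeapVector.class;
    return Collections.nCopies(numFields, vectorClass).stream()
        .map(Class::getName) // name lookup as its own step
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(vectorTypes(3, false));
  }
}
```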



##########
sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaColumnarDataSourceV2.java:
##########
@@ -69,6 +72,14 @@ public boolean supportColumnarReads(InputPartition partition) {
       return true;
     }
 
+    @Override
+    public Optional<Iterable<String>> getVectorTypes() {
+      List<String> vectorTypes = new ArrayList<>();
+      vectorTypes.add(OnHeapColumnVector.class.getName());
+      vectorTypes.add(OnHeapColumnVector.class.getName());

Review Comment:
   nit: [Collections.nCopies(int, Object)](https://docs.oracle.com/javase/8/docs/api/java/util/Collections.html#nCopies-int-T-)
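That is, something along these lines (`NCopiesSketch` and `OnHeapVector` are hypothetical stand-ins for the test factory and `OnHeapColumnVector`). Note that `Collections.nCopies` returns an immutable list, which should be fine here since the result is only read:

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

class NCopiesSketch {
  // Hypothetical stand-in for OnHeapColumnVector.
  static final class OnHeapVector {}

  // Two columns backed by the same vector class: one call instead of repeated add()s.
  static Optional<Iterable<String>> getVectorTypes() {
    List<String> vectorTypes = Collections.nCopies(2, OnHeapVector.class.getName());
    return Optional.of(vectorTypes);
  }

  public static void main(String[] args) {
    getVectorTypes().ifPresent(types -> types.forEach(System.out::println));
  }
}
```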



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


For additional commands, e-mail: reviews-help@spark.apache.org