You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2022/07/04 16:56:32 UTC

[spark] branch master updated: [SPARK-39653][SQL] Clean up `ColumnVectorUtils#populate(WritableColumnVector, InternalRow, int)` from `ColumnVectorUtils`

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new df9d2f81954 [SPARK-39653][SQL] Clean up `ColumnVectorUtils#populate(WritableColumnVector, InternalRow, int)` from `ColumnVectorUtils`
df9d2f81954 is described below

commit df9d2f81954ff5c14a60c1dc3dad69fb5e6a8152
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Mon Jul 4 11:56:12 2022 -0500

    [SPARK-39653][SQL] Clean up `ColumnVectorUtils#populate(WritableColumnVector, InternalRow, int)` from `ColumnVectorUtils`
    
    ### What changes were proposed in this pull request?
    After SPARK-39638 and SPARK-39231, the `ColumnVectorUtils#populate(WritableColumnVector, InternalRow, int)` method in `ColumnVectorUtils` is only used by `ConstantColumnVectorBenchmark` and `ColumnVectorSuite`, so this PR makes the following changes:
    
    - Clean up `ColumnVectorUtils#populate(WritableColumnVector, InternalRow, int)` from `ColumnVectorUtils`
    - Add a simplified `populate` method to `ConstantColumnVectorBenchmark`
    - Clean up the test `SPARK-38018: ColumnVectorUtils.populate to handle CalendarIntervalType correctly` from `ColumnVectorSuite` because this scenario no longer exists; the equivalent scenario using `ConstantColumnVector` is already covered by `fill calendar interval` in `ColumnVectorUtils`.
    
    ### Why are the changes needed?
    Clean up unused code.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    
    - Pass GitHub Actions
    - Execute `ConstantColumnVectorBenchmark` manually with [Benchmark GitHub Action](https://github.com/LuciferYang/spark/runs/7147111541); the result file is produced successfully. Since the results show no significant change, they are not updated in this PR.
    
    Closes #37045 from LuciferYang/SPARK-39653.
    
    Authored-by: yangjie01 <ya...@baidu.com>
    Signed-off-by: Sean Owen <sr...@gmail.com>
---
 .../execution/vectorized/ColumnVectorUtils.java    | 62 ----------------------
 .../benchmark/ConstantColumnVectorBenchmark.scala  | 34 ++++++++----
 .../execution/vectorized/ColumnVectorSuite.scala   | 11 +---
 3 files changed, 26 insertions(+), 81 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
index 7c885863ff0..a6960f733de 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -42,68 +42,6 @@ import org.apache.spark.unsafe.types.UTF8String;
  * These utilities are mostly used to convert ColumnVectors into other formats.
  */
 public class ColumnVectorUtils {
-  /**
-   * Populates the entire `col` with `row[fieldIdx]`
-   */
-  public static void populate(WritableColumnVector col, InternalRow row, int fieldIdx) {
-    int capacity = col.capacity;
-    DataType t = col.dataType();
-
-    if (row.isNullAt(fieldIdx)) {
-      col.putNulls(0, capacity);
-    } else {
-      if (t == DataTypes.BooleanType) {
-        col.putBooleans(0, capacity, row.getBoolean(fieldIdx));
-      } else if (t == DataTypes.BinaryType) {
-        col.putByteArray(0, row.getBinary(fieldIdx));
-      } else if (t == DataTypes.ByteType) {
-        col.putBytes(0, capacity, row.getByte(fieldIdx));
-      } else if (t == DataTypes.ShortType) {
-        col.putShorts(0, capacity, row.getShort(fieldIdx));
-      } else if (t == DataTypes.IntegerType) {
-        col.putInts(0, capacity, row.getInt(fieldIdx));
-      } else if (t == DataTypes.LongType) {
-        col.putLongs(0, capacity, row.getLong(fieldIdx));
-      } else if (t == DataTypes.FloatType) {
-        col.putFloats(0, capacity, row.getFloat(fieldIdx));
-      } else if (t == DataTypes.DoubleType) {
-        col.putDoubles(0, capacity, row.getDouble(fieldIdx));
-      } else if (t == DataTypes.StringType) {
-        UTF8String v = row.getUTF8String(fieldIdx);
-        byte[] bytes = v.getBytes();
-        for (int i = 0; i < capacity; i++) {
-          col.putByteArray(i, bytes);
-        }
-      } else if (t instanceof DecimalType) {
-        DecimalType dt = (DecimalType)t;
-        Decimal d = row.getDecimal(fieldIdx, dt.precision(), dt.scale());
-        if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
-          col.putInts(0, capacity, (int)d.toUnscaledLong());
-        } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
-          col.putLongs(0, capacity, d.toUnscaledLong());
-        } else {
-          final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
-          byte[] bytes = integer.toByteArray();
-          for (int i = 0; i < capacity; i++) {
-            col.putByteArray(i, bytes, 0, bytes.length);
-          }
-        }
-      } else if (t instanceof CalendarIntervalType) {
-        CalendarInterval c = (CalendarInterval)row.get(fieldIdx, t);
-        col.getChild(0).putInts(0, capacity, c.months);
-        col.getChild(1).putInts(0, capacity, c.days);
-        col.getChild(2).putLongs(0, capacity, c.microseconds);
-      } else if (t instanceof DateType || t instanceof YearMonthIntervalType) {
-        col.putInts(0, capacity, row.getInt(fieldIdx));
-      } else if (t instanceof TimestampType || t instanceof TimestampNTZType ||
-        t instanceof DayTimeIntervalType) {
-        col.putLongs(0, capacity, row.getLong(fieldIdx));
-      } else {
-        throw new RuntimeException(String.format("DataType %s is not supported" +
-            " in column vectorized reader.", t.sql()));
-      }
-    }
-  }
 
   /**
    * Populates the value of `row[fieldIdx]` into `ConstantColumnVector`.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ConstantColumnVectorBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ConstantColumnVectorBenchmark.scala
index 9e4902f2fb5..8046a4b6cc5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ConstantColumnVectorBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ConstantColumnVectorBenchmark.scala
@@ -22,7 +22,7 @@ import org.apache.commons.lang3.RandomStringUtils
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.benchmark.BenchmarkBase
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.vectorized.{ColumnVectorUtils, ConstantColumnVector, OffHeapColumnVector, OnHeapColumnVector}
+import org.apache.spark.sql.execution.vectorized.{ColumnVectorUtils, ConstantColumnVector, OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnVector
 import org.apache.spark.unsafe.UTF8StringBuilder
@@ -41,6 +41,22 @@ import org.apache.spark.unsafe.UTF8StringBuilder
  */
 object ConstantColumnVectorBenchmark extends BenchmarkBase {
 
+  private def populate(
+      col: WritableColumnVector, batchSize: Int, row: InternalRow, fieldIdx: Int): Unit = {
+    col.dataType() match {
+      case IntegerType => col.putInts(0, batchSize, row.getInt(fieldIdx))
+      case LongType => col.putLongs(0, batchSize, row.getLong(fieldIdx))
+      case FloatType => col.putFloats(0, batchSize, row.getFloat(fieldIdx))
+      case DoubleType => col.putDoubles(0, batchSize, row.getDouble(fieldIdx))
+      case StringType =>
+        val v = row.getUTF8String(fieldIdx)
+        val bytes = v.getBytes
+        (0 until batchSize).foreach { i =>
+          col.putByteArray(i, bytes)
+        }
+    }
+  }
+
   private def readValues(dataType: DataType, batchSize: Int, vector: ColumnVector): Unit = {
     dataType match {
       case IntegerType =>
@@ -86,14 +102,14 @@ object ConstantColumnVectorBenchmark extends BenchmarkBase {
     benchmark.addCase("OnHeapColumnVector") { _: Int =>
       for (_ <- 0 until valuesPerIteration) {
         onHeapColumnVector.reset()
-        ColumnVectorUtils.populate(onHeapColumnVector, row, 0)
+        populate(onHeapColumnVector, batchSize, row, 0)
       }
     }
 
     benchmark.addCase("OffHeapColumnVector") { _: Int =>
       for (_ <- 0 until valuesPerIteration) {
         offHeapColumnVector.reset()
-        ColumnVectorUtils.populate(offHeapColumnVector, row, 0)
+        populate(offHeapColumnVector, batchSize, row, 0)
       }
     }
 
@@ -114,9 +130,9 @@ object ConstantColumnVectorBenchmark extends BenchmarkBase {
     val constantColumnVector = new ConstantColumnVector(batchSize, dataType)
 
     onHeapColumnVector.reset()
-    ColumnVectorUtils.populate(onHeapColumnVector, row, 0)
+    populate(onHeapColumnVector, batchSize, row, 0)
     offHeapColumnVector.reset()
-    ColumnVectorUtils.populate(offHeapColumnVector, row, 0)
+    populate(offHeapColumnVector, batchSize, row, 0)
     ColumnVectorUtils.populate(constantColumnVector, row, 0)
 
     val other = if (dataType == StringType) {
@@ -184,7 +200,7 @@ object ConstantColumnVectorBenchmark extends BenchmarkBase {
 
     benchmark.addCase("OnHeapColumnVector") { _: Int =>
       onHeapColumnVector.reset()
-      ColumnVectorUtils.populate(onHeapColumnVector, row, 0)
+      populate(onHeapColumnVector, batchSize, row, 0)
       for (_ <- 0 until valuesPerIteration) {
         readValues(dataType, batchSize, onHeapColumnVector)
       }
@@ -192,7 +208,7 @@ object ConstantColumnVectorBenchmark extends BenchmarkBase {
 
     benchmark.addCase("OffHeapColumnVector") { _: Int =>
       offHeapColumnVector.reset()
-      ColumnVectorUtils.populate(offHeapColumnVector, row, 0)
+      populate(offHeapColumnVector, batchSize, row, 0)
       for (_ <- 0 until valuesPerIteration) {
         readValues(dataType, batchSize, offHeapColumnVector)
       }
@@ -229,13 +245,13 @@ object ConstantColumnVectorBenchmark extends BenchmarkBase {
     }
 
     benchmark.addCase("OnHeapColumnVector") { _: Int =>
-      for (i <- 0 until valuesPerIteration) {
+      for (_ <- 0 until valuesPerIteration) {
         (0 until batchSize).foreach(onHeapColumnVector.isNullAt)
       }
     }
 
     benchmark.addCase("OffHeapColumnVector") { _: Int =>
-      for (i <- 0 until valuesPerIteration) {
+      for (_ <- 0 until valuesPerIteration) {
         (0 until batchSize).foreach(offHeapColumnVector.isNullAt)
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
index 4cf2376a3fc..cdf41ed651d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.columnar.ColumnAccessor
 import org.apache.spark.sql.execution.columnar.compression.ColumnBuilderHelper
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarArray
-import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+import org.apache.spark.unsafe.types.UTF8String
 
 class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
   private def withVector(
@@ -605,14 +605,5 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
       }
     }
   }
-
-  test("SPARK-38018: ColumnVectorUtils.populate to handle CalendarIntervalType correctly") {
-    val vector = new OnHeapColumnVector(5, CalendarIntervalType)
-    val row = new SpecificInternalRow(Array(CalendarIntervalType))
-    val interval = new CalendarInterval(3, 5, 1000000)
-    row.setInterval(0, interval)
-    ColumnVectorUtils.populate(vector, row, 0)
-    assert(vector.getInterval(0) === interval)
-  }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org