You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2022/07/04 16:56:32 UTC
[spark] branch master updated: [SPARK-39653][SQL] Clean up `ColumnVectorUtils#populate(WritableColumnVector, InternalRow, int)` from `ColumnVectorUtils`
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new df9d2f81954 [SPARK-39653][SQL] Clean up `ColumnVectorUtils#populate(WritableColumnVector, InternalRow, int)` from `ColumnVectorUtils`
df9d2f81954 is described below
commit df9d2f81954ff5c14a60c1dc3dad69fb5e6a8152
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Mon Jul 4 11:56:12 2022 -0500
[SPARK-39653][SQL] Clean up `ColumnVectorUtils#populate(WritableColumnVector, InternalRow, int)` from `ColumnVectorUtils`
### What changes were proposed in this pull request?
After SPARK-39638 and SPARK-39231, the `ColumnVectorUtils#populate(WritableColumnVector, InternalRow, int)` method in `ColumnVectorUtils` is only used by `ConstantColumnVectorBenchmark` and `ColumnVectorSuite`. So this PR makes the following changes:
- Clean up `ColumnVectorUtils#populate(WritableColumnVector, InternalRow, int)` from `ColumnVectorUtils`
- Add a simplified `populate` method to `ConstantColumnVectorBenchmark`
- Remove the test `SPARK-38018: ColumnVectorUtils.populate to handle CalendarIntervalType correctly` from `ColumnVectorSuite`, because this scenario no longer exists; similar scenarios using `ConstantColumnVector` are already covered by the `fill calendar interval` test for `ColumnVectorUtils`.
### Why are the changes needed?
Clean up unused code
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Pass GitHub Actions
- Ran `ConstantColumnVectorBenchmark` manually with the [Benchmark GitHub Action](https://github.com/LuciferYang/spark/runs/7147111541); the result file was produced successfully. Since the results show no obvious change, the result file is not updated in this PR.
Closes #37045 from LuciferYang/SPARK-39653.
Authored-by: yangjie01 <ya...@baidu.com>
Signed-off-by: Sean Owen <sr...@gmail.com>
---
.../execution/vectorized/ColumnVectorUtils.java | 62 ----------------------
.../benchmark/ConstantColumnVectorBenchmark.scala | 34 ++++++++----
.../execution/vectorized/ColumnVectorSuite.scala | 11 +---
3 files changed, 26 insertions(+), 81 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
index 7c885863ff0..a6960f733de 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -42,68 +42,6 @@ import org.apache.spark.unsafe.types.UTF8String;
* These utilities are mostly used to convert ColumnVectors into other formats.
*/
public class ColumnVectorUtils {
- /**
- * Populates the entire `col` with `row[fieldIdx]`
- */
- public static void populate(WritableColumnVector col, InternalRow row, int fieldIdx) {
- int capacity = col.capacity;
- DataType t = col.dataType();
-
- if (row.isNullAt(fieldIdx)) {
- col.putNulls(0, capacity);
- } else {
- if (t == DataTypes.BooleanType) {
- col.putBooleans(0, capacity, row.getBoolean(fieldIdx));
- } else if (t == DataTypes.BinaryType) {
- col.putByteArray(0, row.getBinary(fieldIdx));
- } else if (t == DataTypes.ByteType) {
- col.putBytes(0, capacity, row.getByte(fieldIdx));
- } else if (t == DataTypes.ShortType) {
- col.putShorts(0, capacity, row.getShort(fieldIdx));
- } else if (t == DataTypes.IntegerType) {
- col.putInts(0, capacity, row.getInt(fieldIdx));
- } else if (t == DataTypes.LongType) {
- col.putLongs(0, capacity, row.getLong(fieldIdx));
- } else if (t == DataTypes.FloatType) {
- col.putFloats(0, capacity, row.getFloat(fieldIdx));
- } else if (t == DataTypes.DoubleType) {
- col.putDoubles(0, capacity, row.getDouble(fieldIdx));
- } else if (t == DataTypes.StringType) {
- UTF8String v = row.getUTF8String(fieldIdx);
- byte[] bytes = v.getBytes();
- for (int i = 0; i < capacity; i++) {
- col.putByteArray(i, bytes);
- }
- } else if (t instanceof DecimalType) {
- DecimalType dt = (DecimalType)t;
- Decimal d = row.getDecimal(fieldIdx, dt.precision(), dt.scale());
- if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
- col.putInts(0, capacity, (int)d.toUnscaledLong());
- } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
- col.putLongs(0, capacity, d.toUnscaledLong());
- } else {
- final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
- byte[] bytes = integer.toByteArray();
- for (int i = 0; i < capacity; i++) {
- col.putByteArray(i, bytes, 0, bytes.length);
- }
- }
- } else if (t instanceof CalendarIntervalType) {
- CalendarInterval c = (CalendarInterval)row.get(fieldIdx, t);
- col.getChild(0).putInts(0, capacity, c.months);
- col.getChild(1).putInts(0, capacity, c.days);
- col.getChild(2).putLongs(0, capacity, c.microseconds);
- } else if (t instanceof DateType || t instanceof YearMonthIntervalType) {
- col.putInts(0, capacity, row.getInt(fieldIdx));
- } else if (t instanceof TimestampType || t instanceof TimestampNTZType ||
- t instanceof DayTimeIntervalType) {
- col.putLongs(0, capacity, row.getLong(fieldIdx));
- } else {
- throw new RuntimeException(String.format("DataType %s is not supported" +
- " in column vectorized reader.", t.sql()));
- }
- }
- }
/**
* Populates the value of `row[fieldIdx]` into `ConstantColumnVector`.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ConstantColumnVectorBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ConstantColumnVectorBenchmark.scala
index 9e4902f2fb5..8046a4b6cc5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ConstantColumnVectorBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ConstantColumnVectorBenchmark.scala
@@ -22,7 +22,7 @@ import org.apache.commons.lang3.RandomStringUtils
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.benchmark.BenchmarkBase
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.vectorized.{ColumnVectorUtils, ConstantColumnVector, OffHeapColumnVector, OnHeapColumnVector}
+import org.apache.spark.sql.execution.vectorized.{ColumnVectorUtils, ConstantColumnVector, OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnVector
import org.apache.spark.unsafe.UTF8StringBuilder
@@ -41,6 +41,22 @@ import org.apache.spark.unsafe.UTF8StringBuilder
*/
object ConstantColumnVectorBenchmark extends BenchmarkBase {
+ private def populate(
+ col: WritableColumnVector, batchSize: Int, row: InternalRow, fieldIdx: Int): Unit = {
+ col.dataType() match {
+ case IntegerType => col.putInts(0, batchSize, row.getInt(fieldIdx))
+ case LongType => col.putLongs(0, batchSize, row.getLong(fieldIdx))
+ case FloatType => col.putFloats(0, batchSize, row.getFloat(fieldIdx))
+ case DoubleType => col.putDoubles(0, batchSize, row.getDouble(fieldIdx))
+ case StringType =>
+ val v = row.getUTF8String(fieldIdx)
+ val bytes = v.getBytes
+ (0 until batchSize).foreach { i =>
+ col.putByteArray(i, bytes)
+ }
+ }
+ }
+
private def readValues(dataType: DataType, batchSize: Int, vector: ColumnVector): Unit = {
dataType match {
case IntegerType =>
@@ -86,14 +102,14 @@ object ConstantColumnVectorBenchmark extends BenchmarkBase {
benchmark.addCase("OnHeapColumnVector") { _: Int =>
for (_ <- 0 until valuesPerIteration) {
onHeapColumnVector.reset()
- ColumnVectorUtils.populate(onHeapColumnVector, row, 0)
+ populate(onHeapColumnVector, batchSize, row, 0)
}
}
benchmark.addCase("OffHeapColumnVector") { _: Int =>
for (_ <- 0 until valuesPerIteration) {
offHeapColumnVector.reset()
- ColumnVectorUtils.populate(offHeapColumnVector, row, 0)
+ populate(offHeapColumnVector, batchSize, row, 0)
}
}
@@ -114,9 +130,9 @@ object ConstantColumnVectorBenchmark extends BenchmarkBase {
val constantColumnVector = new ConstantColumnVector(batchSize, dataType)
onHeapColumnVector.reset()
- ColumnVectorUtils.populate(onHeapColumnVector, row, 0)
+ populate(onHeapColumnVector, batchSize, row, 0)
offHeapColumnVector.reset()
- ColumnVectorUtils.populate(offHeapColumnVector, row, 0)
+ populate(offHeapColumnVector, batchSize, row, 0)
ColumnVectorUtils.populate(constantColumnVector, row, 0)
val other = if (dataType == StringType) {
@@ -184,7 +200,7 @@ object ConstantColumnVectorBenchmark extends BenchmarkBase {
benchmark.addCase("OnHeapColumnVector") { _: Int =>
onHeapColumnVector.reset()
- ColumnVectorUtils.populate(onHeapColumnVector, row, 0)
+ populate(onHeapColumnVector, batchSize, row, 0)
for (_ <- 0 until valuesPerIteration) {
readValues(dataType, batchSize, onHeapColumnVector)
}
@@ -192,7 +208,7 @@ object ConstantColumnVectorBenchmark extends BenchmarkBase {
benchmark.addCase("OffHeapColumnVector") { _: Int =>
offHeapColumnVector.reset()
- ColumnVectorUtils.populate(offHeapColumnVector, row, 0)
+ populate(offHeapColumnVector, batchSize, row, 0)
for (_ <- 0 until valuesPerIteration) {
readValues(dataType, batchSize, offHeapColumnVector)
}
@@ -229,13 +245,13 @@ object ConstantColumnVectorBenchmark extends BenchmarkBase {
}
benchmark.addCase("OnHeapColumnVector") { _: Int =>
- for (i <- 0 until valuesPerIteration) {
+ for (_ <- 0 until valuesPerIteration) {
(0 until batchSize).foreach(onHeapColumnVector.isNullAt)
}
}
benchmark.addCase("OffHeapColumnVector") { _: Int =>
- for (i <- 0 until valuesPerIteration) {
+ for (_ <- 0 until valuesPerIteration) {
(0 until batchSize).foreach(offHeapColumnVector.isNullAt)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
index 4cf2376a3fc..cdf41ed651d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.columnar.ColumnAccessor
import org.apache.spark.sql.execution.columnar.compression.ColumnBuilderHelper
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarArray
-import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+import org.apache.spark.unsafe.types.UTF8String
class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
private def withVector(
@@ -605,14 +605,5 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
}
}
}
-
- test("SPARK-38018: ColumnVectorUtils.populate to handle CalendarIntervalType correctly") {
- val vector = new OnHeapColumnVector(5, CalendarIntervalType)
- val row = new SpecificInternalRow(Array(CalendarIntervalType))
- val interval = new CalendarInterval(3, 5, 1000000)
- row.setInterval(0, interval)
- ColumnVectorUtils.populate(vector, row, 0)
- assert(vector.getInterval(0) === interval)
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org