You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/10/20 17:06:38 UTC
[iceberg] branch master updated: Spark: Improve setup perf of
dictionary benchmarks (#3329)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new d524a1c Spark: Improve setup perf of dictionary benchmarks (#3329)
d524a1c is described below
commit d524a1c924ad3ffe6619422140bbc08c241b1119
Author: Russell Spitzer <rs...@apple.com>
AuthorDate: Wed Oct 20 12:06:27 2021 -0500
Spark: Improve setup perf of dictionary benchmarks (#3329)
Previously the benchmarking code used a number of "when" constructs to
generate data in rings. This became much more expensive somewhere between
Spark 2.4.5 and 2.4.6 for reasons we are not certain of. In an attempt
to improve performance we have removed all of these constructs and
replaced them with equivalent modulo operations and date-add operations
where applicable.
---
...dDictionaryEncodedFlatParquetDataBenchmark.java | 141 +++++---------------
...dDictionaryEncodedFlatParquetDataBenchmark.java | 142 ++++++---------------
2 files changed, 71 insertions(+), 212 deletions(-)
diff --git a/spark/v2.4/spark2/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java b/spark/v2.4/spark2/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
index f0d8bda..f5b9cd0 100644
--- a/spark/v2.4/spark2/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
+++ b/spark/v2.4/spark2/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
@@ -28,9 +28,11 @@ import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.DataTypes;
import org.openjdk.jmh.annotations.Setup;
import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.date_add;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.pmod;
import static org.apache.spark.sql.functions.to_date;
@@ -71,136 +73,63 @@ public class VectorizedReadDictionaryEncodedFlatParquetDataBenchmark extends Vec
@Override
void appendData() {
- for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
- Dataset<Row> df = withLongColumnDictEncoded();
- df = withIntColumnDictEncoded(df);
- df = withFloatColumnDictEncoded(df);
- df = withDoubleColumnDictEncoded(df);
- df = withDecimalColumnDictEncoded(df);
- df = withDateColumnDictEncoded(df);
- df = withTimestampColumnDictEncoded(df);
- df = withStringColumnDictEncoded(df);
- appendAsFile(df);
- }
+ Dataset<Row> df = idDF();
+ df = withLongColumnDictEncoded(df);
+ df = withIntColumnDictEncoded(df);
+ df = withFloatColumnDictEncoded(df);
+ df = withDoubleColumnDictEncoded(df);
+ df = withDecimalColumnDictEncoded(df);
+ df = withDateColumnDictEncoded(df);
+ df = withTimestampColumnDictEncoded(df);
+ df = withStringColumnDictEncoded(df);
+ df = df.drop("id");
+ df.write().format("iceberg")
+ .mode(SaveMode.Append)
+ .save(table().location());
}
- private Dataset<Row> withLongColumnDictEncoded() {
- return spark().range(NUM_ROWS_PER_FILE)
- .withColumn(
- "longCol",
- when(pmod(col("id"), lit(9)).equalTo(lit(0)), lit(0L))
- .when(pmod(col("id"), lit(9)).equalTo(lit(1)), lit(1L))
- .when(pmod(col("id"), lit(9)).equalTo(lit(2)), lit(2L))
- .when(pmod(col("id"), lit(9)).equalTo(lit(3)), lit(3L))
- .when(pmod(col("id"), lit(9)).equalTo(lit(4)), lit(4L))
- .when(pmod(col("id"), lit(9)).equalTo(lit(5)), lit(5L))
- .when(pmod(col("id"), lit(9)).equalTo(lit(6)), lit(6L))
- .when(pmod(col("id"), lit(9)).equalTo(lit(7)), lit(7L))
- .when(pmod(col("id"), lit(9)).equalTo(lit(8)), lit(8L)))
- .drop("id");
+ private static final Column modColumn() {
+ return pmod(col("id"), lit(9));
+ }
+
+ private Dataset<Row> idDF() {
+ return spark().range(0, NUM_ROWS_PER_FILE * NUM_FILES, 1, NUM_FILES).toDF();
+ }
+
+ private static Dataset<Row> withLongColumnDictEncoded(Dataset<Row> df) {
+ return df.withColumn("longCol", modColumn().cast(DataTypes.LongType));
}
private static Dataset<Row> withIntColumnDictEncoded(Dataset<Row> df) {
- return df.withColumn(
- "intCol",
- when(modColumn(9, 0), lit(0))
- .when(modColumn(9, 1), lit(1))
- .when(modColumn(9, 2), lit(2))
- .when(modColumn(9, 3), lit(3))
- .when(modColumn(9, 4), lit(4))
- .when(modColumn(9, 5), lit(5))
- .when(modColumn(9, 6), lit(6))
- .when(modColumn(9, 7), lit(7))
- .when(modColumn(9, 8), lit(8)));
+ return df.withColumn("intCol", modColumn().cast(DataTypes.IntegerType));
}
private static Dataset<Row> withFloatColumnDictEncoded(Dataset<Row> df) {
- return df.withColumn(
- "floatCol",
- when(modColumn(9, 0), lit(0.0f))
- .when(modColumn(9, 1), lit(1.0f))
- .when(modColumn(9, 2), lit(2.0f))
- .when(modColumn(9, 3), lit(3.0f))
- .when(modColumn(9, 4), lit(4.0f))
- .when(modColumn(9, 5), lit(5.0f))
- .when(modColumn(9, 6), lit(6.0f))
- .when(modColumn(9, 7), lit(7.0f))
- .when(modColumn(9, 8), lit(8.0f)));
+ return df.withColumn("floatCol", modColumn().cast(DataTypes.FloatType));
+
}
private static Dataset<Row> withDoubleColumnDictEncoded(Dataset<Row> df) {
- return df.withColumn(
- "doubleCol",
- when(modColumn(9, 0), lit(0.0d))
- .when(modColumn(9, 1), lit(1.0d))
- .when(modColumn(9, 2), lit(2.0d))
- .when(modColumn(9, 3), lit(3.0d))
- .when(modColumn(9, 4), lit(4.0d))
- .when(modColumn(9, 5), lit(5.0d))
- .when(modColumn(9, 6), lit(6.0d))
- .when(modColumn(9, 7), lit(7.0d))
- .when(modColumn(9, 8), lit(8.0d)));
+ return df.withColumn("doubleCol", modColumn().cast(DataTypes.DoubleType));
}
private static Dataset<Row> withDecimalColumnDictEncoded(Dataset<Row> df) {
Types.DecimalType type = Types.DecimalType.of(20, 5);
- return df.withColumn(
- "decimalCol",
- when(modColumn(9, 0), bigDecimal(type, 0))
- .when(modColumn(9, 1), bigDecimal(type, 1))
- .when(modColumn(9, 2), bigDecimal(type, 2))
- .when(modColumn(9, 3), bigDecimal(type, 3))
- .when(modColumn(9, 4), bigDecimal(type, 4))
- .when(modColumn(9, 5), bigDecimal(type, 5))
- .when(modColumn(9, 6), bigDecimal(type, 6))
- .when(modColumn(9, 7), bigDecimal(type, 7))
- .when(modColumn(9, 8), bigDecimal(type, 8)));
+ return df.withColumn("decimalCol", lit(bigDecimal(type, 0)).plus(modColumn()));
}
private static Dataset<Row> withDateColumnDictEncoded(Dataset<Row> df) {
- return df.withColumn(
- "dateCol",
- when(modColumn(9, 0), to_date(lit("04/12/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 1), to_date(lit("04/13/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 2), to_date(lit("04/14/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 3), to_date(lit("04/15/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 4), to_date(lit("04/16/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 5), to_date(lit("04/17/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 6), to_date(lit("04/18/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 7), to_date(lit("04/19/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 8), to_date(lit("04/20/2019"), "MM/dd/yyyy")));
+ Column dateAdd = modColumn().cast(DataTypes.ShortType);
+ return df.withColumn("dateCol", date_add(to_date(lit("04/12/2019"), "MM/dd/yyyy"), dateAdd));
}
private static Dataset<Row> withTimestampColumnDictEncoded(Dataset<Row> df) {
- return df.withColumn(
- "timestampCol",
- when(modColumn(9, 0), to_timestamp(lit("04/12/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 1), to_timestamp(lit("04/13/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 2), to_timestamp(lit("04/14/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 3), to_timestamp(lit("04/15/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 4), to_timestamp(lit("04/16/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 5), to_timestamp(lit("04/17/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 6), to_timestamp(lit("04/18/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 7), to_timestamp(lit("04/19/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 8), to_timestamp(lit("04/20/2019"), "MM/dd/yyyy")));
+ Column dateAdd = modColumn().cast(DataTypes.ShortType);
+ return df.withColumn("timestampCol", to_timestamp(date_add(to_date(lit("04/12/2019"), "MM/dd/yyyy"), dateAdd)));
}
private static Dataset<Row> withStringColumnDictEncoded(Dataset<Row> df) {
- return df.withColumn(
- "stringCol",
- when(pmod(col("longCol"), lit(9)).equalTo(lit(0)), lit("0"))
- .when(modColumn(9, 1), lit("1"))
- .when(modColumn(9, 2), lit("2"))
- .when(modColumn(9, 3), lit("3"))
- .when(modColumn(9, 4), lit("4"))
- .when(modColumn(9, 5), lit("5"))
- .when(modColumn(9, 6), lit("6"))
- .when(modColumn(9, 7), lit("7"))
- .when(modColumn(9, 8), lit("8")));
- }
-
- private static Column modColumn(int divisor, int remainder) {
- return pmod(col("longCol"), lit(divisor)).equalTo(lit(remainder));
+ return df.withColumn("stringCol", modColumn().cast(DataTypes.StringType));
}
private static BigDecimal bigDecimal(Types.DecimalType type, int value) {
diff --git a/spark/v3.0/spark3/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java b/spark/v3.0/spark3/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
index f0d8bda..a8bc68f 100644
--- a/spark/v3.0/spark3/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
+++ b/spark/v3.0/spark3/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
@@ -28,9 +28,12 @@ import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.types.DataTypes;
import org.openjdk.jmh.annotations.Setup;
import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.date_add;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.pmod;
import static org.apache.spark.sql.functions.to_date;
@@ -71,136 +74,63 @@ public class VectorizedReadDictionaryEncodedFlatParquetDataBenchmark extends Vec
@Override
void appendData() {
- for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
- Dataset<Row> df = withLongColumnDictEncoded();
- df = withIntColumnDictEncoded(df);
- df = withFloatColumnDictEncoded(df);
- df = withDoubleColumnDictEncoded(df);
- df = withDecimalColumnDictEncoded(df);
- df = withDateColumnDictEncoded(df);
- df = withTimestampColumnDictEncoded(df);
- df = withStringColumnDictEncoded(df);
- appendAsFile(df);
- }
+ Dataset<Row> df = idDF();
+ df = withLongColumnDictEncoded(df);
+ df = withIntColumnDictEncoded(df);
+ df = withFloatColumnDictEncoded(df);
+ df = withDoubleColumnDictEncoded(df);
+ df = withDecimalColumnDictEncoded(df);
+ df = withDateColumnDictEncoded(df);
+ df = withTimestampColumnDictEncoded(df);
+ df = withStringColumnDictEncoded(df);
+ df = df.drop("id");
+ df.write().format("iceberg")
+ .mode(SaveMode.Append)
+ .save(table().location());
}
- private Dataset<Row> withLongColumnDictEncoded() {
- return spark().range(NUM_ROWS_PER_FILE)
- .withColumn(
- "longCol",
- when(pmod(col("id"), lit(9)).equalTo(lit(0)), lit(0L))
- .when(pmod(col("id"), lit(9)).equalTo(lit(1)), lit(1L))
- .when(pmod(col("id"), lit(9)).equalTo(lit(2)), lit(2L))
- .when(pmod(col("id"), lit(9)).equalTo(lit(3)), lit(3L))
- .when(pmod(col("id"), lit(9)).equalTo(lit(4)), lit(4L))
- .when(pmod(col("id"), lit(9)).equalTo(lit(5)), lit(5L))
- .when(pmod(col("id"), lit(9)).equalTo(lit(6)), lit(6L))
- .when(pmod(col("id"), lit(9)).equalTo(lit(7)), lit(7L))
- .when(pmod(col("id"), lit(9)).equalTo(lit(8)), lit(8L)))
- .drop("id");
+ private static final Column modColumn() {
+ return pmod(col("id"), lit(9));
+ }
+
+ private Dataset<Row> idDF() {
+ return spark().range(0, NUM_ROWS_PER_FILE * NUM_FILES, 1, NUM_FILES).toDF();
+ }
+
+ private static Dataset<Row> withLongColumnDictEncoded(Dataset<Row> df) {
+ return df.withColumn("longCol", modColumn().cast(DataTypes.LongType));
}
private static Dataset<Row> withIntColumnDictEncoded(Dataset<Row> df) {
- return df.withColumn(
- "intCol",
- when(modColumn(9, 0), lit(0))
- .when(modColumn(9, 1), lit(1))
- .when(modColumn(9, 2), lit(2))
- .when(modColumn(9, 3), lit(3))
- .when(modColumn(9, 4), lit(4))
- .when(modColumn(9, 5), lit(5))
- .when(modColumn(9, 6), lit(6))
- .when(modColumn(9, 7), lit(7))
- .when(modColumn(9, 8), lit(8)));
+ return df.withColumn("intCol", modColumn().cast(DataTypes.IntegerType));
}
private static Dataset<Row> withFloatColumnDictEncoded(Dataset<Row> df) {
- return df.withColumn(
- "floatCol",
- when(modColumn(9, 0), lit(0.0f))
- .when(modColumn(9, 1), lit(1.0f))
- .when(modColumn(9, 2), lit(2.0f))
- .when(modColumn(9, 3), lit(3.0f))
- .when(modColumn(9, 4), lit(4.0f))
- .when(modColumn(9, 5), lit(5.0f))
- .when(modColumn(9, 6), lit(6.0f))
- .when(modColumn(9, 7), lit(7.0f))
- .when(modColumn(9, 8), lit(8.0f)));
+ return df.withColumn("floatCol", modColumn().cast(DataTypes.FloatType));
+
}
private static Dataset<Row> withDoubleColumnDictEncoded(Dataset<Row> df) {
- return df.withColumn(
- "doubleCol",
- when(modColumn(9, 0), lit(0.0d))
- .when(modColumn(9, 1), lit(1.0d))
- .when(modColumn(9, 2), lit(2.0d))
- .when(modColumn(9, 3), lit(3.0d))
- .when(modColumn(9, 4), lit(4.0d))
- .when(modColumn(9, 5), lit(5.0d))
- .when(modColumn(9, 6), lit(6.0d))
- .when(modColumn(9, 7), lit(7.0d))
- .when(modColumn(9, 8), lit(8.0d)));
+ return df.withColumn("doubleCol", modColumn().cast(DataTypes.DoubleType));
}
private static Dataset<Row> withDecimalColumnDictEncoded(Dataset<Row> df) {
Types.DecimalType type = Types.DecimalType.of(20, 5);
- return df.withColumn(
- "decimalCol",
- when(modColumn(9, 0), bigDecimal(type, 0))
- .when(modColumn(9, 1), bigDecimal(type, 1))
- .when(modColumn(9, 2), bigDecimal(type, 2))
- .when(modColumn(9, 3), bigDecimal(type, 3))
- .when(modColumn(9, 4), bigDecimal(type, 4))
- .when(modColumn(9, 5), bigDecimal(type, 5))
- .when(modColumn(9, 6), bigDecimal(type, 6))
- .when(modColumn(9, 7), bigDecimal(type, 7))
- .when(modColumn(9, 8), bigDecimal(type, 8)));
+ return df.withColumn("decimalCol", lit(bigDecimal(type, 0)).plus(modColumn()));
}
private static Dataset<Row> withDateColumnDictEncoded(Dataset<Row> df) {
- return df.withColumn(
- "dateCol",
- when(modColumn(9, 0), to_date(lit("04/12/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 1), to_date(lit("04/13/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 2), to_date(lit("04/14/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 3), to_date(lit("04/15/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 4), to_date(lit("04/16/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 5), to_date(lit("04/17/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 6), to_date(lit("04/18/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 7), to_date(lit("04/19/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 8), to_date(lit("04/20/2019"), "MM/dd/yyyy")));
+ Column dateAdd = modColumn().cast(DataTypes.ShortType);
+ return df.withColumn("dateCol", date_add(to_date(lit("04/12/2019"), "MM/dd/yyyy"), dateAdd));
}
private static Dataset<Row> withTimestampColumnDictEncoded(Dataset<Row> df) {
- return df.withColumn(
- "timestampCol",
- when(modColumn(9, 0), to_timestamp(lit("04/12/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 1), to_timestamp(lit("04/13/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 2), to_timestamp(lit("04/14/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 3), to_timestamp(lit("04/15/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 4), to_timestamp(lit("04/16/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 5), to_timestamp(lit("04/17/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 6), to_timestamp(lit("04/18/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 7), to_timestamp(lit("04/19/2019"), "MM/dd/yyyy"))
- .when(modColumn(9, 8), to_timestamp(lit("04/20/2019"), "MM/dd/yyyy")));
+ Column dateAdd = modColumn().cast(DataTypes.ShortType);
+ return df.withColumn("timestampCol", to_timestamp(date_add(to_date(lit("04/12/2019"), "MM/dd/yyyy"), dateAdd)));
}
private static Dataset<Row> withStringColumnDictEncoded(Dataset<Row> df) {
- return df.withColumn(
- "stringCol",
- when(pmod(col("longCol"), lit(9)).equalTo(lit(0)), lit("0"))
- .when(modColumn(9, 1), lit("1"))
- .when(modColumn(9, 2), lit("2"))
- .when(modColumn(9, 3), lit("3"))
- .when(modColumn(9, 4), lit("4"))
- .when(modColumn(9, 5), lit("5"))
- .when(modColumn(9, 6), lit("6"))
- .when(modColumn(9, 7), lit("7"))
- .when(modColumn(9, 8), lit("8")));
- }
-
- private static Column modColumn(int divisor, int remainder) {
- return pmod(col("longCol"), lit(divisor)).equalTo(lit(remainder));
+ return df.withColumn("stringCol", modColumn().cast(DataTypes.StringType));
}
private static BigDecimal bigDecimal(Types.DecimalType type, int value) {