You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/10/20 17:06:38 UTC

[iceberg] branch master updated: Spark: Improve setup perf of dictionary benchmarks (#3329)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new d524a1c  Spark: Improve setup perf of dictionary benchmarks (#3329)
d524a1c is described below

commit d524a1c924ad3ffe6619422140bbc08c241b1119
Author: Russell Spitzer <rs...@apple.com>
AuthorDate: Wed Oct 20 12:06:27 2021 -0500

    Spark: Improve setup perf of dictionary benchmarks (#3329)
    
    Previously the benchmarking code used a number of "when" constructs to
    generate data in rings. This became much more expensive somewhere between
    Spark 2.4.5 and 2.4.6 for reasons we are not certain of. In an attempt
    to improve performance we have removed all of these constructs and
    replaced them with equivalent modulo operations, plus date-add
    operations where applicable.
---
 ...dDictionaryEncodedFlatParquetDataBenchmark.java | 141 +++++---------------
 ...dDictionaryEncodedFlatParquetDataBenchmark.java | 142 ++++++---------------
 2 files changed, 71 insertions(+), 212 deletions(-)

diff --git a/spark/v2.4/spark2/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java b/spark/v2.4/spark2/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
index f0d8bda..f5b9cd0 100644
--- a/spark/v2.4/spark2/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
+++ b/spark/v2.4/spark2/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
@@ -28,9 +28,11 @@ import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.DataTypes;
 import org.openjdk.jmh.annotations.Setup;
 
 import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.date_add;
 import static org.apache.spark.sql.functions.lit;
 import static org.apache.spark.sql.functions.pmod;
 import static org.apache.spark.sql.functions.to_date;
@@ -71,136 +73,63 @@ public class VectorizedReadDictionaryEncodedFlatParquetDataBenchmark extends Vec
 
   @Override
   void appendData() {
-    for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
-      Dataset<Row> df = withLongColumnDictEncoded();
-      df = withIntColumnDictEncoded(df);
-      df = withFloatColumnDictEncoded(df);
-      df = withDoubleColumnDictEncoded(df);
-      df = withDecimalColumnDictEncoded(df);
-      df = withDateColumnDictEncoded(df);
-      df = withTimestampColumnDictEncoded(df);
-      df = withStringColumnDictEncoded(df);
-      appendAsFile(df);
-    }
+    Dataset<Row> df = idDF();
+    df = withLongColumnDictEncoded(df);
+    df = withIntColumnDictEncoded(df);
+    df = withFloatColumnDictEncoded(df);
+    df = withDoubleColumnDictEncoded(df);
+    df = withDecimalColumnDictEncoded(df);
+    df = withDateColumnDictEncoded(df);
+    df = withTimestampColumnDictEncoded(df);
+    df = withStringColumnDictEncoded(df);
+    df = df.drop("id");
+    df.write().format("iceberg")
+        .mode(SaveMode.Append)
+        .save(table().location());
   }
 
-  private Dataset<Row> withLongColumnDictEncoded() {
-    return spark().range(NUM_ROWS_PER_FILE)
-        .withColumn(
-            "longCol",
-            when(pmod(col("id"), lit(9)).equalTo(lit(0)), lit(0L))
-                .when(pmod(col("id"), lit(9)).equalTo(lit(1)), lit(1L))
-                .when(pmod(col("id"), lit(9)).equalTo(lit(2)), lit(2L))
-                .when(pmod(col("id"), lit(9)).equalTo(lit(3)), lit(3L))
-                .when(pmod(col("id"), lit(9)).equalTo(lit(4)), lit(4L))
-                .when(pmod(col("id"), lit(9)).equalTo(lit(5)), lit(5L))
-                .when(pmod(col("id"), lit(9)).equalTo(lit(6)), lit(6L))
-                .when(pmod(col("id"), lit(9)).equalTo(lit(7)), lit(7L))
-                .when(pmod(col("id"), lit(9)).equalTo(lit(8)), lit(8L)))
-        .drop("id");
+  private static final Column modColumn() {
+    return pmod(col("id"), lit(9));
+  }
+
+  private Dataset<Row> idDF() {
+    return spark().range(0, NUM_ROWS_PER_FILE * NUM_FILES, 1, NUM_FILES).toDF();
+  }
+
+  private static Dataset<Row> withLongColumnDictEncoded(Dataset<Row> df) {
+    return df.withColumn("longCol", modColumn().cast(DataTypes.LongType));
   }
 
   private static Dataset<Row> withIntColumnDictEncoded(Dataset<Row> df) {
-    return df.withColumn(
-        "intCol",
-        when(modColumn(9, 0), lit(0))
-            .when(modColumn(9, 1), lit(1))
-            .when(modColumn(9, 2), lit(2))
-            .when(modColumn(9, 3), lit(3))
-            .when(modColumn(9, 4), lit(4))
-            .when(modColumn(9, 5), lit(5))
-            .when(modColumn(9, 6), lit(6))
-            .when(modColumn(9, 7), lit(7))
-            .when(modColumn(9, 8), lit(8)));
+    return df.withColumn("intCol", modColumn().cast(DataTypes.IntegerType));
   }
 
   private static Dataset<Row> withFloatColumnDictEncoded(Dataset<Row> df) {
-    return df.withColumn(
-        "floatCol",
-        when(modColumn(9, 0), lit(0.0f))
-            .when(modColumn(9, 1), lit(1.0f))
-            .when(modColumn(9, 2), lit(2.0f))
-            .when(modColumn(9, 3), lit(3.0f))
-            .when(modColumn(9, 4), lit(4.0f))
-            .when(modColumn(9, 5), lit(5.0f))
-            .when(modColumn(9, 6), lit(6.0f))
-            .when(modColumn(9, 7), lit(7.0f))
-            .when(modColumn(9, 8), lit(8.0f)));
+    return df.withColumn("floatCol", modColumn().cast(DataTypes.FloatType));
+
   }
 
   private static Dataset<Row> withDoubleColumnDictEncoded(Dataset<Row> df) {
-    return df.withColumn(
-        "doubleCol",
-        when(modColumn(9, 0), lit(0.0d))
-            .when(modColumn(9, 1), lit(1.0d))
-            .when(modColumn(9, 2), lit(2.0d))
-            .when(modColumn(9, 3), lit(3.0d))
-            .when(modColumn(9, 4), lit(4.0d))
-            .when(modColumn(9, 5), lit(5.0d))
-            .when(modColumn(9, 6), lit(6.0d))
-            .when(modColumn(9, 7), lit(7.0d))
-            .when(modColumn(9, 8), lit(8.0d)));
+    return df.withColumn("doubleCol", modColumn().cast(DataTypes.DoubleType));
   }
 
   private static Dataset<Row> withDecimalColumnDictEncoded(Dataset<Row> df) {
     Types.DecimalType type = Types.DecimalType.of(20, 5);
-    return df.withColumn(
-        "decimalCol",
-        when(modColumn(9, 0), bigDecimal(type, 0))
-            .when(modColumn(9, 1), bigDecimal(type, 1))
-            .when(modColumn(9, 2), bigDecimal(type, 2))
-            .when(modColumn(9, 3), bigDecimal(type, 3))
-            .when(modColumn(9, 4), bigDecimal(type, 4))
-            .when(modColumn(9, 5), bigDecimal(type, 5))
-            .when(modColumn(9, 6), bigDecimal(type, 6))
-            .when(modColumn(9, 7), bigDecimal(type, 7))
-            .when(modColumn(9, 8), bigDecimal(type, 8)));
+    return df.withColumn("decimalCol", lit(bigDecimal(type, 0)).plus(modColumn()));
   }
 
   private static Dataset<Row> withDateColumnDictEncoded(Dataset<Row> df) {
-    return df.withColumn(
-        "dateCol",
-        when(modColumn(9, 0), to_date(lit("04/12/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 1), to_date(lit("04/13/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 2), to_date(lit("04/14/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 3), to_date(lit("04/15/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 4), to_date(lit("04/16/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 5), to_date(lit("04/17/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 6), to_date(lit("04/18/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 7), to_date(lit("04/19/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 8), to_date(lit("04/20/2019"), "MM/dd/yyyy")));
+    Column dateAdd = modColumn().cast(DataTypes.ShortType);
+    return df.withColumn("dateCol", date_add(to_date(lit("04/12/2019"), "MM/dd/yyyy"), dateAdd));
   }
 
   private static Dataset<Row> withTimestampColumnDictEncoded(Dataset<Row> df) {
-    return df.withColumn(
-        "timestampCol",
-        when(modColumn(9, 0), to_timestamp(lit("04/12/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 1), to_timestamp(lit("04/13/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 2), to_timestamp(lit("04/14/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 3), to_timestamp(lit("04/15/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 4), to_timestamp(lit("04/16/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 5), to_timestamp(lit("04/17/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 6), to_timestamp(lit("04/18/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 7), to_timestamp(lit("04/19/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 8), to_timestamp(lit("04/20/2019"), "MM/dd/yyyy")));
+    Column dateAdd = modColumn().cast(DataTypes.ShortType);
+    return df.withColumn("timestampCol", to_timestamp(date_add(to_date(lit("04/12/2019"), "MM/dd/yyyy"), dateAdd)));
   }
 
   private static Dataset<Row> withStringColumnDictEncoded(Dataset<Row> df) {
-    return df.withColumn(
-        "stringCol",
-        when(pmod(col("longCol"), lit(9)).equalTo(lit(0)), lit("0"))
-            .when(modColumn(9, 1), lit("1"))
-            .when(modColumn(9, 2), lit("2"))
-            .when(modColumn(9, 3), lit("3"))
-            .when(modColumn(9, 4), lit("4"))
-            .when(modColumn(9, 5), lit("5"))
-            .when(modColumn(9, 6), lit("6"))
-            .when(modColumn(9, 7), lit("7"))
-            .when(modColumn(9, 8), lit("8")));
-  }
-
-  private static Column modColumn(int divisor, int remainder) {
-    return pmod(col("longCol"), lit(divisor)).equalTo(lit(remainder));
+    return df.withColumn("stringCol", modColumn().cast(DataTypes.StringType));
   }
 
   private static BigDecimal bigDecimal(Types.DecimalType type, int value) {
diff --git a/spark/v3.0/spark3/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java b/spark/v3.0/spark3/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
index f0d8bda..a8bc68f 100644
--- a/spark/v3.0/spark3/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
+++ b/spark/v3.0/spark3/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
@@ -28,9 +28,12 @@ import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.types.DataTypes;
 import org.openjdk.jmh.annotations.Setup;
 
 import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.date_add;
 import static org.apache.spark.sql.functions.lit;
 import static org.apache.spark.sql.functions.pmod;
 import static org.apache.spark.sql.functions.to_date;
@@ -71,136 +74,63 @@ public class VectorizedReadDictionaryEncodedFlatParquetDataBenchmark extends Vec
 
   @Override
   void appendData() {
-    for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
-      Dataset<Row> df = withLongColumnDictEncoded();
-      df = withIntColumnDictEncoded(df);
-      df = withFloatColumnDictEncoded(df);
-      df = withDoubleColumnDictEncoded(df);
-      df = withDecimalColumnDictEncoded(df);
-      df = withDateColumnDictEncoded(df);
-      df = withTimestampColumnDictEncoded(df);
-      df = withStringColumnDictEncoded(df);
-      appendAsFile(df);
-    }
+    Dataset<Row> df = idDF();
+    df = withLongColumnDictEncoded(df);
+    df = withIntColumnDictEncoded(df);
+    df = withFloatColumnDictEncoded(df);
+    df = withDoubleColumnDictEncoded(df);
+    df = withDecimalColumnDictEncoded(df);
+    df = withDateColumnDictEncoded(df);
+    df = withTimestampColumnDictEncoded(df);
+    df = withStringColumnDictEncoded(df);
+    df = df.drop("id");
+    df.write().format("iceberg")
+        .mode(SaveMode.Append)
+        .save(table().location());
   }
 
-  private Dataset<Row> withLongColumnDictEncoded() {
-    return spark().range(NUM_ROWS_PER_FILE)
-        .withColumn(
-            "longCol",
-            when(pmod(col("id"), lit(9)).equalTo(lit(0)), lit(0L))
-                .when(pmod(col("id"), lit(9)).equalTo(lit(1)), lit(1L))
-                .when(pmod(col("id"), lit(9)).equalTo(lit(2)), lit(2L))
-                .when(pmod(col("id"), lit(9)).equalTo(lit(3)), lit(3L))
-                .when(pmod(col("id"), lit(9)).equalTo(lit(4)), lit(4L))
-                .when(pmod(col("id"), lit(9)).equalTo(lit(5)), lit(5L))
-                .when(pmod(col("id"), lit(9)).equalTo(lit(6)), lit(6L))
-                .when(pmod(col("id"), lit(9)).equalTo(lit(7)), lit(7L))
-                .when(pmod(col("id"), lit(9)).equalTo(lit(8)), lit(8L)))
-        .drop("id");
+  private static final Column modColumn() {
+    return pmod(col("id"), lit(9));
+  }
+
+  private Dataset<Row> idDF() {
+    return spark().range(0, NUM_ROWS_PER_FILE * NUM_FILES, 1, NUM_FILES).toDF();
+  }
+
+  private static Dataset<Row> withLongColumnDictEncoded(Dataset<Row> df) {
+    return df.withColumn("longCol", modColumn().cast(DataTypes.LongType));
   }
 
   private static Dataset<Row> withIntColumnDictEncoded(Dataset<Row> df) {
-    return df.withColumn(
-        "intCol",
-        when(modColumn(9, 0), lit(0))
-            .when(modColumn(9, 1), lit(1))
-            .when(modColumn(9, 2), lit(2))
-            .when(modColumn(9, 3), lit(3))
-            .when(modColumn(9, 4), lit(4))
-            .when(modColumn(9, 5), lit(5))
-            .when(modColumn(9, 6), lit(6))
-            .when(modColumn(9, 7), lit(7))
-            .when(modColumn(9, 8), lit(8)));
+    return df.withColumn("intCol", modColumn().cast(DataTypes.IntegerType));
   }
 
   private static Dataset<Row> withFloatColumnDictEncoded(Dataset<Row> df) {
-    return df.withColumn(
-        "floatCol",
-        when(modColumn(9, 0), lit(0.0f))
-            .when(modColumn(9, 1), lit(1.0f))
-            .when(modColumn(9, 2), lit(2.0f))
-            .when(modColumn(9, 3), lit(3.0f))
-            .when(modColumn(9, 4), lit(4.0f))
-            .when(modColumn(9, 5), lit(5.0f))
-            .when(modColumn(9, 6), lit(6.0f))
-            .when(modColumn(9, 7), lit(7.0f))
-            .when(modColumn(9, 8), lit(8.0f)));
+    return df.withColumn("floatCol", modColumn().cast(DataTypes.FloatType));
+
   }
 
   private static Dataset<Row> withDoubleColumnDictEncoded(Dataset<Row> df) {
-    return df.withColumn(
-        "doubleCol",
-        when(modColumn(9, 0), lit(0.0d))
-            .when(modColumn(9, 1), lit(1.0d))
-            .when(modColumn(9, 2), lit(2.0d))
-            .when(modColumn(9, 3), lit(3.0d))
-            .when(modColumn(9, 4), lit(4.0d))
-            .when(modColumn(9, 5), lit(5.0d))
-            .when(modColumn(9, 6), lit(6.0d))
-            .when(modColumn(9, 7), lit(7.0d))
-            .when(modColumn(9, 8), lit(8.0d)));
+    return df.withColumn("doubleCol", modColumn().cast(DataTypes.DoubleType));
   }
 
   private static Dataset<Row> withDecimalColumnDictEncoded(Dataset<Row> df) {
     Types.DecimalType type = Types.DecimalType.of(20, 5);
-    return df.withColumn(
-        "decimalCol",
-        when(modColumn(9, 0), bigDecimal(type, 0))
-            .when(modColumn(9, 1), bigDecimal(type, 1))
-            .when(modColumn(9, 2), bigDecimal(type, 2))
-            .when(modColumn(9, 3), bigDecimal(type, 3))
-            .when(modColumn(9, 4), bigDecimal(type, 4))
-            .when(modColumn(9, 5), bigDecimal(type, 5))
-            .when(modColumn(9, 6), bigDecimal(type, 6))
-            .when(modColumn(9, 7), bigDecimal(type, 7))
-            .when(modColumn(9, 8), bigDecimal(type, 8)));
+    return df.withColumn("decimalCol", lit(bigDecimal(type, 0)).plus(modColumn()));
   }
 
   private static Dataset<Row> withDateColumnDictEncoded(Dataset<Row> df) {
-    return df.withColumn(
-        "dateCol",
-        when(modColumn(9, 0), to_date(lit("04/12/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 1), to_date(lit("04/13/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 2), to_date(lit("04/14/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 3), to_date(lit("04/15/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 4), to_date(lit("04/16/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 5), to_date(lit("04/17/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 6), to_date(lit("04/18/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 7), to_date(lit("04/19/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 8), to_date(lit("04/20/2019"), "MM/dd/yyyy")));
+    Column dateAdd = modColumn().cast(DataTypes.ShortType);
+    return df.withColumn("dateCol", date_add(to_date(lit("04/12/2019"), "MM/dd/yyyy"), dateAdd));
   }
 
   private static Dataset<Row> withTimestampColumnDictEncoded(Dataset<Row> df) {
-    return df.withColumn(
-        "timestampCol",
-        when(modColumn(9, 0), to_timestamp(lit("04/12/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 1), to_timestamp(lit("04/13/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 2), to_timestamp(lit("04/14/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 3), to_timestamp(lit("04/15/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 4), to_timestamp(lit("04/16/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 5), to_timestamp(lit("04/17/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 6), to_timestamp(lit("04/18/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 7), to_timestamp(lit("04/19/2019"), "MM/dd/yyyy"))
-            .when(modColumn(9, 8), to_timestamp(lit("04/20/2019"), "MM/dd/yyyy")));
+    Column dateAdd = modColumn().cast(DataTypes.ShortType);
+    return df.withColumn("timestampCol", to_timestamp(date_add(to_date(lit("04/12/2019"), "MM/dd/yyyy"), dateAdd)));
   }
 
   private static Dataset<Row> withStringColumnDictEncoded(Dataset<Row> df) {
-    return df.withColumn(
-        "stringCol",
-        when(pmod(col("longCol"), lit(9)).equalTo(lit(0)), lit("0"))
-            .when(modColumn(9, 1), lit("1"))
-            .when(modColumn(9, 2), lit("2"))
-            .when(modColumn(9, 3), lit("3"))
-            .when(modColumn(9, 4), lit("4"))
-            .when(modColumn(9, 5), lit("5"))
-            .when(modColumn(9, 6), lit("6"))
-            .when(modColumn(9, 7), lit("7"))
-            .when(modColumn(9, 8), lit("8")));
-  }
-
-  private static Column modColumn(int divisor, int remainder) {
-    return pmod(col("longCol"), lit(divisor)).equalTo(lit(remainder));
+    return df.withColumn("stringCol", modColumn().cast(DataTypes.StringType));
   }
 
   private static BigDecimal bigDecimal(Types.DecimalType type, int value) {