You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "oss-maker (via GitHub)" <gi...@apache.org> on 2023/07/04 16:54:47 UTC

[GitHub] [spark] oss-maker opened a new pull request, #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

oss-maker opened a new pull request, #41856:
URL: https://github.com/apache/spark/pull/41856

   ### What changes were proposed in this pull request?
   This PR adds support for running the TPCH benchmark. The schema used is the one defined in the official TPCH specification: https://www.tpc.org/TPC_Documents_Current_Versions/pdf/TPC-H_v3.0.1.pdf
   
   
   ### Why are the changes needed?
   This PR makes it easier to run the TPCH benchmark, especially in scenarios where we want to test the performance of aggregate-heavy TPCH queries such as Query 1.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   This PR was tested locally; I verified that the benchmark runs properly.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] oss-maker commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1623004625

   @LuciferYang I have made the required change; thanks for your help. Could you please review it? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] oss-maker commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1255729471


##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala:
##########
@@ -17,391 +17,20 @@
 
 package org.apache.spark.sql
 
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.immutable.Stream
-import scala.sys.process._
-import scala.util.Try
-
-import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions.{col, rpad}
-import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
-
-// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
-
-/**
- * Using ProcessBuilder.lineStream produces a stream, that uses
- * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
- *
- * This causes OOM if the consumer cannot keep up with the producer.
- *
- * See scala.sys.process.ProcessBuilderImpl.lineStream
- */
-object BlockingLineStream {
-
-  // See scala.sys.process.Streamed
-  private final class BlockingStreamed[T](
-    val process: T => Unit,
-    val done: Int => Unit,
-    val stream: () => Stream[T])
-
-  // See scala.sys.process.Streamed
-  private object BlockingStreamed {
-    // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
-    // which causes OOMs if the consumer cannot keep up with producer.
-    val maxQueueSize = 65536
-
-    def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
-      val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
-
-      def next(): Stream[T] = q.take match {
-        case Left(0) => Stream.empty
-        case Left(code) =>
-          if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
-        case Right(s) => Stream.cons(s, next())
-      }
-
-      new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
-    }
-  }
-
-  // See scala.sys.process.ProcessImpl.Spawn
-  private object Spawn {
-    def apply(f: => Unit): Thread = apply(f, daemon = false)
-    def apply(f: => Unit, daemon: Boolean): Thread = {
-      val thread = new Thread() { override def run() = { f } }
-      thread.setDaemon(daemon)
-      thread.start()
-      thread
-    }
-  }
-
-  def apply(command: Seq[String]): Stream[String] = {
-    val streamed = BlockingStreamed[String](true)
-    val process = command.run(BasicIO(false, streamed.process, None))
-    Spawn(streamed.done(process.exitValue()))
-    streamed.stream()
-  }
-}
-
-class Dsdgen(dsdgenDir: String) extends Serializable {
-  private val dsdgen = s"$dsdgenDir/dsdgen"
-
-  def generate(
-      sparkContext: SparkContext,
-      tableName: String,
-      partitions: Int,
-      scaleFactor: Int): RDD[String] = {
-    val generatedData = {
-      sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
-        val localToolsDir = if (new java.io.File(dsdgen).exists) {
-          dsdgenDir
-        } else if (new java.io.File(s"/$dsdgen").exists) {
-          s"/$dsdgenDir"
-        } else {
-          throw new IllegalStateException(
-            s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install")
-        }
-
-        // NOTE: RNGSEED is the RNG seed used by the data generator. Right now, it is fixed to 100.
-        val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
-        val commands = Seq(
-          "bash", "-c",
-          s"cd $localToolsDir && ./dsdgen -table $tableName -filter Y -scale $scaleFactor " +
-          s"-RNGSEED 100 $parallel")
-        BlockingLineStream(commands)
-      }
-    }
-
-    generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
-    generatedData
-  }
-}
+import org.apache.spark.sql.types.StructType
 
 class TPCDSTables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int)
-  extends TPCDSSchema with Logging with Serializable {
+  extends TableGenerator with TPCDSSchema with Logging with Serializable {
 
-  private val dataGenerator = new Dsdgen(dsdgenDir)
-
-  private def tables: Seq[Table] = tableColumns.map { case (tableName, schemaString) =>
+  override protected val dataGenerator: DataGenerator = new Dsdgen(dsdgenDir)
+  override protected val sparkSQLContext: SQLContext = sqlContext
+  override protected val tpcScaleFactor: Int = scaleFactor

Review Comment:
   Updated. Thanks.



##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+  // See scala.sys.process.Streamed
+  private final class BlockingStreamed[T](
+      val process: T => Unit,
+      val done: Int => Unit,
+      val stream: () => Stream[T])
+
+  // See scala.sys.process.Streamed
+  private object BlockingStreamed {
+    // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+    // which causes OOMs if the consumer cannot keep up with producer.
+    val maxQueueSize = 65536
+
+    def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+      val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+      def next(): Stream[T] = q.take match {
+        case Left(0) => Stream.empty
+        case Left(code) =>
+          if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+        case Right(s) => Stream.cons(s, next())
+      }
+
+      new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+    }
+  }
+
+  // See scala.sys.process.ProcessImpl.Spawn
+  private object Spawn {
+    def apply(f: => Unit): Thread = apply(f, daemon = false)
+    def apply(f: => Unit, daemon: Boolean): Thread = {
+      val thread = new Thread() { override def run() = { f } }
+      thread.setDaemon(daemon)
+      thread.start()
+      thread
+    }
+  }
+
+  def apply(command: Seq[String]): Stream[String] = {
+    val streamed = BlockingStreamed[String](true)
+    val process = command.run(BasicIO(false, streamed.process, None))
+    Spawn(streamed.done(process.exitValue()))
+    streamed.stream()
+  }
+}
+
+trait DataGenerator extends Serializable {
+  protected val toolsDir: String
+
+  protected def generateData(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int,
+      command: String): RDD[String] = {
+    val generatedData = {
+      sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+        val localToolsDir = if (new java.io.File(toolsDir).exists) {
+          toolsDir
+        } else if (new java.io.File(s"/$toolsDir").exists) {
+          s"/$toolsDir"
+        } else {
+          throw new IllegalStateException(
+            s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+        }
+
+        val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+        val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+        BlockingLineStream(commands)
+      }
+    }
+
+    generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+    generatedData
+  }
+
+  def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+  override protected val toolsDir: String = s"$dsdgenDir"
+
+  override def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    val command = s"./dsdgen -table $tableName -filter Y -scale $scaleFactor -RNGSEED 100"
+    generateData(sparkContext, tableName, partitions, scaleFactor, command)
+  }
+}
+
+class Dbgen(dbgenDir: String, params: Seq[String])
+  extends DataGenerator with Serializable {
+  override protected val toolsDir: String = s"$dbgenDir"
+  override def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    val shortTableNames = Map(
+      "customer" -> "c",
+      "lineitem" -> "L",
+      "nation" -> "n",
+      "orders" -> "O",
+      "part" -> "P",
+      "region" -> "r",
+      "supplier" -> "s",
+      "partsupp" -> "S"
+    )
+    val paramsString = params.mkString(" ")
+    val command = s"./dbgen -q $paramsString -T ${shortTableNames(tableName)} -s $scaleFactor"
+    generateData(sparkContext, tableName, partitions, scaleFactor, command)
+  }
+}
+
+trait TableGenerator extends Serializable with Logging {
+  protected val dataGenerator: DataGenerator
+  protected val sparkSQLContext: SQLContext
+  protected val tpcScaleFactor: Int

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] beliefer commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1257158528


##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+  // See scala.sys.process.Streamed
+  private final class BlockingStreamed[T](
+      val process: T => Unit,
+      val done: Int => Unit,
+      val stream: () => Stream[T])
+
+  // See scala.sys.process.Streamed
+  private object BlockingStreamed {
+    // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+    // which causes OOMs if the consumer cannot keep up with producer.
+    val maxQueueSize = 65536
+
+    def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+      val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+      def next(): Stream[T] = q.take match {
+        case Left(0) => Stream.empty
+        case Left(code) =>
+          if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+        case Right(s) => Stream.cons(s, next())
+      }
+
+      new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+    }
+  }
+
+  // See scala.sys.process.ProcessImpl.Spawn
+  private object Spawn {
+    def apply(f: => Unit): Thread = apply(f, daemon = false)
+    def apply(f: => Unit, daemon: Boolean): Thread = {
+      val thread = new Thread() { override def run() = { f } }
+      thread.setDaemon(daemon)
+      thread.start()
+      thread
+    }
+  }
+
+  def apply(command: Seq[String]): Stream[String] = {
+    val streamed = BlockingStreamed[String](true)
+    val process = command.run(BasicIO(false, streamed.process, None))
+    Spawn(streamed.done(process.exitValue()))
+    streamed.stream()
+  }
+}
+
+trait DataGenerator extends Serializable {
+  protected val toolsDir: String
+
+  protected def generateData(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int,
+      command: String): RDD[String] = {
+    val generatedData = {
+      sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+        val localToolsDir = if (new java.io.File(toolsDir).exists) {
+          toolsDir
+        } else if (new java.io.File(s"/$toolsDir").exists) {
+          s"/$toolsDir"
+        } else {
+          throw new IllegalStateException(
+            s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+        }
+
+        val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+        val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+        BlockingLineStream(commands)
+      }
+    }
+
+    generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+    generatedData
+  }
+
+  def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+  override protected val toolsDir: String = s"$dsdgenDir"
+
+  override def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    val command = s"./dsdgen -table $tableName -filter Y -scale $scaleFactor -RNGSEED 100"
+    generateData(sparkContext, tableName, partitions, scaleFactor, command)
+  }
+}
+
+class Dbgen(dbgenDir: String, params: Seq[String])
+  extends DataGenerator with Serializable {
+  override protected val toolsDir: String = s"$dbgenDir"
+  override def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    val shortTableNames = Map(
+      "customer" -> "c",
+      "lineitem" -> "L",
+      "nation" -> "n",
+      "orders" -> "O",
+      "part" -> "P",
+      "region" -> "r",
+      "supplier" -> "s",
+      "partsupp" -> "S"
+    )
+    val paramsString = params.mkString(" ")
+    val command = s"./dbgen -q $paramsString -T ${shortTableNames(tableName)} -s $scaleFactor"
+    generateData(sparkContext, tableName, partitions, scaleFactor, command)
+  }
+}
+
+trait TableGenerator extends Serializable with Logging {
+  protected val dataGenerator: DataGenerator
+  protected val sparkSQLContext: SQLContext

Review Comment:
   I created https://github.com/apache/spark/pull/41900 to replace `SQLContext` with `SparkSession` for `GenTPCDSData`, and it works well.
   
   @surnaik Could you wait for https://github.com/apache/spark/pull/41900 to be merged and then update this PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] oss-maker commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1257160176


##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+  // See scala.sys.process.Streamed
+  private final class BlockingStreamed[T](
+      val process: T => Unit,
+      val done: Int => Unit,
+      val stream: () => Stream[T])
+
+  // See scala.sys.process.Streamed
+  private object BlockingStreamed {
+    // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+    // which causes OOMs if the consumer cannot keep up with producer.
+    val maxQueueSize = 65536
+
+    def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+      val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+      def next(): Stream[T] = q.take match {
+        case Left(0) => Stream.empty
+        case Left(code) =>
+          if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+        case Right(s) => Stream.cons(s, next())
+      }
+
+      new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+    }
+  }
+
+  // See scala.sys.process.ProcessImpl.Spawn
+  private object Spawn {
+    def apply(f: => Unit): Thread = apply(f, daemon = false)
+    def apply(f: => Unit, daemon: Boolean): Thread = {
+      val thread = new Thread() { override def run() = { f } }
+      thread.setDaemon(daemon)
+      thread.start()
+      thread
+    }
+  }
+
+  def apply(command: Seq[String]): Stream[String] = {
+    val streamed = BlockingStreamed[String](true)
+    val process = command.run(BasicIO(false, streamed.process, None))
+    Spawn(streamed.done(process.exitValue()))
+    streamed.stream()
+  }
+}
+
+trait DataGenerator extends Serializable {
+  protected val toolsDir: String
+
+  protected def generateData(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int,
+      command: String): RDD[String] = {
+    val generatedData = {
+      sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+        val localToolsDir = if (new java.io.File(toolsDir).exists) {
+          toolsDir
+        } else if (new java.io.File(s"/$toolsDir").exists) {
+          s"/$toolsDir"
+        } else {
+          throw new IllegalStateException(
+            s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+        }
+
+        val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+        val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+        BlockingLineStream(commands)
+      }
+    }
+
+    generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+    generatedData
+  }
+
+  def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+  override protected val toolsDir: String = s"$dsdgenDir"
+
+  override def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    val command = s"./dsdgen -table $tableName -filter Y -scale $scaleFactor -RNGSEED 100"
+    generateData(sparkContext, tableName, partitions, scaleFactor, command)
+  }
+}
+
+class Dbgen(dbgenDir: String, params: Seq[String])
+  extends DataGenerator with Serializable {
+  override protected val toolsDir: String = s"$dbgenDir"
+  override def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    val shortTableNames = Map(
+      "customer" -> "c",
+      "lineitem" -> "L",
+      "nation" -> "n",
+      "orders" -> "O",
+      "part" -> "P",
+      "region" -> "r",
+      "supplier" -> "s",
+      "partsupp" -> "S"
+    )
+    val paramsString = params.mkString(" ")
+    val command = s"./dbgen -q $paramsString -T ${shortTableNames(tableName)} -s $scaleFactor"
+    generateData(sparkContext, tableName, partitions, scaleFactor, command)
+  }
+}
+
+trait TableGenerator extends Serializable with Logging {
+  protected val dataGenerator: DataGenerator
+  protected val sparkSQLContext: SQLContext

Review Comment:
   Sounds good. Thanks, please update once #41900 is merged.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-44301][SQL] Add Benchmark Suite for TPCH [spark]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
URL: https://github.com/apache/spark/pull/41856


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] oss-maker commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1255729113


##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+  // See scala.sys.process.Streamed
+  private final class BlockingStreamed[T](
+      val process: T => Unit,
+      val done: Int => Unit,
+      val stream: () => Stream[T])
+
+  // See scala.sys.process.Streamed
+  private object BlockingStreamed {
+    // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+    // which causes OOMs if the consumer cannot keep up with producer.
+    val maxQueueSize = 65536
+
+    def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+      val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+      def next(): Stream[T] = q.take match {
+        case Left(0) => Stream.empty
+        case Left(code) =>
+          if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+        case Right(s) => Stream.cons(s, next())
+      }
+
+      new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+    }
+  }
+
+  // See scala.sys.process.ProcessImpl.Spawn
+  private object Spawn {
+    def apply(f: => Unit): Thread = apply(f, daemon = false)
+    def apply(f: => Unit, daemon: Boolean): Thread = {
+      val thread = new Thread() { override def run() = { f } }
+      thread.setDaemon(daemon)
+      thread.start()
+      thread
+    }
+  }
+
+  def apply(command: Seq[String]): Stream[String] = {
+    val streamed = BlockingStreamed[String](true)
+    val process = command.run(BasicIO(false, streamed.process, None))
+    Spawn(streamed.done(process.exitValue()))
+    streamed.stream()
+  }
+}
+
+trait DataGenerator extends Serializable {
+  protected val toolsDir: String
+
+  protected def generateData(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int,
+      command: String): RDD[String] = {
+    val generatedData = {
+      sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+        val localToolsDir = if (new java.io.File(toolsDir).exists) {
+          toolsDir
+        } else if (new java.io.File(s"/$toolsDir").exists) {
+          s"/$toolsDir"
+        } else {
+          throw new IllegalStateException(
+            s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+        }
+
+        val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+        val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+        BlockingLineStream(commands)
+      }
+    }
+
+    generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+    generatedData
+  }
+
+  def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+  override protected val toolsDir: String = s"$dsdgenDir"

Review Comment:
   Done. I didn't notice this; initially I was also adding the tool name, but realised it was no longer needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] beliefer commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1257916349


##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala:
##########
@@ -17,390 +17,20 @@
 
 package org.apache.spark.sql
 
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.immutable.Stream
-import scala.sys.process._
-import scala.util.Try
-
-import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions.{col, rpad}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
-
-// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
-
-/**
- * Using ProcessBuilder.lineStream produces a stream, that uses
- * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
- *
- * This causes OOM if the consumer cannot keep up with the producer.
- *
- * See scala.sys.process.ProcessBuilderImpl.lineStream
- */
-object BlockingLineStream {
-
-  // See scala.sys.process.Streamed
-  private final class BlockingStreamed[T](
-    val process: T => Unit,
-    val done: Int => Unit,
-    val stream: () => Stream[T])
+import org.apache.spark.sql.types.StructType
 
-  // See scala.sys.process.Streamed
-  private object BlockingStreamed {
-    // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
-    // which causes OOMs if the consumer cannot keep up with producer.
-    val maxQueueSize = 65536
+class TPCDSTables(sparkSession: SparkSession, config: GenTPCDataConfig)

Review Comment:
   ```suggestion
   class TPCDSTables(val spark: SparkSession, config: GenTPCDataConfig)
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala:
##########
@@ -17,390 +17,20 @@
 
 package org.apache.spark.sql
 
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.immutable.Stream
-import scala.sys.process._
-import scala.util.Try
-
-import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions.{col, rpad}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
-
-// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
-
-/**
- * Using ProcessBuilder.lineStream produces a stream, that uses
- * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
- *
- * This causes OOM if the consumer cannot keep up with the producer.
- *
- * See scala.sys.process.ProcessBuilderImpl.lineStream
- */
-object BlockingLineStream {
-
-  // See scala.sys.process.Streamed
-  private final class BlockingStreamed[T](
-    val process: T => Unit,
-    val done: Int => Unit,
-    val stream: () => Stream[T])
+import org.apache.spark.sql.types.StructType
 
-  // See scala.sys.process.Streamed
-  private object BlockingStreamed {
-    // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
-    // which causes OOMs if the consumer cannot keep up with producer.
-    val maxQueueSize = 65536
+class TPCDSTables(sparkSession: SparkSession, config: GenTPCDataConfig)
+  extends TableGenerator with TPCDSSchema with Logging with Serializable {
 
-    def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
-      val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
-
-      def next(): Stream[T] = q.take match {
-        case Left(0) => Stream.empty
-        case Left(code) =>
-          if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
-        case Right(s) => Stream.cons(s, next())
-      }
-
-      new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
-    }
-  }
-
-  // See scala.sys.process.ProcessImpl.Spawn
-  private object Spawn {
-    def apply(f: => Unit): Thread = apply(f, daemon = false)
-    def apply(f: => Unit, daemon: Boolean): Thread = {
-      val thread = new Thread() { override def run() = { f } }
-      thread.setDaemon(daemon)
-      thread.start()
-      thread
-    }
-  }
-
-  def apply(command: Seq[String]): Stream[String] = {
-    val streamed = BlockingStreamed[String](true)
-    val process = command.run(BasicIO(false, streamed.process, None))
-    Spawn(streamed.done(process.exitValue()))
-    streamed.stream()
-  }
-}
-
-class Dsdgen(dsdgenDir: String) extends Serializable {
-  private val dsdgen = s"$dsdgenDir/dsdgen"
-
-  def generate(
-      sparkContext: SparkContext,
-      tableName: String,
-      partitions: Int,
-      scaleFactor: Int): RDD[String] = {
-    val generatedData = {
-      sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
-        val localToolsDir = if (new java.io.File(dsdgen).exists) {
-          dsdgenDir
-        } else if (new java.io.File(s"/$dsdgen").exists) {
-          s"/$dsdgenDir"
-        } else {
-          throw new IllegalStateException(
-            s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install")
-        }
-
-        // NOTE: RNGSEED is the RNG seed used by the data generator. Right now, it is fixed to 100.
-        val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
-        val commands = Seq(
-          "bash", "-c",
-          s"cd $localToolsDir && ./dsdgen -table $tableName -filter Y -scale $scaleFactor " +
-          s"-RNGSEED 100 $parallel")
-        BlockingLineStream(commands)
-      }
-    }
-
-    generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
-    generatedData
-  }
-}
-
-class TPCDSTables(spark: SparkSession, dsdgenDir: String, scaleFactor: Int)
-  extends TPCDSSchema with Logging with Serializable {
-
-  private val dataGenerator = new Dsdgen(dsdgenDir)
-
-  private def tables: Seq[Table] = tableColumns.map { case (tableName, schemaString) =>
+  override protected val dataGenerator: DataGenerator = new Dsdgen(config.dsdgenDir)
+  override protected val spark: SparkSession = sparkSession

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1634390549

   Should a larger TPCH dataset be used?
   
   For comparison, when sf=1, the size of the TPCDS dataset cache is 252MB.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] oss-maker commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1254290182


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCQueryBenchmarkArguments.scala:
##########
@@ -20,14 +20,11 @@ package org.apache.spark.sql.execution.benchmark
 import java.util.Locale
 
 
-class TPCDSQueryBenchmarkArguments(val args: Array[String]) {
-  var dataLocation: String = sys.env.getOrElse("SPARK_TPCDS_DATA", null)
+class TPCQueryBenchmarkArguments(val args: Array[String], val dataLocationEnv: String) {
+  var dataLocation: String = sys.env.getOrElse(dataLocationEnv, null)
   var queryFilter: Set[String] = Set.empty
   var cboEnabled: Boolean = false
 
-  parseArgs(args.toList)

Review Comment:
   My bad, it's fixed now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] beliefer commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1258218998


##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala:
##########
@@ -17,390 +17,18 @@
 
 package org.apache.spark.sql
 
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.immutable.Stream
-import scala.sys.process._
-import scala.util.Try
-
-import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions.{col, rpad}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
-
-// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
-
-/**
- * Using ProcessBuilder.lineStream produces a stream, that uses
- * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
- *
- * This causes OOM if the consumer cannot keep up with the producer.
- *
- * See scala.sys.process.ProcessBuilderImpl.lineStream
- */
-object BlockingLineStream {
-
-  // See scala.sys.process.Streamed
-  private final class BlockingStreamed[T](
-    val process: T => Unit,
-    val done: Int => Unit,
-    val stream: () => Stream[T])
+import org.apache.spark.sql.types.StructType
 
-  // See scala.sys.process.Streamed
-  private object BlockingStreamed {
-    // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
-    // which causes OOMs if the consumer cannot keep up with producer.
-    val maxQueueSize = 65536
+class TPCDSTables(val spark: SparkSession, val dsdgenDir: String, val scaleFactor: Int)

Review Comment:
   `dsdgenDir` is not a member inherited from the parent trait, so remove the `val`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] surnaik commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "surnaik (via GitHub)" <gi...@apache.org>.
surnaik commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1634382776

   Yes, that's strange; I'm confused about it too. Using the tpch-dbgen
   tool from the github.com/databricks/tpch-dbgen repository, the cache shows
   ~4KB.
   
   Thanks,
   Suraj Naik
   
   On Thu, Jul 13, 2023, 8:16 PM YangJie ***@***.***> wrote:
   
   > The data only 4KB when sf = 1?
   >
   > —
   > Reply to this email directly, view it on GitHub
   > <https://github.com/apache/spark/pull/41856#issuecomment-1634377891>, or
   > unsubscribe
   > <https://github.com/notifications/unsubscribe-auth/AJW6OTJSNFFTSGIB4OHQTCDXQAC4RANCNFSM6AAAAAAZ564SOY>
   > .
   > You are receiving this because you were mentioned.Message ID:
   > ***@***.***>
   >
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] oss-maker commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1622082961

   @LuciferYang I have added the TPCH benchmark to the GitHub Actions workflow, but I don't see any benchmark workflows being triggered (neither TPCDS nor TPCH). Any idea why, and how I can trigger them? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] surnaik commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "surnaik (via GitHub)" <gi...@apache.org>.
surnaik commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1263516785


##########
.github/workflows/benchmark.yml:
##########
@@ -186,7 +251,7 @@ jobs:
         # To keep the directory structure and file permissions, tar them
         # See also https://github.com/actions/upload-artifact#maintaining-file-permissions-and-case-sensitive-files
         echo "Preparing the benchmark results:"
-        tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude-standard`
+        tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude=tpch-sf-1 --exclude-standard`

Review Comment:
   Interesting find. Let me check the local data, but do you know where else tpch dbgen is available from?
   I'm using the one from github.com/databricks/tpch-dbgen



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] oss-maker commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1254289533


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCBenchmarkUtils.scala:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
+import org.apache.spark.sql.catalyst.util.resourceToString
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+
+trait TPCBenchmarkUtils extends SqlBasedBenchmark with Logging {

Review Comment:
   Thanks, fixed the name



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] surnaik commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "surnaik (via GitHub)" <gi...@apache.org>.
surnaik commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1255671079


##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+  // See scala.sys.process.Streamed
+  private final class BlockingStreamed[T](
+      val process: T => Unit,
+      val done: Int => Unit,
+      val stream: () => Stream[T])
+
+  // See scala.sys.process.Streamed
+  private object BlockingStreamed {
+    // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+    // which causes OOMs if the consumer cannot keep up with producer.
+    val maxQueueSize = 65536
+
+    def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+      val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+      def next(): Stream[T] = q.take match {
+        case Left(0) => Stream.empty
+        case Left(code) =>
+          if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+        case Right(s) => Stream.cons(s, next())
+      }
+
+      new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+    }
+  }
+
+  // See scala.sys.process.ProcessImpl.Spawn
+  private object Spawn {
+    def apply(f: => Unit): Thread = apply(f, daemon = false)
+    def apply(f: => Unit, daemon: Boolean): Thread = {
+      val thread = new Thread() { override def run() = { f } }
+      thread.setDaemon(daemon)
+      thread.start()
+      thread
+    }
+  }
+
+  def apply(command: Seq[String]): Stream[String] = {
+    val streamed = BlockingStreamed[String](true)
+    val process = command.run(BasicIO(false, streamed.process, None))
+    Spawn(streamed.done(process.exitValue()))
+    streamed.stream()
+  }
+}
+
+trait DataGenerator extends Serializable {
+  protected val toolsDir: String
+
+  protected def generateData(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int,
+      command: String): RDD[String] = {
+    val generatedData = {
+      sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+        val localToolsDir = if (new java.io.File(toolsDir).exists) {
+          toolsDir
+        } else if (new java.io.File(s"/$toolsDir").exists) {
+          s"/$toolsDir"
+        } else {
+          throw new IllegalStateException(
+            s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+        }
+
+        val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+        val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+        BlockingLineStream(commands)
+      }
+    }
+
+    generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+    generatedData
+  }
+
+  def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+  override protected val toolsDir: String = s"$dsdgenDir"
+
+  override def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    val command = s"./dsdgen -table $tableName -filter Y -scale $scaleFactor -RNGSEED 100"
+    generateData(sparkContext, tableName, partitions, scaleFactor, command)
+  }
+}
+
+class Dbgen(dbgenDir: String, params: Seq[String])
+  extends DataGenerator with Serializable {
+  override protected val toolsDir: String = s"$dbgenDir"
+  override def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    val shortTableNames = Map(
+      "customer" -> "c",
+      "lineitem" -> "L",
+      "nation" -> "n",
+      "orders" -> "O",
+      "part" -> "P",
+      "region" -> "r",
+      "supplier" -> "s",
+      "partsupp" -> "S"
+    )
+    val paramsString = params.mkString(" ")
+    val command = s"./dbgen -q $paramsString -T ${shortTableNames(tableName)} -s $scaleFactor"
+    generateData(sparkContext, tableName, partitions, scaleFactor, command)
+  }
+}
+
+trait TableGenerator extends Serializable with Logging {
+  protected val dataGenerator: DataGenerator
+  protected val sparkSQLContext: SQLContext

Review Comment:
   Thanks, didn't know this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] surnaik commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "surnaik (via GitHub)" <gi...@apache.org>.
surnaik commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1634393056

   Ya, let me try with 5 and check once. Will update here once I have the result.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] surnaik commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "surnaik (via GitHub)" <gi...@apache.org>.
surnaik commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1634447494

   I tried with SF 10; the actual data size is definitely larger, but the cache size actually shrank to ~3.5KB. Let me try it locally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] oss-maker commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1628494711

   @beliefer Thanks for the change, I have updated my PR after the merge of #41900 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] oss-maker commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1255731038


##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+  // See scala.sys.process.Streamed
+  private final class BlockingStreamed[T](
+      val process: T => Unit,
+      val done: Int => Unit,
+      val stream: () => Stream[T])
+
+  // See scala.sys.process.Streamed
+  private object BlockingStreamed {
+    // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+    // which causes OOMs if the consumer cannot keep up with producer.
+    val maxQueueSize = 65536
+
+    def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+      val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+      def next(): Stream[T] = q.take match {
+        case Left(0) => Stream.empty
+        case Left(code) =>
+          if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+        case Right(s) => Stream.cons(s, next())
+      }
+
+      new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+    }
+  }
+
+  // See scala.sys.process.ProcessImpl.Spawn
+  private object Spawn {
+    def apply(f: => Unit): Thread = apply(f, daemon = false)
+    def apply(f: => Unit, daemon: Boolean): Thread = {
+      val thread = new Thread() { override def run() = { f } }
+      thread.setDaemon(daemon)
+      thread.start()
+      thread
+    }
+  }
+
+  def apply(command: Seq[String]): Stream[String] = {
+    val streamed = BlockingStreamed[String](true)
+    val process = command.run(BasicIO(false, streamed.process, None))
+    Spawn(streamed.done(process.exitValue()))
+    streamed.stream()
+  }
+}
+
+trait DataGenerator extends Serializable {
+  protected val toolsDir: String
+
+  protected def generateData(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int,
+      command: String): RDD[String] = {
+    val generatedData = {
+      sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+        val localToolsDir = if (new java.io.File(toolsDir).exists) {
+          toolsDir
+        } else if (new java.io.File(s"/$toolsDir").exists) {
+          s"/$toolsDir"
+        } else {
+          throw new IllegalStateException(
+            s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+        }
+
+        val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+        val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+        BlockingLineStream(commands)
+      }
+    }
+
+    generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+    generatedData
+  }
+
+  def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+  override protected val toolsDir: String = s"$dsdgenDir"
+
+  override def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    val command = s"./dsdgen -table $tableName -filter Y -scale $scaleFactor -RNGSEED 100"
+    generateData(sparkContext, tableName, partitions, scaleFactor, command)
+  }
+}
+
+class Dbgen(dbgenDir: String, params: Seq[String])
+  extends DataGenerator with Serializable {
+  override protected val toolsDir: String = s"$dbgenDir"
+  override def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    val shortTableNames = Map(
+      "customer" -> "c",
+      "lineitem" -> "L",
+      "nation" -> "n",
+      "orders" -> "O",
+      "part" -> "P",
+      "region" -> "r",
+      "supplier" -> "s",
+      "partsupp" -> "S"
+    )
+    val paramsString = params.mkString(" ")
+    val command = s"./dbgen -q $paramsString -T ${shortTableNames(tableName)} -s $scaleFactor"
+    generateData(sparkContext, tableName, partitions, scaleFactor, command)
+  }
+}
+
+trait TableGenerator extends Serializable with Logging {
+  protected val dataGenerator: DataGenerator
+  protected val sparkSQLContext: SQLContext

Review Comment:
   The sqlContext is extracted from the same SparkSession, so I don't think this is required. Even if we pass it, in genData we will again have to get hold of the sqlContext or catalog to operate on the table. Let me know if this is absolutely required. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] beliefer commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1255608832


##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+  // See scala.sys.process.Streamed
+  private final class BlockingStreamed[T](
+      val process: T => Unit,
+      val done: Int => Unit,
+      val stream: () => Stream[T])
+
+  // See scala.sys.process.Streamed
+  private object BlockingStreamed {
+    // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+    // which causes OOMs if the consumer cannot keep up with producer.
+    val maxQueueSize = 65536
+
+    def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+      val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+      def next(): Stream[T] = q.take match {
+        case Left(0) => Stream.empty
+        case Left(code) =>
+          if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+        case Right(s) => Stream.cons(s, next())
+      }
+
+      new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+    }
+  }
+
+  // See scala.sys.process.ProcessImpl.Spawn
+  private object Spawn {
+    def apply(f: => Unit): Thread = apply(f, daemon = false)
+    def apply(f: => Unit, daemon: Boolean): Thread = {
+      val thread = new Thread() { override def run() = { f } }
+      thread.setDaemon(daemon)
+      thread.start()
+      thread
+    }
+  }
+
+  def apply(command: Seq[String]): Stream[String] = {
+    val streamed = BlockingStreamed[String](true)
+    val process = command.run(BasicIO(false, streamed.process, None))
+    Spawn(streamed.done(process.exitValue()))
+    streamed.stream()
+  }
+}
+
+trait DataGenerator extends Serializable {
+  protected val toolsDir: String
+
+  protected def generateData(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int,
+      command: String): RDD[String] = {
+    val generatedData = {
+      sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+        val localToolsDir = if (new java.io.File(toolsDir).exists) {
+          toolsDir
+        } else if (new java.io.File(s"/$toolsDir").exists) {
+          s"/$toolsDir"
+        } else {
+          throw new IllegalStateException(
+            s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+        }
+
+        val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+        val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+        BlockingLineStream(commands)
+      }
+    }
+
+    generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+    generatedData
+  }
+
+  def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+  override protected val toolsDir: String = s"$dsdgenDir"
+
+  override def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    val command = s"./dsdgen -table $tableName -filter Y -scale $scaleFactor -RNGSEED 100"
+    generateData(sparkContext, tableName, partitions, scaleFactor, command)
+  }
+}
+
+class Dbgen(dbgenDir: String, params: Seq[String])
+  extends DataGenerator with Serializable {
+  override protected val toolsDir: String = s"$dbgenDir"
+  override def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    val shortTableNames = Map(
+      "customer" -> "c",
+      "lineitem" -> "L",
+      "nation" -> "n",
+      "orders" -> "O",
+      "part" -> "P",
+      "region" -> "r",
+      "supplier" -> "s",
+      "partsupp" -> "S"
+    )
+    val paramsString = params.mkString(" ")
+    val command = s"./dbgen -q $paramsString -T ${shortTableNames(tableName)} -s $scaleFactor"
+    generateData(sparkContext, tableName, partitions, scaleFactor, command)
+  }
+}
+
+trait TableGenerator extends Serializable with Logging {
+  protected val dataGenerator: DataGenerator
+  protected val sparkSQLContext: SQLContext

Review Comment:
   ```suggestion
     protected val sqlContext: SQLContext
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+  // See scala.sys.process.Streamed
+  private final class BlockingStreamed[T](
+      val process: T => Unit,
+      val done: Int => Unit,
+      val stream: () => Stream[T])
+
+  // See scala.sys.process.Streamed
+  private object BlockingStreamed {
+    // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+    // which causes OOMs if the consumer cannot keep up with producer.
+    val maxQueueSize = 65536
+
+    def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+      val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+      def next(): Stream[T] = q.take match {
+        case Left(0) => Stream.empty
+        case Left(code) =>
+          if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+        case Right(s) => Stream.cons(s, next())
+      }
+
+      new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+    }
+  }
+
+  // See scala.sys.process.ProcessImpl.Spawn
+  private object Spawn {
+    def apply(f: => Unit): Thread = apply(f, daemon = false)
+    def apply(f: => Unit, daemon: Boolean): Thread = {
+      val thread = new Thread() { override def run() = { f } }
+      thread.setDaemon(daemon)
+      thread.start()
+      thread
+    }
+  }
+
+  def apply(command: Seq[String]): Stream[String] = {
+    val streamed = BlockingStreamed[String](true)
+    val process = command.run(BasicIO(false, streamed.process, None))
+    Spawn(streamed.done(process.exitValue()))
+    streamed.stream()
+  }
+}
+
+trait DataGenerator extends Serializable {
+  protected val toolsDir: String
+
+  protected def generateData(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int,
+      command: String): RDD[String] = {
+    val generatedData = {
+      sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+        val localToolsDir = if (new java.io.File(toolsDir).exists) {
+          toolsDir
+        } else if (new java.io.File(s"/$toolsDir").exists) {
+          s"/$toolsDir"
+        } else {
+          throw new IllegalStateException(
+            s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+        }
+
+        val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+        val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+        BlockingLineStream(commands)
+      }
+    }
+
+    generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+    generatedData
+  }
+
+  def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+  override protected val toolsDir: String = s"$dsdgenDir"
+
+  override def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    val command = s"./dsdgen -table $tableName -filter Y -scale $scaleFactor -RNGSEED 100"
+    generateData(sparkContext, tableName, partitions, scaleFactor, command)
+  }
+}
+
+class Dbgen(dbgenDir: String, params: Seq[String])
+  extends DataGenerator with Serializable {
+  override protected val toolsDir: String = s"$dbgenDir"
+  override def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    val shortTableNames = Map(
+      "customer" -> "c",
+      "lineitem" -> "L",
+      "nation" -> "n",
+      "orders" -> "O",
+      "part" -> "P",
+      "region" -> "r",
+      "supplier" -> "s",
+      "partsupp" -> "S"
+    )
+    val paramsString = params.mkString(" ")
+    val command = s"./dbgen -q $paramsString -T ${shortTableNames(tableName)} -s $scaleFactor"
+    generateData(sparkContext, tableName, partitions, scaleFactor, command)
+  }
+}
+
+trait TableGenerator extends Serializable with Logging {
+  protected val dataGenerator: DataGenerator
+  protected val sparkSQLContext: SQLContext
+  protected val tpcScaleFactor: Int

Review Comment:
   ```suggestion
     protected val scaleFactor: Int
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala:
##########
@@ -17,391 +17,20 @@
 
 package org.apache.spark.sql
 
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.immutable.Stream
-import scala.sys.process._
-import scala.util.Try
-
-import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions.{col, rpad}
-import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
-
-// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
-
-/**
- * Using ProcessBuilder.lineStream produces a stream, that uses
- * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
- *
- * This causes OOM if the consumer cannot keep up with the producer.
- *
- * See scala.sys.process.ProcessBuilderImpl.lineStream
- */
-object BlockingLineStream {
-
-  // See scala.sys.process.Streamed
-  private final class BlockingStreamed[T](
-    val process: T => Unit,
-    val done: Int => Unit,
-    val stream: () => Stream[T])
-
-  // See scala.sys.process.Streamed
-  private object BlockingStreamed {
-    // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
-    // which causes OOMs if the consumer cannot keep up with producer.
-    val maxQueueSize = 65536
-
-    def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
-      val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
-
-      def next(): Stream[T] = q.take match {
-        case Left(0) => Stream.empty
-        case Left(code) =>
-          if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
-        case Right(s) => Stream.cons(s, next())
-      }
-
-      new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
-    }
-  }
-
-  // See scala.sys.process.ProcessImpl.Spawn
-  private object Spawn {
-    def apply(f: => Unit): Thread = apply(f, daemon = false)
-    def apply(f: => Unit, daemon: Boolean): Thread = {
-      val thread = new Thread() { override def run() = { f } }
-      thread.setDaemon(daemon)
-      thread.start()
-      thread
-    }
-  }
-
-  def apply(command: Seq[String]): Stream[String] = {
-    val streamed = BlockingStreamed[String](true)
-    val process = command.run(BasicIO(false, streamed.process, None))
-    Spawn(streamed.done(process.exitValue()))
-    streamed.stream()
-  }
-}
-
-class Dsdgen(dsdgenDir: String) extends Serializable {
-  private val dsdgen = s"$dsdgenDir/dsdgen"
-
-  def generate(
-      sparkContext: SparkContext,
-      tableName: String,
-      partitions: Int,
-      scaleFactor: Int): RDD[String] = {
-    val generatedData = {
-      sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
-        val localToolsDir = if (new java.io.File(dsdgen).exists) {
-          dsdgenDir
-        } else if (new java.io.File(s"/$dsdgen").exists) {
-          s"/$dsdgenDir"
-        } else {
-          throw new IllegalStateException(
-            s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install")
-        }
-
-        // NOTE: RNGSEED is the RNG seed used by the data generator. Right now, it is fixed to 100.
-        val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
-        val commands = Seq(
-          "bash", "-c",
-          s"cd $localToolsDir && ./dsdgen -table $tableName -filter Y -scale $scaleFactor " +
-          s"-RNGSEED 100 $parallel")
-        BlockingLineStream(commands)
-      }
-    }
-
-    generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
-    generatedData
-  }
-}
+import org.apache.spark.sql.types.StructType
 
 class TPCDSTables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int)
-  extends TPCDSSchema with Logging with Serializable {
+  extends TableGenerator with TPCDSSchema with Logging with Serializable {
 
-  private val dataGenerator = new Dsdgen(dsdgenDir)
-
-  private def tables: Seq[Table] = tableColumns.map { case (tableName, schemaString) =>
+  override protected val dataGenerator: DataGenerator = new Dsdgen(dsdgenDir)
+  override protected val sparkSQLContext: SQLContext = sqlContext
+  override protected val tpcScaleFactor: Int = scaleFactor

Review Comment:
   ```suggestion
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala:
##########
@@ -17,391 +17,20 @@
 
 package org.apache.spark.sql
 
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.immutable.Stream
-import scala.sys.process._
-import scala.util.Try
-
-import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions.{col, rpad}
-import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
-
-// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
-
-/**
- * Using ProcessBuilder.lineStream produces a stream, that uses
- * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
- *
- * This causes OOM if the consumer cannot keep up with the producer.
- *
- * See scala.sys.process.ProcessBuilderImpl.lineStream
- */
-object BlockingLineStream {
-
-  // See scala.sys.process.Streamed
-  private final class BlockingStreamed[T](
-    val process: T => Unit,
-    val done: Int => Unit,
-    val stream: () => Stream[T])
-
-  // See scala.sys.process.Streamed
-  private object BlockingStreamed {
-    // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
-    // which causes OOMs if the consumer cannot keep up with producer.
-    val maxQueueSize = 65536
-
-    def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
-      val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
-
-      def next(): Stream[T] = q.take match {
-        case Left(0) => Stream.empty
-        case Left(code) =>
-          if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
-        case Right(s) => Stream.cons(s, next())
-      }
-
-      new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
-    }
-  }
-
-  // See scala.sys.process.ProcessImpl.Spawn
-  private object Spawn {
-    def apply(f: => Unit): Thread = apply(f, daemon = false)
-    def apply(f: => Unit, daemon: Boolean): Thread = {
-      val thread = new Thread() { override def run() = { f } }
-      thread.setDaemon(daemon)
-      thread.start()
-      thread
-    }
-  }
-
-  def apply(command: Seq[String]): Stream[String] = {
-    val streamed = BlockingStreamed[String](true)
-    val process = command.run(BasicIO(false, streamed.process, None))
-    Spawn(streamed.done(process.exitValue()))
-    streamed.stream()
-  }
-}
-
-class Dsdgen(dsdgenDir: String) extends Serializable {
-  private val dsdgen = s"$dsdgenDir/dsdgen"
-
-  def generate(
-      sparkContext: SparkContext,
-      tableName: String,
-      partitions: Int,
-      scaleFactor: Int): RDD[String] = {
-    val generatedData = {
-      sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
-        val localToolsDir = if (new java.io.File(dsdgen).exists) {
-          dsdgenDir
-        } else if (new java.io.File(s"/$dsdgen").exists) {
-          s"/$dsdgenDir"
-        } else {
-          throw new IllegalStateException(
-            s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install")
-        }
-
-        // NOTE: RNGSEED is the RNG seed used by the data generator. Right now, it is fixed to 100.
-        val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
-        val commands = Seq(
-          "bash", "-c",
-          s"cd $localToolsDir && ./dsdgen -table $tableName -filter Y -scale $scaleFactor " +
-          s"-RNGSEED 100 $parallel")
-        BlockingLineStream(commands)
-      }
-    }
-
-    generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
-    generatedData
-  }
-}
+import org.apache.spark.sql.types.StructType
 
 class TPCDSTables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int)
-  extends TPCDSSchema with Logging with Serializable {
+  extends TableGenerator with TPCDSSchema with Logging with Serializable {
 
-  private val dataGenerator = new Dsdgen(dsdgenDir)
-
-  private def tables: Seq[Table] = tableColumns.map { case (tableName, schemaString) =>
+  override protected val dataGenerator: DataGenerator = new Dsdgen(dsdgenDir)
+  override protected val sparkSQLContext: SQLContext = sqlContext
+  override protected val tpcScaleFactor: Int = scaleFactor

Review Comment:
   Please declare `sqlContext: SQLContext` and `scaleFactor: Int` in `TableGenerator`.



##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+  // See scala.sys.process.Streamed
+  private final class BlockingStreamed[T](
+      val process: T => Unit,
+      val done: Int => Unit,
+      val stream: () => Stream[T])
+
+  // See scala.sys.process.Streamed
+  private object BlockingStreamed {
+    // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+    // which causes OOMs if the consumer cannot keep up with producer.
+    val maxQueueSize = 65536
+
+    def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+      val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+      def next(): Stream[T] = q.take match {
+        case Left(0) => Stream.empty
+        case Left(code) =>
+          if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+        case Right(s) => Stream.cons(s, next())
+      }
+
+      new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+    }
+  }
+
+  // See scala.sys.process.ProcessImpl.Spawn
+  private object Spawn {
+    def apply(f: => Unit): Thread = apply(f, daemon = false)
+    def apply(f: => Unit, daemon: Boolean): Thread = {
+      val thread = new Thread() { override def run() = { f } }
+      thread.setDaemon(daemon)
+      thread.start()
+      thread
+    }
+  }
+
+  def apply(command: Seq[String]): Stream[String] = {
+    val streamed = BlockingStreamed[String](true)
+    val process = command.run(BasicIO(false, streamed.process, None))
+    Spawn(streamed.done(process.exitValue()))
+    streamed.stream()
+  }
+}
+
+trait DataGenerator extends Serializable {
+  protected val toolsDir: String
+
+  protected def generateData(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int,
+      command: String): RDD[String] = {
+    val generatedData = {
+      sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+        val localToolsDir = if (new java.io.File(toolsDir).exists) {
+          toolsDir
+        } else if (new java.io.File(s"/$toolsDir").exists) {
+          s"/$toolsDir"
+        } else {
+          throw new IllegalStateException(
+            s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+        }
+
+        val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+        val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+        BlockingLineStream(commands)
+      }
+    }
+
+    generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+    generatedData
+  }
+
+  def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+  override protected val toolsDir: String = s"$dsdgenDir"

Review Comment:
   Shall we avoid the duplicated variables `dsdgenDir` and `toolsDir`?



##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+  // See scala.sys.process.Streamed
+  private final class BlockingStreamed[T](
+      val process: T => Unit,
+      val done: Int => Unit,
+      val stream: () => Stream[T])
+
+  // See scala.sys.process.Streamed
+  private object BlockingStreamed {
+    // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+    // which causes OOMs if the consumer cannot keep up with producer.
+    val maxQueueSize = 65536
+
+    def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+      val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+      def next(): Stream[T] = q.take match {
+        case Left(0) => Stream.empty
+        case Left(code) =>
+          if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+        case Right(s) => Stream.cons(s, next())
+      }
+
+      new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+    }
+  }
+
+  // See scala.sys.process.ProcessImpl.Spawn
+  private object Spawn {
+    def apply(f: => Unit): Thread = apply(f, daemon = false)
+    def apply(f: => Unit, daemon: Boolean): Thread = {
+      val thread = new Thread() { override def run() = { f } }
+      thread.setDaemon(daemon)
+      thread.start()
+      thread
+    }
+  }
+
+  def apply(command: Seq[String]): Stream[String] = {
+    val streamed = BlockingStreamed[String](true)
+    val process = command.run(BasicIO(false, streamed.process, None))
+    Spawn(streamed.done(process.exitValue()))
+    streamed.stream()
+  }
+}
+
+trait DataGenerator extends Serializable {
+  protected val toolsDir: String
+
+  protected def generateData(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int,
+      command: String): RDD[String] = {
+    val generatedData = {
+      sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+        val localToolsDir = if (new java.io.File(toolsDir).exists) {
+          toolsDir
+        } else if (new java.io.File(s"/$toolsDir").exists) {
+          s"/$toolsDir"
+        } else {
+          throw new IllegalStateException(
+            s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+        }
+
+        val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+        val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+        BlockingLineStream(commands)
+      }
+    }
+
+    generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+    generatedData
+  }
+
+  def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+  override protected val toolsDir: String = s"$dsdgenDir"
+
+  override def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    val command = s"./dsdgen -table $tableName -filter Y -scale $scaleFactor -RNGSEED 100"
+    generateData(sparkContext, tableName, partitions, scaleFactor, command)
+  }
+}
+
+class Dbgen(dbgenDir: String, params: Seq[String])
+  extends DataGenerator with Serializable {
+  override protected val toolsDir: String = s"$dbgenDir"
+  override def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    val shortTableNames = Map(
+      "customer" -> "c",
+      "lineitem" -> "L",
+      "nation" -> "n",
+      "orders" -> "O",
+      "part" -> "P",
+      "region" -> "r",
+      "supplier" -> "s",
+      "partsupp" -> "S"
+    )
+    val paramsString = params.mkString(" ")
+    val command = s"./dbgen -q $paramsString -T ${shortTableNames(tableName)} -s $scaleFactor"
+    generateData(sparkContext, tableName, partitions, scaleFactor, command)
+  }
+}
+
+trait TableGenerator extends Serializable with Logging {
+  protected val dataGenerator: DataGenerator
+  protected val sparkSQLContext: SQLContext

Review Comment:
   BTW, SQLContext is an old API. It seems it's time to use `SparkSession` instead. cc @cloud-fan @MaxGekk @wangyum



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1634300120

   https://github.com/LuciferYang/spark/actions/runs/5540894623/jobs/10117467990
   
   The test run was successful, but I suspect the generated TPCH table data may not have been cached successfully:
   ```
   Cache Size: ~0 MB (4166 B)
   ``` 
   <img width="1289" alt="image" src="https://github.com/apache/spark/assets/1475305/ccddfc69-fb1b-42b7-80fc-481980241d41">
   
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1634394953

   Thanks @surnaik 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] beliefer commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1625212828

   @surnaik Thank you for your ping.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] surnaik commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "surnaik (via GitHub)" <gi...@apache.org>.
surnaik commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1625025167

   @beliefer Could you also please review this once? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] oss-maker commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1622097222

   Triggered a workflow - https://github.com/oss-maker/spark/actions/runs/5466653951
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] surnaik commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "surnaik (via GitHub)" <gi...@apache.org>.
surnaik commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1634342717

   @LuciferYang The size of the data is incredibly small at SF=1. The cache is working fine. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] oss-maker commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1254100099


##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCHData.scala:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf

Review Comment:
   Yes, I have based this on that. Note that there is no GenTPCHData there, so that part is new.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1254038964


##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCHData.scala:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf

Review Comment:
   Based on [TPCH.scala](https://github.com/databricks/spark-sql-perf/blob/master/src/main/scala/com/databricks/spark/sql/perf/tpch/TPCH.scala)?



##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCHData.scala:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+class Dbgen(dbgenDir: String, params: Seq[String]) extends Serializable {
+  val dbgen = s"$dbgenDir/dbgen"
+  def generate(
+    sparkContext: SparkContext,
+    tableName: String,
+    partitions: Int,
+    scaleFactor: Int): RDD[String] = {
+    val generatedData = {
+      sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+        val localToolsDir = if (new java.io.File(dbgen).exists) {
+          dbgenDir
+        } else if (new java.io.File(s"/$dbgenDir").exists) {
+          s"/$dbgenDir"
+        } else {
+          throw new IllegalStateException
+            (s"Could not find dbgen at $dbgen or /$dbgenDir. Run install")
+        }
+        val parallel = if (partitions > 1) s"-C $partitions -S $i" else ""
+        val shortTableNames = Map(
+          "customer" -> "c",
+          "lineitem" -> "L",
+          "nation" -> "n",
+          "orders" -> "O",
+          "part" -> "P",
+          "region" -> "r",
+          "supplier" -> "s",
+          "partsupp" -> "S"
+        )
+        val paramsString = params.mkString(" ")
+        val commands = Seq(
+          "bash", "-c",
+          s"cd $localToolsDir && ./dbgen -q $paramsString " +
+            s"-T ${shortTableNames(tableName)} -s $scaleFactor $parallel")
+        BlockingLineStream(commands)
+      }
+    }
+
+    generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+    generatedData
+  }
+}
+
+class TPCHTables(
+    sqlContext: SQLContext,
+    dbgenDir: String,
+    scaleFactor: Int,
+    generatorParams: Seq[String] = Nil)
+    extends TPCHSchema with Logging with Serializable {
+
+  private val dataGenerator = new Dbgen(dbgenDir, generatorParams)
+
+  private def tables: Seq[Table] = tableColumns.map { case (tableName, schemaString) =>
+    val partitionColumns = tablePartitionColumns.getOrElse(tableName, Nil)
+      .map(_.stripPrefix("`").stripSuffix("`"))
+    Table(tableName, partitionColumns, StructType.fromDDL(schemaString))
+  }.toSeq
+
+  private case class Table(name: String, partitionColumns: Seq[String], schema: StructType) {
+    def nonPartitioned: Table = {
+      Table(name, Nil, schema)
+    }
+
+    private def df(numPartition: Int) = {
+      val generatedData = dataGenerator.generate(
+        sqlContext.sparkContext, name, numPartition, scaleFactor)
+      val rows = generatedData.mapPartitions { iter =>
+        iter.map { l =>
+          val values = l.split("\\|", -1).dropRight(1).map { v =>
+            if (v.equals("")) {
+              // If the string value is an empty string, we turn it to a null
+              null
+            } else {
+              v
+            }
+          }
+          Row.fromSeq(values)
+        }
+      }
+
+      val stringData =
+        sqlContext.createDataFrame(
+          rows,
+          StructType(schema.fields.map(f => StructField(f.name, StringType))))
+
+      val convertedData = {
+        val columns = schema.fields.map { f =>
+          val c = f.dataType match {
+            // Needs right-padding for char types
+            case CharType(n) => rpad(Column(f.name), n, " ")
+            // Don't need a cast for varchar types
+            case _: VarcharType => col(f.name)
+            case _ => col(f.name).cast(f.dataType)
+          }
+          c.as(f.name)
+        }
+        stringData.select(columns: _*)
+      }
+
+      convertedData
+    }
+
+    def genData(
+        location: String,
+        format: String,
+        overwrite: Boolean,
+        clusterByPartitionColumns: Boolean,
+        filterOutNullPartitionValues: Boolean,
+        numPartitions: Int): Unit = {
+      val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Ignore
+
+      val data = df(numPartitions)
+      val tempTableName = s"${name}_text"
+      data.createOrReplaceTempView(tempTableName)
+
+      val writer = if (partitionColumns.nonEmpty) {
+        if (clusterByPartitionColumns) {
+          val columnString = data.schema.fields.map { field =>
+            field.name
+          }.mkString(",")
+          val partitionColumnString = partitionColumns.mkString(",")
+          val predicates = if (filterOutNullPartitionValues) {
+            partitionColumns.map(col => s"$col IS NOT NULL").mkString("WHERE ", " AND ", "")
+          } else {
+            ""
+          }
+
+          val query =
+            s"""
+               |SELECT
+               |  $columnString
+               |FROM
+               |  $tempTableName
+               |$predicates
+               |DISTRIBUTE BY
+               |  $partitionColumnString
+            """.stripMargin
+          val grouped = sqlContext.sql(query)
+          logInfo(s"Pre-clustering with partitioning columns with query $query.")
+          grouped.write
+        } else {
+          data.write
+        }
+      } else {
+        // treat non-partitioned tables as "one partition" that we want to coalesce
+        if (clusterByPartitionColumns) {
+          // in case data has more than maxRecordsPerFile, split into multiple writers to improve
+          // datagen speed files will be truncated to maxRecordsPerFile value, so the final
+          // result will be the same.
+          val numRows = data.count
+          val maxRecordPerFile = Try {
+            sqlContext.getConf("spark.sql.files.maxRecordsPerFile").toInt
+          }.getOrElse(0)
+
+          if (maxRecordPerFile > 0 && numRows > maxRecordPerFile) {
+            val numFiles = (numRows.toDouble/maxRecordPerFile).ceil.toInt
+            logInfo(s"Coalescing into $numFiles files")
+            data.coalesce(numFiles).write
+          } else {
+            data.coalesce(1).write
+          }
+        } else {
+          data.write
+        }
+      }
+      writer.format(format).mode(mode)
+      if (partitionColumns.nonEmpty) {
+        writer.partitionBy(partitionColumns: _*)
+      }
+      logInfo(s"Generating table $name in database to $location with save mode $mode.")
+      writer.save(location)
+      sqlContext.dropTempTable(tempTableName)
+    }
+  }
+
+  def genData(
+      location: String,
+      format: String,
+      overwrite: Boolean,
+      partitionTables: Boolean,
+      clusterByPartitionColumns: Boolean,
+      filterOutNullPartitionValues: Boolean,
+      tableFilter: String = "",
+      numPartitions: Int = 100): Unit = {
+    var tablesToBeGenerated = if (partitionTables) {
+      tables
+    } else {
+      tables.map(_.nonPartitioned)
+    }
+
+    if (!tableFilter.isEmpty) {
+      tablesToBeGenerated = tablesToBeGenerated.filter(_.name == tableFilter)
+      if (tablesToBeGenerated.isEmpty) {
+        throw new RuntimeException("Bad table name filter: " + tableFilter)
+      }
+    }
+
+    tablesToBeGenerated.foreach { table =>
+      val tableLocation = s"$location/${table.name}"
+      table.genData(tableLocation, format, overwrite, clusterByPartitionColumns,
+        filterOutNullPartitionValues, numPartitions)
+    }
+  }
+}
+
+class GenTPCHDataConfig(args: Array[String]) {
+  var master: String = "local[*]"
+  var dbgenDir: String = null
+  var location: String = null
+  var scaleFactor: Int = 1
+  var format: String = "parquet"
+  var overwrite: Boolean = false
+  var partitionTables: Boolean = false
+  var clusterByPartitionColumns: Boolean = false
+  var filterOutNullPartitionValues: Boolean = false
+  var tableFilter: String = ""
+  var numPartitions: Int = 100
+
+  parseArgs(args.toList)
+
+  private def parseArgs(inputArgs: List[String]): Unit = {
+    var args = inputArgs
+
+    while (args.nonEmpty) {
+      args match {
+        case "--master" :: value :: tail =>
+          master = value
+          args = tail
+
+        case "--dbgenDir" :: value :: tail =>
+          dbgenDir = value
+          args = tail
+
+        case "--location" :: value :: tail =>
+          location = value
+          args = tail
+
+        case "--scaleFactor" :: value :: tail =>
+          scaleFactor = toPositiveIntValue("Scale factor", value)
+          args = tail
+
+        case "--format" :: value :: tail =>
+          format = value
+          args = tail
+
+        case "--overwrite" :: tail =>
+          overwrite = true
+          args = tail
+
+        case "--partitionTables" :: tail =>
+          partitionTables = true
+          args = tail
+
+        case "--clusterByPartitionColumns" :: tail =>
+          clusterByPartitionColumns = true
+          args = tail
+
+        case "--filterOutNullPartitionValues" :: tail =>
+          filterOutNullPartitionValues = true
+          args = tail
+
+        case "--tableFilter" :: value :: tail =>
+          tableFilter = value
+          args = tail
+
+        case "--numPartitions" :: value :: tail =>
+          numPartitions = toPositiveIntValue("Number of partitions", value)
+          args = tail
+
+        case "--help" :: tail =>
+          printUsageAndExit(0)
+
+        case _ =>
+          // scalastyle:off println
+          System.err.println("Unknown/unsupported param " + args)
+          // scalastyle:on println
+          printUsageAndExit(1)
+      }
+    }
+
+    checkRequiredArguments()
+  }
+
+  private def printUsageAndExit(exitCode: Int): Unit = {
+    // scalastyle:off
+    System.err.println("""
+                         |build/sbt "test:runMain <this class> [Options]"

Review Comment:
   line 315 ~ 328
   indentation: 2 spaces
   
   Also, this file seems to have significant duplication with https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala.
   
   Is it possible to further refactor to reduce duplicate code?
   
   



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCQueryBenchmarkArguments.scala:
##########
@@ -20,14 +20,11 @@ package org.apache.spark.sql.execution.benchmark
 import java.util.Locale
 
 
-class TPCDSQueryBenchmarkArguments(val args: Array[String]) {
-  var dataLocation: String = sys.env.getOrElse("SPARK_TPCDS_DATA", null)
+class TPCQueryBenchmarkArguments(val args: Array[String], val dataLocationEnv: String) {
+  var dataLocation: String = sys.env.getOrElse(dataLocationEnv, null)
   var queryFilter: Set[String] = Set.empty
   var cboEnabled: Boolean = false
 
-  parseArgs(args.toList)

Review Comment:
   Why remove the invocations of `parseArgs` and `validateArguments`? These two functions will be unused after this PR.



##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCHData.scala:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+class Dbgen(dbgenDir: String, params: Seq[String]) extends Serializable {
+  val dbgen = s"$dbgenDir/dbgen"
+  def generate(
+    sparkContext: SparkContext,

Review Comment:
   line 33 ~ 36
   indentation: 4 spaces



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCBenchmarkUtils.scala:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
+import org.apache.spark.sql.catalyst.util.resourceToString
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+
+trait TPCBenchmarkUtils extends SqlBasedBenchmark with Logging {

Review Comment:
   I suggest a different name, maybe TPCBasedBenchmark? There may be other, more suitable ones.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] oss-maker commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1254289875


##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCHData.scala:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+class Dbgen(dbgenDir: String, params: Seq[String]) extends Serializable {
+  val dbgen = s"$dbgenDir/dbgen"
+  def generate(
+    sparkContext: SparkContext,

Review Comment:
   Fixed it. Thanks



##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCHData.scala:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+class Dbgen(dbgenDir: String, params: Seq[String]) extends Serializable {
+  val dbgen = s"$dbgenDir/dbgen"
+  def generate(
+    sparkContext: SparkContext,
+    tableName: String,
+    partitions: Int,
+    scaleFactor: Int): RDD[String] = {
+    val generatedData = {
+      sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+        val localToolsDir = if (new java.io.File(dbgen).exists) {
+          dbgenDir
+        } else if (new java.io.File(s"/$dbgenDir").exists) {
+          s"/$dbgenDir"
+        } else {
+          throw new IllegalStateException
+            (s"Could not find dbgen at $dbgen or /$dbgenDir. Run install")
+        }
+        val parallel = if (partitions > 1) s"-C $partitions -S $i" else ""
+        val shortTableNames = Map(
+          "customer" -> "c",
+          "lineitem" -> "L",
+          "nation" -> "n",
+          "orders" -> "O",
+          "part" -> "P",
+          "region" -> "r",
+          "supplier" -> "s",
+          "partsupp" -> "S"
+        )
+        val paramsString = params.mkString(" ")
+        val commands = Seq(
+          "bash", "-c",
+          s"cd $localToolsDir && ./dbgen -q $paramsString " +
+            s"-T ${shortTableNames(tableName)} -s $scaleFactor $parallel")
+        BlockingLineStream(commands)
+      }
+    }
+
+    generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+    generatedData
+  }
+}
+
+class TPCHTables(
+    sqlContext: SQLContext,
+    dbgenDir: String,
+    scaleFactor: Int,
+    generatorParams: Seq[String] = Nil)
+    extends TPCHSchema with Logging with Serializable {
+
+  private val dataGenerator = new Dbgen(dbgenDir, generatorParams)
+
+  private def tables: Seq[Table] = tableColumns.map { case (tableName, schemaString) =>
+    val partitionColumns = tablePartitionColumns.getOrElse(tableName, Nil)
+      .map(_.stripPrefix("`").stripSuffix("`"))
+    Table(tableName, partitionColumns, StructType.fromDDL(schemaString))
+  }.toSeq
+
+  private case class Table(name: String, partitionColumns: Seq[String], schema: StructType) {
+    def nonPartitioned: Table = {
+      Table(name, Nil, schema)
+    }
+
+    private def df(numPartition: Int) = {
+      val generatedData = dataGenerator.generate(
+        sqlContext.sparkContext, name, numPartition, scaleFactor)
+      val rows = generatedData.mapPartitions { iter =>
+        iter.map { l =>
+          val values = l.split("\\|", -1).dropRight(1).map { v =>
+            if (v.equals("")) {
+              // If the string value is an empty string, we turn it to a null
+              null
+            } else {
+              v
+            }
+          }
+          Row.fromSeq(values)
+        }
+      }
+
+      val stringData =
+        sqlContext.createDataFrame(
+          rows,
+          StructType(schema.fields.map(f => StructField(f.name, StringType))))
+
+      val convertedData = {
+        val columns = schema.fields.map { f =>
+          val c = f.dataType match {
+            // Needs right-padding for char types
+            case CharType(n) => rpad(Column(f.name), n, " ")
+            // Don't need a cast for varchar types
+            case _: VarcharType => col(f.name)
+            case _ => col(f.name).cast(f.dataType)
+          }
+          c.as(f.name)
+        }
+        stringData.select(columns: _*)
+      }
+
+      convertedData
+    }
+
+    def genData(
+        location: String,
+        format: String,
+        overwrite: Boolean,
+        clusterByPartitionColumns: Boolean,
+        filterOutNullPartitionValues: Boolean,
+        numPartitions: Int): Unit = {
+      val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Ignore
+
+      val data = df(numPartitions)
+      val tempTableName = s"${name}_text"
+      data.createOrReplaceTempView(tempTableName)
+
+      val writer = if (partitionColumns.nonEmpty) {
+        if (clusterByPartitionColumns) {
+          val columnString = data.schema.fields.map { field =>
+            field.name
+          }.mkString(",")
+          val partitionColumnString = partitionColumns.mkString(",")
+          val predicates = if (filterOutNullPartitionValues) {
+            partitionColumns.map(col => s"$col IS NOT NULL").mkString("WHERE ", " AND ", "")
+          } else {
+            ""
+          }
+
+          val query =
+            s"""
+               |SELECT
+               |  $columnString
+               |FROM
+               |  $tempTableName
+               |$predicates
+               |DISTRIBUTE BY
+               |  $partitionColumnString
+            """.stripMargin
+          val grouped = sqlContext.sql(query)
+          logInfo(s"Pre-clustering with partitioning columns with query $query.")
+          grouped.write
+        } else {
+          data.write
+        }
+      } else {
+        // treat non-partitioned tables as "one partition" that we want to coalesce
+        if (clusterByPartitionColumns) {
+          // in case data has more than maxRecordsPerFile, split into multiple writers to improve
+          // datagen speed files will be truncated to maxRecordsPerFile value, so the final
+          // result will be the same.
+          val numRows = data.count
+          val maxRecordPerFile = Try {
+            sqlContext.getConf("spark.sql.files.maxRecordsPerFile").toInt
+          }.getOrElse(0)
+
+          if (maxRecordPerFile > 0 && numRows > maxRecordPerFile) {
+            val numFiles = (numRows.toDouble/maxRecordPerFile).ceil.toInt
+            logInfo(s"Coalescing into $numFiles files")
+            data.coalesce(numFiles).write
+          } else {
+            data.coalesce(1).write
+          }
+        } else {
+          data.write
+        }
+      }
+      writer.format(format).mode(mode)
+      if (partitionColumns.nonEmpty) {
+        writer.partitionBy(partitionColumns: _*)
+      }
+      logInfo(s"Generating table $name in database to $location with save mode $mode.")
+      writer.save(location)
+      sqlContext.dropTempTable(tempTableName)
+    }
+  }
+
+  def genData(
+      location: String,
+      format: String,
+      overwrite: Boolean,
+      partitionTables: Boolean,
+      clusterByPartitionColumns: Boolean,
+      filterOutNullPartitionValues: Boolean,
+      tableFilter: String = "",
+      numPartitions: Int = 100): Unit = {
+    var tablesToBeGenerated = if (partitionTables) {
+      tables
+    } else {
+      tables.map(_.nonPartitioned)
+    }
+
+    if (!tableFilter.isEmpty) {
+      tablesToBeGenerated = tablesToBeGenerated.filter(_.name == tableFilter)
+      if (tablesToBeGenerated.isEmpty) {
+        throw new RuntimeException("Bad table name filter: " + tableFilter)
+      }
+    }
+
+    tablesToBeGenerated.foreach { table =>
+      val tableLocation = s"$location/${table.name}"
+      table.genData(tableLocation, format, overwrite, clusterByPartitionColumns,
+        filterOutNullPartitionValues, numPartitions)
+    }
+  }
+}
+
+class GenTPCHDataConfig(args: Array[String]) {
+  var master: String = "local[*]"
+  var dbgenDir: String = null
+  var location: String = null
+  var scaleFactor: Int = 1
+  var format: String = "parquet"
+  var overwrite: Boolean = false
+  var partitionTables: Boolean = false
+  var clusterByPartitionColumns: Boolean = false
+  var filterOutNullPartitionValues: Boolean = false
+  var tableFilter: String = ""
+  var numPartitions: Int = 100
+
+  parseArgs(args.toList)
+
+  private def parseArgs(inputArgs: List[String]): Unit = {
+    var args = inputArgs
+
+    while (args.nonEmpty) {
+      args match {
+        case "--master" :: value :: tail =>
+          master = value
+          args = tail
+
+        case "--dbgenDir" :: value :: tail =>
+          dbgenDir = value
+          args = tail
+
+        case "--location" :: value :: tail =>
+          location = value
+          args = tail
+
+        case "--scaleFactor" :: value :: tail =>
+          scaleFactor = toPositiveIntValue("Scale factor", value)
+          args = tail
+
+        case "--format" :: value :: tail =>
+          format = value
+          args = tail
+
+        case "--overwrite" :: tail =>
+          overwrite = true
+          args = tail
+
+        case "--partitionTables" :: tail =>
+          partitionTables = true
+          args = tail
+
+        case "--clusterByPartitionColumns" :: tail =>
+          clusterByPartitionColumns = true
+          args = tail
+
+        case "--filterOutNullPartitionValues" :: tail =>
+          filterOutNullPartitionValues = true
+          args = tail
+
+        case "--tableFilter" :: value :: tail =>
+          tableFilter = value
+          args = tail
+
+        case "--numPartitions" :: value :: tail =>
+          numPartitions = toPositiveIntValue("Number of partitions", value)
+          args = tail
+
+        case "--help" :: tail =>
+          printUsageAndExit(0)
+
+        case _ =>
+          // scalastyle:off println
+          System.err.println("Unknown/unsupported param " + args)
+          // scalastyle:on println
+          printUsageAndExit(1)
+      }
+    }
+
+    checkRequiredArguments()
+  }
+
+  private def printUsageAndExit(exitCode: Int): Unit = {
+    // scalastyle:off
+    System.err.println("""
+                         |build/sbt "test:runMain <this class> [Options]"

Review Comment:
   Done. Thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1263524205


##########
.github/workflows/benchmark.yml:
##########
@@ -186,7 +251,7 @@ jobs:
         # To keep the directory structure and file permissions, tar them
         # See also https://github.com/actions/upload-artifact#maintaining-file-permissions-and-case-sensitive-files
         echo "Preparing the benchmark results:"
-        tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude-standard`
+        tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude=tpch-sf-1 --exclude-standard`

Review Comment:
   Sorry, I don’t know ...



##########
.github/workflows/benchmark.yml:
##########
@@ -186,7 +251,7 @@ jobs:
         # To keep the directory structure and file permissions, tar them
         # See also https://github.com/actions/upload-artifact#maintaining-file-permissions-and-case-sensitive-files
         echo "Preparing the benchmark results:"
-        tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude-standard`
+        tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude=tpch-sf-1 --exclude-standard`

Review Comment:
   Sorry, I don’t know ...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1263294643


##########
.github/workflows/benchmark.yml:
##########
@@ -186,7 +251,7 @@ jobs:
         # To keep the directory structure and file permissions, tar them
         # See also https://github.com/actions/upload-artifact#maintaining-file-permissions-and-case-sensitive-files
         echo "Preparing the benchmark results:"
-        tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude-standard`
+        tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude=tpch-sf-1 --exclude-standard`

Review Comment:
   @surnaik I removed `--exclude=tpch-sf-1` and downloaded the generated TPC-H data; I found that the data file contains only the Parquet footer and no data.
   
    



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] pan3793 commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1645013166

   @surnaik Apache Kyuubi has a [Spark TPC-H Connector](https://github.com/apache/kyuubi/tree/master/extensions/spark/kyuubi-spark-connector-tpch/src/main/scala/org/apache/kyuubi/spark/connector/tpch) based on the DataSource V2 API, which is inspired by the Trino [TPCH connector](https://trino.io/docs/current/connector/tpch.html#connector-tpch--page-root).
   
   ```
   spark-sql \
     --packages org.apache.kyuubi:kyuubi-spark-connector-tpch_2.12:1.7.1 \
     --conf spark.sql.catalog.tpch=org.apache.kyuubi.spark.connector.tpch.TPCHCatalog
   ```
   
   ```
   spark-sql> show tables in tpch.sf1;
   customer
   orders
   lineitem
   part
   partsupp
   supplier
   nation
   region
   Time taken: 0.1 seconds, Fetched 8 row(s)
   
   spark-sql> select * from tpch.sf1.orders limit 10;
   1	36901	O	173665.47	1996-01-02	5-LOW	Clerk#000000951	0	nstructions sleep furiously among
   2	78002	O	46929.18	1996-12-01	1-URGENT	Clerk#000000880	0	 foxes. pending accounts at the pending, silent asymptot
   3	123314	F	193846.25	1993-10-14	5-LOW	Clerk#000000955	0	sly final accounts boost. carefully regular ideas cajole carefully. depos
   4	136777	O	32151.78	1995-10-11	5-LOW	Clerk#000000124	0	sits. slyly regular warthogs cajole. regular, regular theodolites acro
   5	44485	F	144659.2	1994-07-30	5-LOW	Clerk#000000925	0	quickly. bold deposits sleep slyly. packages use slyly
   6	55624	F	58749.59	1992-02-21	4-NOT SPECIFIED	Clerk#000000058	0	ggle. special, final requests are against the furiously specia
   7	39136	O	252004.18	1996-01-10	2-HIGH	Clerk#000000470	0	ly special requests
   32	130057	O	208660.75	1995-07-16	2-HIGH	Clerk#000000616	0	ise blithely bold, regular requests. quickly unusual dep
   33	66958	F	163243.98	1993-10-27	3-MEDIUM	Clerk#000000409	0	uriously. furiously final request
   34	61001	O	58949.67	1998-07-21	3-MEDIUM	Clerk#000000223	0	ly final packages. fluffily final deposits wake blithely ideas. spe
   Time taken: 2.608 seconds, Fetched 10 row(s)
   ```
   
   It could also be used to generate the data by using CTAS.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] srinivasst commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "srinivasst (via GitHub)" <gi...@apache.org>.
srinivasst commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1622126254

   Lgtm


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] oss-maker commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1628680383

   @beliefer Thanks for your suggestion, I wasn't even aware of this Scala feature. That's why I kept using a different name in the class parameter. I appreciate your help, this was very useful.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1263294643


##########
.github/workflows/benchmark.yml:
##########
@@ -186,7 +251,7 @@ jobs:
         # To keep the directory structure and file permissions, tar them
         # See also https://github.com/actions/upload-artifact#maintaining-file-permissions-and-case-sensitive-files
         echo "Preparing the benchmark results:"
-        tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude-standard`
+        tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude=tpch-sf-1 --exclude-standard`

Review Comment:
   @surnaik I removed `--exclude=tpch-sf-1` and downloaded the generated TPC-H data from GitHub Actions; I found that the data file contains only the Parquet footer and no data.
   
    



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] beliefer commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1255622200


##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+  // See scala.sys.process.Streamed
+  private final class BlockingStreamed[T](
+      val process: T => Unit,
+      val done: Int => Unit,
+      val stream: () => Stream[T])
+
+  // See scala.sys.process.Streamed
+  private object BlockingStreamed {
+    // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+    // which causes OOMs if the consumer cannot keep up with producer.
+    val maxQueueSize = 65536
+
+    def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+      val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+      def next(): Stream[T] = q.take match {
+        case Left(0) => Stream.empty
+        case Left(code) =>
+          if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+        case Right(s) => Stream.cons(s, next())
+      }
+
+      new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+    }
+  }
+
+  // See scala.sys.process.ProcessImpl.Spawn
+  private object Spawn {
+    def apply(f: => Unit): Thread = apply(f, daemon = false)
+    def apply(f: => Unit, daemon: Boolean): Thread = {
+      val thread = new Thread() { override def run() = { f } }
+      thread.setDaemon(daemon)
+      thread.start()
+      thread
+    }
+  }
+
+  def apply(command: Seq[String]): Stream[String] = {
+    val streamed = BlockingStreamed[String](true)
+    val process = command.run(BasicIO(false, streamed.process, None))
+    Spawn(streamed.done(process.exitValue()))
+    streamed.stream()
+  }
+}
+
+trait DataGenerator extends Serializable {
+  protected val toolsDir: String
+
+  protected def generateData(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int,
+      command: String): RDD[String] = {
+    val generatedData = {
+      sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+        val localToolsDir = if (new java.io.File(toolsDir).exists) {
+          toolsDir
+        } else if (new java.io.File(s"/$toolsDir").exists) {
+          s"/$toolsDir"
+        } else {
+          throw new IllegalStateException(
+            s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+        }
+
+        val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+        val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+        BlockingLineStream(commands)
+      }
+    }
+
+    generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+    generatedData
+  }
+
+  def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+  override protected val toolsDir: String = s"$dsdgenDir"
+
+  override def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    val command = s"./dsdgen -table $tableName -filter Y -scale $scaleFactor -RNGSEED 100"
+    generateData(sparkContext, tableName, partitions, scaleFactor, command)
+  }
+}
+
+class Dbgen(dbgenDir: String, params: Seq[String])
+  extends DataGenerator with Serializable {
+  override protected val toolsDir: String = s"$dbgenDir"
+  override def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    val shortTableNames = Map(
+      "customer" -> "c",
+      "lineitem" -> "L",
+      "nation" -> "n",
+      "orders" -> "O",
+      "part" -> "P",
+      "region" -> "r",
+      "supplier" -> "s",
+      "partsupp" -> "S"
+    )
+    val paramsString = params.mkString(" ")
+    val command = s"./dbgen -q $paramsString -T ${shortTableNames(tableName)} -s $scaleFactor"
+    generateData(sparkContext, tableName, partitions, scaleFactor, command)
+  }
+}
+
+trait TableGenerator extends Serializable with Logging {
+  protected val dataGenerator: DataGenerator
+  protected val sparkSQLContext: SQLContext

Review Comment:
   BTW, `SQLContext` is an old API. It seems it's time to use `SparkSession` instead. cc @cloud-fan @MaxGekk @wangyum



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] beliefer commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1258132477


##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala:
##########
@@ -17,390 +17,19 @@
 
 package org.apache.spark.sql
 
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.immutable.Stream
-import scala.sys.process._
-import scala.util.Try
-
-import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions.{col, rpad}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
-
-// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
-
-/**
- * Using ProcessBuilder.lineStream produces a stream, that uses
- * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
- *
- * This causes OOM if the consumer cannot keep up with the producer.
- *
- * See scala.sys.process.ProcessBuilderImpl.lineStream
- */
-object BlockingLineStream {
-
-  // See scala.sys.process.Streamed
-  private final class BlockingStreamed[T](
-    val process: T => Unit,
-    val done: Int => Unit,
-    val stream: () => Stream[T])
+import org.apache.spark.sql.types.StructType
 
-  // See scala.sys.process.Streamed
-  private object BlockingStreamed {
-    // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
-    // which causes OOMs if the consumer cannot keep up with producer.
-    val maxQueueSize = 65536
+class TPCDSTables(val spark: SparkSession, config: GenTPCDataConfig)
+  extends TableGenerator with TPCDSSchema with Logging with Serializable {
 
-    def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
-      val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
-
-      def next(): Stream[T] = q.take match {
-        case Left(0) => Stream.empty
-        case Left(code) =>
-          if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
-        case Right(s) => Stream.cons(s, next())
-      }
-
-      new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
-    }
-  }
-
-  // See scala.sys.process.ProcessImpl.Spawn
-  private object Spawn {
-    def apply(f: => Unit): Thread = apply(f, daemon = false)
-    def apply(f: => Unit, daemon: Boolean): Thread = {
-      val thread = new Thread() { override def run() = { f } }
-      thread.setDaemon(daemon)
-      thread.start()
-      thread
-    }
-  }
-
-  def apply(command: Seq[String]): Stream[String] = {
-    val streamed = BlockingStreamed[String](true)
-    val process = command.run(BasicIO(false, streamed.process, None))
-    Spawn(streamed.done(process.exitValue()))
-    streamed.stream()
-  }
-}
-
-class Dsdgen(dsdgenDir: String) extends Serializable {
-  private val dsdgen = s"$dsdgenDir/dsdgen"
-
-  def generate(
-      sparkContext: SparkContext,
-      tableName: String,
-      partitions: Int,
-      scaleFactor: Int): RDD[String] = {
-    val generatedData = {
-      sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
-        val localToolsDir = if (new java.io.File(dsdgen).exists) {
-          dsdgenDir
-        } else if (new java.io.File(s"/$dsdgen").exists) {
-          s"/$dsdgenDir"
-        } else {
-          throw new IllegalStateException(
-            s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install")
-        }
-
-        // NOTE: RNGSEED is the RNG seed used by the data generator. Right now, it is fixed to 100.
-        val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
-        val commands = Seq(
-          "bash", "-c",
-          s"cd $localToolsDir && ./dsdgen -table $tableName -filter Y -scale $scaleFactor " +
-          s"-RNGSEED 100 $parallel")
-        BlockingLineStream(commands)
-      }
-    }
-
-    generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
-    generatedData
-  }
-}
-
-class TPCDSTables(spark: SparkSession, dsdgenDir: String, scaleFactor: Int)

Review Comment:
   Although this is a developer API, I think it is better to keep it and not change its signature.



##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+  // See scala.sys.process.Streamed
+  private final class BlockingStreamed[T](
+      val process: T => Unit,
+      val done: Int => Unit,
+      val stream: () => Stream[T])
+
+  // See scala.sys.process.Streamed
+  private object BlockingStreamed {
+    // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+    // which causes OOMs if the consumer cannot keep up with producer.
+    val maxQueueSize = 65536
+
+    def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+      val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+      def next(): Stream[T] = q.take match {
+        case Left(0) => Stream.empty
+        case Left(code) =>
+          if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+        case Right(s) => Stream.cons(s, next())
+      }
+
+      new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+    }
+  }
+
+  // See scala.sys.process.ProcessImpl.Spawn
+  private object Spawn {
+    def apply(f: => Unit): Thread = apply(f, daemon = false)
+    def apply(f: => Unit, daemon: Boolean): Thread = {
+      val thread = new Thread() { override def run() = { f } }
+      thread.setDaemon(daemon)
+      thread.start()
+      thread
+    }
+  }
+
+  def apply(command: Seq[String]): Stream[String] = {
+    val streamed = BlockingStreamed[String](true)
+    val process = command.run(BasicIO(false, streamed.process, None))
+    Spawn(streamed.done(process.exitValue()))
+    streamed.stream()
+  }
+}
+
+trait DataGenerator extends Serializable {
+  protected def generateData(
+      sparkContext: SparkContext,
+      toolsDir: String,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int,
+      command: String): RDD[String] = {
+    val generatedData = {
+      sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+        val localToolsDir = if (new java.io.File(toolsDir).exists) {
+          toolsDir
+        } else if (new java.io.File(s"/$toolsDir").exists) {
+          s"/$toolsDir"
+        } else {
+          throw new IllegalStateException(
+            s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+        }
+
+        val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+        val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+        BlockingLineStream(commands)
+      }
+    }
+
+    generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+    generatedData
+  }
+
+  def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+  override def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    val command = s"./dsdgen -table $tableName -filter Y -scale $scaleFactor -RNGSEED 100"
+    generateData(sparkContext, dsdgenDir, tableName, partitions, scaleFactor, command)
+  }
+}
+
+class Dbgen(dbgenDir: String, params: Seq[String] = Nil)
+  extends DataGenerator with Serializable {
+  override def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    val shortTableNames = Map(
+      "customer" -> "c",
+      "lineitem" -> "L",
+      "nation" -> "n",
+      "orders" -> "O",
+      "part" -> "P",
+      "region" -> "r",
+      "supplier" -> "s",
+      "partsupp" -> "S"
+    )
+    val paramsString = params.mkString(" ")
+    val command = s"./dbgen -q $paramsString -T ${shortTableNames(tableName)} -s $scaleFactor"
+    generateData(sparkContext, dbgenDir, tableName, partitions, scaleFactor, command)
+  }
+}
+
+trait TableGenerator extends Serializable with Logging {
+  protected val dataGenerator: DataGenerator
+  protected val spark: SparkSession
+  protected val scaleFactor: Int

Review Comment:
   Please add `protected val dsdgenDir: String` so that the default constructor of TPCDSTables remains unchanged.



##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala:
##########
@@ -429,8 +58,7 @@ object GenTPCDSData {
 
     val tables = new TPCDSTables(
       spark,
-      dsdgenDir = config.dsdgenDir,
-      scaleFactor = config.scaleFactor)
+      config)

Review Comment:
   Please revert this change.



##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala:
##########
@@ -17,390 +17,19 @@
 
 package org.apache.spark.sql
 
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.immutable.Stream
-import scala.sys.process._
-import scala.util.Try
-
-import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions.{col, rpad}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
-
-// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
-
-/**
- * Using ProcessBuilder.lineStream produces a stream, that uses
- * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
- *
- * This causes OOM if the consumer cannot keep up with the producer.
- *
- * See scala.sys.process.ProcessBuilderImpl.lineStream
- */
-object BlockingLineStream {
-
-  // See scala.sys.process.Streamed
-  private final class BlockingStreamed[T](
-    val process: T => Unit,
-    val done: Int => Unit,
-    val stream: () => Stream[T])
+import org.apache.spark.sql.types.StructType
 
-  // See scala.sys.process.Streamed
-  private object BlockingStreamed {
-    // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
-    // which causes OOMs if the consumer cannot keep up with producer.
-    val maxQueueSize = 65536
+class TPCDSTables(val spark: SparkSession, config: GenTPCDataConfig)

Review Comment:
   `class TPCDSTables(val spark: SparkSession, dsdgenDir: String, scaleFactor: Int)`
   
   We can pass `config.dsdgenDir` and `config.scaleFactor` directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] oss-maker commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1258168110


##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+  // See scala.sys.process.Streamed
+  private final class BlockingStreamed[T](
+      val process: T => Unit,
+      val done: Int => Unit,
+      val stream: () => Stream[T])
+
+  // See scala.sys.process.Streamed
+  private object BlockingStreamed {
+    // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+    // which causes OOMs if the consumer cannot keep up with producer.
+    val maxQueueSize = 65536
+
+    def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+      val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+      def next(): Stream[T] = q.take match {
+        case Left(0) => Stream.empty
+        case Left(code) =>
+          if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+        case Right(s) => Stream.cons(s, next())
+      }
+
+      new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+    }
+  }
+
+  // See scala.sys.process.ProcessImpl.Spawn
+  private object Spawn {
+    def apply(f: => Unit): Thread = apply(f, daemon = false)
+    def apply(f: => Unit, daemon: Boolean): Thread = {
+      val thread = new Thread() { override def run() = { f } }
+      thread.setDaemon(daemon)
+      thread.start()
+      thread
+    }
+  }
+
+  def apply(command: Seq[String]): Stream[String] = {
+    val streamed = BlockingStreamed[String](true)
+    val process = command.run(BasicIO(false, streamed.process, None))
+    Spawn(streamed.done(process.exitValue()))
+    streamed.stream()
+  }
+}
+
+trait DataGenerator extends Serializable {
+  protected def generateData(
+      sparkContext: SparkContext,
+      toolsDir: String,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int,
+      command: String): RDD[String] = {
+    val generatedData = {
+      sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+        val localToolsDir = if (new java.io.File(toolsDir).exists) {
+          toolsDir
+        } else if (new java.io.File(s"/$toolsDir").exists) {
+          s"/$toolsDir"
+        } else {
+          throw new IllegalStateException(
+            s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+        }
+
+        val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+        val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+        BlockingLineStream(commands)
+      }
+    }
+
+    generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+    generatedData
+  }
+
+  def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+  override def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    val command = s"./dsdgen -table $tableName -filter Y -scale $scaleFactor -RNGSEED 100"
+    generateData(sparkContext, dsdgenDir, tableName, partitions, scaleFactor, command)
+  }
+}
+
+class Dbgen(dbgenDir: String, params: Seq[String] = Nil)
+  extends DataGenerator with Serializable {
+  override def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    val shortTableNames = Map(
+      "customer" -> "c",
+      "lineitem" -> "L",
+      "nation" -> "n",
+      "orders" -> "O",
+      "part" -> "P",
+      "region" -> "r",
+      "supplier" -> "s",
+      "partsupp" -> "S"
+    )
+    val paramsString = params.mkString(" ")
+    val command = s"./dbgen -q $paramsString -T ${shortTableNames(tableName)} -s $scaleFactor"
+    generateData(sparkContext, dbgenDir, tableName, partitions, scaleFactor, command)
+  }
+}
+
+trait TableGenerator extends Serializable with Logging {
+  protected val dataGenerator: DataGenerator
+  protected val spark: SparkSession
+  protected val scaleFactor: Int

Review Comment:
   This is not used in the trait at all, so we don't need dsdgenDir and dbgenDir to be declared in the trait. They are used to construct the DataGenerator, so we can use them directly in TPCDSTables and TPCHTables.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-44301][SQL] Add Benchmark Suite for TPCH [spark]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1784289605

   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1621553373

   All benchmarks can be run using GitHub Actions:
   
   https://github.com/apache/spark/blob/master/.github/workflows/benchmark.yml
   
   At the same time, all benchmark results are generated by GitHub Actions and recorded in the Spark repository, such as 
   
   https://github.com/apache/spark/blob/master/sql/core/benchmarks/TPCDSQueryBenchmark-results.txt
   
   So I think if we need to introduce `TPCHQueryBenchmark`, it should also be runnable using GitHub Actions like the others.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] oss-maker commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1621760824

   Thanks for the review @LuciferYang, I'll make the change to add this to the Github action.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1633810972

   Thanks @beliefer, please wait while I check this PR with Scala 2.13.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1634377891

   Is the data only 4KB when sf = 1?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1263294643


##########
.github/workflows/benchmark.yml:
##########
@@ -186,7 +251,7 @@ jobs:
         # To keep the directory structure and file permissions, tar them
         # See also https://github.com/actions/upload-artifact#maintaining-file-permissions-and-case-sensitive-files
         echo "Preparing the benchmark results:"
-        tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude-standard`
+        tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude=tpch-sf-1 --exclude-standard`

Review Comment:
   @srinivasst I removed `--exclude=tpch-sf-1` and downloaded the generated TPC-H data, and found that the data file contains only the Parquet footer and no data.
   
    



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] surnaik commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "surnaik (via GitHub)" <gi...@apache.org>.
surnaik commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1643310115

   Back from a break. I will use the official dbgen from TPCH website and update the PR. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] beliefer commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1257098969


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCQueryBenchmarkArguments.scala:
##########
@@ -63,15 +63,15 @@ class TPCDSQueryBenchmarkArguments(val args: Array[String]) {
     System.err.println("""
       |Usage: spark-submit --class <this class> <spark sql test jar> [Options]
       |Options:
-      |  --data-location      Path to TPCDS data
+      |  --data-location      Path to TPCDS/H data
       |  --query-filter       Queries to filter, e.g., q3,q5,q13
       |  --cbo                Whether to enable cost-based optimization
       |
       |------------------------------------------------------------------------------------------------------------------
       |In order to run this benchmark, please follow the instructions at
       |https://github.com/databricks/spark-sql-perf/blob/master/README.md
-      |to generate the TPCDS data locally (preferably with a scale factor of 5 for benchmarking).
-      |Thereafter, the value of <TPCDS data location> needs to be set to the location where the generated data is stored.
+      |to generate the TPCDS/H data locally (preferably with a scale factor of 5 for benchmarking).

Review Comment:
   It seems a little odd. Maybe it should be `TPC-DS/TPC-H`?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCQueryBenchmarkArguments.scala:
##########
@@ -63,15 +63,15 @@ class TPCDSQueryBenchmarkArguments(val args: Array[String]) {
     System.err.println("""
       |Usage: spark-submit --class <this class> <spark sql test jar> [Options]
       |Options:
-      |  --data-location      Path to TPCDS data
+      |  --data-location      Path to TPCDS/H data
       |  --query-filter       Queries to filter, e.g., q3,q5,q13
       |  --cbo                Whether to enable cost-based optimization
       |
       |------------------------------------------------------------------------------------------------------------------
       |In order to run this benchmark, please follow the instructions at
       |https://github.com/databricks/spark-sql-perf/blob/master/README.md
-      |to generate the TPCDS data locally (preferably with a scale factor of 5 for benchmarking).
-      |Thereafter, the value of <TPCDS data location> needs to be set to the location where the generated data is stored.
+      |to generate the TPCDS/H data locally (preferably with a scale factor of 5 for benchmarking).
+      |Thereafter, the value of <TPCDS/H data location> needs to be set to the location where the generated data is stored.

Review Comment:
   ditto.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCBasedBenchmark.scala:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
+import org.apache.spark.sql.catalyst.util.resourceToString
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+
+trait TPCBasedBenchmark extends SqlBasedBenchmark with Logging {
+
+  val tables: Seq[String]
+  val queryType: String
+
+  override def getSparkSession: SparkSession = {
+    val conf = new SparkConf()
+      .setMaster(System.getProperty("spark.sql.test.master", "local[1]"))
+      .setAppName("test-sql-context")
+      .set("spark.sql.parquet.compression.codec", "snappy")
+      .set("spark.sql.shuffle.partitions", System.getProperty("spark.sql.shuffle.partitions", "4"))
+      .set("spark.driver.memory", "3g")
+      .set("spark.executor.memory", "3g")
+      .set("spark.sql.autoBroadcastJoinThreshold", (20 * 1024 * 1024).toString)
+      .set("spark.sql.crossJoin.enabled", "true")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.kryo.registrationRequired", "true")
+
+    SparkSession.builder.config(conf).getOrCreate()
+  }
+
+  def setupTables(dataLocation: String, tableColumns: Map[String, StructType]): Map[String, Long] =
+    tables.map { tableName =>
+      spark.sql(s"DROP TABLE IF EXISTS $tableName")
+      val options = Map("path" -> s"$dataLocation/$tableName")
+      spark.catalog.createTable(tableName, "parquet", tableColumns(tableName), options)
+      // Recover partitions but don't fail if a table is not partitioned.
+      Try {
+        spark.sql(s"ALTER TABLE $tableName RECOVER PARTITIONS")
+      }.getOrElse {
+        logInfo(s"Recovering partitions of table $tableName failed")
+      }
+      tableName -> spark.table(tableName).count()
+    }.toMap
+
+  def runTpcQueries(
+      queryLocation: String,
+      queries: Seq[String],
+      tableSizes: Map[String, Long],
+      nameSuffix: String = ""): Unit = {
+    queries.foreach { name =>
+      val queryString = resourceToString(s"$queryLocation/$name.sql",
+        classLoader = Thread.currentThread().getContextClassLoader)
+
+      // This is an indirect hack to estimate the size of each query's input by traversing the
+      // logical plan and adding up the sizes of all tables that appear in the plan.
+      val queryRelations = scala.collection.mutable.HashSet[String]()
+      spark.sparkContext.setJobGroup(name, s"$name:\n$queryString", true)
+      spark.sql(queryString).queryExecution.analyzed.foreach {
+        case SubqueryAlias(alias, _: LogicalRelation) =>
+          queryRelations.add(alias.name)
+        case LogicalRelation(_, _, Some(catalogTable), _) =>
+          queryRelations.add(catalogTable.identifier.table)
+        case HiveTableRelation(tableMeta, _, _, _, _) =>
+          queryRelations.add(tableMeta.identifier.table)
+        case _ =>
+      }
+      val numRows = queryRelations.map(tableSizes.getOrElse(_, 0L)).sum
+      val benchmark = new Benchmark(s"$queryType Snappy", numRows, 2, output = output)
+      benchmark.addCase(s"$name$nameSuffix") { _ =>
+        spark.sql(queryString).noop()
+      }
+      benchmark.run()
+    }
+  }
+
+  def filterQueries(

Review Comment:
   ditto



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCBasedBenchmark.scala:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
+import org.apache.spark.sql.catalyst.util.resourceToString
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+
+trait TPCBasedBenchmark extends SqlBasedBenchmark with Logging {
+
+  val tables: Seq[String]
+  val queryType: String
+
+  override def getSparkSession: SparkSession = {
+    val conf = new SparkConf()
+      .setMaster(System.getProperty("spark.sql.test.master", "local[1]"))
+      .setAppName("test-sql-context")
+      .set("spark.sql.parquet.compression.codec", "snappy")
+      .set("spark.sql.shuffle.partitions", System.getProperty("spark.sql.shuffle.partitions", "4"))
+      .set("spark.driver.memory", "3g")
+      .set("spark.executor.memory", "3g")
+      .set("spark.sql.autoBroadcastJoinThreshold", (20 * 1024 * 1024).toString)
+      .set("spark.sql.crossJoin.enabled", "true")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.kryo.registrationRequired", "true")
+
+    SparkSession.builder.config(conf).getOrCreate()
+  }
+
+  def setupTables(dataLocation: String, tableColumns: Map[String, StructType]): Map[String, Long] =
+    tables.map { tableName =>
+      spark.sql(s"DROP TABLE IF EXISTS $tableName")
+      val options = Map("path" -> s"$dataLocation/$tableName")
+      spark.catalog.createTable(tableName, "parquet", tableColumns(tableName), options)
+      // Recover partitions but don't fail if a table is not partitioned.
+      Try {
+        spark.sql(s"ALTER TABLE $tableName RECOVER PARTITIONS")
+      }.getOrElse {
+        logInfo(s"Recovering partitions of table $tableName failed")
+      }
+      tableName -> spark.table(tableName).count()
+    }.toMap
+
+  def runTpcQueries(

Review Comment:
   ditto.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCQueryBenchmarkArguments.scala:
##########
@@ -63,15 +63,15 @@ class TPCDSQueryBenchmarkArguments(val args: Array[String]) {
     System.err.println("""
       |Usage: spark-submit --class <this class> <spark sql test jar> [Options]
       |Options:
-      |  --data-location      Path to TPCDS data
+      |  --data-location      Path to TPCDS/H data

Review Comment:
   ditto



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCBasedBenchmark.scala:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
+import org.apache.spark.sql.catalyst.util.resourceToString
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+
+trait TPCBasedBenchmark extends SqlBasedBenchmark with Logging {
+
+  val tables: Seq[String]
+  val queryType: String
+
+  override def getSparkSession: SparkSession = {
+    val conf = new SparkConf()
+      .setMaster(System.getProperty("spark.sql.test.master", "local[1]"))
+      .setAppName("test-sql-context")
+      .set("spark.sql.parquet.compression.codec", "snappy")
+      .set("spark.sql.shuffle.partitions", System.getProperty("spark.sql.shuffle.partitions", "4"))
+      .set("spark.driver.memory", "3g")
+      .set("spark.executor.memory", "3g")
+      .set("spark.sql.autoBroadcastJoinThreshold", (20 * 1024 * 1024).toString)
+      .set("spark.sql.crossJoin.enabled", "true")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.kryo.registrationRequired", "true")
+
+    SparkSession.builder.config(conf).getOrCreate()
+  }
+
+  def setupTables(dataLocation: String, tableColumns: Map[String, StructType]): Map[String, Long] =

Review Comment:
   ```suggestion
     protected def setupTables(
         dataLocation: String,
         tableColumns: Map[String, StructType]): Map[String, Long] =
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCBasedBenchmark.scala:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
+import org.apache.spark.sql.catalyst.util.resourceToString
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+
+trait TPCBasedBenchmark extends SqlBasedBenchmark with Logging {
+
+  val tables: Seq[String]
+  val queryType: String
+
+  override def getSparkSession: SparkSession = {
+    val conf = new SparkConf()
+      .setMaster(System.getProperty("spark.sql.test.master", "local[1]"))
+      .setAppName("test-sql-context")
+      .set("spark.sql.parquet.compression.codec", "snappy")
+      .set("spark.sql.shuffle.partitions", System.getProperty("spark.sql.shuffle.partitions", "4"))
+      .set("spark.driver.memory", "3g")
+      .set("spark.executor.memory", "3g")
+      .set("spark.sql.autoBroadcastJoinThreshold", (20 * 1024 * 1024).toString)
+      .set("spark.sql.crossJoin.enabled", "true")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.kryo.registrationRequired", "true")
+
+    SparkSession.builder.config(conf).getOrCreate()
+  }
+
+  def setupTables(dataLocation: String, tableColumns: Map[String, StructType]): Map[String, Long] =
+    tables.map { tableName =>
+      spark.sql(s"DROP TABLE IF EXISTS $tableName")
+      val options = Map("path" -> s"$dataLocation/$tableName")
+      spark.catalog.createTable(tableName, "parquet", tableColumns(tableName), options)
+      // Recover partitions but don't fail if a table is not partitioned.
+      Try {
+        spark.sql(s"ALTER TABLE $tableName RECOVER PARTITIONS")
+      }.getOrElse {
+        logInfo(s"Recovering partitions of table $tableName failed")
+      }
+      tableName -> spark.table(tableName).count()
+    }.toMap
+
+  def runTpcQueries(
+      queryLocation: String,
+      queries: Seq[String],
+      tableSizes: Map[String, Long],
+      nameSuffix: String = ""): Unit = {
+    queries.foreach { name =>
+      val queryString = resourceToString(s"$queryLocation/$name.sql",
+        classLoader = Thread.currentThread().getContextClassLoader)
+
+      // This is an indirect hack to estimate the size of each query's input by traversing the
+      // logical plan and adding up the sizes of all tables that appear in the plan.
+      val queryRelations = scala.collection.mutable.HashSet[String]()
+      spark.sparkContext.setJobGroup(name, s"$name:\n$queryString", true)
+      spark.sql(queryString).queryExecution.analyzed.foreach {
+        case SubqueryAlias(alias, _: LogicalRelation) =>
+          queryRelations.add(alias.name)
+        case LogicalRelation(_, _, Some(catalogTable), _) =>
+          queryRelations.add(catalogTable.identifier.table)
+        case HiveTableRelation(tableMeta, _, _, _, _) =>
+          queryRelations.add(tableMeta.identifier.table)
+        case _ =>
+      }
+      val numRows = queryRelations.map(tableSizes.getOrElse(_, 0L)).sum
+      val benchmark = new Benchmark(s"$queryType Snappy", numRows, 2, output = output)
+      benchmark.addCase(s"$name$nameSuffix") { _ =>
+        spark.sql(queryString).noop()
+      }
+      benchmark.run()
+    }
+  }
+
+  def filterQueries(
+      origQueries: Seq[String],
+      queryFilter: Set[String],
+      nameSuffix: String = ""): Seq[String] = {
+    if (queryFilter.nonEmpty) {
+      if (nameSuffix.nonEmpty) {
+        origQueries.filter { name => queryFilter.contains(s"$name$nameSuffix") }
+      } else {
+        origQueries.filter(queryFilter.contains)
+      }
+    } else {
+      origQueries
+    }
+  }
+
+  def configureCbo(benchmarkArgs: TPCQueryBenchmarkArguments): Unit = {

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org