You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "oss-maker (via GitHub)" <gi...@apache.org> on 2023/07/04 16:54:47 UTC
[GitHub] [spark] oss-maker opened a new pull request, #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
oss-maker opened a new pull request, #41856:
URL: https://github.com/apache/spark/pull/41856
### What changes were proposed in this pull request?
This PR adds support for running the TPC-H benchmark. The schema used is the one defined in the official TPC-H specification: https://www.tpc.org/TPC_Documents_Current_Versions/pdf/TPC-H_v3.0.1.pdf
### Why are the changes needed?
This PR makes it easier to run the TPC-H benchmark, especially when we want to measure the performance of aggregation-heavy TPC-H queries such as Query 1.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
This PR was tested locally, and the benchmark was verified to run properly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] oss-maker commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1623004625
@LuciferYang I have made the required change. Thanks for your help. Could you please review it once more? Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] oss-maker commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1255729471
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala:
##########
@@ -17,391 +17,20 @@
package org.apache.spark.sql
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.immutable.Stream
-import scala.sys.process._
-import scala.util.Try
-
-import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions.{col, rpad}
-import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
-
-// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
-
-/**
- * Using ProcessBuilder.lineStream produces a stream, that uses
- * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
- *
- * This causes OOM if the consumer cannot keep up with the producer.
- *
- * See scala.sys.process.ProcessBuilderImpl.lineStream
- */
-object BlockingLineStream {
-
- // See scala.sys.process.Streamed
- private final class BlockingStreamed[T](
- val process: T => Unit,
- val done: Int => Unit,
- val stream: () => Stream[T])
-
- // See scala.sys.process.Streamed
- private object BlockingStreamed {
- // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
- // which causes OOMs if the consumer cannot keep up with producer.
- val maxQueueSize = 65536
-
- def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
- val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
-
- def next(): Stream[T] = q.take match {
- case Left(0) => Stream.empty
- case Left(code) =>
- if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
- case Right(s) => Stream.cons(s, next())
- }
-
- new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
- }
- }
-
- // See scala.sys.process.ProcessImpl.Spawn
- private object Spawn {
- def apply(f: => Unit): Thread = apply(f, daemon = false)
- def apply(f: => Unit, daemon: Boolean): Thread = {
- val thread = new Thread() { override def run() = { f } }
- thread.setDaemon(daemon)
- thread.start()
- thread
- }
- }
-
- def apply(command: Seq[String]): Stream[String] = {
- val streamed = BlockingStreamed[String](true)
- val process = command.run(BasicIO(false, streamed.process, None))
- Spawn(streamed.done(process.exitValue()))
- streamed.stream()
- }
-}
-
-class Dsdgen(dsdgenDir: String) extends Serializable {
- private val dsdgen = s"$dsdgenDir/dsdgen"
-
- def generate(
- sparkContext: SparkContext,
- tableName: String,
- partitions: Int,
- scaleFactor: Int): RDD[String] = {
- val generatedData = {
- sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
- val localToolsDir = if (new java.io.File(dsdgen).exists) {
- dsdgenDir
- } else if (new java.io.File(s"/$dsdgen").exists) {
- s"/$dsdgenDir"
- } else {
- throw new IllegalStateException(
- s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install")
- }
-
- // NOTE: RNGSEED is the RNG seed used by the data generator. Right now, it is fixed to 100.
- val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
- val commands = Seq(
- "bash", "-c",
- s"cd $localToolsDir && ./dsdgen -table $tableName -filter Y -scale $scaleFactor " +
- s"-RNGSEED 100 $parallel")
- BlockingLineStream(commands)
- }
- }
-
- generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
- generatedData
- }
-}
+import org.apache.spark.sql.types.StructType
class TPCDSTables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int)
- extends TPCDSSchema with Logging with Serializable {
+ extends TableGenerator with TPCDSSchema with Logging with Serializable {
- private val dataGenerator = new Dsdgen(dsdgenDir)
-
- private def tables: Seq[Table] = tableColumns.map { case (tableName, schemaString) =>
+ override protected val dataGenerator: DataGenerator = new Dsdgen(dsdgenDir)
+ override protected val sparkSQLContext: SQLContext = sqlContext
+ override protected val tpcScaleFactor: Int = scaleFactor
Review Comment:
Updated. Thanks.
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+ // See scala.sys.process.Streamed
+ private final class BlockingStreamed[T](
+ val process: T => Unit,
+ val done: Int => Unit,
+ val stream: () => Stream[T])
+
+ // See scala.sys.process.Streamed
+ private object BlockingStreamed {
+ // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+ // which causes OOMs if the consumer cannot keep up with producer.
+ val maxQueueSize = 65536
+
+ def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+ val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+ def next(): Stream[T] = q.take match {
+ case Left(0) => Stream.empty
+ case Left(code) =>
+ if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+ case Right(s) => Stream.cons(s, next())
+ }
+
+ new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+ }
+ }
+
+ // See scala.sys.process.ProcessImpl.Spawn
+ private object Spawn {
+ def apply(f: => Unit): Thread = apply(f, daemon = false)
+ def apply(f: => Unit, daemon: Boolean): Thread = {
+ val thread = new Thread() { override def run() = { f } }
+ thread.setDaemon(daemon)
+ thread.start()
+ thread
+ }
+ }
+
+ def apply(command: Seq[String]): Stream[String] = {
+ val streamed = BlockingStreamed[String](true)
+ val process = command.run(BasicIO(false, streamed.process, None))
+ Spawn(streamed.done(process.exitValue()))
+ streamed.stream()
+ }
+}
+
+trait DataGenerator extends Serializable {
+ protected val toolsDir: String
+
+ protected def generateData(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int,
+ command: String): RDD[String] = {
+ val generatedData = {
+ sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+ val localToolsDir = if (new java.io.File(toolsDir).exists) {
+ toolsDir
+ } else if (new java.io.File(s"/$toolsDir").exists) {
+ s"/$toolsDir"
+ } else {
+ throw new IllegalStateException(
+ s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+ }
+
+ val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+ val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+ BlockingLineStream(commands)
+ }
+ }
+
+ generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+ generatedData
+ }
+
+ def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+ override protected val toolsDir: String = s"$dsdgenDir"
+
+ override def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String] = {
+ val command = s"./dsdgen -table $tableName -filter Y -scale $scaleFactor -RNGSEED 100"
+ generateData(sparkContext, tableName, partitions, scaleFactor, command)
+ }
+}
+
+class Dbgen(dbgenDir: String, params: Seq[String])
+ extends DataGenerator with Serializable {
+ override protected val toolsDir: String = s"$dbgenDir"
+ override def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String] = {
+ val shortTableNames = Map(
+ "customer" -> "c",
+ "lineitem" -> "L",
+ "nation" -> "n",
+ "orders" -> "O",
+ "part" -> "P",
+ "region" -> "r",
+ "supplier" -> "s",
+ "partsupp" -> "S"
+ )
+ val paramsString = params.mkString(" ")
+ val command = s"./dbgen -q $paramsString -T ${shortTableNames(tableName)} -s $scaleFactor"
+ generateData(sparkContext, tableName, partitions, scaleFactor, command)
+ }
+}
+
+trait TableGenerator extends Serializable with Logging {
+ protected val dataGenerator: DataGenerator
+ protected val sparkSQLContext: SQLContext
+ protected val tpcScaleFactor: Int
Review Comment:
Updated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] beliefer commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1257158528
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+ // See scala.sys.process.Streamed
+ private final class BlockingStreamed[T](
+ val process: T => Unit,
+ val done: Int => Unit,
+ val stream: () => Stream[T])
+
+ // See scala.sys.process.Streamed
+ private object BlockingStreamed {
+ // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+ // which causes OOMs if the consumer cannot keep up with producer.
+ val maxQueueSize = 65536
+
+ def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+ val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+ def next(): Stream[T] = q.take match {
+ case Left(0) => Stream.empty
+ case Left(code) =>
+ if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+ case Right(s) => Stream.cons(s, next())
+ }
+
+ new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+ }
+ }
+
+ // See scala.sys.process.ProcessImpl.Spawn
+ private object Spawn {
+ def apply(f: => Unit): Thread = apply(f, daemon = false)
+ def apply(f: => Unit, daemon: Boolean): Thread = {
+ val thread = new Thread() { override def run() = { f } }
+ thread.setDaemon(daemon)
+ thread.start()
+ thread
+ }
+ }
+
+ def apply(command: Seq[String]): Stream[String] = {
+ val streamed = BlockingStreamed[String](true)
+ val process = command.run(BasicIO(false, streamed.process, None))
+ Spawn(streamed.done(process.exitValue()))
+ streamed.stream()
+ }
+}
+
+trait DataGenerator extends Serializable {
+ protected val toolsDir: String
+
+ protected def generateData(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int,
+ command: String): RDD[String] = {
+ val generatedData = {
+ sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+ val localToolsDir = if (new java.io.File(toolsDir).exists) {
+ toolsDir
+ } else if (new java.io.File(s"/$toolsDir").exists) {
+ s"/$toolsDir"
+ } else {
+ throw new IllegalStateException(
+ s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+ }
+
+ val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+ val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+ BlockingLineStream(commands)
+ }
+ }
+
+ generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+ generatedData
+ }
+
+ def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+ override protected val toolsDir: String = s"$dsdgenDir"
+
+ override def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String] = {
+ val command = s"./dsdgen -table $tableName -filter Y -scale $scaleFactor -RNGSEED 100"
+ generateData(sparkContext, tableName, partitions, scaleFactor, command)
+ }
+}
+
+class Dbgen(dbgenDir: String, params: Seq[String])
+ extends DataGenerator with Serializable {
+ override protected val toolsDir: String = s"$dbgenDir"
+ override def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String] = {
+ val shortTableNames = Map(
+ "customer" -> "c",
+ "lineitem" -> "L",
+ "nation" -> "n",
+ "orders" -> "O",
+ "part" -> "P",
+ "region" -> "r",
+ "supplier" -> "s",
+ "partsupp" -> "S"
+ )
+ val paramsString = params.mkString(" ")
+ val command = s"./dbgen -q $paramsString -T ${shortTableNames(tableName)} -s $scaleFactor"
+ generateData(sparkContext, tableName, partitions, scaleFactor, command)
+ }
+}
+
+trait TableGenerator extends Serializable with Logging {
+ protected val dataGenerator: DataGenerator
+ protected val sparkSQLContext: SQLContext
Review Comment:
I created https://github.com/apache/spark/pull/41900 to replace `SQLContext` with `SparkSession` for `GenTPCDSData`, and it works well.
@surnaik Could you wait for https://github.com/apache/spark/pull/41900 to be merged and then update this PR?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] oss-maker commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1257160176
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+ // See scala.sys.process.Streamed
+ private final class BlockingStreamed[T](
+ val process: T => Unit,
+ val done: Int => Unit,
+ val stream: () => Stream[T])
+
+ // See scala.sys.process.Streamed
+ private object BlockingStreamed {
+ // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+ // which causes OOMs if the consumer cannot keep up with producer.
+ val maxQueueSize = 65536
+
+ def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+ val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+ def next(): Stream[T] = q.take match {
+ case Left(0) => Stream.empty
+ case Left(code) =>
+ if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+ case Right(s) => Stream.cons(s, next())
+ }
+
+ new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+ }
+ }
+
+ // See scala.sys.process.ProcessImpl.Spawn
+ private object Spawn {
+ def apply(f: => Unit): Thread = apply(f, daemon = false)
+ def apply(f: => Unit, daemon: Boolean): Thread = {
+ val thread = new Thread() { override def run() = { f } }
+ thread.setDaemon(daemon)
+ thread.start()
+ thread
+ }
+ }
+
+ def apply(command: Seq[String]): Stream[String] = {
+ val streamed = BlockingStreamed[String](true)
+ val process = command.run(BasicIO(false, streamed.process, None))
+ Spawn(streamed.done(process.exitValue()))
+ streamed.stream()
+ }
+}
+
+trait DataGenerator extends Serializable {
+ protected val toolsDir: String
+
+ protected def generateData(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int,
+ command: String): RDD[String] = {
+ val generatedData = {
+ sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+ val localToolsDir = if (new java.io.File(toolsDir).exists) {
+ toolsDir
+ } else if (new java.io.File(s"/$toolsDir").exists) {
+ s"/$toolsDir"
+ } else {
+ throw new IllegalStateException(
+ s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+ }
+
+ val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+ val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+ BlockingLineStream(commands)
+ }
+ }
+
+ generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+ generatedData
+ }
+
+ def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+ override protected val toolsDir: String = s"$dsdgenDir"
+
+ override def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String] = {
+ val command = s"./dsdgen -table $tableName -filter Y -scale $scaleFactor -RNGSEED 100"
+ generateData(sparkContext, tableName, partitions, scaleFactor, command)
+ }
+}
+
+class Dbgen(dbgenDir: String, params: Seq[String])
+ extends DataGenerator with Serializable {
+ override protected val toolsDir: String = s"$dbgenDir"
+ override def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String] = {
+ val shortTableNames = Map(
+ "customer" -> "c",
+ "lineitem" -> "L",
+ "nation" -> "n",
+ "orders" -> "O",
+ "part" -> "P",
+ "region" -> "r",
+ "supplier" -> "s",
+ "partsupp" -> "S"
+ )
+ val paramsString = params.mkString(" ")
+ val command = s"./dbgen -q $paramsString -T ${shortTableNames(tableName)} -s $scaleFactor"
+ generateData(sparkContext, tableName, partitions, scaleFactor, command)
+ }
+}
+
+trait TableGenerator extends Serializable with Logging {
+ protected val dataGenerator: DataGenerator
+ protected val sparkSQLContext: SQLContext
Review Comment:
Sounds good. Thanks, please update once #41900 is merged.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
Re: [PR] [SPARK-44301][SQL] Add Benchmark Suite for TPCH [spark]
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
URL: https://github.com/apache/spark/pull/41856
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] oss-maker commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1255729113
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+ // See scala.sys.process.Streamed
+ private final class BlockingStreamed[T](
+ val process: T => Unit,
+ val done: Int => Unit,
+ val stream: () => Stream[T])
+
+ // See scala.sys.process.Streamed
+ private object BlockingStreamed {
+ // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+ // which causes OOMs if the consumer cannot keep up with producer.
+ val maxQueueSize = 65536
+
+ def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+ val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+ def next(): Stream[T] = q.take match {
+ case Left(0) => Stream.empty
+ case Left(code) =>
+ if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+ case Right(s) => Stream.cons(s, next())
+ }
+
+ new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+ }
+ }
+
+ // See scala.sys.process.ProcessImpl.Spawn
+ private object Spawn {
+ def apply(f: => Unit): Thread = apply(f, daemon = false)
+ def apply(f: => Unit, daemon: Boolean): Thread = {
+ val thread = new Thread() { override def run() = { f } }
+ thread.setDaemon(daemon)
+ thread.start()
+ thread
+ }
+ }
+
+ def apply(command: Seq[String]): Stream[String] = {
+ val streamed = BlockingStreamed[String](true)
+ val process = command.run(BasicIO(false, streamed.process, None))
+ Spawn(streamed.done(process.exitValue()))
+ streamed.stream()
+ }
+}
+
+trait DataGenerator extends Serializable {
+ protected val toolsDir: String
+
+ protected def generateData(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int,
+ command: String): RDD[String] = {
+ val generatedData = {
+ sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+ val localToolsDir = if (new java.io.File(toolsDir).exists) {
+ toolsDir
+ } else if (new java.io.File(s"/$toolsDir").exists) {
+ s"/$toolsDir"
+ } else {
+ throw new IllegalStateException(
+ s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+ }
+
+ val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+ val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+ BlockingLineStream(commands)
+ }
+ }
+
+ generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+ generatedData
+ }
+
+ def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+ override protected val toolsDir: String = s"$dsdgenDir"
Review Comment:
Done; I didn't notice this. Initially I was also appending the tool name, but realised it was no longer needed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] beliefer commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1257916349
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala:
##########
@@ -17,390 +17,20 @@
package org.apache.spark.sql
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.immutable.Stream
-import scala.sys.process._
-import scala.util.Try
-
-import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions.{col, rpad}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
-
-// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
-
-/**
- * Using ProcessBuilder.lineStream produces a stream, that uses
- * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
- *
- * This causes OOM if the consumer cannot keep up with the producer.
- *
- * See scala.sys.process.ProcessBuilderImpl.lineStream
- */
-object BlockingLineStream {
-
- // See scala.sys.process.Streamed
- private final class BlockingStreamed[T](
- val process: T => Unit,
- val done: Int => Unit,
- val stream: () => Stream[T])
+import org.apache.spark.sql.types.StructType
- // See scala.sys.process.Streamed
- private object BlockingStreamed {
- // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
- // which causes OOMs if the consumer cannot keep up with producer.
- val maxQueueSize = 65536
+class TPCDSTables(sparkSession: SparkSession, config: GenTPCDataConfig)
Review Comment:
```suggestion
class TPCDSTables(val spark: SparkSession, config: GenTPCDataConfig)
```
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala:
##########
@@ -17,390 +17,20 @@
package org.apache.spark.sql
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.immutable.Stream
-import scala.sys.process._
-import scala.util.Try
-
-import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions.{col, rpad}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
-
-// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
-
-/**
- * Using ProcessBuilder.lineStream produces a stream, that uses
- * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
- *
- * This causes OOM if the consumer cannot keep up with the producer.
- *
- * See scala.sys.process.ProcessBuilderImpl.lineStream
- */
-object BlockingLineStream {
-
- // See scala.sys.process.Streamed
- private final class BlockingStreamed[T](
- val process: T => Unit,
- val done: Int => Unit,
- val stream: () => Stream[T])
+import org.apache.spark.sql.types.StructType
- // See scala.sys.process.Streamed
- private object BlockingStreamed {
- // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
- // which causes OOMs if the consumer cannot keep up with producer.
- val maxQueueSize = 65536
+class TPCDSTables(sparkSession: SparkSession, config: GenTPCDataConfig)
+ extends TableGenerator with TPCDSSchema with Logging with Serializable {
- def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
- val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
-
- def next(): Stream[T] = q.take match {
- case Left(0) => Stream.empty
- case Left(code) =>
- if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
- case Right(s) => Stream.cons(s, next())
- }
-
- new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
- }
- }
-
- // See scala.sys.process.ProcessImpl.Spawn
- private object Spawn {
- def apply(f: => Unit): Thread = apply(f, daemon = false)
- def apply(f: => Unit, daemon: Boolean): Thread = {
- val thread = new Thread() { override def run() = { f } }
- thread.setDaemon(daemon)
- thread.start()
- thread
- }
- }
-
- def apply(command: Seq[String]): Stream[String] = {
- val streamed = BlockingStreamed[String](true)
- val process = command.run(BasicIO(false, streamed.process, None))
- Spawn(streamed.done(process.exitValue()))
- streamed.stream()
- }
-}
-
-class Dsdgen(dsdgenDir: String) extends Serializable {
- private val dsdgen = s"$dsdgenDir/dsdgen"
-
- def generate(
- sparkContext: SparkContext,
- tableName: String,
- partitions: Int,
- scaleFactor: Int): RDD[String] = {
- val generatedData = {
- sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
- val localToolsDir = if (new java.io.File(dsdgen).exists) {
- dsdgenDir
- } else if (new java.io.File(s"/$dsdgen").exists) {
- s"/$dsdgenDir"
- } else {
- throw new IllegalStateException(
- s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install")
- }
-
- // NOTE: RNGSEED is the RNG seed used by the data generator. Right now, it is fixed to 100.
- val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
- val commands = Seq(
- "bash", "-c",
- s"cd $localToolsDir && ./dsdgen -table $tableName -filter Y -scale $scaleFactor " +
- s"-RNGSEED 100 $parallel")
- BlockingLineStream(commands)
- }
- }
-
- generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
- generatedData
- }
-}
-
-class TPCDSTables(spark: SparkSession, dsdgenDir: String, scaleFactor: Int)
- extends TPCDSSchema with Logging with Serializable {
-
- private val dataGenerator = new Dsdgen(dsdgenDir)
-
- private def tables: Seq[Table] = tableColumns.map { case (tableName, schemaString) =>
+ override protected val dataGenerator: DataGenerator = new Dsdgen(config.dsdgenDir)
+ override protected val spark: SparkSession = sparkSession
Review Comment:
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] LuciferYang commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1634390549
Should a larger TPCH dataset be used?
For comparison, when sf=1, the size of the TPCDS dataset cache is 252MB.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] oss-maker commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1254290182
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCQueryBenchmarkArguments.scala:
##########
@@ -20,14 +20,11 @@ package org.apache.spark.sql.execution.benchmark
import java.util.Locale
-class TPCDSQueryBenchmarkArguments(val args: Array[String]) {
- var dataLocation: String = sys.env.getOrElse("SPARK_TPCDS_DATA", null)
+class TPCQueryBenchmarkArguments(val args: Array[String], val dataLocationEnv: String) {
+ var dataLocation: String = sys.env.getOrElse(dataLocationEnv, null)
var queryFilter: Set[String] = Set.empty
var cboEnabled: Boolean = false
- parseArgs(args.toList)
Review Comment:
My bad, it's fixed now
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] beliefer commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1258218998
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala:
##########
@@ -17,390 +17,18 @@
package org.apache.spark.sql
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.immutable.Stream
-import scala.sys.process._
-import scala.util.Try
-
-import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions.{col, rpad}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
-
-// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
-
-/**
- * Using ProcessBuilder.lineStream produces a stream, that uses
- * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
- *
- * This causes OOM if the consumer cannot keep up with the producer.
- *
- * See scala.sys.process.ProcessBuilderImpl.lineStream
- */
-object BlockingLineStream {
-
- // See scala.sys.process.Streamed
- private final class BlockingStreamed[T](
- val process: T => Unit,
- val done: Int => Unit,
- val stream: () => Stream[T])
+import org.apache.spark.sql.types.StructType
- // See scala.sys.process.Streamed
- private object BlockingStreamed {
- // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
- // which causes OOMs if the consumer cannot keep up with producer.
- val maxQueueSize = 65536
+class TPCDSTables(val spark: SparkSession, val dsdgenDir: String, val scaleFactor: Int)
Review Comment:
`dsdgenDir` does not override a member of the parent trait, so remove the `val`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] surnaik commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "surnaik (via GitHub)" <gi...@apache.org>.
surnaik commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1634382776
Yes, that's strange; I'm also very confused by it. Using the tpch-dbgen
tool from the github.com/databricks/tpch-dbgen repository, the cache shows
~4KB.
Thanks,
Suraj Naik
On Thu, Jul 13, 2023, 8:16 PM YangJie ***@***.***> wrote:
> Is the data only 4KB when sf = 1?
>
> —
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/spark/pull/41856#issuecomment-1634377891>, or
> unsubscribe
> <https://github.com/notifications/unsubscribe-auth/AJW6OTJSNFFTSGIB4OHQTCDXQAC4RANCNFSM6AAAAAAZ564SOY>
> .
> You are receiving this because you were mentioned.Message ID:
> ***@***.***>
>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] oss-maker commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1622082961
@LuciferYang I have added the TPCH benchmark to the GitHub Actions benchmark workflow, but I don't see any benchmark workflows being triggered (neither TPCDS nor TPCH). Any idea why, and how I can trigger them? Thanks
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] surnaik commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "surnaik (via GitHub)" <gi...@apache.org>.
surnaik commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1263516785
##########
.github/workflows/benchmark.yml:
##########
@@ -186,7 +251,7 @@ jobs:
# To keep the directory structure and file permissions, tar them
# See also https://github.com/actions/upload-artifact#maintaining-file-permissions-and-case-sensitive-files
echo "Preparing the benchmark results:"
- tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude-standard`
+ tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude=tpch-sf-1 --exclude-standard`
Review Comment:
Interesting find; let me check the local data. Do you know where else tpch-dbgen is available from?
I'm using the one from github.com/databricks/tpch-dbgen.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] oss-maker commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1254289533
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCBenchmarkUtils.scala:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
+import org.apache.spark.sql.catalyst.util.resourceToString
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+
+trait TPCBenchmarkUtils extends SqlBasedBenchmark with Logging {
Review Comment:
Thanks, fixed the name
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] surnaik commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "surnaik (via GitHub)" <gi...@apache.org>.
surnaik commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1255671079
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+ // See scala.sys.process.Streamed
+ private final class BlockingStreamed[T](
+ val process: T => Unit,
+ val done: Int => Unit,
+ val stream: () => Stream[T])
+
+ // See scala.sys.process.Streamed
+ private object BlockingStreamed {
+ // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+ // which causes OOMs if the consumer cannot keep up with producer.
+ val maxQueueSize = 65536
+
+ def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+ val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+ def next(): Stream[T] = q.take match {
+ case Left(0) => Stream.empty
+ case Left(code) =>
+ if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+ case Right(s) => Stream.cons(s, next())
+ }
+
+ new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+ }
+ }
+
+ // See scala.sys.process.ProcessImpl.Spawn
+ private object Spawn {
+ def apply(f: => Unit): Thread = apply(f, daemon = false)
+ def apply(f: => Unit, daemon: Boolean): Thread = {
+ val thread = new Thread() { override def run() = { f } }
+ thread.setDaemon(daemon)
+ thread.start()
+ thread
+ }
+ }
+
+ def apply(command: Seq[String]): Stream[String] = {
+ val streamed = BlockingStreamed[String](true)
+ val process = command.run(BasicIO(false, streamed.process, None))
+ Spawn(streamed.done(process.exitValue()))
+ streamed.stream()
+ }
+}
+
+trait DataGenerator extends Serializable {
+ protected val toolsDir: String
+
+ protected def generateData(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int,
+ command: String): RDD[String] = {
+ val generatedData = {
+ sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+ val localToolsDir = if (new java.io.File(toolsDir).exists) {
+ toolsDir
+ } else if (new java.io.File(s"/$toolsDir").exists) {
+ s"/$toolsDir"
+ } else {
+ throw new IllegalStateException(
+ s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+ }
+
+ val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+ val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+ BlockingLineStream(commands)
+ }
+ }
+
+ generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+ generatedData
+ }
+
+ def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+ override protected val toolsDir: String = s"$dsdgenDir"
+
+ override def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String] = {
+ val command = s"./dsdgen -table $tableName -filter Y -scale $scaleFactor -RNGSEED 100"
+ generateData(sparkContext, tableName, partitions, scaleFactor, command)
+ }
+}
+
+class Dbgen(dbgenDir: String, params: Seq[String])
+ extends DataGenerator with Serializable {
+ override protected val toolsDir: String = s"$dbgenDir"
+ override def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String] = {
+ val shortTableNames = Map(
+ "customer" -> "c",
+ "lineitem" -> "L",
+ "nation" -> "n",
+ "orders" -> "O",
+ "part" -> "P",
+ "region" -> "r",
+ "supplier" -> "s",
+ "partsupp" -> "S"
+ )
+ val paramsString = params.mkString(" ")
+ val command = s"./dbgen -q $paramsString -T ${shortTableNames(tableName)} -s $scaleFactor"
+ generateData(sparkContext, tableName, partitions, scaleFactor, command)
+ }
+}
+
+trait TableGenerator extends Serializable with Logging {
+ protected val dataGenerator: DataGenerator
+ protected val sparkSQLContext: SQLContext
Review Comment:
Thanks, didn't know this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] surnaik commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "surnaik (via GitHub)" <gi...@apache.org>.
surnaik commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1634393056
Yes, let me try with SF 5 and check. I will update here once I have the result.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] surnaik commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "surnaik (via GitHub)" <gi...@apache.org>.
surnaik commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1634447494
I tried with SF 10: the actual data size is definitely large, but the cache size actually shrank to ~3.5KB. Let me try it locally.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] oss-maker commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1628494711
@beliefer Thanks for the change. I have updated my PR after the merge of #41900.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] oss-maker commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1255731038
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+ // See scala.sys.process.Streamed
+ private final class BlockingStreamed[T](
+ val process: T => Unit,
+ val done: Int => Unit,
+ val stream: () => Stream[T])
+
+ // See scala.sys.process.Streamed
+ private object BlockingStreamed {
+ // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+ // which causes OOMs if the consumer cannot keep up with producer.
+ val maxQueueSize = 65536
+
+ def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+ val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+ def next(): Stream[T] = q.take match {
+ case Left(0) => Stream.empty
+ case Left(code) =>
+ if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+ case Right(s) => Stream.cons(s, next())
+ }
+
+ new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+ }
+ }
+
+ // See scala.sys.process.ProcessImpl.Spawn
+ private object Spawn {
+ def apply(f: => Unit): Thread = apply(f, daemon = false)
+ def apply(f: => Unit, daemon: Boolean): Thread = {
+ val thread = new Thread() { override def run() = { f } }
+ thread.setDaemon(daemon)
+ thread.start()
+ thread
+ }
+ }
+
+ def apply(command: Seq[String]): Stream[String] = {
+ val streamed = BlockingStreamed[String](true)
+ val process = command.run(BasicIO(false, streamed.process, None))
+ Spawn(streamed.done(process.exitValue()))
+ streamed.stream()
+ }
+}
+
+trait DataGenerator extends Serializable {
+ protected val toolsDir: String
+
+ protected def generateData(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int,
+ command: String): RDD[String] = {
+ val generatedData = {
+ sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+ val localToolsDir = if (new java.io.File(toolsDir).exists) {
+ toolsDir
+ } else if (new java.io.File(s"/$toolsDir").exists) {
+ s"/$toolsDir"
+ } else {
+ throw new IllegalStateException(
+ s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+ }
+
+ val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+ val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+ BlockingLineStream(commands)
+ }
+ }
+
+ generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+ generatedData
+ }
+
+ def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+ override protected val toolsDir: String = s"$dsdgenDir"
+
+ override def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String] = {
+ val command = s"./dsdgen -table $tableName -filter Y -scale $scaleFactor -RNGSEED 100"
+ generateData(sparkContext, tableName, partitions, scaleFactor, command)
+ }
+}
+
+class Dbgen(dbgenDir: String, params: Seq[String])
+ extends DataGenerator with Serializable {
+ override protected val toolsDir: String = s"$dbgenDir"
+ override def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String] = {
+ val shortTableNames = Map(
+ "customer" -> "c",
+ "lineitem" -> "L",
+ "nation" -> "n",
+ "orders" -> "O",
+ "part" -> "P",
+ "region" -> "r",
+ "supplier" -> "s",
+ "partsupp" -> "S"
+ )
+ val paramsString = params.mkString(" ")
+ val command = s"./dbgen -q $paramsString -T ${shortTableNames(tableName)} -s $scaleFactor"
+ generateData(sparkContext, tableName, partitions, scaleFactor, command)
+ }
+}
+
+trait TableGenerator extends Serializable with Logging {
+ protected val dataGenerator: DataGenerator
+ protected val sparkSQLContext: SQLContext
Review Comment:
The sqlContext is extracted from the same SparkSession, so I don't think this is required. Even if we pass it, in genData we would again have to get hold of the sqlContext or catalog to operate on the table. Let me know if this is absolutely required. Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] beliefer commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1255608832
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+ // See scala.sys.process.Streamed
+ private final class BlockingStreamed[T](
+ val process: T => Unit,
+ val done: Int => Unit,
+ val stream: () => Stream[T])
+
+ // See scala.sys.process.Streamed
+ private object BlockingStreamed {
+ // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+ // which causes OOMs if the consumer cannot keep up with producer.
+ val maxQueueSize = 65536
+
+ def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+ val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+ def next(): Stream[T] = q.take match {
+ case Left(0) => Stream.empty
+ case Left(code) =>
+ if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+ case Right(s) => Stream.cons(s, next())
+ }
+
+ new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+ }
+ }
+
+ // See scala.sys.process.ProcessImpl.Spawn
+ private object Spawn {
+ def apply(f: => Unit): Thread = apply(f, daemon = false)
+ def apply(f: => Unit, daemon: Boolean): Thread = {
+ val thread = new Thread() { override def run() = { f } }
+ thread.setDaemon(daemon)
+ thread.start()
+ thread
+ }
+ }
+
+ def apply(command: Seq[String]): Stream[String] = {
+ val streamed = BlockingStreamed[String](true)
+ val process = command.run(BasicIO(false, streamed.process, None))
+ Spawn(streamed.done(process.exitValue()))
+ streamed.stream()
+ }
+}
+
+trait DataGenerator extends Serializable {
+ protected val toolsDir: String
+
+ protected def generateData(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int,
+ command: String): RDD[String] = {
+ val generatedData = {
+ sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+ val localToolsDir = if (new java.io.File(toolsDir).exists) {
+ toolsDir
+ } else if (new java.io.File(s"/$toolsDir").exists) {
+ s"/$toolsDir"
+ } else {
+ throw new IllegalStateException(
+ s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+ }
+
+ val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+ val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+ BlockingLineStream(commands)
+ }
+ }
+
+ generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+ generatedData
+ }
+
+ def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+ override protected val toolsDir: String = s"$dsdgenDir"
+
+ override def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String] = {
+ val command = s"./dsdgen -table $tableName -filter Y -scale $scaleFactor -RNGSEED 100"
+ generateData(sparkContext, tableName, partitions, scaleFactor, command)
+ }
+}
+
+class Dbgen(dbgenDir: String, params: Seq[String])
+ extends DataGenerator with Serializable {
+ override protected val toolsDir: String = s"$dbgenDir"
+ override def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String] = {
+ val shortTableNames = Map(
+ "customer" -> "c",
+ "lineitem" -> "L",
+ "nation" -> "n",
+ "orders" -> "O",
+ "part" -> "P",
+ "region" -> "r",
+ "supplier" -> "s",
+ "partsupp" -> "S"
+ )
+ val paramsString = params.mkString(" ")
+ val command = s"./dbgen -q $paramsString -T ${shortTableNames(tableName)} -s $scaleFactor"
+ generateData(sparkContext, tableName, partitions, scaleFactor, command)
+ }
+}
+
+trait TableGenerator extends Serializable with Logging {
+ protected val dataGenerator: DataGenerator
+ protected val sparkSQLContext: SQLContext
Review Comment:
```suggestion
protected val sqlContext: SQLContext
```
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+ // See scala.sys.process.Streamed
+ private final class BlockingStreamed[T](
+ val process: T => Unit,
+ val done: Int => Unit,
+ val stream: () => Stream[T])
+
+ // See scala.sys.process.Streamed
+ private object BlockingStreamed {
+ // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+ // which causes OOMs if the consumer cannot keep up with producer.
+ val maxQueueSize = 65536
+
+ def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+ val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+ def next(): Stream[T] = q.take match {
+ case Left(0) => Stream.empty
+ case Left(code) =>
+ if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+ case Right(s) => Stream.cons(s, next())
+ }
+
+ new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+ }
+ }
+
+ // See scala.sys.process.ProcessImpl.Spawn
+ private object Spawn {
+ def apply(f: => Unit): Thread = apply(f, daemon = false)
+ def apply(f: => Unit, daemon: Boolean): Thread = {
+ val thread = new Thread() { override def run() = { f } }
+ thread.setDaemon(daemon)
+ thread.start()
+ thread
+ }
+ }
+
+ def apply(command: Seq[String]): Stream[String] = {
+ val streamed = BlockingStreamed[String](true)
+ val process = command.run(BasicIO(false, streamed.process, None))
+ Spawn(streamed.done(process.exitValue()))
+ streamed.stream()
+ }
+}
+
+trait DataGenerator extends Serializable {
+ protected val toolsDir: String
+
+ protected def generateData(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int,
+ command: String): RDD[String] = {
+ val generatedData = {
+ sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+ val localToolsDir = if (new java.io.File(toolsDir).exists) {
+ toolsDir
+ } else if (new java.io.File(s"/$toolsDir").exists) {
+ s"/$toolsDir"
+ } else {
+ throw new IllegalStateException(
+ s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+ }
+
+ val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+ val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+ BlockingLineStream(commands)
+ }
+ }
+
+ generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+ generatedData
+ }
+
+ def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+ override protected val toolsDir: String = s"$dsdgenDir"
+
+ override def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String] = {
+ val command = s"./dsdgen -table $tableName -filter Y -scale $scaleFactor -RNGSEED 100"
+ generateData(sparkContext, tableName, partitions, scaleFactor, command)
+ }
+}
+
+class Dbgen(dbgenDir: String, params: Seq[String])
+ extends DataGenerator with Serializable {
+ override protected val toolsDir: String = s"$dbgenDir"
+ override def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String] = {
+ val shortTableNames = Map(
+ "customer" -> "c",
+ "lineitem" -> "L",
+ "nation" -> "n",
+ "orders" -> "O",
+ "part" -> "P",
+ "region" -> "r",
+ "supplier" -> "s",
+ "partsupp" -> "S"
+ )
+ val paramsString = params.mkString(" ")
+ val command = s"./dbgen -q $paramsString -T ${shortTableNames(tableName)} -s $scaleFactor"
+ generateData(sparkContext, tableName, partitions, scaleFactor, command)
+ }
+}
+
+trait TableGenerator extends Serializable with Logging {
+ protected val dataGenerator: DataGenerator
+ protected val sparkSQLContext: SQLContext
+ protected val tpcScaleFactor: Int
Review Comment:
```suggestion
protected val scaleFactor: Int
```
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala:
##########
@@ -17,391 +17,20 @@
package org.apache.spark.sql
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.immutable.Stream
-import scala.sys.process._
-import scala.util.Try
-
-import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions.{col, rpad}
-import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
-
-// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
-
-/**
- * Using ProcessBuilder.lineStream produces a stream, that uses
- * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
- *
- * This causes OOM if the consumer cannot keep up with the producer.
- *
- * See scala.sys.process.ProcessBuilderImpl.lineStream
- */
-object BlockingLineStream {
-
- // See scala.sys.process.Streamed
- private final class BlockingStreamed[T](
- val process: T => Unit,
- val done: Int => Unit,
- val stream: () => Stream[T])
-
- // See scala.sys.process.Streamed
- private object BlockingStreamed {
- // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
- // which causes OOMs if the consumer cannot keep up with producer.
- val maxQueueSize = 65536
-
- def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
- val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
-
- def next(): Stream[T] = q.take match {
- case Left(0) => Stream.empty
- case Left(code) =>
- if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
- case Right(s) => Stream.cons(s, next())
- }
-
- new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
- }
- }
-
- // See scala.sys.process.ProcessImpl.Spawn
- private object Spawn {
- def apply(f: => Unit): Thread = apply(f, daemon = false)
- def apply(f: => Unit, daemon: Boolean): Thread = {
- val thread = new Thread() { override def run() = { f } }
- thread.setDaemon(daemon)
- thread.start()
- thread
- }
- }
-
- def apply(command: Seq[String]): Stream[String] = {
- val streamed = BlockingStreamed[String](true)
- val process = command.run(BasicIO(false, streamed.process, None))
- Spawn(streamed.done(process.exitValue()))
- streamed.stream()
- }
-}
-
-class Dsdgen(dsdgenDir: String) extends Serializable {
- private val dsdgen = s"$dsdgenDir/dsdgen"
-
- def generate(
- sparkContext: SparkContext,
- tableName: String,
- partitions: Int,
- scaleFactor: Int): RDD[String] = {
- val generatedData = {
- sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
- val localToolsDir = if (new java.io.File(dsdgen).exists) {
- dsdgenDir
- } else if (new java.io.File(s"/$dsdgen").exists) {
- s"/$dsdgenDir"
- } else {
- throw new IllegalStateException(
- s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install")
- }
-
- // NOTE: RNGSEED is the RNG seed used by the data generator. Right now, it is fixed to 100.
- val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
- val commands = Seq(
- "bash", "-c",
- s"cd $localToolsDir && ./dsdgen -table $tableName -filter Y -scale $scaleFactor " +
- s"-RNGSEED 100 $parallel")
- BlockingLineStream(commands)
- }
- }
-
- generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
- generatedData
- }
-}
+import org.apache.spark.sql.types.StructType
class TPCDSTables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int)
- extends TPCDSSchema with Logging with Serializable {
+ extends TableGenerator with TPCDSSchema with Logging with Serializable {
- private val dataGenerator = new Dsdgen(dsdgenDir)
-
- private def tables: Seq[Table] = tableColumns.map { case (tableName, schemaString) =>
+ override protected val dataGenerator: DataGenerator = new Dsdgen(dsdgenDir)
+ override protected val sparkSQLContext: SQLContext = sqlContext
+ override protected val tpcScaleFactor: Int = scaleFactor
Review Comment:
```suggestion
```
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala:
##########
@@ -17,391 +17,20 @@
package org.apache.spark.sql
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.immutable.Stream
-import scala.sys.process._
-import scala.util.Try
-
-import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions.{col, rpad}
-import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
-
-// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
-
-/**
- * Using ProcessBuilder.lineStream produces a stream, that uses
- * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
- *
- * This causes OOM if the consumer cannot keep up with the producer.
- *
- * See scala.sys.process.ProcessBuilderImpl.lineStream
- */
-object BlockingLineStream {
-
- // See scala.sys.process.Streamed
- private final class BlockingStreamed[T](
- val process: T => Unit,
- val done: Int => Unit,
- val stream: () => Stream[T])
-
- // See scala.sys.process.Streamed
- private object BlockingStreamed {
- // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
- // which causes OOMs if the consumer cannot keep up with producer.
- val maxQueueSize = 65536
-
- def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
- val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
-
- def next(): Stream[T] = q.take match {
- case Left(0) => Stream.empty
- case Left(code) =>
- if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
- case Right(s) => Stream.cons(s, next())
- }
-
- new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
- }
- }
-
- // See scala.sys.process.ProcessImpl.Spawn
- private object Spawn {
- def apply(f: => Unit): Thread = apply(f, daemon = false)
- def apply(f: => Unit, daemon: Boolean): Thread = {
- val thread = new Thread() { override def run() = { f } }
- thread.setDaemon(daemon)
- thread.start()
- thread
- }
- }
-
- def apply(command: Seq[String]): Stream[String] = {
- val streamed = BlockingStreamed[String](true)
- val process = command.run(BasicIO(false, streamed.process, None))
- Spawn(streamed.done(process.exitValue()))
- streamed.stream()
- }
-}
-
-class Dsdgen(dsdgenDir: String) extends Serializable {
- private val dsdgen = s"$dsdgenDir/dsdgen"
-
- def generate(
- sparkContext: SparkContext,
- tableName: String,
- partitions: Int,
- scaleFactor: Int): RDD[String] = {
- val generatedData = {
- sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
- val localToolsDir = if (new java.io.File(dsdgen).exists) {
- dsdgenDir
- } else if (new java.io.File(s"/$dsdgen").exists) {
- s"/$dsdgenDir"
- } else {
- throw new IllegalStateException(
- s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install")
- }
-
- // NOTE: RNGSEED is the RNG seed used by the data generator. Right now, it is fixed to 100.
- val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
- val commands = Seq(
- "bash", "-c",
- s"cd $localToolsDir && ./dsdgen -table $tableName -filter Y -scale $scaleFactor " +
- s"-RNGSEED 100 $parallel")
- BlockingLineStream(commands)
- }
- }
-
- generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
- generatedData
- }
-}
+import org.apache.spark.sql.types.StructType
class TPCDSTables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int)
- extends TPCDSSchema with Logging with Serializable {
+ extends TableGenerator with TPCDSSchema with Logging with Serializable {
- private val dataGenerator = new Dsdgen(dsdgenDir)
-
- private def tables: Seq[Table] = tableColumns.map { case (tableName, schemaString) =>
+ override protected val dataGenerator: DataGenerator = new Dsdgen(dsdgenDir)
+ override protected val sparkSQLContext: SQLContext = sqlContext
+ override protected val tpcScaleFactor: Int = scaleFactor
Review Comment:
Please declare `sqlContext: SQLContext` and `scaleFactor: Int` in `TableGenerator`.
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+ // See scala.sys.process.Streamed
+ private final class BlockingStreamed[T](
+ val process: T => Unit,
+ val done: Int => Unit,
+ val stream: () => Stream[T])
+
+ // See scala.sys.process.Streamed
+ private object BlockingStreamed {
+ // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+ // which causes OOMs if the consumer cannot keep up with producer.
+ val maxQueueSize = 65536
+
+ def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+ val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+ def next(): Stream[T] = q.take match {
+ case Left(0) => Stream.empty
+ case Left(code) =>
+ if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+ case Right(s) => Stream.cons(s, next())
+ }
+
+ new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+ }
+ }
+
+ // See scala.sys.process.ProcessImpl.Spawn
+ private object Spawn {
+ def apply(f: => Unit): Thread = apply(f, daemon = false)
+ def apply(f: => Unit, daemon: Boolean): Thread = {
+ val thread = new Thread() { override def run() = { f } }
+ thread.setDaemon(daemon)
+ thread.start()
+ thread
+ }
+ }
+
+ def apply(command: Seq[String]): Stream[String] = {
+ val streamed = BlockingStreamed[String](true)
+ val process = command.run(BasicIO(false, streamed.process, None))
+ Spawn(streamed.done(process.exitValue()))
+ streamed.stream()
+ }
+}
+
+trait DataGenerator extends Serializable {
+ protected val toolsDir: String
+
+ protected def generateData(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int,
+ command: String): RDD[String] = {
+ val generatedData = {
+ sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+ val localToolsDir = if (new java.io.File(toolsDir).exists) {
+ toolsDir
+ } else if (new java.io.File(s"/$toolsDir").exists) {
+ s"/$toolsDir"
+ } else {
+ throw new IllegalStateException(
+ s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+ }
+
+ val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+ val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+ BlockingLineStream(commands)
+ }
+ }
+
+ generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+ generatedData
+ }
+
+ def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+ override protected val toolsDir: String = s"$dsdgenDir"
Review Comment:
Shall we avoid the duplicated variables `dsdgenDir` and `toolsDir`?
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+ // See scala.sys.process.Streamed
+ private final class BlockingStreamed[T](
+ val process: T => Unit,
+ val done: Int => Unit,
+ val stream: () => Stream[T])
+
+ // See scala.sys.process.Streamed
+ private object BlockingStreamed {
+ // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+ // which causes OOMs if the consumer cannot keep up with producer.
+ val maxQueueSize = 65536
+
+ def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+ val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+ def next(): Stream[T] = q.take match {
+ case Left(0) => Stream.empty
+ case Left(code) =>
+ if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+ case Right(s) => Stream.cons(s, next())
+ }
+
+ new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+ }
+ }
+
+ // See scala.sys.process.ProcessImpl.Spawn
+ private object Spawn {
+ def apply(f: => Unit): Thread = apply(f, daemon = false)
+ def apply(f: => Unit, daemon: Boolean): Thread = {
+ val thread = new Thread() { override def run() = { f } }
+ thread.setDaemon(daemon)
+ thread.start()
+ thread
+ }
+ }
+
+ def apply(command: Seq[String]): Stream[String] = {
+ val streamed = BlockingStreamed[String](true)
+ val process = command.run(BasicIO(false, streamed.process, None))
+ Spawn(streamed.done(process.exitValue()))
+ streamed.stream()
+ }
+}
+
+trait DataGenerator extends Serializable {
+ protected val toolsDir: String
+
+ protected def generateData(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int,
+ command: String): RDD[String] = {
+ val generatedData = {
+ sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+ val localToolsDir = if (new java.io.File(toolsDir).exists) {
+ toolsDir
+ } else if (new java.io.File(s"/$toolsDir").exists) {
+ s"/$toolsDir"
+ } else {
+ throw new IllegalStateException(
+ s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+ }
+
+ val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+ val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+ BlockingLineStream(commands)
+ }
+ }
+
+ generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+ generatedData
+ }
+
+ def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+ override protected val toolsDir: String = s"$dsdgenDir"
+
+ override def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String] = {
+ val command = s"./dsdgen -table $tableName -filter Y -scale $scaleFactor -RNGSEED 100"
+ generateData(sparkContext, tableName, partitions, scaleFactor, command)
+ }
+}
+
+class Dbgen(dbgenDir: String, params: Seq[String])
+ extends DataGenerator with Serializable {
+ override protected val toolsDir: String = s"$dbgenDir"
+ override def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String] = {
+ val shortTableNames = Map(
+ "customer" -> "c",
+ "lineitem" -> "L",
+ "nation" -> "n",
+ "orders" -> "O",
+ "part" -> "P",
+ "region" -> "r",
+ "supplier" -> "s",
+ "partsupp" -> "S"
+ )
+ val paramsString = params.mkString(" ")
+ val command = s"./dbgen -q $paramsString -T ${shortTableNames(tableName)} -s $scaleFactor"
+ generateData(sparkContext, tableName, partitions, scaleFactor, command)
+ }
+}
+
+trait TableGenerator extends Serializable with Logging {
+ protected val dataGenerator: DataGenerator
+ protected val sparkSQLContext: SQLContext
Review Comment:
BTW, SQLContext is an old API. It seems it's time to use `SparkSession` instead. cc @cloud-fan @MaxGekk @wangyum
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] LuciferYang commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1634300120
https://github.com/LuciferYang/spark/actions/runs/5540894623/jobs/10117467990
The test run was successful, but I suspect the generated TPCH table data may not have been cached successfully:
```
Cache Size: ~0 MB (4166 B)
```
<img width="1289" alt="image" src="https://github.com/apache/spark/assets/1475305/ccddfc69-fb1b-42b7-80fc-481980241d41">
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] LuciferYang commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1634394953
Thanks @surnaik
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] beliefer commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1625212828
@surnaik Thank you for your ping.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] surnaik commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "surnaik (via GitHub)" <gi...@apache.org>.
surnaik commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1625025167
@beliefer Could you also please review once. Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] oss-maker commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1622097222
Triggered a workflow - https://github.com/oss-maker/spark/actions/runs/5466653951
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] surnaik commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "surnaik (via GitHub)" <gi...@apache.org>.
surnaik commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1634342717
@LuciferYang The size of the data is incredibly small at SF=1. The cache is working fine.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] oss-maker commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1254100099
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCHData.scala:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
Review Comment:
Yes, I have based this on that. Note that there is no GenTPCHData there, so that part is new.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] LuciferYang commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1254038964
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCHData.scala:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
Review Comment:
Based on [TPCH.scala](https://github.com/databricks/spark-sql-perf/blob/master/src/main/scala/com/databricks/spark/sql/perf/tpch/TPCH.scala)?
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCHData.scala:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Wrapper around the TPC-H `dbgen` tool that runs it on executors and exposes its
+ * '|'-delimited text output as an RDD of lines.
+ *
+ * @param dbgenDir directory containing the `dbgen` executable; if `dbgen` is not found there,
+ *                 the same path prefixed with `/` is tried on the executor
+ * @param params   extra arguments appended to every `dbgen` command line
+ */
+class Dbgen(dbgenDir: String, params: Seq[String]) extends Serializable {
+  val dbgen = s"$dbgenDir/dbgen"
+
+  // Maps each TPC-H table name to the code that `dbgen -T` expects for it.
+  private val shortTableNames = Map(
+    "customer" -> "c",
+    "lineitem" -> "L",
+    "nation" -> "n",
+    "orders" -> "O",
+    "part" -> "P",
+    "region" -> "r",
+    "supplier" -> "s",
+    "partsupp" -> "S"
+  )
+
+  /**
+   * Builds an RDD whose partitions each run `dbgen` for `tableName` and emit its output lines.
+   *
+   * @param sparkContext context used to create the RDD
+   * @param tableName    TPC-H table name (one of the keys of `shortTableNames`)
+   * @param partitions   number of RDD partitions; when > 1, partition `i` runs dbgen with
+   *                     `-C partitions -S i`
+   * @param scaleFactor  scale factor passed to dbgen as `-s`
+   * @return the raw dbgen output lines; executors throw IllegalStateException when the RDD is
+   *         computed if the dbgen executable cannot be found
+   * @throws IllegalArgumentException if `tableName` is not a known TPC-H table
+   */
+  def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    // Resolve on the driver so an unknown table fails fast with a clear message instead of a
+    // NoSuchElementException inside every task.
+    val shortTableName = shortTableNames.getOrElse(tableName,
+      throw new IllegalArgumentException(s"Unknown TPC-H table: $tableName"))
+    val paramsString = params.mkString(" ")
+
+    val generatedData = sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+      val localToolsDir = if (new java.io.File(dbgen).exists) {
+        dbgenDir
+      } else if (new java.io.File(s"/$dbgenDir").exists) {
+        s"/$dbgenDir"
+      } else {
+        // The message must follow `new ...(` on the same line: a line break before `(` ends
+        // the statement and would throw an exception without any message.
+        throw new IllegalStateException(
+          s"Could not find dbgen at $dbgen or /$dbgenDir. Run install")
+      }
+      val parallel = if (partitions > 1) s"-C $partitions -S $i" else ""
+      val commands = Seq(
+        "bash", "-c",
+        s"cd $localToolsDir && ./dbgen -q $paramsString " +
+          s"-T $shortTableName -s $scaleFactor $parallel")
+      BlockingLineStream(commands)
+    }
+
+    generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+    generatedData
+  }
+}
+
+/**
+ * Generates the TPC-H tables with [[Dbgen]] and saves them through the DataFrame writer API.
+ *
+ * Table names, DDL schemas and partition columns are read from `tableColumns` and
+ * `tablePartitionColumns`, which are expected to be provided by [[TPCHSchema]].
+ *
+ * @param sqlContext      context used to create DataFrames, run SQL and manage temp views
+ * @param dbgenDir        directory containing the `dbgen` executable
+ * @param scaleFactor     TPC-H scale factor passed to dbgen
+ * @param generatorParams extra arguments appended to every dbgen invocation
+ */
+class TPCHTables(
+    sqlContext: SQLContext,
+    dbgenDir: String,
+    scaleFactor: Int,
+    generatorParams: Seq[String] = Nil)
+  extends TPCHSchema with Logging with Serializable {
+
+  private val dataGenerator = new Dbgen(dbgenDir, generatorParams)
+
+  // One Table per schema entry; backquotes are stripped from partition column names.
+  private def tables: Seq[Table] = tableColumns.map { case (tableName, schemaString) =>
+    val partitionColumns = tablePartitionColumns.getOrElse(tableName, Nil)
+      .map(_.stripPrefix("`").stripSuffix("`"))
+    Table(tableName, partitionColumns, StructType.fromDDL(schemaString))
+  }.toSeq
+
+  private case class Table(name: String, partitionColumns: Seq[String], schema: StructType) {
+    /** Returns this table with its partition columns removed. */
+    def nonPartitioned: Table = copy(partitionColumns = Nil)
+
+    /**
+     * Runs dbgen for this table and turns its '|'-delimited lines into a DataFrame typed
+     * according to `schema`. Empty fields become nulls.
+     */
+    private def df(numPartition: Int) = {
+      val generatedData = dataGenerator.generate(
+        sqlContext.sparkContext, name, numPartition, scaleFactor)
+      val rows = generatedData.mapPartitions { iter =>
+        iter.map { line =>
+          // Lines are assumed to end with a trailing '|', so the last split element is dropped.
+          val values = line.split("\\|", -1).dropRight(1).map { v =>
+            // If the string value is an empty string, we turn it to a null
+            if (v.isEmpty) null else v
+          }
+          Row.fromSeq(values)
+        }
+      }
+
+      // Load every column as a string first, then convert to the declared types.
+      val stringData = sqlContext.createDataFrame(
+        rows,
+        StructType(schema.fields.map(f => StructField(f.name, StringType))))
+
+      val columns = schema.fields.map { f =>
+        val c = f.dataType match {
+          // Needs right-padding for char types
+          case CharType(n) => rpad(col(f.name), n, " ")
+          // Don't need a cast for varchar types
+          case _: VarcharType => col(f.name)
+          case _ => col(f.name).cast(f.dataType)
+        }
+        c.as(f.name)
+      }
+      stringData.select(columns: _*)
+    }
+
+    /**
+     * Generates this table and saves it to `location`.
+     *
+     * @param location                     output path for this table
+     * @param format                       data source format passed to the writer
+     * @param overwrite                    SaveMode.Overwrite if true, otherwise SaveMode.Ignore
+     * @param clusterByPartitionColumns    partitioned tables: DISTRIBUTE BY the partition columns
+     *                                     before writing; non-partitioned tables: coalesce output
+     * @param filterOutNullPartitionValues when clustering, drop rows with null partition values
+     * @param numPartitions                number of dbgen chunks / RDD partitions to generate
+     */
+    def genData(
+        location: String,
+        format: String,
+        overwrite: Boolean,
+        clusterByPartitionColumns: Boolean,
+        filterOutNullPartitionValues: Boolean,
+        numPartitions: Int): Unit = {
+      val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Ignore
+
+      val data = df(numPartitions)
+      val tempTableName = s"${name}_text"
+      data.createOrReplaceTempView(tempTableName)
+
+      // Drop the temporary view even when generation or the write fails, so a failed table
+      // does not leave a stale view behind.
+      try {
+        val writer = if (partitionColumns.nonEmpty) {
+          if (clusterByPartitionColumns) {
+            val columnString = data.schema.fields.map(_.name).mkString(",")
+            val partitionColumnString = partitionColumns.mkString(",")
+            val predicates = if (filterOutNullPartitionValues) {
+              partitionColumns.map(c => s"$c IS NOT NULL").mkString("WHERE ", " AND ", "")
+            } else {
+              ""
+            }
+
+            val query =
+              s"""
+                 |SELECT
+                 |  $columnString
+                 |FROM
+                 |  $tempTableName
+                 |$predicates
+                 |DISTRIBUTE BY
+                 |  $partitionColumnString
+               """.stripMargin
+            val grouped = sqlContext.sql(query)
+            logInfo(s"Pre-clustering with partitioning columns with query $query.")
+            grouped.write
+          } else {
+            data.write
+          }
+        } else {
+          // treat non-partitioned tables as "one partition" that we want to coalesce
+          if (clusterByPartitionColumns) {
+            // If the data has more rows than spark.sql.files.maxRecordsPerFile, coalesce into
+            // several partitions so more writers run in parallel; files are capped at
+            // maxRecordsPerFile either way, so the final output is the same.
+            // NOTE: `data` is not cached, so this count runs dbgen once more before the write.
+            val numRows = data.count()
+            val maxRecordPerFile = Try {
+              sqlContext.getConf("spark.sql.files.maxRecordsPerFile").toInt
+            }.getOrElse(0)
+
+            if (maxRecordPerFile > 0 && numRows > maxRecordPerFile) {
+              val numFiles = (numRows.toDouble / maxRecordPerFile).ceil.toInt
+              logInfo(s"Coalescing into $numFiles files")
+              data.coalesce(numFiles).write
+            } else {
+              data.coalesce(1).write
+            }
+          } else {
+            data.write
+          }
+        }
+        writer.format(format).mode(mode)
+        if (partitionColumns.nonEmpty) {
+          writer.partitionBy(partitionColumns: _*)
+        }
+        logInfo(s"Generating table $name in database to $location with save mode $mode.")
+        writer.save(location)
+      } finally {
+        sqlContext.dropTempTable(tempTableName)
+      }
+    }
+  }
+
+  /**
+   * Generates every TPC-H table, or only the table named by `tableFilter` when it is non-empty,
+   * writing each one to `$location/<table name>`.
+   *
+   * @param partitionTables if false, partition columns declared in the schema are ignored
+   * @param tableFilter     exact table name to generate; empty means all tables
+   * @throws RuntimeException if `tableFilter` does not match any table
+   */
+  def genData(
+      location: String,
+      format: String,
+      overwrite: Boolean,
+      partitionTables: Boolean,
+      clusterByPartitionColumns: Boolean,
+      filterOutNullPartitionValues: Boolean,
+      tableFilter: String = "",
+      numPartitions: Int = 100): Unit = {
+    val candidateTables = if (partitionTables) tables else tables.map(_.nonPartitioned)
+
+    val tablesToBeGenerated = if (tableFilter.nonEmpty) {
+      val filtered = candidateTables.filter(_.name == tableFilter)
+      if (filtered.isEmpty) {
+        throw new RuntimeException("Bad table name filter: " + tableFilter)
+      }
+      filtered
+    } else {
+      candidateTables
+    }
+
+    tablesToBeGenerated.foreach { table =>
+      val tableLocation = s"$location/${table.name}"
+      table.genData(tableLocation, format, overwrite, clusterByPartitionColumns,
+        filterOutNullPartitionValues, numPartitions)
+    }
+  }
+}
+
+class GenTPCHDataConfig(args: Array[String]) {
+ var master: String = "local[*]"
+ var dbgenDir: String = null
+ var location: String = null
+ var scaleFactor: Int = 1
+ var format: String = "parquet"
+ var overwrite: Boolean = false
+ var partitionTables: Boolean = false
+ var clusterByPartitionColumns: Boolean = false
+ var filterOutNullPartitionValues: Boolean = false
+ var tableFilter: String = ""
+ var numPartitions: Int = 100
+
+ parseArgs(args.toList)
+
+ private def parseArgs(inputArgs: List[String]): Unit = {
+ var args = inputArgs
+
+ while (args.nonEmpty) {
+ args match {
+ case "--master" :: value :: tail =>
+ master = value
+ args = tail
+
+ case "--dbgenDir" :: value :: tail =>
+ dbgenDir = value
+ args = tail
+
+ case "--location" :: value :: tail =>
+ location = value
+ args = tail
+
+ case "--scaleFactor" :: value :: tail =>
+ scaleFactor = toPositiveIntValue("Scale factor", value)
+ args = tail
+
+ case "--format" :: value :: tail =>
+ format = value
+ args = tail
+
+ case "--overwrite" :: tail =>
+ overwrite = true
+ args = tail
+
+ case "--partitionTables" :: tail =>
+ partitionTables = true
+ args = tail
+
+ case "--clusterByPartitionColumns" :: tail =>
+ clusterByPartitionColumns = true
+ args = tail
+
+ case "--filterOutNullPartitionValues" :: tail =>
+ filterOutNullPartitionValues = true
+ args = tail
+
+ case "--tableFilter" :: value :: tail =>
+ tableFilter = value
+ args = tail
+
+ case "--numPartitions" :: value :: tail =>
+ numPartitions = toPositiveIntValue("Number of partitions", value)
+ args = tail
+
+ case "--help" :: tail =>
+ printUsageAndExit(0)
+
+ case _ =>
+ // scalastyle:off println
+ System.err.println("Unknown/unsupported param " + args)
+ // scalastyle:on println
+ printUsageAndExit(1)
+ }
+ }
+
+ checkRequiredArguments()
+ }
+
+ private def printUsageAndExit(exitCode: Int): Unit = {
+ // scalastyle:off
+ System.err.println("""
+ |build/sbt "test:runMain <this class> [Options]"
Review Comment:
Lines 315–328:
the indentation should be 2 spaces.
Also, this file seems to have significant duplication with https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala.
Is it possible to further refactor to reduce duplicate code?
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCQueryBenchmarkArguments.scala:
##########
@@ -20,14 +20,11 @@ package org.apache.spark.sql.execution.benchmark
import java.util.Locale
-class TPCDSQueryBenchmarkArguments(val args: Array[String]) {
- var dataLocation: String = sys.env.getOrElse("SPARK_TPCDS_DATA", null)
+class TPCQueryBenchmarkArguments(val args: Array[String], val dataLocationEnv: String) {
+ var dataLocation: String = sys.env.getOrElse(dataLocationEnv, null)
var queryFilter: Set[String] = Set.empty
var cboEnabled: Boolean = false
- parseArgs(args.toList)
Review Comment:
Why remove the calls to `parseArgs` and `validateArguments`? These two functions will be unused after this PR.
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCHData.scala:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+class Dbgen(dbgenDir: String, params: Seq[String]) extends Serializable {
+ val dbgen = s"$dbgenDir/dbgen"
+ def generate(
+ sparkContext: SparkContext,
Review Comment:
Lines 33–36:
the indentation should be 4 spaces.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCBenchmarkUtils.scala:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
+import org.apache.spark.sql.catalyst.util.resourceToString
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+
+trait TPCBenchmarkUtils extends SqlBasedBenchmark with Logging {
Review Comment:
I suggest a different name, maybe TPCBasedBenchmark? There may be other, more suitable ones.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] oss-maker commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1254289875
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCHData.scala:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+class Dbgen(dbgenDir: String, params: Seq[String]) extends Serializable {
+ val dbgen = s"$dbgenDir/dbgen"
+ def generate(
+ sparkContext: SparkContext,
Review Comment:
Fixed it. Thanks
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCHData.scala:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Wrapper around the TPC-H `dbgen` tool that runs it on executors and exposes its
+ * '|'-delimited text output as an RDD of lines.
+ *
+ * @param dbgenDir directory containing the `dbgen` executable; if `dbgen` is not found there,
+ *                 the same path prefixed with `/` is tried on the executor
+ * @param params   extra arguments appended to every `dbgen` command line
+ */
+class Dbgen(dbgenDir: String, params: Seq[String]) extends Serializable {
+  val dbgen = s"$dbgenDir/dbgen"
+
+  // Maps each TPC-H table name to the code that `dbgen -T` expects for it.
+  private val shortTableNames = Map(
+    "customer" -> "c",
+    "lineitem" -> "L",
+    "nation" -> "n",
+    "orders" -> "O",
+    "part" -> "P",
+    "region" -> "r",
+    "supplier" -> "s",
+    "partsupp" -> "S"
+  )
+
+  /**
+   * Builds an RDD whose partitions each run `dbgen` for `tableName` and emit its output lines.
+   *
+   * @param sparkContext context used to create the RDD
+   * @param tableName    TPC-H table name (one of the keys of `shortTableNames`)
+   * @param partitions   number of RDD partitions; when > 1, partition `i` runs dbgen with
+   *                     `-C partitions -S i`
+   * @param scaleFactor  scale factor passed to dbgen as `-s`
+   * @return the raw dbgen output lines; executors throw IllegalStateException when the RDD is
+   *         computed if the dbgen executable cannot be found
+   * @throws IllegalArgumentException if `tableName` is not a known TPC-H table
+   */
+  def generate(
+      sparkContext: SparkContext,
+      tableName: String,
+      partitions: Int,
+      scaleFactor: Int): RDD[String] = {
+    // Resolve on the driver so an unknown table fails fast with a clear message instead of a
+    // NoSuchElementException inside every task.
+    val shortTableName = shortTableNames.getOrElse(tableName,
+      throw new IllegalArgumentException(s"Unknown TPC-H table: $tableName"))
+    val paramsString = params.mkString(" ")
+
+    val generatedData = sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+      val localToolsDir = if (new java.io.File(dbgen).exists) {
+        dbgenDir
+      } else if (new java.io.File(s"/$dbgenDir").exists) {
+        s"/$dbgenDir"
+      } else {
+        // The message must follow `new ...(` on the same line: a line break before `(` ends
+        // the statement and would throw an exception without any message.
+        throw new IllegalStateException(
+          s"Could not find dbgen at $dbgen or /$dbgenDir. Run install")
+      }
+      val parallel = if (partitions > 1) s"-C $partitions -S $i" else ""
+      val commands = Seq(
+        "bash", "-c",
+        s"cd $localToolsDir && ./dbgen -q $paramsString " +
+          s"-T $shortTableName -s $scaleFactor $parallel")
+      BlockingLineStream(commands)
+    }
+
+    generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+    generatedData
+  }
+}
+
+/**
+ * Generates the TPC-H tables with [[Dbgen]] and saves them through the DataFrame writer API.
+ *
+ * Table names, DDL schemas and partition columns are read from `tableColumns` and
+ * `tablePartitionColumns`, which are expected to be provided by [[TPCHSchema]].
+ *
+ * @param sqlContext      context used to create DataFrames, run SQL and manage temp views
+ * @param dbgenDir        directory containing the `dbgen` executable
+ * @param scaleFactor     TPC-H scale factor passed to dbgen
+ * @param generatorParams extra arguments appended to every dbgen invocation
+ */
+class TPCHTables(
+    sqlContext: SQLContext,
+    dbgenDir: String,
+    scaleFactor: Int,
+    generatorParams: Seq[String] = Nil)
+  extends TPCHSchema with Logging with Serializable {
+
+  private val dataGenerator = new Dbgen(dbgenDir, generatorParams)
+
+  // One Table per schema entry; backquotes are stripped from partition column names.
+  private def tables: Seq[Table] = tableColumns.map { case (tableName, schemaString) =>
+    val partitionColumns = tablePartitionColumns.getOrElse(tableName, Nil)
+      .map(_.stripPrefix("`").stripSuffix("`"))
+    Table(tableName, partitionColumns, StructType.fromDDL(schemaString))
+  }.toSeq
+
+  private case class Table(name: String, partitionColumns: Seq[String], schema: StructType) {
+    /** Returns this table with its partition columns removed. */
+    def nonPartitioned: Table = copy(partitionColumns = Nil)
+
+    /**
+     * Runs dbgen for this table and turns its '|'-delimited lines into a DataFrame typed
+     * according to `schema`. Empty fields become nulls.
+     */
+    private def df(numPartition: Int) = {
+      val generatedData = dataGenerator.generate(
+        sqlContext.sparkContext, name, numPartition, scaleFactor)
+      val rows = generatedData.mapPartitions { iter =>
+        iter.map { line =>
+          // Lines are assumed to end with a trailing '|', so the last split element is dropped.
+          val values = line.split("\\|", -1).dropRight(1).map { v =>
+            // If the string value is an empty string, we turn it to a null
+            if (v.isEmpty) null else v
+          }
+          Row.fromSeq(values)
+        }
+      }
+
+      // Load every column as a string first, then convert to the declared types.
+      val stringData = sqlContext.createDataFrame(
+        rows,
+        StructType(schema.fields.map(f => StructField(f.name, StringType))))
+
+      val columns = schema.fields.map { f =>
+        val c = f.dataType match {
+          // Needs right-padding for char types
+          case CharType(n) => rpad(col(f.name), n, " ")
+          // Don't need a cast for varchar types
+          case _: VarcharType => col(f.name)
+          case _ => col(f.name).cast(f.dataType)
+        }
+        c.as(f.name)
+      }
+      stringData.select(columns: _*)
+    }
+
+    /**
+     * Generates this table and saves it to `location`.
+     *
+     * @param location                     output path for this table
+     * @param format                       data source format passed to the writer
+     * @param overwrite                    SaveMode.Overwrite if true, otherwise SaveMode.Ignore
+     * @param clusterByPartitionColumns    partitioned tables: DISTRIBUTE BY the partition columns
+     *                                     before writing; non-partitioned tables: coalesce output
+     * @param filterOutNullPartitionValues when clustering, drop rows with null partition values
+     * @param numPartitions                number of dbgen chunks / RDD partitions to generate
+     */
+    def genData(
+        location: String,
+        format: String,
+        overwrite: Boolean,
+        clusterByPartitionColumns: Boolean,
+        filterOutNullPartitionValues: Boolean,
+        numPartitions: Int): Unit = {
+      val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Ignore
+
+      val data = df(numPartitions)
+      val tempTableName = s"${name}_text"
+      data.createOrReplaceTempView(tempTableName)
+
+      // Drop the temporary view even when generation or the write fails, so a failed table
+      // does not leave a stale view behind.
+      try {
+        val writer = if (partitionColumns.nonEmpty) {
+          if (clusterByPartitionColumns) {
+            val columnString = data.schema.fields.map(_.name).mkString(",")
+            val partitionColumnString = partitionColumns.mkString(",")
+            val predicates = if (filterOutNullPartitionValues) {
+              partitionColumns.map(c => s"$c IS NOT NULL").mkString("WHERE ", " AND ", "")
+            } else {
+              ""
+            }
+
+            val query =
+              s"""
+                 |SELECT
+                 |  $columnString
+                 |FROM
+                 |  $tempTableName
+                 |$predicates
+                 |DISTRIBUTE BY
+                 |  $partitionColumnString
+               """.stripMargin
+            val grouped = sqlContext.sql(query)
+            logInfo(s"Pre-clustering with partitioning columns with query $query.")
+            grouped.write
+          } else {
+            data.write
+          }
+        } else {
+          // treat non-partitioned tables as "one partition" that we want to coalesce
+          if (clusterByPartitionColumns) {
+            // If the data has more rows than spark.sql.files.maxRecordsPerFile, coalesce into
+            // several partitions so more writers run in parallel; files are capped at
+            // maxRecordsPerFile either way, so the final output is the same.
+            // NOTE: `data` is not cached, so this count runs dbgen once more before the write.
+            val numRows = data.count()
+            val maxRecordPerFile = Try {
+              sqlContext.getConf("spark.sql.files.maxRecordsPerFile").toInt
+            }.getOrElse(0)
+
+            if (maxRecordPerFile > 0 && numRows > maxRecordPerFile) {
+              val numFiles = (numRows.toDouble / maxRecordPerFile).ceil.toInt
+              logInfo(s"Coalescing into $numFiles files")
+              data.coalesce(numFiles).write
+            } else {
+              data.coalesce(1).write
+            }
+          } else {
+            data.write
+          }
+        }
+        writer.format(format).mode(mode)
+        if (partitionColumns.nonEmpty) {
+          writer.partitionBy(partitionColumns: _*)
+        }
+        logInfo(s"Generating table $name in database to $location with save mode $mode.")
+        writer.save(location)
+      } finally {
+        sqlContext.dropTempTable(tempTableName)
+      }
+    }
+  }
+
+  /**
+   * Generates every TPC-H table, or only the table named by `tableFilter` when it is non-empty,
+   * writing each one to `$location/<table name>`.
+   *
+   * @param partitionTables if false, partition columns declared in the schema are ignored
+   * @param tableFilter     exact table name to generate; empty means all tables
+   * @throws RuntimeException if `tableFilter` does not match any table
+   */
+  def genData(
+      location: String,
+      format: String,
+      overwrite: Boolean,
+      partitionTables: Boolean,
+      clusterByPartitionColumns: Boolean,
+      filterOutNullPartitionValues: Boolean,
+      tableFilter: String = "",
+      numPartitions: Int = 100): Unit = {
+    val candidateTables = if (partitionTables) tables else tables.map(_.nonPartitioned)
+
+    val tablesToBeGenerated = if (tableFilter.nonEmpty) {
+      val filtered = candidateTables.filter(_.name == tableFilter)
+      if (filtered.isEmpty) {
+        throw new RuntimeException("Bad table name filter: " + tableFilter)
+      }
+      filtered
+    } else {
+      candidateTables
+    }
+
+    tablesToBeGenerated.foreach { table =>
+      val tableLocation = s"$location/${table.name}"
+      table.genData(tableLocation, format, overwrite, clusterByPartitionColumns,
+        filterOutNullPartitionValues, numPartitions)
+    }
+  }
+}
+
+class GenTPCHDataConfig(args: Array[String]) {
+ var master: String = "local[*]"
+ var dbgenDir: String = null
+ var location: String = null
+ var scaleFactor: Int = 1
+ var format: String = "parquet"
+ var overwrite: Boolean = false
+ var partitionTables: Boolean = false
+ var clusterByPartitionColumns: Boolean = false
+ var filterOutNullPartitionValues: Boolean = false
+ var tableFilter: String = ""
+ var numPartitions: Int = 100
+
+ parseArgs(args.toList)
+
+ private def parseArgs(inputArgs: List[String]): Unit = {
+ var args = inputArgs
+
+ while (args.nonEmpty) {
+ args match {
+ case "--master" :: value :: tail =>
+ master = value
+ args = tail
+
+ case "--dbgenDir" :: value :: tail =>
+ dbgenDir = value
+ args = tail
+
+ case "--location" :: value :: tail =>
+ location = value
+ args = tail
+
+ case "--scaleFactor" :: value :: tail =>
+ scaleFactor = toPositiveIntValue("Scale factor", value)
+ args = tail
+
+ case "--format" :: value :: tail =>
+ format = value
+ args = tail
+
+ case "--overwrite" :: tail =>
+ overwrite = true
+ args = tail
+
+ case "--partitionTables" :: tail =>
+ partitionTables = true
+ args = tail
+
+ case "--clusterByPartitionColumns" :: tail =>
+ clusterByPartitionColumns = true
+ args = tail
+
+ case "--filterOutNullPartitionValues" :: tail =>
+ filterOutNullPartitionValues = true
+ args = tail
+
+ case "--tableFilter" :: value :: tail =>
+ tableFilter = value
+ args = tail
+
+ case "--numPartitions" :: value :: tail =>
+ numPartitions = toPositiveIntValue("Number of partitions", value)
+ args = tail
+
+ case "--help" :: tail =>
+ printUsageAndExit(0)
+
+ case _ =>
+ // scalastyle:off println
+ System.err.println("Unknown/unsupported param " + args)
+ // scalastyle:on println
+ printUsageAndExit(1)
+ }
+ }
+
+ checkRequiredArguments()
+ }
+
+ private def printUsageAndExit(exitCode: Int): Unit = {
+ // scalastyle:off
+ System.err.println("""
+ |build/sbt "test:runMain <this class> [Options]"
Review Comment:
Done. Thanks
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] LuciferYang commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1263524205
##########
.github/workflows/benchmark.yml:
##########
@@ -186,7 +251,7 @@ jobs:
# To keep the directory structure and file permissions, tar them
# See also https://github.com/actions/upload-artifact#maintaining-file-permissions-and-case-sensitive-files
echo "Preparing the benchmark results:"
- tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude-standard`
+ tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude=tpch-sf-1 --exclude-standard`
Review Comment:
Sorry, I don’t know ...
##########
.github/workflows/benchmark.yml:
##########
@@ -186,7 +251,7 @@ jobs:
# To keep the directory structure and file permissions, tar them
# See also https://github.com/actions/upload-artifact#maintaining-file-permissions-and-case-sensitive-files
echo "Preparing the benchmark results:"
- tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude-standard`
+ tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude=tpch-sf-1 --exclude-standard`
Review Comment:
Sorry, I don’t know ...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] LuciferYang commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1263294643
##########
.github/workflows/benchmark.yml:
##########
@@ -186,7 +251,7 @@ jobs:
# To keep the directory structure and file permissions, tar them
# See also https://github.com/actions/upload-artifact#maintaining-file-permissions-and-case-sensitive-files
echo "Preparing the benchmark results:"
- tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude-standard`
+ tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude=tpch-sf-1 --exclude-standard`
Review Comment:
@surnaik I removed `--exclude=tpch-sf-1` and downloaded the generated TPC-H data, then found that the data file only contains a Parquet footer and no data.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] pan3793 commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1645013166
@surnaik Apache Kyuubi has a [Spark TPC-H Connector](https://github.com/apache/kyuubi/tree/master/extensions/spark/kyuubi-spark-connector-tpch/src/main/scala/org/apache/kyuubi/spark/connector/tpch) based on DataSource V2 API, which is inspired by [TPCH connector](https://trino.io/docs/current/connector/tpch.html#connector-tpch--page-root)
```
spark-sql \
--packages org.apache.kyuubi:kyuubi-spark-connector-tpch_2.12:1.7.1 \
--conf spark.sql.catalog.tpch=org.apache.kyuubi.spark.connector.tpch.TPCHCatalog
```
```
spark-sql> show tables in tpch.sf1;
customer
orders
lineitem
part
partsupp
supplier
nation
region
Time taken: 0.1 seconds, Fetched 8 row(s)
spark-sql> select * from tpch.sf1.orders limit 10;
1 36901 O 173665.47 1996-01-02 5-LOW Clerk#000000951 0 nstructions sleep furiously among
2 78002 O 46929.18 1996-12-01 1-URGENT Clerk#000000880 0 foxes. pending accounts at the pending, silent asymptot
3 123314 F 193846.25 1993-10-14 5-LOW Clerk#000000955 0 sly final accounts boost. carefully regular ideas cajole carefully. depos
4 136777 O 32151.78 1995-10-11 5-LOW Clerk#000000124 0 sits. slyly regular warthogs cajole. regular, regular theodolites acro
5 44485 F 144659.2 1994-07-30 5-LOW Clerk#000000925 0 quickly. bold deposits sleep slyly. packages use slyly
6 55624 F 58749.59 1992-02-21 4-NOT SPECIFIED Clerk#000000058 0 ggle. special, final requests are against the furiously specia
7 39136 O 252004.18 1996-01-10 2-HIGH Clerk#000000470 0 ly special requests
32 130057 O 208660.75 1995-07-16 2-HIGH Clerk#000000616 0 ise blithely bold, regular requests. quickly unusual dep
33 66958 F 163243.98 1993-10-27 3-MEDIUM Clerk#000000409 0 uriously. furiously final request
34 61001 O 58949.67 1998-07-21 3-MEDIUM Clerk#000000223 0 ly final packages. fluffily final deposits wake blithely ideas. spe
Time taken: 2.608 seconds, Fetched 10 row(s)
```
It could also be used to generate the data by using CTAS.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] srinivasst commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "srinivasst (via GitHub)" <gi...@apache.org>.
srinivasst commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1622126254
LGTM
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] oss-maker commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1628680383
@beliefer Thanks for your suggestion, I wasn't even aware of this Scala feature. That's why I kept using a different name in the class parameter. I appreciate your help, this was very useful.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] LuciferYang commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1263294643
##########
.github/workflows/benchmark.yml:
##########
@@ -186,7 +251,7 @@ jobs:
# To keep the directory structure and file permissions, tar them
# See also https://github.com/actions/upload-artifact#maintaining-file-permissions-and-case-sensitive-files
echo "Preparing the benchmark results:"
- tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude-standard`
+ tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude=tpch-sf-1 --exclude-standard`
Review Comment:
@surnaik I removed `--exclude=tpch-sf-1` and downloaded the generated TPC-H data from the GitHub Action, then found that the data file only contains a Parquet footer and no data.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] beliefer commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1255622200
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+ // See scala.sys.process.Streamed
+ private final class BlockingStreamed[T](
+ val process: T => Unit,
+ val done: Int => Unit,
+ val stream: () => Stream[T])
+
+ // See scala.sys.process.Streamed
+ private object BlockingStreamed {
+ // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+ // which causes OOMs if the consumer cannot keep up with producer.
+ val maxQueueSize = 65536
+
+ def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+ val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+ def next(): Stream[T] = q.take match {
+ case Left(0) => Stream.empty
+ case Left(code) =>
+ if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+ case Right(s) => Stream.cons(s, next())
+ }
+
+ new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+ }
+ }
+
+ // See scala.sys.process.ProcessImpl.Spawn
+ private object Spawn {
+ def apply(f: => Unit): Thread = apply(f, daemon = false)
+ def apply(f: => Unit, daemon: Boolean): Thread = {
+ val thread = new Thread() { override def run() = { f } }
+ thread.setDaemon(daemon)
+ thread.start()
+ thread
+ }
+ }
+
+ def apply(command: Seq[String]): Stream[String] = {
+ val streamed = BlockingStreamed[String](true)
+ val process = command.run(BasicIO(false, streamed.process, None))
+ Spawn(streamed.done(process.exitValue()))
+ streamed.stream()
+ }
+}
+
+trait DataGenerator extends Serializable {
+ protected val toolsDir: String
+
+ protected def generateData(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int,
+ command: String): RDD[String] = {
+ val generatedData = {
+ sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+ val localToolsDir = if (new java.io.File(toolsDir).exists) {
+ toolsDir
+ } else if (new java.io.File(s"/$toolsDir").exists) {
+ s"/$toolsDir"
+ } else {
+ throw new IllegalStateException(
+ s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+ }
+
+ val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+ val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+ BlockingLineStream(commands)
+ }
+ }
+
+ generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+ generatedData
+ }
+
+ def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+ override protected val toolsDir: String = s"$dsdgenDir"
+
+ override def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String] = {
+ val command = s"./dsdgen -table $tableName -filter Y -scale $scaleFactor -RNGSEED 100"
+ generateData(sparkContext, tableName, partitions, scaleFactor, command)
+ }
+}
+
+class Dbgen(dbgenDir: String, params: Seq[String])
+ extends DataGenerator with Serializable {
+ override protected val toolsDir: String = s"$dbgenDir"
+ override def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String] = {
+ val shortTableNames = Map(
+ "customer" -> "c",
+ "lineitem" -> "L",
+ "nation" -> "n",
+ "orders" -> "O",
+ "part" -> "P",
+ "region" -> "r",
+ "supplier" -> "s",
+ "partsupp" -> "S"
+ )
+ val paramsString = params.mkString(" ")
+ val command = s"./dbgen -q $paramsString -T ${shortTableNames(tableName)} -s $scaleFactor"
+ generateData(sparkContext, tableName, partitions, scaleFactor, command)
+ }
+}
+
+trait TableGenerator extends Serializable with Logging {
+ protected val dataGenerator: DataGenerator
+ protected val sparkSQLContext: SQLContext
Review Comment:
BTW, `SQLContext` is an old API. It seems it's time to use `SparkSession` instead. cc @cloud-fan @MaxGekk @wangyum
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] beliefer commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1258132477
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala:
##########
@@ -17,390 +17,19 @@
package org.apache.spark.sql
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.immutable.Stream
-import scala.sys.process._
-import scala.util.Try
-
-import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions.{col, rpad}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
-
-// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
-
-/**
- * Using ProcessBuilder.lineStream produces a stream, that uses
- * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
- *
- * This causes OOM if the consumer cannot keep up with the producer.
- *
- * See scala.sys.process.ProcessBuilderImpl.lineStream
- */
-object BlockingLineStream {
-
- // See scala.sys.process.Streamed
- private final class BlockingStreamed[T](
- val process: T => Unit,
- val done: Int => Unit,
- val stream: () => Stream[T])
+import org.apache.spark.sql.types.StructType
- // See scala.sys.process.Streamed
- private object BlockingStreamed {
- // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
- // which causes OOMs if the consumer cannot keep up with producer.
- val maxQueueSize = 65536
+class TPCDSTables(val spark: SparkSession, config: GenTPCDataConfig)
+ extends TableGenerator with TPCDSSchema with Logging with Serializable {
- def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
- val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
-
- def next(): Stream[T] = q.take match {
- case Left(0) => Stream.empty
- case Left(code) =>
- if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
- case Right(s) => Stream.cons(s, next())
- }
-
- new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
- }
- }
-
- // See scala.sys.process.ProcessImpl.Spawn
- private object Spawn {
- def apply(f: => Unit): Thread = apply(f, daemon = false)
- def apply(f: => Unit, daemon: Boolean): Thread = {
- val thread = new Thread() { override def run() = { f } }
- thread.setDaemon(daemon)
- thread.start()
- thread
- }
- }
-
- def apply(command: Seq[String]): Stream[String] = {
- val streamed = BlockingStreamed[String](true)
- val process = command.run(BasicIO(false, streamed.process, None))
- Spawn(streamed.done(process.exitValue()))
- streamed.stream()
- }
-}
-
-class Dsdgen(dsdgenDir: String) extends Serializable {
- private val dsdgen = s"$dsdgenDir/dsdgen"
-
- def generate(
- sparkContext: SparkContext,
- tableName: String,
- partitions: Int,
- scaleFactor: Int): RDD[String] = {
- val generatedData = {
- sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
- val localToolsDir = if (new java.io.File(dsdgen).exists) {
- dsdgenDir
- } else if (new java.io.File(s"/$dsdgen").exists) {
- s"/$dsdgenDir"
- } else {
- throw new IllegalStateException(
- s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install")
- }
-
- // NOTE: RNGSEED is the RNG seed used by the data generator. Right now, it is fixed to 100.
- val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
- val commands = Seq(
- "bash", "-c",
- s"cd $localToolsDir && ./dsdgen -table $tableName -filter Y -scale $scaleFactor " +
- s"-RNGSEED 100 $parallel")
- BlockingLineStream(commands)
- }
- }
-
- generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
- generatedData
- }
-}
-
-class TPCDSTables(spark: SparkSession, dsdgenDir: String, scaleFactor: Int)
Review Comment:
Although this is a developer API, I think it is better to keep it and not change the signature.
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+ // See scala.sys.process.Streamed
+ private final class BlockingStreamed[T](
+ val process: T => Unit,
+ val done: Int => Unit,
+ val stream: () => Stream[T])
+
+ // See scala.sys.process.Streamed
+ private object BlockingStreamed {
+ // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+ // which causes OOMs if the consumer cannot keep up with producer.
+ val maxQueueSize = 65536
+
+ def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+ val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+ def next(): Stream[T] = q.take match {
+ case Left(0) => Stream.empty
+ case Left(code) =>
+ if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+ case Right(s) => Stream.cons(s, next())
+ }
+
+ new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+ }
+ }
+
+ // See scala.sys.process.ProcessImpl.Spawn
+ private object Spawn {
+ def apply(f: => Unit): Thread = apply(f, daemon = false)
+ def apply(f: => Unit, daemon: Boolean): Thread = {
+ val thread = new Thread() { override def run() = { f } }
+ thread.setDaemon(daemon)
+ thread.start()
+ thread
+ }
+ }
+
+ def apply(command: Seq[String]): Stream[String] = {
+ val streamed = BlockingStreamed[String](true)
+ val process = command.run(BasicIO(false, streamed.process, None))
+ Spawn(streamed.done(process.exitValue()))
+ streamed.stream()
+ }
+}
+
+trait DataGenerator extends Serializable {
+ protected def generateData(
+ sparkContext: SparkContext,
+ toolsDir: String,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int,
+ command: String): RDD[String] = {
+ val generatedData = {
+ sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+ val localToolsDir = if (new java.io.File(toolsDir).exists) {
+ toolsDir
+ } else if (new java.io.File(s"/$toolsDir").exists) {
+ s"/$toolsDir"
+ } else {
+ throw new IllegalStateException(
+ s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+ }
+
+ val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+ val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+ BlockingLineStream(commands)
+ }
+ }
+
+ generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+ generatedData
+ }
+
+ def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+ override def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String] = {
+ val command = s"./dsdgen -table $tableName -filter Y -scale $scaleFactor -RNGSEED 100"
+ generateData(sparkContext, dsdgenDir, tableName, partitions, scaleFactor, command)
+ }
+}
+
+class Dbgen(dbgenDir: String, params: Seq[String] = Nil)
+ extends DataGenerator with Serializable {
+ override def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String] = {
+ val shortTableNames = Map(
+ "customer" -> "c",
+ "lineitem" -> "L",
+ "nation" -> "n",
+ "orders" -> "O",
+ "part" -> "P",
+ "region" -> "r",
+ "supplier" -> "s",
+ "partsupp" -> "S"
+ )
+ val paramsString = params.mkString(" ")
+ val command = s"./dbgen -q $paramsString -T ${shortTableNames(tableName)} -s $scaleFactor"
+ generateData(sparkContext, dbgenDir, tableName, partitions, scaleFactor, command)
+ }
+}
+
+trait TableGenerator extends Serializable with Logging {
+ protected val dataGenerator: DataGenerator
+ protected val spark: SparkSession
+ protected val scaleFactor: Int
Review Comment:
Please add `protected val dsdgenDir: String` so that the default constructor of TPCDSTables remains unchanged.
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala:
##########
@@ -429,8 +58,7 @@ object GenTPCDSData {
val tables = new TPCDSTables(
spark,
- dsdgenDir = config.dsdgenDir,
- scaleFactor = config.scaleFactor)
+ config)
Review Comment:
Please revert this change.
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala:
##########
@@ -17,390 +17,19 @@
package org.apache.spark.sql
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.immutable.Stream
-import scala.sys.process._
-import scala.util.Try
-
-import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions.{col, rpad}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
-
-// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
-
-/**
- * Using ProcessBuilder.lineStream produces a stream, that uses
- * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
- *
- * This causes OOM if the consumer cannot keep up with the producer.
- *
- * See scala.sys.process.ProcessBuilderImpl.lineStream
- */
-object BlockingLineStream {
-
- // See scala.sys.process.Streamed
- private final class BlockingStreamed[T](
- val process: T => Unit,
- val done: Int => Unit,
- val stream: () => Stream[T])
+import org.apache.spark.sql.types.StructType
- // See scala.sys.process.Streamed
- private object BlockingStreamed {
- // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
- // which causes OOMs if the consumer cannot keep up with producer.
- val maxQueueSize = 65536
+class TPCDSTables(val spark: SparkSession, config: GenTPCDataConfig)
Review Comment:
`class TPCDSTables(val spark: SparkSession, dsdgenDir: String, scaleFactor: Int)`
We can pass `config.dsdgenDir` and `config.scaleFactor` directly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] oss-maker commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1258168110
##########
sql/core/src/test/scala/org/apache/spark/sql/GenTPCData.scala:
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.sys.process._
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{col, rpad}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}
+
+// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
+
+/**
+ * Using ProcessBuilder.lineStream produces a stream, that uses
+ * a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
+ *
+ * This causes OOM if the consumer cannot keep up with the producer.
+ *
+ * See scala.sys.process.ProcessBuilderImpl.lineStream
+ */
+object BlockingLineStream {
+
+ // See scala.sys.process.Streamed
+ private final class BlockingStreamed[T](
+ val process: T => Unit,
+ val done: Int => Unit,
+ val stream: () => Stream[T])
+
+ // See scala.sys.process.Streamed
+ private object BlockingStreamed {
+ // scala.process.sys.Streamed uses default of Integer.MAX_VALUE,
+ // which causes OOMs if the consumer cannot keep up with producer.
+ val maxQueueSize = 65536
+
+ def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
+ val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
+
+ def next(): Stream[T] = q.take match {
+ case Left(0) => Stream.empty
+ case Left(code) =>
+ if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
+ case Right(s) => Stream.cons(s, next())
+ }
+
+ new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
+ }
+ }
+
+ // See scala.sys.process.ProcessImpl.Spawn
+ private object Spawn {
+ def apply(f: => Unit): Thread = apply(f, daemon = false)
+ def apply(f: => Unit, daemon: Boolean): Thread = {
+ val thread = new Thread() { override def run() = { f } }
+ thread.setDaemon(daemon)
+ thread.start()
+ thread
+ }
+ }
+
+ def apply(command: Seq[String]): Stream[String] = {
+ val streamed = BlockingStreamed[String](true)
+ val process = command.run(BasicIO(false, streamed.process, None))
+ Spawn(streamed.done(process.exitValue()))
+ streamed.stream()
+ }
+}
+
+trait DataGenerator extends Serializable {
+ protected def generateData(
+ sparkContext: SparkContext,
+ toolsDir: String,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int,
+ command: String): RDD[String] = {
+ val generatedData = {
+ sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
+ val localToolsDir = if (new java.io.File(toolsDir).exists) {
+ toolsDir
+ } else if (new java.io.File(s"/$toolsDir").exists) {
+ s"/$toolsDir"
+ } else {
+ throw new IllegalStateException(
+ s"Could not find tools at $toolsDir or /$toolsDir. Run install")
+ }
+
+ val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
+ val commands = Seq("bash", "-c", s"cd $localToolsDir && $command $parallel")
+ BlockingLineStream(commands)
+ }
+ }
+
+ generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
+ generatedData
+ }
+
+ def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String]
+}
+
+class Dsdgen(dsdgenDir: String) extends DataGenerator with Serializable {
+ override def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String] = {
+ val command = s"./dsdgen -table $tableName -filter Y -scale $scaleFactor -RNGSEED 100"
+ generateData(sparkContext, dsdgenDir, tableName, partitions, scaleFactor, command)
+ }
+}
+
+class Dbgen(dbgenDir: String, params: Seq[String] = Nil)
+ extends DataGenerator with Serializable {
+ override def generate(
+ sparkContext: SparkContext,
+ tableName: String,
+ partitions: Int,
+ scaleFactor: Int): RDD[String] = {
+ val shortTableNames = Map(
+ "customer" -> "c",
+ "lineitem" -> "L",
+ "nation" -> "n",
+ "orders" -> "O",
+ "part" -> "P",
+ "region" -> "r",
+ "supplier" -> "s",
+ "partsupp" -> "S"
+ )
+ val paramsString = params.mkString(" ")
+ val command = s"./dbgen -q $paramsString -T ${shortTableNames(tableName)} -s $scaleFactor"
+ generateData(sparkContext, dbgenDir, tableName, partitions, scaleFactor, command)
+ }
+}
+
+trait TableGenerator extends Serializable with Logging {
+ protected val dataGenerator: DataGenerator
+ protected val spark: SparkSession
+ protected val scaleFactor: Int
Review Comment:
This is not used in the trait at all, so we don't need `dsdgenDir` and `dbgenDir` to be declared in the trait. They are only used to construct the `DataGenerator`, so we can use them directly in `TPCDSTables` and `TPCHTables`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
Re: [PR] [SPARK-44301][SQL] Add Benchmark Suite for TPCH [spark]
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1784289605
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] LuciferYang commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1621553373
All benchmarks can be run using Github Action:
https://github.com/apache/spark/blob/master/.github/workflows/benchmark.yml
At the same time, all benchmark results are generated by GitHub Action and recorded in Spark, such as
https://github.com/apache/spark/blob/master/sql/core/benchmarks/TPCDSQueryBenchmark-results.txt
So I think if we need to introduce `TPCHQueryBenchmark`, it should also be able to run using GitHub Action like others.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] oss-maker commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "oss-maker (via GitHub)" <gi...@apache.org>.
oss-maker commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1621760824
Thanks for the review @LuciferYang, I'll make the change to add this to the GitHub Action.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] LuciferYang commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1633810972
Thanks @beliefer, please wait for me to check this PR with Scala 2.13.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] LuciferYang commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1634377891
Is the data only 4KB when sf = 1?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] LuciferYang commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1263294643
##########
.github/workflows/benchmark.yml:
##########
@@ -186,7 +251,7 @@ jobs:
# To keep the directory structure and file permissions, tar them
# See also https://github.com/actions/upload-artifact#maintaining-file-permissions-and-case-sensitive-files
echo "Preparing the benchmark results:"
- tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude-standard`
+ tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude=tpch-sf-1 --exclude-standard`
Review Comment:
@srinivasst I removed `--exclude=tpch-sf-1` and downloaded the generated TPC-H data, and found that the data files contain only the Parquet footer and no data.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] surnaik commented on pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "surnaik (via GitHub)" <gi...@apache.org>.
surnaik commented on PR #41856:
URL: https://github.com/apache/spark/pull/41856#issuecomment-1643310115
Back from a break. I will use the official dbgen from the TPC-H website and update the PR. Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] beliefer commented on a diff in pull request #41856: [SPARK-44301][SQL] Add Benchmark Suite for TPCH
Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #41856:
URL: https://github.com/apache/spark/pull/41856#discussion_r1257098969
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCQueryBenchmarkArguments.scala:
##########
@@ -63,15 +63,15 @@ class TPCDSQueryBenchmarkArguments(val args: Array[String]) {
System.err.println("""
|Usage: spark-submit --class <this class> <spark sql test jar> [Options]
|Options:
- | --data-location Path to TPCDS data
+ | --data-location Path to TPCDS/H data
| --query-filter Queries to filter, e.g., q3,q5,q13
| --cbo Whether to enable cost-based optimization
|
|------------------------------------------------------------------------------------------------------------------
|In order to run this benchmark, please follow the instructions at
|https://github.com/databricks/spark-sql-perf/blob/master/README.md
- |to generate the TPCDS data locally (preferably with a scale factor of 5 for benchmarking).
- |Thereafter, the value of <TPCDS data location> needs to be set to the location where the generated data is stored.
+ |to generate the TPCDS/H data locally (preferably with a scale factor of 5 for benchmarking).
Review Comment:
It seems a little odd. Could it be `TPC-DS/TPC-H`?
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCQueryBenchmarkArguments.scala:
##########
@@ -63,15 +63,15 @@ class TPCDSQueryBenchmarkArguments(val args: Array[String]) {
System.err.println("""
|Usage: spark-submit --class <this class> <spark sql test jar> [Options]
|Options:
- | --data-location Path to TPCDS data
+ | --data-location Path to TPCDS/H data
| --query-filter Queries to filter, e.g., q3,q5,q13
| --cbo Whether to enable cost-based optimization
|
|------------------------------------------------------------------------------------------------------------------
|In order to run this benchmark, please follow the instructions at
|https://github.com/databricks/spark-sql-perf/blob/master/README.md
- |to generate the TPCDS data locally (preferably with a scale factor of 5 for benchmarking).
- |Thereafter, the value of <TPCDS data location> needs to be set to the location where the generated data is stored.
+ |to generate the TPCDS/H data locally (preferably with a scale factor of 5 for benchmarking).
+ |Thereafter, the value of <TPCDS/H data location> needs to be set to the location where the generated data is stored.
Review Comment:
ditto.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCBasedBenchmark.scala:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
+import org.apache.spark.sql.catalyst.util.resourceToString
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+
+trait TPCBasedBenchmark extends SqlBasedBenchmark with Logging {
+
+ val tables: Seq[String]
+ val queryType: String
+
+ override def getSparkSession: SparkSession = {
+ val conf = new SparkConf()
+ .setMaster(System.getProperty("spark.sql.test.master", "local[1]"))
+ .setAppName("test-sql-context")
+ .set("spark.sql.parquet.compression.codec", "snappy")
+ .set("spark.sql.shuffle.partitions", System.getProperty("spark.sql.shuffle.partitions", "4"))
+ .set("spark.driver.memory", "3g")
+ .set("spark.executor.memory", "3g")
+ .set("spark.sql.autoBroadcastJoinThreshold", (20 * 1024 * 1024).toString)
+ .set("spark.sql.crossJoin.enabled", "true")
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("spark.kryo.registrationRequired", "true")
+
+ SparkSession.builder.config(conf).getOrCreate()
+ }
+
+ def setupTables(dataLocation: String, tableColumns: Map[String, StructType]): Map[String, Long] =
+ tables.map { tableName =>
+ spark.sql(s"DROP TABLE IF EXISTS $tableName")
+ val options = Map("path" -> s"$dataLocation/$tableName")
+ spark.catalog.createTable(tableName, "parquet", tableColumns(tableName), options)
+ // Recover partitions but don't fail if a table is not partitioned.
+ Try {
+ spark.sql(s"ALTER TABLE $tableName RECOVER PARTITIONS")
+ }.getOrElse {
+ logInfo(s"Recovering partitions of table $tableName failed")
+ }
+ tableName -> spark.table(tableName).count()
+ }.toMap
+
+ def runTpcQueries(
+ queryLocation: String,
+ queries: Seq[String],
+ tableSizes: Map[String, Long],
+ nameSuffix: String = ""): Unit = {
+ queries.foreach { name =>
+ val queryString = resourceToString(s"$queryLocation/$name.sql",
+ classLoader = Thread.currentThread().getContextClassLoader)
+
+ // This is an indirect hack to estimate the size of each query's input by traversing the
+ // logical plan and adding up the sizes of all tables that appear in the plan.
+ val queryRelations = scala.collection.mutable.HashSet[String]()
+ spark.sparkContext.setJobGroup(name, s"$name:\n$queryString", true)
+ spark.sql(queryString).queryExecution.analyzed.foreach {
+ case SubqueryAlias(alias, _: LogicalRelation) =>
+ queryRelations.add(alias.name)
+ case LogicalRelation(_, _, Some(catalogTable), _) =>
+ queryRelations.add(catalogTable.identifier.table)
+ case HiveTableRelation(tableMeta, _, _, _, _) =>
+ queryRelations.add(tableMeta.identifier.table)
+ case _ =>
+ }
+ val numRows = queryRelations.map(tableSizes.getOrElse(_, 0L)).sum
+ val benchmark = new Benchmark(s"$queryType Snappy", numRows, 2, output = output)
+ benchmark.addCase(s"$name$nameSuffix") { _ =>
+ spark.sql(queryString).noop()
+ }
+ benchmark.run()
+ }
+ }
+
+ def filterQueries(
Review Comment:
ditto
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCBasedBenchmark.scala:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
+import org.apache.spark.sql.catalyst.util.resourceToString
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+
+trait TPCBasedBenchmark extends SqlBasedBenchmark with Logging {
+
+ val tables: Seq[String]
+ val queryType: String
+
+ override def getSparkSession: SparkSession = {
+ val conf = new SparkConf()
+ .setMaster(System.getProperty("spark.sql.test.master", "local[1]"))
+ .setAppName("test-sql-context")
+ .set("spark.sql.parquet.compression.codec", "snappy")
+ .set("spark.sql.shuffle.partitions", System.getProperty("spark.sql.shuffle.partitions", "4"))
+ .set("spark.driver.memory", "3g")
+ .set("spark.executor.memory", "3g")
+ .set("spark.sql.autoBroadcastJoinThreshold", (20 * 1024 * 1024).toString)
+ .set("spark.sql.crossJoin.enabled", "true")
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("spark.kryo.registrationRequired", "true")
+
+ SparkSession.builder.config(conf).getOrCreate()
+ }
+
+ def setupTables(dataLocation: String, tableColumns: Map[String, StructType]): Map[String, Long] =
+ tables.map { tableName =>
+ spark.sql(s"DROP TABLE IF EXISTS $tableName")
+ val options = Map("path" -> s"$dataLocation/$tableName")
+ spark.catalog.createTable(tableName, "parquet", tableColumns(tableName), options)
+ // Recover partitions but don't fail if a table is not partitioned.
+ Try {
+ spark.sql(s"ALTER TABLE $tableName RECOVER PARTITIONS")
+ }.getOrElse {
+ logInfo(s"Recovering partitions of table $tableName failed")
+ }
+ tableName -> spark.table(tableName).count()
+ }.toMap
+
+ def runTpcQueries(
Review Comment:
ditto.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCQueryBenchmarkArguments.scala:
##########
@@ -63,15 +63,15 @@ class TPCDSQueryBenchmarkArguments(val args: Array[String]) {
System.err.println("""
|Usage: spark-submit --class <this class> <spark sql test jar> [Options]
|Options:
- | --data-location Path to TPCDS data
+ | --data-location Path to TPCDS/H data
Review Comment:
ditto
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCBasedBenchmark.scala:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
+import org.apache.spark.sql.catalyst.util.resourceToString
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+
+trait TPCBasedBenchmark extends SqlBasedBenchmark with Logging {
+
+ val tables: Seq[String]
+ val queryType: String
+
+ override def getSparkSession: SparkSession = {
+ val conf = new SparkConf()
+ .setMaster(System.getProperty("spark.sql.test.master", "local[1]"))
+ .setAppName("test-sql-context")
+ .set("spark.sql.parquet.compression.codec", "snappy")
+ .set("spark.sql.shuffle.partitions", System.getProperty("spark.sql.shuffle.partitions", "4"))
+ .set("spark.driver.memory", "3g")
+ .set("spark.executor.memory", "3g")
+ .set("spark.sql.autoBroadcastJoinThreshold", (20 * 1024 * 1024).toString)
+ .set("spark.sql.crossJoin.enabled", "true")
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("spark.kryo.registrationRequired", "true")
+
+ SparkSession.builder.config(conf).getOrCreate()
+ }
+
+ def setupTables(dataLocation: String, tableColumns: Map[String, StructType]): Map[String, Long] =
Review Comment:
```suggestion
protected def setupTables(dataLocation: String, tableColumns: Map[String, StructType]): Map[String, Long] =
```
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCBasedBenchmark.scala:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
+import org.apache.spark.sql.catalyst.util.resourceToString
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+
+trait TPCBasedBenchmark extends SqlBasedBenchmark with Logging {
+
+ val tables: Seq[String]
+ val queryType: String
+
+ override def getSparkSession: SparkSession = {
+ val conf = new SparkConf()
+ .setMaster(System.getProperty("spark.sql.test.master", "local[1]"))
+ .setAppName("test-sql-context")
+ .set("spark.sql.parquet.compression.codec", "snappy")
+ .set("spark.sql.shuffle.partitions", System.getProperty("spark.sql.shuffle.partitions", "4"))
+ .set("spark.driver.memory", "3g")
+ .set("spark.executor.memory", "3g")
+ .set("spark.sql.autoBroadcastJoinThreshold", (20 * 1024 * 1024).toString)
+ .set("spark.sql.crossJoin.enabled", "true")
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("spark.kryo.registrationRequired", "true")
+
+ SparkSession.builder.config(conf).getOrCreate()
+ }
+
+ def setupTables(dataLocation: String, tableColumns: Map[String, StructType]): Map[String, Long] =
+ tables.map { tableName =>
+ spark.sql(s"DROP TABLE IF EXISTS $tableName")
+ val options = Map("path" -> s"$dataLocation/$tableName")
+ spark.catalog.createTable(tableName, "parquet", tableColumns(tableName), options)
+ // Recover partitions but don't fail if a table is not partitioned.
+ Try {
+ spark.sql(s"ALTER TABLE $tableName RECOVER PARTITIONS")
+ }.getOrElse {
+ logInfo(s"Recovering partitions of table $tableName failed")
+ }
+ tableName -> spark.table(tableName).count()
+ }.toMap
+
+ def runTpcQueries(
+ queryLocation: String,
+ queries: Seq[String],
+ tableSizes: Map[String, Long],
+ nameSuffix: String = ""): Unit = {
+ queries.foreach { name =>
+ val queryString = resourceToString(s"$queryLocation/$name.sql",
+ classLoader = Thread.currentThread().getContextClassLoader)
+
+ // This is an indirect hack to estimate the size of each query's input by traversing the
+ // logical plan and adding up the sizes of all tables that appear in the plan.
+ val queryRelations = scala.collection.mutable.HashSet[String]()
+ spark.sparkContext.setJobGroup(name, s"$name:\n$queryString", true)
+ spark.sql(queryString).queryExecution.analyzed.foreach {
+ case SubqueryAlias(alias, _: LogicalRelation) =>
+ queryRelations.add(alias.name)
+ case LogicalRelation(_, _, Some(catalogTable), _) =>
+ queryRelations.add(catalogTable.identifier.table)
+ case HiveTableRelation(tableMeta, _, _, _, _) =>
+ queryRelations.add(tableMeta.identifier.table)
+ case _ =>
+ }
+ val numRows = queryRelations.map(tableSizes.getOrElse(_, 0L)).sum
+ val benchmark = new Benchmark(s"$queryType Snappy", numRows, 2, output = output)
+ benchmark.addCase(s"$name$nameSuffix") { _ =>
+ spark.sql(queryString).noop()
+ }
+ benchmark.run()
+ }
+ }
+
+ def filterQueries(
+ origQueries: Seq[String],
+ queryFilter: Set[String],
+ nameSuffix: String = ""): Seq[String] = {
+ if (queryFilter.nonEmpty) {
+ if (nameSuffix.nonEmpty) {
+ origQueries.filter { name => queryFilter.contains(s"$name$nameSuffix") }
+ } else {
+ origQueries.filter(queryFilter.contains)
+ }
+ } else {
+ origQueries
+ }
+ }
+
+ def configureCbo(benchmarkArgs: TPCQueryBenchmarkArguments): Unit = {
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org