You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2022/04/24 03:04:05 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2439] Using Pure Java TPC-DS generator
This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 3ecdd4228 [KYUUBI #2439] Using Pure Java TPC-DS generator
3ecdd4228 is described below
commit 3ecdd42286d2b47b3c2b7b090f4041e8c52e5e99
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Sun Apr 24 11:03:55 2022 +0800
[KYUUBI #2439] Using Pure Java TPC-DS generator
### _Why are the changes needed?_
This PR proposes changing the Kyuubi TPC-DS generator to a pure Java implementation instead of the original C binary.
The new pure Java TPC-DS generator is under the Apache License. The license of the original C binary is unknown, which is why we excluded it from releases in the past.
Since this change resolves the license issue of the Kyuubi TPC-DS module, we can bundle the TPC-DS tool in future releases.
After the migration, I haven't seen the "error=26, Text file busy" failure described in #2439 anymore.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [x] Add screenshots for manual tests if appropriate
1. Use the old C-binary-based TPC-DS generator to generate 1GB of data under database `tpcds_s1`
2. Use the new pure-Java-based TPC-DS generator to generate 1GB of data under database `new_tpcds_sf1`
3. Compare the results of `select count(*)` and `select sum(hash(*))`
```
spark-sql> select count(*) from tpcds_s1.inventory;
11745000
Time taken: 0.161 seconds, Fetched 1 row(s)
spark-sql> select count(*) from new_tpcds_sf1.inventory;
11745000
Time taken: 0.141 seconds, Fetched 1 row(s)
spark-sql> select sum(hash(*)) from tpcds_s1.inventory;
-556768665838
Time taken: 0.252 seconds, Fetched 1 row(s)
spark-sql> select sum(hash(*)) from new_tpcds_sf1.inventory;
-556768665838
Time taken: 0.232 seconds, Fetched 1 row(s)
```
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2449 from pan3793/tpcds.
Closes #2439
a270bcba [Cheng Pan] Remove the exclusion in source release
7c8d3271 [Cheng Pan] [KYUUBI #2439] Using Pure Java TPC-DS generator
Authored-by: Cheng Pan <ch...@apache.org>
Signed-off-by: Kent Yao <ya...@apache.org>
---
.gitattributes | 1 -
dev/kyuubi-tpcds/pom.xml | 28 ++++++-
.../src/main/resources/bin/linux/dsdgen | Bin 505793 -> 0 bytes
.../src/main/resources/bin/linux/tpcds.idx | Bin 640585 -> 0 bytes
dev/kyuubi-tpcds/src/main/resources/bin/mac/dsdgen | Bin 313256 -> 0 bytes
.../src/main/resources/bin/mac/tpcds.idx | Bin 640585 -> 0 bytes
.../org/apache/kyuubi/tpcds/TableGenerator.scala | 90 ++++-----------------
.../kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala | 11 ++-
.../scala/org/apache/spark/KyuubiSparkUtils.scala | 35 --------
pom.xml | 13 +++
10 files changed, 59 insertions(+), 119 deletions(-)
diff --git a/.gitattributes b/.gitattributes
index 6e2eb1211..b3623c426 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -21,7 +21,6 @@
.travis.yml export-ignore
_config.yml export-ignore
codecov.yml export-ignore
-dev/kyuubi-tpcds/ export-ignore
licenses-binary/ export-ignore
LICENSE-binary export-ignore
NOTICE-binary export-ignore
diff --git a/dev/kyuubi-tpcds/pom.xml b/dev/kyuubi-tpcds/pom.xml
index 0873b6402..9a1787826 100644
--- a/dev/kyuubi-tpcds/pom.xml
+++ b/dev/kyuubi-tpcds/pom.xml
@@ -51,8 +51,13 @@
</dependency>
<dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
+ <groupId>io.trino.tpcds</groupId>
+ <artifactId>tpcds</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
</dependency>
</dependencies>
@@ -72,6 +77,25 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <artifactSet>
+ <includes>
+ <include>com.github.scopt:scopt_${scala.binary.version}</include>
+ <include>io.trino.tpcds:tpcds</include>
+ <include>com.google.guava:guava</include>
+ </includes>
+ </artifactSet>
+ <relocations>
+ <relocation>
+ <pattern>com.google.common</pattern>
+ <shadedPattern>${kyuubi.shade.packageName}.com.google.common</shadedPattern>
+ <includes>
+ <include>com.google.common.**</include>
+ </includes>
+ </relocation>
+ </relocations>
+ </configuration>
<executions>
<execution>
<phase>package</phase>
diff --git a/dev/kyuubi-tpcds/src/main/resources/bin/linux/dsdgen b/dev/kyuubi-tpcds/src/main/resources/bin/linux/dsdgen
deleted file mode 100644
index 5c581713f..000000000
Binary files a/dev/kyuubi-tpcds/src/main/resources/bin/linux/dsdgen and /dev/null differ
diff --git a/dev/kyuubi-tpcds/src/main/resources/bin/linux/tpcds.idx b/dev/kyuubi-tpcds/src/main/resources/bin/linux/tpcds.idx
deleted file mode 100644
index 0f49b70e5..000000000
Binary files a/dev/kyuubi-tpcds/src/main/resources/bin/linux/tpcds.idx and /dev/null differ
diff --git a/dev/kyuubi-tpcds/src/main/resources/bin/mac/dsdgen b/dev/kyuubi-tpcds/src/main/resources/bin/mac/dsdgen
deleted file mode 100755
index 0124f2c93..000000000
Binary files a/dev/kyuubi-tpcds/src/main/resources/bin/mac/dsdgen and /dev/null differ
diff --git a/dev/kyuubi-tpcds/src/main/resources/bin/mac/tpcds.idx b/dev/kyuubi-tpcds/src/main/resources/bin/mac/tpcds.idx
deleted file mode 100644
index 0f49b70e5..000000000
Binary files a/dev/kyuubi-tpcds/src/main/resources/bin/mac/tpcds.idx and /dev/null differ
diff --git a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/TableGenerator.scala b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/TableGenerator.scala
index 497a08bb3..ed58486fe 100644
--- a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/TableGenerator.scala
+++ b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/TableGenerator.scala
@@ -17,24 +17,17 @@
package org.apache.kyuubi.tpcds
-import java.io.InputStream
-import java.lang.ProcessBuilder.Redirect
-import java.nio.file.{Files, Paths}
-import java.nio.file.attribute.PosixFilePermissions._
+import scala.collection.JavaConverters._
-import scala.io.Source
-
-import org.apache.spark.{KyuubiSparkUtils, SparkEnv}
+import io.trino.tpcds.{Options, Results}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.slf4j.{Logger, LoggerFactory}
case class TableGenerator(
name: String,
partitionCols: Seq[String],
fields: StructField*) {
- @transient private lazy val logger: Logger = LoggerFactory.getLogger(this.getClass.getSimpleName)
private val schema: StructType = StructType(fields)
private val rawSchema: StructType = StructType(fields.map(f => StructField(f.name, StringType)))
@@ -54,72 +47,19 @@ case class TableGenerator(
private def radix: Int = (scaleFactor / 100) max 5 min parallelism
private def toDF: DataFrame = {
- val rawRDD = ss.sparkContext.parallelize(1 to parallelism, parallelism).flatMap { i =>
- val os = System.getProperty("os.name").split(' ')(0).toLowerCase
- val loader = Thread.currentThread().getContextClassLoader
-
- val tempDir = KyuubiSparkUtils.createTempDir(SparkEnv.get.conf)
- tempDir.toPath
- val dsdgen = Paths.get(tempDir.toString, "dsdgen")
- val idx = Paths.get(tempDir.toString, "tpcds.idx")
-
- Seq(dsdgen, idx).foreach { file =>
- val in: InputStream = loader.getResourceAsStream(s"bin/$os/${file.toFile.getName}")
- Files.createFile(file, asFileAttribute(fromString("rwx------")))
- val outputStream = Files.newOutputStream(file)
- try {
- val buffer = new Array[Byte](8192)
- var bytesRead = 0
- val canRead = () => {
- bytesRead = in.read(buffer)
- bytesRead != -1
- }
- while (canRead()) {
- outputStream.write(buffer, 0, bytesRead)
- }
- } finally {
- outputStream.flush()
- outputStream.close()
- in.close()
- }
- }
-
- val cmd = s"./dsdgen" +
- s" -TABLE $name" +
- s" -SCALE $scaleFactor" +
- s" -PARALLEL $parallelism" +
- s" -child $i" +
- s" -DISTRIBUTIONS tpcds.idx" +
- s" -FORCE Y" +
- s" -QUIET Y"
-
- val builder = new ProcessBuilder(cmd.split(" "): _*)
- builder.directory(tempDir)
- builder.redirectError(Redirect.INHERIT)
- logger.info(s"Start $cmd at ${builder.directory()}")
- val process = builder.start()
- val res = process.waitFor()
-
- logger.info(s"Finish w/ $res $cmd")
- val data = Paths.get(tempDir.toString, s"${name}_${i}_$parallelism.dat")
- val iterator =
- if (Files.exists(data)) {
- // The data generated by `dsdgen` encoding in "Cp1252".
- // See detail at https://github.com/databricks/spark-sql-perf/pull/104
- // noinspection SourceNotClosed
- Source.fromFile(data.toFile, "cp1252", 8192).getLines
- } else {
- logger.warn(s"No data generated in child $i")
- Nil
- }
- iterator
- }
-
- val rowRDD = rawRDD.mapPartitions { iter =>
- iter.map { line =>
- val v = line.split("\\|", -1).dropRight(1).map(Option(_).filter(_.nonEmpty).orNull)
- Row.fromSeq(v)
- }
+ val rowRDD = ss.sparkContext.parallelize(1 to parallelism, parallelism).flatMap { i =>
+ val opt = new Options
+ opt.table = name
+ opt.scale = scaleFactor
+ opt.parallelism = parallelism
+
+ val session = opt.toSession.withChunkNumber(i)
+ val table = session.getOnlyTableToGenerate
+
+ Results.constructResults(table, session).iterator.asScala
+ .map { _.get(0).asScala } // 1st row is specific table row
+ .map { row => row.map { v => if (v == Options.DEFAULT_NULL_STRING) null else v } }
+ .map { row => Row.fromSeq(row) }
}
val columns = fields.map { f => col(f.name).cast(f.dataType).as(f.name) }
diff --git a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala
index 24cac4c67..220caafc2 100644
--- a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala
+++ b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala
@@ -17,9 +17,7 @@
package org.apache.kyuubi.tpcds.benchmark
-import java.nio.charset.StandardCharsets
-
-import org.apache.commons.io.IOUtils
+import scala.io.{Codec, Source}
/**
* This implements the official TPCDS v2.4 queries with only cosmetic modifications.
@@ -135,9 +133,10 @@ trait TPCDS_2_4_Queries extends Benchmark {
"ss_max")
val tpcds2_4Queries: Seq[Query] = queryNames.map { queryName =>
- val queryContent: String = IOUtils.toString(
- getClass.getClassLoader.getResourceAsStream(s"tpcds_2_4/$queryName.sql"),
- StandardCharsets.UTF_8)
+ val in = getClass.getClassLoader.getResourceAsStream(s"tpcds_2_4/$queryName.sql")
+ val queryContent: String = Source.fromInputStream(in)(Codec.UTF8).mkString
+ in.close()
+
Query(
queryName + "-v2.4",
queryContent,
diff --git a/dev/kyuubi-tpcds/src/main/scala/org/apache/spark/KyuubiSparkUtils.scala b/dev/kyuubi-tpcds/src/main/scala/org/apache/spark/KyuubiSparkUtils.scala
deleted file mode 100644
index a0dab1459..000000000
--- a/dev/kyuubi-tpcds/src/main/scala/org/apache/spark/KyuubiSparkUtils.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io.File
-
-import org.apache.spark.util.Utils
-
-object KyuubiSparkUtils {
-
- def getLocalDir(conf: SparkConf): String = {
- org.apache.spark.util.Utils.getLocalDir(conf)
- }
-
- def createTempDir(conf: SparkConf): File = {
- val root = Utils.getLocalDir(conf)
- Utils.createTempDir(root, "tpcds")
- }
-
-}
diff --git a/pom.xml b/pom.xml
index d32a25bfa..23875fb64 100644
--- a/pom.xml
+++ b/pom.xml
@@ -157,6 +157,7 @@
<swagger.version>2.1.11</swagger.version>
<testcontainers.version>0.39.12</testcontainers.version>
<trino.client.version>363</trino.client.version>
+ <trino.tpcds.version>1.4</trino.tpcds.version>
<zookeeper.version>3.4.14</zookeeper.version>
<!-- apply to kyuubi-hive-jdbc/kyuubi-hive-beeline module -->
@@ -518,6 +519,18 @@
<version>${trino.client.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.trino.tpcds</groupId>
+ <artifactId>tpcds</artifactId>
+ <version>${trino.tpcds.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<dependency>
<groupId>com.dimafeng</groupId>
<artifactId>testcontainers-scala-scalatest_${scala.binary.version}</artifactId>