You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2022/04/24 03:04:05 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #2439] Using Pure Java TPC-DS generator

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ecdd4228 [KYUUBI #2439] Using Pure Java TPC-DS generator
3ecdd4228 is described below

commit 3ecdd42286d2b47b3c2b7b090f4041e8c52e5e99
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Sun Apr 24 11:03:55 2022 +0800

    [KYUUBI #2439] Using Pure Java TPC-DS generator
    
    ### _Why are the changes needed?_
    
    This PR proposes changing the Kyuubi TPC-DS generator to a pure Java implementation instead of the original C binary.
    
    The new pure Java TPC-DS generator is under the Apache License. In fact, I don't know the license of the original C binary, which is why we excluded it from releases in the past.
    
    Since this change removes the license issue of the Kyuubi TPC-DS module, we can bundle the TPC-DS tool in future releases.
    
    After the migration, I haven't seen the "error=26, Text file busy" failure described in #2439 anymore.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [x] Add screenshots for manual tests if appropriate
    
    1. Use the old C-binary-based TPC-DS generator to generate 1GB of data in database `tpcds_s1`
    2. Use the new pure-Java-based TPC-DS generator to generate 1GB of data in database `new_tpcds_sf1`
    3. Compare the results of `select count(*)` and `select sum(hash(*))`
    
    ```
    spark-sql> select count(*) from tpcds_s1.inventory;
    11745000
    Time taken: 0.161 seconds, Fetched 1 row(s)
    spark-sql> select count(*) from new_tpcds_sf1.inventory;
    11745000
    Time taken: 0.141 seconds, Fetched 1 row(s)
    spark-sql> select sum(hash(*)) from tpcds_s1.inventory;
    -556768665838
    Time taken: 0.252 seconds, Fetched 1 row(s)
    spark-sql> select sum(hash(*)) from new_tpcds_sf1.inventory;
    -556768665838
    Time taken: 0.232 seconds, Fetched 1 row(s)
    ```
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2449 from pan3793/tpcds.
    
    Closes #2439
    
    a270bcba [Cheng Pan] Remove the exclusion in source release
    7c8d3271 [Cheng Pan] [KYUUBI #2439] Using Pure Java TPC-DS generator
    
    Authored-by: Cheng Pan <ch...@apache.org>
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 .gitattributes                                     |   1 -
 dev/kyuubi-tpcds/pom.xml                           |  28 ++++++-
 .../src/main/resources/bin/linux/dsdgen            | Bin 505793 -> 0 bytes
 .../src/main/resources/bin/linux/tpcds.idx         | Bin 640585 -> 0 bytes
 dev/kyuubi-tpcds/src/main/resources/bin/mac/dsdgen | Bin 313256 -> 0 bytes
 .../src/main/resources/bin/mac/tpcds.idx           | Bin 640585 -> 0 bytes
 .../org/apache/kyuubi/tpcds/TableGenerator.scala   |  90 ++++-----------------
 .../kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala |  11 ++-
 .../scala/org/apache/spark/KyuubiSparkUtils.scala  |  35 --------
 pom.xml                                            |  13 +++
 10 files changed, 59 insertions(+), 119 deletions(-)

diff --git a/.gitattributes b/.gitattributes
index 6e2eb1211..b3623c426 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -21,7 +21,6 @@
 .travis.yml export-ignore
 _config.yml export-ignore
 codecov.yml export-ignore
-dev/kyuubi-tpcds/ export-ignore
 licenses-binary/ export-ignore
 LICENSE-binary export-ignore
 NOTICE-binary export-ignore
diff --git a/dev/kyuubi-tpcds/pom.xml b/dev/kyuubi-tpcds/pom.xml
index 0873b6402..9a1787826 100644
--- a/dev/kyuubi-tpcds/pom.xml
+++ b/dev/kyuubi-tpcds/pom.xml
@@ -51,8 +51,13 @@
         </dependency>
 
         <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
+            <groupId>io.trino.tpcds</groupId>
+            <artifactId>tpcds</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
         </dependency>
     </dependencies>
 
@@ -72,6 +77,25 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
+                <configuration>
+                    <shadedArtifactAttached>false</shadedArtifactAttached>
+                    <artifactSet>
+                        <includes>
+                            <include>com.github.scopt:scopt_${scala.binary.version}</include>
+                            <include>io.trino.tpcds:tpcds</include>
+                            <include>com.google.guava:guava</include>
+                        </includes>
+                    </artifactSet>
+                    <relocations>
+                        <relocation>
+                            <pattern>com.google.common</pattern>
+                            <shadedPattern>${kyuubi.shade.packageName}.com.google.common</shadedPattern>
+                            <includes>
+                                <include>com.google.common.**</include>
+                            </includes>
+                        </relocation>
+                    </relocations>
+                </configuration>
                 <executions>
                     <execution>
                         <phase>package</phase>
diff --git a/dev/kyuubi-tpcds/src/main/resources/bin/linux/dsdgen b/dev/kyuubi-tpcds/src/main/resources/bin/linux/dsdgen
deleted file mode 100644
index 5c581713f..000000000
Binary files a/dev/kyuubi-tpcds/src/main/resources/bin/linux/dsdgen and /dev/null differ
diff --git a/dev/kyuubi-tpcds/src/main/resources/bin/linux/tpcds.idx b/dev/kyuubi-tpcds/src/main/resources/bin/linux/tpcds.idx
deleted file mode 100644
index 0f49b70e5..000000000
Binary files a/dev/kyuubi-tpcds/src/main/resources/bin/linux/tpcds.idx and /dev/null differ
diff --git a/dev/kyuubi-tpcds/src/main/resources/bin/mac/dsdgen b/dev/kyuubi-tpcds/src/main/resources/bin/mac/dsdgen
deleted file mode 100755
index 0124f2c93..000000000
Binary files a/dev/kyuubi-tpcds/src/main/resources/bin/mac/dsdgen and /dev/null differ
diff --git a/dev/kyuubi-tpcds/src/main/resources/bin/mac/tpcds.idx b/dev/kyuubi-tpcds/src/main/resources/bin/mac/tpcds.idx
deleted file mode 100644
index 0f49b70e5..000000000
Binary files a/dev/kyuubi-tpcds/src/main/resources/bin/mac/tpcds.idx and /dev/null differ
diff --git a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/TableGenerator.scala b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/TableGenerator.scala
index 497a08bb3..ed58486fe 100644
--- a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/TableGenerator.scala
+++ b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/TableGenerator.scala
@@ -17,24 +17,17 @@
 
 package org.apache.kyuubi.tpcds
 
-import java.io.InputStream
-import java.lang.ProcessBuilder.Redirect
-import java.nio.file.{Files, Paths}
-import java.nio.file.attribute.PosixFilePermissions._
+import scala.collection.JavaConverters._
 
-import scala.io.Source
-
-import org.apache.spark.{KyuubiSparkUtils, SparkEnv}
+import io.trino.tpcds.{Options, Results}
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.slf4j.{Logger, LoggerFactory}
 
 case class TableGenerator(
     name: String,
     partitionCols: Seq[String],
     fields: StructField*) {
-  @transient private lazy val logger: Logger = LoggerFactory.getLogger(this.getClass.getSimpleName)
 
   private val schema: StructType = StructType(fields)
   private val rawSchema: StructType = StructType(fields.map(f => StructField(f.name, StringType)))
@@ -54,72 +47,19 @@ case class TableGenerator(
   private def radix: Int = (scaleFactor / 100) max 5 min parallelism
 
   private def toDF: DataFrame = {
-    val rawRDD = ss.sparkContext.parallelize(1 to parallelism, parallelism).flatMap { i =>
-      val os = System.getProperty("os.name").split(' ')(0).toLowerCase
-      val loader = Thread.currentThread().getContextClassLoader
-
-      val tempDir = KyuubiSparkUtils.createTempDir(SparkEnv.get.conf)
-      tempDir.toPath
-      val dsdgen = Paths.get(tempDir.toString, "dsdgen")
-      val idx = Paths.get(tempDir.toString, "tpcds.idx")
-
-      Seq(dsdgen, idx).foreach { file =>
-        val in: InputStream = loader.getResourceAsStream(s"bin/$os/${file.toFile.getName}")
-        Files.createFile(file, asFileAttribute(fromString("rwx------")))
-        val outputStream = Files.newOutputStream(file)
-        try {
-          val buffer = new Array[Byte](8192)
-          var bytesRead = 0
-          val canRead = () => {
-            bytesRead = in.read(buffer)
-            bytesRead != -1
-          }
-          while (canRead()) {
-            outputStream.write(buffer, 0, bytesRead)
-          }
-        } finally {
-          outputStream.flush()
-          outputStream.close()
-          in.close()
-        }
-      }
-
-      val cmd = s"./dsdgen" +
-        s" -TABLE $name" +
-        s" -SCALE $scaleFactor" +
-        s" -PARALLEL $parallelism" +
-        s" -child $i" +
-        s" -DISTRIBUTIONS tpcds.idx" +
-        s" -FORCE Y" +
-        s" -QUIET Y"
-
-      val builder = new ProcessBuilder(cmd.split(" "): _*)
-      builder.directory(tempDir)
-      builder.redirectError(Redirect.INHERIT)
-      logger.info(s"Start $cmd at ${builder.directory()}")
-      val process = builder.start()
-      val res = process.waitFor()
-
-      logger.info(s"Finish w/ $res $cmd")
-      val data = Paths.get(tempDir.toString, s"${name}_${i}_$parallelism.dat")
-      val iterator =
-        if (Files.exists(data)) {
-          // The data generated by `dsdgen` encoding in "Cp1252".
-          // See detail at https://github.com/databricks/spark-sql-perf/pull/104
-          // noinspection SourceNotClosed
-          Source.fromFile(data.toFile, "cp1252", 8192).getLines
-        } else {
-          logger.warn(s"No data generated in child $i")
-          Nil
-        }
-      iterator
-    }
-
-    val rowRDD = rawRDD.mapPartitions { iter =>
-      iter.map { line =>
-        val v = line.split("\\|", -1).dropRight(1).map(Option(_).filter(_.nonEmpty).orNull)
-        Row.fromSeq(v)
-      }
+    val rowRDD = ss.sparkContext.parallelize(1 to parallelism, parallelism).flatMap { i =>
+      val opt = new Options
+      opt.table = name
+      opt.scale = scaleFactor
+      opt.parallelism = parallelism
+
+      val session = opt.toSession.withChunkNumber(i)
+      val table = session.getOnlyTableToGenerate
+
+      Results.constructResults(table, session).iterator.asScala
+        .map { _.get(0).asScala } // 1st row is specific table row
+        .map { row => row.map { v => if (v == Options.DEFAULT_NULL_STRING) null else v } }
+        .map { row => Row.fromSeq(row) }
     }
 
     val columns = fields.map { f => col(f.name).cast(f.dataType).as(f.name) }
diff --git a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala
index 24cac4c67..220caafc2 100644
--- a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala
+++ b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala
@@ -17,9 +17,7 @@
 
 package org.apache.kyuubi.tpcds.benchmark
 
-import java.nio.charset.StandardCharsets
-
-import org.apache.commons.io.IOUtils
+import scala.io.{Codec, Source}
 
 /**
  * This implements the official TPCDS v2.4 queries with only cosmetic modifications.
@@ -135,9 +133,10 @@ trait TPCDS_2_4_Queries extends Benchmark {
     "ss_max")
 
   val tpcds2_4Queries: Seq[Query] = queryNames.map { queryName =>
-    val queryContent: String = IOUtils.toString(
-      getClass.getClassLoader.getResourceAsStream(s"tpcds_2_4/$queryName.sql"),
-      StandardCharsets.UTF_8)
+    val in = getClass.getClassLoader.getResourceAsStream(s"tpcds_2_4/$queryName.sql")
+    val queryContent: String = Source.fromInputStream(in)(Codec.UTF8).mkString
+    in.close()
+
     Query(
       queryName + "-v2.4",
       queryContent,
diff --git a/dev/kyuubi-tpcds/src/main/scala/org/apache/spark/KyuubiSparkUtils.scala b/dev/kyuubi-tpcds/src/main/scala/org/apache/spark/KyuubiSparkUtils.scala
deleted file mode 100644
index a0dab1459..000000000
--- a/dev/kyuubi-tpcds/src/main/scala/org/apache/spark/KyuubiSparkUtils.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io.File
-
-import org.apache.spark.util.Utils
-
-object KyuubiSparkUtils {
-
-  def getLocalDir(conf: SparkConf): String = {
-    org.apache.spark.util.Utils.getLocalDir(conf)
-  }
-
-  def createTempDir(conf: SparkConf): File = {
-    val root = Utils.getLocalDir(conf)
-    Utils.createTempDir(root, "tpcds")
-  }
-
-}
diff --git a/pom.xml b/pom.xml
index d32a25bfa..23875fb64 100644
--- a/pom.xml
+++ b/pom.xml
@@ -157,6 +157,7 @@
         <swagger.version>2.1.11</swagger.version>
         <testcontainers.version>0.39.12</testcontainers.version>
         <trino.client.version>363</trino.client.version>
+        <trino.tpcds.version>1.4</trino.tpcds.version>
         <zookeeper.version>3.4.14</zookeeper.version>
 
         <!-- apply to kyuubi-hive-jdbc/kyuubi-hive-beeline module -->
@@ -518,6 +519,18 @@
                 <version>${trino.client.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>io.trino.tpcds</groupId>
+                <artifactId>tpcds</artifactId>
+                <version>${trino.tpcds.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>*</groupId>
+                        <artifactId>*</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
             <dependency>
                 <groupId>com.dimafeng</groupId>
                 <artifactId>testcontainers-scala-scalatest_${scala.binary.version}</artifactId>