You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2017/12/21 06:41:45 UTC
carbondata git commit: [CARBONDATA-1899] Add CarbonData concurrency test case
Repository: carbondata
Updated Branches:
refs/heads/master 47aafabb3 -> 71959dad6
[CARBONDATA-1899] Add CarbonData concurrency test case
Add CarbonData concurrency test case
This closes #1670
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/71959dad
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/71959dad
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/71959dad
Branch: refs/heads/master
Commit: 71959dad67cfc3a2542474c11f5b58d9157061ae
Parents: 47aafab
Author: xubo245 <60...@qq.com>
Authored: Fri Dec 15 22:10:00 2017 +0800
Committer: manishgupta88 <to...@gmail.com>
Committed: Thu Dec 21 12:14:40 2017 +0530
----------------------------------------------------------------------
.../carbondata/examples/ConcurrencyTest.scala | 358 +++++++++++++++++++
1 file changed, 358 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/71959dad/examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala
new file mode 100644
index 0000000..a0b0e1a
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+import java.util
+import java.util.concurrent.{Callable, Executors, Future, TimeUnit}
+
+import scala.util.Random
+
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+
+// scalastyle:off println
+object ConcurrencyTest {
+
+ // Number of rows to generate; overridden by the 1st command-line argument.
+ var totalNum: Int = 100 * 1000 * 1000
+ // Size of the thread pool used for each query; overridden by the 2nd argument.
+ var ThreadNum: Int = 16
+ // Number of times each query is submitted to the pool; overridden by the 3rd argument.
+ var TaskNum: Int = 100
+ // When true, query tasks report an empty result array instead of the fetched rows.
+ var ResultIsEmpty: Boolean = true
+ // Bound passed to Random.nextInt when choosing the id used by the filter queries.
+ val cardinalityId: Int = 10000 * 10000
+ // Bound passed to Random.nextInt when choosing the city used by the filter queries.
+ val cardinalityCity: Int = 6
+
+ /** Name of the Parquet comparison table. */
+ def parquetTableName: String = "comparetest_parquet"
+
+ /** Name of the ORC comparison table. */
+ def orcTableName: String = "comparetest_orc"
+
+ /** Name of the CarbonData table for the given data file format version. */
+ def carbonTableName(version: String): String = "comparetest_carbonV" + version
+
+ // Table schema:
+ // +-------------+-----------+-------------+-------------+------------+
+ // | Column name | Data type | Cardinality | Column type | Dictionary |
+ // +-------------+-----------+-------------+-------------+------------+
+ // | id          | string    | 100,000,000 | dimension   | no         |
+ // +-------------+-----------+-------------+-------------+------------+
+ // | city        | string    | 6           | dimension   | yes        |
+ // +-------------+-----------+-------------+-------------+------------+
+ // | country     | string    | 6           | dimension   | yes        |
+ // +-------------+-----------+-------------+-------------+------------+
+ // | planet      | string    | 10,007      | dimension   | yes        |
+ // +-------------+-----------+-------------+-------------+------------+
+ // | m1          | short     | NA          | measure     | no         |
+ // +-------------+-----------+-------------+-------------+------------+
+ // | m2          | int       | NA          | measure     | no         |
+ // +-------------+-----------+-------------+-------------+------------+
+ // | m3          | big int   | NA          | measure     | no         |
+ // +-------------+-----------+-------------+-------------+------------+
+ // | m4          | double    | NA          | measure     | no         |
+ // +-------------+-----------+-------------+-------------+------------+
+ // | m5          | decimal   | NA          | measure     | no         |
+ // +-------------+-----------+-------------+-------------+------------+
+
+ /**
+  * Generates the synthetic benchmark data: one row per integer in `1 to totalNum`,
+  * parallelized into 4 slices, with every column derived from the row number.
+  */
+ private def generateDataFrame(spark: SparkSession): DataFrame = {
+   val columnTypes: Seq[(String, DataType)] = Seq(
+     "id" -> StringType,
+     "city" -> StringType,
+     "country" -> StringType,
+     "planet" -> StringType,
+     "m1" -> ShortType,
+     "m2" -> IntegerType,
+     "m3" -> LongType,
+     "m4" -> DoubleType,
+     "m5" -> DecimalType(30, 10)
+   )
+   // every column is declared non-nullable
+   val schema = StructType(columnTypes.map { case (name, dataType) =>
+     StructField(name, dataType, nullable = false)
+   })
+
+   val rows = spark.sparkContext
+     .parallelize(1 to totalNum, 4)
+     .map { n =>
+       Row(
+         (n % 100000000).toString,
+         "city" + n % 6,
+         "country" + n % 6,
+         "planet" + n % 10007,
+         (n % 16).toShort,
+         n / 2,
+         (n << 1).toLong,
+         n.toDouble / 13,
+         BigDecimal.valueOf(n.toDouble / 11))
+     }
+
+   spark.createDataFrame(rows, schema)
+ }
+
+ // performance test queries, they are designed to test various data access type
+ val r = new Random()
+ // The random filter values and the query list are lazy on purpose: a strict val would be
+ // evaluated while the object is initialised, i.e. before initParameters has applied a
+ // user supplied totalNum, so the chosen id would usually not exist in a smaller table.
+ // The id is drawn with the same formula the data generator uses for row numbers
+ // 1 to totalNum, so it always refers to a generated row.
+ lazy val tmpId = (1 + r.nextInt(totalNum)) % cardinalityId
+ lazy val tmpCity = "city" + (r.nextInt(cardinalityCity) % totalNum)
+ lazy val queries: Array[Query] = Array(
+   Query(
+     "select * from $table" + s" where id = '$tmpId' ",
+     "filter scan",
+     "filter on high card dimension"
+   ),
+
+   Query(
+     "select id from $table" + s" where id = '$tmpId' ",
+     "filter scan",
+     "filter on high card dimension"
+   ),
+
+   // city only has 6 distinct values, so these two are low cardinality filters
+   Query(
+     "select * from $table" + s" where city = '$tmpCity' ",
+     "filter scan",
+     "filter on low card dimension"
+   ),
+
+   Query(
+     "select city from $table" + s" where city = '$tmpCity' ",
+     "filter scan",
+     "filter on low card dimension"
+   ),
+
+   Query(
+     "select country, sum(m1) from $table group by country",
+     "aggregate",
+     "group by on big data, on medium card column, medium result set,"
+   ),
+
+   Query(
+     "select country, sum(m1) from $table" +
+     s" where id = '$tmpId' group by country",
+     "aggregate",
+     "group by on big data, on medium card column, medium result set,"
+   ),
+
+   // self join on id
+   Query(
+     "select t1.country, sum(t1.m1) from $table t1 join $table t2"
+     + s" on t1.id = t2.id where t1.id = '$tmpId' group by t1.country",
+     "aggregate",
+     "group by on big data, on medium card column, medium result set,"
+   ),
+
+   // seven-way self join on id
+   Query(
+     "select t2.country, sum(t2.m1) " +
+     "from $table t1 join $table t2 join $table t3 " +
+     "join $table t4 join $table t5 join $table t6 join $table t7 " +
+     s"on t1.id=t2.id and t1.id=t3.id and t1.id=t4.id " +
+     s"and t1.id=t5.id and t1.id=t6.id and " +
+     s"t1.id=t7.id " +
+     s" where t2.id = '$tmpId' " +
+     s" group by t2.country",
+     "aggregate",
+     "group by on big data, on medium card column, medium result set,"
+   )
+ )
+
+ /**
+  * Writes `input` as Parquet to the path `table`, then registers the written data as a
+  * temporary view of the same name.
+  *
+  * @return elapsed time of the write and the view registration, in seconds
+  */
+ private def loadParquetTable(spark: SparkSession, input: DataFrame, table: String): Double = {
+   time {
+     // partition column is the id modulo 10
+     val partitioned = input.withColumn("partitionCol", input.col("id") % 10)
+     val writer = partitioned.write.partitionBy("partitionCol").mode(SaveMode.Overwrite)
+     writer.parquet(table)
+     spark.read.parquet(table).createOrReplaceTempView(table)
+   }
+ }
+
+ /**
+  * Writes `input` as ORC to the path `table` (no partitioning is applied), then registers
+  * the written data as a temporary view of the same name.
+  *
+  * @return elapsed time of the write and the view registration, in seconds
+  */
+ private def loadOrcTable(spark: SparkSession, input: DataFrame, table: String): Double = {
+   time {
+     val writer = input.write.mode(SaveMode.Overwrite)
+     writer.orc(table)
+     spark.read.orc(table).createOrReplaceTempView(table)
+   }
+ }
+
+ /**
+  * Drops `tableName` if it exists and loads `input` into it through the "carbondata"
+  * data source, after setting the data file version property to "3".
+  *
+  * @return elapsed time of the write only, in seconds
+  */
+ private def loadCarbonTable(spark: SparkSession, input: DataFrame, tableName: String): Double = {
+   val carbonProperties = CarbonProperties.getInstance()
+   carbonProperties.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "3")
+   spark.sql(s"drop table if exists $tableName")
+   // only the write itself is timed; the property change and the drop above are not
+   time {
+     input.write
+       .format("carbondata")
+       .mode(SaveMode.Overwrite)
+       .option("tableName", tableName)
+       .option("tempCSV", "false")
+       .option("single_pass", "true")
+       .option("dictionary_exclude", "id") // id is high cardinality column
+       .option("table_blocksize", "32")
+       .save()
+   }
+ }
+
+ // Generate the data once and load it into both tables: table1 must end with "parquet"
+ // or "orc" and selects the baseline format, table2 is always loaded as a carbon table.
+ def prepareTable(spark: SparkSession, table1: String, table2: String): Unit = {
+   val generated = generateDataFrame(spark).cache
+   println(s"generating ${generated.count} records, schema: ${generated.schema}")
+   val baselineSeconds = table1 match {
+     case name if name.endsWith("parquet") => loadParquetTable(spark, generated, name)
+     case name if name.endsWith("orc") => loadOrcTable(spark, generated, name)
+     case other => sys.error("invalid table: " + other)
+   }
+   val carbonSeconds = loadCarbonTable(spark, generated, table2)
+   println(s"load completed, time: $baselineSeconds, $carbonSeconds")
+   generated.unpersist()
+ }
+
+ // Run all queries for the specified table.
+ // Every query is submitted TaskNum times to its own pool of ThreadNum threads, and one
+ // summary line is printed per query.
+ private def runQueries(spark: SparkSession, tableName: String): Unit = {
+   println(s"start running queries for $tableName...")
+   val start = System.currentTimeMillis()
+   println("90% time: xx.xx sec\t99% time: xx.xx sec\tlast time: xx.xx sec\t " +
+     "running query sql\taverage time: xx.xx sec\t result: show it when ResultIsEmpty is false")
+   // foreach rather than map: the loop body is executed for its side effects only
+   queries.zipWithIndex.foreach { case (query, index) =>
+     val sqlText = query.sqlText.replace("$table", tableName)
+
+     val executorService = Executors.newFixedThreadPool(ThreadNum)
+     try {
+       val tasks = new util.ArrayList[Callable[Results]]()
+       for (_ <- 1 to TaskNum) {
+         tasks.add(new QueryTask(spark, sqlText))
+       }
+       // invokeAll blocks until every submitted task has completed
+       val results = executorService.invokeAll(tasks)
+
+       val sql = s"query ${index + 1}: $sqlText "
+       printResult(results, sql)
+     } finally {
+       // release the pool threads even when a query fails, otherwise the non-daemon
+       // worker threads keep the JVM alive
+       executorService.shutdown()
+       executorService.awaitTermination(600, TimeUnit.SECONDS)
+     }
+
+     // measured from the start of the whole run, so the value is cumulative across queries
+     val taskTime = (System.currentTimeMillis() - start).toDouble / 1000
+     println("task time: " + taskTime.formatted("%.3f") + " s")
+   }
+ }
+
+ /**
+  * Prints one summary line for a batch of executions of the same query: the 90th and
+  * 99th percentile time, the slowest time, the query label, the average time and the
+  * result rows of the first task.
+  *
+  * @param results futures of the submitted tasks; each one is awaited here, so a failed
+  *                task surfaces as the exception thrown by `Future.get`
+  * @param sql     label printed in the middle of the line
+  */
+ def printResult(results: util.List[Future[Results]], sql: String = ""): Unit = {
+   if (results.isEmpty) {
+     // nothing was executed (e.g. TaskNum is 0), so there are no timings to summarise
+     print("\t" + sql + "\tno results\t")
+   } else {
+     val outcomes = (0 until results.size()).map(i => results.get(i).get())
+     val sqlResult = outcomes.head.sqlResult
+     val timeArray = outcomes.map(_.time).toArray
+     val sortTimeArray = timeArray.sorted
+
+     // Index of the slowest run among the fastest `fraction` of all runs. The truncated
+     // product is 0 for very small samples, so the index is clamped to stay in bounds.
+     def rankIndex(fraction: Double): Int =
+       math.max(0, (sortTimeArray.length * fraction).toInt - 1)
+
+     // the time of 90 percent sql are finished
+     val time90 = rankIndex(0.9)
+     // the time of 99 percent sql are finished
+     val time99 = rankIndex(0.99)
+     print("90%:" + sortTimeArray(time90).formatted("%.3f") + " s," +
+       "\t99%:" + sortTimeArray(time99).formatted("%.3f") + " s," +
+       "\tlast:" + sortTimeArray.last.formatted("%.3f") + " s," +
+       "\t" + sql +
+       "\taverage:" + (timeArray.sum / timeArray.length).formatted("%.3f") + " s," +
+       "\t" + sqlResult.mkString(",") + "\t")
+   }
+ }
+
+ /** Outcome of one query task: the measured time in seconds and the rows kept for printing. */
+ case class Results(time: Double, sqlResult: Array[Row])
+
+
+ /**
+  * Task that runs `query` once on `spark`, fetching at most the first row, and reports
+  * how long that took. The fetched rows are replaced by an empty array while
+  * `ResultIsEmpty` is true.
+  */
+ class QueryTask(spark: SparkSession, query: String)
+   extends Callable[Results] with Serializable {
+   override def call(): Results = {
+     val begin = System.currentTimeMillis()
+     val firstRows: Array[Row] = spark.sql(query).head(1)
+     // elapsed wall-clock time in seconds
+     val elapsed = (System.currentTimeMillis() - begin).toDouble / 1000
+     val kept = if (ResultIsEmpty) Array.empty[Row] else firstRows
+     Results(elapsed, kept)
+   }
+ }
+
+ // Run the whole query set against table1 and then against table2.
+ def runTest(spark: SparkSession, table1: String, table2: String): Unit = {
+   runQueries(spark, table1)
+   // two rounds of "request a GC, then sleep one second" before the second table is queried
+   (1 to 2).foreach { _ =>
+     System.gc()
+     Thread.sleep(1000)
+   }
+   runQueries(spark, table2)
+ }
+
+ /**
+  * Evaluates `code` once and measures how long it took.
+  *
+  * @param code block to run; evaluated exactly once
+  * @return elapsed wall-clock time in seconds
+  */
+ def time(code: => Unit): Double = {
+   val begin = System.currentTimeMillis()
+   code
+   val elapsedMillis = System.currentTimeMillis() - begin
+   elapsedMillis.toDouble / 1000
+ }
+
+ /**
+  * Applies the optional positional command-line arguments; missing arguments keep their
+  * current value.
+  *
+  * @param arr arguments in the order: totalNum, ThreadNum, TaskNum, ResultIsEmpty
+  * @throws NumberFormatException if one of the first three arguments is not an integer
+  * @throws Exception if the fourth argument is neither "true" nor "false" (case-insensitive)
+  */
+ def initParameters(arr: Array[String]): Unit = {
+   arr.lift(0).foreach(value => totalNum = value.toInt)
+   arr.lift(1).foreach(value => ThreadNum = value.toInt)
+   arr.lift(2).foreach(value => TaskNum = value.toInt)
+   arr.lift(3).foreach { flag =>
+     ResultIsEmpty =
+       if (flag.equalsIgnoreCase("true")) {
+         true
+       } else if (flag.equalsIgnoreCase("false")) {
+         // "false" has to switch the flag off so that query results are printed
+         false
+       } else {
+         throw new Exception("error parameter, should be true or false")
+       }
+   }
+ }
+
+ /**
+  * Entry point: sets the carbon properties, creates a local carbon session, loads the
+  * parquet and carbon tables, runs the concurrent queries on both and cleans up.
+  *
+  * @param args optional positional arguments, see initParameters
+  */
+ def main(args: Array[String]): Unit = {
+   CarbonProperties.getInstance()
+     .addProperty("carbon.enable.vector.reader", "true")
+     .addProperty("enable.unsafe.sort", "true")
+     .addProperty("carbon.blockletgroup.size.in.mb", "32")
+     .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING, "true")
+   import org.apache.spark.sql.CarbonSession._
+   // the store location is resolved relative to the class path root of this class
+   val classPathRoot = this.getClass.getResource("/").getPath
+   val rootPath = new File(classPathRoot + "../../../..").getCanonicalPath
+   val storeLocation = rootPath + "/examples/spark2/target/store"
+
+   val sessionBuilder = SparkSession
+     .builder()
+     .master("local[8]")
+     .enableHiveSupport()
+     .config("spark.driver.host", "127.0.0.1")
+   val spark = sessionBuilder.getOrCreateCarbonSession(storeLocation)
+   spark.sparkContext.setLogLevel("warn")
+
+   initParameters(args)
+
+   val table1 = parquetTableName
+   val table2 = carbonTableName("3")
+   prepareTable(spark, table1, table2)
+   println(s"totalNum:$totalNum\tThreadNum:$ThreadNum" +
+     s"\tTaskNum:$TaskNum\tResultIsEmpty:$ResultIsEmpty")
+   runTest(spark, table1, table2)
+
+   // clean up: remove the parquet output directory and drop the carbon table
+   CarbonUtil.deleteFoldersAndFiles(new File(table1))
+   spark.sql(s"drop table $table2")
+   spark.close()
+ }
+}
+
+// scalastyle:on println