You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/07/05 13:36:03 UTC
[1/2] carbondata git commit: modify compare test
Repository: carbondata
Updated Branches:
refs/heads/master d9c3b4837 -> 26d2f1c83
modify compare test
fix
fix style
change table
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/327b307f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/327b307f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/327b307f
Branch: refs/heads/master
Commit: 327b307fdddc7b0fffe5b86049d1a2d08dfb182a
Parents: d9c3b48
Author: jackylk <ja...@huawei.com>
Authored: Mon Jul 3 21:54:39 2017 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Wed Jul 5 21:34:56 2017 +0800
----------------------------------------------------------------------
.../carbondata/examples/CompareTest.scala | 103 ++++++++++++-------
1 file changed, 67 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/327b307f/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
index ee53c31..ffc4b22 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
@@ -41,6 +41,7 @@ case class Query(sqlText: String, queryType: String, desc: String)
object CompareTest {
def parquetTableName: String = "comparetest_parquet"
+ def orcTableName: String = "comparetest_orc"
def carbonTableName(version: String): String = s"comparetest_carbonV$version"
// Table schema:
@@ -63,7 +64,7 @@ object CompareTest {
// +-------------+-----------+-------------+-------------+------------+
// | m4 | double | NA | measure | no |
// +-------------+-----------+-------------+-------------+------------+
- // | m5 | double | NA | measure | no |
+ // | m5 | decimal | NA | measure | no |
// +-------------+-----------+-------------+-------------+------------+
private def generateDataFrame(spark: SparkSession): DataFrame = {
val r = new Random()
@@ -71,10 +72,11 @@ object CompareTest {
.parallelize(1 to 10 * 1000 * 1000, 4)
.map { x =>
("city" + x % 8, "country" + x % 1103, "planet" + x % 10007, "IDENTIFIER" + x.toString,
- (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13, x.toDouble / 11)
+ (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13,
+ BigDecimal.valueOf(x.toDouble / 11))
}.map { x =>
- Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9)
- }
+ Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9)
+ }
val schema = StructType(
Seq(
@@ -86,7 +88,7 @@ object CompareTest {
StructField("m2", IntegerType, nullable = false),
StructField("m3", LongType, nullable = false),
StructField("m4", DoubleType, nullable = false),
- StructField("m5", DoubleType, nullable = false)
+ StructField("m5", DecimalType(30, 10), nullable = false)
)
)
@@ -142,12 +144,12 @@ object CompareTest {
// == FULL SCAN GROUP BY AGGREGATE ==
// ===========================================================================
Query(
- "select country, sum(m1) from $table group by country",
+ "select country, sum(m1) as metric from $table group by country order by metric",
"aggregate",
"group by on big data, on medium card column, medium result set,"
),
Query(
- "select city, sum(m1) from $table group by city",
+ "select city, sum(m1) as metric from $table group by city order by metric",
"aggregate",
"group by on big data, on low card column, small result set,"
),
@@ -170,17 +172,20 @@ object CompareTest {
// == FILTER SCAN GROUP BY AGGREGATION ==
// ===========================================================================
Query(
- "select country, sum(m1) from $table where city='city8' group by country ",
+ "select country, sum(m1) as metric from $table where city='city8' group by country " +
+ "order by metric",
"filter scan and aggregate",
"group by on large data, small result set"
),
Query(
- "select id, sum(m1) from $table where planet='planet10' group by id",
+ "select id, sum(m1) as metric from $table where planet='planet10' group by id " +
+ "order by metric",
"filter scan and aggregate",
"group by on medium data, large result set"
),
Query(
- "select city, sum(m1) from $table where country='country12' group by city ",
+ "select city, sum(m1) as metric from $table where country='country12' group by city " +
+ "order by metric",
"filter scan and aggregate",
"group by on medium data, small result set"
),
@@ -244,25 +249,35 @@ object CompareTest {
)
)
- private def loadParquetTable(spark: SparkSession, input: DataFrame): Double = time {
+ private def loadParquetTable(spark: SparkSession, input: DataFrame, table: String)
+ : Double = time {
// partitioned by last 1 digit of id column
val dfWithPartition = input.withColumn("partitionCol", input.col("id").%(10))
dfWithPartition.write
.partitionBy("partitionCol")
.mode(SaveMode.Overwrite)
- .parquet(parquetTableName)
+ .parquet(table)
+ spark.read.parquet(table).registerTempTable(table)
+ }
+
+ private def loadOrcTable(spark: SparkSession, input: DataFrame, table: String): Double = time {
+ // written as a single ORC dataset without partitioning (unlike the parquet table)
+ input.write
+ .mode(SaveMode.Overwrite)
+ .orc(table)
+ spark.read.orc(table).registerTempTable(table)
}
- private def loadCarbonTable(spark: SparkSession, input: DataFrame, version: String): Double = {
+ private def loadCarbonTable(spark: SparkSession, input: DataFrame, tableName: String): Double = {
CarbonProperties.getInstance().addProperty(
CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
- version
+ "3"
)
- spark.sql(s"drop table if exists ${carbonTableName(version)}")
+ spark.sql(s"drop table if exists $tableName")
time {
input.write
.format("carbondata")
- .option("tableName", carbonTableName(version))
+ .option("tableName", tableName)
.option("tempCSV", "false")
.option("single_pass", "true")
.option("dictionary_exclude", "id") // id is high cardinality column
@@ -273,18 +288,23 @@ object CompareTest {
}
// load data into parquet, carbonV2, carbonV3
- private def prepareTable(spark: SparkSession): Unit = {
+ private def prepareTable(spark: SparkSession, table1: String, table2: String): Unit = {
val df = generateDataFrame(spark).cache
println(s"loading ${df.count} records, schema: ${df.schema}")
- val loadParquetTime = loadParquetTable(spark, df)
- val loadCarbonV3Time = loadCarbonTable(spark, df, version = "3")
- println(s"load completed, time: $loadParquetTime, $loadCarbonV3Time")
+ val table1Time = if (table1.endsWith("parquet")) {
+ loadParquetTable(spark, df, table1)
+ } else if (table1.endsWith("orc")) {
+ loadOrcTable(spark, df, table1)
+ } else {
+ sys.error("invalid table: " + table1)
+ }
+ val table2Time = loadCarbonTable(spark, df, table2)
+ println(s"load completed, time: $table1Time, $table2Time")
df.unpersist()
- spark.read.parquet(parquetTableName).registerTempTable(parquetTableName)
}
// Run all queries for the specified table
- private def runQueries(spark: SparkSession, tableName: String): Array[(Double, Int)] = {
+ private def runQueries(spark: SparkSession, tableName: String): Array[(Double, Array[Row])] = {
println(s"start running queries for $tableName...")
var result: Array[Row] = null
queries.zipWithIndex.map { case (query, index) =>
@@ -294,37 +314,46 @@ object CompareTest {
result = spark.sql(sqlText).collect()
}
println(s"=> $rt sec")
- (rt, result.length)
+ (rt, result)
+ }
+ }
+
+ private def printErrorIfNotMatch(index: Int, table1: String, result1: Array[Row],
+ table2: String, result2: Array[Row]): Unit = {
+ if (!result1.sameElements(result2)) {
+ val num = index + 1
+ println(s"$table1 result for query $num: ")
+ println(s"""${result1.mkString(",")}""")
+ println(s"$table2 result for query $num: ")
+ println(s"""${result2.mkString(",")}""")
+ sys.error(s"result not matching for query $num (${queries(index).desc})")
}
}
// run testcases and print comparison result
- private def runTest(spark: SparkSession): Unit = {
+ private def runTest(spark: SparkSession, table1: String, table2: String): Unit = {
val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val date = new Date
val timestamp = date.getTime
// run queries on parquet and carbon
- val parquetResult: Array[(Double, Int)] = runQueries(spark, parquetTableName)
+ val table1Result: Array[(Double, Array[Row])] = runQueries(spark, table1)
// do GC and sleep for some time before running next table
System.gc()
Thread.sleep(1000)
System.gc()
Thread.sleep(1000)
- val carbonResult: Array[(Double, Int)] = runQueries(spark, carbonTableName("3"))
+ val table2Result: Array[(Double, Array[Row])] = runQueries(spark, table2)
// check result by comparing output from parquet and carbon
- parquetResult.zipWithIndex.foreach { case (result, index) =>
- if (result._2 != carbonResult(index)._2) {
- sys.error(s"result not matching for query ${index + 1} (${queries(index).desc}): " +
- s"${result._2} and ${carbonResult(index)._2}")
- }
+ table1Result.zipWithIndex.foreach { case (result, index) =>
+ printErrorIfNotMatch(index, table1, result._2, table2, table2Result(index)._2)
}
// print all response time in JSON format, so that it can be analyzed later
queries.zipWithIndex.foreach { case (query, index) =>
println("{" +
s""""query":"${index + 1}", """ +
- s""""parquetTime":${parquetResult(index)._1}, """ +
- s""""carbonTime":${carbonResult(index)._1}, """ +
- s""""fetched":${parquetResult(index)._2}, """ +
+ s""""$table1 time":${table1Result(index)._1}, """ +
+ s""""$table2 time":${table2Result(index)._1}, """ +
+ s""""fetched":${table1Result(index)._2.length}, """ +
s""""type":"${query.queryType}", """ +
s""""desc":"${query.desc}", """ +
s""""date": "${formatter.format(date)}" """ +
@@ -351,8 +380,10 @@ object CompareTest {
.getOrCreateCarbonSession(storeLocation)
spark.sparkContext.setLogLevel("warn")
- prepareTable(spark)
- runTest(spark)
+ val table1 = parquetTableName
+ val table2 = carbonTableName("3")
+ prepareTable(spark, table1, table2)
+ runTest(spark, table1, table2)
spark.close()
}
[2/2] carbondata git commit: [CARBONDATA-1259] CompareTest improvement This closes #1129
Posted by ch...@apache.org.
[CARBONDATA-1259] CompareTest improvement This closes #1129
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/26d2f1c8
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/26d2f1c8
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/26d2f1c8
Branch: refs/heads/master
Commit: 26d2f1c83c64a677221012319d1ec86aee429103
Parents: d9c3b48 327b307
Author: chenliang613 <ch...@apache.org>
Authored: Wed Jul 5 21:35:45 2017 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Wed Jul 5 21:35:45 2017 +0800
----------------------------------------------------------------------
.../carbondata/examples/CompareTest.scala | 103 ++++++++++++-------
1 file changed, 67 insertions(+), 36 deletions(-)
----------------------------------------------------------------------