Posted to issues@carbondata.apache.org by jackylk <gi...@git.apache.org> on 2019/01/10 16:57:10 UTC
[GitHub] carbondata pull request #3066: [CARBONDATA-3244] Add benchmark for Change Da...
GitHub user jackylk opened a pull request:
https://github.com/apache/carbondata/pull/3066
[CARBONDATA-3244] Add benchmark for Change Data Capture scenario
CDC (change data capture) is a common scenario for analyzing slowly changing tables in a data warehouse.
It is good to add a benchmark test comparing two update methods:
1. hive_solution, which uses INSERT OVERWRITE. This is a popular method in Hive warehouses.
2. carbon_solution, which uses CarbonData's update syntax to update the history table directly.
This test simulates updates to the history table using a CDC table; a sketch of the two methods follows.
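Only the first line of hive_solution's SQL is quoted later in this thread, so here is a rough sketch of what the two strategies typically look like, reusing the dw_order (history) and ods_order (CDC) table names from this PR. The merge logic (a simplified close-and-append) and the method names are illustrative assumptions, not the PR's exact code.

    import org.apache.spark.sql.SparkSession

    // hive_solution style: rebuild the whole history table via a staged copy
    // (staging avoids overwriting a table while it is being read)
    def hiveStyleMerge(spark: SparkSession): Unit = {   // hypothetical name
      spark.sql(
        """
          | CREATE TABLE dw_order_staged AS
          | SELECT o.order_id, o.customer_id, o.start_date,
          |        COALESCE(c.update_date, o.end_date) AS end_date,
          |        COALESCE(c.state, o.state) AS state
          | FROM dw_order o
          | LEFT JOIN ods_order c ON o.order_id = c.order_id
          | UNION ALL
          | SELECT c.order_id, c.customer_id, c.update_date,
          |        CAST('9999-01-01' AS date), c.state
          | FROM ods_order c LEFT ANTI JOIN dw_order o ON c.order_id = o.order_id
        """.stripMargin)
      spark.sql("DROP TABLE dw_order")
      spark.sql("ALTER TABLE dw_order_staged RENAME TO dw_order")
    }

    // carbon_solution style: update matched rows in place, then append new ones
    def carbonStyleMerge(spark: SparkSession): Unit = {   // hypothetical name
      spark.sql(
        """
          | UPDATE dw_order o
          | SET (o.end_date, o.state) = (
          |   SELECT c.update_date, c.state
          |   FROM ods_order c WHERE c.order_id = o.order_id)
        """.stripMargin)
      spark.sql(
        """
          | INSERT INTO dw_order
          | SELECT c.order_id, c.customer_id, c.update_date,
          |        CAST('9999-01-01' AS date), c.state
          | FROM ods_order c LEFT ANTI JOIN dw_order o ON c.order_id = o.order_id
        """.stripMargin)
    }

The hive-style path rewrites all rows even when only a small fraction changed, so its cost grows with the history table size, while the carbon-style path scales with the size of the daily CDC batch. The numbers below reflect this.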
When running on an 8-core laptop, the benchmark shows:
1. test one
History table with 1M records; update 10K records and insert 10K records every day, simulated for 3 days.
hive_solution: total process time takes 13,516 ms
carbon_solution: total process time takes 7,521 ms
2. test two
History table with 10M records; update 10K records and insert 10K records every day,
simulated for 3 days.
hive_solution: total process time takes 104,250 ms
carbon_solution: total process time takes 17,384 ms
- [X] Any interfaces changed?
No
- [X] Any backward compatibility impacted?
No
- [X] Document update required?
No
- [X] Testing done
Please provide details on
- Whether new unit test cases have been added or why no new tests are required?
- How it is tested? Please attach test report.
- Is it a performance related change? Please attach the performance test report.
- Any additional information to help reviewers in testing this change.
Only an example is added
- [X] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
NA
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/jackylk/incubator-carbondata cdc
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/carbondata/pull/3066.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3066
----
commit ebb5ef79ac85a6c736496fe19f719bfed74902c1
Author: Jacky Li <ja...@...>
Date: 2019-01-10T16:44:58Z
add benchmark for Change Data Capture scenario
----
---
[GitHub] carbondata issue #3066: [CARBONDATA-3244] Add benchmark for Change Data Capt...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3066
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2258/
---
[GitHub] carbondata pull request #3066: [CARBONDATA-3244] Add benchmark for Change Da...
Posted by qiuchenjian <gi...@git.apache.org>.
Github user qiuchenjian commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3066#discussion_r246977629
--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/CDCBenchmark.scala ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.benchmark
+
+import java.io.File
+import java.sql.Date
+
+import org.apache.commons.lang3.time.DateUtils
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+
+/**
+ * Benchmark for the Change Data Capture scenario.
+ * This test simulates updates to the history table using a CDC table.
+ *
+ * The benchmark shows the performance of two update methods:
+ * 1. hive_solution, which uses INSERT OVERWRITE. This is a popular method in Hive warehouses.
+ * 2. carbon_solution, which uses CarbonData's update syntax to update the history table directly.
+ *
+ * When running on an 8-core laptop, the benchmark shows:
+ *
+ * 1. test one
+ * History table with 1M records; update 10K records and insert 10K records every day,
+ * simulated for 3 days.
+ * hive_solution: total process time takes 13,516 ms
+ * carbon_solution: total process time takes 7,521 ms
+ *
+ * 2. test two
+ * History table with 10M records; update 10K records and insert 10K records every day,
+ * simulated for 3 days.
+ * hive_solution: total process time takes 104,250 ms
+ * carbon_solution: total process time takes 17,384 ms
+ *
+ */
+object CDCBenchmark {
+
+ // Schema for history table
+ // Table name: dw_order
+ // +-------------+-----------+-------------+
+ // | Column name | Data type | Cardinality |
+ // +-------------+-----------+-------------+
+ // | order_id | string | 10,000,000 |
+ // +-------------+-----------+-------------+
+ // | customer_id | string | 10,000,000 |
+ // +-------------+-----------+-------------+
+ // | start_date | date | NA |
+ // +-------------+-----------+-------------+
+ // | end_date | date | NA |
+ // +-------------+-----------+-------------+
+ // | state | int | 4 |
+ // +-------------+-----------+-------------+
+ case class Order (order_id: String, customer_id: String, start_date: Date, end_date: Date,
+ state: Int)
+
+ // Schema for the CDC data which is used to update the history table every day
+ // Table name: ods_order
+ // +-------------+-----------+-------------+
+ // | Column name | Data type | Cardinality |
+ // +-------------+-----------+-------------+
+ // | order_id | string | 10,000,000 |
+ // +-------------+-----------+-------------+
+ // | customer_id | string | 10,000,000 |
+ // +-------------+-----------+-------------+
+ // | update_date | date | NA |
+ // +-------------+-----------+-------------+
+ // | state | int | 4 |
+ // +-------------+-----------+-------------+
+ case class CDC (order_id: String, customer_id: String, update_date: Date, state: Int)
+
+ // number of records for first day
+ val numOrders = 10000000
+
+ // number of records to update every day
+ val numUpdateOrdersDaily = 10000
+
+ // number of new records to insert every day
+ val numNewOrdersDaily = 10000
+
+ // number of days to simulate
+ val numDays = 3
+
+ // whether to print every day's result to the console
+ val printDetail = false
+
+ def generateDataForDay0(
+ sparkSession: SparkSession,
+ numOrders: Int = 1000000,
+ startDate: Date = Date.valueOf("2018-05-01")): DataFrame = {
+ import sparkSession.implicits._
+ sparkSession.sparkContext.parallelize(1 to numOrders, 4)
+ .map { x => Order(s"order$x", s"customer$x", startDate, Date.valueOf("9999-01-01"), 1)
+ }.toDS().toDF()
+ }
+
+ def generateDailyCDC(
+ sparkSession: SparkSession,
+ numUpdatedOrders: Int,
+ startDate: Date,
+ updateDate: Date,
+ newState: Int,
+ numNewOrders: Int
+ ): DataFrame = {
+ import sparkSession.implicits._
+ val ds1 = sparkSession.sparkContext.parallelize(1 to numUpdatedOrders, 4)
+ .map { x => CDC(s"order$x", s"customer$x", updateDate, newState)
+ }.toDS().toDF()
+ val ds2 = sparkSession.sparkContext.parallelize(1 to numNewOrders, 4)
+ // include $x in the id so that generated new orders are unique within a batch
+ .map { x => CDC(s"newOrder${System.currentTimeMillis()}_$x", s"customer$x", updateDate, 1)
+ }.toDS().toDF()
+ ds1.union(ds2)
+ }
+
+ def main(args: Array[String]): Unit = {
+ import org.apache.spark.sql.CarbonSession._
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+ val storeLocation = s"$rootPath/examples/spark2/target/store"
+ val master = Option(System.getProperty("spark.master"))
+ .orElse(sys.env.get("MASTER"))
+ .orElse(Option("local[8]"))
+
+ val spark = SparkSession
+ .builder()
+ .master(master.get)
+ .enableHiveSupport()
+ .config("spark.driver.host", "127.0.0.1")
+ .getOrCreateCarbonSession(storeLocation)
+ spark.sparkContext.setLogLevel("warn")
+
+ spark.sql("drop table if exists dw_order")
+ spark.sql("drop table if exists ods_order")
+
+ // prepare base data for first day
+ val df = generateDataForDay0(
+ sparkSession = spark,
+ numOrders = numOrders,
+ startDate = Date.valueOf("2018-05-01"))
+
+ spark.sql(s"drop table if exists dw_order")
+ df.write
+ .format("carbondata")
+ .option("tableName", "dw_order")
+ .mode(SaveMode.Overwrite)
+ .save()
+
+ var startDate = Date.valueOf("2018-05-01")
+ var state = 2
+ var updateTime = 0L
+
+ if (printDetail) {
+ println("## day0")
+ spark.sql("select * from dw_order").show(100, false)
+ }
+
+ for (i <- 1 to numDays) {
+ // prepare for incremental update data for day-i
+ val newDate = new Date(DateUtils.addDays(startDate, 1).getTime)
+ val cdc = generateDailyCDC(
+ sparkSession = spark,
+ numUpdatedOrders = numUpdateOrdersDaily,
+ startDate = startDate,
+ updateDate = newDate,
+ newState = state,
+ numNewOrders = numNewOrdersDaily)
+ cdc.write
+ .format("carbondata")
+ .option("tableName", "ods_order")
+ .mode(SaveMode.Overwrite)
+ .save()
+
+ if (printDetail) {
+ println(s"day$i CDC")
+ spark.sql("select * from ods_order").show(100, false)
+ }
+
+ // update dw table using CDC data
+ val start = System.nanoTime()
+ hive_solution(spark)
+ // carbon_solution(spark)
--- End diff --
Why is carbon_solution commented out? If only one of hive_solution and carbon_solution is run at a time, it would be better to add a flag in "args: Array[String]" to control which one runs.
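A minimal sketch of that suggestion (hypothetical names and default, not code from the PR):

    // pick the solution from the program arguments instead of editing comments
    val solution = if (args.nonEmpty) args(0) else "carbon"   // assumed default
    solution match {
      case "hive"   => hive_solution(spark)
      case "carbon" => carbon_solution(spark)
      case other    => sys.error(s"unknown solution '$other', expected 'hive' or 'carbon'")
    }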
---
[GitHub] carbondata pull request #3066: [CARBONDATA-3244] Add benchmark for Change Da...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3066#discussion_r247119761
--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/CDCBenchmark.scala ---
@@ -0,0 +1,256 @@
[... identical diff context snipped; see the full quote above ...]
+ val end = System.nanoTime()
+ updateTime += end - start
+
+ if (printDetail) {
+ println(s"day$i result")
+ spark.sql("select * from dw_order").show(100, false)
+ }
+
+ startDate = newDate
+ state = state + 1
+ }
+
+ println(s"simulated $numDays days, total process time takes ${updateTime / 1000 / 1000} ms")
+ spark.close()
+ }
+
+ /**
+ * Typical solution when using Hive.
+ * This solution uses INSERT OVERWRITE to rewrite the whole table every day
+ */
+ private def hive_solution(spark: SparkSession) = {
+ spark.sql(
+ """
+ | insert overwrite table dw_order
--- End diff --
Maybe it is better to create a table partitioned on the date column, and INSERT OVERWRITE only the affected partition.
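A hedged sketch of that idea (assumed DDL; dynamic partition overwrite requires Spark 2.3+ with spark.sql.sources.partitionOverwriteMode=dynamic, and merged_cdc is a hypothetical staging view holding the day's merge result):

    // history table partitioned by end_date: current rows all live in the
    // end_date=9999-01-01 partition, so the daily rewrite touches only it
    spark.sql(
      """
        | CREATE TABLE dw_order (
        |   order_id string, customer_id string, start_date date, state int)
        | PARTITIONED BY (end_date date)
        | STORED AS carbondata
      """.stripMargin)

    // overwrite only the partitions present in the query output
    spark.sql(
      """
        | INSERT OVERWRITE TABLE dw_order PARTITION (end_date)
        | SELECT order_id, customer_id, start_date, state, end_date
        | FROM merged_cdc
      """.stripMargin)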
---
[GitHub] carbondata issue #3066: [CARBONDATA-3244] Add benchmark for Change Data Capt...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/3066
> I think the query performance of carbon_solution is lower than hive_solution's, because carbon_solution has more segments (each insert generates a segment and each update generates more segments).
> Do we have some method to optimize this?
Since we are updating existing data, it creates extra files such as delete deltas and incremental carbondata files. This may degrade query performance a little, but once compaction is done it will improve.
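For reference, compaction on the benchmark's history table can be triggered with standard CarbonData DDL; a minimal sketch:

    // merge the delta files produced by updates into regular segments
    spark.sql("ALTER TABLE dw_order COMPACT 'MINOR'")
    // or merge all segments for the best scan performance
    spark.sql("ALTER TABLE dw_order COMPACT 'MAJOR'")
    // remove the stale files left behind by compaction
    spark.sql("CLEAN FILES FOR TABLE dw_order")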
---
[GitHub] carbondata issue #3066: [CARBONDATA-3244] Add benchmark for Change Data Capt...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3066
Build Failed with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10516/
---
[GitHub] carbondata issue #3066: [CARBONDATA-3244] Add benchmark for Change Data Capt...
Posted by qiuchenjian <gi...@git.apache.org>.
Github user qiuchenjian commented on the issue:
https://github.com/apache/carbondata/pull/3066
I think the query performance of carbon_solution is lower than hive_solution's, because carbon_solution has more segments (each insert generates a segment and each update generates more segments).
Do we have some method to optimize this?
---
[GitHub] carbondata pull request #3066: [CARBONDATA-3244] Add benchmark for Change Da...
Posted by qiuchenjian <gi...@git.apache.org>.
Github user qiuchenjian commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3066#discussion_r246995496
--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/CDCBenchmark.scala ---
@@ -0,0 +1,256 @@
[... identical diff context snipped; see the full quote above ...]
+ // update dw table using CDC data
+ val start = System.nanoTime()
+ hive_solution(spark)
+ // carbon_solution(spark)
--- End diff --
It would be better to add a variable to run hive_solution or carbon_solution separately; switching by commenting code out is confusing to others.
---
[GitHub] carbondata issue #3066: [CARBONDATA-3244] Add benchmark for Change Data Capt...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3066
Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2477/
---