Posted to issues@carbondata.apache.org by jackylk <gi...@git.apache.org> on 2019/01/10 16:57:10 UTC

[GitHub] carbondata pull request #3066: [CARBONDATA-3244] Add benchmark for Change Da...

GitHub user jackylk opened a pull request:

    https://github.com/apache/carbondata/pull/3066

    [CARBONDATA-3244] Add benchmark for Change Data Capture scenario

    CDC (change data capture) is a common scenario for analyzing slowly changing tables in a data warehouse.
    It is good to add a benchmark comparing two update methods (a rough sketch of both follows the test results below):
    1. hive_solution, which uses INSERT OVERWRITE. This is a popular method for Hive warehouses.
    2. carbon_solution, which uses CarbonData's UPDATE syntax to update the history table directly.

    This test simulates updates to the history table using a CDC table.
    When running on an 8-core laptop, the benchmark shows:
    1. test one
    History table with 1M records; update 10K records and insert 10K records every day, simulated for 3 days.
    hive_solution: total process time takes 13,516 ms
    carbon_solution: total process time takes 7,521 ms
    
    
    2. test two
    History table with 10M records; update 10K records and insert 10K records every day,
    simulated for 3 days.
    hive_solution: total process time takes 104,250 ms
    carbon_solution: total process time takes 17,384 ms
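
    As a rough illustration of the two approaches (not the PR's exact queries; the full logic is in CDCBenchmark.scala, and the merge details may differ), using the benchmark's dw_order history table and ods_order CDC table:

    // hedged sketch only: hive_solution rewrites the whole history table with
    // INSERT OVERWRITE, closing any row that has a matching CDC record
    // (appending the new CDC rows is omitted here for brevity)
    spark.sql(
      """
        | insert overwrite table dw_order
        | select o.order_id, o.customer_id, o.start_date,
        |        coalesce(c.update_date, o.end_date) as end_date,
        |        o.state
        | from dw_order o
        | left join ods_order c on o.order_id = c.order_id
      """.stripMargin)

    // hedged sketch only: carbon_solution closes the matching rows in place
    // with CarbonData's UPDATE ... SET (...) = (subquery) syntax and then
    // appends the CDC rows as new history records
    spark.sql(
      """
        | update dw_order d
        | set (d.end_date) = (
        |   select c.update_date from ods_order c where c.order_id = d.order_id
        | )
      """.stripMargin)
    spark.sql(
      """
        | insert into dw_order
        | select order_id, customer_id, update_date, cast('9999-01-01' as date), state
        | from ods_order
      """.stripMargin)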
    
     - [X] Any interfaces changed?
     No
     - [X] Any backward compatibility impacted?
     No
     - [X] Document update required?
    No
     - [X] Testing done
            Please provide details on 
            - Whether new unit test cases have been added or why no new tests are required?
            - How it is tested? Please attach test report.
            - Is it a performance related change? Please attach the performance test report.
            - Any additional information to help reviewers in testing this change.
      Only an example is added.
     - [X] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. 
    NA


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jackylk/incubator-carbondata cdc

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/3066.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3066
    
----
commit ebb5ef79ac85a6c736496fe19f719bfed74902c1
Author: Jacky Li <ja...@...>
Date:   2019-01-10T16:44:58Z

    add benchmark for Change Data Capture scenario

----


---

[GitHub] carbondata issue #3066: [CARBONDATA-3244] Add benchmark for Change Data Capt...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/3066
  
    Build Failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2258/



---

[GitHub] carbondata pull request #3066: [CARBONDATA-3244] Add benchmark for Change Da...

Posted by qiuchenjian <gi...@git.apache.org>.
Github user qiuchenjian commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3066#discussion_r246977629
  
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/CDCBenchmark.scala ---
    @@ -0,0 +1,256 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.benchmark
    +
    +import java.io.File
    +import java.sql.Date
    +
    +import org.apache.commons.lang3.time.DateUtils
    +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
    +
    +/**
    + * Benchmark for Change Data Capture scenario.
    + * This test simulates updates to history table using CDC table.
    + *
    + * The benchmark shows performance of two update methods:
    + * 1. hive_solution, which uses INSERT OVERWRITE. This is a popular method for hive warehouse.
    + * 2. carbon_solution, which uses CarbonData's update syntax to update the history table directly.
    + *
    + * When running in a 8-cores laptop, the benchmark shows:
    + *
    + * 1. test one
    + * History table 1M records, update 10K records everyday and insert 10K records everyday,
    + * simulated 3 days.
    + * hive_solution: total process time takes 13,516 ms
    + * carbon_solution: total process time takes 7,521 ms
    + *
    + *
    + * 2. test two
    + * History table 10M records, update 10K records everyday and insert 10K records everyday,
    + * simulated 3 days.
    + * hive_solution: total process time takes 104,250 ms
    + * carbon_solution: total process time takes 17,384 ms
    + *
    + */
    +object CDCBenchmark {
    +
    +  // Schema for history table
    +  // Table name: dw_order
    +  // +-------------+-----------+-------------+
    +  // | Column name | Data type | Cardinality |
    +  // +-------------+-----------+-------------+
    +  // | order_id    | string    | 10,000,000  |
    +  // +-------------+-----------+-------------+
    +  // | customer_id | string    | 10,000,000  |
    +  // +-------------+-----------+-------------+
    +  // | start_date  | date      | NA          |
    +  // +-------------+-----------+-------------+
    +  // | end_date    | date      | NA          |
    +  // +-------------+-----------+-------------+
    +  // | state       | int       | 4           |
    +  // +-------------+-----------+-------------+
    +  case class Order (order_id: String, customer_id: String, start_date: Date, end_date: Date,
    +      state: Int)
    +
    +  // Schema for CDC data which is used for update to history table every day
    +  // Table name: ods_order
    +  // +-------------+-----------+-------------+
    +  // | Column name | Data type | Cardinality |
    +  // +-------------+-----------+-------------+
    +  // | order_id    | string    | 10,000,000  |
    +  // +-------------+-----------+-------------+
    +  // | customer_id | string    | 10,000,000  |
    +  // +-------------+-----------+-------------+
    +  // | update_date | date      | NA          |
    +  // +-------------+-----------+-------------+
    +  // | state       | int       | 4           |
    +  // +-------------+-----------+-------------+
    +  case class CDC (order_id: String, customer_id: String, update_date: Date, state: Int)
    +
    +  // number of records for first day
    +  val numOrders = 10000000
    +
    +  // number of records to update every day
    +  val numUpdateOrdersDaily = 10000
    +
    +  // number of new records to insert every day
    +  val newNewOrdersDaily = 10000
    +
    +  // number of days to simulate
    +  val numDays = 3
    +
    +  // whether to print each day's result to the console
    +  val printDetail = false
    +
    +  def generateDataForDay0(
    +      sparkSession: SparkSession,
    +      numOrders: Int = 1000000,
    +      startDate: Date = Date.valueOf("2018-05-01")): DataFrame = {
    +    import sparkSession.implicits._
    +    sparkSession.sparkContext.parallelize(1 to numOrders, 4)
    +      .map { x => Order(s"order$x", s"customer$x", startDate, Date.valueOf("9999-01-01"), 1)
    +      }.toDS().toDF()
    +  }
    +
    +  def generateDailyCDC(
    +      sparkSession: SparkSession,
    +      numUpdatedOrders: Int,
    +      startDate: Date,
    +      updateDate: Date,
    +      newState: Int,
    +      numNewOrders: Int
    +      ): DataFrame = {
    +    import sparkSession.implicits._
    +    val ds1 = sparkSession.sparkContext.parallelize(1 to numUpdatedOrders, 4)
    +      .map {x => CDC(s"order$x", s"customer$x", updateDate, newState)
    +      }.toDS().toDF()
    +    val ds2 = sparkSession.sparkContext.parallelize(1 to numNewOrders, 4)
    +      .map {x => CDC(s"newOrder${System.currentTimeMillis()}", s"customer$x", updateDate, 1)
    +      }.toDS().toDF()
    +    ds1.union(ds2)
    +  }
    +
    +  def main(args: Array[String]): Unit = {
    +    import org.apache.spark.sql.CarbonSession._
    +    val rootPath = new File(this.getClass.getResource("/").getPath
    +                            + "../../../..").getCanonicalPath
    +    val storeLocation = s"$rootPath/examples/spark2/target/store"
    +    val master = Option(System.getProperty("spark.master"))
    +      .orElse(sys.env.get("MASTER"))
    +      .orElse(Option("local[8]"))
    +
    +    val spark = SparkSession
    +      .builder()
    +      .master(master.get)
    +      .enableHiveSupport()
    +      .config("spark.driver.host", "127.0.0.1")
    +      .getOrCreateCarbonSession(storeLocation)
    +    spark.sparkContext.setLogLevel("warn")
    +
    +    spark.sql("drop table if exists dw_order")
    +    spark.sql("drop table if exists ods_order")
    +
    +    // prepare base data for first day
    +    val df = generateDataForDay0(
    +      sparkSession = spark,
    +      numOrders = numOrders,
    +      startDate = Date.valueOf("2018-05-01"))
    +
    +    spark.sql(s"drop table if exists dw_order")
    +    df.write
    +      .format("carbondata")
    +      .option("tableName", "dw_order")
    +      .mode(SaveMode.Overwrite)
    +      .save()
    +
    +    var startDate = Date.valueOf("2018-05-01")
    +    var state = 2
    +    var updateTime = 0L
    +
    +    if (printDetail) {
    +      println("## day0")
    +      spark.sql("select * from dw_order").show(100, false)
    +    }
    +
    +    for (i <- 1 to numDays) {
    +      // prepare for incremental update data for day-i
    +      val newDate = new Date(DateUtils.addDays(startDate, 1).getTime)
    +      val cdc = generateDailyCDC(
    +        sparkSession = spark,
    +        numUpdatedOrders = numUpdateOrdersDaily,
    +        startDate = startDate,
    +        updateDate = newDate,
    +        newState = state,
    +        numNewOrders = newNewOrdersDaily)
    +      cdc.write
    +        .format("carbondata")
    +        .option("tableName", "ods_order")
    +        .mode(SaveMode.Overwrite)
    +        .save()
    +
    +      if (printDetail) {
    +        println(s"day$i CDC")
    +        spark.sql("select * from ods_order").show(100, false)
    +      }
    +
    +      // update dw table using CDC data
    +      val start = System.nanoTime()
    +      hive_solution(spark)
    +      // carbon_solution(spark)
    --- End diff --
    
    Why is carbon_solution commented out? If only one of hive_solution or carbon_solution is run at a time, it would be better to add a flag in "args: Array[String]" to control which one runs.
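
    For what it's worth, a minimal sketch of such a flag (the argument name and the default value here are assumptions, not part of this PR):

    // hypothetical: choose the solution from the first program argument,
    // defaulting to carbon_solution when no argument is given
    val solution = args.headOption.getOrElse("carbon")

    val start = System.nanoTime()
    solution match {
      case "hive"   => hive_solution(spark)
      case "carbon" => carbon_solution(spark)
      case other    => throw new IllegalArgumentException(s"unknown solution: $other")
    }
    val end = System.nanoTime()
    updateTime += end - start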


---

[GitHub] carbondata pull request #3066: [CARBONDATA-3244] Add benchmark for Change Da...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3066#discussion_r247119761
  
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/CDCBenchmark.scala ---
    @@ -0,0 +1,256 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.benchmark
    +
    +import java.io.File
    +import java.sql.Date
    +
    +import org.apache.commons.lang3.time.DateUtils
    +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
    +
    +/**
    + * Benchmark for Change Data Capture scenario.
    + * This test simulates updates to history table using CDC table.
    + *
    + * The benchmark shows performance of two update methods:
    + * 1. hive_solution, which uses INSERT OVERWRITE. This is a popular method for hive warehouse.
    + * 2. carbon_solution, which uses CarbonData's update syntax to update the history table directly.
    + *
    + * When running in a 8-cores laptop, the benchmark shows:
    + *
    + * 1. test one
    + * History table 1M records, update 10K records everyday and insert 10K records everyday,
    + * simulated 3 days.
    + * hive_solution: total process time takes 13,516 ms
    + * carbon_solution: total process time takes 7,521 ms
    + *
    + *
    + * 2. test two
    + * History table 10M records, update 10K records everyday and insert 10K records everyday,
    + * simulated 3 days.
    + * hive_solution: total process time takes 104,250 ms
    + * carbon_solution: total process time takes 17,384 ms
    + *
    + */
    +object CDCBenchmark {
    +
    +  // Schema for history table
    +  // Table name: dw_order
    +  // +-------------+-----------+-------------+
    +  // | Column name | Data type | Cardinality |
    +  // +-------------+-----------+-------------+
    +  // | order_id    | string    | 10,000,000  |
    +  // +-------------+-----------+-------------+
    +  // | customer_id | string    | 10,000,000  |
    +  // +-------------+-----------+-------------+
    +  // | start_date  | date      | NA          |
    +  // +-------------+-----------+-------------+
    +  // | end_date    | date      | NA          |
    +  // +-------------+-----------+-------------+
    +  // | state       | int       | 4           |
    +  // +-------------+-----------+-------------+
    +  case class Order (order_id: String, customer_id: String, start_date: Date, end_date: Date,
    +      state: Int)
    +
    +  // Schema for CDC data which is used for update to history table every day
    +  // Table name: ods_order
    +  // +-------------+-----------+-------------+
    +  // | Column name | Data type | Cardinality |
    +  // +-------------+-----------+-------------+
    +  // | order_id    | string    | 10,000,000  |
    +  // +-------------+-----------+-------------+
    +  // | customer_id | string    | 10,000,000  |
    +  // +-------------+-----------+-------------+
    +  // | update_date | date      | NA          |
    +  // +-------------+-----------+-------------+
    +  // | state       | int       | 4           |
    +  // +-------------+-----------+-------------+
    +  case class CDC (order_id: String, customer_id: String, update_date: Date, state: Int)
    +
    +  // number of records for first day
    +  val numOrders = 10000000
    +
    +  // number of records to update every day
    +  val numUpdateOrdersDaily = 10000
    +
    +  // number of new records to insert every day
    +  val newNewOrdersDaily = 10000
    +
    +  // number of days to simulate
    +  val numDays = 3
    +
    +  // whether to print each day's result to the console
    +  val printDetail = false
    +
    +  def generateDataForDay0(
    +      sparkSession: SparkSession,
    +      numOrders: Int = 1000000,
    +      startDate: Date = Date.valueOf("2018-05-01")): DataFrame = {
    +    import sparkSession.implicits._
    +    sparkSession.sparkContext.parallelize(1 to numOrders, 4)
    +      .map { x => Order(s"order$x", s"customer$x", startDate, Date.valueOf("9999-01-01"), 1)
    +      }.toDS().toDF()
    +  }
    +
    +  def generateDailyCDC(
    +      sparkSession: SparkSession,
    +      numUpdatedOrders: Int,
    +      startDate: Date,
    +      updateDate: Date,
    +      newState: Int,
    +      numNewOrders: Int
    +      ): DataFrame = {
    +    import sparkSession.implicits._
    +    val ds1 = sparkSession.sparkContext.parallelize(1 to numUpdatedOrders, 4)
    +      .map {x => CDC(s"order$x", s"customer$x", updateDate, newState)
    +      }.toDS().toDF()
    +    val ds2 = sparkSession.sparkContext.parallelize(1 to numNewOrders, 4)
    +      .map {x => CDC(s"newOrder${System.currentTimeMillis()}", s"customer$x", updateDate, 1)
    +      }.toDS().toDF()
    +    ds1.union(ds2)
    +  }
    +
    +  def main(args: Array[String]): Unit = {
    +    import org.apache.spark.sql.CarbonSession._
    +    val rootPath = new File(this.getClass.getResource("/").getPath
    +                            + "../../../..").getCanonicalPath
    +    val storeLocation = s"$rootPath/examples/spark2/target/store"
    +    val master = Option(System.getProperty("spark.master"))
    +      .orElse(sys.env.get("MASTER"))
    +      .orElse(Option("local[8]"))
    +
    +    val spark = SparkSession
    +      .builder()
    +      .master(master.get)
    +      .enableHiveSupport()
    +      .config("spark.driver.host", "127.0.0.1")
    +      .getOrCreateCarbonSession(storeLocation)
    +    spark.sparkContext.setLogLevel("warn")
    +
    +    spark.sql("drop table if exists dw_order")
    +    spark.sql("drop table if exists ods_order")
    +
    +    // prepare base data for first day
    +    val df = generateDataForDay0(
    +      sparkSession = spark,
    +      numOrders = numOrders,
    +      startDate = Date.valueOf("2018-05-01"))
    +
    +    spark.sql(s"drop table if exists dw_order")
    +    df.write
    +      .format("carbondata")
    +      .option("tableName", "dw_order")
    +      .mode(SaveMode.Overwrite)
    +      .save()
    +
    +    var startDate = Date.valueOf("2018-05-01")
    +    var state = 2
    +    var updateTime = 0L
    +
    +    if (printDetail) {
    +      println("## day0")
    +      spark.sql("select * from dw_order").show(100, false)
    +    }
    +
    +    for (i <- 1 to numDays) {
    +      // prepare for incremental update data for day-i
    +      val newDate = new Date(DateUtils.addDays(startDate, 1).getTime)
    +      val cdc = generateDailyCDC(
    +        sparkSession = spark,
    +        numUpdatedOrders = numUpdateOrdersDaily,
    +        startDate = startDate,
    +        updateDate = newDate,
    +        newState = state,
    +        numNewOrders = newNewOrdersDaily)
    +      cdc.write
    +        .format("carbondata")
    +        .option("tableName", "ods_order")
    +        .mode(SaveMode.Overwrite)
    +        .save()
    +
    +      if (printDetail) {
    +        println(s"day$i CDC")
    +        spark.sql("select * from ods_order").show(100, false)
    +      }
    +
    +      // update dw table using CDC data
    +      val start = System.nanoTime()
    +      hive_solution(spark)
    +      // carbon_solution(spark)
    +      val end = System.nanoTime()
    +      updateTime += end - start
    +
    +      if (printDetail) {
    +        println(s"day$i result")
    +        spark.sql("select * from dw_order").show(100, false)
    +      }
    +
    +      startDate = newDate
    +      state = state + 1
    +    }
    +
    +    println(s"simulated $numDays days, total process time takes ${updateTime / 1000 / 1000} ms")
    +    spark.close()
    +  }
    +
    +  /**
    +   * Typical solution when using hive
    +   * This solution uses INSERT OVERWRITE to rewrite the whole table every day
    +   */
    +  private def hive_solution(spark: SparkSession) = {
    +    spark.sql(
    +      """
    +        | insert overwrite table dw_order
    --- End diff --
    
    Maybe it is better to create a partitioned table on the date column and INSERT OVERWRITE only the affected partition.
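
    For illustration, a rough sketch of that suggestion, reusing the benchmark's column names; the partitioning DDL and the single-partition overwrite below are assumptions, not code from this PR:

    // hypothetical: keep the history table partitioned by end_date, so a daily
    // rewrite only has to overwrite the open partition instead of the whole table
    spark.sql(
      """
        | create table if not exists dw_order_part (
        |   order_id string,
        |   customer_id string,
        |   start_date date,
        |   state int
        | )
        | partitioned by (end_date date)
        | stored as carbondata
      """.stripMargin)

    // overwrite only the partition holding the still-open records
    // (the select below is illustrative, not the PR's merge query)
    spark.sql(
      """
        | insert overwrite table dw_order_part
        | partition (end_date = '9999-01-01')
        | select order_id, customer_id, start_date, state
        | from dw_order
        | where end_date = '9999-01-01'
      """.stripMargin)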


---

[GitHub] carbondata issue #3066: [CARBONDATA-3244] Add benchmark for Change Data Capt...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/3066
  
    > I think the query performance of carbon_solution is lower than hive_solution's, because carbon_solution has more segments (each insert generates a segment and each update generates more segments).
    > Do we have some method to optimize this?
    
    Since we are updating the existing data, it creates extra files such as delete deltas and incremental CarbonData files. This may degrade query performance a little, but once compaction is done it will improve.
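
    For reference, a minimal sketch of triggering compaction on the benchmark's history table after the daily updates (the choice between minor and major compaction here is an assumption):

    // hypothetical follow-up step: merge the segments and delta files produced
    // by the daily inserts and updates so queries read fewer, larger files
    spark.sql("alter table dw_order compact 'minor'")
    // or merge everything more aggressively:
    spark.sql("alter table dw_order compact 'major'")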


---

[GitHub] carbondata issue #3066: [CARBONDATA-3244] Add benchmark for Change Data Capt...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/3066
  
    Build Failed with Spark 2.3.2. Please check CI: http://136.243.101.176:8080/job/carbondataprbuilder2.3/10516/



---

[GitHub] carbondata issue #3066: [CARBONDATA-3244] Add benchmark for Change Data Capt...

Posted by qiuchenjian <gi...@git.apache.org>.
Github user qiuchenjian commented on the issue:

    https://github.com/apache/carbondata/pull/3066
  
    I think the query performance of carbon_solution is lower than hive_solution's, because carbon_solution has more segments (each insert generates a segment and each update generates more segments).
    Do we have some method to optimize this?
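
    One way to see this segment growth while the benchmark runs (not part of the PR) would be something like:

    // hypothetical check: list the segments of the history table after each
    // simulated day to see how many the daily insert and update have produced
    spark.sql("show segments for table dw_order").show(100, false)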


---

[GitHub] carbondata pull request #3066: [CARBONDATA-3244] Add benchmark for Change Da...

Posted by qiuchenjian <gi...@git.apache.org>.
Github user qiuchenjian commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3066#discussion_r246995496
  
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/CDCBenchmark.scala ---
    @@ -0,0 +1,256 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.benchmark
    +
    +import java.io.File
    +import java.sql.Date
    +
    +import org.apache.commons.lang3.time.DateUtils
    +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
    +
    +/**
    + * Benchmark for Change Data Capture scenario.
    + * This test simulates updates to history table using CDC table.
    + *
    + * The benchmark shows performance of two update methods:
    + * 1. hive_solution, which uses INSERT OVERWRITE. This is a popular method for hive warehouse.
    + * 2. carbon_solution, which uses CarbonData's update syntax to update the history table directly.
    + *
    + * When running in a 8-cores laptop, the benchmark shows:
    + *
    + * 1. test one
    + * History table 1M records, update 10K records everyday and insert 10K records everyday,
    + * simulated 3 days.
    + * hive_solution: total process time takes 13,516 ms
    + * carbon_solution: total process time takes 7,521 ms
    + *
    + *
    + * 2. test two
    + * History table 10M records, update 10K records everyday and insert 10K records everyday,
    + * simulated 3 days.
    + * hive_solution: total process time takes 104,250 ms
    + * carbon_solution: total process time takes 17,384 ms
    + *
    + */
    +object CDCBenchmark {
    +
    +  // Schema for history table
    +  // Table name: dw_order
    +  // +-------------+-----------+-------------+
    +  // | Column name | Data type | Cardinality |
    +  // +-------------+-----------+-------------+
    +  // | order_id    | string    | 10,000,000  |
    +  // +-------------+-----------+-------------+
    +  // | customer_id | string    | 10,000,000  |
    +  // +-------------+-----------+-------------+
    +  // | start_date  | date      | NA          |
    +  // +-------------+-----------+-------------+
    +  // | end_date    | date      | NA          |
    +  // +-------------+-----------+-------------+
    +  // | state       | int       | 4           |
    +  // +-------------+-----------+-------------+
    +  case class Order (order_id: String, customer_id: String, start_date: Date, end_date: Date,
    +      state: Int)
    +
    +  // Schema for CDC data which is used for update to history table every day
    +  // Table name: ods_order
    +  // +-------------+-----------+-------------+
    +  // | Column name | Data type | Cardinality |
    +  // +-------------+-----------+-------------+
    +  // | order_id    | string    | 10,000,000  |
    +  // +-------------+-----------+-------------+
    +  // | customer_id | string    | 10,000,000  |
    +  // +-------------+-----------+-------------+
    +  // | update_date | date      | NA          |
    +  // +-------------+-----------+-------------+
    +  // | state       | int       | 4           |
    +  // +-------------+-----------+-------------+
    +  case class CDC (order_id: String, customer_id: String, update_date: Date, state: Int)
    +
    +  // number of records for first day
    +  val numOrders = 10000000
    +
    +  // number of records to update every day
    +  val numUpdateOrdersDaily = 10000
    +
    +  // number of new records to insert every day
    +  val newNewOrdersDaily = 10000
    +
    +  // number of days to simulate
    +  val numDays = 3
    +
    +  // whether to print each day's result to the console
    +  val printDetail = false
    +
    +  def generateDataForDay0(
    +      sparkSession: SparkSession,
    +      numOrders: Int = 1000000,
    +      startDate: Date = Date.valueOf("2018-05-01")): DataFrame = {
    +    import sparkSession.implicits._
    +    sparkSession.sparkContext.parallelize(1 to numOrders, 4)
    +      .map { x => Order(s"order$x", s"customer$x", startDate, Date.valueOf("9999-01-01"), 1)
    +      }.toDS().toDF()
    +  }
    +
    +  def generateDailyCDC(
    +      sparkSession: SparkSession,
    +      numUpdatedOrders: Int,
    +      startDate: Date,
    +      updateDate: Date,
    +      newState: Int,
    +      numNewOrders: Int
    +      ): DataFrame = {
    +    import sparkSession.implicits._
    +    val ds1 = sparkSession.sparkContext.parallelize(1 to numUpdatedOrders, 4)
    +      .map {x => CDC(s"order$x", s"customer$x", updateDate, newState)
    +      }.toDS().toDF()
    +    val ds2 = sparkSession.sparkContext.parallelize(1 to numNewOrders, 4)
    +      .map {x => CDC(s"newOrder${System.currentTimeMillis()}", s"customer$x", updateDate, 1)
    +      }.toDS().toDF()
    +    ds1.union(ds2)
    +  }
    +
    +  def main(args: Array[String]): Unit = {
    +    import org.apache.spark.sql.CarbonSession._
    +    val rootPath = new File(this.getClass.getResource("/").getPath
    +                            + "../../../..").getCanonicalPath
    +    val storeLocation = s"$rootPath/examples/spark2/target/store"
    +    val master = Option(System.getProperty("spark.master"))
    +      .orElse(sys.env.get("MASTER"))
    +      .orElse(Option("local[8]"))
    +
    +    val spark = SparkSession
    +      .builder()
    +      .master(master.get)
    +      .enableHiveSupport()
    +      .config("spark.driver.host", "127.0.0.1")
    +      .getOrCreateCarbonSession(storeLocation)
    +    spark.sparkContext.setLogLevel("warn")
    +
    +    spark.sql("drop table if exists dw_order")
    +    spark.sql("drop table if exists ods_order")
    +
    +    // prepare base data for first day
    +    val df = generateDataForDay0(
    +      sparkSession = spark,
    +      numOrders = numOrders,
    +      startDate = Date.valueOf("2018-05-01"))
    +
    +    spark.sql(s"drop table if exists dw_order")
    +    df.write
    +      .format("carbondata")
    +      .option("tableName", "dw_order")
    +      .mode(SaveMode.Overwrite)
    +      .save()
    +
    +    var startDate = Date.valueOf("2018-05-01")
    +    var state = 2
    +    var updateTime = 0L
    +
    +    if (printDetail) {
    +      println("## day0")
    +      spark.sql("select * from dw_order").show(100, false)
    +    }
    +
    +    for (i <- 1 to numDays) {
    +      // prepare for incremental update data for day-i
    +      val newDate = new Date(DateUtils.addDays(startDate, 1).getTime)
    +      val cdc = generateDailyCDC(
    +        sparkSession = spark,
    +        numUpdatedOrders = numUpdateOrdersDaily,
    +        startDate = startDate,
    +        updateDate = newDate,
    +        newState = state,
    +        numNewOrders = newNewOrdersDaily)
    +      cdc.write
    +        .format("carbondata")
    +        .option("tableName", "ods_order")
    +        .mode(SaveMode.Overwrite)
    +        .save()
    +
    +      if (printDetail) {
    +        println(s"day$i CDC")
    +        spark.sql("select * from ods_order").show(100, false)
    +      }
    +
    +      // update dw table using CDC data
    +      val start = System.nanoTime()
    +      hive_solution(spark)
    +      // carbon_solution(spark)
    --- End diff --
    
    It would be better to add a variable to run hive_solution or carbon_solution separately; selecting them by commenting code out is confusing to others.


---

[GitHub] carbondata issue #3066: [CARBONDATA-3244] Add benchmark for Change Data Capt...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/3066
  
    Build Failed with Spark 2.2.1. Please check CI: http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2477/



---