Posted to issues@carbondata.apache.org by QiangCai <gi...@git.apache.org> on 2018/03/26 07:23:58 UTC

[GitHub] carbondata pull request #2084: [CARBONDATA-1522] Support preaggregate table ...

Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2084#discussion_r176998393
  
    --- Diff: integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---
    @@ -249,6 +251,91 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
         assertResult(exceptedRow)(row)
       }
     
    +  test("test preaggregate table creation on streaming table without handoff") {
    +    val identifier = new TableIdentifier("agg_table", Option("streaming"))
    +    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
    +      .asInstanceOf[CarbonRelation].metaData.carbonTable
    +    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
    +    // streaming ingest 10 rows in total (two 5-row CSV batches over the same id range)
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
    +      identifier)
    +    thread.start()
    +    Thread.sleep(2000)
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    Thread.sleep(5000)
    +    thread.interrupt()
    +    checkAnswer(
    +      sql("select count(*) from streaming.agg_table"),
    +      Seq(Row(10)))
    +    sql("create datamap p1 on table agg_table using 'preaggregate' as select name, sum(salary) from agg_table group by name")
    +    // No data should be loaded into the aggregate table because handoff has not been fired yet
    +    checkAnswer(sql("select * from agg_table_p1"), Seq())
    +  }
    +
    +  test("test if data is loaded into preaggregate after handoff is fired") {
    +    createTable(tableName = "agg_table2", streaming = true, withBatchLoad = false)
    +    val identifier = new TableIdentifier("agg_table2", Option("streaming"))
    +    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
    +      .asInstanceOf[CarbonRelation].metaData.carbonTable
    +    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
    +    // streaming ingest 10 rows in total (two 5-row CSV batches over the same id range)
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
    +      identifier)
    +    thread.start()
    +    Thread.sleep(2000)
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    Thread.sleep(5000)
    +    thread.interrupt()
    +    checkAnswer(
    +      sql("select count(*) from streaming.agg_table2"),
    +      Seq(Row(10)))
    +    sql("create datamap p1 on table agg_table2 using 'preaggregate' as select name, sum(salary) from agg_table2 group by name")
    +    sql("alter table agg_table2 finish streaming")
    +    sql("alter table agg_table2 compact 'streaming'")
    +    // Data should be loaded into the aggregate table because handoff has been fired
    +    checkAnswer(sql("select * from agg_table2_p1"),
    +      Seq(
    +        Row("name_10", 200000.0),
    +        Row("name_11", 220000.0),
    +        Row("name_12", 240000.0),
    +        Row("name_13", 260000.0),
    +        Row("name_14", 280000.0)))
    +    sql("drop table agg_table2")
    +  }
    +
    +  test("test if data is loaded in aggregate table after handoff is done for streaming table") {
    +    createTable(tableName = "agg_table3", streaming = true, withBatchLoad = false)
    +    val identifier = new TableIdentifier("agg_table3", Option("streaming"))
    +    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
    +      .asInstanceOf[CarbonRelation].metaData.carbonTable
    +    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
    +    // streaming ingest 10 rows in total (two 5-row CSV batches over the same id range)
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
    +      identifier)
    +    thread.start()
    +    Thread.sleep(2000)
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    Thread.sleep(5000)
    +    thread.interrupt()
    +    checkAnswer(
    +      sql("select count(*) from streaming.agg_table3"),
    +      Seq(Row(10)))
    +    sql("alter table agg_table3 finish streaming")
    +    sql("alter table agg_table3 compact 'streaming'")
    +    sql("create datamap p1 on table agg_table3 using 'preaggregate' as select name, sum(salary) from agg_table3 group by name")
    +    // Data should be loaded into the aggregate table because handoff was fired before the datamap was created
    +    checkAnswer(sql("select * from agg_table3_p1"),
    +      Seq(
    +        Row("name_10", 200000.0),
    +        Row("name_11", 220000.0),
    +        Row("name_12", 240000.0),
    +        Row("name_13", 260000.0),
    +        Row("name_14", 280000.0)))
    +  }
    +
    --- End diff --
    
    @manishgupta88 
    I will try to reduce the run time.
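    
    One way to cut the run time could be to replace the fixed Thread.sleep calls with a polling wait, so each test proceeds as soon as the expected rows have been ingested. A minimal sketch: it relies on the sql helper from QueryTest used elsewhere in this suite, while waitForRowCount and its timeout values are hypothetical, not an existing API:
    
        // Hypothetical helper: poll the streaming table's row count instead of
        // sleeping for a fixed interval, so the test continues as soon as
        // ingestion catches up (or fails after an illustrative 30s timeout).
        def waitForRowCount(tableName: String, expected: Long,
            timeoutMs: Long = 30000, pollMs: Long = 200): Unit = {
          val deadline = System.currentTimeMillis() + timeoutMs
          var count = -1L
          while (count != expected && System.currentTimeMillis() < deadline) {
            // count(*) comes back as a single-row, single-column Long
            count = sql(s"select count(*) from $tableName").collect().head.getLong(0)
            if (count != expected) Thread.sleep(pollMs)
          }
          assert(count == expected,
            s"timed out waiting for $expected rows in $tableName, got $count")
        }
    
        // usage: would replace the Thread.sleep(5000) after the second CSV batch
        // waitForRowCount("streaming.agg_table", 10)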


---