You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@carbondata.apache.org by QiangCai <gi...@git.apache.org> on 2018/03/26 07:23:58 UTC
[GitHub] carbondata pull request #2084: [CARBONDATA-1522] Support preaggregate table ...
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2084#discussion_r176998393
--- Diff: integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---
@@ -249,6 +251,91 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
assertResult(exceptedRow)(row)
}
+ // Verifies that creating a 'preaggregate' datamap on a streaming table does NOT
+ // pick up rows that still live in the streaming segment: until hand-off
+ // ("finish streaming" + compact 'streaming') runs, the child aggregate table
+ // must remain empty.
+ test("test preaggregate table creation on streaming table without handoff") {
+ val identifier = new TableIdentifier("agg_table", Option("streaming"))
+ // Resolve the CarbonTable metadata for streaming.agg_table through the metastore.
+ val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+ .asInstanceOf[CarbonRelation].metaData.carbonTable
+ val csvDataDir = new File("target/csvdatanew").getCanonicalPath
+ // streaming ingest 10 rows
+ generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
+ val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
+ identifier)
+ thread.start()
+ // NOTE(review): fixed sleeps make this timing-sensitive on slow CI machines;
+ // polling the row count with a timeout would be more robust — TODO confirm.
+ Thread.sleep(2000)
+ // Second batch reuses idStart = 10, so the 10 ingested rows repeat the same 5 ids.
+ generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
+ Thread.sleep(5000)
+ thread.interrupt()
+ checkAnswer(
+ sql("select count(*) from streaming.agg_table"),
+ Seq(Row(10)))
+ sql("create datamap p1 on table agg_table using 'preaggregate' as select name, sum(salary) from agg_table group by name")
+ // No data should be loaded into aggregate table as hand-off is not yet fired
+ checkAnswer(sql("select * from agg_table_p1"), Seq())
+ // NOTE(review): datamap p1 / agg_table are not dropped here — verify no
+ // interference with later tests that reuse these names.
+ }
+
+ // Verifies the positive path: a preaggregate datamap created while the table is
+ // still streaming gets populated once hand-off is fired via
+ // "finish streaming" followed by compact 'streaming'.
+ test("test if data is loaded into preaggregate after handoff is fired") {
+ createTable(tableName = "agg_table2", streaming = true, withBatchLoad = false)
+ val identifier = new TableIdentifier("agg_table2", Option("streaming"))
+ // Resolve the CarbonTable metadata for streaming.agg_table2 through the metastore.
+ val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+ .asInstanceOf[CarbonRelation].metaData.carbonTable
+ val csvDataDir = new File("target/csvdatanew").getCanonicalPath
+ // streaming ingest 10 rows
+ generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
+ val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
+ identifier)
+ thread.start()
+ // NOTE(review): sleep-based coordination is timing-sensitive — TODO confirm
+ // it is reliable enough on CI, or poll for the expected row count instead.
+ Thread.sleep(2000)
+ // Second batch reuses idStart = 10, so each of the 5 names occurs twice.
+ generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
+ Thread.sleep(5000)
+ thread.interrupt()
+ checkAnswer(
+ sql("select count(*) from streaming.agg_table2"),
+ Seq(Row(10)))
+ // Datamap is created BEFORE hand-off; the streaming segment is not yet visible to it.
+ sql("create datamap p1 on table agg_table2 using 'preaggregate' as select name, sum(salary) from agg_table2 group by name")
+ sql("alter table agg_table2 finish streaming")
+ sql("alter table agg_table2 compact 'streaming'")
+ // Data should be loaded into aggregate table as hand-off is fired
+ checkAnswer(sql("select * from agg_table2_p1"),
+ Seq(
+ Row("name_10", 200000.0),
+ Row("name_11", 220000.0),
+ Row("name_12", 240000.0),
+ Row("name_13", 260000.0),
+ Row("name_14", 280000.0)))
+ sql("drop table agg_table2")
+ }
+
+ // Verifies the reverse ordering of the previous test: hand-off happens FIRST,
+ // then the preaggregate datamap is created; the datamap must be populated from
+ // the already handed-off (columnar) segments at creation time.
+ test("test if data is loaded in aggregate table after handoff is done for streaming table") {
+ createTable(tableName = "agg_table3", streaming = true, withBatchLoad = false)
+ val identifier = new TableIdentifier("agg_table3", Option("streaming"))
+ // Resolve the CarbonTable metadata for streaming.agg_table3 through the metastore.
+ val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+ .asInstanceOf[CarbonRelation].metaData.carbonTable
+ val csvDataDir = new File("target/csvdatanew").getCanonicalPath
+ // streaming ingest 10 rows
+ generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
+ val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
+ identifier)
+ thread.start()
+ // NOTE(review): sleep-based coordination is timing-sensitive — TODO confirm
+ // it is reliable enough on CI, or poll for the expected row count instead.
+ Thread.sleep(2000)
+ // Second batch reuses idStart = 10, so each of the 5 names occurs twice.
+ generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
+ Thread.sleep(5000)
+ thread.interrupt()
+ checkAnswer(
+ sql("select count(*) from streaming.agg_table3"),
+ Seq(Row(10)))
+ // Hand-off first, datamap creation second (opposite order to the agg_table2 test).
+ sql("alter table agg_table3 finish streaming")
+ sql("alter table agg_table3 compact 'streaming'")
+ sql("create datamap p1 on table agg_table3 using 'preaggregate' as select name, sum(salary) from agg_table3 group by name")
+ // Aggregate table should be populated at datamap creation, since hand-off already ran
+ checkAnswer(sql("select * from agg_table3_p1"),
+ Seq(
+ Row("name_10", 200000.0),
+ Row("name_11", 220000.0),
+ Row("name_12", 240000.0),
+ Row("name_13", 260000.0),
+ Row("name_14", 280000.0)))
+ // NOTE(review): agg_table3 is not dropped here, unlike agg_table2 above —
+ // verify cleanup is handled in afterAll.
+ }
+
--- End diff --
@manishgupta88
I will try to reduce the run time.
---