Posted to commits@carbondata.apache.org by ch...@apache.org on 2018/02/03 07:45:50 UTC

carbondata git commit: [CARBONDATA-2098]Add Documentation for Pre-Aggregate tables

Repository: carbondata
Updated Branches:
  refs/heads/master da129d527 -> 71f8828be


[CARBONDATA-2098]Add Documentation for Pre-Aggregate tables

Add Documentation for Pre-Aggregate tables

This closes #1886


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/71f8828b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/71f8828b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/71f8828b

Branch: refs/heads/master
Commit: 71f8828be56ae9f3927a5fc4a5047794a740c6d1
Parents: da129d5
Author: Raghunandan S <ca...@gmail.com>
Authored: Mon Jan 29 08:54:49 2018 +0530
Committer: chenliang613 <ch...@huawei.com>
Committed: Sat Feb 3 15:45:30 2018 +0800

----------------------------------------------------------------------
 docs/data-management-on-carbondata.md           | 245 +++++++++++++++++++
 .../examples/PreAggregateTableExample.scala     | 145 +++++++++++
 .../TimeSeriesPreAggregateTableExample.scala    | 103 ++++++++
 3 files changed, 493 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/71f8828b/docs/data-management-on-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/data-management-on-carbondata.md b/docs/data-management-on-carbondata.md
index 3119935..0b35ed9 100644
--- a/docs/data-management-on-carbondata.md
+++ b/docs/data-management-on-carbondata.md
@@ -25,6 +25,7 @@ This tutorial is going to introduce all commands and data operations on CarbonDa
 * [UPDATE AND DELETE](#update-and-delete)
 * [COMPACTION](#compaction)
 * [PARTITION](#partition)
+* [PRE-AGGREGATE TABLES](#pre-aggregate-tables)
 * [BUCKETING](#bucketing)
 * [SEGMENT MANAGEMENT](#segment-management)
 
@@ -748,6 +749,250 @@ This tutorial is going to introduce all commands and data operations on CarbonDa
   * The partitioned column can be excluded from SORT_COLUMNS, this will let other columns to do the efficient sorting.
   * When writing SQL on a partition table, try to use filters on the partition column.
 
+## PRE-AGGREGATE TABLES
+  CarbonData supports pre-aggregating data so that OLAP-style queries can fetch results
+  much faster. Aggregate tables are created as datamaps, so they are handled as efficiently as
+  the other indexing support. Users can create as many aggregate tables (datamaps) as they require
+  to improve query performance, provided the storage requirements and loading speeds are
+  acceptable.
+  
+  For a main table called **sales**, defined as
+  
+  ```
+  CREATE TABLE sales (
+  order_time timestamp,
+  user_id string,
+  sex string,
+  country string,
+  quantity int,
+  price bigint)
+  STORED BY 'carbondata'
+  ```
+  
+  a user can create a pre-aggregate table using the following DDL:
+  
+  ```
+  CREATE DATAMAP agg_sales
+  ON TABLE sales
+  USING "preaggregate"
+  AS
+  SELECT country, sex, sum(quantity), avg(price)
+  FROM sales
+  GROUP BY country, sex
+  ```
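To make the DDL above concrete, here is a minimal in-memory sketch in Scala of the rows such a datamap would hold: one row per (country, sex) group. The case classes `Sale` and `AggSalesRow` and the object `AggSales` are hypothetical and are not CarbonData classes. The sketch also assumes that `avg(price)` is kept as a sum and a count so that it can be re-aggregated later; that is an assumption for illustration, not a documented storage format.

```scala
// Hypothetical model of a row in the main table `sales` (subset of columns).
final case class Sale(country: String, sex: String, quantity: Int, price: Long)

// Hypothetical model of one pre-aggregated row of agg_sales.
// Assumption: avg(price) is stored as sum + count so it can be re-aggregated.
final case class AggSalesRow(country: String, sex: String,
                             sumQuantity: Long, sumPrice: Long, countPrice: Long) {
  // avg(price) is derived from the stored sum and count at query time.
  def avgPrice: Double = sumPrice.toDouble / countPrice
}

object AggSales {
  // Equivalent of: SELECT country, sex, sum(quantity), avg(price) FROM sales GROUP BY country, sex
  def build(sales: Seq[Sale]): Seq[AggSalesRow] =
    sales.groupBy(s => (s.country, s.sex)).toSeq.map { case ((country, sex), rows) =>
      AggSalesRow(country, sex,
        sumQuantity = rows.map(_.quantity.toLong).sum,
        sumPrice = rows.map(_.price).sum,
        countPrice = rows.size.toLong)
    }.sortBy(r => (r.country, r.sex))
}
```

Keeping the sum and the count separately is what allows a coarser query (for example grouping only by `sex`) to be answered by merging these rows.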
+  
+<p align="left"><b>Functions supported in pre-aggregate tables</b></p>
+
+| Function | Rollup supported |
+|-----------|----------------|
+| SUM | Yes |
+| AVG | Yes |
+| MAX | Yes |
+| MIN | Yes |
+| COUNT | Yes |
+
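The rollup column can be read as follows: each function has a partial state that can be merged across groups or segments. The sketch below uses a hypothetical `Partial` type in plain Scala (not CarbonData code). It shows SUM and COUNT adding up and MAX and MIN taking the extreme value. AVG is derived from the merged SUM and COUNT rather than by averaging averages.

```scala
// Hypothetical partial-aggregate state for one numeric column.
final case class Partial(sum: Long, count: Long, max: Long, min: Long) {
  // Merging two partials is what a roll-up does: SUM and COUNT add up,
  // MAX and MIN keep the larger / smaller value.
  def merge(o: Partial): Partial =
    Partial(sum + o.sum, count + o.count, math.max(max, o.max), math.min(min, o.min))

  // AVG is derived from the merged SUM and COUNT, never averaged directly.
  def avg: Double = sum.toDouble / count
}

object Partial {
  // Build the partial state for a non-empty group of values.
  def of(values: Seq[Long]): Partial =
    Partial(values.sum, values.size.toLong, values.max, values.min)

  // Roll several partial states up into one.
  def rollUp(parts: Seq[Partial]): Partial = parts.reduce(_ merge _)
}
```

For example, averaging the group averages 15 and 40 gives 27.5, while merging the partial states gives the correct average of 30.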
+
+##### How pre-aggregate tables are selected
+For the main table **sales** and the pre-aggregate table **agg_sales** created above, queries
+such as
+```
+SELECT country, sex, sum(quantity), avg(price) from sales GROUP BY country, sex
+
+SELECT sex, sum(quantity) from sales GROUP BY sex
+
+SELECT sum(price), country from sales GROUP BY country
+``` 
+
+are transformed by the query planner to fetch data from the pre-aggregate table **agg_sales**.
+
+But queries such as
+```
+SELECT user_id, country, sex, sum(quantity), avg(price) from sales GROUP BY user_id, country, sex
+
+SELECT sex, avg(quantity) from sales GROUP BY sex
+
+SELECT max(price), country from sales GROUP BY country
+```
+
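+fetch data from the main table **sales**, because they need a column (user_id) or an aggregate
+(avg(quantity), max(price)) that cannot be derived from **agg_sales**.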
+
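The selection rule implied by these examples can be sketched as a subset check. The query's GROUP BY columns must be a subset of the datamap's, and every aggregate the query needs must be derivable from the stored ones. This is a simplified, hypothetical model (`Agg`, `AggSpec` and `PreAggMatcher` are illustrative names) that ignores filters, expressions and joins. It again assumes that `avg(x)` is stored as `sum(x)` plus `count(x)`, which would explain why `sum(price)` can be answered by **agg_sales** while `avg(quantity)` cannot.

```scala
// An aggregate call such as sum(quantity).
final case class Agg(func: String, column: String)

// Group-by columns and aggregates of either a datamap or a query.
final case class AggSpec(groupBy: Set[String], aggs: Set[Agg])

// Hypothetical matching rule; not CarbonData's actual planner code.
object PreAggMatcher {
  // Assumption: avg(x) is stored as sum(x) and count(x), so it expands to both.
  private def stored(a: Agg): Set[Agg] = a.func match {
    case "avg" => Set(Agg("sum", a.column), Agg("count", a.column))
    case _     => Set(a)
  }

  private def storedAll(aggs: Set[Agg]): Set[Agg] = aggs.flatMap(stored)

  // A query can use the datamap if it groups by a subset of the datamap's
  // columns and every aggregate it needs can be derived from the stored ones.
  def canServe(dataMap: AggSpec, query: AggSpec): Boolean =
    query.groupBy.subsetOf(dataMap.groupBy) &&
      storedAll(query.aggs).subsetOf(storedAll(dataMap.aggs))
}
```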
+##### Loading data to pre-aggregate tables
+For an existing table with loaded data, the data load to the pre-aggregate table is triggered by
+the CREATE DATAMAP statement when the user creates the pre-aggregate table.
+For incremental loads after the aggregate tables are created, loading data into the main table
+triggers the load to the pre-aggregate tables once the main table load is complete. These loads
+are atomic, meaning that data in the main table and the aggregate tables becomes visible to the
+user only after all tables are loaded.
+
+##### Querying data from pre-aggregate tables
+Pre-aggregate tables cannot be queried directly. Queries are to be made on the main table.
+Internally, CarbonData checks the pre-aggregate tables associated with the main table, and if a
+pre-aggregate table satisfies the query condition, the plan is automatically transformed to
+fetch the data from that pre-aggregate table.
+
+##### Compacting pre-aggregate tables
+Compaction is an optional operation for pre-aggregate tables. If compaction is performed on the
+main table but not on the pre-aggregate table, all queries can still benefit from the
+pre-aggregate table. To further improve performance, compaction can be triggered on the
+pre-aggregate table directly, which merges the segments inside the pre-aggregate table.
+To do that, use the ALTER TABLE COMPACT command on the pre-aggregate table, just as on the main table.
+
+  NOTE:
+  * If the aggregate functions used in the pre-aggregate table creation include distinct-count,
+     the pre-aggregate table values are recomputed during compaction. This is a costly
+     operation compared to the compaction of pre-aggregate tables containing only other
+     aggregate functions.
+ 
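Why distinct-count needs recomputation can be shown with a few lines of plain Scala: per-segment distinct counts cannot simply be added, because the same value may appear in several segments. `DistinctCountMerge` is a hypothetical illustration and not part of CarbonData.

```scala
// Hypothetical illustration of merging count(distinct user_id) across segments.
object DistinctCountMerge {
  // Pre-aggregated value for one segment: number of distinct users in it.
  def distinctCount(segment: Seq[String]): Int = segment.distinct.size

  // Naive merge: add the per-segment results (fine for SUM/COUNT, wrong here).
  def naiveMerge(segments: Seq[Seq[String]]): Int = segments.map(distinctCount).sum

  // Correct merge: recompute from the underlying values of all segments.
  def recompute(segments: Seq[Seq[String]]): Int = distinctCount(segments.flatten)
}
```

With segments `Seq("u1", "u2")` and `Seq("u2", "u3")`, the naive merge reports 4 distinct users, while recomputation gives the correct value of 3.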
+##### Update/Delete Operations on pre-aggregate tables
+This functionality is not supported.
+
+  NOTE (<b>RESTRICTION</b>):
+  * Update/Delete operations are <b>not supported</b> on a main table that has pre-aggregate tables
+  created on it. All the pre-aggregate tables <b>will have to be dropped</b> before update/delete
+  operations can be performed on the main table. Pre-aggregate tables can be rebuilt manually
+  after the update/delete operations are completed.
+ 
+##### Delete Segment Operations on pre-aggregate tables
+This functionality is not supported.
+
+  NOTE (<b>RESTRICTION</b>):
+  * Delete Segment operations are <b>not supported</b> on a main table that has pre-aggregate tables
+  created on it. All the pre-aggregate tables <b>will have to be dropped</b> before delete segment
+  operations can be performed on the main table. Pre-aggregate tables can be rebuilt manually
+  after the delete segment operations are completed.
+  
+##### Alter Table Operations on pre-aggregate tables
+This functionality is not supported.
+
+  NOTE (<b>RESTRICTION</b>):
+  * Adding a new column to the main table does not have any effect on pre-aggregate tables. However,
+  if dropping or renaming a column would impact a pre-aggregate table, the operation is rejected
+  and an error is thrown. All the pre-aggregate tables <b>will have to be dropped</b> before such
+  Alter Table operations can be performed on the main table. Pre-aggregate tables can be rebuilt
+  manually after the Alter Table operations are completed.
+  
+### Supporting timeseries data
+CarbonData has a built-in understanding of time hierarchies and levels: year, month, day, hour,
+and minute. Multiple pre-aggregate tables can be created for the hierarchy, and CarbonData can
+automatically roll up queries on these hierarchies.
+
+  ```
+  CREATE DATAMAP agg_year
+  ON TABLE sales
+  USING "timeseries"
+  DMPROPERTIES (
+  'event_time'='order_time',
+  'year_granularity'='1'
+  ) AS
+  SELECT order_time, country, sex, sum(quantity), max(quantity), count(user_id), sum(price),
+   avg(price) FROM sales GROUP BY order_time, country, sex
+    
+  CREATE DATAMAP agg_month
+  ON TABLE sales
+  USING "timeseries"
+  DMPROPERTIES (
+  'event_time'='order_time',
+  'month_granularity'='1'
+  ) AS
+  SELECT order_time, country, sex, sum(quantity), max(quantity), count(user_id), sum(price),
+   avg(price) FROM sales GROUP BY order_time, country, sex
+    
+  CREATE DATAMAP agg_day
+  ON TABLE sales
+  USING "timeseries"
+  DMPROPERTIES (
+  'event_time'='order_time',
+  'day_granularity'='1'
+  ) AS
+  SELECT order_time, country, sex, sum(quantity), max(quantity), count(user_id), sum(price),
+   avg(price) FROM sales GROUP BY order_time, country, sex
+        
+  CREATE DATAMAP agg_sales_hour
+  ON TABLE sales
+  USING "timeseries"
+  DMPROPERTIES (
+  'event_time'='order_time',
+  'hour_granularity'='1'
+  ) AS
+  SELECT order_time, country, sex, sum(quantity), max(quantity), count(user_id), sum(price),
+   avg(price) FROM sales GROUP BY order_time, country, sex
+  
+  CREATE DATAMAP agg_minute
+  ON TABLE sales
+  USING "timeseries"
+  DMPROPERTIES (
+  'event_time'='order_time',
+  'minute_granularity'='1'
+  ) AS
+  SELECT order_time, country, sex, sum(quantity), max(quantity), count(user_id), sum(price),
+   avg(price) FROM sales GROUP BY order_time, country, sex
+  ```
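The statements above differ only in the datamap name and the granularity property, so a small hypothetical Scala helper can generate them. In a Spark application, each string would then be passed to `spark.sql(...)`, one level at a time. The object name `TimeseriesDdl` is illustrative, and the property keys are copied from the DDL above.

```scala
// Hypothetical helper that builds the timeseries DDL for one granularity level.
object TimeseriesDdl {
  // Levels listed in the documentation above.
  val levels: Seq[String] = Seq("year", "month", "day", "hour", "minute")

  private val select =
    "SELECT order_time, country, sex, sum(quantity), max(quantity), count(user_id), " +
      "sum(price), avg(price) FROM sales GROUP BY order_time, country, sex"

  def createDataMap(name: String, level: String): String = {
    require(levels.contains(level), s"unsupported level: $level")
    // Only a granularity value of 1 is supported, so it is fixed here.
    s"""CREATE DATAMAP $name
       |ON TABLE sales
       |USING "timeseries"
       |DMPROPERTIES (
       |'event_time'='order_time',
       |'${level}_granularity'='1'
       |) AS
       |$select""".stripMargin
  }
}
```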
+  
+  To query data and automatically roll up to the desired aggregation level, CarbonData supports
+  the UDF
+  ```
+  timeseries(timeseries column name, 'aggregation level')
+  ```
+  For example:
+  ```
+  SELECT timeseries(order_time, 'hour'), sum(quantity) FROM sales
+  GROUP BY timeseries(order_time, 'hour')
+  ```
+  
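The following sketch models what `timeseries(order_time, 'hour')` is assumed to compute: the timestamp truncated to the start of the given level. It also shows how grouping by that value yields the query above. `TimeseriesUdf` is a hypothetical plain-Scala model built on `java.time` and is not the UDF's actual implementation.

```scala
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

// Hypothetical model of timeseries(column, level): truncate to the start of the level.
object TimeseriesUdf {
  def truncate(ts: LocalDateTime, level: String): LocalDateTime = level match {
    case "minute" => ts.truncatedTo(ChronoUnit.MINUTES)
    case "hour"   => ts.truncatedTo(ChronoUnit.HOURS)
    case "day"    => ts.truncatedTo(ChronoUnit.DAYS)
    // truncatedTo does not accept MONTHS/YEARS, so reset the day field instead.
    case "month"  => ts.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1)
    case "year"   => ts.truncatedTo(ChronoUnit.DAYS).withDayOfYear(1)
    case other    => throw new IllegalArgumentException(s"unsupported level: $other")
  }

  // Models: SELECT timeseries(order_time, level), sum(quantity) ... GROUP BY timeseries(...)
  def sumByLevel(rows: Seq[(LocalDateTime, Int)], level: String): Map[LocalDateTime, Long] =
    rows.groupBy { case (ts, _) => truncate(ts, level) }
      .map { case (bucket, rs) => bucket -> rs.map(_._2.toLong).sum }
}
```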
+  It is **not necessary** to create pre-aggregate tables for each granularity unless required for
+  the query; CarbonData can roll up the data and fetch it.
+   
+  For example, if the following pre-aggregate tables were created for the main table **sales**:
+  
+  ```
+  CREATE DATAMAP agg_day
+  ON TABLE sales
+  USING "timeseries"
+  DMPROPERTIES (
+  'event_time'='order_time',
+  'day_granularity'='1'
+  ) AS
+  SELECT order_time, country, sex, sum(quantity), max(quantity), count(user_id), sum(price),
+   avg(price) FROM sales GROUP BY order_time, country, sex
+
+  CREATE DATAMAP agg_sales_hour
+  ON TABLE sales
+  USING "timeseries"
+  DMPROPERTIES (
+  'event_time'='order_time',
+  'hour_granularity'='1'
+  ) AS
+  SELECT order_time, country, sex, sum(quantity), max(quantity), count(user_id), sum(price),
+   avg(price) FROM sales GROUP BY order_time, country, sex
+  ```
+  
+  then queries like the following are rolled up and fetched from the pre-aggregate tables:
+  ```
+  SELECT timeseries(order_time, 'month'), sum(quantity) FROM sales
+  GROUP BY timeseries(order_time, 'month')
+
+  SELECT timeseries(order_time, 'year'), sum(quantity) FROM sales
+  GROUP BY timeseries(order_time, 'year')
+  ```
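Rolling up from a finer level amounts to re-bucketing each finer-grained key to the coarser level and merging the partial aggregates. The hypothetical sketch below does this for `sum(quantity)` held at day level, as **agg_day** might hold it. Which pre-aggregate table CarbonData actually picks for a given query is not modelled here, and `DayRollUp` is an illustrative name only.

```scala
import java.time.LocalDate

// Hypothetical sketch of rolling a day-level sum(quantity) up to month or year.
object DayRollUp {
  // Input: day-level rows as (day, sum(quantity)).
  def toMonth(daySums: Map[LocalDate, Long]): Map[LocalDate, Long] =
    rollUp(daySums)(_.withDayOfMonth(1))

  def toYear(daySums: Map[LocalDate, Long]): Map[LocalDate, Long] =
    rollUp(daySums)(_.withDayOfYear(1))

  // Re-bucket each day key to the coarser level and add up the partial sums.
  // (MAX/MIN would take the extreme value instead; AVG would merge sum and count.)
  private def rollUp(daySums: Map[LocalDate, Long])(bucket: LocalDate => LocalDate): Map[LocalDate, Long] =
    daySums.toSeq
      .groupBy { case (day, _) => bucket(day) }
      .map { case (b, rows) => b -> rows.map(_._2).sum }
}
```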
+  
+  NOTE (<b>RESTRICTION</b>):
+  * Only a value of 1 is supported for the hierarchy levels (granularity); other values are not
+  supported.
+  * Pre-aggregate tables for the desired levels need to be created one after the other.
+  * Pre-aggregate tables created for each level need to be dropped separately.
+    
+
 ## BUCKETING
 
   Bucketing feature can be used to distribute/organize the table/partition data into multiple files such

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71f8828b/examples/spark2/src/main/scala/org/apache/carbondata/examples/PreAggregateTableExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/PreAggregateTableExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/PreAggregateTableExample.scala
new file mode 100644
index 0000000..fe3a93d
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/PreAggregateTableExample.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.spark.sql.SaveMode
+
+/**
+ * This example is for pre-aggregate tables.
+ */
+
+object PreAggregateTableExample {
+
+  def main(args: Array[String]) {
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val testData = s"$rootPath/integration/spark-common-test/src/test/resources/sample.csv"
+    val spark = ExampleUtils.createCarbonSession("PreAggregateTableExample")
+
+    spark.sparkContext.setLogLevel("ERROR")
+
+    // 1. simple usage for Pre-aggregate tables creation and query
+    spark.sql("DROP TABLE IF EXISTS mainTable")
+    spark.sql("""
+                | CREATE TABLE mainTable
+                | (id Int,
+                | name String,
+                | city String,
+                | age Int)
+                | STORED BY 'org.apache.carbondata.format'
+              """.stripMargin)
+
+    spark.sql(s"""
+       LOAD DATA LOCAL INPATH '$testData' into table mainTable
+       """)
+
+    spark.sql(
+      s"""create datamap preagg_sum on table mainTable using 'preaggregate' as
+         | select id,sum(age) from mainTable group by id"""
+        .stripMargin)
+    spark.sql(
+      s"""create datamap preagg_avg on table mainTable using 'preaggregate' as
+         | select id,avg(age) from mainTable group by id"""
+        .stripMargin)
+    spark.sql(
+      s"""create datamap preagg_count on table mainTable using 'preaggregate' as
+         | select id,count(age) from mainTable group by id"""
+        .stripMargin)
+    spark.sql(
+      s"""create datamap preagg_min on table mainTable using 'preaggregate' as
+         | select id,min(age) from mainTable group by id"""
+        .stripMargin)
+    spark.sql(
+      s"""create datamap preagg_max on table mainTable using 'preaggregate' as
+         | select id,max(age) from mainTable group by id"""
+        .stripMargin)
+
+    spark.sql(
+      s"""
+         | SELECT id,max(age)
+         | FROM mainTable group by id
+      """.stripMargin).show()
+
+    // 2.compare the performance : with pre-aggregate VS main table
+
+    // build test data; if the data set is larger than 100M, it will take 10+ minutes.
+    import spark.implicits._
+
+    import scala.util.Random
+    val r = new Random()
+    val df = spark.sparkContext.parallelize(1 to 10 * 1000 * 1000)
+      .map(x => ("No." + r.nextInt(100000), "name" + x % 8, "city" + x % 50, x % 60))
+      .toDF("ID", "name", "city", "age")
+
+    // Create table with pre-aggregate table
+    df.write.format("carbondata")
+      .option("tableName", "personTable")
+      .option("compress", "true")
+      .mode(SaveMode.Overwrite).save()
+
+    // Create table without pre-aggregate table
+    df.write.format("carbondata")
+      .option("tableName", "personTableWithoutAgg")
+      .option("compress", "true")
+      .mode(SaveMode.Overwrite).save()
+
+    // Create pre-aggregate table
+    spark.sql("""
+       CREATE datamap preagg_avg on table personTable using 'preaggregate' as
+       | select id,avg(age) from personTable group by id
+              """.stripMargin)
+
+    // define time function
+    def time(code: => Unit): Double = {
+      val start = System.currentTimeMillis()
+      code
+      // return time in seconds
+      (System.currentTimeMillis() - start).toDouble / 1000
+    }
+
+    val time_without_aggTable = time {
+      spark.sql(
+        s"""
+           | SELECT id, avg(age)
+           | FROM personTableWithoutAgg group by id
+      """.stripMargin).count()
+    }
+
+    val time_with_aggTable = time {
+      spark.sql(
+        s"""
+           | SELECT id, avg(age)
+           | FROM personTable group by id
+      """.stripMargin).count()
+    }
+    // scalastyle:off
+    println("time for query on table with pre-aggregate table:" + time_with_aggTable.toString)
+    println("time for query on table without pre-aggregate table:" + time_without_aggTable.toString)
+    // scalastyle:on
+
+    spark.sql("DROP TABLE IF EXISTS mainTable")
+    spark.sql("DROP TABLE IF EXISTS personTable")
+    spark.sql("DROP TABLE IF EXISTS personTableWithoutAgg")
+
+    spark.close()
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71f8828b/examples/spark2/src/main/scala/org/apache/carbondata/examples/TimeSeriesPreAggregateTableExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/TimeSeriesPreAggregateTableExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/TimeSeriesPreAggregateTableExample.scala
new file mode 100644
index 0000000..470d9ff
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/TimeSeriesPreAggregateTableExample.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.spark.sql.SaveMode
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * This example is for time series pre-aggregate tables.
+ */
+
+object TimeSeriesPreAggregateTableExample {
+
+  def main(args: Array[String]) {
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val testData = s"$rootPath/integration/spark-common-test/src/test/resources/timeseriestest.csv"
+    val spark = ExampleUtils.createCarbonSession("TimeSeriesPreAggregateTableExample")
+
+    spark.sparkContext.setLogLevel("ERROR")
+
+    import spark.implicits._
+
+    import scala.util.Random
+    val r = new Random()
+    val df = spark.sparkContext.parallelize(1 to 10 * 1000 )
+      .map(x => ("" + 20 + "%02d".format(r.nextInt(20)) + "-" + "%02d".format(r.nextInt(11) + 1) +
+        "-" + "%02d".format(r.nextInt(27) + 1) + " " + "%02d".format(r.nextInt(12)) + ":" +
+        "%02d".format(r.nextInt(59)) + ":" + "%02d".format(r.nextInt(59)), "name" + x % 8,
+        r.nextInt(60))).toDF("mytime", "name", "age")
+
+    // 1. usage for time series Pre-aggregate tables creation and query
+    spark.sql("drop table if exists timeSeriesTable")
+    spark.sql("CREATE TABLE timeSeriesTable(mytime timestamp," +
+      " name string, age int) STORED BY 'org.apache.carbondata.format'")
+    spark.sql(
+      s"""
+         | CREATE DATAMAP agg0_hour ON TABLE timeSeriesTable
+         | USING 'timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='mytime',
+         | 'HOUR_GRANULARITY'='1')
+         | AS SELECT mytime, SUM(age) FROM timeSeriesTable
+         | GROUP BY mytime
+       """.stripMargin)
+    spark.sql(
+      s"""
+         | CREATE DATAMAP agg0_day ON TABLE timeSeriesTable
+         | USING 'timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='mytime',
+         | 'DAY_GRANULARITY'='1')
+         | AS SELECT mytime, SUM(age) FROM timeSeriesTable
+         | GROUP BY mytime
+       """.stripMargin)
+
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss")
+
+    df.write.format("carbondata")
+      .option("tableName", "timeSeriesTable")
+      .option("compress", "true")
+      .mode(SaveMode.Append).save()
+
+    spark.sql(
+      s"""
+         select sum(age), timeseries(mytime,'hour') from timeSeriesTable group by timeseries(mytime,
+         'hour')
+      """.stripMargin).show()
+
+    spark.sql(
+      s"""
+         select avg(age),timeseries(mytime,'year') from timeSeriesTable group by timeseries(mytime,
+         'year')
+      """.stripMargin).show()
+
+    spark.sql("DROP TABLE IF EXISTS timeSeriesTable")
+
+    spark.close()
+
+  }
+}