Posted to commits@carbondata.apache.org by ch...@apache.org on 2018/02/03 07:45:50 UTC
carbondata git commit: [CARBONDATA-2098]Add Documentation for Pre-Aggregate tables
Repository: carbondata
Updated Branches:
refs/heads/master da129d527 -> 71f8828be
[CARBONDATA-2098]Add Documentation for Pre-Aggregate tables
Add Documentation for Pre-Aggregate tables
This closes #1886
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/71f8828b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/71f8828b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/71f8828b
Branch: refs/heads/master
Commit: 71f8828be56ae9f3927a5fc4a5047794a740c6d1
Parents: da129d5
Author: Raghunandan S <ca...@gmail.com>
Authored: Mon Jan 29 08:54:49 2018 +0530
Committer: chenliang613 <ch...@huawei.com>
Committed: Sat Feb 3 15:45:30 2018 +0800
----------------------------------------------------------------------
docs/data-management-on-carbondata.md | 245 +++++++++++++++++++
.../examples/PreAggregateTableExample.scala | 145 +++++++++++
.../TimeSeriesPreAggregateTableExample.scala | 103 ++++++++
3 files changed, 493 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/71f8828b/docs/data-management-on-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/data-management-on-carbondata.md b/docs/data-management-on-carbondata.md
index 3119935..0b35ed9 100644
--- a/docs/data-management-on-carbondata.md
+++ b/docs/data-management-on-carbondata.md
@@ -25,6 +25,7 @@ This tutorial is going to introduce all commands and data operations on CarbonDa
* [UPDATE AND DELETE](#update-and-delete)
* [COMPACTION](#compaction)
* [PARTITION](#partition)
+* [PRE-AGGREGATE TABLES](#pre-aggregate-tables)
* [BUCKETING](#bucketing)
* [SEGMENT MANAGEMENT](#segment-management)
@@ -748,6 +749,250 @@ This tutorial is going to introduce all commands and data operations on CarbonDa
* The partitioned column can be excluded from SORT_COLUMNS, this will let other columns to do the efficient sorting.
* When writing SQL on a partition table, try to use filters on the partition column.
+## PRE-AGGREGATE TABLES
+ Carbondata supports pre-aggregating data so that OLAP-style queries can fetch data
+ much faster. Aggregate tables are created as datamaps so that they are handled as efficiently as
+ other indexing support. Users can create as many aggregate tables (datamaps) as they require to
+ improve query performance, provided the resulting storage requirements and loading speeds are
+ acceptable.
+
+ For a main table called **sales**, defined as
+
+ ```
+ CREATE TABLE sales (
+ order_time timestamp,
+ user_id string,
+ sex string,
+ country string,
+ quantity int,
+ price bigint)
+ STORED BY 'carbondata'
+ ```
+
+ a user can create a pre-aggregate table using the following DDL:
+
+ ```
+ CREATE DATAMAP agg_sales
+ ON TABLE sales
+ USING "preaggregate"
+ AS
+ SELECT country, sex, sum(quantity), avg(price)
+ FROM sales
+ GROUP BY country, sex
+ ```
+
+<b><p align="left">Functions supported in pre-aggregate tables</p></b>
+
+| Function | Rollup supported |
+|-----------|----------------|
+| SUM | Yes |
+| AVG | Yes |
+| MAX | Yes |
+| MIN | Yes |
+| COUNT | Yes |
+
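The "Rollup supported" column can be illustrated with a minimal Scala sketch. It assumes that AVG is kept as a SUM/COUNT pair, a common technique for making averages mergeable; `AggState` and `Aggregates` are hypothetical names, not CarbonData classes.

```scala
// Hypothetical partial-aggregate state for one group; not a CarbonData class.
final case class AggState(sum: Long, count: Long, min: Long, max: Long) {
  // Merging two partial states gives the same result as aggregating all raw values at once.
  def merge(other: AggState): AggState =
    AggState(sum + other.sum, count + other.count,
      math.min(min, other.min), math.max(max, other.max))

  // Assumption: AVG is derived from SUM and COUNT instead of being stored directly,
  // because an average of averages is wrong when group sizes differ.
  def avg: Double = sum.toDouble / count
}

object Aggregates {
  // State for a single raw value.
  def of(value: Long): AggState = AggState(value, 1L, value, value)

  // Aggregate a non-empty group of raw values.
  def fromValues(values: Seq[Long]): AggState = values.map(of).reduce(_ merge _)
}
```

For example, merging the states for `Seq(10, 20)` and `Seq(60)` yields sum 90, count 3 and average 30.0, whereas averaging the two partial averages (15.0 and 60.0) would give 37.5.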
+
+##### How pre-aggregate tables are selected
+For the main table **sales** and the pre-aggregate table **agg_sales** created above, queries such as
+```
+SELECT country, sex, sum(quantity), avg(price) from sales GROUP BY country, sex
+
+SELECT sex, sum(quantity) from sales GROUP BY sex
+
+SELECT sum(price), country from sales GROUP BY country
+```
+
+will be transformed by the query planner to fetch data from the pre-aggregate table **agg_sales**.
+
+But queries such as
+```
+SELECT user_id, country, sex, sum(quantity), avg(price) from sales GROUP BY user_id, country, sex
+
+SELECT sex, avg(quantity) from sales GROUP BY sex
+
+SELECT max(price), country from sales GROUP BY country
+```
+
+will fetch the data from the main table **sales**, because **agg_sales** does not contain the column
+or aggregate they need.
+
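The selection rules above can be modelled with the simplified Scala sketch below. It assumes that a declared `avg(c)` is materialised as `sum(c)` and `count(c)`, which would explain why `sum(price)` is answered from **agg_sales** while `avg(quantity)` is not. `Agg`, `PreAggDef` and `Matcher` are hypothetical names, and CarbonData's actual planner rules are likely more involved.

```scala
// Simplified model of pre-aggregate table selection; hypothetical names, not CarbonData's API.
final case class Agg(function: String, column: String)

final case class PreAggDef(groupBy: Set[String], declared: Seq[Agg]) {
  // Assumption: avg(c) is materialised as sum(c) and count(c) in the pre-aggregate table.
  val stored: Set[Agg] = declared.flatMap {
    case Agg("avg", c) => Seq(Agg("sum", c), Agg("count", c))
    case other         => Seq(other)
  }.toSet
}

object Matcher {
  // Aggregates the pre-aggregate table must hold to answer `agg`.
  private def required(agg: Agg): Set[Agg] = agg match {
    case Agg("avg", c) => Set(Agg("sum", c), Agg("count", c))
    case other         => Set(other)
  }

  // Usable only if every grouping column of the query is a grouping column of the
  // pre-aggregate table and every aggregate the query needs is stored there.
  def canUse(preAgg: PreAggDef, queryGroupBy: Set[String], queryAggs: Seq[Agg]): Boolean =
    queryGroupBy.subsetOf(preAgg.groupBy) &&
      queryAggs.flatMap(required).forall(preAgg.stored.contains)
}
```

Feeding in the six queries from this section reproduces the split described above: the first three match **agg_sales**, the last three do not.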
+##### Loading data to pre-aggregate tables
+For an existing table with loaded data, the data load to the pre-aggregate table is triggered by the
+CREATE DATAMAP statement when the user creates the pre-aggregate table.
+For incremental loads after the aggregate tables are created, loading data to the main table triggers
+the load to the pre-aggregate tables once the main table load is complete. These loads are atomic,
+meaning that data in the main table and the aggregate tables becomes visible to the user only after
+all tables are loaded.
+
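Atomic visibility can be pictured with the conceptual Scala model below: a new segment is published to the main table and every pre-aggregate table together, or not at all. `AtomicLoad`, `Catalog` and `loadTable` are hypothetical, and the sketch does not claim to reflect how CarbonData tracks segment status internally.

```scala
// Conceptual model of atomic loads; hypothetical names, not CarbonData's segment handling.
object AtomicLoad {
  // Segment ids that readers can see, per table.
  final case class Catalog(visible: Map[String, Set[Int]])

  // Load `segment` into the main table and all of its pre-aggregate tables.
  // `loadTable` returns false on failure; `forall` stops at the first failure.
  // The new segment is published to every table only if all loads succeeded.
  def load(catalog: Catalog, tables: Seq[String], segment: Int)(loadTable: String => Boolean): Catalog =
    if (tables.forall(loadTable))
      Catalog(tables.foldLeft(catalog.visible) { (acc, table) =>
        acc.updated(table, acc.getOrElse(table, Set.empty[Int]) + segment)
      })
    else
      catalog // nothing is published; readers keep seeing the previous state
}
```

If the pre-aggregate load fails, the main table's new segment stays invisible too, so queries never see the two out of sync.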
+##### Querying data from pre-aggregate tables
+Pre-aggregate tables cannot be queried directly. Queries are to be made on the main table. Internally,
+Carbondata checks the pre-aggregate tables associated with the main table and, if a
+pre-aggregate table satisfies the query condition, transforms the plan automatically to use that
+pre-aggregate table to fetch the data.
+
+##### Compacting pre-aggregate tables
+Compaction is an optional operation for pre-aggregate tables. If compaction is performed on the main
+table but not on the pre-aggregate table, all queries can still benefit from the pre-aggregate
+table. To further improve performance, compaction can be triggered on the pre-aggregate tables
+directly, which merges the segments inside the pre-aggregate table.
+To do that, use the ALTER TABLE COMPACT command on the pre-aggregate table, just as on the main table.
+
+ NOTE:
+ * If the aggregate functions used to create the pre-aggregate table include distinct-count,
+ the pre-aggregate table values are recomputed during compaction. This is a costly
+ operation compared to compacting pre-aggregate tables that contain only other aggregate
+ functions.
+
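Why compaction can simply merge most pre-aggregated segments but must recompute distinct-count can be sketched in Scala as follows. `PreAggCompaction` and its `Row(sum, count, max)` layout are hypothetical simplifications, not CarbonData's storage format.

```scala
// Hypothetical model of compacting pre-aggregate segments; not CarbonData code.
object PreAggCompaction {
  // One pre-aggregated value per group key.
  final case class Row(sum: Long, count: Long, max: Long)

  // SUM, COUNT and MAX of partial results equal those of the raw data, so segments
  // can be merged by re-aggregating the rows that share a group key.
  def merge(segments: Seq[Map[String, Row]]): Map[String, Row] =
    segments.flatten.groupBy(_._1).map { case (key, rows) =>
      key -> rows.map(_._2).reduce((a, b) =>
        Row(a.sum + b.sum, a.count + b.count, math.max(a.max, b.max)))
    }

  // DISTINCT COUNT cannot be merged this way: adding per-segment distinct counts
  // double-counts values present in several segments, so it is recomputed from raw values.
  def distinctCount(rawSegments: Seq[Seq[String]]): Int = rawSegments.flatten.distinct.size
}
```

Two segments holding `IN -> Row(10, 2, 7)` and `IN -> Row(4, 1, 4)` compact to `IN -> Row(14, 3, 7)`. For distinct users, per-segment counts of 2 and 2 would add up to 4, while the correct distinct count over `u1, u2, u3` is 3.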
+##### Update/Delete Operations on pre-aggregate tables
+This functionality is not supported.
+
+ NOTE (<b>RESTRICTION</b>):
+ * Update/Delete operations are <b>not supported</b> on a main table that has pre-aggregate tables
+ created on it. All the pre-aggregate tables <b>will have to be dropped</b> before update/delete
+ operations can be performed on the main table. Pre-aggregate tables can be rebuilt manually
+ after the update/delete operations are completed.
+
+##### Delete Segment Operations on pre-aggregate tables
+This functionality is not supported.
+
+ NOTE (<b>RESTRICTION</b>):
+ * Delete Segment operations are <b>not supported</b> on a main table that has pre-aggregate tables
+ created on it. All the pre-aggregate tables <b>will have to be dropped</b> before delete segment
+ operations can be performed on the main table. Pre-aggregate tables can be rebuilt manually
+ after the delete segment operations are completed.
+
+##### Alter Table Operations on pre-aggregate tables
+This functionality is not supported.
+
+ NOTE (<b>RESTRICTION</b>):
+ * Adding a new column to the main table does not have any effect on pre-aggregate tables. However, if
+ dropping or renaming a column affects a pre-aggregate table, the operation is
+ rejected and an error is thrown. All the pre-aggregate tables <b>will have to be dropped</b>
+ before such Alter Table operations can be performed on the main table. Pre-aggregate tables can be
+ rebuilt manually after the Alter Table operations are completed.
+
+### Supporting timeseries data
+Carbondata has built-in understanding of time hierarchy and levels: year, month, day, hour, minute.
+Multiple pre-aggregate tables can be created for the hierarchy and Carbondata can do automatic
+roll-up for the queries on these hierarchies.
+
+ ```
+ CREATE DATAMAP agg_year
+ ON TABLE sales
+ USING "timeseries"
+ DMPROPERTIES (
+ 'event_time'='order_time',
+ 'year_granularity'='1'
+ ) AS
+ SELECT order_time, country, sex, sum(quantity), max(quantity), count(user_id), sum(price),
+ avg(price) FROM sales GROUP BY order_time, country, sex
+
+ CREATE DATAMAP agg_month
+ ON TABLE sales
+ USING "timeseries"
+ DMPROPERTIES (
+ 'event_time'='order_time',
+ 'month_granularity'='1'
+ ) AS
+ SELECT order_time, country, sex, sum(quantity), max(quantity), count(user_id), sum(price),
+ avg(price) FROM sales GROUP BY order_time, country, sex
+
+ CREATE DATAMAP agg_day
+ ON TABLE sales
+ USING "timeseries"
+ DMPROPERTIES (
+ 'event_time'='order_time',
+ 'day_granularity'='1'
+ ) AS
+ SELECT order_time, country, sex, sum(quantity), max(quantity), count(user_id), sum(price),
+ avg(price) FROM sales GROUP BY order_time, country, sex
+
+ CREATE DATAMAP agg_sales_hour
+ ON TABLE sales
+ USING "timeseries"
+ DMPROPERTIES (
+ 'event_time'='order_time',
+ 'hour_granularity'='1'
+ ) AS
+ SELECT order_time, country, sex, sum(quantity), max(quantity), count(user_id), sum(price),
+ avg(price) FROM sales GROUP BY order_time, country, sex
+
+ CREATE DATAMAP agg_minute
+ ON TABLE sales
+ USING "timeseries"
+ DMPROPERTIES (
+ 'event_time'='order_time',
+ 'minute_granularity'='1'
+ ) AS
+ SELECT order_time, country, sex, sum(quantity), max(quantity), count(user_id), sum(price),
+ avg(price) FROM sales GROUP BY order_time, country, sex
+ ```
+
+ To query data and automatically roll up to the desired aggregation level, Carbondata supports
+ the UDF
+ ```
+ timeseries(timeseries column name, 'aggregation level')
+ ```
+ for example:
+ ```
+ SELECT timeseries(order_time, 'hour'), sum(quantity) FROM sales GROUP BY timeseries(order_time,
+ 'hour')
+ ```
+
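The bucketing that `timeseries(order_time, 'hour')` performs can be approximated with `java.time`. This sketch assumes the UDF truncates a timestamp to the start of the requested level; `TimeSeriesLevel.truncate` is a hypothetical helper, not the UDF's implementation.

```scala
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

// Hypothetical helper: truncate a timestamp to the start of a time-hierarchy level,
// which is what the timeseries UDF is assumed to do.
object TimeSeriesLevel {
  def truncate(ts: LocalDateTime, level: String): LocalDateTime = level.toLowerCase match {
    case "minute" => ts.truncatedTo(ChronoUnit.MINUTES)
    case "hour"   => ts.truncatedTo(ChronoUnit.HOURS)
    case "day"    => ts.truncatedTo(ChronoUnit.DAYS)
    // Month and year are not fixed-length units, so truncate to the day and then reset fields.
    case "month"  => ts.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1)
    case "year"   => ts.truncatedTo(ChronoUnit.DAYS).withDayOfYear(1)
    case other    => throw new IllegalArgumentException(s"unsupported level: $other")
  }
}
```

Grouping by the truncated value puts every row of the same hour (or day, month, year) into one bucket, which is exactly what the GROUP BY in the query relies on.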
+ It is **not necessary** to create pre-aggregate tables for every granularity unless a query
+ requires it. Carbondata can roll up the data from a finer-grained pre-aggregate table and fetch it.
+
+ For example, if the following pre-aggregate tables were created for the main table **sales**:
+
+ ```
+ CREATE DATAMAP agg_day
+ ON TABLE sales
+ USING "timeseries"
+ DMPROPERTIES (
+ 'event_time'='order_time',
+ 'day_granularity'='1'
+ ) AS
+ SELECT order_time, country, sex, sum(quantity), max(quantity), count(user_id), sum(price),
+ avg(price) FROM sales GROUP BY order_time, country, sex
+
+ CREATE DATAMAP agg_sales_hour
+ ON TABLE sales
+ USING "timeseries"
+ DMPROPERTIES (
+ 'event_time'='order_time',
+ 'hour_granularity'='1'
+ ) AS
+ SELECT order_time, country, sex, sum(quantity), max(quantity), count(user_id), sum(price),
+ avg(price) FROM sales GROUP BY order_time, country, sex
+ ```
+
+ Queries like the ones below will be rolled up and fetched from the pre-aggregate tables:
+ ```
+ SELECT timeseries(order_time, 'month'), sum(quantity) FROM sales GROUP BY timeseries(order_time,
+ 'month')
+
+ SELECT timeseries(order_time, 'year'), sum(quantity) FROM sales GROUP BY timeseries(order_time,
+ 'year')
+ ```
+
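The roll-up in this example can be sketched in Scala: day-level `sum(quantity)` rows are re-bucketed into the month or year they fall in and summed again. `RollUp.rollUp` is a hypothetical helper, and the sketch only covers SUM; MAX and MIN would roll up the same way with `max`/`min`, while AVG needs its SUM and COUNT rolled up separately.

```scala
import java.time.LocalDate

// Hypothetical helper: re-aggregate SUM values from fine-grained buckets into coarser ones.
object RollUp {
  // `bucket` maps a fine-grained bucket (e.g. a day) to the coarser one containing it.
  def rollUp(fine: Map[LocalDate, Long], bucket: LocalDate => LocalDate): Map[LocalDate, Long] =
    fine.toSeq
      .groupBy { case (day, _) => bucket(day) }
      .map { case (coarse, rows) => coarse -> rows.map(_._2).sum }
}
```

With daily sums of 5 and 7 for 30 and 31 January and 3 for 1 February, rolling up to months gives 12 for January and 3 for February, and rolling up to the year gives 15.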
+ NOTE (<b>RESTRICTION</b>):
+ * Only a granularity value of 1 is supported for each hierarchy level. Other values are not supported.
+ * Pre-aggregate tables for the desired levels need to be created one after the other.
+ * Pre-aggregate tables created for each level need to be dropped separately.
+
+
## BUCKETING
Bucketing feature can be used to distribute/organize the table/partition data into multiple files such
http://git-wip-us.apache.org/repos/asf/carbondata/blob/71f8828b/examples/spark2/src/main/scala/org/apache/carbondata/examples/PreAggregateTableExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/PreAggregateTableExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/PreAggregateTableExample.scala
new file mode 100644
index 0000000..fe3a93d
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/PreAggregateTableExample.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.spark.sql.SaveMode
+
+/**
+ * This example is for pre-aggregate tables.
+ */
+
+object PreAggregateTableExample {
+
+ def main(args: Array[String]) {
+
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+ val testData = s"$rootPath/integration/spark-common-test/src/test/resources/sample.csv"
+ val spark = ExampleUtils.createCarbonSession("PreAggregateTableExample")
+
+ spark.sparkContext.setLogLevel("ERROR")
+
+ // 1. simple usage for Pre-aggregate tables creation and query
+ spark.sql("DROP TABLE IF EXISTS mainTable")
+ spark.sql("""
+ | CREATE TABLE mainTable
+ | (id Int,
+ | name String,
+ | city String,
+ | age Int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+
+ spark.sql(s"""
+ LOAD DATA LOCAL INPATH '$testData' into table mainTable
+ """)
+
+ spark.sql(
+ s"""create datamap preagg_sum on table mainTable using 'preaggregate' as
+ | select id,sum(age) from mainTable group by id"""
+ .stripMargin)
+ spark.sql(
+ s"""create datamap preagg_avg on table mainTable using 'preaggregate' as
+ | select id,avg(age) from mainTable group by id"""
+ .stripMargin)
+ spark.sql(
+ s"""create datamap preagg_count on table mainTable using 'preaggregate' as
+ | select id,count(age) from mainTable group by id"""
+ .stripMargin)
+ spark.sql(
+ s"""create datamap preagg_min on table mainTable using 'preaggregate' as
+ | select id,min(age) from mainTable group by id"""
+ .stripMargin)
+ spark.sql(
+ s"""create datamap preagg_max on table mainTable using 'preaggregate' as
+ | select id,max(age) from mainTable group by id"""
+ .stripMargin)
+
+ spark.sql(
+ s"""
+ | SELECT id,max(age)
+ | FROM mainTable group by id
+ """.stripMargin).show()
+
+ // 2.compare the performance : with pre-aggregate VS main table
+
+ // build test data (10M rows); setting it larger than 100M can take 10+ minutes.
+ import spark.implicits._
+
+ import scala.util.Random
+ val r = new Random()
+ val df = spark.sparkContext.parallelize(1 to 10 * 1000 * 1000)
+ .map(x => ("No." + r.nextInt(100000), "name" + x % 8, "city" + x % 50, x % 60))
+ .toDF("ID", "name", "city", "age")
+
+ // Create table with pre-aggregate table
+ df.write.format("carbondata")
+ .option("tableName", "personTable")
+ .option("compress", "true")
+ .mode(SaveMode.Overwrite).save()
+
+ // Create table without pre-aggregate table
+ df.write.format("carbondata")
+ .option("tableName", "personTableWithoutAgg")
+ .option("compress", "true")
+ .mode(SaveMode.Overwrite).save()
+
+ // Create pre-aggregate table
+ spark.sql("""
+ CREATE datamap preagg_avg on table personTable using 'preaggregate' as
+ | select id,avg(age) from personTable group by id
+ """.stripMargin)
+
+ // define time function
+ def time(code: => Unit): Double = {
+ val start = System.currentTimeMillis()
+ code
+ // return the elapsed time in seconds
+ (System.currentTimeMillis() - start).toDouble / 1000
+ }
+
+ val time_without_aggTable = time {
+ spark.sql(
+ s"""
+ | SELECT id, avg(age)
+ | FROM personTableWithoutAgg group by id
+ """.stripMargin).count()
+ }
+
+ val time_with_aggTable = time {
+ spark.sql(
+ s"""
+ | SELECT id, avg(age)
+ | FROM personTable group by id
+ """.stripMargin).count()
+ }
+ // scalastyle:off
+ println("time for query on table with pre-aggregate table:" + time_with_aggTable.toString)
+ println("time for query on table without pre-aggregate table:" + time_without_aggTable.toString)
+ // scalastyle:on
+
+ spark.sql("DROP TABLE IF EXISTS mainTable")
+ spark.sql("DROP TABLE IF EXISTS personTable")
+ spark.sql("DROP TABLE IF EXISTS personTableWithoutAgg")
+
+ spark.close()
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/71f8828b/examples/spark2/src/main/scala/org/apache/carbondata/examples/TimeSeriesPreAggregateTableExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/TimeSeriesPreAggregateTableExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/TimeSeriesPreAggregateTableExample.scala
new file mode 100644
index 0000000..470d9ff
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/TimeSeriesPreAggregateTableExample.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.spark.sql.SaveMode
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * This example is for time series pre-aggregate tables.
+ */
+
+object TimeSeriesPreAggregateTableExample {
+
+ def main(args: Array[String]) {
+
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+ val testData = s"$rootPath/integration/spark-common-test/src/test/resources/timeseriestest.csv"
+ val spark = ExampleUtils.createCarbonSession("TimeSeriesPreAggregateTableExample")
+
+ spark.sparkContext.setLogLevel("ERROR")
+
+ import spark.implicits._
+
+ import scala.util.Random
+ val r = new Random()
+ // random "yyyy-MM-dd HH:mm:ss" timestamps in the years 2000-2019 (days 1-28 exist in every month)
+ val df = spark.sparkContext.parallelize(1 to 10 * 1000)
+ .map(x => (f"20${r.nextInt(20)}%02d-${r.nextInt(12) + 1}%02d-${r.nextInt(28) + 1}%02d " +
+ f"${r.nextInt(24)}%02d:${r.nextInt(60)}%02d:${r.nextInt(60)}%02d", "name" + x % 8,
+ r.nextInt(60))).toDF("mytime", "name", "age")
+
+ // 1. usage for time series Pre-aggregate tables creation and query
+ spark.sql("drop table if exists timeSeriesTable")
+ spark.sql("CREATE TABLE timeSeriesTable(mytime timestamp," +
+ " name string, age int) STORED BY 'org.apache.carbondata.format'")
+ spark.sql(
+ s"""
+ | CREATE DATAMAP agg0_hour ON TABLE timeSeriesTable
+ | USING 'timeSeries'
+ | DMPROPERTIES (
+ | 'EVENT_TIME'='mytime',
+ | 'HOUR_GRANULARITY'='1')
+ | AS SELECT mytime, SUM(age) FROM timeSeriesTable
+ | GROUP BY mytime
+ """.stripMargin)
+ spark.sql(
+ s"""
+ | CREATE DATAMAP agg0_day ON TABLE timeSeriesTable
+ | USING 'timeSeries'
+ | DMPROPERTIES (
+ | 'EVENT_TIME'='mytime',
+ | 'DAY_GRANULARITY'='1')
+ | AS SELECT mytime, SUM(age) FROM timeSeriesTable
+ | GROUP BY mytime
+ """.stripMargin)
+
+
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss")
+
+ df.write.format("carbondata")
+ .option("tableName", "timeSeriesTable")
+ .option("compress", "true")
+ .mode(SaveMode.Append).save()
+
+ spark.sql(
+ s"""
+ select sum(age), timeseries(mytime,'hour') from timeSeriesTable group by timeseries(mytime,
+ 'hour')
+ """.stripMargin).show()
+
+ spark.sql(
+ s"""
+ select avg(age),timeseries(mytime,'year') from timeSeriesTable group by timeseries(mytime,
+ 'year')
+ """.stripMargin).show()
+
+ spark.sql("DROP TABLE IF EXISTS timeSeriesTable")
+
+ spark.close()
+
+ }
+}