You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/12/15 19:22:21 UTC
carbondata git commit: [CARBONDATA-1736][Pre-Aggregate] Query from
segment set is not effective when pre-aggregate table is present
Repository: carbondata
Updated Branches:
refs/heads/master 02aa39701 -> 399db8f11
[CARBONDATA-1736][Pre-Aggregate] Query from segment set is not effective when pre-aggregate table is present
This closes #1546
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/399db8f1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/399db8f1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/399db8f1
Branch: refs/heads/master
Commit: 399db8f1132b2ecfa975e2ec4cbaa03eb68e935a
Parents: 02aa397
Author: kumarvishal <ku...@gmail.com>
Authored: Wed Dec 13 12:52:23 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sat Dec 16 00:52:06 2017 +0530
----------------------------------------------------------------------
.../preaggregate/TestPreAggregateMisc.scala | 45 ++++++++++++++++++++
.../sql/hive/CarbonPreAggregateRules.scala | 25 ++++++++++-
2 files changed, 69 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/399db8f1/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateMisc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateMisc.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateMisc.scala
new file mode 100644
index 0000000..b716124
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateMisc.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, Ignore}
+
+@Ignore
+class TestPreAggregateMisc extends QueryTest with BeforeAndAfterAll {
+ override def beforeAll: Unit = {
+ sql("drop table if exists mainTable")
+ sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
+ sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
+ sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
+ }
+ test("test PreAggregate With Set Segments property") {
+ sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(age) from mainTable group by name")
+ sql("SET carbon.input.segments.default.mainTable=0")
+ checkAnswer(
+ sql("select sum(age) from mainTable"),
+ Seq(Row(183.0)))
+ sql("RESET")
+ sql("drop datamap agg1 on table mainTable")
+
+ }
+
+ override def afterAll: Unit = {
+ sql("drop table if exists mainTable")
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/399db8f1/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 4227dcb..19cc711 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -37,7 +37,7 @@ import org.apache.spark.util.CarbonReflectionUtils
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema}
import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan}
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonUtil, ThreadLocalSessionInfo}
import org.apache.carbondata.spark.util.CarbonScalaUtil
/**
@@ -263,6 +263,9 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
isValidPlan = false
null
}
+ if (isValidPlan && null != carbonTable) {
+ isValidPlan = isSpecificSegmentPresent(carbonTable)
+ }
// if plan is valid then update the plan with child attributes
if (isValidPlan) {
// getting all the projection columns
@@ -312,6 +315,26 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
}
/**
+ * Below method will be used to check whether specific segment is set for maintable
+ * if it is present then no need to transform the plan and query will be executed on
+ * maintable
+ * @param carbonTable
+ * parent table
+ * @return is specific segment is present in session params
+ */
+ def isSpecificSegmentPresent(carbonTable: CarbonTable) : Boolean = {
+ val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+ if (carbonSessionInfo != null) {
+ carbonSessionInfo.getSessionParams
+ .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+ .getDatabaseName + "." + carbonTable.getTableName, "").isEmpty
+ } else {
+ true
+ }
+ }
+
+ /**
* Below method will be used to extract the query columns from
* filter expression
* @param filterExp