You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/12/15 19:22:21 UTC

carbondata git commit: [CARBONDATA-1736][Pre-Aggregate] Query from segment set is not effective when pre-aggregate table is present

Repository: carbondata
Updated Branches:
  refs/heads/master 02aa39701 -> 399db8f11


[CARBONDATA-1736][Pre-Aggregate] Query from segment set is not effective when pre-aggregate table is present

This closes #1546


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/399db8f1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/399db8f1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/399db8f1

Branch: refs/heads/master
Commit: 399db8f1132b2ecfa975e2ec4cbaa03eb68e935a
Parents: 02aa397
Author: kumarvishal <ku...@gmail.com>
Authored: Wed Dec 13 12:52:23 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sat Dec 16 00:52:06 2017 +0530

----------------------------------------------------------------------
 .../preaggregate/TestPreAggregateMisc.scala     | 45 ++++++++++++++++++++
 .../sql/hive/CarbonPreAggregateRules.scala      | 25 ++++++++++-
 2 files changed, 69 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/399db8f1/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateMisc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateMisc.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateMisc.scala
new file mode 100644
index 0000000..b716124
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateMisc.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, Ignore}
+
+@Ignore
+class TestPreAggregateMisc extends QueryTest with BeforeAndAfterAll {
+  override def beforeAll: Unit = {
+    sql("drop table if exists mainTable")
+    sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
+  }
+  test("test PreAggregate With Set Segments property") {
+    sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(age) from mainTable group by name")
+    sql("SET carbon.input.segments.default.mainTable=0")
+    checkAnswer(
+      sql("select sum(age) from mainTable"),
+      Seq(Row(183.0)))
+    sql("RESET")
+    sql("drop datamap agg1 on table mainTable")
+
+  }
+
+  override def afterAll: Unit = {
+    sql("drop table if exists mainTable")
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/399db8f1/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 4227dcb..19cc711 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -37,7 +37,7 @@ import org.apache.spark.util.CarbonReflectionUtils
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema}
 import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan}
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonUtil, ThreadLocalSessionInfo}
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 /**
@@ -263,6 +263,9 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           isValidPlan = false
           null
       }
+      if (isValidPlan && null != carbonTable) {
+        isValidPlan = isSpecificSegmentPresent(carbonTable)
+      }
       // if plan is valid then update the plan with child attributes
       if (isValidPlan) {
         // getting all the projection columns
@@ -312,6 +315,26 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
   }
 
   /**
+   * Below method will be used to check whether specific segment is set for maintable
+   * if it is present then no need to transform the plan and query will be executed on
+   * maintable
+   * @param carbonTable
+   *                    parent table
+   * @return is specific segment is present in session params
+   */
+  def isSpecificSegmentPresent(carbonTable: CarbonTable) : Boolean = {
+    val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+    if (carbonSessionInfo != null) {
+      carbonSessionInfo.getSessionParams
+        .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+                     carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+                       .getDatabaseName + "." + carbonTable.getTableName, "").isEmpty
+    } else {
+      true
+    }
+  }
+
+  /**
    * Below method will be used to extract the query columns from
    * filter expression
    * @param filterExp