You are viewing a plain-text version of this content. The hyperlink to the canonical version ("here") did not survive conversion to plain text.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/02/27 10:23:21 UTC
[1/2] incubator-carbondata git commit: Supporting scalar subquery in carbon
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 740358c13 -> 565bc9b8f
Supporting scalar subquery in carbon
Removed unused imports
Changed format
Changed format
changed testcases
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/538b64ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/538b64ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/538b64ec
Branch: refs/heads/master
Commit: 538b64ec4d9467fc41a953af7bac1ea46fcd9d84
Parents: 740358c
Author: ravipesala <ra...@gmail.com>
Authored: Wed Feb 1 23:04:05 2017 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Mon Feb 27 00:29:52 2017 +0800
----------------------------------------------------------------------
.../sql/CarbonDatasourceHadoopRelation.scala | 3 +-
.../spark/sql/hive/CarbonSessionState.scala | 37 ++++++++++++++++++--
.../sql/optimizer/CarbonLateDecodeRule.scala | 6 ++++
.../bucketing/TableBucketingTestCase.scala | 14 ++++++++
4 files changed, 57 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/538b64ec/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index eb1730f..1491cff 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -39,7 +39,8 @@ case class CarbonDatasourceHadoopRelation(
sparkSession: SparkSession,
paths: Array[String],
parameters: Map[String, String],
- tableSchema: Option[StructType])
+ tableSchema: Option[StructType],
+ var isSubquery: Boolean = false)
extends BaseRelation with InsertableRelation {
lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/538b64ec/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 066acce..331efe9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -16,10 +16,16 @@
*/
package org.apache.spark.sql.hive
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, ExperimentalMethods, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.expressions.ScalarSubquery
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.execution.{CarbonLateDecodeStrategy, SparkOptimizer}
import org.apache.spark.sql.execution.command.DDLStrategy
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
import org.apache.spark.sql.parser.CarbonSparkSqlParser
@@ -35,4 +41,31 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))
experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+ override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
+}
+
+class CarbonOptimizer(
+ catalog: SessionCatalog,
+ conf: SQLConf,
+ experimentalMethods: ExperimentalMethods)
+ extends SparkOptimizer(catalog, conf, experimentalMethods) {
+
+ override def execute(plan: LogicalPlan): LogicalPlan = {
+ // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule, And
+ // optimize whole plan at once.
+ val transFormedPlan = plan.transform {
+ case filter: Filter =>
+ filter.transformExpressions {
+ case s: ScalarSubquery =>
+ val tPlan = s.plan.transform {
+ case lr: LogicalRelation
+ if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+ lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery = true
+ lr
+ }
+ ScalarSubquery(tPlan, s.children, s.exprId)
+ }
+ }
+ super.execute(transFormedPlan)
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/538b64ec/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index afbbae1..a20326f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -58,6 +58,12 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = {
relations = collectCarbonRelation(plan)
if (relations.nonEmpty && !isOptimized(plan)) {
+ // In case scalar subquery skip the transformation and update the flag.
+ if (relations.exists(_.carbonRelation.isSubquery)) {
+ relations.foreach(p => p.carbonRelation.isSubquery = false)
+ LOGGER.info("Skip CarbonOptimizer for scalar sub query")
+ return plan
+ }
LOGGER.info("Starting to optimize plan")
val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder("")
val queryStatistic = new QueryStatistic()
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/538b64ec/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index 65a726a..33f710b 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -191,6 +191,20 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
assert(shuffleExists, "shuffle should exist on non bucket tables")
}
+ test("test scalar subquery with equal") {
+ sql(
+ """select sum(salary) from t4 t1
+ |where ID = (select sum(ID) from t4 t2 where t1.name = t2.name)""".stripMargin)
+ .count()
+ }
+
+ test("test scalar subquery with lessthan") {
+ sql(
+ """select sum(salary) from t4 t1
+ |where ID < (select sum(ID) from t4 t2 where t1.name = t2.name)""".stripMargin)
+ .count()
+ }
+
override def afterAll {
sql("DROP TABLE IF EXISTS t3")
sql("DROP TABLE IF EXISTS t4")
[2/2] incubator-carbondata git commit: [CARBONDATA-692] Supporting scalar subquery in carbon. This closes #583
Posted by ja...@apache.org.
[CARBONDATA-692] Supporting scalar subquery in carbon. This closes #583
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/565bc9b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/565bc9b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/565bc9b8
Branch: refs/heads/master
Commit: 565bc9b8f1893ba586791118c560ccfba4263b66
Parents: 740358c 538b64e
Author: jackylk <ja...@huawei.com>
Authored: Mon Feb 27 18:22:15 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Mon Feb 27 18:22:15 2017 +0800
----------------------------------------------------------------------
.../sql/CarbonDatasourceHadoopRelation.scala | 3 +-
.../spark/sql/hive/CarbonSessionState.scala | 37 ++++++++++++++++++--
.../sql/optimizer/CarbonLateDecodeRule.scala | 6 ++++
.../bucketing/TableBucketingTestCase.scala | 14 ++++++++
4 files changed, 57 insertions(+), 3 deletions(-)
----------------------------------------------------------------------