You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/06/18 08:45:01 UTC
[3/3] carbondata git commit: Fixed delete with subquery issue
Fixed delete with subquery issue
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8ceb069e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8ceb069e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8ceb069e
Branch: refs/heads/branch-1.1
Commit: 8ceb069ed98f97c31dfbdab1be7cde223a6a4a4c
Parents: f701521
Author: ravipesala <ra...@gmail.com>
Authored: Sun Jun 18 00:49:40 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sun Jun 18 14:14:04 2017 +0530
----------------------------------------------------------------------
.../iud/DeleteCarbonTableTestCase.scala | 24 --------
.../sql/optimizer/CarbonLateDecodeRule.scala | 39 ++++++------
.../iud/DeleteCarbonTableSubqueryTestCase.scala | 63 ++++++++++++++++++++
3 files changed, 85 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8ceb069e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index 0346067..2e59c9c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -25,8 +25,6 @@ import org.apache.carbondata.core.util.CarbonProperties
class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
override def beforeAll {
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "false")
sql("use default")
sql("drop database if exists iud_db cascade")
sql("create database iud_db")
@@ -97,26 +95,6 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
)
}
-// test("delete data from carbon table[where IN (sub query) ]") {
-// sql("""drop table if exists iud_db.dest""")
-// sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
-// sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud_db.dest""")
-// sql("""delete from iud_db.dest where c1 IN (select c11 from source2)""").show(truncate = false)
-// checkAnswer(
-// sql("""select c1 from iud_db.dest"""),
-// Seq(Row("c"), Row("d"), Row("e"))
-// )
-// }
-// test("delete data from carbon table[where IN (sub query with where clause) ]") {
-// sql("""drop table if exists iud_db.dest""")
-// sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
-// sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud_db.dest""")
-// sql("""delete from iud_db.dest where c1 IN (select c11 from source2 where c11 = 'b')""").show()
-// checkAnswer(
-// sql("""select c1 from iud_db.dest"""),
-// Seq(Row("a"), Row("c"), Row("d"), Row("e"))
-// )
-// }
test("delete data from carbon table[where numeric condition ]") {
sql("""drop table if exists iud_db.dest""")
sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
@@ -128,8 +106,6 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
)
}
override def afterAll {
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "true")
sql("use default")
sql("drop database if exists iud_db cascade")
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8ceb069e/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 7a6c513..ae2e46b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -86,29 +86,34 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
}
private def pushDownUDFToJoinLeftRelation(plan: LogicalPlan): LogicalPlan = {
- val output = plan match {
+ val output = plan.transform {
case proj@Project(cols, Join(
left, right, jointype: org.apache.spark.sql.catalyst.plans.JoinType, condition)) =>
var projectionToBeAdded: Seq[org.apache.spark.sql.catalyst.expressions.Alias] = Seq.empty
- val newCols = cols.map { col =>
- col match {
- case a@Alias(s: ScalaUDF, name)
- if (name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
- name.equalsIgnoreCase(
- CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) =>
- projectionToBeAdded :+= a
- AttributeReference(name, StringType, true)().withExprId(a.exprId)
+ var udfExists = false
+ val newCols = cols.map {
+ case a@Alias(s: ScalaUDF, name)
+ if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
+ name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) =>
+ udfExists = true
+ projectionToBeAdded :+= a
+ AttributeReference(name, StringType, nullable = true)().withExprId(a.exprId)
+ case other => other
+ }
+ if (udfExists) {
+ val newLeft = left match {
+ case Project(columns, logicalPlan) =>
+ Project(columns ++ projectionToBeAdded, logicalPlan)
+ case filter: Filter =>
+ Project(filter.output ++ projectionToBeAdded, filter)
+ case relation: LogicalRelation =>
+ Project(relation.output ++ projectionToBeAdded, relation)
case other => other
}
+ Project(newCols, Join(newLeft, right, jointype, condition))
+ } else {
+ proj
}
- val newLeft = left match {
- case Project(columns, logicalPlan) =>
- Project(columns ++ projectionToBeAdded, logicalPlan)
- case filter: Filter =>
- Project(filter.output ++ projectionToBeAdded, filter)
- case other => other
- }
- Project(newCols, Join(newLeft, right, jointype, condition))
case other => other
}
output
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8ceb069e/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala
new file mode 100644
index 0000000..ff6196c
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.carbondata.iud
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class DeleteCarbonTableSubqueryTestCase extends QueryTest with BeforeAndAfterAll {
+ override def beforeAll {
+ sql("use default")
+ sql("drop database if exists iud_db_sub cascade")
+ sql("create database iud_db_sub")
+
+ sql("""create table iud_db_sub.source2 (c11 string,c22 int,c33 string,c55 string, c66 int) STORED BY 'org.apache.carbondata.format'""")
+ sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/source2.csv' INTO table iud_db_sub.source2""")
+ sql("use iud_db_sub")
+ }
+
+ test("delete data from carbon table[where IN (sub query) ]") {
+ sql("""drop table if exists iud_db_sub.dest""")
+ sql("""create table iud_db_sub.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+ sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud_db_sub.dest""")
+ sql("""delete from iud_db_sub.dest where c1 IN (select c11 from source2)""").show(truncate = false)
+ checkAnswer(
+ sql("""select c1 from iud_db_sub.dest"""),
+ Seq(Row("c"), Row("d"), Row("e"))
+ )
+ }
+
+ test("delete data from carbon table[where IN (sub query with where clause) ]") {
+ sql("""drop table if exists iud_db_sub.dest""")
+ sql("""create table iud_db_sub.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+ sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud_db_sub.dest""")
+ sql("""delete from iud_db_sub.dest where c1 IN (select c11 from source2 where c11 = 'b')""").show()
+ checkAnswer(
+ sql("""select c1 from iud_db_sub.dest"""),
+ Seq(Row("a"), Row("c"), Row("d"), Row("e"))
+ )
+ }
+
+ override def afterAll {
+ sql("use default")
+ sql("drop database if exists iud_db_sub cascade")
+ }
+}
\ No newline at end of file