You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/06/15 11:50:26 UTC
[21/42] carbondata git commit: Fixed all testcases of IUD in spark 2.1
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d16d504/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDCompactionTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDCompactionTestCases.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDCompactionTestCases.scala
deleted file mode 100644
index 9da3913..0000000
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDCompactionTestCases.scala
+++ /dev/null
@@ -1,361 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.iud
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.common.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-
-
-class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
- override def beforeAll {
-
- sql("""drop database if exists iud4 cascade""")
- sql("""create database iud4""")
- sql("""use iud4""")
- sql(
- """create table iud4.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/comp1.csv' INTO table iud4.dest""")
- sql(
- """create table iud4.source2 (c11 string,c22 int,c33 string,c55 string, c66 int) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/source3.csv' INTO table iud4.source2""")
- sql("""create table iud4.other (c1 string,c2 int) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/other.csv' INTO table iud4.other""")
- sql(
- """create table iud4.hdest (c1 string,c2 int,c3 string,c5 string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/comp1.csv' INTO table iud4.hdest""")
- sql(
- """CREATE TABLE iud4.update_01(imei string,age int,task bigint,num double,level decimal(10,3),name string)STORED BY 'org.apache.carbondata.format' """)
- sql(
- s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/update01.csv' INTO TABLE iud4.update_01 OPTIONS('BAD_RECORDS_LOGGER_ENABLE' = 'FALSE', 'BAD_RECORDS_ACTION' = 'FORCE') """)
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.isHorizontalCompactionEnabled, "true")
- }
-
-
-
- test("test IUD Horizontal Compaction Update Alter Clean") {
- sql("""drop database if exists iud4 cascade""")
- sql("""create database iud4""")
- sql("""use iud4""")
- sql(
- """create table dest2 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
-
- sql(s"""load data local inpath '$resourcesPath/IUD/comp1.csv' INTO table dest2""")
- sql(s"""load data local inpath '$resourcesPath/IUD/comp2.csv' INTO table dest2""")
- sql(s"""load data local inpath '$resourcesPath/IUD/comp3.csv' INTO table dest2""")
- sql(s"""load data local inpath '$resourcesPath/IUD/comp4.csv' INTO table dest2""")
- sql(
- """create table source2 (c11 string,c22 int,c33 string,c55 string, c66 int) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/source3.csv' INTO table source2""")
- sql(
- """update dest2 d set (d.c3, d.c5 ) = (select s.c33,s.c55 from source2 s where d.c1 = s.c11 and s.c22 < 3 or (s.c22 > 10 and s.c22 < 13) or (s.c22 > 20 and s.c22 < 23) or (s.c22 > 30 and s.c22 < 33))""")
- .show()
- sql(
- """update dest2 d set (d.c3, d.c5 ) = (select s.c33,s.c55 from source2 s where d.c1 = s.c11 and (s.c22 > 3 and s.c22 < 5) or (s.c22 > 13 and s.c22 < 15) or (s.c22 > 23 and s.c22 < 25) or (s.c22 > 33 and s.c22 < 35))""")
- .show()
- sql(
- """update dest2 d set (d.c3, d.c5 ) = (select s.c33,s.c55 from source2 s where d.c1 = s.c11 and (s.c22 > 5 and c22 < 8) or (s.c22 > 15 and s.c22 < 18 ) or (s.c22 > 25 and c22 < 28) or (s.c22 > 35 and c22 < 38))""")
- .show()
- sql("""alter table dest2 compact 'minor'""")
- sql("""clean files for table dest2""")
- checkAnswer(
- sql("""select c1,c2,c3,c5 from dest2 order by c2"""),
- Seq(Row("a", 1, "MGM", "Disco"),
- Row("b", 2, "RGK", "Music"),
- Row("c", 3, "cc", "ccc"),
- Row("d", 4, "YDY", "Weather"),
- Row("e", 5, "ee", "eee"),
- Row("f", 6, "ff", "fff"),
- Row("g", 7, "YTY", "Hello"),
- Row("h", 8, "hh", "hhh"),
- Row("i", 9, "ii", "iii"),
- Row("j", 10, "jj", "jjj"),
- Row("a", 11, "MGM", "Disco"),
- Row("b", 12, "RGK", "Music"),
- Row("c", 13, "cc", "ccc"),
- Row("d", 14, "YDY", "Weather"),
- Row("e", 15, "ee", "eee"),
- Row("f", 16, "ff", "fff"),
- Row("g", 17, "YTY", "Hello"),
- Row("h", 18, "hh", "hhh"),
- Row("i", 19, "ii", "iii"),
- Row("j", 20, "jj", "jjj"),
- Row("a", 21, "MGM", "Disco"),
- Row("b", 22, "RGK", "Music"),
- Row("c", 23, "cc", "ccc"),
- Row("d", 24, "YDY", "Weather"),
- Row("e", 25, "ee", "eee"),
- Row("f", 26, "ff", "fff"),
- Row("g", 27, "YTY", "Hello"),
- Row("h", 28, "hh", "hhh"),
- Row("i", 29, "ii", "iii"),
- Row("j", 30, "jj", "jjj"),
- Row("a", 31, "MGM", "Disco"),
- Row("b", 32, "RGK", "Music"),
- Row("c", 33, "cc", "ccc"),
- Row("d", 34, "YDY", "Weather"),
- Row("e", 35, "ee", "eee"),
- Row("f", 36, "ff", "fff"),
- Row("g", 37, "YTY", "Hello"),
- Row("h", 38, "hh", "hhh"),
- Row("i", 39, "ii", "iii"),
- Row("j", 40, "jj", "jjj"))
- )
- sql("""drop table dest2""")
- }
-
-
- test("test IUD Horizontal Compaction Delete") {
- sql("""drop database if exists iud4 cascade""")
- sql("""create database iud4""")
- sql("""use iud4""")
- sql(
- """create table dest2 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""load data local inpath '$resourcesPath/IUD/comp1.csv' INTO table dest2""")
- sql(s"""load data local inpath '$resourcesPath/IUD/comp2.csv' INTO table dest2""")
- sql(s"""load data local inpath '$resourcesPath/IUD/comp3.csv' INTO table dest2""")
- sql(s"""load data local inpath '$resourcesPath/IUD/comp4.csv' INTO table dest2""")
- sql("""select * from dest2""")
- sql(
- """create table source2 (c11 string,c22 int,c33 string,c55 string, c66 int) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/source3.csv' INTO table source2""")
- sql("""select * from source2""")
- sql("""delete from dest2 where (c2 < 3) or (c2 > 10 and c2 < 13) or (c2 > 20 and c2 < 23) or (c2 > 30 and c2 < 33)""").show()
- sql("""select * from dest2 order by 2""")
- sql("""delete from dest2 where (c2 > 3 and c2 < 5) or (c2 > 13 and c2 < 15) or (c2 > 23 and c2 < 25) or (c2 > 33 and c2 < 35)""").show()
- sql("""select * from dest2 order by 2""")
- sql("""delete from dest2 where (c2 > 5 and c2 < 8) or (c2 > 15 and c2 < 18 ) or (c2 > 25 and c2 < 28) or (c2 > 35 and c2 < 38)""").show()
- sql("""clean files for table dest2""")
- checkAnswer(
- sql("""select c1,c2,c3,c5 from dest2 order by c2"""),
- Seq(Row("c", 3, "cc", "ccc"),
- Row("e", 5, "ee", "eee"),
- Row("h", 8, "hh", "hhh"),
- Row("i", 9, "ii", "iii"),
- Row("j", 10, "jj", "jjj"),
- Row("c", 13, "cc", "ccc"),
- Row("e", 15, "ee", "eee"),
- Row("h", 18, "hh", "hhh"),
- Row("i", 19, "ii", "iii"),
- Row("j", 20, "jj", "jjj"),
- Row("c", 23, "cc", "ccc"),
- Row("e", 25, "ee", "eee"),
- Row("h", 28, "hh", "hhh"),
- Row("i", 29, "ii", "iii"),
- Row("j", 30, "jj", "jjj"),
- Row("c", 33, "cc", "ccc"),
- Row("e", 35, "ee", "eee"),
- Row("h", 38, "hh", "hhh"),
- Row("i", 39, "ii", "iii"),
- Row("j", 40, "jj", "jjj"))
- )
- sql("""drop table dest2""")
- }
-
- test("test IUD Horizontal Compaction Multiple Update Vertical Compaction and Clean") {
- sql("""drop database if exists iud4 cascade""")
- sql("""create database iud4""")
- sql("""use iud4""")
- sql(
- """create table dest2 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""load data local inpath '$resourcesPath/IUD/comp1.csv' INTO table dest2""")
- sql(s"""load data local inpath '$resourcesPath/IUD/comp2.csv' INTO table dest2""")
- sql(s"""load data local inpath '$resourcesPath/IUD/comp3.csv' INTO table dest2""")
- sql(s"""load data local inpath '$resourcesPath/IUD/comp4.csv' INTO table dest2""")
- sql(
- """create table source2 (c11 string,c22 int,c33 string,c55 string, c66 int) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/source3.csv' INTO table source2""")
- sql("""update dest2 d set (d.c3, d.c5 ) = (select s.c33,s.c55 from source2 s where d.c1 = s.c11 and s.c22 < 3 or (s.c22 > 10 and s.c22 < 13) or (s.c22 > 20 and s.c22 < 23) or (s.c22 > 30 and s.c22 < 33))""").show()
- sql("""update dest2 d set (d.c3, d.c5 ) = (select s.c11,s.c66 from source2 s where d.c1 = s.c11 and s.c22 < 3 or (s.c22 > 10 and s.c22 < 13) or (s.c22 > 20 and s.c22 < 23) or (s.c22 > 30 and s.c22 < 33))""").show()
- sql("""update dest2 d set (d.c3, d.c5 ) = (select s.c33,s.c55 from source2 s where d.c1 = s.c11 and (s.c22 > 3 and s.c22 < 5) or (s.c22 > 13 and s.c22 < 15) or (s.c22 > 23 and s.c22 < 25) or (s.c22 > 33 and s.c22 < 35))""").show()
- sql("""update dest2 d set (d.c3, d.c5 ) = (select s.c11,s.c66 from source2 s where d.c1 = s.c11 and (s.c22 > 3 and s.c22 < 5) or (s.c22 > 13 and s.c22 < 15) or (s.c22 > 23 and s.c22 < 25) or (s.c22 > 33 and s.c22 < 35))""").show()
- sql("""update dest2 d set (d.c3, d.c5 ) = (select s.c33,s.c55 from source2 s where d.c1 = s.c11 and (s.c22 > 5 and c22 < 8) or (s.c22 > 15 and s.c22 < 18 ) or (s.c22 > 25 and c22 < 28) or (s.c22 > 35 and c22 < 38))""").show()
- sql("""update dest2 d set (d.c3, d.c5 ) = (select s.c11,s.c66 from source2 s where d.c1 = s.c11 and (s.c22 > 5 and c22 < 8) or (s.c22 > 15 and s.c22 < 18 ) or (s.c22 > 25 and c22 < 28) or (s.c22 > 35 and c22 < 38))""").show()
- sql("""alter table dest2 compact 'major'""")
- sql("""clean files for table dest2""")
- checkAnswer(
- sql("""select c1,c2,c3,c5 from dest2 order by c2"""),
- Seq(Row("a", 1, "a", "10"),
- Row("b", 2, "b", "8"),
- Row("c", 3, "cc", "ccc"),
- Row("d", 4, "d", "9"),
- Row("e", 5, "ee", "eee"),
- Row("f", 6, "ff", "fff"),
- Row("g", 7, "g", "12"),
- Row("h", 8, "hh", "hhh"),
- Row("i", 9, "ii", "iii"),
- Row("j", 10, "jj", "jjj"),
- Row("a", 11, "a", "10"),
- Row("b", 12, "b", "8"),
- Row("c", 13, "cc", "ccc"),
- Row("d", 14, "d", "9"),
- Row("e", 15, "ee", "eee"),
- Row("f", 16, "ff", "fff"),
- Row("g", 17, "g", "12"),
- Row("h", 18, "hh", "hhh"),
- Row("i", 19, "ii", "iii"),
- Row("j", 20, "jj", "jjj"),
- Row("a", 21, "a", "10"),
- Row("b", 22, "b", "8"),
- Row("c", 23, "cc", "ccc"),
- Row("d", 24, "d", "9"),
- Row("e", 25, "ee", "eee"),
- Row("f", 26, "ff", "fff"),
- Row("g", 27, "g", "12"),
- Row("h", 28, "hh", "hhh"),
- Row("i", 29, "ii", "iii"),
- Row("j", 30, "jj", "jjj"),
- Row("a", 31, "a", "10"),
- Row("b", 32, "b", "8"),
- Row("c", 33, "cc", "ccc"),
- Row("d", 34, "d", "9"),
- Row("e", 35, "ee", "eee"),
- Row("f", 36, "ff", "fff"),
- Row("g", 37, "g", "12"),
- Row("h", 38, "hh", "hhh"),
- Row("i", 39, "ii", "iii"),
- Row("j", 40, "jj", "jjj"))
- )
- sql("""drop table dest2""")
- }
-
- test("test IUD Horizontal Compaction Update Delete and Clean") {
- sql("""drop database if exists iud4 cascade""")
- sql("""create database iud4""")
- sql("""use iud4""")
- sql(
- """create table dest2 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""load data local inpath '$resourcesPath/IUD/comp1.csv' INTO table dest2""")
- sql(s"""load data local inpath '$resourcesPath/IUD/comp2.csv' INTO table dest2""")
- sql(s"""load data local inpath '$resourcesPath/IUD/comp3.csv' INTO table dest2""")
- sql(s"""load data local inpath '$resourcesPath/IUD/comp4.csv' INTO table dest2""")
- sql(
- """create table source2 (c11 string,c22 int,c33 string,c55 string, c66 int) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/source3.csv' INTO table source2""")
- sql("""update dest2 d set (d.c3, d.c5 ) = (select s.c33,s.c55 from source2 s where d.c1 = s.c11 and s.c22 < 3 or (s.c22 > 10 and s.c22 < 13) or (s.c22 > 20 and s.c22 < 23) or (s.c22 > 30 and s.c22 < 33))""").show()
- sql("""delete from dest2 where (c2 < 2) or (c2 > 10 and c2 < 13) or (c2 > 20 and c2 < 23) or (c2 > 30 and c2 < 33)""").show()
- sql("""delete from dest2 where (c2 > 3 and c2 < 5) or (c2 > 13 and c2 < 15) or (c2 > 23 and c2 < 25) or (c2 > 33 and c2 < 35)""").show()
- sql("""delete from dest2 where (c2 > 5 and c2 < 8) or (c2 > 15 and c2 < 18 ) or (c2 > 25 and c2 < 28) or (c2 > 35 and c2 < 38)""").show()
- sql("""clean files for table dest2""")
- checkAnswer(
- sql("""select c1,c2,c3,c5 from dest2 order by c2"""),
- Seq(Row("b", 2, "RGK", "Music"),
- Row("c", 3, "cc", "ccc"),
- Row("e", 5, "ee", "eee"),
- Row("h", 8, "hh", "hhh"),
- Row("i", 9, "ii", "iii"),
- Row("j", 10, "jj", "jjj"),
- Row("c", 13, "cc", "ccc"),
- Row("e", 15, "ee", "eee"),
- Row("h", 18, "hh", "hhh"),
- Row("i", 19, "ii", "iii"),
- Row("j", 20, "jj", "jjj"),
- Row("c", 23, "cc", "ccc"),
- Row("e", 25, "ee", "eee"),
- Row("h", 28, "hh", "hhh"),
- Row("i", 29, "ii", "iii"),
- Row("j", 30, "jj", "jjj"),
- Row("c", 33, "cc", "ccc"),
- Row("e", 35, "ee", "eee"),
- Row("h", 38, "hh", "hhh"),
- Row("i", 39, "ii", "iii"),
- Row("j", 40, "jj", "jjj"))
- )
- sql("""drop table dest2""")
- }
-
- test("test IUD Horizontal Compaction Check Column Cardinality") {
- sql("""drop database if exists iud4 cascade""")
- sql("""create database iud4""")
- sql("""use iud4""")
- sql(
- """create table T_Carbn01(Active_status String,Item_type_cd INT,Qty_day_avg INT,Qty_total INT,Sell_price BIGINT,Sell_pricep DOUBLE,Discount_price DOUBLE,Profit DECIMAL(3,2),Item_code String,Item_name String,Outlet_name String,Update_time TIMESTAMP,Create_date String)STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/T_Hive1.csv' INTO table t_carbn01 options ('BAD_RECORDS_LOGGER_ENABLE' = 'FALSE', 'BAD_RECORDS_ACTION' = 'FORCE','DELIMITER'=',', 'QUOTECHAR'='\', 'FILEHEADER'='Active_status,Item_type_cd,Qty_day_avg,Qty_total,Sell_price,Sell_pricep,Discount_price,Profit,Item_code,Item_name,Outlet_name,Update_time,Create_date')""")
- sql("""update t_carbn01 set (item_code) = ('Orange') where item_type_cd = 14""").show()
- sql("""update t_carbn01 set (item_code) = ('Banana') where item_type_cd = 2""").show()
- sql("""delete from t_carbn01 where item_code in ('RE3423ee','Orange','Banana')""").show()
- checkAnswer(
- sql("""select item_code from t_carbn01 where item_code not in ('RE3423ee','Orange','Banana')"""),
- Seq(Row("SAD423ee"),
- Row("DE3423ee"),
- Row("SE3423ee"),
- Row("SE3423ee"),
- Row("SE3423ee"),
- Row("SE3423ee"))
- )
- sql("""drop table t_carbn01""")
- }
-
-
- test("test IUD Horizontal Compaction Segment Delete Test Case") {
- sql("""drop database if exists iud4 cascade""")
- sql("""create database iud4""")
- sql("""use iud4""")
- sql(
- """create table dest2 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""load data local inpath '$resourcesPath/IUD/comp1.csv' INTO table dest2""")
- sql(s"""load data local inpath '$resourcesPath/IUD/comp2.csv' INTO table dest2""")
- sql(s"""load data local inpath '$resourcesPath/IUD/comp3.csv' INTO table dest2""")
- sql(s"""load data local inpath '$resourcesPath/IUD/comp4.csv' INTO table dest2""")
- sql(
- """delete from dest2 where (c2 < 3) or (c2 > 10 and c2 < 13) or (c2 > 20 and c2 < 23) or (c2 > 30 and c2 < 33)""").show()
- sql("""DELETE SEGMENT 0 FROM TABLE dest2""")
- sql("""clean files for table dest2""")
- sql(
- """update dest2 set (c5) = ('8RAM size') where (c2 > 3 and c2 < 5) or (c2 > 13 and c2 < 15) or (c2 > 23 and c2 < 25) or (c2 > 33 and c2 < 35)""")
- .show()
- checkAnswer(
- sql("""select count(*) from dest2"""),
- Seq(Row(24))
- )
- sql("""drop table dest2""")
- }
-
- test("test case full table delete") {
- sql("""drop database if exists iud4 cascade""")
- sql("""create database iud4""")
- sql("""use iud4""")
- sql(
- """create table dest2 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""load data local inpath '$resourcesPath/IUD/comp1.csv' INTO table dest2""")
- sql(s"""load data local inpath '$resourcesPath/IUD/comp2.csv' INTO table dest2""")
- sql(s"""load data local inpath '$resourcesPath/IUD/comp3.csv' INTO table dest2""")
- sql(s"""load data local inpath '$resourcesPath/IUD/comp4.csv' INTO table dest2""")
- sql("""delete from dest2 where c2 < 41""").show()
- sql("""alter table dest2 compact 'major'""")
- checkAnswer(
- sql("""select count(*) from dest2"""),
- Seq(Row(0))
- )
- sql("""drop table dest2""")
- }
-
-
- override def afterAll {
- sql("use default")
- sql("drop database if exists iud4 cascade")
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.isHorizontalCompactionEnabled , "true")
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d16d504/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
deleted file mode 100644
index 2fc51b5..0000000
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ /dev/null
@@ -1,393 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.testsuite.iud
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.common.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-
-class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
- override def beforeAll {
-
- sql("drop database if exists iud cascade")
- sql("create database iud")
- sql("use iud")
- sql("""create table iud.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest""")
- sql("""create table iud.source2 (c11 string,c22 int,c33 string,c55 string, c66 int) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/source2.csv' INTO table iud.source2""")
- sql("""create table iud.other (c1 string,c2 int) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/other.csv' INTO table iud.other""")
- sql("""create table iud.hdest (c1 string,c2 int,c3 string,c5 string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE""").show()
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.hdest""")
- sql("""CREATE TABLE iud.update_01(imei string,age int,task bigint,num double,level decimal(10,3),name string)STORED BY 'org.apache.carbondata.format' """)
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/update01.csv' INTO TABLE iud.update_01 OPTIONS('BAD_RECORDS_LOGGER_ENABLE' = 'FALSE', 'BAD_RECORDS_ACTION' = 'FORCE') """)
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.isHorizontalCompactionEnabled , "true")
- }
-
-
- test("test update operation with 0 rows updation.") {
- sql("""drop table iud.zerorows""").show
- sql("""create table iud.zerorows (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.zerorows""")
- sql("""update zerorows d set (d.c2) = (d.c2 + 1) where d.c1 = 'a'""").show()
- sql("""update zerorows d set (d.c2) = (d.c2 + 1) where d.c1 = 'xxx'""").show()
- checkAnswer(
- sql("""select c1,c2,c3,c5 from iud.zerorows"""),
- Seq(Row("a",2,"aa","aaa"),Row("b",2,"bb","bbb"),Row("c",3,"cc","ccc"),Row("d",4,"dd","ddd"),Row("e",5,"ee","eee"))
- )
- sql("""drop table iud.zerorows""").show
-
-
- }
-
-
- test("update carbon table[select from source table with where and exist]") {
- sql("""drop table iud.dest11""").show
- sql("""create table iud.dest11 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest11""")
- sql("""update iud.dest11 d set (d.c3, d.c5 ) = (select s.c33,s.c55 from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show()
- checkAnswer(
- sql("""select c3,c5 from iud.dest11"""),
- Seq(Row("cc","ccc"), Row("dd","ddd"),Row("ee","eee"), Row("MGM","Disco"),Row("RGK","Music"))
- )
- sql("""drop table iud.dest11""").show
- }
-
- test("update carbon table[using destination table columns with where and exist]") {
- sql("""drop table iud.dest22""")
- sql("""create table iud.dest22 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest22""")
- checkAnswer(
- sql("""select c2 from iud.dest22 where c1='a'"""),
- Seq(Row(1))
- )
- sql("""update dest22 d set (d.c2) = (d.c2 + 1) where d.c1 = 'a'""").show()
- checkAnswer(
- sql("""select c2 from iud.dest22 where c1='a'"""),
- Seq(Row(2))
- )
- sql("""drop table iud.dest22""")
- }
-
- test("update carbon table without alias in set columns") {
- sql("""drop table iud.dest33""")
- sql("""create table iud.dest33 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest33""")
- sql("""update iud.dest33 d set (c3,c5 ) = (select s.c33 ,s.c55 from iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show()
- checkAnswer(
- sql("""select c3,c5 from iud.dest33 where c1='a'"""),
- Seq(Row("MGM","Disco"))
- )
- sql("""drop table iud.dest33""")
- }
-
- test("update carbon table without alias in set columns with mulitple loads") {
- sql("""drop table iud.dest33""")
- sql("""create table iud.dest33 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest33""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest33""")
- sql("""update iud.dest33 d set (c3,c5 ) = (select s.c33 ,s.c55 from iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show()
- checkAnswer(
- sql("""select c3,c5 from iud.dest33 where c1='a'"""),
- Seq(Row("MGM","Disco"),Row("MGM","Disco"))
- )
- sql("""drop table iud.dest33""")
- }
-
- test("update carbon table without alias in set three columns") {
- sql("""drop table iud.dest44""")
- sql("""create table iud.dest44 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest44""")
- sql("""update iud.dest44 d set (c1,c3,c5 ) = (select s.c11, s.c33 ,s.c55 from iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show()
- checkAnswer(
- sql("""select c1,c3,c5 from iud.dest44 where c1='a'"""),
- Seq(Row("a","MGM","Disco"))
- )
- sql("""drop table iud.dest44""")
- }
-
- test("update carbon table[single column select from source with where and exist]") {
- sql("""drop table iud.dest55""")
- sql("""create table iud.dest55 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest55""")
- sql("""update iud.dest55 d set (c3) = (select s.c33 from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show()
- checkAnswer(
- sql("""select c1,c3 from iud.dest55 """),
- Seq(Row("a","MGM"),Row("b","RGK"),Row("c","cc"),Row("d","dd"),Row("e","ee"))
- )
- sql("""drop table iud.dest55""")
- }
-
- test("update carbon table[single column SELECT from source with where and exist]") {
- sql("""drop table iud.dest55""")
- sql("""create table iud.dest55 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest55""")
- sql("""update iud.dest55 d set (c3) = (SELECT s.c33 from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show()
- checkAnswer(
- sql("""select c1,c3 from iud.dest55 """),
- Seq(Row("a","MGM"),Row("b","RGK"),Row("c","cc"),Row("d","dd"),Row("e","ee"))
- )
- sql("""drop table iud.dest55""")
- }
-
- test("update carbon table[using destination table columns without where clause]") {
- sql("""drop table iud.dest66""")
- sql("""create table iud.dest66 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest66""")
- sql("""update iud.dest66 d set (c2, c5 ) = (c2 + 1, concat(c5 , "z"))""").show()
- checkAnswer(
- sql("""select c2,c5 from iud.dest66 """),
- Seq(Row(2,"aaaz"),Row(3,"bbbz"),Row(4,"cccz"),Row(5,"dddz"),Row(6,"eeez"))
- )
- sql("""drop table iud.dest66""")
- }
-
- test("update carbon table[using destination table columns with where clause]") {
- sql("""drop table iud.dest77""")
- sql("""create table iud.dest77 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest77""")
- sql("""update iud.dest77 d set (c2, c5 ) = (c2 + 1, concat(c5 , "z")) where d.c3 = 'dd'""").show()
- checkAnswer(
- sql("""select c2,c5 from iud.dest77 where c3 = 'dd'"""),
- Seq(Row(5,"dddz"))
- )
- sql("""drop table iud.dest77""")
- }
-
- test("update carbon table[using destination table( no alias) columns without where clause]") {
- sql("""drop table iud.dest88""")
- sql("""create table iud.dest88 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest88""")
- sql("""update iud.dest88 set (c2, c5 ) = (c2 + 1, concat(c5 , "y" ))""").show()
- checkAnswer(
- sql("""select c2,c5 from iud.dest88 """),
- Seq(Row(2,"aaay"),Row(3,"bbby"),Row(4,"cccy"),Row(5,"dddy"),Row(6,"eeey"))
- )
- sql("""drop table iud.dest88""")
- }
-
- test("update carbon table[using destination table columns with hard coded value ]") {
- sql("""drop table iud.dest99""")
- sql("""create table iud.dest99 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest99""")
- sql("""update iud.dest99 d set (c2, c5 ) = (c2 + 1, "xyx")""").show()
- checkAnswer(
- sql("""select c2,c5 from iud.dest99 """),
- Seq(Row(2,"xyx"),Row(3,"xyx"),Row(4,"xyx"),Row(5,"xyx"),Row(6,"xyx"))
- )
- sql("""drop table iud.dest99""")
- }
-
- test("update carbon tableusing destination table columns with hard coded value and where condition]") {
- sql("""drop table iud.dest110""")
- sql("""create table iud.dest110 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest110""")
- sql("""update iud.dest110 d set (c2, c5 ) = (c2 + 1, "xyx") where d.c1 = 'e'""").show()
- checkAnswer(
- sql("""select c2,c5 from iud.dest110 where c1 = 'e' """),
- Seq(Row(6,"xyx"))
- )
- sql("""drop table iud.dest110""")
- }
-
- test("update carbon table[using source table columns with where and exist and no destination table condition]") {
- sql("""drop table iud.dest120""")
- sql("""create table iud.dest120 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest120""")
- sql("""update iud.dest120 d set (c3, c5 ) = (select s.c33 ,s.c55 from iud.source2 s where d.c1 = s.c11)""").show()
- checkAnswer(
- sql("""select c3,c5 from iud.dest120 """),
- Seq(Row("MGM","Disco"),Row("RGK","Music"),Row("cc","ccc"),Row("dd","ddd"),Row("ee","eee"))
- )
- sql("""drop table iud.dest120""")
- }
-
- test("update carbon table[using destination table where and exist]") {
- sql("""drop table iud.dest130""")
- sql("""create table iud.dest130 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest130""")
- sql("""update iud.dest130 dd set (c2, c5 ) = (c2 + 1, "xyx") where dd.c1 = 'a'""").show()
- checkAnswer(
- sql("""select c2,c5 from iud.dest130 where c1 = 'a' """),
- Seq(Row(2,"xyx"))
- )
- sql("""drop table iud.dest130""")
- }
-
- test("update carbon table[using destination table (concat) where and exist]") {
- sql("""drop table iud.dest140""")
- sql("""create table iud.dest140 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest140""")
- sql("""update iud.dest140 d set (c2, c5 ) = (c2 + 1, concat(c5 , "z")) where d.c1 = 'a'""").show()
- checkAnswer(
- sql("""select c2,c5 from iud.dest140 where c1 = 'a'"""),
- Seq(Row(2,"aaaz"))
- )
- sql("""drop table iud.dest140""")
- }
-
- test("update carbon table[using destination table (concat) with where") {
- sql("""drop table iud.dest150""")
- sql("""create table iud.dest150 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest150""")
- sql("""update iud.dest150 d set (c5) = (concat(c5 , "z")) where d.c1 = 'b'""").show()
- checkAnswer(
- sql("""select c5 from iud.dest150 where c1 = 'b' """),
- Seq(Row("bbbz"))
- )
- sql("""drop table iud.dest150""")
- }
-
- test("update table with data for datatype mismatch with column ") {
- sql("""update iud.update_01 set (imei) = ('skt') where level = 'aaa'""")
- checkAnswer(
- sql("""select * from iud.update_01 where imei = 'skt'"""),
- Seq()
- )
- }
-
- test("update carbon table-error[more columns in source table not allowed") {
- val exception = intercept[Exception] {
- sql("""update iud.dest d set (c2, c5 ) = (c2 + 1, concat(c5 , "z"), "abc")""").show()
- }
- assertResult("Number of source and destination columns are not matching")(exception.getMessage)
- }
-
- test("update carbon table-error[no set columns") {
- intercept[Exception] {
- sql("""update iud.dest d set () = ()""").show()
- }
- }
-
- test("update carbon table-error[no set columns with updated column") {
- intercept[Exception] {
- sql("""update iud.dest d set = (c1+1)""").show()
- }
- }
- test("update carbon table-error[one set column with two updated column") {
- intercept[Exception] {
- sql("""update iud.dest set c2 = (c2 + 1, concat(c5 , "z") )""").show()
- }
- }
-
- test("""update carbon [special characters in value- test parsing logic ]""") {
- sql("""drop table iud.dest160""")
- sql("""create table iud.dest160 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest160""")
- sql("""update iud.dest160 set(c1) = ("ab\')$*)(&^)")""").show()
- sql("""update iud.dest160 set(c1) = ('abd$asjdh$adasj$l;sdf$*)$*)(&^')""").show()
- sql("""update iud.dest160 set(c1) =("\\")""").show()
- sql("""update iud.dest160 set(c1) = ("ab\')$*)(&^)")""").show()
- sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'a\\a' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where d.c2 between 1 and 3""").show()
- sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'\\' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where d.c2 between 1 and 3""").show()
- sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'\\a' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where d.c2 between 1 and 3""").show()
- sql("""update iud.dest160 d set (c3,c5) = (select s.c33,'a\\a\\' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where d.c2 between 1 and 3""").show()
- sql("""update iud.dest160 d set (c3,c5) =(select s.c33,'a\'a\\' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where d.c2 between 1 and 3""").show()
- sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'\\a\'a\"' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where d.c2 between 1 and 3""").show()
- sql("""drop table iud.dest160""")
- }
-
- test("""update carbon [sub query, between and existing in outer condition.(Customer query ) ]""") {
- sql("""drop table iud.dest170""")
- sql("""create table iud.dest170 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest170""")
- sql("""update iud.dest170 d set (c3)=(select s.c33 from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where d.c2 between 1 and 3""").show()
- checkAnswer(
- sql("""select c3 from iud.dest170 as d where d.c2 between 1 and 3"""),
- Seq(Row("MGM"), Row("RGK"), Row("cc"))
- )
- sql("""drop table iud.dest170""")
- }
-
- test("""update carbon [self join select query ]""") {
- sql("""drop table iud.dest171""")
- sql("""create table iud.dest171 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest171""")
- sql("""update iud.dest171 d set (c3)=(select concat(s.c3 , "z") from iud.dest171 s where d.c2 = s.c2)""").show
- sql("""drop table iud.dest172""")
- sql("""create table iud.dest172 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
- sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest172""")
- sql("""update iud.dest172 d set (c3)=( concat(c3 , "z"))""").show
- checkAnswer(
- sql("""select c3 from iud.dest171"""),
- sql("""select c3 from iud.dest172""")
- )
- sql("""drop table iud.dest171""")
- sql("""drop table iud.dest172""")
- }
-
- test("update carbon table-error[closing bracket missed") {
- intercept[Exception] {
- sql("""update iud.dest d set (c2) = (194""").show()
- }
- }
-
- test("update carbon table-error[starting bracket missed") {
- intercept[Exception] {
- sql("""update iud.dest d set (c2) = 194)""").show()
- }
- }
-
- test("update carbon table-error[missing starting and closing bracket") {
- intercept[Exception] {
- sql("""update iud.dest d set (c2) = 194""").show()
- }
- }
-
- test("test create table with column name as tupleID"){
- intercept[Exception] {
- sql("CREATE table carbontable (empno int, tupleID String, " +
- "designation String, doj Timestamp, workgroupcategory int, " +
- "workgroupcategoryname String, deptno int, deptname String, projectcode int, " +
- "projectjoindate Timestamp, projectenddate Timestamp, attendance int, " +
- "utilization int,salary int) STORED BY 'org.apache.carbondata.format' " +
- "TBLPROPERTIES('DICTIONARY_INCLUDE'='empno,workgroupcategory,deptno,projectcode'," +
- "'DICTIONARY_EXCLUDE'='empname')")
- }
- }
-
- test("Failure of update operation due to bad record with proper error message") {
- try {
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL")
- val errorMessage = intercept[Exception] {
- sql("drop table if exists update_with_bad_record")
- sql("create table update_with_bad_record(item int, name String) stored by 'carbondata'")
- sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/IUD/bad_record.csv' into table " +
- s"update_with_bad_record")
- sql("update update_with_bad_record set (item)=(3.45)").show()
- sql("drop table if exists update_with_bad_record")
- }
- assert(errorMessage.getMessage.contains("Data load failed due to bad record"))
- } finally {
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE")
- }
- }
-
- override def afterAll {
- sql("use default")
- sql("drop database if exists iud cascade")
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.isHorizontalCompactionEnabled , "true")
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d16d504/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index e8627a1..bbdbe4f 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.spark.{SparkEnv, SparkException}
-import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, UpdateCoalescedRDD}
+import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD, UpdateCoalescedRDD}
import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, ExecutionErrors, UpdateTableModel}
import org.apache.spark.sql.hive.DistributionUtil
@@ -46,9 +46,9 @@ import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarForma
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.csvload.BlockDetails
+import org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat, StringArrayWritable}
import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.processing.model.CarbonLoadModel
@@ -56,7 +56,7 @@ import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingExcep
import org.apache.carbondata.spark._
import org.apache.carbondata.spark.load._
import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.{CarbonQueryUtil, CommonUtil}
+import org.apache.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil, CommonUtil}
/**
* This is the factory class which can create different RDD depends on user needs.
@@ -76,7 +76,8 @@ object CarbonDataRDDFactory {
if (alterTableModel.compactionType.equalsIgnoreCase("major")) {
compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR_COMPACTION)
compactionType = CompactionType.MAJOR_COMPACTION
- } else if (alterTableModel.compactionType.equalsIgnoreCase("IUD_UPDDEL_DELTA_COMPACTION")) {
+ } else if (alterTableModel.compactionType
+ .equalsIgnoreCase(CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString)) {
compactionType = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
if (alterTableModel.segmentUpdateStatusManager.get != None) {
carbonLoadModel
@@ -653,6 +654,114 @@ object CarbonDataRDDFactory {
}
}
+ def loadDataFrameForUpdate(): Unit = {
+ def triggerDataLoadForSegment(key: String,
+ iter: Iterator[Row]): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+ val rddResult = new updateResultImpl()
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ val resultIter = new Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] {
+ var partitionID = "0"
+ val loadMetadataDetails = new LoadMetadataDetails
+ val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+ var uniqueLoadStatusId = ""
+ try {
+ val segId = key
+ val taskNo = CarbonUpdateUtil
+ .getLatestTaskIdForSegment(segId,
+ CarbonStorePath.getCarbonTablePath(carbonLoadModel.getStorePath,
+ carbonTable.getCarbonTableIdentifier))
+ val index = taskNo + 1
+ uniqueLoadStatusId = carbonLoadModel.getTableName +
+ CarbonCommonConstants.UNDERSCORE +
+ (index + "_0")
+
+ // convert timestamp
+ val timeStampInLong = updateModel.get.updatedTimeStamp + ""
+ loadMetadataDetails.setPartitionCount(partitionID)
+ loadMetadataDetails.setLoadName(segId)
+ loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
+ carbonLoadModel.setPartitionId(partitionID)
+ carbonLoadModel.setSegmentId(segId)
+ carbonLoadModel.setTaskNo(String.valueOf(index))
+ carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp)
+
+ // During Block Spill case Increment of File Count and proper adjustment of Block
+ // naming is only done when AbstractFactDataWriter.java : initializeWriter get
+ // CarbondataFileName as null. For handling Block Spill not setting the
+ // CarbondataFileName in case of Update.
+ // carbonLoadModel.setCarbondataFileName(newBlockName)
+
+ // storeLocation = CarbonDataLoadRDD.initialize(carbonLoadModel, index)
+ loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+ val rddIteratorKey = CarbonCommonConstants.RDDUTIL_UPDATE_KEY +
+ UUID.randomUUID().toString
+ UpdateDataLoad.DataLoadForUpdate(segId,
+ index,
+ iter,
+ carbonLoadModel,
+ loadMetadataDetails)
+ } catch {
+ case e: Exception =>
+ LOGGER.info("DataLoad failure")
+ LOGGER.error(e)
+ throw e
+ }
+
+ var finished = false
+
+ override def hasNext: Boolean = !finished
+
+ override def next(): (String, (LoadMetadataDetails, ExecutionErrors)) = {
+ finished = true
+ rddResult
+ .getKey(uniqueLoadStatusId,
+ (loadMetadataDetails, executionErrors))
+ }
+ }
+ resultIter
+ }
+
+ val updateRdd = dataFrame.get.rdd
+
+
+ val keyRDD = updateRdd.map(row =>
+ // splitting as (key, value) i.e., (segment, updatedRows)
+ (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*))
+ )
+ val groupBySegmentRdd = keyRDD.groupByKey()
+
+ val nodeNumOfData = groupBySegmentRdd.partitions.flatMap[String, Array[String]] { p =>
+ DataLoadPartitionCoalescer.getPreferredLocs(groupBySegmentRdd, p).map(_.host)
+ }.distinct.size
+ val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
+ sqlContext.sparkContext)
+ val groupBySegmentAndNodeRdd =
+ new UpdateCoalescedRDD[(String, scala.Iterable[Row])](groupBySegmentRdd,
+ nodes.distinct.toArray)
+
+ res = groupBySegmentAndNodeRdd.map(x =>
+ triggerDataLoadForSegment(x._1, x._2.toIterator).toList
+ ).collect()
+
+ }
+
+ def loadDataForPartitionTable(): Unit = {
+ try {
+ val rdd = repartitionInputData(sqlContext, dataFrame, carbonLoadModel)
+ status = new PartitionTableDataLoaderRDD(sqlContext.sparkContext,
+ new DataLoadResultImpl(),
+ carbonLoadModel,
+ currentLoadCount,
+ tableCreationTime,
+ schemaLastUpdatedTime,
+ rdd).collect()
+ } catch {
+ case ex: Exception =>
+ LOGGER.error(ex, "load data failed for partition table")
+ throw ex
+ }
+ }
+
if (!updateModel.isDefined) {
CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName, currentLoadCount.toString)
@@ -661,10 +770,11 @@ object CarbonDataRDDFactory {
var errorMessage: String = "DataLoad failure"
var executorMessage: String = ""
try {
- if (dataFrame.isDefined) {
+ if (updateModel.isDefined) {
+ loadDataFrameForUpdate()
+ } else if (dataFrame.isDefined) {
loadDataFrame()
- }
- else {
+ } else {
loadDataFile()
}
if (updateModel.isDefined) {
@@ -743,15 +853,18 @@ object CarbonDataRDDFactory {
// handle the status file updation for the update cmd.
if (updateModel.isDefined) {
- if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
- // updateModel.get.executorErrors.errorMsg = errorMessage
+ if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
+ // updateModel.get.executorErrors.errorMsg = errorMessage
if (updateModel.get.executorErrors.failureCauses == FailureCauses.NONE) {
updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
- updateModel.get.executorErrors.errorMsg = "Update failed as the data load has failed."
+ if (null != executorMessage && !executorMessage.isEmpty) {
+ updateModel.get.executorErrors.errorMsg = executorMessage
+ } else {
+ updateModel.get.executorErrors.errorMsg = "Update failed as the data load has failed."
+ }
}
return
- }
- else {
+ } else {
// in success case handle updation of the table status file.
// success case.
val segmentDetails = new util.HashSet[String]()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d16d504/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 6651abe..0c3414a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -90,7 +90,7 @@ case class ShowLoadsCommand(databaseNameOp: Option[String], table: String, limit
case class ProjectForUpdate(
table: UnresolvedRelation,
columns: List[String],
- child: Seq[LogicalPlan] ) extends Command {
+ children: Seq[LogicalPlan] ) extends LogicalPlan {
override def output: Seq[AttributeReference] = Seq.empty
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d16d504/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
index 39d03bb..01395ff 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, DataFrame, Dataset, Row, SparkSession, getDB}
+import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -566,8 +566,10 @@ object deleteExecution {
CarbonUpdateUtil
.createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr)
- val rowContRdd = sparkSession.sparkContext.parallelize(blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq,
- keyRdd.partitions.size)
+ val rowContRdd =
+ sparkSession.sparkContext.parallelize(
+ blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq,
+ keyRdd.partitions.length)
// val rowContRdd = sqlContext.sparkContext
// .parallelize(blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq,
@@ -820,9 +822,9 @@ object UpdateExecution {
}
val ex = dataFrame.queryExecution.analyzed
val res = ex find {
- case relation: LogicalRelation if (relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- isDestinationRelation(relation.relation
- .asInstanceOf[CarbonDatasourceHadoopRelation])) =>
+ case relation: LogicalRelation
+ if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+ isDestinationRelation(relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]) =>
true
case _ => false
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d16d504/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 7d94c92..0fb5c47 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -24,8 +24,8 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, NamedE
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.ProjectForDeleteCommand
import org.apache.spark.sql.execution.{ProjectExec, SparkSqlParser, SubqueryExec}
+import org.apache.spark.sql.execution.command.ProjectForDeleteCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -87,6 +87,8 @@ object CarbonIUDAnalysisRule extends Rule[LogicalPlan] {
this.sparkSession = sparkSession
}
+ private val parser = new SparkSqlParser(sparkSession.sessionState.conf)
+
private def processUpdateQuery(
table: UnresolvedRelation,
columns: List[String],
@@ -102,12 +104,13 @@ object CarbonIUDAnalysisRule extends Rule[LogicalPlan] {
val projList = Seq(
UnresolvedAlias(UnresolvedStar(Option(table.alias.toSeq))), tupleId)
// include tuple id and rest of the required columns in subqury
- SubqueryAlias(table.alias.getOrElse(""), Project(projList, relation), Option(table.tableIdentifier))
+ SubqueryAlias(table.alias.getOrElse(""),
+ Project(projList, relation), Option(table.tableIdentifier))
}
// get the un-analyzed logical plan
val targetTable = prepareTargetReleation(table)
- val selectPlan = new SparkSqlParser(sparkSession.sessionState.conf).parsePlan(selectStmt) transform {
- case Project(projectList, child) if (!includedDestColumns) =>
+ val selectPlan = parser.parsePlan(selectStmt) transform {
+ case Project(projectList, child) if !includedDestColumns =>
includedDestColumns = true
if (projectList.size != columns.size) {
sys.error("Number of source and destination columns are not matching")
@@ -126,11 +129,10 @@ object CarbonIUDAnalysisRule extends Rule[LogicalPlan] {
val list = Seq(
UnresolvedAlias(UnresolvedStar(Option(table.alias.toSeq)))) ++ renamedProjectList
Project(list, child)
- case Filter(cond, child) if (!includedDestRelation) =>
+ case Filter(cond, child) if !includedDestRelation =>
includedDestRelation = true
Filter(cond, Join(child, targetTable, Inner, None))
- case r @ UnresolvedRelation(t, a) if (!includedDestRelation &&
- t != table.tableIdentifier) =>
+ case r @ UnresolvedRelation(t, a) if !includedDestRelation && t != table.tableIdentifier =>
includedDestRelation = true
Join(r, targetTable, Inner, None)
}
@@ -138,8 +140,8 @@ object CarbonIUDAnalysisRule extends Rule[LogicalPlan] {
// special case to handle self join queries
// Eg. update tableName SET (column1) = (column1+1)
selectPlan transform {
- case relation: UnresolvedRelation if (table.tableIdentifier == relation.tableIdentifier &&
- addedTupleId == false) =>
+ case relation: UnresolvedRelation
+ if table.tableIdentifier == relation.tableIdentifier && !addedTupleId =>
addedTupleId = true
targetTable
}
@@ -152,22 +154,17 @@ object CarbonIUDAnalysisRule extends Rule[LogicalPlan] {
// Create a dummy projection to include filter conditions
var newPlan: LogicalPlan = null
if (table.tableIdentifier.database.isDefined) {
- newPlan = new SparkSqlParser(sparkSession.sessionState.conf).parsePlan("select * from " +
- table.tableIdentifier.database
- .getOrElse("") + "." +
- table.tableIdentifier.table +
- " " + alias + " " +
- filter)
+ newPlan = parser.parsePlan("select * from " +
+ table.tableIdentifier.database.getOrElse("") + "." +
+ table.tableIdentifier.table + " " + alias + " " + filter)
}
else {
- newPlan = new SparkSqlParser(sparkSession.sessionState.conf).parsePlan("select * from " +
- table.tableIdentifier.table +
- " " + alias + " " +
- filter)
+ newPlan = parser.parsePlan("select * from " +
+ table.tableIdentifier.table + " " + alias + " " + filter)
}
newPlan transform {
- case UnresolvedRelation(t, Some(a)) if (
- !transformed && t == table.tableIdentifier && a == alias) =>
+ case UnresolvedRelation(t, Some(a))
+ if !transformed && t == table.tableIdentifier && a == alias =>
transformed = true
// Add the filter condition of update statement on destination table
SubqueryAlias(alias, updatedSelectPlan, Option(table.tableIdentifier))
@@ -182,18 +179,22 @@ object CarbonIUDAnalysisRule extends Rule[LogicalPlan] {
}
def processDeleteRecordsQuery(selectStmt: String, table: UnresolvedRelation): LogicalPlan = {
- // val tid = CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString()))
val tidSeq = Seq(getDB.getDatabaseName(table.tableIdentifier.database, sparkSession),
table.tableIdentifier.table)
var addedTupleId = false
- val selectPlan = new SparkSqlParser(sparkSession.sessionState.conf).parsePlan(selectStmt) transform {
- case relation: UnresolvedRelation if (table.tableIdentifier == relation.tableIdentifier &&
- addedTupleId == false) =>
+ val parsePlan = parser.parsePlan(selectStmt)
+ val selectPlan = parsePlan transform {
+ case relation: UnresolvedRelation
+ if table.tableIdentifier == relation.tableIdentifier && !addedTupleId =>
addedTupleId = true
val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
Seq.empty, isDistinct = false), "tupleId")())
+ val alias = table.alias match {
+ case Some(alias) => Some(table.alias.toSeq)
+ case _ => None
+ }
val projList = Seq(
- UnresolvedAlias(UnresolvedStar(Option(table.alias.toSeq))), tupleId)
+ UnresolvedAlias(UnresolvedStar(alias)), tupleId)
// include tuple id in subqury
Project(projList, relation)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d16d504/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index cc27181..7a6c513 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -122,17 +122,22 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
case Project(pList, child) if (!isTransformed) =>
val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList
.splitAt(pList.size - cols.size)
- val diff = cols.diff(dest.map(_.name))
+ val diff = cols.diff(dest.map(_.name.toLowerCase))
if (diff.size > 0) {
sys.error(s"Unknown column(s) ${diff.mkString(",")} in table ${table.tableName}")
}
isTransformed = true
- Project(dest.filter(a => !cols.contains(a.name)) ++ source, child)
+ Project(dest.filter(a => !cols.contains(a.name.toLowerCase)) ++ source, child)
}
- ProjectForUpdateCommand(newPlan, Seq(table.tableIdentifier.toString()))
+ val identifier = table.tableIdentifier.database match {
+ case Some(db) => Seq(db, table.tableIdentifier.table)
+ case _ => Seq(table.tableIdentifier.table)
+ }
+ ProjectForUpdateCommand(newPlan, identifier)
}
}
+
def isOptimized(plan: LogicalPlan): Boolean = {
plan find {
case cd: CarbonDictionaryCatalystDecoder => true
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d16d504/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 367aab4..bff1af3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -125,7 +125,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
case Seq(dbName, tableName) => Some(tableName)
case Seq(tableName) => Some(tableName)
}
- UnresolvedRelation(tableIdentifier, Option(tableAlias.toString))
+ UnresolvedRelation(tableIdentifier, tableAlias)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d16d504/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index cda907c..0145c2d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -520,7 +520,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
getMaxOfBlockAndFileSize(fileSizeInBytes, localCarbonFile.getSize()));
} catch (IOException e) {
throw new CarbonDataWriterException(
- "Problem while copying file from local store to carbon store");
+ "Problem while copying file from local store to carbon store", e);
}
LOGGER.info(
"Total copy time (ms) to copy file " + localFileName + " is " + (System.currentTimeMillis()