You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/06/20 07:29:08 UTC
[02/56] [abbrv] carbondata git commit: add EncodingStrategy
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxV2Format.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxV2Format.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxV2Format.scala
new file mode 100644
index 0000000..a3193c3
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxV2Format.scala
@@ -0,0 +1,708 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.dataload
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Test Class for data loading with hive syntax and old syntax
+ *
+ */
+class TestLoadDataWithHiveSyntaxV2Format extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll {
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
+ "V2"
+ )
+ sql("drop table if exists escapechar1")
+ sql("drop table if exists escapechar2")
+ sql("drop table if exists escapechar3")
+ sql("drop table if exists specialcharacter1")
+ sql("drop table if exists specialcharacter2")
+ sql("drop table if exists collessthanschema")
+ sql("drop table if exists decimalarray")
+ sql("drop table if exists decimalstruct")
+ sql("drop table if exists carbontable")
+ sql("drop table if exists hivetable")
+ sql("drop table if exists testtable")
+ sql("drop table if exists testhivetable")
+ sql("drop table if exists testtable1")
+ sql("drop table if exists testhivetable1")
+ sql("drop table if exists complexcarbontable")
+ sql("drop table if exists complex_t3")
+ sql("drop table if exists complex_hive_t3")
+ sql("drop table if exists header_test")
+ sql("drop table if exists duplicateColTest")
+ sql("drop table if exists mixed_header_test")
+ sql("drop table if exists primitivecarbontable")
+ sql("drop table if exists UPPERCASEcube")
+ sql("drop table if exists lowercaseCUBE")
+ sql("drop table if exists carbontable1")
+ sql("drop table if exists hivetable1")
+ sql("drop table if exists comment_test")
+ sql("drop table if exists smallinttable")
+ sql("drop table if exists smallinthivetable")
+ sql(
+ "CREATE table carbontable (empno int, empname String, designation String, doj String, " +
+ "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, " +
+ "projectcode int, projectjoindate String, projectenddate String, attendance int," +
+ "utilization int,salary int) STORED BY 'org.apache.carbondata.format'"
+ )
+ sql(
+ "create table hivetable(empno int, empname String, designation string, doj String, " +
+ "workgroupcategory int, workgroupcategoryname String,deptno int, deptname String, " +
+ "projectcode int, projectjoindate String,projectenddate String, attendance String," +
+ "utilization String,salary String)row format delimited fields terminated by ','"
+ )
+
+ }
+
+ test("create table with smallint type and query smallint table") {
+ sql("drop table if exists smallinttable")
+ sql("drop table if exists smallinthivetable")
+ sql(
+ "create table smallinttable(empno smallint, empname String, designation string, " +
+ "doj String, workgroupcategory int, workgroupcategoryname String,deptno int, " +
+ "deptname String, projectcode int, projectjoindate String,projectenddate String, " +
+ "attendance String, utilization String,salary String)" +
+ "STORED BY 'org.apache.carbondata.format'"
+ )
+
+ sql(
+ "create table smallinthivetable(empno smallint, empname String, designation string, " +
+ "doj String, workgroupcategory int, workgroupcategoryname String,deptno int, " +
+ "deptname String, projectcode int, projectjoindate String,projectenddate String, " +
+ "attendance String, utilization String,salary String)" +
+ "row format delimited fields terminated by ','"
+ )
+
+ sql(s"LOAD DATA local inpath '$resourcesPath/data.csv' INTO table smallinttable ")
+ sql(s"LOAD DATA local inpath '$resourcesPath/datawithoutheader.csv' overwrite " +
+ "INTO table smallinthivetable")
+
+ checkAnswer(
+ sql("select empno from smallinttable"),
+ sql("select empno from smallinthivetable")
+ )
+
+ sql("drop table if exists smallinttable")
+ sql("drop table if exists smallinthivetable")
+ }
+
+ test("test data loading and validate query output") {
+ sql("drop table if exists testtable")
+ sql("drop table if exists testhivetable")
+ //Create test cube and hive table
+ sql(
+ "CREATE table testtable (empno string, empname String, designation String, doj String, " +
+ "workgroupcategory string, workgroupcategoryname String, deptno string, deptname String, " +
+ "projectcode string, projectjoindate String, projectenddate String,attendance double," +
+ "utilization double,salary double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES" +
+ "('DICTIONARY_EXCLUDE'='empno,empname,designation,doj,workgroupcategory," +
+ "workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate')"
+ )
+ sql(
+ "create table testhivetable(empno string, empname String, designation string, doj String, " +
+ "workgroupcategory string, workgroupcategoryname String,deptno string, deptname String, " +
+ "projectcode string, projectjoindate String,projectenddate String, attendance double," +
+ "utilization double,salary double)row format delimited fields terminated by ','"
+ )
+ //load data into test cube and hive table and validate query result
+ sql(s"LOAD DATA local inpath '$resourcesPath/data.csv' INTO table testtable")
+ sql(
+ s"LOAD DATA local inpath '$resourcesPath/datawithoutheader.csv' overwrite INTO table " +
+ "testhivetable"
+ )
+ checkAnswer(sql("select * from testtable"), sql("select * from testhivetable"))
+ //load data incrementally and validate query result
+ sql(
+ s"LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE testtable OPTIONS" +
+ "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+ )
+ sql(
+ s"LOAD DATA local inpath '$resourcesPath/datawithoutheader.csv' INTO table testhivetable"
+ )
+ checkAnswer(sql("select * from testtable"), sql("select * from testhivetable"))
+ //drop test cube and table
+ sql("drop table if exists testtable")
+ sql("drop table if exists testhivetable")
+ }
+
+ /**
+ * TODO: temporarily changing cube names to different names,
+ * however deletion and creation of cube with same name
+ */
+ test("test data loading with different case file header and validate query output") {
+ sql("drop table if exists testtable1")
+ sql("drop table if exists testhivetable1")
+ //Create test cube and hive table
+ sql(
+ "CREATE table testtable1 (empno string, empname String, designation String, doj String, " +
+ "workgroupcategory string, workgroupcategoryname String, deptno string, deptname String, " +
+ "projectcode string, projectjoindate String, projectenddate String,attendance double," +
+ "utilization double,salary double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES" +
+ "('DICTIONARY_EXCLUDE'='empno,empname,designation,doj,workgroupcategory," +
+ "workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate')"
+ )
+ sql(
+ "create table testhivetable1(empno string, empname String, designation string, doj String, " +
+ "workgroupcategory string, workgroupcategoryname String,deptno string, deptname String, " +
+ "projectcode string, projectjoindate String,projectenddate String, attendance double," +
+ "utilization double,salary double)row format delimited fields terminated by ','"
+ )
+ //load data into test cube and hive table and validate query result
+ sql(
+ s"LOAD DATA local inpath '$resourcesPath/datawithoutheader.csv' INTO table testtable1 " +
+ "options('DELIMITER'=',', 'QUOTECHAR'='\"', 'FILEHEADER'='EMPno, empname,designation,doj," +
+ "workgroupcategory,workgroupcategoryname, deptno,deptname,projectcode,projectjoindate," +
+ "projectenddate, attendance, utilization,SALARY')"
+ )
+ sql(
+ s"LOAD DATA local inpath '$resourcesPath/datawithoutheader.csv' overwrite INTO table " +
+ "testhivetable1"
+ )
+ checkAnswer(sql("select * from testtable1"), sql("select * from testhivetable1"))
+ //drop test cube and table
+ sql("drop table if exists testtable1")
+ sql("drop table if exists testhivetable1")
+ }
+
+ test("test hive table data loading") {
+ sql(
+ s"LOAD DATA local inpath '$resourcesPath/datawithoutheader.csv' overwrite INTO table " +
+ "hivetable"
+ )
+ sql(s"LOAD DATA local inpath '$resourcesPath/datawithoutheader.csv' INTO table hivetable")
+ }
+
+ test("test carbon table data loading using old syntax") {
+ sql(
+ s"LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbontable OPTIONS" +
+ "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+ )
+ }
+
+ test("test carbon table data loading using new syntax compatible with hive") {
+ sql(s"LOAD DATA local inpath '$resourcesPath/data.csv' INTO table carbontable")
+ sql(
+ s"LOAD DATA local inpath '$resourcesPath/data.csv' INTO table carbontable options" +
+ "('DELIMITER'=',', 'QUOTECHAR'='\"')"
+ )
+ }
+
+ test("test carbon table data loading using new syntax with overwrite option compatible with hive")
+ {
+ try {
+ sql(s"LOAD DATA local inpath '$resourcesPath/data.csv' overwrite INTO table carbontable")
+ } catch {
+ case e: Throwable => {
+ assert(e.getMessage
+ .equals("Overwrite is not supported for carbon table with default.carbontable")
+ )
+ }
+ }
+ }
+
+ test("complex types data loading") {
+ sql("drop table if exists complexcarbontable")
+ sql("create table complexcarbontable(deviceInformationId int, channelsId string," +
+ "ROMSize string, purchasedate string, mobile struct<imei:string, imsi:string>," +
+ "MAC array<string>, locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, " +
+ "ActiveProvince:string, Activecity:string, ActiveDistrict:string, ActiveStreet:string>>," +
+ "proddate struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
+ "double,contractNumber double) " +
+ "STORED BY 'org.apache.carbondata.format' " +
+ "TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId')"
+ )
+ sql(
+ s"LOAD DATA local inpath '$resourcesPath/complexdata.csv' INTO table " +
+ "complexcarbontable " +
+ "OPTIONS('DELIMITER'=',', 'QUOTECHAR'='\"', 'FILEHEADER'='deviceInformationId,channelsId," +
+ "ROMSize,purchasedate,mobile,MAC,locationinfo,proddate,gamePointId,contractNumber'," +
+ "'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')"
+ )
+ sql("drop table if exists complexcarbontable")
+ }
+
+ test(
+ "complex types data loading with more unused columns and different order of complex columns " +
+ "in csv and create table"
+ ) {
+ sql("drop table if exists complexcarbontable")
+ sql("create table complexcarbontable(deviceInformationId int, channelsId string," +
+ "mobile struct<imei:string, imsi:string>, ROMSize string, purchasedate string," +
+ "MAC array<string>, locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, " +
+ "ActiveProvince:string, Activecity:string, ActiveDistrict:string, ActiveStreet:string>>," +
+ "proddate struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
+ "double,contractNumber double) " +
+ "STORED BY 'org.apache.carbondata.format' " +
+ "TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId','DICTIONARY_EXCLUDE'='channelsId')"
+ )
+ sql(
+ s"LOAD DATA local inpath '$resourcesPath/complextypediffentcolheaderorder.csv' INTO " +
+ "table complexcarbontable " +
+ "OPTIONS('DELIMITER'=',', 'QUOTECHAR'='\"', 'FILEHEADER'='deviceInformationId,channelsId," +
+ "ROMSize,purchasedate,MAC,abc,mobile,locationinfo,proddate,gamePointId,contractNumber'," +
+ "'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')"
+ )
+ sql("select count(*) from complexcarbontable")
+ sql("drop table if exists complexcarbontable")
+ }
+
+ test("test carbon table data loading with csv file Header in caps") {
+ sql("drop table if exists header_test")
+ sql(
+ "create table header_test(empno int, empname String, designation string, doj String, " +
+ "workgroupcategory int, workgroupcategoryname String,deptno int, deptname String, " +
+ "projectcode int, projectjoindate String,projectenddate String, attendance String," +
+ "utilization String,salary String) STORED BY 'org.apache.carbondata.format'"
+ )
+ val csvFilePath = s"$resourcesPath/data_withCAPSHeader.csv"
+ sql("LOAD DATA local inpath '" + csvFilePath + "' INTO table header_test OPTIONS " +
+ "('DELIMITER'=',', 'QUOTECHAR'='\"')");
+ checkAnswer(sql("select empno from header_test"),
+ Seq(Row(11), Row(12))
+ )
+ }
+
+ test("test duplicate column validation") {
+ try {
+ sql("create table duplicateColTest(col1 string, Col1 string)")
+ }
+ catch {
+ case e: Exception => {
+ assert(e.getMessage.contains("Duplicate column name") ||
+ e.getMessage.contains("Found duplicate column"))
+ }
+ }
+ }
+
+ test(
+ "test carbon table data loading with csv file Header in Mixed Case and create table columns " +
+ "in mixed case"
+ ) {
+ sql("drop table if exists mixed_header_test")
+ sql(
+ "create table mixed_header_test(empno int, empname String, Designation string, doj String, " +
+ "Workgroupcategory int, workgroupcategoryname String,deptno int, deptname String, " +
+ "projectcode int, projectjoindate String,projectenddate String, attendance String," +
+ "utilization String,salary String) STORED BY 'org.apache.carbondata.format'"
+ )
+ val csvFilePath = s"$resourcesPath/data_withMixedHeader.csv"
+ sql("LOAD DATA local inpath '" + csvFilePath + "' INTO table mixed_header_test OPTIONS " +
+ "('DELIMITER'=',', 'QUOTECHAR'='\"')");
+ checkAnswer(sql("select empno from mixed_header_test"),
+ Seq(Row(11), Row(12))
+ )
+ }
+
+
+ test("complex types data loading with hive column having more than required column values") {
+ sql("drop table if exists complexcarbontable")
+ sql("create table complexcarbontable(deviceInformationId int, channelsId string," +
+ "ROMSize string, purchasedate string, mobile struct<imei:string, imsi:string>," +
+ "MAC array<string>, locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, " +
+ "ActiveProvince:string, Activecity:string, ActiveDistrict:string, ActiveStreet:string>>," +
+ "proddate struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
+ "double,contractNumber double) " +
+ "STORED BY 'org.apache.carbondata.format' " +
+ "TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId')"
+ )
+ sql(
+ s"LOAD DATA local inpath '$resourcesPath/complexdatastructextra.csv' INTO table " +
+ "complexcarbontable " +
+ "OPTIONS('DELIMITER'=',', 'QUOTECHAR'='\"', 'FILEHEADER'='deviceInformationId,channelsId," +
+ "ROMSize,purchasedate,mobile,MAC,locationinfo,proddate,gamePointId,contractNumber'," +
+ "'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')"
+ )
+ sql("drop table if exists complexcarbontable")
+ }
+
+ test("complex types & no dictionary columns data loading") {
+ sql("drop table if exists complexcarbontable")
+ sql("create table complexcarbontable(deviceInformationId int, channelsId string," +
+ "ROMSize string, purchasedate string, mobile struct<imei:string, imsi:string>," +
+ "MAC array<string>, locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, " +
+ "ActiveProvince:string, Activecity:string, ActiveDistrict:string, ActiveStreet:string>>," +
+ "proddate struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
+ "double,contractNumber double) " +
+ "STORED BY 'org.apache.carbondata.format' " +
+ "TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId', 'DICTIONARY_EXCLUDE'='ROMSize," +
+ "purchasedate')"
+ )
+ sql(
+ s"LOAD DATA local inpath '$resourcesPath/complexdata.csv' INTO table " +
+ "complexcarbontable " +
+ "OPTIONS('DELIMITER'=',', 'QUOTECHAR'='\"', 'FILEHEADER'='deviceInformationId,channelsId," +
+ "ROMSize,purchasedate,mobile,MAC,locationinfo,proddate,gamePointId,contractNumber'," +
+ "'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')"
+ );
+ sql("drop table if exists complexcarbontable")
+ }
+
+ test("array<string> and string datatype for same column is not working properly") {
+ sql("drop table if exists complexcarbontable")
+ sql("create table complexcarbontable(deviceInformationId int, MAC array<string>, channelsId string, "+
+ "ROMSize string, purchasedate string, gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' "+
+ "TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId')")
+ sql(s"LOAD DATA local inpath '$resourcesPath/complexdatareordered.csv' INTO table complexcarbontable "+
+ "OPTIONS('DELIMITER'=',', 'QUOTECHAR'='\"', 'FILEHEADER'='deviceInformationId,MAC,channelsId,ROMSize,purchasedate,gamePointId,contractNumber',"+
+ "'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')")
+ sql("drop table if exists complexcarbontable")
+ sql("create table primitivecarbontable(deviceInformationId int, MAC string, channelsId string, "+
+ "ROMSize string, purchasedate string, gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' "+
+ "TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId')")
+ sql(s"LOAD DATA local inpath '$resourcesPath/complexdatareordered.csv' INTO table primitivecarbontable "+
+ "OPTIONS('DELIMITER'=',', 'QUOTECHAR'='\"', 'FILEHEADER'='deviceInformationId,MAC,channelsId,ROMSize,purchasedate,gamePointId,contractNumber',"+
+ "'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')")
+ sql("drop table if exists primitivecarbontable")
+ }
+
+ test(
+ "test carbon table data loading when table name is in different case with create table, for " +
+ "UpperCase"
+ ) {
+ sql("drop table if exists UPPERCASEcube")
+ sql("create table UPPERCASEcube(empno Int, empname String, designation String, " +
+ "doj String, workgroupcategory Int, workgroupcategoryname String, deptno Int, " +
+ "deptname String, projectcode Int, projectjoindate String, projectenddate String, " +
+ "attendance Int,utilization Double,salary Double) STORED BY 'org.apache.carbondata.format'"
+ )
+ sql(
+ s"LOAD DATA local inpath '$resourcesPath/data.csv' INTO table uppercasecube OPTIONS" +
+ "('DELIMITER'=',', 'QUOTECHAR'='\"')"
+ )
+ sql("drop table if exists UpperCaseCube")
+ }
+
+ test(
+ "test carbon table data loading when table name is in different case with create table ,for " +
+ "LowerCase"
+ ) {
+ sql("drop table if exists lowercaseCUBE")
+ sql("create table lowercaseCUBE(empno Int, empname String, designation String, " +
+ "doj String, workgroupcategory Int, workgroupcategoryname String, deptno Int, " +
+ "deptname String, projectcode Int, projectjoindate String, projectenddate String, " +
+ "attendance Int,utilization Double,salary Double) STORED BY 'org.apache.carbondata.format'"
+ )
+ sql(
+ s"LOAD DATA local inpath '$resourcesPath/data.csv' INTO table LOWERCASECUBE OPTIONS" +
+ "('DELIMITER'=',', 'QUOTECHAR'='\"')"
+ )
+ sql("drop table if exists LowErcasEcube")
+ }
+
+ test("test carbon table data loading using escape char 1") {
+ sql("DROP TABLE IF EXISTS escapechar1")
+
+ sql(
+ """
+ CREATE TABLE IF NOT EXISTS escapechar1
+ (ID Int, date Timestamp, country String,
+ name String, phonetype String, serialname String, salary Int)
+ STORED BY 'org.apache.carbondata.format'
+ """
+ )
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+ sql(
+ s"""
+ LOAD DATA LOCAL INPATH '$resourcesPath/datawithbackslash.csv' into table escapechar1
+ OPTIONS('ESCAPECHAR'='@')
+ """
+ )
+ checkAnswer(sql("select count(*) from escapechar1"), Seq(Row(10)))
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+ sql("DROP TABLE IF EXISTS escapechar1")
+ }
+
+ test("test carbon table data loading using escape char 2") {
+ sql("DROP TABLE IF EXISTS escapechar2")
+
+ sql(
+ """
+ CREATE TABLE escapechar2(imei string,specialchar string)
+ STORED BY 'org.apache.carbondata.format'
+ """
+ )
+
+ sql(
+ s"""
+ LOAD DATA LOCAL INPATH '$resourcesPath/datawithescapecharacter.csv' into table escapechar2
+ options ('DELIMITER'=',', 'QUOTECHAR'='"','ESCAPECHAR'='\')
+ """
+ )
+ checkAnswer(sql("select count(*) from escapechar2"), Seq(Row(21)))
+ checkAnswer(sql("select specialchar from escapechar2 where imei = '1AA44'"), Seq(Row("escapeesc")))
+ sql("DROP TABLE IF EXISTS escapechar2")
+ }
+
+ test("test carbon table data loading using escape char 3") {
+ sql("DROP TABLE IF EXISTS escapechar3")
+
+ sql(
+ """
+ CREATE TABLE escapechar3(imei string,specialchar string)
+ STORED BY 'org.apache.carbondata.format'
+ """
+ )
+
+ sql(
+ s"""
+ LOAD DATA LOCAL INPATH '$resourcesPath/datawithescapecharacter.csv' into table escapechar3
+ options ('DELIMITER'=',', 'QUOTECHAR'='"','ESCAPECHAR'='@')
+ """
+ )
+ checkAnswer(sql("select count(*) from escapechar3"), Seq(Row(21)))
+ checkAnswer(sql("select specialchar from escapechar3 where imei in ('1232','12323')"), Seq(Row
+ ("ayush@b.com"), Row("ayushb.com")
+ )
+ )
+ sql("DROP TABLE IF EXISTS escapechar3")
+ }
+
+ test("test carbon table data loading with special character 1") {
+ sql("DROP TABLE IF EXISTS specialcharacter1")
+
+ sql(
+ """
+ CREATE TABLE specialcharacter1(imei string,specialchar string)
+ STORED BY 'org.apache.carbondata.format'
+ """
+ )
+
+ sql(
+ s"""
+ LOAD DATA LOCAL INPATH '$resourcesPath/datawithspecialcharacter.csv' into table specialcharacter1
+ options ('DELIMITER'=',', 'QUOTECHAR'='"')
+ """
+ )
+ checkAnswer(sql("select count(*) from specialcharacter1"), Seq(Row(37)))
+ checkAnswer(sql("select specialchar from specialcharacter1 where imei='1AA36'"), Seq(Row("\"i\"")))
+ sql("DROP TABLE IF EXISTS specialcharacter1")
+ }
+
+ test("test carbon table data loading with special character 2") {
+ sql("DROP TABLE IF EXISTS specialcharacter2")
+
+ sql(
+ """
+ CREATE table specialcharacter2(customer_id int, 124_string_level_province String, date_level String,
+ Time_level String, lname String, fname String, mi String, address1 String, address2
+ String, address3 String, address4 String, city String, country String, phone1 String,
+ phone2 String, marital_status String, yearly_income String, gender String, education
+ String, member_card String, occupation String, houseowner String, fullname String,
+ numeric_level double, account_num double, customer_region_id int, total_children int,
+ num_children_at_home int, num_cars_owned int)
+ STORED BY 'org.apache.carbondata.format'
+ """
+ )
+
+ sql(
+ s"""
+ LOAD DATA LOCAL INPATH '$resourcesPath/datawithcomplexspecialchar.csv' into
+ table specialcharacter2 options ('DELIMITER'=',', 'QUOTECHAR'='"','ESCAPECHAR'='"')
+ """
+ )
+ checkAnswer(sql("select count(*) from specialcharacter2"), Seq(Row(150)))
+ checkAnswer(sql("select 124_string_level_province from specialcharacter2 where customer_id=103"),
+ Seq(Row("\"state province # 124\""))
+ )
+ sql("DROP TABLE IF EXISTS specialcharacter2")
+ }
+
+ test("test data which contain column less than schema"){
+ sql("DROP TABLE IF EXISTS collessthanschema")
+
+ sql(
+ """
+ CREATE TABLE IF NOT EXISTS collessthanschema
+ (ID Int, date Timestamp, country String,
+ name String, phonetype String, serialname String, salary Int)
+ STORED BY 'org.apache.carbondata.format'
+ """)
+
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+ sql(s"""
+ LOAD DATA LOCAL INPATH '$resourcesPath/lessthandatacolumndata.csv' into table collessthanschema
+ """)
+ checkAnswer(sql("select count(*) from collessthanschema"),Seq(Row(10)))
+ sql("DROP TABLE IF EXISTS collessthanschema")
+ }
+
+ test("test data which contain column with decimal data type in array."){
+ sql("DROP TABLE IF EXISTS decimalarray")
+
+ sql(
+ """
+ CREATE TABLE IF NOT EXISTS decimalarray
+ (ID decimal(5,5), date Timestamp, country String,
+ name String, phonetype String, serialname String, salary Int, complex
+ array<decimal(4,2)>)
+ STORED BY 'org.apache.carbondata.format'
+ """
+ )
+
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+ sql(s"""
+ LOAD DATA LOCAL INPATH '$resourcesPath/complexTypeDecimal.csv' into table decimalarray
+ """)
+ checkAnswer(sql("select count(*) from decimalarray"),Seq(Row(8)))
+ sql("DROP TABLE IF EXISTS decimalarray")
+ }
+
+ test("test data which contain column with decimal data type in struct."){
+ sql("DROP TABLE IF EXISTS decimalstruct")
+
+ sql(
+ """
+ CREATE TABLE IF NOT EXISTS decimalstruct
+ (ID decimal(5,5), date Timestamp, country String,
+ name String, phonetype String, serialname String, salary Int, complex
+ struct<a:decimal(4,2)>)
+ STORED BY 'org.apache.carbondata.format'
+ """
+ )
+
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+ sql(s"""
+ LOAD DATA LOCAL INPATH '$resourcesPath/complexTypeDecimal.csv' into table decimalstruct
+ """)
+ checkAnswer(sql("select count(*) from decimalstruct"),Seq(Row(8)))
+ sql("DROP TABLE IF EXISTS decimalstruct")
+ }
+
+ test("test data which contain column with decimal data type in array of struct."){
+ sql("DROP TABLE IF EXISTS complex_t3")
+ sql("DROP TABLE IF EXISTS complex_hive_t3")
+
+ sql(
+ """
+ CREATE TABLE complex_t3
+ (ID decimal, date Timestamp, country String,
+ name String, phonetype String, serialname String, salary Int, complex
+ array<struct<a:decimal(4,2),str:string>>)
+ STORED BY 'org.apache.carbondata.format'
+ """
+ )
+ sql(
+ """
+ CREATE TABLE complex_hive_t3
+ (ID decimal, date Timestamp, country String,
+ name String, phonetype String, serialname String, salary Int, complex
+ array<struct<a:decimal(4,2),str:string>>)
+ row format delimited fields terminated by ','
+ """
+ )
+
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+ sql(s"""
+ LOAD DATA LOCAL INPATH '$resourcesPath/complexTypeDecimalNested.csv' into table complex_t3
+ """)
+ sql(s"""
+ LOAD DATA LOCAL INPATH '$resourcesPath/complexTypeDecimalNestedHive.csv' into table complex_hive_t3
+ """)
+ checkAnswer(sql("select count(*) from complex_t3"),sql("select count(*) from complex_hive_t3"))
+ checkAnswer(sql("select id from complex_t3 where salary = 15000"),sql("select id from complex_hive_t3 where salary = 15000"))
+ }
+
+ test("test data loading when delimiter is '|' and data with header") {
+ sql(
+ "CREATE table carbontable1 (empno string, empname String, designation String, doj String, " +
+ "workgroupcategory string, workgroupcategoryname String, deptno string, deptname String, " +
+ "projectcode string, projectjoindate String, projectenddate String,attendance double," +
+ "utilization double,salary double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES" +
+ "('DICTIONARY_EXCLUDE'='empno,empname,designation,doj,workgroupcategory," +
+ "workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate')"
+ )
+ sql(
+ "create table hivetable1 (empno string, empname String, designation string, doj String, " +
+ "workgroupcategory string, workgroupcategoryname String,deptno string, deptname String, " +
+ "projectcode string, projectjoindate String,projectenddate String, attendance double," +
+ "utilization double,salary double)row format delimited fields terminated by ','"
+ )
+
+ sql(
+ s"LOAD DATA local inpath '$resourcesPath/datadelimiter.csv' INTO TABLE carbontable1 OPTIONS" +
+ "('DELIMITER'= '|', 'QUOTECHAR'= '\"')"
+ )
+
+ sql(s"LOAD DATA local inpath '$resourcesPath/datawithoutheader.csv' INTO table hivetable1")
+
+ checkAnswer(sql("select * from carbontable1"), sql("select * from hivetable1"))
+ }
+
+ test("test data loading with comment option") {
+ sql("drop table if exists comment_test")
+ sql(
+ "create table comment_test(imei string, age int, task bigint, num double, level decimal(10," +
+ "3), productdate timestamp, mark int, name string) STORED BY 'org.apache.carbondata.format'"
+ )
+ sql(
+ s"LOAD DATA local inpath '$resourcesPath/comment.csv' INTO TABLE comment_test " +
+ "options('DELIMITER' = ',', 'QUOTECHAR' = '.', 'COMMENTCHAR' = '?','FILEHEADER'='imei,age,task,num,level,productdate,mark,name', 'maxcolumns'='180')"
+ )
+ checkAnswer(sql("select imei from comment_test"),Seq(Row("\".carbon"),Row("#?carbon"), Row(""),
+ Row("~carbon,")))
+ }
+
+
+ override def afterAll {
+ sql("drop table if exists escapechar1")
+ sql("drop table if exists escapechar2")
+ sql("drop table if exists escapechar3")
+ sql("drop table if exists specialcharacter1")
+ sql("drop table if exists specialcharacter2")
+ sql("drop table if exists collessthanschema")
+ sql("drop table if exists decimalarray")
+ sql("drop table if exists decimalstruct")
+ sql("drop table if exists carbontable")
+ sql("drop table if exists hivetable")
+ sql("drop table if exists testtable")
+ sql("drop table if exists testhivetable")
+ sql("drop table if exists testtable1")
+ sql("drop table if exists testhivetable1")
+ sql("drop table if exists complexcarbontable")
+ sql("drop table if exists complex_t3")
+ sql("drop table if exists complex_hive_t3")
+ sql("drop table if exists header_test")
+ sql("drop table if exists duplicateColTest")
+ sql("drop table if exists mixed_header_test")
+ sql("drop table if exists primitivecarbontable")
+ sql("drop table if exists UPPERCASEcube")
+ sql("drop table if exists lowercaseCUBE")
+ sql("drop table if exists carbontable1")
+ sql("drop table if exists hivetable1")
+ sql("drop table if exists comment_test")
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
+ CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION
+ )
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 7a6c513..4879e49 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -174,7 +174,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
* Steps for changing the plan.
* 1. It finds out the join condition columns and dimension aggregate columns which are need to
* be decoded just before that plan executes.
- * 2. Plan starts transform by adding the decoder to the plan where it needs the decoded data
+ * 2. Plan starts encode by adding the decoder to the plan where it needs the decoded data
* like dimension aggregate columns decoder under aggregator and join condition decoder under
* join children.
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index 8c345cb..ccd5afd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
@@ -208,7 +209,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
try {
sortDataRow.initialize();
- } catch (CarbonSortKeyAndGroupByException e) {
+ } catch (MemoryException e) {
throw new CarbonDataLoadingException(e);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
index e2623bd..3d494de 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
@@ -88,7 +89,7 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
final int batchSize = CarbonProperties.getInstance().getBatchSize();
try {
sortDataRow.initialize();
- } catch (CarbonSortKeyAndGroupByException e) {
+ } catch (MemoryException e) {
throw new CarbonDataLoadingException(e);
}
ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
index 0716796..aed2bc1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.schema.BucketingInfo;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
@@ -41,7 +42,6 @@ import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeSortDataRows;
import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger;
import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -91,7 +91,7 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl implements Sorter {
new UnsafeSortDataRows(parameters, intermediateFileMergers[i], inMemoryChunkSizeInMB);
sortDataRows[i].initialize();
}
- } catch (CarbonSortKeyAndGroupByException e) {
+ } catch (MemoryException e) {
throw new CarbonDataLoadingException(e);
}
ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/IntPointerBuffer.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/IntPointerBuffer.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/IntPointerBuffer.java
deleted file mode 100644
index cc59495..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/IntPointerBuffer.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe;
-
-import org.apache.carbondata.core.memory.CarbonUnsafe;
-import org.apache.carbondata.core.memory.MemoryBlock;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-
-/**
- * Holds the pointers for rows.
- */
-public class IntPointerBuffer {
-
- private int length;
-
- private int actualSize;
-
- private int[] pointerBlock;
-
- private MemoryBlock baseBlock;
-
- private MemoryBlock pointerMemoryBlock;
-
- public IntPointerBuffer(MemoryBlock baseBlock) {
- // TODO can be configurable, it is initial size and it can grow automatically.
- this.length = 100000;
- pointerBlock = new int[length];
- this.baseBlock = baseBlock;
- }
-
- public IntPointerBuffer(int length) {
- this.length = length;
- pointerBlock = new int[length];
- }
-
- public void set(int index, int value) {
- pointerBlock[index] = value;
- }
-
- public void set(int value) {
- ensureMemory();
- pointerBlock[actualSize] = value;
- actualSize++;
- }
-
- /**
- * Returns the value at position {@code index}.
- */
- public int get(int index) {
- assert index >= 0 : "index (" + index + ") should >= 0";
- assert index < length : "index (" + index + ") should < length (" + length + ")";
- if (pointerBlock == null) {
- return CarbonUnsafe.unsafe.getInt(pointerMemoryBlock.getBaseObject(),
- pointerMemoryBlock.getBaseOffset() + (index * 4));
- }
- return pointerBlock[index];
- }
-
- public void loadToUnsafe() throws CarbonSortKeyAndGroupByException {
- pointerMemoryBlock = UnsafeSortDataRows.getMemoryBlock(pointerBlock.length * 4);
- for (int i = 0; i < pointerBlock.length; i++) {
- CarbonUnsafe.unsafe
- .putInt(pointerMemoryBlock.getBaseObject(), pointerMemoryBlock.getBaseOffset() + i * 4,
- pointerBlock[i]);
- }
- pointerBlock = null;
- }
-
- public int getActualSize() {
- return actualSize;
- }
-
- public MemoryBlock getBaseBlock() {
- return baseBlock;
- }
-
- public int[] getPointerBlock() {
- return pointerBlock;
- }
-
- private void ensureMemory() {
- if (actualSize >= length) {
- // Expand by quarter, may be we can correct the logic later
- int localLength = length + (int) (length * (0.25));
- int[] memoryAddress = new int[localLength];
- System.arraycopy(pointerBlock, 0, memoryAddress, 0, length);
- pointerBlock = memoryAddress;
- length = localLength;
- }
- }
-
- public void freeMemory() {
- pointerBlock = null;
- if (pointerMemoryBlock != null) {
- UnsafeMemoryManager.INSTANCE.freeMemory(pointerMemoryBlock);
- }
- if (baseBlock != null) {
- UnsafeMemoryManager.INSTANCE.freeMemory(baseBlock);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
index f2877ae..2ac138b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
@@ -23,6 +23,7 @@ import java.math.BigDecimal;
import java.util.Arrays;
import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.memory.IntPointerBuffer;
import org.apache.carbondata.core.memory.MemoryBlock;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.DataTypeUtil;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java
deleted file mode 100644
index af49978..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.memory.MemoryAllocator;
-import org.apache.carbondata.core.memory.MemoryBlock;
-import org.apache.carbondata.core.util.CarbonProperties;
-
-/**
- * Manages memory for instance.
- */
-public class UnsafeMemoryManager {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(UnsafeMemoryManager.class.getName());
-
- static {
- long size;
- try {
- size = Long.parseLong(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB,
- CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT));
- } catch (Exception e) {
- size = Long.parseLong(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
- LOGGER.info("Wrong memory size given, "
- + "so setting default value to " + size);
- }
- if (size < 1024) {
- size = 1024;
- LOGGER.info("It is not recommended to keep unsafe memory size less than 1024MB, "
- + "so setting default value to " + size);
- }
-
-
- boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
- CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
- long takenSize = size * 1024 * 1024;
- MemoryAllocator allocator;
- if (offHeap) {
- allocator = MemoryAllocator.UNSAFE;
- } else {
- long maxMemory = Runtime.getRuntime().maxMemory() * 60 / 100;
- if (takenSize > maxMemory) {
- takenSize = maxMemory;
- }
- allocator = MemoryAllocator.HEAP;
- }
- INSTANCE = new UnsafeMemoryManager(takenSize, allocator);
- }
-
- public static final UnsafeMemoryManager INSTANCE;
-
- private long totalMemory;
-
- private long memoryUsed;
-
- private MemoryAllocator allocator;
-
- private long minimumMemory;
-
- private UnsafeMemoryManager(long totalMemory, MemoryAllocator allocator) {
- this.totalMemory = totalMemory;
- this.allocator = allocator;
- long numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
- long sortMemoryChunkSize = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
- sortMemoryChunkSize = sortMemoryChunkSize * 1024 * 1024;
- long totalWorkingMemoryForAllThreads = sortMemoryChunkSize * numberOfCores;
- if (totalWorkingMemoryForAllThreads >= totalMemory) {
- throw new RuntimeException("Working memory should be less than total memory configured, "
- + "so either reduce the loading threads or increase the memory size. "
- + "(Number of threads * number of threads) should be less than total unsafe memory");
- }
- minimumMemory = totalWorkingMemoryForAllThreads;
- LOGGER.info("Memory manager is created with size " + totalMemory + " with " + allocator
- + " and minimum reserve memory " + minimumMemory);
- }
- public synchronized MemoryBlock allocateMemory(long memoryRequested) {
- if (memoryUsed + memoryRequested <= totalMemory) {
- MemoryBlock allocate = allocator.allocate(memoryRequested);
- memoryUsed += allocate.size();
- LOGGER.info("Memory block is created with size " + allocate.size() +
- " Total memory used " + memoryUsed + " memory left " + (getAvailableMemory()));
- return allocate;
- }
- return null;
- }
-
- public synchronized void freeMemory(MemoryBlock memoryBlock) {
- allocator.free(memoryBlock);
- memoryUsed -= memoryBlock.size();
- memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
- LOGGER.info(
- "Memory released, memory used " + memoryUsed + " memory left " + (getAvailableMemory()));
- }
-
- public synchronized long getAvailableMemory() {
- return totalMemory - memoryUsed;
- }
-
- public boolean isMemoryAvailable() {
- return getAvailableMemory() > minimumMemory;
- }
-
- public long getUsableMemory() {
- return totalMemory - minimumMemory;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
index 074bb3b..a42d0ea 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
@@ -31,7 +31,10 @@ import java.util.concurrent.TimeUnit;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.memory.IntPointerBuffer;
import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.processing.newflow.sort.unsafe.comparator.UnsafeRowComparator;
@@ -111,8 +114,8 @@ public class UnsafeSortDataRows {
/**
* This method will be used to initialize
*/
- public void initialize() throws CarbonSortKeyAndGroupByException {
- MemoryBlock baseBlock = getMemoryBlock(inMemoryChunkSize);
+ public void initialize() throws MemoryException {
+ MemoryBlock baseBlock = UnsafeMemoryManager.allocateMemoryWithRetry(inMemoryChunkSize);
this.rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
parameters.getNoDictionarySortColumn(),
parameters.getDimColCount() + parameters.getComplexDimColCount(),
@@ -130,28 +133,6 @@ public class UnsafeSortDataRows {
semaphore = new Semaphore(parameters.getNumberOfCores());
}
- public static MemoryBlock getMemoryBlock(long size) throws CarbonSortKeyAndGroupByException {
- MemoryBlock baseBlock = null;
- int tries = 0;
- while (tries < 100) {
- baseBlock = UnsafeMemoryManager.INSTANCE.allocateMemory(size);
- if (baseBlock == null) {
- try {
- Thread.sleep(50);
- } catch (InterruptedException e) {
- throw new CarbonSortKeyAndGroupByException(e);
- }
- } else {
- break;
- }
- tries++;
- }
- if (baseBlock == null) {
- throw new CarbonSortKeyAndGroupByException("Not enough memory to create page");
- }
- return baseBlock;
- }
-
public boolean canAdd() {
return bytesAdded < maxSizeAllowed;
}
@@ -196,7 +177,7 @@ public class UnsafeSortDataRows {
unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
semaphore.acquire();
dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
- MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSize);
+ MemoryBlock memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(inMemoryChunkSize);
boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
rowPage = new UnsafeCarbonRowPage(
parameters.getNoDictionaryDimnesionColumn(),
@@ -233,7 +214,7 @@ public class UnsafeSortDataRows {
unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
semaphore.acquire();
dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
- MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSize);
+ MemoryBlock memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(inMemoryChunkSize);
boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
rowPage = new UnsafeCarbonRowPage(
parameters.getNoDictionaryDimnesionColumn(),
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/sort/UnsafeIntSortDataFormat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/sort/UnsafeIntSortDataFormat.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/sort/UnsafeIntSortDataFormat.java
index b63bdfc..e7fec26 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/sort/UnsafeIntSortDataFormat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/sort/UnsafeIntSortDataFormat.java
@@ -16,7 +16,7 @@
*/
package org.apache.carbondata.processing.newflow.sort.unsafe.sort;
-import org.apache.carbondata.processing.newflow.sort.unsafe.IntPointerBuffer;
+import org.apache.carbondata.core.memory.IntPointerBuffer;
import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonRow;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 91c2bc4..be86808 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
import org.apache.carbondata.core.datastore.GenericDataType;
import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedData;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
@@ -53,7 +54,6 @@ import org.apache.carbondata.processing.store.file.FileManager;
import org.apache.carbondata.processing.store.file.IFileManagerComposite;
import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
-import org.apache.carbondata.processing.store.writer.Encoder;
/**
* Fact data handler class to handle the fact data
@@ -144,7 +144,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
private ColumnarFormatVersion version;
- private DefaultEncoder encoder;
+ private TablePageEncoder encoder;
private SortScopeOptions.SortScope sortScope;
@@ -206,7 +206,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
rleEncodingForDictDimension = arrangeUniqueBlockType(rleEncodingForDictDimension);
}
this.version = CarbonProperties.getInstance().getFormatVersion();
- this.encoder = new DefaultEncoder(model);
+ this.encoder = new TablePageEncoder(model);
}
private void initParameters(CarbonFactDataHandlerModel model) {
@@ -358,8 +358,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
rowId++;
}
- // encode and compress dimensions and measure
- Encoder.EncodedData encodedData = encoder.encode(tablePage);
+ // apply and compress dimensions and measure
+ EncodedData encodedData = encoder.encode(tablePage);
TablePageStatistics tablePageStatistics = new TablePageStatistics(
model.getTableSpec(), tablePage, encodedData, tablePage.getMeasureStats());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/store/DefaultEncoder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/DefaultEncoder.java b/processing/src/main/java/org/apache/carbondata/processing/store/DefaultEncoder.java
deleted file mode 100644
index 73c4fa1..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/DefaultEncoder.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-import org.apache.carbondata.core.compression.ValueCompressor;
-import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndex;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
-import org.apache.carbondata.core.datastore.columnar.ColGroupBlockStorage;
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatistics;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.ByteUtil;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CompressionFinder;
-import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.processing.store.writer.Encoder;
-
-// Default encoder for encoding dimension and measures. For dimensions, it applies RLE and
-// inverted index encoding. For measures, it applies delta encoding or adaptive encoding
-public class DefaultEncoder implements Encoder {
-
- private ColumnarFormatVersion version;
-
- private boolean[] isUseInvertedIndex;
-
- private CarbonFactDataHandlerModel model;
-
- public DefaultEncoder(CarbonFactDataHandlerModel model) {
- this.version = CarbonProperties.getInstance().getFormatVersion();
- this.model = model;
- this.isUseInvertedIndex = model.getIsUseInvertedIndex();
- }
-
- // function to encode all columns in one table page
- public Encoder.EncodedData encode(TablePage tablePage) {
- Encoder.EncodedData encodedData = new Encoder.EncodedData();
- encodeAndCompressDimensions(tablePage, encodedData);
- encodeAndCompressMeasures(tablePage, encodedData);
- return encodedData;
- }
-
- // encode measure and set encodedData in `encodedData`
- private void encodeAndCompressMeasures(TablePage tablePage, Encoder.EncodedData encodedData) {
- // TODO: following conversion is required only because compress model requires them,
- // remove then after the compress framework is refactoried
- ColumnPage[] measurePage = tablePage.getMeasurePage();
- int measureCount = measurePage.length;
- byte[] dataTypeSelected = new byte[measureCount];
- CompressionFinder[] finders = new CompressionFinder[measureCount];
- for (int i = 0; i < measureCount; i++) {
- ColumnPageStatistics stats = measurePage[i].getStatistics();
- finders[i] = ValueCompressionUtil.getCompressionFinder(
- stats.getMax(),
- stats.getMin(),
- stats.getDecimal(),
- measurePage[i].getDataType(), dataTypeSelected[i]);
- }
-
- //CompressionFinder[] finders = compressionModel.getCompressionFinders();
- ValueCompressionHolder[] holders = ValueCompressionUtil.getValueCompressionHolder(finders);
- encodedData.measures = encodeMeasure(holders, finders, measurePage);
- }
-
- // this method first invokes encoding routine to encode the data chunk,
- // followed by invoking compression routine for preparing the data chunk for writing.
- private byte[][] encodeMeasure(ValueCompressionHolder[] holders,
- CompressionFinder[] finders,
- ColumnPage[] columnPages) {
- ValueCompressionHolder[] values = new ValueCompressionHolder[columnPages.length];
- byte[][] encodedMeasures = new byte[values.length][];
- for (int i = 0; i < columnPages.length; i++) {
- values[i] = holders[i];
- if (columnPages[i].getDataType() != DataType.DECIMAL) {
- ValueCompressor compressor =
- ValueCompressionUtil.getValueCompressor(finders[i]);
- Object compressed = compressor.getCompressedValues(
- finders[i],
- columnPages[i],
- columnPages[i].getStatistics().getMax(),
- columnPages[i].getStatistics().getDecimal());
- values[i].setValue(compressed);
- } else {
- // in case of decimal, 'flatten' the byte[][] to byte[]
- byte[][] decimalPage = columnPages[i].getDecimalPage();
- int totalSize = 0;
- for (byte[] decimal : decimalPage) {
- totalSize += decimal.length;
- }
- ByteBuffer temp = ByteBuffer.allocate(totalSize);
- for (byte[] decimal : decimalPage) {
- temp.put(decimal);
- }
- values[i].setValue(temp.array());
- }
- values[i].compress();
- encodedMeasures[i] = values[i].getCompressedData();
- }
-
- return encodedMeasures;
- }
-
- private IndexStorage encodeAndCompressDictDimension(byte[][] data, boolean isSort,
- boolean isUseInvertedIndex) {
- if (isUseInvertedIndex) {
- if (version == ColumnarFormatVersion.V3) {
- return new BlockIndexerStorageForShort(data, true, false, isSort);
- } else {
- return new BlockIndexerStorageForInt(data, true, false, isSort);
- }
- } else {
- if (version == ColumnarFormatVersion.V3) {
- return new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
- } else {
- return new BlockIndexerStorageForNoInvertedIndex(data);
- }
- }
- }
-
- private IndexStorage encodeAndCompressDirectDictDimension(byte[][] data, boolean isSort,
- boolean isUseInvertedIndex) {
- if (isUseInvertedIndex) {
- if (version == ColumnarFormatVersion.V3) {
- return new BlockIndexerStorageForShort(data, false, false, isSort);
- } else {
- return new BlockIndexerStorageForInt(data, false, false, isSort);
- }
- } else {
- if (version == ColumnarFormatVersion.V3) {
- return new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
- } else {
- return new BlockIndexerStorageForNoInvertedIndex(data);
- }
- }
- }
-
- private IndexStorage encodeAndCompressComplexDimension(byte[][] data) {
- if (version == ColumnarFormatVersion.V3) {
- return new BlockIndexerStorageForShort(data, false, false, false);
- } else {
- return new BlockIndexerStorageForInt(data, false, false, false);
- }
- }
-
- private IndexStorage encodeAndCompressNoDictDimension(byte[][] data, boolean isSort,
- boolean isUseInvertedIndex) {
- if (isUseInvertedIndex) {
- if (version == ColumnarFormatVersion.V3) {
- return new BlockIndexerStorageForShort(data, false, true, isSort);
- } else {
- return new BlockIndexerStorageForInt(data, false, true, isSort);
- }
- } else {
- if (version == ColumnarFormatVersion.V3) {
- return new BlockIndexerStorageForNoInvertedIndexForShort(data, true);
- } else {
- return new BlockIndexerStorageForNoInvertedIndex(data);
- }
- }
- }
-
- // encode and compress each dimension, set encoded data in `encodedData`
- private void encodeAndCompressDimensions(TablePage tablePage, Encoder.EncodedData encodedData) {
- TableSpec.DimensionSpec dimensionSpec = model.getTableSpec().getDimensionSpec();
- int dictionaryColumnCount = -1;
- int noDictionaryColumnCount = -1;
- int colGrpId = -1;
- int indexStorageOffset = 0;
- IndexStorage[] indexStorages = new IndexStorage[dimensionSpec.getNumExpandedDimensions()];
- SegmentProperties segmentProperties = model.getSegmentProperties();
- Compressor compressor = CompressorFactory.getInstance().getCompressor();
- byte[][] compressedColumns = new byte[indexStorages.length][];
- for (int i = 0; i < dimensionSpec.getNumSimpleDimensions(); i++) {
- byte[] flattened;
- boolean isSortColumn = model.isSortColumn(i);
- switch (dimensionSpec.getType(i)) {
- case GLOBAL_DICTIONARY:
- // dictionary dimension
- indexStorages[indexStorageOffset] =
- encodeAndCompressDictDimension(
- tablePage.getKeyColumnPage().getKeyVector(++dictionaryColumnCount),
- isSortColumn,
- isUseInvertedIndex[i] & isSortColumn);
- flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
- break;
- case DIRECT_DICTIONARY:
- // timestamp and date column
- indexStorages[indexStorageOffset] =
- encodeAndCompressDirectDictDimension(
- tablePage.getKeyColumnPage().getKeyVector(++dictionaryColumnCount),
- isSortColumn,
- isUseInvertedIndex[i] & isSortColumn);
- flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
- break;
- case PLAIN_VALUE:
- // high cardinality dimension, encoded as plain string
- indexStorages[indexStorageOffset] =
- encodeAndCompressNoDictDimension(
- tablePage.getNoDictDimensionPage()[++noDictionaryColumnCount].getStringPage(),
- isSortColumn,
- isUseInvertedIndex[i] & isSortColumn);
- flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
- break;
- case COLUMN_GROUP:
- // column group
- indexStorages[indexStorageOffset] =
- new ColGroupBlockStorage(
- segmentProperties,
- ++colGrpId,
- tablePage.getKeyColumnPage().getKeyVector(++dictionaryColumnCount));
- flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
- break;
- case COMPLEX:
- // we need to add complex column at last, so skipping it here
- continue;
- default:
- throw new RuntimeException("unsupported dimension type: " + dimensionSpec.getType(i));
- }
- compressedColumns[indexStorageOffset] = compressor.compressByte(flattened);
- indexStorageOffset++;
- }
-
- // handle complex type column
- for (int i = 0; i < dimensionSpec.getNumComplexDimensions(); i++) {
- Iterator<byte[][]> iterator = tablePage.getComplexDimensionPage()[i].iterator();
- while (iterator.hasNext()) {
- byte[][] data = iterator.next();
- indexStorages[indexStorageOffset] = encodeAndCompressComplexDimension(data);
- byte[] flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
- compressedColumns[indexStorageOffset] = compressor.compressByte(flattened);
- indexStorageOffset++;
- }
- }
-
- encodedData.indexStorages = indexStorages;
- encodedData.dimensions = compressedColumns;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index fb7ebfb..65504cd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -29,7 +29,6 @@ import org.apache.carbondata.core.datastore.GenericDataType;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
-import org.apache.carbondata.core.datastore.page.KeyColumnPage;
import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
@@ -50,7 +49,7 @@ public class TablePage {
// TODO: we should have separate class for key columns so that keys are stored together in
// one vector to make it efficient for sorting
- private KeyColumnPage keyColumnPage;
+ private ColumnPage[] dictDimensionPage;
private ColumnPage[] noDictDimensionPage;
private ComplexColumnPage[] complexDimensionPage;
private ColumnPage[] measurePage;
@@ -62,14 +61,17 @@ public class TablePage {
private CarbonFactDataHandlerModel model;
- TablePage(CarbonFactDataHandlerModel model, int pageSize) {
+ public TablePage(CarbonFactDataHandlerModel model, int pageSize) {
this.model = model;
this.pageSize = pageSize;
- keyColumnPage = new KeyColumnPage(pageSize,
- model.getSegmentProperties().getDimensionPartitions().length);
+ int numDictDimension = model.getMDKeyGenerator().getDimCount();
+ dictDimensionPage = new ColumnPage[numDictDimension];
+ for (int i = 0; i < dictDimensionPage.length; i++) {
+ dictDimensionPage[i] = ColumnPage.newPage(DataType.BYTE_ARRAY, pageSize);
+ }
noDictDimensionPage = new ColumnPage[model.getNoDictionaryCount()];
for (int i = 0; i < noDictDimensionPage.length; i++) {
- noDictDimensionPage[i] = new ColumnPage(DataType.STRING, pageSize);
+ noDictDimensionPage[i] = ColumnPage.newPage(DataType.BYTE_ARRAY, pageSize);
}
complexDimensionPage = new ComplexColumnPage[model.getComplexColumnCount()];
for (int i = 0; i < complexDimensionPage.length; i++) {
@@ -80,7 +82,7 @@ public class TablePage {
measurePage = new ColumnPage[model.getMeasureCount()];
DataType[] dataTypes = model.getMeasureDataType();
for (int i = 0; i < measurePage.length; i++) {
- measurePage[i] = new ColumnPage(dataTypes[i], pageSize);
+ measurePage[i] = ColumnPage.newPage(dataTypes[i], pageSize);
}
}
@@ -90,13 +92,15 @@ public class TablePage {
* @param rowId Id of the input row
* @param row row object
*/
- void addRow(int rowId, CarbonRow row) throws KeyGenException {
+ public void addRow(int rowId, CarbonRow row) throws KeyGenException {
// convert each column category
// 1. convert dictionary columns
byte[] mdk = WriteStepRowUtil.getMdk(row, model.getMDKeyGenerator());
byte[][] keys = model.getSegmentProperties().getFixedLengthKeySplitter().splitKey(mdk);
- keyColumnPage.putKey(rowId, keys);
+ for (int i = 0; i < dictDimensionPage.length; i++) {
+ dictDimensionPage[i].putData(rowId, keys[i]);
+ }
// 2. convert noDictionary columns and complex columns.
int noDictionaryCount = noDictDimensionPage.length;
@@ -165,7 +169,7 @@ public class TablePage {
encodedComplexColumnar.add(new ArrayList<byte[]>());
}
- // encode the complex type data and fill columnsArray
+ // apply the complex type data and fill columnsArray
try {
ByteBuffer byteArrayInput = ByteBuffer.wrap(complexColumns);
ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream();
@@ -195,8 +199,8 @@ public class TablePage {
return output;
}
- public KeyColumnPage getKeyColumnPage() {
- return keyColumnPage;
+ public ColumnPage[] getDictDimensionPage() {
+ return dictDimensionPage;
}
public ColumnPage[] getNoDictDimensionPage() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java
new file mode 100644
index 0000000..5d460a9
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.store;
+
+import java.util.Iterator;
+
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndex;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
+import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingStrategy;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedData;
+import org.apache.carbondata.core.datastore.page.encoding.EncodingStrategy;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+public class TablePageEncoder {
+
+ private ColumnarFormatVersion version;
+
+ private boolean[] isUseInvertedIndex;
+
+ private CarbonFactDataHandlerModel model;
+
+ private static final EncodingStrategy encodingStrategy = new DefaultEncodingStrategy();
+
+ public TablePageEncoder(CarbonFactDataHandlerModel model) {
+ this.version = CarbonProperties.getInstance().getFormatVersion();
+ this.model = model;
+ this.isUseInvertedIndex = model.getIsUseInvertedIndex();
+ }
+
+ // function to encode all columns in one table page
+ public EncodedData encode(TablePage tablePage) throws KeyGenException {
+ EncodedData encodedData = new EncodedData();
+ encodeAndCompressDimensions(tablePage, encodedData);
+ encodeAndCompressMeasures(tablePage, encodedData);
+ return encodedData;
+ }
+
+ // encode measures and set encoded data in `encodedData`
+ private void encodeAndCompressMeasures(TablePage tablePage, EncodedData encodedData) {
+ ColumnPage[] measurePage = tablePage.getMeasurePage();
+ byte[][] encodedMeasures = new byte[measurePage.length][];
+ for (int i = 0; i < measurePage.length; i++) {
+ ColumnPageCodec encoder = encodingStrategy.createCodec(measurePage[i].getStatistics());
+ encodedMeasures[i] = encoder.encode(measurePage[i]);
+ }
+ encodedData.measures = encodedMeasures;
+ }
+
+ private IndexStorage encodeAndCompressDictDimension(byte[][] data, boolean isSort,
+ boolean isUseInvertedIndex) throws KeyGenException {
+ if (isUseInvertedIndex) {
+ if (version == ColumnarFormatVersion.V3) {
+ return new BlockIndexerStorageForShort(data, true, false, isSort);
+ } else {
+ return new BlockIndexerStorageForInt(data, true, false, isSort);
+ }
+ } else {
+ if (version == ColumnarFormatVersion.V3) {
+ return new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
+ } else {
+ return new BlockIndexerStorageForNoInvertedIndex(data);
+ }
+ }
+ }
+
+ private IndexStorage encodeAndCompressDirectDictDimension(byte[][] data, boolean isSort,
+ boolean isUseInvertedIndex) throws KeyGenException {
+ if (isUseInvertedIndex) {
+ if (version == ColumnarFormatVersion.V3) {
+ return new BlockIndexerStorageForShort(data, false, false, isSort);
+ } else {
+ return new BlockIndexerStorageForInt(data, false, false, isSort);
+ }
+ } else {
+ if (version == ColumnarFormatVersion.V3) {
+ return new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
+ } else {
+ return new BlockIndexerStorageForNoInvertedIndex(data);
+ }
+ }
+ }
+
+ private IndexStorage encodeAndCompressComplexDimension(byte[][] data) {
+ if (version == ColumnarFormatVersion.V3) {
+ return new BlockIndexerStorageForShort(data, false, false, false);
+ } else {
+ return new BlockIndexerStorageForInt(data, false, false, false);
+ }
+ }
+
+ private IndexStorage encodeAndCompressNoDictDimension(byte[][] data, boolean isSort,
+ boolean isUseInvertedIndex) {
+ if (isUseInvertedIndex) {
+ if (version == ColumnarFormatVersion.V3) {
+ return new BlockIndexerStorageForShort(data, false, true, isSort);
+ } else {
+ return new BlockIndexerStorageForInt(data, false, true, isSort);
+ }
+ } else {
+ if (version == ColumnarFormatVersion.V3) {
+ return new BlockIndexerStorageForNoInvertedIndexForShort(data, true);
+ } else {
+ return new BlockIndexerStorageForNoInvertedIndex(data);
+ }
+ }
+ }
+
+ // encode and compress each dimension, set encoded data in `encodedData`
+ private void encodeAndCompressDimensions(TablePage tablePage, EncodedData encodedData)
+ throws KeyGenException {
+ TableSpec.DimensionSpec dimensionSpec = model.getTableSpec().getDimensionSpec();
+ int dictionaryColumnCount = -1;
+ int noDictionaryColumnCount = -1;
+ int indexStorageOffset = 0;
+ IndexStorage[] indexStorages = new IndexStorage[dimensionSpec.getNumExpandedDimensions()];
+ Compressor compressor = CompressorFactory.getInstance().getCompressor();
+ byte[][] compressedColumns = new byte[indexStorages.length][];
+ for (int i = 0; i < dimensionSpec.getNumSimpleDimensions(); i++) {
+ byte[] flattened;
+ boolean isSortColumn = model.isSortColumn(i);
+ switch (dimensionSpec.getType(i)) {
+ case GLOBAL_DICTIONARY:
+ // dictionary dimension
+ indexStorages[indexStorageOffset] =
+ encodeAndCompressDictDimension(
+ tablePage.getDictDimensionPage()[++dictionaryColumnCount].getByteArrayPage(),
+ isSortColumn,
+ isUseInvertedIndex[i] & isSortColumn);
+ flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
+ break;
+ case DIRECT_DICTIONARY:
+ // timestamp and date column
+ indexStorages[indexStorageOffset] =
+ encodeAndCompressDirectDictDimension(
+ tablePage.getDictDimensionPage()[++dictionaryColumnCount].getByteArrayPage(),
+ isSortColumn,
+ isUseInvertedIndex[i] & isSortColumn);
+ flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
+ break;
+ case PLAIN_VALUE:
+ // high cardinality dimension, encoded as plain string
+ indexStorages[indexStorageOffset] =
+ encodeAndCompressNoDictDimension(
+ tablePage.getNoDictDimensionPage()[++noDictionaryColumnCount].getByteArrayPage(),
+ isSortColumn,
+ isUseInvertedIndex[i] & isSortColumn);
+ flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
+ break;
+ case COMPLEX:
+ // we need to add complex column at last, so skipping it here
+ continue;
+ default:
+ throw new RuntimeException("unsupported dimension type: " + dimensionSpec.getType(i));
+ }
+ compressedColumns[indexStorageOffset] = compressor.compressByte(flattened);
+ indexStorageOffset++;
+ }
+
+ // handle complex type column
+ for (int i = 0; i < dimensionSpec.getNumComplexDimensions(); i++) {
+ Iterator<byte[][]> iterator = tablePage.getComplexDimensionPage()[i].iterator();
+ while (iterator.hasNext()) {
+ byte[][] data = iterator.next();
+ indexStorages[indexStorageOffset] = encodeAndCompressComplexDimension(data);
+ byte[] flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
+ compressedColumns[indexStorageOffset] = compressor.compressByte(flattened);
+ indexStorageOffset++;
+ }
+ }
+
+ encodedData.indexStorages = indexStorages;
+ encodedData.dimensions = compressedColumns;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java
index 2911936..13eaac9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java
@@ -23,9 +23,9 @@ import java.util.BitSet;
import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.datastore.columnar.IndexStorage;
import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatistics;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedData;
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
-import org.apache.carbondata.processing.store.writer.Encoder;
// Statistics of dimension and measure column in a TablePage
public class TablePageStatistics {
@@ -55,7 +55,7 @@ public class TablePageStatistics {
private TableSpec tableSpec;
TablePageStatistics(TableSpec tableSpec, TablePage tablePage,
- Encoder.EncodedData encodedData, MeasurePageStatsVO measurePageStatistics) {
+ EncodedData encodedData, MeasurePageStatsVO measurePageStatistics) {
this.numDimensionsExpanded = tableSpec.getDimensionSpec().getNumExpandedDimensions();
int numMeasures = tableSpec.getMeasureSpec().getNumMeasures();
this.dimensionMinValue = new byte[numDimensionsExpanded][];
@@ -69,7 +69,7 @@ public class TablePageStatistics {
updateNullBitSet(tablePage);
}
- private void updateMinMax(TablePage tablePage, Encoder.EncodedData encodedData) {
+ private void updateMinMax(TablePage tablePage, EncodedData encodedData) {
IndexStorage[] keyStorageArray = encodedData.indexStorages;
byte[][] measureArray = encodedData.measures;
@@ -89,7 +89,7 @@ public class TablePageStatistics {
}
}
for (int i = 0; i < measureArray.length; i++) {
- ColumnPageStatistics stats = tablePage.getMeasurePage()[i].getStatistics();
+ ColumnPageStatsVO stats = tablePage.getMeasurePage()[i].getStatistics();
measureMaxValue[i] = stats.minBytes();
measureMinValue[i] = stats.maxBytes();
}