You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/12/17 14:16:56 UTC
[14/21] carbondata git commit: [CARBONDATA-3017] Map DDL Support
[CARBONDATA-3017] Map DDL Support
Support Create DDL for Map type.
This closes #2980
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c9b136c2
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c9b136c2
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c9b136c2
Branch: refs/heads/branch-1.5
Commit: c9b136c26763baad9bb8f81aa3676217578d6ccc
Parents: 8935164
Author: manishnalla1994 <ma...@gmail.com>
Authored: Tue Oct 16 15:18:08 2018 +0530
Committer: Raghunandan S <ca...@gmail.com>
Committed: Mon Dec 17 18:58:34 2018 +0530
----------------------------------------------------------------------
.../apache/carbondata/core/util/CarbonUtil.java | 4 +
.../hadoop/api/CarbonTableOutputFormat.java | 17 +-
.../TestCreateDDLForComplexMapType.scala | 445 +++++++++++++++++++
.../LocalDictionarySupportLoadTableTest.scala | 17 +
.../spark/rdd/NewCarbonDataLoadRDD.scala | 10 +-
.../carbondata/spark/util/CarbonScalaUtil.scala | 1 +
.../spark/sql/catalyst/CarbonDDLSqlParser.scala | 43 +-
.../streaming/CarbonAppendableStreamSink.scala | 9 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 4 +-
.../CarbonAlterTableCompactionCommand.scala | 6 +
.../management/CarbonLoadDataCommand.scala | 6 +-
.../table/CarbonCreateTableCommand.scala | 2 +-
.../spark/util/AllDictionaryTestCase.scala | 4 +-
.../util/ExternalColumnDictionaryTestCase.scala | 4 +-
.../TestStreamingTableWithRowParser.scala | 3 +-
.../loading/ComplexDelimitersEnum.java | 39 ++
.../loading/DataLoadProcessBuilder.java | 7 +-
.../loading/model/CarbonLoadModel.java | 38 +-
.../loading/model/CarbonLoadModelBuilder.java | 11 +-
.../processing/loading/model/LoadOption.java | 17 +-
.../loading/parser/CarbonParserFactory.java | 25 +-
.../loading/parser/impl/ArrayParserImpl.java | 6 +-
.../loading/parser/impl/MapParserImpl.java | 60 +++
.../loading/parser/impl/RowParserImpl.java | 8 +-
.../sdk/file/CarbonWriterBuilder.java | 1 +
.../streaming/parser/RowStreamParserImp.scala | 2 +
26 files changed, 715 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index ac52728..fc4704e 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -597,6 +597,10 @@ public final class CarbonUtil {
*/
public static String delimiterConverter(String delimiter) {
switch (delimiter) {
+ case "\\001":
+ case "\\002":
+ case "\\003":
+ case "\\004":
case "|":
case "*":
case ".":
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index dbd2f0e..16486d0 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.ObjectSerializationUtil;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
+import org.apache.carbondata.processing.loading.ComplexDelimitersEnum;
import org.apache.carbondata.processing.loading.DataLoadExecutor;
import org.apache.carbondata.processing.loading.TableProcessingOperations;
import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper;
@@ -338,11 +339,19 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
SKIP_EMPTY_LINE,
carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SKIP_EMPTY_LINE)));
- String complexDelim = conf.get(COMPLEX_DELIMITERS, "\\\001" + "," + "\\\002");
+ String complexDelim = conf.get(COMPLEX_DELIMITERS);
+ if (null == complexDelim) {
+ complexDelim = ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_1.value() + ","
+ + ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_2.value() + ","
+ + ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_3.value();
+ }
String[] split = complexDelim.split(",");
- model.setComplexDelimiterLevel1(split[0]);
- if (split.length > 1) {
- model.setComplexDelimiterLevel2(split[1]);
+ model.setComplexDelimiter(split[0]);
+ if (split.length > 2) {
+ model.setComplexDelimiter(split[1]);
+ model.setComplexDelimiter(split[2]);
+ } else if (split.length > 1) {
+ model.setComplexDelimiter(split[1]);
}
model.setDateFormat(
conf.get(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateDDLForComplexMapType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateDDLForComplexMapType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateDDLForComplexMapType.scala
new file mode 100644
index 0000000..941364c
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateDDLForComplexMapType.scala
@@ -0,0 +1,445 @@
+/*
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ *
+ http://www.apache.org/licenses/LICENSE-2.0
+ *
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.createTable.TestCreateDDLForComplexMapType
+
+import java.io.{BufferedWriter, File, FileWriter}
+import java.util
+
+import au.com.bytecode.opencsv.CSVWriter
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk
+
+import scala.collection.JavaConversions._
+
+class TestCreateDDLForComplexMapType extends QueryTest with BeforeAndAfterAll {
+ private val conf: Configuration = new Configuration(false)
+
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+
+ val path = s"$rootPath/integration/spark-common-test/src/test/resources/maptest2.csv"
+
+ private def checkForLocalDictionary(dimensionRawColumnChunks: util
+ .List[DimensionRawColumnChunk]): Boolean = {
+ var isLocalDictionaryGenerated = false
+ import scala.collection.JavaConversions._
+ isLocalDictionaryGenerated = dimensionRawColumnChunks
+ .filter(dimensionRawColumnChunk => dimensionRawColumnChunk.getDataChunkV3
+ .isSetLocal_dictionary).size > 0
+ isLocalDictionaryGenerated
+ }
+
+ def createCSVFile(): Unit = {
+ val out = new BufferedWriter(new FileWriter(path));
+ val writer = new CSVWriter(out);
+
+ val employee1 = Array("1\u0002Nalla\u00012\u0002Singh\u00011\u0002Gupta\u00014\u0002Kumar")
+
+ val employee2 = Array("10\u0002Nallaa\u000120\u0002Sissngh\u0001100\u0002Gusspta\u000140" +
+ "\u0002Kumar")
+
+ var listOfRecords = List(employee1, employee2)
+
+ writer.writeAll(listOfRecords)
+ out.close()
+ }
+
+ override def beforeAll(): Unit = {
+ createCSVFile()
+ sql("DROP TABLE IF EXISTS carbon")
+ }
+
+ override def afterAll(): Unit = {
+ new File(path).delete()
+ }
+
+ test("Single Map One Level") {
+ sql("DROP TABLE IF EXISTS carbon")
+ sql(
+ s"""
+ | CREATE TABLE carbon(
+ | mapField map<STRING,STRING>
+ | )
+ | STORED BY 'carbondata'
+ | """
+ .stripMargin)
+ val desc = sql(
+ s"""
+ | Describe Formatted
+ | carbon
+ | """.stripMargin).collect()
+ assert(desc(0).get(1).asInstanceOf[String].trim.equals("map<string,string>"))
+ }
+
+ test("Single Map with Two Nested Level") {
+ sql("DROP TABLE IF EXISTS carbon")
+ sql(
+ s"""
+ | CREATE TABLE carbon(
+ | mapField map<STRING,map<INT,STRING>>
+ | )
+ | STORED BY
+ |'carbondata'
+ |"""
+ .stripMargin)
+ val desc = sql(
+ s"""
+ | Describe Formatted
+ | carbon
+ | """.stripMargin).collect()
+ assert(desc(0).get(1).asInstanceOf[String].trim.equals("map<string,map<int,string>>"))
+ }
+
+ test("Map Type with array type as value") {
+ sql("DROP TABLE IF EXISTS carbon")
+ sql(
+ s"""
+ | CREATE TABLE carbon(
+ | mapField map<STRING,array<INT>>
+ | )
+ | STORED BY 'carbondata'
+ |
+ """
+ .stripMargin)
+ val desc = sql(
+ s"""
+ | Describe Formatted
+ | carbon
+ | """.stripMargin).collect()
+ assert(desc(0).get(1).asInstanceOf[String].trim.equals("map<string,array<int>>"))
+ }
+
+ test("Map Type with struct type as value") {
+ sql("DROP TABLE IF EXISTS carbon")
+ sql(
+ s"""
+ | CREATE TABLE carbon(
+ | mapField map<STRING,struct<key:INT,val:INT>>
+ | )
+ | STORED BY
+ | 'carbondata'
+ | """
+ .stripMargin)
+ val desc = sql(
+ s"""
+ | Describe Formatted
+ | carbon
+ | """.stripMargin).collect()
+ assert(desc(0).get(1).asInstanceOf[String].trim
+ .equals("map<string,struct<key:int,val:int>>"))
+ }
+
+ test("Map Type as child to struct type") {
+ sql("DROP TABLE IF EXISTS carbon")
+ sql(
+ s"""
+ | CREATE TABLE carbon(
+ | mapField struct<key:INT,val:map<INT,INT>>
+ | )
+ | STORED BY
+ |'carbondata' """
+ .stripMargin)
+ val desc = sql(
+ s"""
+ | Describe Formatted
+ | carbon
+ | """.stripMargin).collect()
+ assert(desc(0).get(1).asInstanceOf[String].trim
+ .equals("struct<key:int,val:map<int,int>>"))
+ }
+
+ test("Map Type as child to array type") {
+ sql("DROP TABLE IF EXISTS carbon")
+ sql(
+ s"""
+ | CREATE TABLE carbon(
+ | mapField array<map<INT,INT>>
+ | )
+ | STORED BY 'carbondata'
+ | """
+ .stripMargin)
+ val desc = sql(
+ s"""
+ | Describe Formatted
+ | carbon """.stripMargin).collect()
+ assert(desc(0).get(1).asInstanceOf[String].trim.equals("array<map<int,int>>"))
+ sql("insert into carbon values('1\0032\0022\0033\001100\003200\002200\003300')")
+ sql("select * from carbon").show(false)
+ }
+
+ test("Test Load data in map") {
+ sql("DROP TABLE IF EXISTS carbon")
+ sql(
+ s"""
+ | CREATE TABLE carbon(
+ | mapField map<INT,STRING>
+ | )
+ | STORED BY 'carbondata'
+ | """
+ .stripMargin)
+ val desc = sql(
+ s"""
+ | Describe Formatted
+ | carbon
+ | """.stripMargin).collect()
+ sql("insert into carbon values('1\002Nalla\0012\002Singh\0013\002Gupta\0014\002Kumar')")
+ checkAnswer(sql("select * from carbon"), Seq(
+ Row(Map(1 -> "Nalla", 2 -> "Singh", 3 -> "Gupta", 4 -> "Kumar"))))
+ }
+
+ test("Test Load data in map with empty value") {
+ sql("DROP TABLE IF EXISTS carbon")
+ sql(
+ s"""
+ | CREATE TABLE carbon(
+ | mapField map<INT,STRING>
+ | )
+ | STORED BY 'carbondata'
+ | """
+ .stripMargin)
+ val desc = sql(
+ s"""
+ | Describe Formatted
+ | carbon
+ | """.stripMargin).collect()
+ sql("insert into carbon values('1\002Nalla\0012\002\0013\002Gupta\0014\002Kumar')")
+ checkAnswer(sql("select * from carbon"), Seq(
+ Row(Map(1 -> "Nalla", 2 -> "", 3 -> "Gupta", 4 -> "Kumar"))))
+ }
+
+ // Support this for Map type
+ test("Test Load data in map with dictionary include") {
+ sql("DROP TABLE IF EXISTS carbon")
+ sql(
+ s"""
+ | CREATE TABLE carbon(
+ | mapField map<int,STRING>
+ | )
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('DICTIONARY_INCLUDE'='mapField')
+ | """
+ .stripMargin)
+ sql("insert into carbon values('1\002Nalla\0012\002Singh\0013\002Gupta')")
+ sql("select * from carbon").show(false)
+ //checkAnswer(sql("select * from carbon"), Seq(
+ //Row(Map(1 -> "Nalla", 2 -> "Singh", 3 -> "Gupta", 4 -> "Kumar"))))
+ }
+
+ test("Test Load data in map with partition columns") {
+ sql("DROP TABLE IF EXISTS carbon")
+ val exception = intercept[AnalysisException](
+ sql(
+ s"""
+ | CREATE TABLE carbon(
+ | a INT,
+ | mapField array<STRING>,
+ | b STRING
+ | )
+ | PARTITIONED BY (mp map<int,string>)
+ | STORED BY 'carbondata'
+ | """
+ .stripMargin)
+ )
+ assertResult("Cannot use map<int,string> for partition column;")(exception.getMessage())
+ }
+
+ test("Test IUD in map columns") {
+ sql("DROP TABLE IF EXISTS carbon")
+ sql(
+ s"""
+ | CREATE TABLE carbon(
+ | a INT,
+ | mapField map<INT,STRING>
+ | )
+ | STORED BY 'carbondata'
+ | """
+ .stripMargin)
+ sql("insert into carbon values(1,'1\002Nalla\0012\002Singh\0013\002Gupta\0014\002Kumar')")
+ sql("insert into carbon values(2,'1\002abc\0012\002xyz\0013\002hello\0014\002mno')")
+ val exception = intercept[UnsupportedOperationException](
+ sql("update carbon set(mapField)=('1,haha') where a=1").show(false))
+ assertResult("Unsupported operation on Complex data type")(exception.getMessage())
+ sql("delete from carbon where mapField[1]='abc'")
+ checkAnswer(sql("select * from carbon"), Seq(
+ Row(1, Map(1 -> "Nalla", 2 -> "Singh", 3 -> "Gupta", 4 -> "Kumar"))))
+
+ }
+
+ test("Test Compaction blocking") {
+ sql("DROP TABLE IF EXISTS carbon")
+
+ sql(
+ s"""
+ | CREATE TABLE carbon(
+ | a INT,
+ | mapField map<INT,STRING>
+ | )
+ | STORED BY 'carbondata'
+ | """
+ .stripMargin)
+
+ val exception = intercept[UnsupportedOperationException](
+ sql("ALTER table carbon compact 'minor'")
+ )
+ assertResult("Compaction is unsupported for Table containing Map Columns")(exception
+ .getMessage())
+ }
+
+ test("Test Load duplicate keys data in map") {
+ sql("DROP TABLE IF EXISTS carbon")
+ sql(
+ s"""
+ | CREATE TABLE carbon(
+ | mapField map<INT,STRING>
+ | )
+ | STORED BY 'carbondata'
+ | """
+ .stripMargin)
+ val desc = sql(
+ s"""
+ | Describe Formatted
+ | carbon
+ | """.stripMargin).collect()
+ sql("insert into carbon values('1\002Nalla\0012\002Singh\0011\002Gupta\0014\002Kumar')")
+ checkAnswer(sql("select * from carbon"), Seq(
+ Row(Map(1 -> "Nalla", 2 -> "Singh", 4 -> "Kumar"))))
+ }
+
+ test("Test Load data in map of map") {
+ sql("DROP TABLE IF EXISTS carbon")
+ sql(
+ s"""
+ | CREATE TABLE carbon(
+ | mapField map<STRING,map<INT,STRING>>
+ | )
+ | STORED BY
+ |'carbondata' """
+ .stripMargin)
+ sql(
+ "insert into carbon values('manish\0021\004nalla\0032\004gupta\001kunal\0021\004kapoor\0032" +
+ "\004sharma')")
+ checkAnswer(sql("select * from carbon"), Seq(
+ Row(Map("manish" -> Map(1 -> "nalla", 2 -> "gupta"),
+ "kunal" -> Map(1 -> "kapoor", 2 -> "sharma")))))
+ }
+
+ test("Test Load duplicate keys data in map of map") {
+ sql("DROP TABLE IF EXISTS carbon")
+ sql(
+ s"""
+ | CREATE TABLE carbon(
+ | mapField map<STRING,map<INT,STRING>>
+ | )
+ | STORED BY
+ |'carbondata'
+ |"""
+ .stripMargin)
+ sql(
+ "insert into carbon values('manish\0021\004nalla\0031\004gupta\001kunal\0021\004kapoor\0032" +
+ "\004sharma')")
+ checkAnswer(sql("select * from carbon"), Seq(
+ Row(Map("manish" -> Map(1 -> "nalla"),
+ "kunal" -> Map(1 -> "kapoor", 2 -> "sharma")))))
+ }
+
+ test("Test Create table as select with map") {
+ sql("DROP TABLE IF EXISTS carbon")
+ sql("DROP TABLE IF EXISTS carbon1")
+ sql(
+ s"""
+ | CREATE TABLE carbon(
+ | mapField map<INT,STRING>
+ | )
+ | STORED BY 'carbondata'
+ | """
+ .stripMargin)
+ sql("insert into carbon values('1\002Nalla\0012\002Singh\0013\002Gupta\0014\002Kumar')")
+ sql(
+ s"""
+ | CREATE TABLE carbon1
+ | AS
+ | Select *
+ | From carbon
+ | """
+ .stripMargin)
+ checkAnswer(sql("select * from carbon1"), Seq(
+ Row(Map(1 -> "Nalla", 2 -> "Singh", 3 -> "Gupta", 4 -> "Kumar"))))
+ }
+
+ test("Test Create table with double datatype in map") {
+ sql("DROP TABLE IF EXISTS carbon")
+ sql(
+ s"""
+ | CREATE TABLE carbon(
+ | mapField map<DOUBLE,STRING>
+ | )
+ | STORED BY 'carbondata'
+ | """
+ .stripMargin)
+ sql(
+ "insert into carbon values('1.23\002Nalla\0012.34\002Singh\0013.67676\002Gupta\0013.67676" +
+ "\002Kumar')")
+ checkAnswer(sql("select * from carbon"), Seq(
+ Row(Map(1.23 -> "Nalla", 2.34 -> "Singh", 3.67676 -> "Gupta"))))
+ }
+
+ test("Load Map data from CSV File") {
+ sql("DROP TABLE IF EXISTS carbon")
+ sql(
+ s"""
+ | CREATE TABLE carbon(
+ | mapField map<INT,STRING>
+ | )
+ | STORED BY 'carbondata'
+ | """
+ .stripMargin)
+ sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$path'
+ | INTO TABLE carbon OPTIONS(
+ | 'header' = 'false')
+ """.stripMargin)
+ checkAnswer(sql("select * from carbon"), Seq(
+ Row(Map(1 -> "Nalla", 2 -> "Singh", 4 -> "Kumar")),
+ Row(Map(10 -> "Nallaa", 20 -> "Sissngh", 100 -> "Gusspta", 40 -> "Kumar"))
+ ))
+ }
+
+ test("Sort Column table property blocking for Map type") {
+ sql("DROP TABLE IF EXISTS carbon")
+ val exception1 = intercept[Exception] {
+ sql(
+ s"""
+ | CREATE TABLE carbon(
+ | mapField map<STRING,STRING>
+ | )
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('SORT_COLUMNS'='mapField')
+ | """
+ .stripMargin)
+ }
+ assert(exception1.getMessage
+ .contains(
+ "sort_columns is unsupported for map datatype column: mapfield"))
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
index d23c844..d332a5a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
@@ -21,6 +21,7 @@ import java.io.{File, PrintWriter}
import java.util
import java.util.Collections
+import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, Ignore}
@@ -136,6 +137,22 @@ class LocalDictionarySupportLoadTableTest extends QueryTest with BeforeAndAfterA
assert(checkForLocalDictionary(getDimRawChunk(2)))
}
+ test("test local dictionary generation for map type") {
+ sql("drop table if exists local2")
+ sql(
+ "CREATE TABLE local2(name map<string,string>) STORED BY 'carbondata' tblproperties" +
+ "('local_dictionary_enable'='true','local_dictionary_include'='name')")
+ sql(
+ "insert into local2 values('Manish\002Nalla\001Manish\002Gupta\001Shardul\002Singh" +
+ "\001Vishal\002Kumar')")
+ checkAnswer(sql("select * from local2"), Seq(
+ Row(Map("Manish" -> "Nalla", "Shardul" -> "Singh", "Vishal" -> "Kumar"))))
+ assert(!checkForLocalDictionary(getDimRawChunk(0)))
+ assert(!checkForLocalDictionary(getDimRawChunk(1)))
+ assert(checkForLocalDictionary(getDimRawChunk(2)))
+ assert(checkForLocalDictionary(getDimRawChunk(3)))
+ }
+
test("test local dictionary data validation") {
sql("drop table if exists local_query_enable")
sql("drop table if exists local_query_disable")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 3752eef..f7249b8 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -339,8 +339,9 @@ class NewRddIterator(rddIter: Iterator[Row],
private val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
private val dateFormat = new SimpleDateFormat(dateFormatString)
- private val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
- private val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+ private val delimiterLevel1 = carbonLoadModel.getComplexDelimiters.get(0)
+ private val delimiterLevel2 = carbonLoadModel.getComplexDelimiters.get(1)
+ private val delimiterLevel3 = carbonLoadModel.getComplexDelimiters.get(2)
private val serializationNullFormat =
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
import scala.collection.JavaConverters._
@@ -388,8 +389,9 @@ class LazyRddIterator(serializer: SerializerInstance,
.getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
private val dateFormat = new SimpleDateFormat(dateFormatString)
- private val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
- private val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+ private val delimiterLevel1 = carbonLoadModel.getComplexDelimiters.get(0)
+ private val delimiterLevel2 = carbonLoadModel.getComplexDelimiters.get(1)
+ private val delimiterLevel3 = carbonLoadModel.getComplexDelimiters.get(2)
private val serializationNullFormat =
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
// the order of fields in dataframe and createTable may be different, here we need to know whether
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 71447e9..ca9b4af 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -643,6 +643,7 @@ object CarbonScalaUtil {
!x.dataType.get.equalsIgnoreCase("STRING") &&
!x.dataType.get.equalsIgnoreCase("VARCHAR") &&
!x.dataType.get.equalsIgnoreCase("STRUCT") &&
+ !x.dataType.get.equalsIgnoreCase("MAP") &&
!x.dataType.get.equalsIgnoreCase("ARRAY"))) {
val errormsg = "LOCAL_DICTIONARY_INCLUDE/LOCAL_DICTIONARY_EXCLUDE column: " +
dictColm.trim +
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 3ac2d2b..c971573 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -114,6 +114,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
protected val MULTILINE = carbonKeyWord("MULTILINE")
protected val COMPLEX_DELIMITER_LEVEL_1 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_1")
protected val COMPLEX_DELIMITER_LEVEL_2 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_2")
+ protected val COMPLEX_DELIMITER_LEVEL_3 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_3")
protected val OPTIONS = carbonKeyWord("OPTIONS")
protected val OUTPATH = carbonKeyWord("OUTPATH")
protected val OVERWRITE = carbonKeyWord("OVERWRITE")
@@ -915,7 +916,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
* @param dimensionDatatype
*/
def isDetectAsDimentionDatatype(dimensionDatatype: String): Boolean = {
- val dimensionType = Array("string", "array", "struct", "timestamp", "date", "char")
+ val dimensionType = Array("string", "array", "struct", "map", "timestamp", "date", "char")
dimensionType.exists(x => dimensionDatatype.toLowerCase.contains(x))
}
@@ -1070,13 +1071,32 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
// validate with all supported options
val options = optionList.get.groupBy(x => x._1)
- val supportedOptions = Seq("DELIMITER", "QUOTECHAR", "FILEHEADER", "ESCAPECHAR", "MULTILINE",
- "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
- "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION",
- "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "DATEFORMAT", "BAD_RECORD_PATH",
- "BATCH_SORT_SIZE_INMB", "GLOBAL_SORT_PARTITIONS", "SINGLE_PASS",
- "IS_EMPTY_DATA_BAD_RECORD", "HEADER", "TIMESTAMPFORMAT", "SKIP_EMPTY_LINE",
- "SORT_COLUMN_BOUNDS", "LOAD_MIN_SIZE_INMB"
+ val supportedOptions = Seq("DELIMITER",
+ "QUOTECHAR",
+ "FILEHEADER",
+ "ESCAPECHAR",
+ "MULTILINE",
+ "COMPLEX_DELIMITER_LEVEL_1",
+ "COMPLEX_DELIMITER_LEVEL_2",
+ "COMPLEX_DELIMITER_LEVEL_3",
+ "COLUMNDICT",
+ "SERIALIZATION_NULL_FORMAT",
+ "BAD_RECORDS_LOGGER_ENABLE",
+ "BAD_RECORDS_ACTION",
+ "ALL_DICTIONARY_PATH",
+ "MAXCOLUMNS",
+ "COMMENTCHAR",
+ "DATEFORMAT",
+ "BAD_RECORD_PATH",
+ "BATCH_SORT_SIZE_INMB",
+ "GLOBAL_SORT_PARTITIONS",
+ "SINGLE_PASS",
+ "IS_EMPTY_DATA_BAD_RECORD",
+ "HEADER",
+ "TIMESTAMPFORMAT",
+ "SKIP_EMPTY_LINE",
+ "SORT_COLUMN_BOUNDS",
+ "LOAD_MIN_SIZE_INMB"
)
var isSupported = true
val invalidOptions = StringBuilder.newBuilder
@@ -1291,13 +1311,16 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
Field("unknown", Some("struct"), Some("unknown"), Some(e1))
}
+ // Map<Key,Value> is represented as Map<Struct<Key,Value>>
protected lazy val mapFieldType: Parser[Field] =
(MAP ^^^ "map") ~> "<" ~> primitiveFieldType ~ ("," ~> nestedType) <~ ">" ^^ {
case key ~ value =>
Field("unknown", Some("map"), Some("unknown"),
Some(List(
- Field("key", key.dataType, Some("key"), key.children),
- Field("value", value.dataType, Some("value"), value.children))))
+ Field("val", Some("struct"), Some("unknown"),
+ Some(List(
+ Field("key", key.dataType, Some("key"), key.children),
+ Field("value", value.dataType, Some("value"), value.children)))))))
}
protected lazy val measureCol: Parser[Field] =
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 3d8170e..184cc1d 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.streaming
+import java.util
import java.util.Date
import scala.collection.JavaConverters._
@@ -77,10 +78,10 @@ class CarbonAppendableStreamSink(
conf.set(entry._1, entry._2)
}
// properties below will be used for default CarbonStreamParser
- conf.set("carbon_complex_delimiter_level_1",
- carbonLoadModel.getComplexDelimiterLevel1)
- conf.set("carbon_complex_delimiter_level_2",
- carbonLoadModel.getComplexDelimiterLevel2)
+ val complexDelimiters = carbonLoadModel.getComplexDelimiters
+ conf.set("carbon_complex_delimiter_level_1", complexDelimiters.get(0))
+ conf.set("carbon_complex_delimiter_level_2", complexDelimiters.get(1))
+ conf.set("carbon_complex_delimiter_level_3", complexDelimiters.get(2))
conf.set(
DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT,
carbonLoadModel.getSerializationNullFormat().split(",")(1))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index b07be72..a849c99 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -989,8 +989,8 @@ object CarbonDataRDDFactory {
// generate RDD[(K, V)] to use the partitionBy method of PairRDDFunctions
val inputRDD: RDD[(String, Row)] = if (dataFrame.isDefined) {
// input data from DataFrame
- val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
- val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+ val delimiterLevel1 = carbonLoadModel.getComplexDelimiters.get(0)
+ val delimiterLevel2 = carbonLoadModel.getComplexDelimiters.get(1)
val serializationNullFormat =
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
dataFrame.get.rdd.map { row =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index f98c0cf..6edfcf4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.core.metadata.ColumnarFormatVersion
+import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
@@ -87,6 +88,11 @@ case class CarbonAlterTableCompactionCommand(
if (!table.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
}
+ if (table.getTableInfo.getFactTable.getListOfColumns.asScala
+ .exists((m => DataTypes.isMapType(m.getDataType)))) {
+ throw new UnsupportedOperationException(
+ "Compaction is unsupported for Table containing Map Columns")
+ }
if (CarbonUtil.hasAggregationDataMap(table) ||
(table.isChildDataMap && null == operationContext.getProperty(table.getTableName))) {
// If the compaction request is of 'streaming' type then we need to generate loadCommands
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 6a8a9cf..7a974e3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -69,7 +69,7 @@ import org.apache.carbondata.core.util._
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.events.exception.PreEventException
-import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.{ComplexDelimitersEnum, TableProcessingOperations}
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
import org.apache.carbondata.processing.loading.exception.NoRetryException
import org.apache.carbondata.processing.loading.model.{CarbonLoadModelBuilder, LoadOption}
@@ -188,11 +188,13 @@ case class CarbonLoadDataCommand(
val carbonLoadModel = new CarbonLoadModel()
val tableProperties = table.getTableInfo.getFactTable.getTableProperties
val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
+ optionsFinal
+ .put("complex_delimiter_level_4",
+ ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_4.value())
optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
-
optionsFinal
.put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table))
val factPath = if (dataFrame.isDefined) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index ca39931..713561b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -128,7 +128,7 @@ case class CarbonCreateTableCommand(
if (partitionInfo != null &&
partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
// Restrict dictionary encoding on partition columns.
- // TODO Need to decide wherher it is required
+ // TODO Need to decide whether it is required
val dictionaryOnPartitionColumn =
partitionInfo.getColumnSchemaList.asScala.exists{p =>
p.hasEncoding(Encoding.DICTIONARY) && !p.hasEncoding(Encoding.DIRECT_DICTIONARY)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index 08c149b..ed5486b 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -55,8 +55,8 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
carbonLoadModel.setFactFilePath(filePath)
carbonLoadModel.setCsvHeader(header)
carbonLoadModel.setCsvDelimiter(",")
- carbonLoadModel.setComplexDelimiterLevel1("$")
- carbonLoadModel.setComplexDelimiterLevel2(":")
+ carbonLoadModel.setComplexDelimiter("$")
+ carbonLoadModel.setComplexDelimiter(":")
carbonLoadModel.setAllDictPath(allDictFilePath)
carbonLoadModel.setSerializationNullFormat(
TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index 060afca..69248d6 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -165,8 +165,8 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
carbonLoadModel.setFactFilePath(filePath)
carbonLoadModel.setCsvHeader(header)
carbonLoadModel.setCsvDelimiter(csvDelimiter)
- carbonLoadModel.setComplexDelimiterLevel1("$")
- carbonLoadModel.setComplexDelimiterLevel2(":")
+ carbonLoadModel.setComplexDelimiter("$")
+ carbonLoadModel.setComplexDelimiter(":")
carbonLoadModel.setColDictFilePath(extColFilePath)
carbonLoadModel.setQuoteChar("\"");
carbonLoadModel.setSerializationNullFormat(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
index 985b9d9..21cad07 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
@@ -42,7 +42,6 @@ case class StreamData(id: Integer, name: String, city: String, salary: java.lang
register: String, updated: String,
file: FileElement)
-@Ignore
class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
private val spark = sqlContext.sparkSession
@@ -420,7 +419,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
continueSeconds = 20,
generateBadRecords = true,
badRecordAction = "force",
- autoHandoff = true
+ autoHandoff = false
)
// non-filter
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/processing/src/main/java/org/apache/carbondata/processing/loading/ComplexDelimitersEnum.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/ComplexDelimitersEnum.java b/processing/src/main/java/org/apache/carbondata/processing/loading/ComplexDelimitersEnum.java
new file mode 100644
index 0000000..bc196e1
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/ComplexDelimitersEnum.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading;
+
+public enum ComplexDelimitersEnum {
+
+ COMPLEX_DELIMITERS_LEVEL_1("\001"),
+
+ COMPLEX_DELIMITERS_LEVEL_2("\002"),
+
+ COMPLEX_DELIMITERS_LEVEL_3("\003"),
+
+ COMPLEX_DELIMITERS_LEVEL_4("\004");
+
+ private String value;
+
+ ComplexDelimitersEnum(String value) {
+ this.value = value;
+ }
+
+ public String value() {
+ return value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 89d09fe..6fe89a2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -221,9 +221,10 @@ public final class DataLoadProcessBuilder {
configuration.setHeader(loadModel.getCsvHeaderColumns());
configuration.setSegmentId(loadModel.getSegmentId());
configuration.setTaskNo(loadModel.getTaskNo());
- configuration.setDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS,
- new String[] { loadModel.getComplexDelimiterLevel1(),
- loadModel.getComplexDelimiterLevel2() });
+ String[] complexDelimiters = new String[loadModel.getComplexDelimiters().size()];
+ loadModel.getComplexDelimiters().toArray(complexDelimiters);
+ configuration
+ .setDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS, complexDelimiters);
configuration.setDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT,
loadModel.getSerializationNullFormat().split(",")[1]);
configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_TIME_STAMP,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index e15fb5d..aecc52e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -65,8 +65,7 @@ public class CarbonLoadModel implements Serializable {
private String csvHeader;
private String[] csvHeaderColumns;
private String csvDelimiter;
- private String complexDelimiterLevel1;
- private String complexDelimiterLevel2;
+ private ArrayList<String> complexDelimiters;
private List<LoadMetadataDetails> loadMetadataDetails;
private transient SegmentUpdateStatusManager segmentUpdateStatusManager;
@@ -276,20 +275,14 @@ public class CarbonLoadModel implements Serializable {
this.csvDelimiter = csvDelimiter;
}
- public String getComplexDelimiterLevel1() {
- return complexDelimiterLevel1;
+ public void setComplexDelimiter(String delimiter) {
+ checkAndInitializeComplexDelimiterList();
+ this.complexDelimiters.add(delimiter);
}
- public void setComplexDelimiterLevel1(String complexDelimiterLevel1) {
- this.complexDelimiterLevel1 = complexDelimiterLevel1;
- }
-
- public String getComplexDelimiterLevel2() {
- return complexDelimiterLevel2;
- }
-
- public void setComplexDelimiterLevel2(String complexDelimiterLevel2) {
- this.complexDelimiterLevel2 = complexDelimiterLevel2;
+ public ArrayList<String> getComplexDelimiters() {
+ checkAndInitializeComplexDelimiterList();
+ return complexDelimiters;
}
public String getAllDictPath() {
@@ -441,8 +434,7 @@ public class CarbonLoadModel implements Serializable {
copy.csvHeader = csvHeader;
copy.csvHeaderColumns = csvHeaderColumns;
copy.csvDelimiter = csvDelimiter;
- copy.complexDelimiterLevel1 = complexDelimiterLevel1;
- copy.complexDelimiterLevel2 = complexDelimiterLevel2;
+ copy.complexDelimiters = complexDelimiters;
copy.carbonDataLoadSchema = carbonDataLoadSchema;
copy.blocksID = blocksID;
copy.taskNo = taskNo;
@@ -500,8 +492,7 @@ public class CarbonLoadModel implements Serializable {
copyObj.csvHeader = header;
copyObj.csvHeaderColumns = csvHeaderColumns;
copyObj.csvDelimiter = delimiter;
- copyObj.complexDelimiterLevel1 = complexDelimiterLevel1;
- copyObj.complexDelimiterLevel2 = complexDelimiterLevel2;
+ copyObj.complexDelimiters = complexDelimiters;
copyObj.blocksID = blocksID;
copyObj.taskNo = taskNo;
copyObj.factTimeStamp = factTimeStamp;
@@ -631,7 +622,16 @@ public class CarbonLoadModel implements Serializable {
}
public String[] getDelimiters() {
- return new String[] { complexDelimiterLevel1, complexDelimiterLevel2 };
+ checkAndInitializeComplexDelimiterList();
+ String[] delimiters = new String[complexDelimiters.size()];
+ delimiters = complexDelimiters.toArray(delimiters);
+ return delimiters;
+ }
+
+ private void checkAndInitializeComplexDelimiterList() {
+ if (null == complexDelimiters) {
+ complexDelimiters = new ArrayList<>();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 4a29304..d02348d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -164,6 +164,8 @@ public class CarbonLoadModelBuilder {
String delimeter = optionsFinal.get("delimiter");
String complex_delimeter_level1 = optionsFinal.get("complex_delimiter_level_1");
String complex_delimeter_level2 = optionsFinal.get("complex_delimiter_level_2");
+ String complex_delimeter_level3 = optionsFinal.get("complex_delimiter_level_3");
+ String complex_delimeter_level4 = optionsFinal.get("complex_delimiter_level_4");
String all_dictionary_path = optionsFinal.get("all_dictionary_path");
String column_dict = optionsFinal.get("columndict");
validateDateTimeFormat(timestampformat, "TimestampFormat");
@@ -257,11 +259,14 @@ public class CarbonLoadModelBuilder {
if (delimeter.equalsIgnoreCase(complex_delimeter_level1) ||
complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) ||
- delimeter.equalsIgnoreCase(complex_delimeter_level2)) {
+ delimeter.equalsIgnoreCase(complex_delimeter_level2) ||
+ delimeter.equalsIgnoreCase(complex_delimeter_level3)) {
throw new InvalidLoadOptionException("Field Delimiter and Complex types delimiter are same");
} else {
- carbonLoadModel.setComplexDelimiterLevel1(complex_delimeter_level1);
- carbonLoadModel.setComplexDelimiterLevel2(complex_delimeter_level2);
+ carbonLoadModel.setComplexDelimiter(complex_delimeter_level1);
+ carbonLoadModel.setComplexDelimiter(complex_delimeter_level2);
+ carbonLoadModel.setComplexDelimiter(complex_delimeter_level3);
+ carbonLoadModel.setComplexDelimiter(complex_delimeter_level4);
}
// set local dictionary path, and dictionary file extension
carbonLoadModel.setAllDictPath(all_dictionary_path);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
index 2c5fa8b..759cf04 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.loading.ComplexDelimitersEnum;
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
import org.apache.carbondata.processing.util.CarbonLoaderUtil;
@@ -111,13 +112,17 @@ public class LoadOption {
"all_dictionary_path",
Maps.getOrDefault(options, "all_dictionary_path", ""));
- optionsFinal.put(
- "complex_delimiter_level_1",
- Maps.getOrDefault(options,"complex_delimiter_level_1", "\\\001"));
+ optionsFinal.put("complex_delimiter_level_1",
+ Maps.getOrDefault(options, "complex_delimiter_level_1",
+ ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_1.value()));
- optionsFinal.put(
- "complex_delimiter_level_2",
- Maps.getOrDefault(options, "complex_delimiter_level_2", "\\\002"));
+ optionsFinal.put("complex_delimiter_level_2",
+ Maps.getOrDefault(options, "complex_delimiter_level_2",
+ ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_2.value()));
+
+ optionsFinal.put("complex_delimiter_level_3",
+ Maps.getOrDefault(options, "complex_delimiter_level_3",
+ ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_3.value()));
optionsFinal.put(
"dateformat",
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/processing/src/main/java/org/apache/carbondata/processing/loading/parser/CarbonParserFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/CarbonParserFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/CarbonParserFactory.java
index 3964869..6ffea4f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/CarbonParserFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/CarbonParserFactory.java
@@ -16,6 +16,7 @@
*/
package org.apache.carbondata.processing.loading.parser;
+import java.util.ArrayList;
import java.util.List;
import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -23,6 +24,7 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.processing.loading.parser.impl.ArrayParserImpl;
+import org.apache.carbondata.processing.loading.parser.impl.MapParserImpl;
import org.apache.carbondata.processing.loading.parser.impl.PrimitiveParserImpl;
import org.apache.carbondata.processing.loading.parser.impl.StructParserImpl;
@@ -35,7 +37,8 @@ public final class CarbonParserFactory {
* @param complexDelimiters
* @return
*/
- public static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters,
+ public static GenericParser createParser(CarbonColumn carbonColumn,
+ ArrayList<String> complexDelimiters,
String nullFormat) {
return createParser(carbonColumn, complexDelimiters, nullFormat, 0);
}
@@ -51,23 +54,33 @@ public final class CarbonParserFactory {
* delimiters
* @return GenericParser
*/
- private static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters,
- String nullFormat, int depth) {
+ private static GenericParser createParser(CarbonColumn carbonColumn,
+ ArrayList<String> complexDelimiters, String nullFormat, int depth) {
DataType dataType = carbonColumn.getDataType();
- if (DataTypes.isArrayType(dataType) || DataTypes.isMapType(dataType)) {
+ if (DataTypes.isArrayType(dataType)) {
List<CarbonDimension> listOfChildDimensions =
((CarbonDimension) carbonColumn).getListOfChildDimensions();
// Create array parser with complex delimiter
- ArrayParserImpl arrayParser = new ArrayParserImpl(complexDelimiters[depth], nullFormat);
+ ArrayParserImpl arrayParser = new ArrayParserImpl(complexDelimiters.get(depth), nullFormat);
for (CarbonDimension dimension : listOfChildDimensions) {
arrayParser.addChildren(createParser(dimension, complexDelimiters, nullFormat, depth + 1));
}
return arrayParser;
+ } else if (DataTypes.isMapType(dataType)) {
+ List<CarbonDimension> listOfChildDimensions =
+ ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+ // Create map parser with complex delimiter and key-value delimiter
+ MapParserImpl mapParser = new MapParserImpl(complexDelimiters.get(depth), nullFormat,
+ complexDelimiters.get(depth + 1));
+ for (CarbonDimension dimension : listOfChildDimensions) {
+ mapParser.addChildren(createParser(dimension, complexDelimiters, nullFormat, depth + 1));
+ }
+ return mapParser;
} else if (DataTypes.isStructType(dataType)) {
List<CarbonDimension> dimensions =
((CarbonDimension) carbonColumn).getListOfChildDimensions();
// Create struct parser with complex delimiter
- StructParserImpl parser = new StructParserImpl(complexDelimiters[depth], nullFormat);
+ StructParserImpl parser = new StructParserImpl(complexDelimiters.get(depth), nullFormat);
for (CarbonDimension dimension : dimensions) {
parser.addChildren(createParser(dimension, complexDelimiters, nullFormat, depth + 1));
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
index c56691a..c27f0fa 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
@@ -32,11 +32,11 @@ import org.apache.commons.lang.ArrayUtils;
*/
public class ArrayParserImpl implements ComplexParser<ArrayObject> {
- private Pattern pattern;
+ protected Pattern pattern;
- private GenericParser child;
+ protected GenericParser child;
- private String nullFormat;
+ protected String nullFormat;
public ArrayParserImpl(String delimiter, String nullFormat) {
pattern = Pattern.compile(CarbonUtil.delimiterConverter(delimiter));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java
new file mode 100644
index 0000000..e6814f8
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.parser.impl;
+
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
+
+import org.apache.commons.lang.ArrayUtils;
+
+
+public class MapParserImpl extends ArrayParserImpl {
+
+ private String keyValueDelimiter;
+
+ public MapParserImpl(String delimiter, String nullFormat, String keyValueDelimiter) {
+ super(delimiter, nullFormat);
+ this.keyValueDelimiter = keyValueDelimiter;
+ }
+
+ //The Key for Map will always be a PRIMITIVE type so Set<Object> here will work fine
+ //Only the first occurrence of a key will be added and the remaining will be skipped/ignored
+ @Override public ArrayObject parse(Object data) {
+ if (data != null) {
+ String value = data.toString();
+ if (!value.isEmpty() && !value.equals(nullFormat)) {
+ String[] split = pattern.split(value, -1);
+ if (ArrayUtils.isNotEmpty(split)) {
+ ArrayList<Object> array = new ArrayList<>();
+ Set<Object> set = new HashSet<>();
+ for (int i = 0; i < split.length; i++) {
+ Object currKey = split[i].split(keyValueDelimiter)[0];
+ if (set.add(currKey)) {
+ array.add(child.parse(split[i]));
+ }
+ }
+ return new ArrayObject(array.toArray());
+ }
+ }
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
index 00d8420..d0fe30b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
@@ -16,6 +16,8 @@
*/
package org.apache.carbondata.processing.loading.parser.impl;
+import java.util.ArrayList;
+
import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.loading.DataField;
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
@@ -34,8 +36,12 @@ public class RowParserImpl implements RowParser {
private int numberOfColumns;
public RowParserImpl(DataField[] output, CarbonDataLoadConfiguration configuration) {
- String[] complexDelimiters =
+ String[] tempComplexDelimiters =
(String[]) configuration.getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS);
+ ArrayList<String> complexDelimiters = new ArrayList<>();
+ for (int i = 0; i < tempComplexDelimiters.length; i++) {
+ complexDelimiters.add(tempComplexDelimiters[i]);
+ }
String nullFormat =
configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
.toString();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index c9adcdf..1241504 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -184,6 +184,7 @@ public class CarbonWriterBuilder {
!option.equalsIgnoreCase("timestampformat") &&
!option.equalsIgnoreCase("complex_delimiter_level_1") &&
!option.equalsIgnoreCase("complex_delimiter_level_2") &&
+ !option.equalsIgnoreCase("complex_delimiter_level_3") &&
!option.equalsIgnoreCase("quotechar") &&
!option.equalsIgnoreCase("escapechar")) {
throw new IllegalArgumentException("Unsupported option:" + option
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9b136c2/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
index 5a888ef..4dcb3ce 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
@@ -41,6 +41,7 @@ class RowStreamParserImp extends CarbonStreamParser {
var dateFormat: SimpleDateFormat = null
var complexDelimiterLevel1: String = null
var complexDelimiterLevel2: String = null
+ var complexDelimiterLevel3: String = null
var serializationNullFormat: String = null
override def initialize(configuration: Configuration, structType: StructType): Unit = {
@@ -54,6 +55,7 @@ class RowStreamParserImp extends CarbonStreamParser {
this.configuration.get(CarbonCommonConstants.CARBON_DATE_FORMAT))
this.complexDelimiterLevel1 = this.configuration.get("carbon_complex_delimiter_level_1")
this.complexDelimiterLevel2 = this.configuration.get("carbon_complex_delimiter_level_2")
+ this.complexDelimiterLevel3 = this.configuration.get("carbon_complex_delimiter_level_3")
this.serializationNullFormat =
this.configuration.get(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
}