You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/05/17 14:30:34 UTC
[21/50] [abbrv] carbondata git commit: [CARBONDATA-2452]
[CARBONDATA-2451] [CARBONDATA-2450] [CARBONDATA-2453] Fixed issues related to
complex types
[CARBONDATA-2452] [CARBONDATA-2451] [CARBONDATA-2450] [CARBONDATA-2453] Fixed issues related to complex types
Issue 1: Dictionary encoding was being added to complex types in SDK
case which led to data load failure
Issue 2: Sort columns were not being validated against the table
schema.
Issue 3: Bad record handling was not implemented for complex types.
Issue 4: The parent name was not being prepended to the field name before
checking for duplicates, which incorrectly threw a duplicate column exception.
This closes #2278
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6b70b7e4
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6b70b7e4
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6b70b7e4
Branch: refs/heads/spark-2.3
Commit: 6b70b7e47b05a612ccb5a5ad01ee2d5a05ffa600
Parents: 8e7fceb
Author: kunal642 <ku...@gmail.com>
Authored: Mon May 7 20:58:21 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Fri May 11 03:27:36 2018 +0530
----------------------------------------------------------------------
.../schema/table/TableSchemaBuilder.java | 21 +-
.../complexType/TestComplexTypeQuery.scala | 2 +
.../TestNonTransactionalCarbonTable.scala | 410 +++++++++++++++++--
.../processing/datatypes/ArrayDataType.java | 11 +-
.../processing/datatypes/GenericDataType.java | 3 +-
.../processing/datatypes/PrimitiveDataType.java | 41 +-
.../processing/datatypes/StructDataType.java | 11 +-
.../loading/DataLoadProcessBuilder.java | 9 +
.../impl/ComplexFieldConverterImpl.java | 2 +-
.../DirectDictionaryFieldConverterImpl.java | 1 -
.../loading/model/CarbonLoadModelBuilder.java | 15 +-
.../InputProcessorStepWithNoConverterImpl.java | 32 +-
.../sdk/file/CarbonWriterBuilder.java | 24 +-
13 files changed, 524 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index b078400..03d03f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -122,7 +122,13 @@ public class TableSchemaBuilder {
private ColumnSchema addColumn(StructField field, String parentName, AtomicInteger valIndex,
boolean isSortColumn, boolean isComplexChild) {
Objects.requireNonNull(field);
- checkRepeatColumnName(field);
+ if (isComplexChild) {
+ // if field is complex then append parent name to the child field to check
+ // if any other field with same name exists
+ checkRepeatColumnName(field, parentName);
+ } else {
+ checkRepeatColumnName(field);
+ }
ColumnSchema newColumn = new ColumnSchema();
if (parentName != null) {
newColumn.setColumnName(parentName + "." + field.getFieldName());
@@ -156,7 +162,7 @@ public class TableSchemaBuilder {
// SO, this will not have any impact.
newColumn.setColumnUniqueId(field.getFieldName());
newColumn.setColumnReferenceId(newColumn.getColumnUniqueId());
- newColumn.setEncodingList(createEncoding(field.getDataType(), isSortColumn));
+ newColumn.setEncodingList(createEncoding(field.getDataType(), isSortColumn, isComplexChild));
if (field.getDataType().isComplexType()) {
if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) {
newColumn.setNumberOfChild(1);
@@ -209,6 +215,12 @@ public class TableSchemaBuilder {
/**
* Throw exception if {@param field} name is repeated
*/
+ private void checkRepeatColumnName(StructField field, String parentName) {
+ checkRepeatColumnName(
+ new StructField(parentName + "." + field.getFieldName(), field.getDataType(),
+ field.getChildren()));
+ }
+
private void checkRepeatColumnName(StructField field) {
for (ColumnSchema column : sortColumns) {
if (column.getColumnName().equalsIgnoreCase(field.getFieldName())) {
@@ -234,9 +246,10 @@ public class TableSchemaBuilder {
}
}
- private List<Encoding> createEncoding(DataType dataType, boolean isSortColumn) {
+ private List<Encoding> createEncoding(DataType dataType, boolean isSortColumn,
+ boolean isComplexChild) {
List<Encoding> encodings = new LinkedList<>();
- if (dataType == DataTypes.TIMESTAMP || dataType == DataTypes.DATE) {
+ if (dataType == DataTypes.DATE && !isComplexChild) {
encodings.add(Encoding.DIRECT_DICTIONARY);
encodings.add(Encoding.DICTIONARY);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
index bc44df0..6728cdf 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
@@ -38,6 +38,8 @@ class TestComplexTypeQuery extends QueryTest with BeforeAndAfterAll {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+ "force")
sql("drop table if exists complexcarbontable")
sql("drop table if exists complexhivetable")
sql("drop table if exists complex_filter")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 3c51efe..376501b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -19,6 +19,7 @@ package org.apache.carbondata.spark.testsuite.createTable
import java.sql.Timestamp
import java.io.{File, FileFilter, IOException}
+import java.io.{File, FileFilter}
import java.util
import org.apache.commons.io.FileUtils
@@ -33,6 +34,7 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.sdk.file.AvroCarbonWriter
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -40,7 +42,7 @@ import org.apache.avro
import org.apache.commons.lang.CharEncoding
import tech.allegro.schema.json2avro.converter.JsonAvroConverter
-import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField}
import org.apache.carbondata.sdk.file.{CarbonWriter, CarbonWriterBuilder, Field, Schema}
@@ -219,6 +221,9 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
}
override def beforeAll(): Unit = {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+ CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
sql("DROP TABLE IF EXISTS sdkOutputTable")
}
@@ -247,8 +252,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
cleanTestData()
}
- test("test create External Table with Schema with partition, should ignore schema and partition")
- {
+ test("test create External Table with Schema with partition, should ignore schema and partition") {
buildTestDataSingleFile()
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable")
@@ -270,8 +274,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
}
- test("test create External Table with insert into feature")
- {
+ test("test create External Table with insert into feature") {
buildTestDataSingleFile()
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable")
@@ -302,8 +305,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
cleanTestData()
}
- test("test create External Table with insert overwrite")
- {
+ test("test create External Table with insert overwrite") {
buildTestDataSingleFile()
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable")
@@ -341,8 +343,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
}
- test("test create External Table with Load")
- {
+ test("test create External Table with Load") {
buildTestDataSingleFile()
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable")
@@ -563,6 +564,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
cleanTestData()
}
+
test("Read sdk writer output file without any file should fail") {
buildTestDataSingleFile()
deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
@@ -748,7 +750,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
}
-
test("Read sdk two writer output with same column name but different sort columns") {
FileUtils.deleteDirectory(new File(writerPath))
buildTestDataOtherDataType(3, Array[String]("name"))
@@ -814,7 +815,10 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
cleanTestData()
}
- private def WriteFilesWithAvroWriter(rows: Int, mySchema: String, json: String): Unit = {
+ private def WriteFilesWithAvroWriter(rows: Int,
+ mySchema: String,
+ json: String,
+ fields: Array[Field]) = {
// conversion to GenericData.Record
val nn = new avro.Schema.Parser().parse(mySchema)
val converter = new JsonAvroConverter
@@ -822,8 +826,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
.convertToGenericDataRecord(json.getBytes(CharEncoding.UTF_8), nn)
try {
- val writer = CarbonWriter.builder
- .withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(mySchema))
+ val writer = CarbonWriter.builder.withSchema(new Schema(fields))
.outputPath(writerPath).isTransactionalTable(false)
.uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput
var i = 0
@@ -860,7 +863,16 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
""".stripMargin
val json = """ {"name":"bob", "age":10, "address" : {"street":"abc", "city":"bang"}} """
- WriteFilesWithAvroWriter(rows, mySchema, json)
+
+ val fields = new Array[Field](3)
+ fields(0) = new Field("name", DataTypes.STRING)
+ fields(1) = new Field("age", DataTypes.INT)
+ val fld = new util.ArrayList[StructField]
+ fld.add(new StructField("street", DataTypes.STRING))
+ fld.add(new StructField("city", DataTypes.STRING))
+ fields(2) = new Field("address", "struct", fld)
+
+ WriteFilesWithAvroWriter(rows, mySchema, json, fields)
}
def buildAvroTestDataStructType(): Any = {
@@ -899,7 +911,17 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
""".stripMargin
val json: String = """ {"name": "bob","age": 10,"address": ["abc", "defg"]} """
- WriteFilesWithAvroWriter(rows, mySchema, json)
+
+
+ val fields = new Array[Field](3)
+ fields(0) = new Field("name", DataTypes.STRING)
+ fields(1) = new Field("age", DataTypes.INT)
+ // fields[1] = new Field("age", DataTypes.INT);
+ val fld = new util.ArrayList[StructField]
+ fld.add(new StructField("street", DataTypes.STRING))
+ fields(2) = new Field("address", "array", fld)
+
+ WriteFilesWithAvroWriter(rows, mySchema, json, fields)
}
def buildAvroTestDataSingleFileArrayType(): Any = {
@@ -943,7 +965,18 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
""" {"name":"bob", "age":10,
|"address" : {"street":"abc", "city":"bang"},
|"doorNum" : [1,2,3,4]}""".stripMargin
- WriteFilesWithAvroWriter(rows, mySchema, json)
+
+ val fields = new Array[Field](4)
+ fields(0) = new Field("name", DataTypes.STRING)
+ fields(1) = new Field("age", DataTypes.INT)
+ val fld = new util.ArrayList[StructField]
+ fld.add(new StructField("street", DataTypes.STRING))
+ fld.add(new StructField("city", DataTypes.STRING))
+ fields(2) = new Field("address", "struct", fld)
+ val fld1 = new util.ArrayList[StructField]
+ fld1.add(new StructField("eachDoorNum", DataTypes.INT))
+ fields(3) = new Field("doorNum", "array", fld1)
+ WriteFilesWithAvroWriter(rows, mySchema, json, fields)
}
def buildAvroTestDataBothStructArrayType(): Any = {
@@ -951,6 +984,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
buildAvroTestDataStructWithArrayType(3, null)
}
+
// ArrayOfStruct test
def buildAvroTestDataArrayOfStruct(rows: Int, options: util.Map[String, String]): Any = {
FileUtils.deleteDirectory(new File(writerPath))
@@ -995,7 +1029,20 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
|{"street":"def","city":"city2"},
|{"street":"ghi","city":"city3"},
|{"street":"jkl","city":"city4"}]} """.stripMargin
- WriteFilesWithAvroWriter(rows, mySchema, json)
+
+ val fields = new Array[Field](3)
+ fields(0) = new Field("name", DataTypes.STRING)
+ fields(1) = new Field("age", DataTypes.INT)
+
+ val fld = new util.ArrayList[StructField]
+ fld.add(new StructField("street", DataTypes.STRING))
+ fld.add(new StructField("city", DataTypes.STRING))
+
+ val fld2 = new util.ArrayList[StructField]
+ fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld))
+ fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2)
+
+ WriteFilesWithAvroWriter(rows, mySchema, json, fields)
}
def buildAvroTestDataArrayOfStructType(): Any = {
@@ -1003,6 +1050,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
buildAvroTestDataArrayOfStruct(3, null)
}
+
// StructOfArray test
def buildAvroTestDataStructOfArray(rows: Int, options: util.Map[String, String]): Any = {
FileUtils.deleteDirectory(new File(writerPath))
@@ -1064,7 +1112,21 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
| ]
| }
|} """.stripMargin
- WriteFilesWithAvroWriter(rows, mySchema, json)
+
+ val fields = new Array[Field](3)
+ fields(0) = new Field("name", DataTypes.STRING)
+ fields(1) = new Field("age", DataTypes.INT)
+
+ val fld2 = new util.ArrayList[StructField]
+ fld2.add(new StructField("street", DataTypes.STRING))
+ fld2.add(new StructField("city", DataTypes.STRING))
+
+ val fld1 = new util.ArrayList[StructField]
+ fld1.add(new StructField("eachDoorNum", DataTypes.INT))
+ fld2.add(new StructField("doorNum", DataTypes.createArrayType(DataTypes.INT), fld1))
+
+ fields(2) = new Field("address","struct",fld2)
+ WriteFilesWithAvroWriter(rows, mySchema, json, fields)
}
def buildAvroTestDataStructOfArrayType(): Any = {
@@ -1072,6 +1134,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
buildAvroTestDataStructOfArray(3, null)
}
+
test("Read sdk writer Avro output Record Type") {
buildAvroTestDataStructType()
assert(new File(writerPath).exists())
@@ -1080,6 +1143,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
|'$writerPath' """.stripMargin)
+
checkAnswer(sql("select * from sdkOutputTable"), Seq(
Row("bob", 10, Row("abc","bang")),
Row("bob", 10, Row("abc","bang")),
@@ -1138,6 +1202,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
cleanTestData()
}
+
test("Read sdk writer Avro output with Array of struct") {
buildAvroTestDataArrayOfStructType()
assert(new File(writerPath).exists())
@@ -1163,6 +1228,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
cleanTestData()
}
+
// Struct of array
test("Read sdk writer Avro output with struct of Array") {
buildAvroTestDataStructOfArrayType()
@@ -1264,7 +1330,21 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
| }
| ]
|} """.stripMargin
- WriteFilesWithAvroWriter(rows, mySchema, json)
+
+ val fields = new Array[Field](3)
+ fields(0) = new Field("name", DataTypes.STRING)
+ fields(1) = new Field("age", DataTypes.INT)
+
+ val fld = new util.ArrayList[StructField]
+ fld.add(new StructField("street", DataTypes.STRING))
+ fld.add(new StructField("city", DataTypes.STRING))
+ fld.add(new StructField("FloorNum", DataTypes.createArrayType(DataTypes.INT)))
+
+ val fld2 = new util.ArrayList[StructField]
+ fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld))
+ fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2)
+
+ WriteFilesWithAvroWriter(rows, mySchema, json, fields)
}
def buildAvroTestDataMultiLevel3Type(): Any = {
@@ -1302,6 +1382,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
cleanTestData()
}
+
// test multi level -- 3 levels [array of struct of struct of string, int]
def buildAvroTestDataMultiLevel3_1(rows: Int, options: util.Map[String, String]): Any = {
FileUtils.deleteDirectory(new File(writerPath))
@@ -1381,7 +1462,26 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
| }
| ]
|} """.stripMargin
- WriteFilesWithAvroWriter(rows, mySchema, json)
+
+ val fields = new Array[Field](3)
+ fields(0) = new Field("name", DataTypes.STRING)
+ fields(1) = new Field("age", DataTypes.INT)
+
+ val fld = new util.ArrayList[StructField]
+ fld.add(new StructField("street", DataTypes.STRING))
+ fld.add(new StructField("city", DataTypes.STRING))
+
+ val subFld = new util.ArrayList[StructField]
+ subFld.add(new StructField("wing", DataTypes.STRING))
+ subFld.add(new StructField("number", DataTypes.INT))
+ fld.add(new StructField("FloorNum", DataTypes.createStructType(subFld)))
+
+ // array of struct of struct
+ val fld2 = new util.ArrayList[StructField]
+ fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld))
+ fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2)
+
+ WriteFilesWithAvroWriter(rows, mySchema, json, fields)
}
def buildAvroTestDataMultiLevel3_1Type(): Any = {
@@ -1461,7 +1561,22 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
| "BuildNum": [[[1,2,3],[4,5,6]],[[10,20,30],[40,50,60]]]
| } """.stripMargin
- WriteFilesWithAvroWriter(rows, mySchema, json)
+ val fields = new Array[Field](3)
+ fields(0) = new Field("name", DataTypes.STRING)
+ fields(1) = new Field("age", DataTypes.INT)
+
+ val subFld = new util.ArrayList[StructField]
+ subFld.add(new StructField("EachDoorNum", DataTypes.INT))
+
+ val fld = new util.ArrayList[StructField]
+ fld.add(new StructField("DoorNum", DataTypes.createArrayType(DataTypes.INT), subFld))
+ // array of struct of struct
+ val doorNum = new util.ArrayList[StructField]
+ doorNum.add(new StructField("FloorNum",
+ DataTypes.createArrayType(DataTypes.createArrayType(DataTypes.INT)), fld))
+ fields(2) = new Field("BuildNum", "array", doorNum)
+
+ WriteFilesWithAvroWriter(rows, mySchema, json, fields)
}
def buildAvroTestDataMultiLevel3_2Type(): Any = {
@@ -1500,6 +1615,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
cleanTestData()
}
+
+
// test multi level -- 4 levels [array of array of array of struct]
def buildAvroTestDataMultiLevel4(rows: Int, options: util.Map[String, String]): Any = {
FileUtils.deleteDirectory(new File(writerPath))
@@ -1578,7 +1695,30 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
| ]
| ]
|} """.stripMargin
- WriteFilesWithAvroWriter(rows, mySchema, json)
+
+ val fields = new Array[Field](3)
+ fields(0) = new Field("name", DataTypes.STRING)
+ fields(1) = new Field("age", DataTypes.INT)
+
+ val subFld = new util.ArrayList[StructField]
+ subFld.add(new StructField("EachDoorNum", DataTypes.INT))
+
+ val address = new util.ArrayList[StructField]
+ address.add(new StructField("street", DataTypes.STRING))
+ address.add(new StructField("city", DataTypes.STRING))
+
+ val fld = new util.ArrayList[StructField]
+ fld.add(new StructField("DoorNum",
+ DataTypes.createArrayType(DataTypes.createStructType(address)),
+ subFld))
+ // array of struct of struct
+ val doorNum = new util.ArrayList[StructField]
+ doorNum.add(new StructField("FloorNum",
+ DataTypes.createArrayType(
+ DataTypes.createArrayType(DataTypes.createStructType(address))), fld))
+ fields(2) = new Field("BuildNum", "array", doorNum)
+
+ WriteFilesWithAvroWriter(rows, mySchema, json, fields)
}
def buildAvroTestDataMultiLevel4Type(): Any = {
@@ -1604,4 +1744,228 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
cleanTestData()
}
-}
\ No newline at end of file
+ test(
+ "test if exception is thrown when a column which is not in schema is specified in sort columns")
+ {
+ val schema1 =
+ """{
+ | "namespace": "com.apache.schema",
+ | "type": "record",
+ | "name": "StudentActivity",
+ | "fields": [
+ | {
+ | "name": "id",
+ | "type": "int"
+ | },
+ | {
+ | "name": "course_details",
+ | "type": {
+ | "name": "course_details",
+ | "type": "record",
+ | "fields": [
+ | {
+ | "name": "course_struct_course_time",
+ | "type": "string"
+ | }
+ | ]
+ | }
+ | }
+ | ]
+ |}""".stripMargin
+
+ val json1 =
+ """{"id": 101,"course_details": { "course_struct_course_time":"2014-01-05" }}""".stripMargin
+
+ val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+ val converter = new JsonAvroConverter
+ val record = converter
+ .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn)
+
+ val fields = new Array[Field](2)
+ fields(0) = new Field("id", DataTypes.INT)
+ val fld_s = new java.util.ArrayList[StructField]
+ fld_s.add(new StructField("course_struct_course_time", DataTypes.STRING))
+ fields(1) = new Field("course_details", "struct", fld_s)
+
+ assert(intercept[RuntimeException] {
+ val writer = CarbonWriter.builder.withSchema(new Schema(fields)).sortBy(Array("name", "id"))
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput
+ writer.write(record)
+ writer.close()
+ }.getMessage.toLowerCase.contains("column: name specified in sort columns"))
+ }
+
+ test("test if data load is success with a struct having timestamp column ") {
+ val schema1 =
+ """{
+ | "namespace": "com.apache.schema",
+ | "type": "record",
+ | "name": "StudentActivity",
+ | "fields": [
+ | {
+ | "name": "id",
+ | "type": "int"
+ | },
+ | {
+ | "name": "course_details",
+ | "type": {
+ | "name": "course_details",
+ | "type": "record",
+ | "fields": [
+ | {
+ | "name": "course_struct_course_time",
+ | "type": "string"
+ | }
+ | ]
+ | }
+ | }
+ | ]
+ |}""".stripMargin
+
+ val json1 =
+ """{"id": 101,"course_details": { "course_struct_course_time":"2014-01-05 00:00:00" }}""".stripMargin
+ val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+ val converter = new JsonAvroConverter
+ val record = converter
+ .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn)
+
+ val fields = new Array[Field](2)
+ fields(0) = new Field("id", DataTypes.INT)
+ val fld_s = new java.util.ArrayList[StructField]
+ fld_s.add(new StructField("course_struct_course_time", DataTypes.TIMESTAMP))
+ fields(1) = new Field("course_details", "struct", fld_s)
+
+ val writer = CarbonWriter.builder.withSchema(new Schema(fields)).sortBy(Array("id"))
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput
+ writer.write(record)
+ writer.close()
+ }
+
+ test(
+ "test is dataload is successful if childcolumn has same name as one of the other fields(not " +
+ "complex)")
+ {
+ val schema =
+ """{
+ | "type": "record",
+ | "name": "Order",
+ | "namespace": "com.apache.schema",
+ | "fields": [
+ | {
+ | "name": "id",
+ | "type": "long"
+ | },
+ | {
+ | "name": "entries",
+ | "type": {
+ | "type": "array",
+ | "items": {
+ | "type": "record",
+ | "name": "Entry",
+ | "fields": [
+ | {
+ | "name": "id",
+ | "type": "long"
+ | }
+ | ]
+ | }
+ | }
+ | }
+ | ]
+ |}""".stripMargin
+ val json1 =
+ """{"id": 101, "entries": [ {"id":1234}, {"id":3212} ]}""".stripMargin
+
+ val nn = new org.apache.avro.Schema.Parser().parse(schema)
+ val converter = new JsonAvroConverter
+ val record = converter
+ .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn)
+
+ val fields = new Array[Field](2)
+ fields(0) = new Field("id", DataTypes.LONG)
+ val fld_s = new java.util.ArrayList[StructField]
+ fld_s.add(new StructField("id", DataTypes.LONG))
+ fields(1) = new Field("entries", DataTypes.createArrayType(DataTypes.createStructType(fld_s)))
+ val writer = CarbonWriter.builder.withSchema(new Schema(fields))
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput
+ writer.write(record)
+ writer.close()
+ }
+
+ test("test if data load with various bad_records_action") {
+ val schema =
+ """{
+ | "namespace": "com.apache.schema",
+ | "type": "record",
+ | "name": "StudentActivity",
+ | "fields": [
+ | {
+ | "name": "id",
+ | "type": "string"
+ | },
+ | {
+ | "name": "course_details",
+ | "type": {
+ | "name": "course_details",
+ | "type": "record",
+ | "fields": [
+ | {
+ | "name": "course_struct_course_string",
+ | "type": "string"
+ | }
+ | ]
+ | }
+ | },
+ | {
+ | "name": "salary_string",
+ | "type": {
+ | "type": "array",
+ | "items": "string"
+ | }
+ | }
+ | ]
+ |}""".stripMargin
+ val json1 =
+ """{
+ | "id": "cust_1",
+ | "course_details": {
+ | "course_struct_course_string": "asd"
+ | },
+ | "salary_string": [
+ | "xyz",
+ | "abc"
+ | ]
+ |}""".stripMargin
+
+ val nn = new org.apache.avro.Schema.Parser().parse(schema)
+ val converter = new JsonAvroConverter
+ val record = converter
+ .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn)
+
+ val fields = new Array[Field](3)
+ fields(0)=new Field("id", DataTypes.STRING)
+ val fld_s = new java.util.ArrayList[StructField]
+ fld_s.add(new StructField("carbon_int", DataTypes.INT))
+ fields(1)=new Field("course_details", "struct",fld_s)
+
+ val fld_a = new java.util.ArrayList[StructField]
+ fld_a.add(new StructField("carbon_array", DataTypes.INT))
+ fields(2)=new Field("salary_string", "array",fld_a)
+
+ val loadOptions = new util.HashMap[String, String]()
+ loadOptions.put("bad_records_action", "fail")
+ assert(intercept[Exception] {
+ val writer = CarbonWriter.builder.withSchema(new Schema(fields)).outputPath(writerPath)
+ .isTransactionalTable(false).withLoadOptions(loadOptions).buildWriterForAvroInput
+ writer.write(record)
+ writer.close()
+ }.getMessage.contains("Data load failed due to bad record"))
+
+ loadOptions.put("bad_records_action", "FORCE")
+ val writer = CarbonWriter.builder.withSchema(new Schema(fields)).outputPath(writerPath)
+ .isTransactionalTable(false).withLoadOptions(loadOptions).buildWriterForAvroInput
+ writer.write(record)
+ writer.close()
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index d7d59ce..cc2619e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -27,7 +27,7 @@ import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
-
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
/**
* Array DataType stateless object used in data loading
@@ -151,17 +151,16 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
return true;
}
- @Override
- public void writeByteArray(ArrayObject input, DataOutputStream dataOutputStream)
- throws IOException, DictionaryGenerationException {
+ @Override public void writeByteArray(ArrayObject input, DataOutputStream dataOutputStream,
+ BadRecordLogHolder logHolder) throws IOException, DictionaryGenerationException {
if (input == null) {
dataOutputStream.writeInt(1);
- children.writeByteArray(null, dataOutputStream);
+ children.writeByteArray(null, dataOutputStream, logHolder);
} else {
Object[] data = input.getData();
dataOutputStream.writeInt(data.length);
for (Object eachInput : data) {
- children.writeByteArray(eachInput, dataOutputStream);
+ children.writeByteArray(eachInput, dataOutputStream, logHolder);
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
index 21ad83d..f48a91d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
@@ -26,6 +26,7 @@ import java.util.List;
import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
/**
* Generic DataType interface which will be used while data loading for complex types like Array &
@@ -58,7 +59,7 @@ public interface GenericDataType<T> {
* @param dataOutputStream
* @throws IOException
*/
- void writeByteArray(T input, DataOutputStream dataOutputStream)
+ void writeByteArray(T input, DataOutputStream dataOutputStream, BadRecordLogHolder logHolder)
throws IOException, DictionaryGenerationException;
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index e34c184..fdfc3bb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -48,10 +48,12 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
import org.apache.carbondata.processing.loading.dictionary.DictionaryServerClientDictionary;
import org.apache.carbondata.processing.loading.dictionary.DirectDictionary;
import org.apache.carbondata.processing.loading.dictionary.PreCreatedDictionary;
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
/**
* Primitive DataType stateless object used in data loading
@@ -265,19 +267,29 @@ public class PrimitiveDataType implements GenericDataType<Object> {
return isDictionary;
}
- @Override public void writeByteArray(Object input, DataOutputStream dataOutputStream)
- throws IOException, DictionaryGenerationException {
-
+ @Override public void writeByteArray(Object input, DataOutputStream dataOutputStream,
+ BadRecordLogHolder logHolder) throws IOException, DictionaryGenerationException {
String parsedValue =
input == null ? null : DataTypeUtil.parseValue(input.toString(), carbonDimension);
+ String message = logHolder.getColumnMessageMap().get(carbonDimension.getColName());
if (this.isDictionary) {
Integer surrogateKey;
if (null == parsedValue) {
surrogateKey = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
+ if (null == message) {
+ message = CarbonDataProcessorUtil
+ .prepareFailureReason(carbonDimension.getColName(), carbonDimension.getDataType());
+ logHolder.getColumnMessageMap().put(carbonDimension.getColName(), message);
+ logHolder.setReason(message);
+ }
} else {
surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue);
if (surrogateKey == CarbonCommonConstants.INVALID_SURROGATE_KEY) {
surrogateKey = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
+ message = CarbonDataProcessorUtil
+ .prepareFailureReason(carbonDimension.getColName(), carbonDimension.getDataType());
+ logHolder.getColumnMessageMap().put(carbonDimension.getColName(), message);
+ logHolder.setReason(message);
}
}
dataOutputStream.writeInt(surrogateKey);
@@ -285,15 +297,15 @@ public class PrimitiveDataType implements GenericDataType<Object> {
// Transform into ByteArray for No Dictionary.
// TODO have to refactor and place all the cases present in NonDictionaryFieldConverterImpl
if (null == parsedValue && this.carbonDimension.getDataType() != DataTypes.STRING) {
- updateNullValue(dataOutputStream);
+ updateNullValue(dataOutputStream, logHolder);
} else if (null == parsedValue || parsedValue.equals(nullformat)) {
- updateNullValue(dataOutputStream);
+ updateNullValue(dataOutputStream, logHolder);
} else {
String dateFormat = null;
if (this.carbonDimension.getDataType() == DataTypes.DATE) {
- dateFormat = this.carbonDimension.getDateFormat();
+ dateFormat = carbonDimension.getDateFormat();
} else if (this.carbonDimension.getDataType() == DataTypes.TIMESTAMP) {
- dateFormat = this.carbonDimension.getTimestampFormat();
+ dateFormat = carbonDimension.getTimestampFormat();
}
try {
@@ -318,9 +330,12 @@ public class PrimitiveDataType implements GenericDataType<Object> {
updateValueToByteStream(dataOutputStream,
parsedValue.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
} else {
- updateNullValue(dataOutputStream);
+ updateNullValue(dataOutputStream, logHolder);
}
}
+ } catch (NumberFormatException e) {
+ // Update logHolder for bad record and put null in dataOutputStream.
+ updateNullValue(dataOutputStream, logHolder);
} catch (CarbonDataLoadingException e) {
throw e;
} catch (Throwable ex) {
@@ -338,7 +353,8 @@ public class PrimitiveDataType implements GenericDataType<Object> {
dataOutputStream.write(value);
}
- private void updateNullValue(DataOutputStream dataOutputStream) throws IOException {
+ private void updateNullValue(DataOutputStream dataOutputStream, BadRecordLogHolder logHolder)
+ throws IOException {
if (this.carbonDimension.getDataType() == DataTypes.STRING) {
dataOutputStream.writeInt(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
dataOutputStream.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
@@ -346,6 +362,13 @@ public class PrimitiveDataType implements GenericDataType<Object> {
dataOutputStream.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY);
}
+ String message = logHolder.getColumnMessageMap().get(carbonDimension.getColName());
+ if (null == message) {
+ message = CarbonDataProcessorUtil
+ .prepareFailureReason(carbonDimension.getColName(), carbonDimension.getDataType());
+ logHolder.getColumnMessageMap().put(carbonDimension.getColName(), message);
+ logHolder.setReason(message);
+ }
}
@Override public void fillCardinality(List<Integer> dimCardWithComplex) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index 4fe6255..bb3da6c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
import org.apache.carbondata.processing.loading.complexobjects.StructObject;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
/**
* Struct DataType stateless object used in data loading
@@ -150,22 +151,22 @@ public class StructDataType implements GenericDataType<StructObject> {
return true;
}
- @Override public void writeByteArray(StructObject input, DataOutputStream dataOutputStream)
- throws IOException, DictionaryGenerationException {
+ @Override public void writeByteArray(StructObject input, DataOutputStream dataOutputStream,
+ BadRecordLogHolder logHolder) throws IOException, DictionaryGenerationException {
dataOutputStream.writeInt(children.size());
if (input == null) {
for (int i = 0; i < children.size(); i++) {
- children.get(i).writeByteArray(null, dataOutputStream);
+ children.get(i).writeByteArray(null, dataOutputStream, logHolder);
}
} else {
Object[] data = input.getData();
for (int i = 0; i < data.length && i < children.size(); i++) {
- children.get(i).writeByteArray(data[i], dataOutputStream);
+ children.get(i).writeByteArray(data[i], dataOutputStream, logHolder);
}
// For other children elements which dont have data, write empty
for (int i = data.length; i < children.size(); i++) {
- children.get(i).writeByteArray(null, dataOutputStream);
+ children.get(i).writeByteArray(null, dataOutputStream, logHolder);
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 2f904ed..17d0c76 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -237,6 +237,15 @@ public final class DataLoadProcessBuilder {
}
if (column.isComplex()) {
complexDataFields.add(dataField);
+ List<CarbonDimension> childDimensions =
+ ((CarbonDimension) dataField.getColumn()).getListOfChildDimensions();
+ for (CarbonDimension childDimension : childDimensions) {
+ if (childDimension.getDataType() == DataTypes.DATE) {
+ childDimension.setDateFormat(loadModel.getDateFormat());
+ } else if (childDimension.getDataType() == DataTypes.TIMESTAMP) {
+ childDimension.setTimestampFormat(loadModel.getTimestampformat());
+ }
+ }
} else {
dataFields.add(dataField);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
index b26ef36..4e46f9f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
@@ -44,7 +44,7 @@ public class ComplexFieldConverterImpl extends AbstractDictionaryFieldConverterI
ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArray);
try {
- genericDataType.writeByteArray(object, dataOutputStream);
+ genericDataType.writeByteArray(object, dataOutputStream, logHolder);
dataOutputStream.close();
row.update(byteArray.toByteArray(), index);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java
index b49cd90..64ddf27 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java
@@ -54,7 +54,6 @@ public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldC
this.directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
.getDirectDictionaryGenerator(dataField.getColumn().getDataType(),
dataField.getTimestampFormat());
-
} else {
this.directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
.getDirectDictionaryGenerator(dataField.getColumn().getDataType());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 13dd75c..9a9d09e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -79,7 +79,20 @@ public class CarbonLoadModelBuilder {
// we have provided 'fileheader', so it hadoopConf can be null
build(options, optionsFinal, model, null);
-
+ String timestampFormat = options.get("timestampformat");
+ if (timestampFormat == null) {
+ timestampFormat = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+ CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
+ }
+ String dateFormat = options.get("dateFormat");
+ if (dateFormat == null) {
+ dateFormat = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+ CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT);
+ }
+ model.setDateFormat(dateFormat);
+ model.setTimestampformat(timestampFormat);
model.setUseOnePass(Boolean.parseBoolean(Maps.getOrDefault(options, "onepass", "false")));
model.setDictionaryServerHost(Maps.getOrDefault(options, "dicthost", null));
try {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
index 77f5260..c99a413 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -35,11 +35,15 @@ import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.loading.BadRecordsLogger;
+import org.apache.carbondata.processing.loading.BadRecordsLoggerProvider;
import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.loading.DataField;
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
import org.apache.carbondata.processing.loading.converter.impl.FieldEncoderFactory;
import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.loading.exception.BadRecordFoundException;
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -134,7 +138,7 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
outIterators[i] =
new InputProcessorIterator(readerIterators[i], batchSize, configuration.isPreFetch(),
rowCounter, orderOfData, noDictionaryMapping, dataTypes,
- configuration.getDataFields(), dataFieldsWithComplexDataType);
+ configuration, dataFieldsWithComplexDataType);
}
return outIterators;
}
@@ -207,11 +211,13 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
private int[] orderOfData;
+ private CarbonDataLoadConfiguration configuration;
+
private Map<Integer, GenericDataType> dataFieldsWithComplexDataType;
public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, int batchSize,
boolean preFetch, AtomicLong rowCounter, int[] orderOfData, boolean[] noDictionaryMapping,
- DataType[] dataTypes, DataField[] dataFields,
+ DataType[] dataTypes, CarbonDataLoadConfiguration configuration,
Map<Integer, GenericDataType> dataFieldsWithComplexDataType) {
this.inputIterators = inputIterators;
this.batchSize = batchSize;
@@ -223,7 +229,8 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
this.firstTime = true;
this.noDictionaryMapping = noDictionaryMapping;
this.dataTypes = dataTypes;
- this.dataFields = dataFields;
+ this.dataFields = configuration.getDataFields();
+ this.configuration = configuration;
this.orderOfData = orderOfData;
this.dataFieldsWithComplexDataType = dataFieldsWithComplexDataType;
}
@@ -272,6 +279,9 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
private Object[] convertToNoDictionaryToBytes(Object[] data, DataField[] dataFields) {
Object[] newData = new Object[data.length];
+ BadRecordLogHolder logHolder = new BadRecordLogHolder();
+ BadRecordsLogger badRecordLogger =
+ BadRecordsLoggerProvider.createBadRecordLogger(configuration);
for (int i = 0; i < data.length; i++) {
if (i < noDictionaryMapping.length && noDictionaryMapping[i]) {
newData[i] = DataTypeUtil
@@ -284,11 +294,21 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
try {
GenericDataType complextType =
dataFieldsWithComplexDataType.get(dataFields[i].getColumn().getOrdinal());
-
- complextType.writeByteArray(data[orderOfData[i]], dataOutputStream);
-
+ complextType.writeByteArray(data[orderOfData[i]], dataOutputStream, logHolder);
+ if (!logHolder.isLogged() && logHolder.isBadRecordNotAdded()) {
+ badRecordLogger.addBadRecordsToBuilder(data, logHolder.getReason());
+ if (badRecordLogger.isDataLoadFail()) {
+ String error = "Data load failed due to bad record: " + logHolder.getReason();
+ if (!badRecordLogger.isBadRecordLoggerEnable()) {
+ error += "Please enable bad record logger to know the detail reason.";
+ }
+ throw new BadRecordFoundException(error);
+ }
+ }
dataOutputStream.close();
newData[i] = byteArray.toByteArray();
+ } catch (BadRecordFoundException e) {
+ throw new CarbonDataLoadingException("Loading Exception: " + e.getMessage(), e);
} catch (Exception e) {
throw new CarbonDataLoadingException("Loading Exception", e);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index f541dbb..00ba8a5 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -431,6 +431,20 @@ public class CarbonWriterBuilder {
// to child of complex array type in the order val1, val2 so that each array type child is
// differentiated to any level
AtomicInteger valIndex = new AtomicInteger(0);
+ // Check if any of the columns specified in sort columns are missing from schema.
+ for (String sortColumn: sortColumnsList) {
+ boolean exists = false;
+ for (Field field : fields) {
+ if (field.getFieldName().equalsIgnoreCase(sortColumn)) {
+ exists = true;
+ break;
+ }
+ }
+ if (!exists) {
+ throw new RuntimeException(
+ "column: " + sortColumn + " specified in sort columns does not exist in schema");
+ }
+ }
for (Field field : fields) {
if (null != field) {
int isSortColumn = sortColumnsList.indexOf(field.getFieldName());
@@ -442,9 +456,9 @@ public class CarbonWriterBuilder {
" sort columns not supported for " + "array, struct, double, float, decimal ");
}
}
-
if (field.getChildren() != null && field.getChildren().size() > 0) {
if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) {
+ checkForUnsupportedDataTypes(field.getChildren().get(0).getDataType());
// Loop through the inner columns and for a StructData
DataType complexType =
DataTypes.createArrayType(field.getChildren().get(0).getDataType());
@@ -455,6 +469,7 @@ public class CarbonWriterBuilder {
List<StructField> structFieldsArray =
new ArrayList<StructField>(field.getChildren().size());
for (StructField childFld : field.getChildren()) {
+ checkForUnsupportedDataTypes(childFld.getDataType());
structFieldsArray
.add(new StructField(childFld.getFieldName(), childFld.getDataType()));
}
@@ -475,6 +490,13 @@ public class CarbonWriterBuilder {
}
}
+ private void checkForUnsupportedDataTypes(DataType dataType) {
+ if (dataType == DataTypes.DOUBLE || dataType == DataTypes.DATE || DataTypes
+ .isDecimal(dataType)) {
+ throw new RuntimeException("Unsupported data type: " + dataType.getName());
+ }
+ }
+
/**
* Save the schema of the {@param table} to {@param persistFilePath}
* @param table table object containing schema