Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/08/29 07:57:56 UTC

carbondata git commit: [CARBONDATA-2887] Fix complex filters on spark carbon file format

Repository: carbondata
Updated Branches:
  refs/heads/master d801548aa -> 2f537b724


[CARBONDATA-2887] Fix complex filters on spark carbon file format

Problem:
Filters on complex types do not work with the carbon fileformat because Spark pushes down non-null (IsNotNull) filters on complex-type columns to carbon,
but carbon does not handle any kind of filter on complex types.
Solution:
Removed push-down of all complex-type filters from the carbon fileformat; Spark evaluates these filters on the rows returned by the scan.
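
For context, a minimal standalone sketch of the idea (not the exact commit code; the object and column names below are illustrative): drop any candidate push-down filter whose referenced columns are complex (array/struct/map) in the data schema, and let Spark evaluate those predicates on the scan output.

import org.apache.spark.sql.sources.{EqualTo, Filter, IsNotNull}
import org.apache.spark.sql.types._

object ComplexFilterPruning {

  // A column is "complex" if it is an array, struct or map.
  private def isComplex(dt: DataType): Boolean = dt match {
    case _: ArrayType | _: StructType | _: MapType => true
    case _ => false
  }

  // Keep only filters whose referenced columns are all primitive; filters on
  // complex (or unknown/nested) columns are not pushed down to the reader.
  def pushableFilters(dataSchema: StructType, filters: Seq[Filter]): Seq[Filter] = {
    val dataTypeMap = dataSchema.map(f => f.name -> f.dataType).toMap
    filters.filterNot { f =>
      f.references.exists(col => dataTypeMap.get(col).forall(isComplex))
    }
  }

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("c1", StringType),
      StructField("c2", ArrayType(StringType)),
      StructField("number", IntegerType)))
    // Spark typically generates an IsNotNull filter for referenced columns;
    // only the predicate on the atomic column c1 survives for push-down.
    println(pushableFilters(schema, Seq(EqualTo("c1", "a1"), IsNotNull("c2"))))
  }
}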

This closes #2659


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2f537b72
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2f537b72
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2f537b72

Branch: refs/heads/master
Commit: 2f537b724f6f03ab40c95f7ecc8ebd38f6500099
Parents: d801548
Author: ravipesala <ra...@gmail.com>
Authored: Fri Aug 24 20:43:07 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Wed Aug 29 13:27:08 2018 +0530

----------------------------------------------------------------------
 .../spark/sql/test/TestQueryExecutor.scala      |   1 +
 .../execution/datasources/CarbonFileIndex.scala |  15 +-
 .../CarbonFileIndexReplaceRule.scala            |   2 +-
 .../datasources/CarbonSparkDataSourceUtil.scala |  34 ++-
 .../datasources/SparkCarbonFileFormat.scala     |  33 ++-
 .../src/test/resources/Array.csv                |  21 ++
 .../spark-datasource/src/test/resources/j2.csv  |   1 +
 .../src/test/resources/structofarray.csv        |  21 ++
 .../datasource/SparkCarbonDataSourceTest.scala  | 267 +++++++++++++++++--
 ...tCreateTableUsingSparkCarbonFileFormat.scala |   9 +-
 .../sql/carbondata/datasource/TestUtil.scala    |  16 +-
 .../InputProcessorStepWithNoConverterImpl.java  |  21 +-
 12 files changed, 355 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f537b72/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
index d3a20c3..f69a142 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
@@ -153,6 +153,7 @@ object TestQueryExecutor {
     TestQueryExecutor.projectPath + "/core/target",
     TestQueryExecutor.projectPath + "/hadoop/target",
     TestQueryExecutor.projectPath + "/processing/target",
+    TestQueryExecutor.projectPath + "/integration/spark-datasource/target",
     TestQueryExecutor.projectPath + "/integration/spark-common/target",
     TestQueryExecutor.projectPath + "/integration/spark2/target",
     TestQueryExecutor.projectPath + "/integration/spark-common/target/jars",

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f537b72/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
index 8471181..c330fcb 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
@@ -21,14 +21,13 @@ import java.util
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.datasources.{InMemoryFileIndex, _}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.types.StructType
 
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, HDFSCarbonFile}
@@ -79,9 +78,9 @@ class CarbonFileIndex(
   }
 
   private def prune(dataFilters: Seq[Expression],
-      directories: Seq[PartitionDirectory]) = {
+      directories: Seq[PartitionDirectory]): Seq[PartitionDirectory] = {
     val tablePath = parameters.get("path")
-    if (tablePath.nonEmpty) {
+    if (tablePath.nonEmpty && dataFilters.nonEmpty) {
       val hadoopConf = sparkSession.sessionState.newHadoopConf()
      // convert to spark's source filters
       val filters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
@@ -104,7 +103,7 @@ class CarbonFileIndex(
           map(new HDFSCarbonFile(_))
       }.toArray.asInstanceOf[Array[CarbonFile]]
       if (indexFiles.length == 0 && totalFiles > 0) {
-        throw new IOException("No Index files are present in the table location :" + tablePath.get)
+        return directories
       }
       CarbonInputFormat.setReadCommittedScope(
         hadoopConf,
@@ -125,7 +124,11 @@ class CarbonFileIndex(
       }
       prunedDirs
     } else {
-      directories
+      directories.map { dir =>
+        val files = dir.files
+          .filter(_.getPath.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT))
+        PartitionDirectory(dir.values, files)
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f537b72/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala
index ed67f48..f86200a 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala
@@ -46,7 +46,7 @@ class CarbonFileIndexReplaceRule extends Rule[LogicalPlan] {
         val fsRelation = l.relation.asInstanceOf[HadoopFsRelation]
         val fileIndex = fsRelation.location
         val carbonFileIndex = new CarbonFileIndex(fsRelation.sparkSession,
-          fsRelation.schema,
+          fsRelation.dataSchema,
           fsRelation.options,
           updateFileIndex(fileIndex, fsRelation))
         val fsRelationCopy = fsRelation.copy(location = carbonFileIndex)(fsRelation.sparkSession)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f537b72/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
index 8724fd1..77c1dce 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
@@ -22,8 +22,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.types._
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.datatype
-import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField}
+import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField, StructType => CarbonStructType}
 import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
 import org.apache.carbondata.core.scan.expression.conditional._
 import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
@@ -196,20 +195,11 @@ object CarbonSparkDataSourceUtil {
   def prepareLoadModel(options: Map[String, String],
       dataSchema: StructType): CarbonLoadModel = {
     val schema = new Schema(dataSchema.fields.map { field =>
-      field.dataType match {
-        case s: StructType =>
-          new Field(field.name,
-            field.dataType.typeName,
-            s.fields
-              .map(f => new datatype.StructField(f.name,
-                CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(f.dataType))).toList.asJava)
-        case a: ArrayType =>
-          new Field(field.name,
-            field.dataType.typeName,
-            Seq(new datatype.StructField(field.name,
-              CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(a.elementType))).toList.asJava)
-        case other =>
-          new Field(field.name, field.dataType.simpleString)
+      val dataType = convertSparkToCarbonDataType(field.dataType)
+      dataType match {
+        case s: CarbonStructType =>
+          new Field(field.name, s, s.getFields)
+        case _ => new Field(field.name, dataType)
       }
     })
     val builder = new CarbonWriterBuilder
@@ -228,8 +218,16 @@ object CarbonSparkDataSourceUtil {
     builder.localDictionaryThreshold(
       options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
         CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT).toInt)
-    builder.sortBy(
-      options.get(CarbonCommonConstants.SORT_COLUMNS).map(_.split(",").map(_.trim)).orNull)
+    val sortCols = options.get(CarbonCommonConstants.SORT_COLUMNS) match {
+      case Some(cols) =>
+        if (cols.trim.isEmpty) {
+          Array[String]()
+        } else {
+          cols.split(",").map(_.trim)
+        }
+      case _ => null
+    }
+    builder.sortBy(sortCols)
     builder.uniqueIdentifier(System.currentTimeMillis())
     val model = builder.buildLoadModel(schema)
     val tableInfo = model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f537b72/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index 81b395e..9bd73d4 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -79,7 +79,14 @@ class SparkCarbonFileFormat extends FileFormat
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
     val tablePath = options.get("path") match {
-      case Some(path) => path
+      case Some(path) =>
+        val defaultFsUrl =
+          sparkSession.sparkContext.hadoopConfiguration.get(CarbonCommonConstants.FS_DEFAULT_FS)
+        if (defaultFsUrl == null) {
+          path
+        } else {
+          defaultFsUrl + CarbonCommonConstants.FILE_SEPARATOR + path
+        }
       case _ => FileFactory.getUpdatedFilePath(files.head.getPath.getParent.toUri.toString)
     }
 
@@ -225,6 +232,8 @@ class SparkCarbonFileFormat extends FileFormat
       while (i < data.length) {
         if (!row.isNullAt(i)) {
           dataType match {
+            case StringType =>
+              data(i) = row.getUTF8String(i).toString
             case d: DecimalType =>
               data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
             case s: StructType =>
@@ -233,6 +242,8 @@ class SparkCarbonFileFormat extends FileFormat
               data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
             case d: DateType =>
               data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
+            case d: TimestampType =>
+              data(i) = (row.getLong(i) / 1000).asInstanceOf[AnyRef]
             case other => data(i) = row.get(i, dataType)
           }
         } else {
@@ -281,7 +292,7 @@ class SparkCarbonFileFormat extends FileFormat
    */
   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
     val conf = sparkSession.sessionState.conf
-    conf.wholeStageEnabled &&
+    supportVector(sparkSession, schema) && conf.wholeStageEnabled &&
     schema.length <= conf.wholeStageMaxNumFields &&
     schema.forall(_.dataType.isInstanceOf[AtomicType])
   }
@@ -297,7 +308,13 @@ class SparkCarbonFileFormat extends FileFormat
       filters: Seq[Filter],
       options: Map[String, String],
       hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    val filter: Option[CarbonExpression] = filters.flatMap { filter =>
+    val dataTypeMap = dataSchema.map(f => f.name -> f.dataType).toMap
+    // Filter out the complex filters as carbon does not support them.
+    val filter: Option[CarbonExpression] = filters.filterNot{ ref =>
+      ref.references.exists{ p =>
+        !dataTypeMap(p).isInstanceOf[AtomicType]
+      }
+    }.flatMap { filter =>
       CarbonSparkDataSourceUtil.createCarbonFilter(dataSchema, filter)
     }.reduceOption(new AndExpression(_, _))
 
@@ -305,13 +322,11 @@ class SparkCarbonFileFormat extends FileFormat
     val carbonProjection = new CarbonProjection
     projection.foreach(carbonProjection.addColumn)
 
-    var supportBatchValue: Boolean = false
-
     val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
-    val readVector = supportVector(sparkSession, resultSchema)
-    if (readVector) {
-      supportBatchValue = supportBatch(sparkSession, resultSchema)
-    }
+
+    var supportBatchValue: Boolean = supportBatch(sparkSession, resultSchema)
+    val readVector = supportVector(sparkSession, resultSchema) && supportBatchValue
+
     val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema)
     CarbonInputFormat
       .setTableInfo(hadoopConf, model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f537b72/integration/spark-datasource/src/test/resources/Array.csv
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/resources/Array.csv b/integration/spark-datasource/src/test/resources/Array.csv
new file mode 100644
index 0000000..08c5de1
--- /dev/null
+++ b/integration/spark-datasource/src/test/resources/Array.csv
@@ -0,0 +1,21 @@
+Cust00000000000000000000,2015,1,20,M,SSC,Y,1234$5678$9101$11121$12357,United Kingdom$England$Basildon$AAID001001$United Kingdom$England$Basildon$AD003001$AAID001001$United Kingdom$England$Basildon$AD003001$United Kingdom$England$Basildon$COUNTY00045,2015-01-01 00:00:00$2014-01-01 00:00:00$2013-01-01 00:00:00$2012-01-01 00:00:00$2011-01-01 00:00:00,21,55,58,337982404.6,989431364.6
+Cust00000000000000000001,2015,1,30,F,Degree,N,1235$5679$9102$11122$12358,United States$MO$Parkville$AAID001002$United States$MO$Parkville$AD003002$AAID001002$United States$MO$Parkville$AD003002$United States$MO$Parkville$COUNTY00046,2015-01-02 00:00:00$2014-01-02 00:00:00$2013-01-02 00:00:00$2012-01-02 00:00:00$2011-01-02 00:00:00,104,59,50,686815400.5,157442142.4
+Cust00000000000000000002,2015,1,40,M,graduation,D,1236$5680$9103$11123$12359,United States$OR$Astoria$AAID001003$United States$OR$Astoria$AD003003$AAID001003$United States$OR$Astoria$AD003003$United States$OR$Astoria$COUNTY00047,2015-01-03 00:00:00$2014-01-03 00:00:00$2013-01-03 00:00:00$2012-01-03 00:00:00$2011-01-03 00:00:00,141,190,145,106733870.5,182602141
+Cust00000000000000000003,2015,1,50,F,PG,Y,1237$5681$9104$11124$12360,Australia$Victoria$Echuca$AAID001004$Australia$Victoria$Echuca$AD003004$AAID001004$Australia$Victoria$Echuca$AD003004$Australia$Victoria$Echuca$COUNTY00048,2015-01-04 00:00:00$2014-01-04 00:00:00$2013-01-04 00:00:00$2012-01-04 00:00:00$2011-01-04 00:00:00,162,162,129,702614376.9,499071850.4
+Cust00000000000000000004,2015,1,60,M,MS,N,1238$5682$9105$11125$12361,United States$AL$Cahaba Heights$AAID001005$United States$AL$Cahaba Heights$AD003005$AAID001005$United States$AL$Cahaba Heights$AD003005$United States$AL$Cahaba Heights$COUNTY00049,2015-01-05 00:00:00$2014-01-05 00:00:00$2013-01-05 00:00:00$2012-01-05 00:00:00$2011-01-05 00:00:00,35,139,93,469745206.2,480746358.2
+Cust00000000000000000005,2015,1,70,F,Doctor,D,1239$5683$9106$11126$12362,United States$NJ$Mickleton$AAID001006$United States$NJ$Mickleton$AD003006$AAID001006$United States$NJ$Mickleton$AD003006$United States$NJ$Mickleton$COUNTY00050,2015-01-06 00:00:00$2014-01-06 00:00:00$2013-01-06 00:00:00$2012-01-06 00:00:00$2011-01-06 00:00:00,143,11,117,765486177,832680202.8
+Cust00000000000000000006,2015,1,80,M,Layer,Y,1240$5684$9107$11127$12363,United States$IL$Peoria$AAID001007$United States$IL$Peoria$AD003007$AAID001007$United States$IL$Peoria$AD003007$United States$IL$Peoria$COUNTY00051,2015-01-07 00:00:00$2014-01-07 00:00:00$2013-01-07 00:00:00$2012-01-07 00:00:00$2011-01-07 00:00:00,69,46,3,384037419,960098816.4
+Cust00000000000000000007,2015,1,90,F,Cop,N,1241$5685$9108$11128$12364,United States$TN$Martin$AAID001008$United States$TN$Martin$AD003008$AAID001008$United States$TN$Martin$AD003008$United States$TN$Martin$COUNTY00052,2015-01-08 00:00:00$2014-01-08 00:00:00$2013-01-08 00:00:00$2012-01-08 00:00:00$2011-01-08 00:00:00,31,96,153,370423493.5,378702989.2
+Cust00000000000000000008,2015,1,95,M,Bank,D,1242$5686$9109$11129$12365,Israel$Tel Aviv$Tel Aviv$AAID001009$Israel$Tel Aviv$Tel Aviv$AD003009$AAID001009$Israel$Tel Aviv$Tel Aviv$AD003009$Israel$Tel Aviv$Tel Aviv$COUNTY00053,2015-01-09 00:00:00$2014-01-09 00:00:00$2013-01-09 00:00:00$2012-01-09 00:00:00$2011-01-09 00:00:00,74,178,146,84894789.63,241636065.5
+Cust00000000000000000009,2015,1,45,F,Group1,Y,1243$5687$9110$11130$12366,France$Ile-de-France$Chatou$AAID001010$France$Ile-de-France$Chatou$AD003010$AAID001010$France$Ile-de-France$Chatou$AD003010$France$Ile-de-France$Chatou$COUNTY00054,2015-01-10 00:00:00$2014-01-10 00:00:00$2013-01-10 00:00:00$2012-01-10 00:00:00$2011-01-10 00:00:00,154,74,113,533819711.4,517387103.8
+Cust00000000000000000010,2015,1,20,M,Group2,N,1244$5688$9111$11131$12367,United States$NY$New York$AAID001011$United States$NY$New York$AD003011$AAID001011$United States$NY$New York$AD003011$United States$NY$New York$COUNTY00055,2015-01-11 00:00:00$2014-01-11 00:00:00$2013-01-11 00:00:00$2012-01-11 00:00:00$2011-01-11 00:00:00,90,41,160,29095200.54,541633736.8
+Cust00000000000000000011,2015,1,30,F,Group3,D,1245$5689$9112$11132$12368,Netherlands$Noord-Brabant$Eindhoven$AAID001012$Netherlands$Noord-Brabant$Eindhoven$AD003012$AAID001012$Netherlands$Noord-Brabant$Eindhoven$AD003012$Netherlands$Noord-Brabant$Eindhoven$COUNTY00056,2015-01-12 00:00:00$2014-01-12 00:00:00$2013-01-12 00:00:00$2012-01-12 00:00:00$2011-01-12 00:00:00,168,165,135,981582131.7,667832871.1
+Cust00000000000000000012,2015,1,40,M,Group4,Y,1246$5690$9113$11133$12369,United States$TX$Shavano Park$AAID001013$United States$TX$Shavano Park$AD003013$AAID001013$United States$TX$Shavano Park$AD003013$United States$TX$Shavano Park$COUNTY00057,2015-01-13 00:00:00$2014-01-13 00:00:00$2013-01-13 00:00:00$2012-01-13 00:00:00$2011-01-13 00:00:00,49,105,4,764456100.7,870094004.8
+Cust00000000000000000013,2015,1,50,F,Group5,N,1247$5691$9114$11134$12370,United States$ID$Eagle$AAID001014$United States$ID$Eagle$AD003014$AAID001014$United States$ID$Eagle$AD003014$United States$ID$Eagle$COUNTY00058,2015-01-14 00:00:00$2014-01-14 00:00:00$2013-01-14 00:00:00$2012-01-14 00:00:00$2011-01-14 00:00:00,163,43,29,344022418.1,646897328.3
+Cust00000000000000000014,2015,1,60,M,Group6,D,1248$5692$9115$11135$12371,United States$NJ$Riverside$AAID001015$United States$NJ$Riverside$AD003015$AAID001015$United States$NJ$Riverside$AD003015$United States$NJ$Riverside$COUNTY00059,2015-01-15 00:00:00$2014-01-15 00:00:00$2013-01-15 00:00:00$2012-01-15 00:00:00$2011-01-15 00:00:00,195,109,34,483925883.8,961737525
+Cust00000000000000000015,2015,1,70,F,SSC,Y,1249$5693$9116$11136$12372,Ireland$Meath$Julianstown$AAID001016$Ireland$Meath$Julianstown$AD003016$AAID001016$Ireland$Meath$Julianstown$AD003016$Ireland$Meath$Julianstown$COUNTY00060,2015-01-16 00:00:00$2014-01-16 00:00:00$2013-01-16 00:00:00$2012-01-16 00:00:00$2011-01-16 00:00:00,149,174,78,315807967.4,274095983.1
+Cust00000000000000000016,2015,1,80,M,Degree,N,1250$5694$9117$11137$12373,Canada$Ontario$Ottawa$AAID001017$Canada$Ontario$Ottawa$AD003017$AAID001017$Canada$Ontario$Ottawa$AD003017$Canada$Ontario$Ottawa$COUNTY00061,2015-01-17 00:00:00$2014-01-17 00:00:00$2013-01-17 00:00:00$2012-01-17 00:00:00$2011-01-17 00:00:00,156,114,95,50368016.86,667794471.6
+Cust00000000000000000017,2015,1,90,F,graduation,D,1251$5695$9118$11138$12374,India$Andhra Pradesh$Hyderabad$AAID001018$India$Andhra Pradesh$Hyderabad$AD003018$AAID001018$India$Andhra Pradesh$Hyderabad$AD003018$India$Andhra Pradesh$Hyderabad$COUNTY00062,2015-01-18 00:00:00$2014-01-18 00:00:00$2013-01-18 00:00:00$2012-01-18 00:00:00$2011-01-18 00:00:00,48,39,102,504966685.5,786884573.4
+Cust00000000000000000018,2015,1,95,M,PG,Y,1252$5696$9119$11139$12375,United Kingdom$England$London$AAID001019$United Kingdom$England$London$AD003019$AAID001019$United Kingdom$England$London$AD003019$United Kingdom$England$London$COUNTY00063,2015-01-19 00:00:00$2014-01-19 00:00:00$2013-01-19 00:00:00$2012-01-19 00:00:00$2011-01-19 00:00:00,3,180,55,781101377.9,422578922.1
+Cust00000000000000000019,2015,1,45,F,MS,N,1253$5697$9120$11140$12376,United States$UT$Salt Lake City$AAID001020$United States$UT$Salt Lake City$AD003020$AAID001020$United States$UT$Salt Lake City$AD003020$United States$UT$Salt Lake City$COUNTY00064,2015-01-20 00:00:00$2014-01-20 00:00:00$2013-01-20 00:00:00$2012-01-20 00:00:00$2011-01-20 00:00:00,113,50,30,108832885.3,988692298
+Cust00000000000000000020,2015,1,20,M,Doctor,D,1254$5698$9121$11141$12377,United Kingdom$England$Manchester$AAID001021$United Kingdom$England$Manchester$AD003021$AAID001021$United Kingdom$England$Manchester$AD003021$United Kingdom$England$Manchester$COUNTY00065,2015-01-21 00:00:00$2014-01-21 00:00:00$2013-01-21 00:00:00$2012-01-21 00:00:00$2011-01-21 00:00:00,60,145,96,409507003.5,592528603.7
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f537b72/integration/spark-datasource/src/test/resources/j2.csv
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/resources/j2.csv b/integration/spark-datasource/src/test/resources/j2.csv
new file mode 100644
index 0000000..517dfa8
--- /dev/null
+++ b/integration/spark-datasource/src/test/resources/j2.csv
@@ -0,0 +1 @@
+1,gb3e5135-5533-4ee7-51b3-F61F1355b471,2,2,,,2,563FXN1S,2016-06-28,OORM1L,,,46315_4,,,,,,,,66116E013000000000000000,66116E013000000000000000,13.143.170.55,0.0.0.1,,1,1,ZOCERS,1,1,,,seach out for star wars starwars starwars@foxmovies.comAA,,,,64,6416557544541,26557544541,560111140564316,64075303555565,504,55,,,,63613316334514,,,,,,211111111111111111,,,1,1,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,11163575,20160628
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f537b72/integration/spark-datasource/src/test/resources/structofarray.csv
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/resources/structofarray.csv b/integration/spark-datasource/src/test/resources/structofarray.csv
new file mode 100644
index 0000000..fec3320
--- /dev/null
+++ b/integration/spark-datasource/src/test/resources/structofarray.csv
@@ -0,0 +1,21 @@
+Cust00000000000000000000,2015,1,20,M,SSC,Y,123456789$2015-01-01  00:00:00$100&3000$100.123&3000.234$United Kingdom&England$2015-01-01  00:00:00&2014-01-01  00:00:00,42,104,160,325046028.8,859616748.6
+Cust00000000000000000001,2015,1,30,F,Degree,N,123456790$2015-01-02  00:00:00$101&3000$101.123&3001.234$United States&MO$2015-01-02  00:00:00&2014-01-02  00:00:00,141,181,54,378476092.1,818599132.6
+Cust00000000000000000002,2015,1,40,M,graduation,D,123456791$2015-01-03  00:00:00$102&3000$102.123&3002.234$United States&OR$2015-01-03  00:00:00&2014-01-03  00:00:00,138,43,175,408335001.4,906020942.6
+Cust00000000000000000003,2015,1,50,F,PG,Y,123456792$2015-01-04  00:00:00$103&3000$103.123&3003.234$Australia&Victoria$2015-01-04  00:00:00&2014-01-04  00:00:00,96,63,184,493146274.5,556184083.3
+Cust00000000000000000004,2015,1,60,M,MS,N,123456793$2015-01-05  00:00:00$104&3000$104.123&3004.234$United States&AL$2015-01-05  00:00:00&2014-01-05  00:00:00,115,172,165,457941392.3,641744932.5
+Cust00000000000000000005,2015,1,70,F,Doctor,D,123456794$2015-01-06  00:00:00$105&3000$105.123&3005.234$United States&NJ$2015-01-06  00:00:00&2014-01-06  00:00:00,178,192,178,112452170.2,502438883.3
+Cust00000000000000000006,2015,1,80,M,Layer,Y,123456795$2015-01-07  00:00:00$106&3000$106.123&3006.234$United States&IL$2015-01-07  00:00:00&2014-01-07  00:00:00,172,194,49,943273831.2,37711205.33
+Cust00000000000000000007,2015,1,90,F,Cop,N,123456796$2015-01-08  00:00:00$107&3000$107.123&3007.234$United States&TN$2015-01-08  00:00:00&2014-01-08  00:00:00,163,23,180,991766321.3,452456856.7
+Cust00000000000000000008,2015,1,95,M,Bank,D,123456797$2015-01-09  00:00:00$108&3000$108.123&3008.234$Israel&Tel Aviv$2015-01-09  00:00:00&2014-01-09  00:00:00,113,18,176,747561503.5,388896200.6
+Cust00000000000000000009,2015,1,45,F,Group1,Y,123456798$2015-01-10  00:00:00$109&3000$109.123&3009.234$France&Ile-de-France$2015-01-10  00:00:00&2014-01-10  00:00:00,50,99,10,667010292.4,910085933.7
+Cust00000000000000000010,2015,1,20,M,Group2,N,123456799$2015-01-11  00:00:00$110&3000$110.123&3010.234$United States&NY$2015-01-11  00:00:00&2014-01-11  00:00:00,87,38,27,490913423.8,732302478
+Cust00000000000000000011,2015,1,30,F,Group3,D,123456800$2015-01-12  00:00:00$111&3000$111.123&3011.234$Netherlands&Noord-Brabant$2015-01-12  00:00:00&2014-01-12  00:00:00,83,113,114,143467881.4,856281203.2
+Cust00000000000000000012,2015,1,40,M,Group4,Y,123456801$2015-01-13  00:00:00$112&3000$112.123&3012.234$United States&TX$2015-01-13  00:00:00&2014-01-13  00:00:00,141,159,82,574817864.5,855050321.4
+Cust00000000000000000013,2015,1,50,F,Group5,N,123456802$2015-01-14  00:00:00$113&3000$113.123&3013.234$United States&ID$2015-01-14  00:00:00&2014-01-14  00:00:00,148,188,155,421169023.6,72662265.24
+Cust00000000000000000014,2015,1,60,M,Group6,D,123456803$2015-01-15  00:00:00$114&3000$114.123&3014.234$United States&NJ$2015-01-15  00:00:00&2014-01-15  00:00:00,56,194,21,859080548.6,678050965.3
+Cust00000000000000000015,2015,1,70,F,SSC,Y,123456804$2015-01-16  00:00:00$115&3000$115.123&3015.234$Ireland&Meath$2015-01-16  00:00:00&2014-01-16  00:00:00,154,142,76,250204030.1,766100816.4
+Cust00000000000000000016,2015,1,80,M,Degree,N,123456805$2015-01-17  00:00:00$116&3000$116.123&3016.234$Canada&Ontario$2015-01-17  00:00:00&2014-01-17  00:00:00,44,106,66,123232522.7,98330280.09
+Cust00000000000000000017,2015,1,90,F,graduation,D,123456806$2015-01-18  00:00:00$117&3000$117.123&3017.234$India&Andhra Pradesh$2015-01-18  00:00:00&2014-01-18  00:00:00,133,49,48,739339891.6,20633802.45
+Cust00000000000000000018,2015,1,95,M,PG,Y,123456807$2015-01-19  00:00:00$118&3000$118.123&3018.234$United Kingdom&England$2015-01-19  00:00:00&2014-01-19  00:00:00,5,109,147,441325651.7,6536309.25
+Cust00000000000000000019,2015,1,45,F,MS,N,123456808$2015-01-20  00:00:00$119&3000$119.123&3019.234$United States&UT$2015-01-20  00:00:00&2014-01-20  00:00:00,38,172,172,25330134.99,657416760.8
+Cust00000000000000000020,2015,1,20,M,Doctor,D,123456809$2015-01-21  00:00:00$120&3000$120.123&3020.234$United Kingdom&England$2015-01-21  00:00:00&2014-01-21  00:00:00,59,48,5,473181158.1,648379863.2

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f537b72/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index 623f2c4..04a96e2 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -54,10 +54,13 @@ class SparkCarbonDataSourceTest extends FunSuite  with BeforeAndAfterAll {
       .format("parquet").saveAsTable("testparquet")
     spark.sql("create table carbon_table(c1 string, c2 string, number int) using carbon")
     spark.sql("insert into carbon_table select * from testparquet")
-    TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from testparquet"))
-    val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
-    DataMapStoreManager.getInstance().clearDataMaps(AbsoluteTableIdentifier.from(warehouse1+"/carbon_table"))
-    assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
+    TestUtil.checkAnswer(spark.sql("select * from carbon_table where c1='a1'"), spark.sql("select * from testparquet where c1='a1'"))
+    if (!spark.sparkContext.version.startsWith("2.1")) {
+      val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
+      DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table"))
+      assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
+    }
     spark.sql("drop table if exists testparquet")
     spark.sql("drop table if exists testformat")
   }
@@ -79,23 +82,27 @@ class SparkCarbonDataSourceTest extends FunSuite  with BeforeAndAfterAll {
   }
 
   test("test write using subfolder") {
-    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
-    import spark.implicits._
-    val df = spark.sparkContext.parallelize(1 to 10)
-      .map(x => ("a" + x % 10, "b", x))
-      .toDF("c1", "c2", "number")
-
-    // Saves dataframe to carbon file
-    df.write.format("carbon").save(warehouse1 + "/test_folder/"+System.nanoTime())
-    df.write.format("carbon").save(warehouse1 + "/test_folder/"+System.nanoTime())
-    df.write.format("carbon").save(warehouse1 + "/test_folder/"+System.nanoTime())
-
-    val frame = spark.read.format("carbon").load(warehouse1 + "/test_folder")
-    assert(frame.count() == 30)
-    val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
-    DataMapStoreManager.getInstance().clearDataMaps(AbsoluteTableIdentifier.from(warehouse1+"/test_folder"))
-    assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
-    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+    if (!spark.sparkContext.version.startsWith("2.1")) {
+      FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+      import spark.implicits._
+      val df = spark.sparkContext.parallelize(1 to 10)
+        .map(x => ("a" + x % 10, "b", x))
+        .toDF("c1", "c2", "number")
+
+      // Saves dataframe to carbon file
+      df.write.format("carbon").save(warehouse1 + "/test_folder/"+System.nanoTime())
+      df.write.format("carbon").save(warehouse1 + "/test_folder/"+System.nanoTime())
+      df.write.format("carbon").save(warehouse1 + "/test_folder/"+System.nanoTime())
+
+      val frame = spark.read.format("carbon").load(warehouse1 + "/test_folder")
+      assert(frame.where("c1='a1'").count() == 3)
+
+        val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
+        DataMapStoreManager.getInstance()
+          .clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/test_folder"))
+        assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
+      FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+    }
   }
 
   test("test write using partition ddl") {
@@ -112,7 +119,7 @@ class SparkCarbonDataSourceTest extends FunSuite  with BeforeAndAfterAll {
     spark.sql("create table carbon_table(c1 string, c2 string, number int) using carbon  PARTITIONED by (c2)")
     spark.sql("insert into carbon_table select * from testparquet")
     // TODO fix in 2.1
-    if (!spark.sparkContext.version.contains("2.1")) {
+    if (!spark.sparkContext.version.startsWith("2.1")) {
       assert(spark.sql("select * from carbon_table").count() == 10)
       TestUtil
         .checkAnswer(spark.sql("select * from carbon_table"),
@@ -285,6 +292,222 @@ class SparkCarbonDataSourceTest extends FunSuite  with BeforeAndAfterAll {
     spark.sql("drop table if exists date_parquet_table")
   }
 
+  test("test write with array type with filter") {
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Array("b", "c"), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    spark.sql("create table carbon_table(c1 string, c2 array<string>, number int) using carbon")
+    spark.sql("insert into carbon_table select * from parquet_table")
+    assert(spark.sql("select * from carbon_table").count() == 10)
+    TestUtil.checkAnswer(spark.sql("select * from carbon_table where c1='a1' and c2[0]='b'"), spark.sql("select * from parquet_table where c1='a1' and c2[0]='b'"))
+    TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+  }
+
+  test("test write with struct type with filter") {
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, (Array("1", "2"), ("3", "4")),Array(("1", 1), ("2", 2)), x))
+      .toDF("c1", "c2", "c3",  "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    spark.sql("create table carbon_table(c1 string, c2 struct<a1:array<string>, a2:struct<a1:string, a2:string>>, c3 array<struct<a1:string, a2:int>>, number int) using carbon")
+    spark.sql("insert into carbon_table select * from parquet_table")
+    assert(spark.sql("select * from carbon_table").count() == 10)
+    TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+    TestUtil.checkAnswer(spark.sql("select * from carbon_table where c2.a1[0]='1' and c1='a1'"), spark.sql("select * from parquet_table where c2._1[0]='1' and c1='a1'"))
+    TestUtil.checkAnswer(spark.sql("select * from carbon_table where c2.a1[0]='1' and c3[0].a2=1"), spark.sql("select * from parquet_table where c2._1[0]='1' and c3[0]._2=1"))
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+  }
+
+
+  test("test read with df write string issue") {
+    spark.sql("drop table if exists test123")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, "b", x.toShort , x, x.toLong, x.toDouble, BigDecimal.apply(x),  Array(x+1, x), ("b", BigDecimal.apply(x))))
+      .toDF("c1", "c2", "shortc", "intc", "longc", "doublec", "bigdecimalc", "arrayc", "structc")
+
+
+    // Saves dataframe to carbon file
+    df.write.format("carbon").save(warehouse1 + "/test_folder/")
+    if (!spark.sparkContext.version.startsWith("2.1")) {
+      spark
+        .sql(s"create table test123 (c1 string, c2 string, arrayc array<int>, structc struct<_1:string, _2:decimal(38,18)>, shortc smallint,intc int, longc bigint,  doublec double, bigdecimalc decimal(38,18)) using carbon location '$warehouse1/test_folder/'")
+
+      checkAnswer(spark.sql("select * from test123"),
+        spark.read.format("carbon").load(warehouse1 + "/test_folder/"))
+    }
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+    spark.sql("drop table if exists test123")
+  }
+
+  test("test read with df write with empty data") {
+    spark.sql("drop table if exists test123")
+    spark.sql("drop table if exists test123_par")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+    // Saves dataframe to carbon file
+    if (!spark.sparkContext.version.startsWith("2.1")) {
+      spark
+        .sql(s"create table test123 (c1 string, c2 string, arrayc array<int>, structc struct<_1:string, _2:decimal(38,18)>, shortc smallint,intc int, longc bigint,  doublec double, bigdecimalc decimal(38,18)) using carbon location '$warehouse1/test_folder/'")
+
+      spark
+        .sql(s"create table test123_par (c1 string, c2 string, arrayc array<int>, structc struct<_1:string, _2:decimal(38,18)>, shortc smallint,intc int, longc bigint,  doublec double, bigdecimalc decimal(38,18)) using carbon location '$warehouse1/test_folder/'")
+      TestUtil
+        .checkAnswer(spark.sql("select count(*) from test123"),
+          spark.sql("select count(*) from test123_par"))
+    }
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+    spark.sql("drop table if exists test123")
+    spark.sql("drop table if exists test123_par")
+  }
+
+  test("test write with nosort columns") {
+    spark.sql("drop table if exists test123")
+    spark.sql("drop table if exists test123_par")
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, "b", x.toShort , x, x.toLong, x.toDouble, BigDecimal.apply(x),  Array(x+1, x), ("b", BigDecimal.apply(x))))
+      .toDF("c1", "c2", "shortc", "intc", "longc", "doublec", "bigdecimalc", "arrayc", "structc")
+
+
+    // Saves dataframe to carbon file
+    df.write.format("parquet").saveAsTable("test123_par")
+    if (!spark.sparkContext.version.startsWith("2.1")) {
+      spark
+        .sql(s"create table test123 (c1 string, c2 string, shortc smallint,intc int, longc bigint,  doublec double, bigdecimalc decimal(38,18), arrayc array<int>, structc struct<_1:string, _2:decimal(38,18)>) using carbon options('sort_columns'='') location '$warehouse1/test_folder/'")
+
+      spark.sql(s"insert into test123 select * from test123_par")
+      checkAnswer(spark.sql("select * from test123"), spark.sql(s"select * from test123_par"))
+    }
+    spark.sql("drop table if exists test123")
+    spark.sql("drop table if exists test123_par")
+  }
+
+  test("test complex columns mismatch") {
+    spark.sql("drop table if exists array_com_hive")
+    spark.sql(s"drop table if exists array_com")
+    spark.sql("create table array_com_hive (CUST_ID string, YEAR int, MONTH int, AGE int, GENDER string, EDUCATED string, IS_MARRIED string, ARRAY_INT array<int>,ARRAY_STRING array<string>,ARRAY_DATE array<timestamp>,CARD_COUNT int,DEBIT_COUNT int, CREDIT_COUNT int, DEPOSIT double, HQ_DEPOSIT double) row format delimited fields terminated by ',' collection items terminated by '$'")
+    spark.sql(s"load data local inpath '$resource/Array.csv' into table array_com_hive")
+    spark.sql("create table Array_com (CUST_ID string, YEAR int, MONTH int, AGE int, GENDER string, EDUCATED string, IS_MARRIED string, ARRAY_INT array<int>,ARRAY_STRING array<string>,ARRAY_DATE array<timestamp>,CARD_COUNT int,DEBIT_COUNT int, CREDIT_COUNT int, DEPOSIT double, HQ_DEPOSIT double) using carbon")
+    spark.sql("insert into Array_com select * from array_com_hive")
+    TestUtil.checkAnswer(spark.sql("select * from Array_com order by CUST_ID ASC limit 3"), spark.sql("select * from array_com_hive order by CUST_ID ASC limit 3"))
+    spark.sql("drop table if exists array_com_hive")
+    spark.sql(s"drop table if exists array_com")
+  }
+
+  test("test complex columns fail while insert ") {
+    spark.sql("drop table if exists STRUCT_OF_ARRAY_com_hive")
+    spark.sql(s"drop table if exists STRUCT_OF_ARRAY_com")
+    spark.sql(" create table STRUCT_OF_ARRAY_com_hive (CUST_ID string, YEAR int, MONTH int, AGE int, GENDER string, EDUCATED string, IS_MARRIED string, STRUCT_OF_ARRAY struct<ID: int,CHECK_DATE: timestamp ,SNo: array<int>,sal1: array<double>,state: array<string>,date1: array<timestamp>>,CARD_COUNT int,DEBIT_COUNT int, CREDIT_COUNT int, DEPOSIT float, HQ_DEPOSIT double) row format delimited fields terminated by ',' collection items terminated by '$' map keys terminated by '&'")
+    spark.sql(s"load data local inpath '$resource/structofarray.csv' into table STRUCT_OF_ARRAY_com_hive")
+    spark.sql("create table STRUCT_OF_ARRAY_com (CUST_ID string, YEAR int, MONTH int, AGE int, GENDER string, EDUCATED string, IS_MARRIED string, STRUCT_OF_ARRAY struct<ID: int,CHECK_DATE: timestamp,SNo: array<int>,sal1: array<double>,state: array<string>,date1: array<timestamp>>,CARD_COUNT int,DEBIT_COUNT int, CREDIT_COUNT int, DEPOSIT double, HQ_DEPOSIT double) using carbon")
+    spark.sql(" insert into STRUCT_OF_ARRAY_com select * from STRUCT_OF_ARRAY_com_hive")
+    TestUtil.checkAnswer(spark.sql("select * from STRUCT_OF_ARRAY_com  order by CUST_ID ASC"), spark.sql("select * from STRUCT_OF_ARRAY_com_hive  order by CUST_ID ASC"))
+    spark.sql("drop table if exists STRUCT_OF_ARRAY_com_hive")
+    spark.sql(s"drop table if exists STRUCT_OF_ARRAY_com")
+  }
+
+  test("test partition error in carbon") {
+    spark.sql("drop table if exists carbon_par")
+    spark.sql("drop table if exists parquet_par")
+    spark.sql("create table carbon_par (name string, age int, country string) using carbon partitioned by (country)")
+    spark.sql("insert into carbon_par select 'b', '12', 'aa'")
+    spark.sql("create table parquet_par (name string, age int, country string) using carbon partitioned by (country)")
+    spark.sql("insert into parquet_par select 'b', '12', 'aa'")
+    checkAnswer(spark.sql("select * from carbon_par"), spark.sql("select * from parquet_par"))
+    spark.sql("drop table if exists carbon_par")
+    spark.sql("drop table if exists parquet_par")
+  }
+
+  test("test more cols error in carbon") {
+    spark.sql("drop table if exists h_jin")
+    spark.sql("drop table if exists c_jin")
+    spark.sql(s"""create table h_jin(RECORD_ID string,
+      CDR_ID string,LOCATION_CODE int,SYSTEM_ID string,
+      CLUE_ID string,HIT_ELEMENT string,CARRIER_CODE string,CAP_TIME date,
+      DEVICE_ID string,DATA_CHARACTER string,
+      NETCELL_ID string,NETCELL_TYPE int,EQU_CODE string,CLIENT_MAC string,
+      SERVER_MAC string,TUNNEL_TYPE string,TUNNEL_IP_CLIENT string,TUNNEL_IP_SERVER string,
+      TUNNEL_ID_CLIENT string,TUNNEL_ID_SERVER string,SIDE_ONE_TUNNEL_ID string,SIDE_TWO_TUNNEL_ID string,
+      CLIENT_IP string,SERVER_IP string,TRANS_PROTOCOL string,CLIENT_PORT int,SERVER_PORT int,APP_PROTOCOL string,
+      CLIENT_AREA bigint,SERVER_AREA bigint,LANGUAGE string,STYPE string,SUMMARY string,FILE_TYPE string,FILENAME string,
+      FILESIZE string,BILL_TYPE string,ORIG_USER_NUM string,USER_NUM string,USER_IMSI string,
+      USER_IMEI string,USER_BELONG_AREA_CODE string,USER_BELONG_COUNTRY_CODE string,
+      USER_LONGITUDE double,USER_LATITUDE double,USER_MSC string,USER_BASE_STATION string,
+      USER_CURR_AREA_CODE string,USER_CURR_COUNTRY_CODE string,USER_SIGNAL_POINT string,USER_IP string,
+      ORIG_OPPO_NUM string,OPPO_NUM string,OPPO_IMSI string,OPPO_IMEI string,OPPO_BELONG_AREA_CODE string,
+      OPPO_BELONG_COUNTRY_CODE string,OPPO_LONGITUDE double,OPPO_LATITUDE double,OPPO_MSC string,OPPO_BASE_STATION string,
+      OPPO_CURR_AREA_CODE string,OPPO_CURR_COUNTRY_CODE string,OPPO_SIGNAL_POINT string,OPPO_IP string,RING_TIME timestamp,
+      CALL_ESTAB_TIME timestamp,END_TIME timestamp,CALL_DURATION bigint,CALL_STATUS_CODE int,DTMF string,ORIG_OTHER_NUM string,
+      OTHER_NUM string,ROAM_NUM string,SEND_TIME timestamp,ORIG_SMS_CONTENT string,ORIG_SMS_CODE int,SMS_CONTENT string,SMS_NUM int,
+      SMS_COUNT int,REMARK string,CONTENT_STATUS int,VOC_LENGTH bigint,FAX_PAGE_COUNT int,COM_OVER_CAUSE int,ROAM_TYPE int,SGSN_ADDR string,GGSN_ADDR string,
+      PDP_ADDR string,APN_NI string,APN_OI string,CARD_ID string,TIME_OUT int,LOGIN_TIME timestamp,USER_IMPU string,OPPO_IMPU string,USER_LAST_IMPI string,
+      USER_CURR_IMPI string,SUPSERVICE_TYPE bigint,SUPSERVICE_TYPE_SUBCODE bigint,SMS_CENTERNUM string,USER_LAST_LONGITUDE double,USER_LAST_LATITUDE double,
+      USER_LAST_MSC string,USER_LAST_BASE_STATION string,LOAD_ID bigint,P_CAP_TIME string)  ROW format delimited FIELDS terminated by '|'""".stripMargin)
+    spark.sql(s"load data local inpath '$resource/j2.csv' into table h_jin")
+    spark.sql(s"""create table c_jin(RECORD_ID string,
+      CDR_ID string,LOCATION_CODE int,SYSTEM_ID string,
+      CLUE_ID string,HIT_ELEMENT string,CARRIER_CODE string,CAP_TIME date,
+      DEVICE_ID string,DATA_CHARACTER string,
+      NETCELL_ID string,NETCELL_TYPE int,EQU_CODE string,CLIENT_MAC string,
+      SERVER_MAC string,TUNNEL_TYPE string,TUNNEL_IP_CLIENT string,TUNNEL_IP_SERVER string,
+      TUNNEL_ID_CLIENT string,TUNNEL_ID_SERVER string,SIDE_ONE_TUNNEL_ID string,SIDE_TWO_TUNNEL_ID string,
+      CLIENT_IP string,SERVER_IP string,TRANS_PROTOCOL string,CLIENT_PORT int,SERVER_PORT int,APP_PROTOCOL string,
+      CLIENT_AREA string,SERVER_AREA string,LANGUAGE string,STYPE string,SUMMARY string,FILE_TYPE string,FILENAME string,
+      FILESIZE string,BILL_TYPE string,ORIG_USER_NUM string,USER_NUM string,USER_IMSI string,
+      USER_IMEI string,USER_BELONG_AREA_CODE string,USER_BELONG_COUNTRY_CODE string,
+      USER_LONGITUDE double,USER_LATITUDE double,USER_MSC string,USER_BASE_STATION string,
+      USER_CURR_AREA_CODE string,USER_CURR_COUNTRY_CODE string,USER_SIGNAL_POINT string,USER_IP string,
+      ORIG_OPPO_NUM string,OPPO_NUM string,OPPO_IMSI string,OPPO_IMEI string,OPPO_BELONG_AREA_CODE string,
+      OPPO_BELONG_COUNTRY_CODE string,OPPO_LONGITUDE double,OPPO_LATITUDE double,OPPO_MSC string,OPPO_BASE_STATION string,
+      OPPO_CURR_AREA_CODE string,OPPO_CURR_COUNTRY_CODE string,OPPO_SIGNAL_POINT string,OPPO_IP string,RING_TIME timestamp,
+      CALL_ESTAB_TIME timestamp,END_TIME timestamp,CALL_DURATION string,CALL_STATUS_CODE int,DTMF string,ORIG_OTHER_NUM string,
+      OTHER_NUM string,ROAM_NUM string,SEND_TIME timestamp,ORIG_SMS_CONTENT string,ORIG_SMS_CODE int,SMS_CONTENT string,SMS_NUM int,
+      SMS_COUNT int,REMARK string,CONTENT_STATUS int,VOC_LENGTH string,FAX_PAGE_COUNT int,COM_OVER_CAUSE int,ROAM_TYPE int,SGSN_ADDR string,GGSN_ADDR string,
+      PDP_ADDR string,APN_NI string,APN_OI string,CARD_ID string,TIME_OUT int,LOGIN_TIME timestamp,USER_IMPU string,OPPO_IMPU string,USER_LAST_IMPI string,
+      USER_CURR_IMPI string,SUPSERVICE_TYPE string,SUPSERVICE_TYPE_SUBCODE string,SMS_CENTERNUM string,USER_LAST_LONGITUDE double,USER_LAST_LATITUDE double,
+      USER_LAST_MSC string,USER_LAST_BASE_STATION string,LOAD_ID string,P_CAP_TIME string) using carbon""".stripMargin)
+    spark.sql(s"""insert into c_jin
+      select
+      RECORD_ID,CDR_ID,LOCATION_CODE,SYSTEM_ID,
+      CLUE_ID,HIT_ELEMENT,CARRIER_CODE,CAP_TIME,
+      DEVICE_ID,DATA_CHARACTER,NETCELL_ID,NETCELL_TYPE,EQU_CODE,CLIENT_MAC,
+      SERVER_MAC,TUNNEL_TYPE,TUNNEL_IP_CLIENT,TUNNEL_IP_SERVER,
+      TUNNEL_ID_CLIENT,TUNNEL_ID_SERVER,SIDE_ONE_TUNNEL_ID,SIDE_TWO_TUNNEL_ID,
+      CLIENT_IP,SERVER_IP,TRANS_PROTOCOL,CLIENT_PORT,SERVER_PORT,APP_PROTOCOL,
+      CLIENT_AREA,SERVER_AREA,LANGUAGE,STYPE,SUMMARY,FILE_TYPE,FILENAME,
+      FILESIZE,BILL_TYPE,ORIG_USER_NUM,USER_NUM,USER_IMSI,
+      USER_IMEI,USER_BELONG_AREA_CODE,USER_BELONG_COUNTRY_CODE,
+      USER_LONGITUDE,USER_LATITUDE,USER_MSC,USER_BASE_STATION,
+      USER_CURR_AREA_CODE,USER_CURR_COUNTRY_CODE,USER_SIGNAL_POINT,USER_IP,
+      ORIG_OPPO_NUM,OPPO_NUM,OPPO_IMSI,OPPO_IMEI,OPPO_BELONG_AREA_CODE,
+      OPPO_BELONG_COUNTRY_CODE,OPPO_LONGITUDE,OPPO_LATITUDE,OPPO_MSC,OPPO_BASE_STATION,
+      OPPO_CURR_AREA_CODE,OPPO_CURR_COUNTRY_CODE,OPPO_SIGNAL_POINT,OPPO_IP,RING_TIME,
+      CALL_ESTAB_TIME,END_TIME,CALL_DURATION,CALL_STATUS_CODE,DTMF,ORIG_OTHER_NUM,
+      OTHER_NUM,ROAM_NUM,SEND_TIME,ORIG_SMS_CONTENT,ORIG_SMS_CODE,SMS_CONTENT,SMS_NUM,
+      SMS_COUNT,REMARK,CONTENT_STATUS,VOC_LENGTH,FAX_PAGE_COUNT,COM_OVER_CAUSE,ROAM_TYPE,SGSN_ADDR,GGSN_ADDR,
+      PDP_ADDR,APN_NI,APN_OI,CARD_ID,TIME_OUT,LOGIN_TIME,USER_IMPU,OPPO_IMPU,USER_LAST_IMPI,
+      USER_CURR_IMPI,SUPSERVICE_TYPE,SUPSERVICE_TYPE_SUBCODE,SMS_CENTERNUM,USER_LAST_LONGITUDE,USER_LAST_LATITUDE,
+      USER_LAST_MSC,USER_LAST_BASE_STATION,LOAD_ID,P_CAP_TIME
+      from h_jin""".stripMargin)
+    assert(spark.sql("select * from c_jin").collect().length == 1)
+    spark.sql("drop table if exists h_jin")
+    spark.sql("drop table if exists c_jin")
+  }
+
 
   override protected def beforeAll(): Unit = {
     drop

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f537b72/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
index 3f90dd3..1d9c08a 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -294,7 +294,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
     cleanTestData()
   }
 
-  test("Read sdk writer output file without index file should fail") {
+  test("Read sdk writer output file without index file should not fail") {
     buildTestData(false)
     deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
     assert(new File(filePath).exists())
@@ -312,11 +312,8 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
       // TO DO
     }
     //org.apache.spark.SparkException: Index file not present to read the carbondata file
-    val exception = intercept[Exception]
-      {
-        spark.sql("select * from sdkOutputTable").show(false)
-      }
-    assert(exception.getMessage().contains("No Index files are present in the table location"))
+    assert(spark.sql("select * from sdkOutputTable").collect().length == 100)
+    assert(spark.sql("select * from sdkOutputTable where name='robot0'").collect().length == 1)
 
     spark.sql("DROP TABLE sdkOutputTable")
     // drop table should not delete the files

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f537b72/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala
index f9bcb44..1296564 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala
@@ -30,21 +30,24 @@ object TestUtil {
   val rootPath = new File(this.getClass.getResource("/").getPath
                           + "../../../..").getCanonicalPath
   val warehouse1 = s"$rootPath/integration/spark-datasource/target/warehouse"
+  val resource = s"$rootPath/integration/spark-datasource/src/test/resources"
   val metastoredb1 = s"$rootPath/integration/spark-datasource/target"
   val spark = SparkSession
     .builder()
+    .enableHiveSupport()
     .master("local")
     .config("spark.sql.warehouse.dir", warehouse1)
     .config("spark.driver.host", "localhost")
     .config("spark.sql.crossJoin.enabled", "true")
     .getOrCreate()
   spark.sparkContext.setLogLevel("ERROR")
+  if (!spark.sparkContext.version.startsWith("2.1")) {
+    spark.experimental.extraOptimizations = Seq(new CarbonFileIndexReplaceRule)
+  }
 
-  spark.experimental.extraOptimizations = Seq(new CarbonFileIndexReplaceRule)
-
-  def checkAnswer(df: DataFrame, expectedAnswer: java.util.List[Row]): String = {
+  def checkAnswer(df: DataFrame, expectedAnswer: java.util.List[Row]):Unit = {
     checkAnswer(df, expectedAnswer.asScala) match {
-      case Some(errorMessage) => errorMessage
+      case Some(errorMessage) => assert(false, errorMessage)
       case None => null
     }
   }
@@ -61,7 +64,10 @@ object TestUtil {
   }
 
   def checkAnswer(df: DataFrame, expectedAnswer: DataFrame): Unit = {
-    checkAnswer(df, expectedAnswer.collect())
+    checkAnswer(df, expectedAnswer.collect()) match {
+      case Some(errorMessage) => assert(false, errorMessage)
+      case None => null
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f537b72/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
index b6858e1..15d0994 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -37,8 +37,6 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
-import org.apache.carbondata.processing.loading.BadRecordsLogger;
-import org.apache.carbondata.processing.loading.BadRecordsLoggerProvider;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
@@ -210,14 +208,14 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
 
     private int[] orderOfData;
 
-    private CarbonDataLoadConfiguration configuration;
-
     private Map<Integer, GenericDataType> dataFieldsWithComplexDataType;
 
     private DirectDictionaryGenerator dateDictionaryGenerator;
 
     private DirectDictionaryGenerator timestampDictionaryGenerator;
 
+    private BadRecordLogHolder logHolder = new BadRecordLogHolder();
+
     public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, int batchSize,
         boolean preFetch, AtomicLong rowCounter, int[] orderOfData, boolean[] noDictionaryMapping,
         DataType[] dataTypes, CarbonDataLoadConfiguration configuration,
@@ -233,7 +231,6 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
       this.noDictionaryMapping = noDictionaryMapping;
       this.dataTypes = dataTypes;
       this.dataFields = configuration.getDataFields();
-      this.configuration = configuration;
       this.orderOfData = orderOfData;
       this.dataFieldsWithComplexDataType = dataFieldsWithComplexDataType;
     }
@@ -286,9 +283,6 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
 
     private Object[] convertToNoDictionaryToBytes(Object[] data, DataField[] dataFields) {
       Object[] newData = new Object[data.length];
-      BadRecordLogHolder logHolder = new BadRecordLogHolder();
-      BadRecordsLogger badRecordLogger =
-          BadRecordsLoggerProvider.createBadRecordLogger(configuration);
       for (int i = 0; i < data.length; i++) {
         if (i < noDictionaryMapping.length && noDictionaryMapping[i]) {
           newData[i] = DataTypeUtil
@@ -302,16 +296,6 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
               GenericDataType complextType =
                   dataFieldsWithComplexDataType.get(dataFields[i].getColumn().getOrdinal());
               complextType.writeByteArray(data[orderOfData[i]], dataOutputStream, logHolder);
-              if (!logHolder.isLogged() && logHolder.isBadRecordNotAdded()) {
-                badRecordLogger.addBadRecordsToBuilder(data, logHolder.getReason());
-                if (badRecordLogger.isDataLoadFail()) {
-                  String error = "Data load failed due to bad record: " + logHolder.getReason();
-                  if (!badRecordLogger.isBadRecordLoggerEnable()) {
-                    error += "Please enable bad record logger to know the detail reason.";
-                  }
-                  throw new BadRecordFoundException(error);
-                }
-              }
               dataOutputStream.close();
               newData[i] = byteArray.toByteArray();
             } catch (BadRecordFoundException e) {
@@ -339,7 +323,6 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
           }
         }
       }
-      // System.out.println(Arrays.toString(data));
       return newData;
     }