Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/06/26 09:10:28 UTC

[1/2] carbondata git commit: [CARBONDATA-2606] [Complex DataType Enhancements] Projection PushDown For Complex DataType

Repository: carbondata
Updated Branches:
  refs/heads/master 53a9fa7f8 -> afcaecf20


http://git-wip-us.apache.org/repos/asf/carbondata/blob/afcaecf2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
index 6ad3d54..7fbbd9f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
@@ -23,6 +23,7 @@ import java.util
 import org.apache.avro
 import org.apache.commons.io.FileUtils
 import org.apache.commons.lang.CharEncoding
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.junit.Assert
 import org.scalatest.BeforeAndAfterAll
@@ -63,7 +64,6 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
     // conversion to GenericData.Record
     val nn = new avro.Schema.Parser().parse(mySchema)
     val record = avroUtil.jsonToAvro(json, mySchema)
-
     try {
       val writer = CarbonWriter.builder
         .outputPath(writerPath).isTransactionalTable(false)
@@ -239,8 +239,6 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
       """.stripMargin
     val pschema= org.apache.avro.Schema.parse(mySchema)
     val records = avroUtil.jsonToAvro(jsonvalue, mySchema)
-
-
     val writer=CarbonWriter.builder().outputPath(writerPath).buildWriterForAvroInput(pschema)
     writer.write(records)
     writer.close()
@@ -257,4 +255,258 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
     // drop table should not delete the files
     cleanTestData()
   }
+
+  // test multi level -- 4 levels [array of array of array of struct]
+  test("test ComplexDataType projection for array of array of array of struct") {
+    buildAvroTestDataMultiLevel4Type()
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select BuildNum[0][0][0].street from sdkOutputTable"),
+      Seq(Row("abc"), Row("abc"), Row("abc")))
+    checkAnswer(sql("select BuildNum[1][0][0].street from sdkOutputTable"),
+      Seq(Row("abc2"), Row("abc2"), Row("abc2")))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    cleanTestData()
+  }
+
+  def buildAvroTestDataMultiLevel6Type(): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+    buildAvroTestDataMultiLevel6(1, null)
+  }
+
+  // test multi level -- 6 levels
+  def buildAvroTestDataMultiLevel6(rows: Int, options: util.Map[String, String]): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+
+    val mySchema =
+      """ {
+        |"type": "record",
+        |	"name": "UserInfo",
+        |	"namespace": "com.apache.schema.schemalevel6_struct",
+        |	"fields": [
+        |		{
+        |			"name": "username",
+        |			"type": "string",
+        |			"default": "NONE"
+        |		},
+        |		{
+        |			"name": "age",
+        |			"type": "int",
+        |			"default": -1
+        |		},
+        |		{
+        |			"name": "phone",
+        |			"type": "string",
+        |			"default": "NONE"
+        |		},
+        |		{
+        |			"name": "housenum",
+        |			"type": "string",
+        |			"default": "NONE"
+        |		},
+        |		{
+        |			"name": "address",
+        |			"type": {
+        |				"type": "record",
+        |				"name": "Mailing_Address",
+        |				"fields": [
+        |					{
+        |						"name": "Address_Detail",
+        |						"type": {
+        |							"type": "record",
+        |							"name": "Address_Detail",
+        |							"fields": [
+        |								{
+        |									"name": "Building_Detail",
+        |									"type": {
+        |										"type": "record",
+        |										"name": "Building_Address",
+        |										"fields": [
+        |											{
+        |												"name": "Society_name",
+        |												"type": "string"
+        |											},
+        |											{
+        |												"name": "building_no",
+        |												"type": "string"
+        |											},
+        |											{
+        |												"name": "house_no",
+        |												"type": "int"
+        |											},
+        |											{
+        |												"name": "Building_Type",
+        |												"type": {
+        |													"type": "record",
+        |													"name": "Building_Type",
+        |													"fields": [
+        |														{
+        |															"name":"Buildingname",
+        |															"type":"string"
+        |														},
+        |														{
+        |															"name":"buildingArea",
+        |															"type":"int"
+        |														},
+        |														{
+        |															"name":"Building_Criteria",
+        |															"type":{
+        |																"type":"record",
+        |																"name":"BuildDet",
+        |																"fields":[
+        |																	{
+        |																		"name":"f1",
+        |																		"type":"int"
+        |																	},
+        |																	{
+        |																		"name":"f2",
+        |																		"type":"string"
+        |																	},
+        |																	{
+        |																		"name":"BuildDetInner",
+        |																		"type":
+        |																			{
+        |																				"type":"record",
+        |																				"name":"BuildInner",
+        |																				"fields":[
+        |																						{
+        |																							"name": "duplex",
+        |																							"type": "boolean"
+        |																						},
+        |																						{
+        |																							"name": "Price",
+        |																							"type": "int"
+        |																						},
+        |																						{
+        |																							"name": "TotalCost",
+        |																							"type": "int"
+        |																						},
+        |																						{
+        |																							"name": "Floor",
+        |																							"type": "int"
+        |																						},
+        |																						{
+        |																							"name": "PhoneNo",
+        |																							"type": "long"
+        |																						},
+        |																						{
+        |																							"name": "value",
+        |																							"type": "string"
+        |																						}
+        |																				]
+        |																			}
+        |																	}
+        |																]
+        |															}
+        |														}
+        |													]
+        |												}
+        |											}
+        |										]
+        |									}
+        |								}
+        |							]
+        |						}
+        |					}
+        |				]
+        |			}
+        |		}
+        |	]
+        |} """.stripMargin
+
+    val json =
+      """ {
+        |"username": "DON",
+        |"age": 21,
+        |"phone": "9888",
+        |"housenum": "44",
+        |"address": {
+        |"Address_Detail": {
+        |"Building_Detail": {
+        |"Society_name": "TTTT",
+        |"building_no": "5",
+        |"house_no": 78,
+        |"Building_Type": {
+        |"Buildingname": "Amaranthus",
+        |"buildingArea": 34,
+        |"Building_Criteria": {
+        |"f1": 23,
+        |"f2": "RRR",
+        |"BuildDetInner": {
+        |"duplex": true,
+        |"Price": 3434,
+        |"TotalCost": 7777,
+        |"Floor": 4,
+        |"PhoneNo": 5656,
+        |"value":"Value"
+        |}
+        |}
+        |}
+        |}
+        |}
+        |}
+        |} """.stripMargin
+
+    WriteFilesWithAvroWriter(rows, mySchema, json)
+  }
+
+
+  test("test ComplexDataType projection for struct of struct -6 levels") {
+    buildAvroTestDataMultiLevel6Type()
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkOutputTable"),
+      Seq(Row("DON", 21, "9888", "44", Row(Row(Row("TTTT", "5", 78, Row("Amaranthus", 34,
+        Row(23, "RRR", Row(true, 3434, 7777, 4, 5656,  "Value")))))))))
+    checkAnswer(sql("select address from sdkOutputTable"),
+      Seq(Row(Row(Row(Row("TTTT", "5", 78, Row("Amaranthus", 34, Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value")))))))))
+    checkAnswer(sql("select address.Address_Detail from sdkOutputTable"),
+      Seq(Row(Row(Row("TTTT", "5", 78, Row("Amaranthus", 34, Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value"))))))))
+    checkAnswer(sql("select address.Address_Detail.Building_Detail from sdkOutputTable"),
+      Seq(Row(Row("TTTT", "5", 78, Row("Amaranthus", 34, Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value")))))))
+    checkAnswer(sql("select address.Address_Detail.Building_Detail.Building_Type from sdkOutputTable"),
+      Seq(Row(Row("Amaranthus", 34, Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value"))))))
+    checkAnswer(sql(
+      "select address.Address_Detail.Building_Detail.Building_Type.Building_Criteria from " +
+      "sdkOutputTable"), Seq(Row(Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value")))))
+    checkAnswer(sql(
+      "select address.Address_Detail.Building_Detail.Building_Type.Building_Criteria" +
+      ".BuildDetInner.duplex from sdkOutputTable"), Seq(Row(true)))
+    checkAnswer(sql(
+      "select address.Address_Detail.Building_Detail.Building_Type.Building_Criteria" +
+      ".BuildDetInner.price from sdkOutputTable"), Seq(Row(3434)))
+    checkAnswer(sql(
+      "select address.Address_Detail.Building_Detail.Building_Type.Building_Criteria" +
+      ".BuildDetInner.totalcost from sdkOutputTable"), Seq(Row(7777)))
+    checkAnswer(sql(
+      "select address.Address_Detail.Building_Detail.Building_Type.Building_Criteria" +
+      ".BuildDetInner.floor from sdkOutputTable"), Seq(Row(4)))
+    checkAnswer(sql(
+      "select address.Address_Detail.Building_Detail.Building_Type.Building_Criteria" +
+      ".BuildDetInner.phoneNo from sdkOutputTable"), Seq(Row(5656)))
+    checkAnswer(sql(
+      "select address.Address_Detail.Building_Detail.Building_Type.Building_Criteria" +
+      ".BuildDetInner.value from sdkOutputTable"), Seq(Row("Value")))
+    checkAnswer(sql("select address,address.Address_Detail from sdkOutputTable"),
+      Seq(Row(Row(Row(Row("TTTT", "5", 78, Row("Amaranthus", 34, Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value"))))))
+      , Row(Row("TTTT", "5", 78, Row("Amaranthus", 34, Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value"))))))))
+    checkAnswer(sql("select address.Address_Detail.Building_Detail.Building_Type,address.Address_Detail.Building_Detail.Building_Type.Building_Criteria from sdkOutputTable"), Seq(Row(Row("Amaranthus", 34, Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value"))),Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value")))))
+    checkAnswer(sql("select address.Address_Detail,address.Address_Detail.Building_Detail.Building_Type.Building_Criteria from sdkOutputTable"),Seq(Row(Row(Row("TTTT", "5", 78, Row("Amaranthus", 34, Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value"))))),Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value")))))
+    checkAnswer(sql("select address.Address_Detail,address.Address_Detail.Building_Detail.Society_name,address.Address_Detail.Building_Detail.Building_Type.Building_Criteria.f1 from sdkOutputTable"),
+      Seq(Row(Row(Row("TTTT", "5", 78, Row("Amaranthus", 34, Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value"))))),"TTTT",23)))
+    checkAnswer(sql("select address.Address_Detail.Building_Detail.Society_name,address.Address_Detail.Building_Detail.building_no from sdkOutputTable"),Seq(Row("TTTT","5")))
+    sql("select address.Address_Detail.Building_Detail.Society_name,address.Address_Detail.Building_Detail.building_no from sdkOutputTable where address.Address_Detail.Building_Detail.Society_name ='TTTT'").show(false)
+    sql("DROP TABLE sdkOutputTable")
+    cleanTestData()
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/afcaecf2/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index fc62ba0..fad944c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -17,16 +17,17 @@
 
 package org.apache.spark.sql
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
 import org.apache.spark.CarbonInputMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, GetArrayItem, GetStructField, NamedExpression}
 import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{ArrayType, StructType}
 import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -67,14 +68,72 @@ case class CarbonDatasourceHadoopRelation(
   override def schema: StructType = tableSchema.getOrElse(carbonRelation.schema)
 
   def buildScan(requiredColumns: Array[String],
+      projects: Seq[NamedExpression],
       filters: Array[Filter],
       partitions: Seq[PartitionSpec]): RDD[InternalRow] = {
     val filterExpression: Option[Expression] = filters.flatMap { filter =>
       CarbonFilters.createCarbonFilter(schema, filter)
     }.reduceOption(new AndExpression(_, _))
 
+    // In case of a Struct or Struct-of-Struct complex type, get the projection column for the
+    // given parent/child field and push down the corresponding projection column. In case of
+    // Array, Array-of-Struct or Struct-of-Array, push down the parent column.
+    var reqColumns = projects.map {
+      case a@Alias(s: GetStructField, name) =>
+        val arrayTypeExists = s.childSchema.map(x => x.dataType)
+          .filter(dataType => dataType.isInstanceOf[ArrayType])
+        val ifGetArrayItem = s.child.map(x => x.isInstanceOf[GetArrayItem])
+        var ifGetArrayItemExists = false
+        ifGetArrayItem.foreach(ifexists =>
+          if (ifexists.equals(true)) {
+            ifGetArrayItemExists = true
+          }
+        )
+        if (0 == arrayTypeExists.length && ifGetArrayItemExists.equals(false)) {
+          s.toString().replaceAll("#[0-9]*", "").toLowerCase
+        } else if (ifGetArrayItemExists.equals(true)) {
+          s.toString().split("\\.")(0).replaceAll("#.*", "").toLowerCase
+        } else {
+          s.toString().split("\\.")(0).replaceAll("#.*", "").toLowerCase
+        }
+      case a@Alias(s: GetArrayItem, name) =>
+        s.toString().split("\\.")(0).replaceAll("#.*", "").toLowerCase
+      case attributeReference: AttributeReference =>
+        var columnName: String = attributeReference.name
+        requiredColumns.foreach(colName =>
+          if (colName.equalsIgnoreCase(attributeReference.name)) {
+            columnName = colName
+          })
+        columnName
+      case other =>
+        None
+    }
+
+    reqColumns = reqColumns.filter(col => !col.equals(None))
+    var output = new ListBuffer[String]
+
+    if (null != requiredColumns && requiredColumns.nonEmpty) {
+      requiredColumns.foreach(col => {
+
+        if (null != reqColumns && reqColumns.nonEmpty) {
+          reqColumns.foreach(reqCol => {
+            if (!reqCol.toString.equalsIgnoreCase(col) && !reqCol.toString.startsWith(col + ".")) {
+              output += col
+            } else {
+              output += reqCol.toString
+            }
+          })
+        } else {
+          output += col
+        }
+        output = output.distinct
+      })
+    }
+
     val projection = new CarbonProjection
-    requiredColumns.foreach(projection.addColumn)
+    output.toArray.foreach(projection.addColumn)
+
+
     CarbonSession.threadUnset(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP)
     val inputMetricsStats: CarbonInputMetrics = new CarbonInputMetrics
     new CarbonScanRDD(
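
To make the rewriting in buildScan above concrete, here is a minimal, self-contained sketch. Assumptions: it operates on plain expression strings rather than the Catalyst Alias/GetStructField/GetArrayItem trees used in the real code, and the helper name pushedColumn is hypothetical:

    // Struct child references become dotted, lowercased column names with the Catalyst
    // expression ids ("#<n>") stripped; any array access keeps only the parent column
    // so the whole parent is read.
    def pushedColumn(exprString: String, involvesArray: Boolean): String = {
      if (involvesArray) {
        exprString.split("\\.")(0).replaceAll("#.*", "").toLowerCase   // "buildnum#12[0]" -> "buildnum"
      } else {
        exprString.replaceAll("#[0-9]*", "").toLowerCase               // "person#3.addr.city" -> "person.addr.city"
      }
    }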

http://git-wip-us.apache.org/repos/asf/carbondata/blob/afcaecf2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 30db50a..3926ff6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -64,7 +64,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
           projects,
           filters,
           (a, f, needDecoder, p) => toCatalystRDD(l, a, relation.buildScan(
-            a.map(_.name).toArray, f, p), needDecoder)) :: Nil
+            a.map(_.name).toArray, projects, f, p), needDecoder)) :: Nil
       case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
         if ((profile.isInstanceOf[IncludeProfile] && profile.isEmpty) ||
             !CarbonDictionaryDecoder.


[2/2] carbondata git commit: [CARBONDATA-2606] [Complex DataType Enhancements] Projection PushDown For Complex DataType

Posted by ra...@apache.org.
[CARBONDATA-2606] [Complex DataType Enhancements] Projection PushDown For Complex DataType

Complex data type enhancements.
(1) Projection push down:
(a) Projection push down is handled only for the STRUCT data type.
(b) If the STRUCT contains an ARRAY type as a child, projection push down is not applicable.
(c) When a STRUCT column is given in the projection list, the projection list is rewritten in an optimized way, considering all the complex columns required (see the sketch below).
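
As a hedged illustration of the intended behavior (the table name, column names and schema below are hypothetical, not taken from this commit; `sql` is assumed to be a QueryTest/SparkSession helper as in the tests above):

    // t has columns person: struct<name:string, addr:struct<city:string, pin:int>>, nums: array<int>
    sql("select person.addr.city from t")  // pushed projection: person.addr.city (struct child only)
    sql("select person from t")            // pushed projection: person (whole struct)
    sql("select nums[0] from t")           // pushed projection: nums (array: the parent column is pushed)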

This closes #2396


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/afcaecf2
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/afcaecf2
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/afcaecf2

Branch: refs/heads/master
Commit: afcaecf20426f589cea8513831b27e0410a84113
Parents: 53a9fa7
Author: Indhumathi27 <in...@gmail.com>
Authored: Fri Jun 15 14:44:14 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Jun 26 14:40:14 2018 +0530

----------------------------------------------------------------------
 .../core/metadata/schema/table/CarbonTable.java |  52 +-
 .../schema/table/column/CarbonDimension.java    |  13 +
 .../impl/DictionaryBasedResultCollector.java    | 172 ++++-
 .../core/scan/complextypes/ArrayQueryType.java  |  12 +
 .../scan/complextypes/PrimitiveQueryType.java   |  38 +-
 .../core/scan/complextypes/StructQueryType.java |  50 ++
 .../executor/impl/AbstractQueryExecutor.java    |  10 +-
 .../core/scan/executor/util/QueryUtil.java      |  16 +-
 .../scan/executor/util/RestructureUtil.java     |  56 +-
 .../core/scan/filter/GenericQueryType.java      |   7 +
 .../core/scan/model/ProjectionDimension.java    |   8 +
 .../core/scan/model/QueryModelBuilder.java      | 213 ++++++-
 .../core/scan/model/QueryProjection.java        |   1 -
 .../carbondata/core/util/DataTypeUtil.java      |  12 +
 .../src/test/resources/Struct.csv               |  10 +
 .../src/test/resources/StructofStruct.csv       |  10 +
 .../complexType/TestComplexDataType.scala       | 639 +++++++++++++++++++
 ...ransactionalCarbonTableWithComplexType.scala | 258 +++++++-
 .../sql/CarbonDatasourceHadoopRelation.scala    |  65 +-
 .../strategy/CarbonLateDecodeStrategy.scala     |   2 +-
 20 files changed, 1600 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/afcaecf2/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index c302b2b..2cb19ea 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -661,14 +661,47 @@ public class CarbonTable implements Serializable {
   public CarbonDimension getDimensionByName(String tableName, String columnName) {
     CarbonDimension carbonDimension = null;
     List<CarbonDimension> dimList = tableDimensionsMap.get(tableName);
-    for (CarbonDimension dim : dimList) {
-      if (dim.getColName().equalsIgnoreCase(columnName)) {
-        carbonDimension = dim;
-        break;
+    String[] colSplits = columnName.split("\\.");
+    StringBuffer tempColName = new StringBuffer(colSplits[0]);
+    for (String colSplit : colSplits) {
+      if (!tempColName.toString().equalsIgnoreCase(colSplit)) {
+        tempColName = tempColName.append(".").append(colSplit);
+      }
+      carbonDimension = getCarbonDimension(tempColName.toString(), dimList);
+      if (carbonDimension != null && carbonDimension.getListOfChildDimensions() != null) {
+        dimList = carbonDimension.getListOfChildDimensions();
       }
     }
     List<CarbonDimension> implicitDimList = tableImplicitDimensionsMap.get(tableName);
-    for (CarbonDimension dim : implicitDimList) {
+    if (carbonDimension == null) {
+      carbonDimension = getCarbonDimension(columnName, implicitDimList);
+    }
+
+    if (colSplits.length > 1) {
+      List<CarbonDimension> dimLists = tableDimensionsMap.get(tableName);
+      for (CarbonDimension dims : dimLists) {
+        if (dims.getColName().equalsIgnoreCase(colSplits[0])) {
+          // Set the parent Dimension
+          carbonDimension
+              .setComplexParentDimension(getDimensionBasedOnOrdinal(dimLists, dims.getOrdinal()));
+          break;
+        }
+      }
+    }
+    return carbonDimension;
+  }
+
+  /**
+   * Get Dimension for columnName from list of dimensions
+   *
+   * @param columnName
+   * @param dimensions
+   * @return
+   */
+  public static CarbonDimension getCarbonDimension(String columnName,
+      List<CarbonDimension> dimensions) {
+    CarbonDimension carbonDimension = null;
+    for (CarbonDimension dim : dimensions) {
       if (dim.getColName().equalsIgnoreCase(columnName)) {
         carbonDimension = dim;
         break;
@@ -677,6 +710,15 @@ public class CarbonTable implements Serializable {
     return carbonDimension;
   }
 
+  private CarbonDimension getDimensionBasedOnOrdinal(List<CarbonDimension> dimList, int ordinal) {
+    for (CarbonDimension dimension : dimList) {
+      if (dimension.getOrdinal() == ordinal) {
+        return dimension;
+      }
+    }
+    throw new RuntimeException("No Dimension Matches the ordinal value");
+  }
+
   /**
    * @param tableName
    * @param columnName
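
A hedged sketch of the dotted-name lookup that getDimensionByName now performs (the Dim case class and findDimension helper below are hypothetical; CarbonDimension itself stores qualified child names such as "address.detail.city"):

    case class Dim(colName: String, children: List[Dim] = Nil)

    // Walk "a.b.c" one segment at a time, matching the progressively qualified name
    // against the current dimension list and descending into the children of each match.
    def findDimension(columnName: String, topLevel: List[Dim]): Option[Dim] = {
      val splits = columnName.split("\\.")
      var dims = topLevel
      var found: Option[Dim] = None
      var qualified = splits.head
      for (split <- splits) {
        if (!qualified.equalsIgnoreCase(split)) qualified = qualified + "." + split
        found = dims.find(_.colName.equalsIgnoreCase(qualified))
        found.filter(_.children.nonEmpty).foreach(d => dims = d.children)
      }
      found
    }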

http://git-wip-us.apache.org/repos/asf/carbondata/blob/afcaecf2/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java
index d15cde7..bec9889 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java
@@ -51,6 +51,11 @@ public class CarbonDimension extends CarbonColumn {
    */
   private int complexTypeOrdinal;
 
+  /**
+   * Holds the parent dimension of a complex child column.
+   */
+  private CarbonDimension complexParentDimension = null;
+
   public CarbonDimension(ColumnSchema columnSchema, int ordinal, int keyOrdinal,
           int columnGroupOrdinal, int complexTypeOrdinal) {
        this(columnSchema, ordinal, 0, keyOrdinal, columnGroupOrdinal, complexTypeOrdinal);
@@ -162,4 +167,12 @@ public class CarbonDimension extends CarbonColumn {
     }
     return true;
   }
+
+  public CarbonDimension getComplexParentDimension() {
+    return complexParentDimension;
+  }
+
+  public void setComplexParentDimension(CarbonDimension complexParentDimension) {
+    this.complexParentDimension = complexParentDimension;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/afcaecf2/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
index 60f14a4..495d7de 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.scan.collector.impl;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -27,6 +28,7 @@ import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionary
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.model.ProjectionDimension;
@@ -67,10 +69,33 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
   int noDictionaryColumnIndex;
   int complexTypeColumnIndex;
 
+
   boolean isDimensionExists;
 
+  private int[] surrogateResult;
+  private byte[][] noDictionaryKeys;
+  private byte[][] complexTypeKeyArray;
+
   protected Map<Integer, GenericQueryType> comlexDimensionInfoMap;
 
+  /**
+   * Key of this map is the parent column ordinal; the value is the list of its projected
+   * child columns. The final projection should be a merged list consisting only of parents.
+   */
+  private Map<Integer, List<Integer>> parentToChildColumnsMap = new HashMap<>();
+
+  /**
+   * List holding the complex parent ordinal of each query dimension (-1 if it has none)
+   */
+  private List<Integer> queryDimensionToComplexParentOrdinal = new ArrayList<>();
+
+  /**
+   * Maps each complex parent ordinal to its child column dimensions and the corresponding
+   * data buffer of each child column.
+   */
+  private Map<Integer, Map<CarbonDimension, ByteBuffer>> mergedComplexDimensionDataMap =
+      new HashMap<>();
+
   public DictionaryBasedResultCollector(BlockExecutionInfo blockExecutionInfos) {
     super(blockExecutionInfos);
     queryDimensions = executionInfo.getProjectionDimensions();
@@ -78,6 +103,7 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
     initDimensionAndMeasureIndexesForFillingData();
     isDimensionExists = queryDimensions.length > 0;
     this.comlexDimensionInfoMap = executionInfo.getComlexDimensionInfoMap();
+
   }
 
   /**
@@ -90,9 +116,7 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
     // scan the record and add to list
     List<Object[]> listBasedResult = new ArrayList<>(batchSize);
     int rowCounter = 0;
-    int[] surrogateResult;
-    byte[][] noDictionaryKeys;
-    byte[][] complexTypeKeyArray;
+
     while (scannedResult.hasNext() && rowCounter < batchSize) {
       Object[] row = new Object[queryDimensions.length + queryMeasures.length];
       if (isDimensionExists) {
@@ -102,6 +126,9 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
         dictionaryColumnIndex = 0;
         noDictionaryColumnIndex = 0;
         complexTypeColumnIndex = 0;
+
+        // get the complex columns data of this row
+        fillComplexColumnDataBufferForThisRow();
         for (int i = 0; i < queryDimensions.length; i++) {
           fillDimensionData(scannedResult, surrogateResult, noDictionaryKeys, complexTypeKeyArray,
               comlexDimensionInfoMap, row, i);
@@ -119,9 +146,54 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
     return listBasedResult;
   }
 
+  private void fillComplexColumnDataBufferForThisRow() {
+    mergedComplexDimensionDataMap.clear();
+    int noDictionaryComplexColumnIndex = 0;
+    int complexTypeComplexColumnIndex = 0;
+    for (int i = 0; i < queryDimensions.length; i++) {
+      int complexParentOrdinal = queryDimensionToComplexParentOrdinal.get(i);
+      if (complexParentOrdinal != -1) {
+        Map<CarbonDimension, ByteBuffer> childColumnByteBuffer;
+        // Add the parent and the child ordinal to the parentToChildColumnsMap
+        if (mergedComplexDimensionDataMap.get(complexParentOrdinal) == null) {
+          childColumnByteBuffer = new HashMap<>();
+        } else {
+          childColumnByteBuffer = mergedComplexDimensionDataMap.get(complexParentOrdinal);
+        }
+
+        // Send the byte buffer for the complex columns. Currently the expected columns for
+        // complex types are
+        // a) complex columns
+        // b) no-dictionary columns.
+        // TODO: handle dictionary columns once push down for complex dictionary
+        // columns is supported.
+        ByteBuffer buffer;
+        if (!dictionaryEncodingArray[i]) {
+          if (implictColumnArray[i]) {
+            throw new RuntimeException("Not Supported Column Type");
+          } else if (complexDataTypeArray[i]) {
+            buffer = ByteBuffer.wrap(complexTypeKeyArray[complexTypeComplexColumnIndex++]);
+          } else {
+            buffer = ByteBuffer.wrap(noDictionaryKeys[noDictionaryComplexColumnIndex++]);
+          }
+        } else if (directDictionaryEncodingArray[i]) {
+          throw new RuntimeException("Direct Dictionary Column Type Not Supported Yet.");
+        } else if (complexDataTypeArray[i]) {
+          buffer = ByteBuffer.wrap(complexTypeKeyArray[complexTypeComplexColumnIndex++]);
+        } else {
+          throw new RuntimeException("Not Supported Column Type");
+        }
+
+        childColumnByteBuffer
+            .put(queryDimensions[i].getDimension(), buffer);
+        mergedComplexDimensionDataMap.put(complexParentOrdinal, childColumnByteBuffer);
+      }
+    }
+  }
+
   void fillDimensionData(BlockletScannedResult scannedResult, int[] surrogateResult,
       byte[][] noDictionaryKeys, byte[][] complexTypeKeyArray,
-      Map<Integer, GenericQueryType> comlexDimensionInfoMap, Object[] row, int i) {
+      Map<Integer, GenericQueryType> complexDimensionInfoMap, Object[] row, int i) {
     if (!dictionaryEncodingArray[i]) {
       if (implictColumnArray[i]) {
         if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID
@@ -136,12 +208,26 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
         }
       } else if (complexDataTypeArray[i]) {
         // Complex Type With No Dictionary Encoding.
-        row[order[i]] = comlexDimensionInfoMap.get(queryDimensions[i].getDimension().getOrdinal())
-            .getDataBasedOnDataType(ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
+        if (queryDimensionToComplexParentOrdinal.get(i) != -1) {
+          fillRow(complexDimensionInfoMap, row, i,
+              ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
+        } else {
+          row[order[i]] =
+              complexDimensionInfoMap.get(queryDimensions[i].getDimension().getOrdinal())
+                  .getDataBasedOnDataType(
+                      ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
+        }
       } else {
-        row[order[i]] = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(
-            noDictionaryKeys[noDictionaryColumnIndex++],
-            queryDimensions[i].getDimension().getDataType());
+        if (queryDimensionToComplexParentOrdinal.get(i) != -1) {
+          // When the parent ordinal is not -1, a projection on a child of this
+          // complex column has been pushed down.
+          fillRow(complexDimensionInfoMap, row, i,
+              ByteBuffer.wrap(noDictionaryKeys[noDictionaryColumnIndex++]));
+        } else {
+          row[order[i]] = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(
+              noDictionaryKeys[noDictionaryColumnIndex++],
+              queryDimensions[i].getDimension().getDataType());
+        }
       }
     } else if (directDictionaryEncodingArray[i]) {
       if (directDictionaryGenerators[i] != null) {
@@ -149,15 +235,45 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
             surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]]);
       }
     } else if (complexDataTypeArray[i]) {
-      row[order[i]] = comlexDimensionInfoMap.get(queryDimensions[i].getDimension().getOrdinal())
-          .getDataBasedOnDataType(
-              ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
+      row[order[i]] = complexDimensionInfoMap.get(queryDimensions[i].getDimension().getOrdinal())
+          .getDataBasedOnDataType(ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
       dictionaryColumnIndex++;
     } else {
       row[order[i]] = surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]];
     }
   }
 
+  private void fillRow(Map<Integer, GenericQueryType> complexDimensionInfoMap, Object[] row, int i,
+      ByteBuffer wrap) {
+    if (parentToChildColumnsMap.get(queryDimensionToComplexParentOrdinal.get(i)).size() > 1) {
+      fillRowForComplexColumn(complexDimensionInfoMap, row, i);
+    } else {
+      row[order[i]] = complexDimensionInfoMap.get(queryDimensionToComplexParentOrdinal.get(i))
+          .getDataBasedOnColumn(wrap, queryDimensions[i].getDimension().getComplexParentDimension(),
+              queryDimensions[i].getDimension());
+    }
+  }
+
+  private void fillRowForComplexColumn(Map<Integer, GenericQueryType> complexDimensionInfoMap,
+      Object[] row, int i) {
+    // When multiple child columns of the same parent are projected, only the first child
+    // in the list produces the merged parent Object array; for all other children the
+    // value is null.
+    // E.g. for a : <b, c, d>, where a is the parent column and b, c, d are child columns,
+    // 'a' is filled completely when the first child in the list (column 'b') is traversed.
+    // When columns 'c' and 'd' are encountered, only null is placed in the output.
+    int complexParentOrdinal = queryDimensionToComplexParentOrdinal.get(i);
+    List<Integer> childColumns = parentToChildColumnsMap.get(complexParentOrdinal);
+    if (childColumns.get(0).equals(queryDimensions[i].getDimension().getOrdinal())) {
+      // Fill out Parent Column.
+      row[order[i]] = complexDimensionInfoMap.get(complexParentOrdinal).getDataBasedOnColumnList(
+          mergedComplexDimensionDataMap.get(queryDimensions[i].getParentDimension().getOrdinal()),
+          queryDimensions[i].getParentDimension());
+    } else {
+      row[order[i]] = null;
+    }
+  }
+
   void fillMeasureData(BlockletScannedResult scannedResult, Object[] row) {
     if (measureInfo.getMeasureDataTypes().length > 0) {
       Object[] msrValues = new Object[measureInfo.getMeasureDataTypes().length];
@@ -181,18 +297,41 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
     Arrays.sort(primitive);
     actualIndexInSurrogateKey = new int[dictionaryIndexes.size()];
     int index = 0;
+
+    dictionaryEncodingArray = CarbonUtil.getDictionaryEncodingArray(queryDimensions);
+    directDictionaryEncodingArray = CarbonUtil.getDirectDictionaryEncodingArray(queryDimensions);
+    implictColumnArray = CarbonUtil.getImplicitColumnArray(queryDimensions);
+    complexDataTypeArray = CarbonUtil.getComplexDataTypeArray(queryDimensions);
+
+    parentToChildColumnsMap.clear();
+    queryDimensionToComplexParentOrdinal.clear();
     for (int i = 0; i < queryDimensions.length; i++) {
       if (queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) || queryDimensions[i]
           .getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
         actualIndexInSurrogateKey[index++] =
             Arrays.binarySearch(primitive, queryDimensions[i].getDimension().getOrdinal());
       }
+      if (null != queryDimensions[i].getDimension().getComplexParentDimension()) {
+        // Add the parent and the child ordinal to the parentToChildColumnsMap
+        int complexParentOrdinal =
+            queryDimensions[i].getDimension().getComplexParentDimension().getOrdinal();
+        queryDimensionToComplexParentOrdinal.add(complexParentOrdinal);
+        if (parentToChildColumnsMap.get(complexParentOrdinal) == null) {
+          // Add the parent and child ordinal in the map
+          List<Integer> childOrdinals = new ArrayList<>();
+          childOrdinals.add(queryDimensions[i].getDimension().getOrdinal());
+          parentToChildColumnsMap.put(complexParentOrdinal, childOrdinals);
+
+        } else {
+          List<Integer> childOrdinals = parentToChildColumnsMap.get(complexParentOrdinal);
+          childOrdinals.add(queryDimensions[i].getDimension().getOrdinal());
+          parentToChildColumnsMap.put(complexParentOrdinal, childOrdinals);
+        }
+      } else {
+        queryDimensionToComplexParentOrdinal.add(-1);
+      }
     }
 
-    dictionaryEncodingArray = CarbonUtil.getDictionaryEncodingArray(queryDimensions);
-    directDictionaryEncodingArray = CarbonUtil.getDirectDictionaryEncodingArray(queryDimensions);
-    implictColumnArray = CarbonUtil.getImplicitColumnArray(queryDimensions);
-    complexDataTypeArray = CarbonUtil.getComplexDataTypeArray(queryDimensions);
     order = new int[queryDimensions.length + queryMeasures.length];
     for (int i = 0; i < queryDimensions.length; i++) {
       order[i] = queryDimensions[i].getOrdinal();
@@ -206,5 +345,4 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
           .getDirectDictionaryGenerator(queryDimensions[i].getDimension().getDataType());
     }
   }
-
 }
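
A hedged sketch of the parent/child grouping used by the collector above (the ordinals and the helper name valueForChild are hypothetical):

    // parentToChildColumnsMap: complex parent ordinal -> ordinals of its projected children.
    // Only the first projected child of a parent yields the merged parent value; the remaining
    // children are filled with null, mirroring fillRowForComplexColumn above.
    val parentToChildColumnsMap = Map(0 -> List(1, 2, 3))
    def valueForChild(parentOrdinal: Int, childOrdinal: Int, mergedParentValue: Any): Any =
      if (parentToChildColumnsMap(parentOrdinal).head == childOrdinal) mergedParentValue else null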

http://git-wip-us.apache.org/repos/asf/carbondata/blob/afcaecf2/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
index 81e9651..9541b01 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
@@ -20,8 +20,10 @@ package org.apache.carbondata.core.scan.complextypes;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
 import org.apache.carbondata.core.util.DataTypeUtil;
@@ -97,4 +99,14 @@ public class ArrayQueryType extends ComplexQueryType implements GenericQueryType
     return DataTypeUtil.getDataTypeConverter().wrapWithGenericArrayData(data);
   }
 
+  @Override public Object getDataBasedOnColumn(ByteBuffer dataBuffer, CarbonDimension parent,
+      CarbonDimension child) {
+    throw new UnsupportedOperationException("Operation Unsupported for ArrayType");
+  }
+
+  @Override public Object getDataBasedOnColumnList(Map<CarbonDimension, ByteBuffer> childBuffer,
+      CarbonDimension presentColumn) {
+    throw new UnsupportedOperationException("Operation Unsupported for ArrayType");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/afcaecf2/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
index d7723b3..948b765 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.scan.complextypes;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -29,6 +30,7 @@ import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionary
 import org.apache.carbondata.core.keygenerator.mdkey.Bits;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
 import org.apache.carbondata.core.util.ByteUtil;
@@ -107,8 +109,32 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
   }
 
   @Override public Object getDataBasedOnDataType(ByteBuffer dataBuffer) {
-    Object actualData = null;
+    return getDataObject(dataBuffer, -1);
+  }
+
+  @Override public Object getDataBasedOnColumn(ByteBuffer dataBuffer, CarbonDimension parent,
+      CarbonDimension child) {
+    Object actualData;
+
+    if (parent.getOrdinal() != child.getOrdinal() || null == dataBuffer || !dataBuffer
+        .hasRemaining()) {
+      return null;
+    }
+    int size;
+    if (!DataTypeUtil.isFixedSizeDataType(child.getDataType())) {
+      size = dataBuffer.array().length;
+    } else if (child.getDataType() == DataTypes.TIMESTAMP) {
+      size = DataTypes.LONG.getSizeInBytes();
+    } else {
+      size = child.getDataType().getSizeInBytes();
+    }
+    actualData = getDataObject(dataBuffer, size);
 
+    return actualData;
+  }
+
+  private Object getDataObject(ByteBuffer dataBuffer, int size) {
+    Object actualData;
     if (isDirectDictionary) {
       // Direct Dictionary Column
       byte[] data = new byte[keySize];
@@ -119,8 +145,9 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
           DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(dataType);
       actualData = directDictionaryGenerator.getValueFromSurrogate(surrgateValue);
     } else if (!isDictionary) {
-      // No Dictionary Columns
-      int size = dataBuffer.getShort();
+      if (size == -1) {
+        size = dataBuffer.getShort();
+      }
       byte[] value = new byte[size];
       dataBuffer.get(value, 0, size);
       if (dataType == DataTypes.DATE) {
@@ -142,8 +169,11 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
       String dictionaryValueForKey = dictionary.getDictionaryValueForKey(surrgateValue);
       actualData = DataTypeUtil.getDataBasedOnDataType(dictionaryValueForKey, this.dataType);
     }
-
     return actualData;
   }
 
+  @Override public Object getDataBasedOnColumnList(Map<CarbonDimension, ByteBuffer> childBuffer,
+      CarbonDimension presentColumn) {
+    return getDataBasedOnColumn(childBuffer.get(presentColumn), presentColumn, presentColumn);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/afcaecf2/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
index 301eb5a..c607f84 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
@@ -22,8 +22,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
 import org.apache.carbondata.core.util.DataTypeUtil;
@@ -109,4 +111,52 @@ public class StructQueryType extends ComplexQueryType implements GenericQueryTyp
     }
     return DataTypeUtil.getDataTypeConverter().wrapWithGenericRow(fields);
   }
+
+  @Override public Object getDataBasedOnColumn(ByteBuffer dataBuffer, CarbonDimension parent,
+      CarbonDimension child) {
+    int childLength;
+    if (parent.getOrdinal() < child.getOrdinal()) {
+      childLength = parent.getNumberOfChild();
+      Object[] fields = new Object[childLength];
+      for (int i = 0; i < childLength; i++) {
+        fields[i] = children.get(i)
+            .getDataBasedOnColumn(dataBuffer, parent.getListOfChildDimensions().get(i), child);
+      }
+      return DataTypeUtil.getDataTypeConverter().wrapWithGenericRow(fields);
+    } else if (parent.getOrdinal() > child.getOrdinal()) {
+      return null;
+    } else {
+      //      childLength = dataBuffer.getShort();
+      Object field = getDataBasedOnDataType(dataBuffer);
+      return field;
+    }
+  }
+
+  @Override public Object getDataBasedOnColumnList(Map<CarbonDimension, ByteBuffer> childBuffer,
+      CarbonDimension presentColumn) {
+    // Traverse the complex tree: if a data buffer exists for the present column, fill its
+    // value from that buffer; otherwise recurse into its child dimensions.
+    if (childBuffer.get(presentColumn) != null) {
+      if (presentColumn.getNumberOfChild() > 0) {
+        // This is complex Column. And all its child will be present in the corresponding data
+        // buffer.
+        Object field = getDataBasedOnDataType(childBuffer.get(presentColumn));
+        return field;
+      } else {
+        // This is a child column with a primitive data type.
+        Object field = children.get(0)
+            .getDataBasedOnColumn(childBuffer.get(presentColumn), presentColumn, presentColumn);
+        return field;
+      }
+    } else {
+      int childLength;
+      childLength = presentColumn.getNumberOfChild();
+      Object[] fields = new Object[childLength];
+      for (int i = 0; i < childLength; i++) {
+        fields[i] = children.get(i)
+            .getDataBasedOnColumnList(childBuffer, presentColumn.getListOfChildDimensions().get(i));
+      }
+      return DataTypeUtil.getDataTypeConverter().wrapWithGenericRow(fields);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/afcaecf2/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index f365045..e7bbea0 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -521,7 +521,15 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     List<Integer> parentBlockIndexList = new ArrayList<Integer>();
     for (ProjectionDimension queryDimension : queryDimensions) {
       if (queryDimension.getDimension().getDataType().isComplexType()) {
-        parentBlockIndexList.add(queryDimension.getDimension().getOrdinal());
+        if (null != queryDimension.getDimension().getComplexParentDimension()) {
+          if (queryDimension.getDimension().isComplex()) {
+            parentBlockIndexList.add(queryDimension.getDimension().getOrdinal());
+          } else {
+            parentBlockIndexList.add(queryDimension.getParentDimension().getOrdinal());
+          }
+        } else {
+          parentBlockIndexList.add(queryDimension.getDimension().getOrdinal());
+        }
       }
     }
     return ArrayUtils

http://git-wip-us.apache.org/repos/asf/carbondata/blob/afcaecf2/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index bb33b24..7986e8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -708,10 +708,24 @@ public class QueryUtil {
       Set<CarbonDimension> filterDimensions) {
     Map<Integer, GenericQueryType> complexTypeMap = new HashMap<Integer, GenericQueryType>();
     for (ProjectionDimension dimension : queryDimensions) {
-      CarbonDimension actualDimension = dimension.getDimension();
+      CarbonDimension actualDimension;
+      CarbonDimension complexDimension = null;
+      if (null != dimension.getDimension().getComplexParentDimension()) {
+        // get the parent dimension column.
+        actualDimension = dimension.getParentDimension();
+        if (dimension.getDimension().isComplex()) {
+          complexDimension = dimension.getDimension();
+        }
+      } else {
+        actualDimension = dimension.getDimension();
+      }
       if (actualDimension.getNumberOfChild() == 0) {
         continue;
       }
+      if (complexDimension != null) {
+        fillParentDetails(dimensionToBlockIndexMap, complexDimension, complexTypeMap,
+            eachComplexColumnValueSize, columnIdToDictionaryMap);
+      }
       fillParentDetails(dimensionToBlockIndexMap, actualDimension, complexTypeMap,
           eachComplexColumnValueSize, columnIdToDictionaryMap);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/afcaecf2/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
index c69ba6c..a74b73c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionary
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
@@ -109,7 +110,13 @@ public class RestructureUtil {
         for (CarbonDimension tableDimension : tableComplexDimension) {
           if (isColumnMatches(isTransactionalTable, queryDimension.getDimension(),
               tableDimension)) {
-            ProjectionDimension currentBlockDimension = new ProjectionDimension(tableDimension);
+            ProjectionDimension currentBlockDimension = null;
+            // If the projection dimension is a child of a struct field and has a parent ordinal set
+            if (null != queryDimension.getDimension().getComplexParentDimension()) {
+              currentBlockDimension = new ProjectionDimension(queryDimension.getDimension());
+            } else {
+              currentBlockDimension = new ProjectionDimension(tableDimension);
+            }
             // TODO: for complex dimension set scale and precision by traversing
             // the child dimensions
             currentBlockDimension.setOrdinal(queryDimension.getOrdinal());
@@ -156,8 +163,51 @@ public class RestructureUtil {
     // If it is non transactional table just check the column names, no need to validate
     // column id as multiple sdk's output placed in a single folder doesn't have same
     // column ID but can have same column name
-    return (tableColumn.getColumnId().equals(queryColumn.getColumnId()) ||
-        (!isTransactionalTable && tableColumn.getColName().equals(queryColumn.getColName())));
+    if (tableColumn.getDataType().isComplexType() && !(tableColumn.getDataType().getId()
+        == DataTypes.ARRAY_TYPE_ID)) {
+      if (tableColumn.getColumnId().equals(queryColumn.getColumnId())) {
+        return true;
+      } else {
+        return isColumnMatchesStruct(tableColumn, queryColumn);
+      }
+    } else {
+      return (tableColumn.getColumnId().equals(queryColumn.getColumnId()) || (!isTransactionalTable
+          && tableColumn.getColName().equals(queryColumn.getColName())));
+    }
+  }
+
+  /**
+   * In case of a multilevel complex column (STRUCT / STRUCT-of-STRUCT), traverse all the child
+   * dimensions to check the column id.
+   *
+   * @param tableColumn
+   * @param queryColumn
+   * @return
+   */
+  private static boolean isColumnMatchesStruct(CarbonColumn tableColumn, CarbonColumn queryColumn) {
+    if (tableColumn instanceof CarbonDimension) {
+      List<CarbonDimension> parentDimension =
+          ((CarbonDimension) tableColumn).getListOfChildDimensions();
+      CarbonDimension carbonDimension = null;
+      String[] colSplits = queryColumn.getColName().split("\\.");
+      StringBuffer tempColName = new StringBuffer(colSplits[0]);
+      for (String colSplit : colSplits) {
+        if (!tempColName.toString().equalsIgnoreCase(colSplit)) {
+          tempColName = tempColName.append(".").append(colSplit);
+        }
+        carbonDimension = CarbonTable.getCarbonDimension(tempColName.toString(), parentDimension);
+        if (carbonDimension != null) {
+          if (carbonDimension.getColumnSchema().getColumnUniqueId()
+              .equalsIgnoreCase(queryColumn.getColumnId())) {
+            return true;
+          }
+          if (carbonDimension.getListOfChildDimensions() != null) {
+            parentDimension = carbonDimension.getListOfChildDimensions();
+          }
+        }
+      }
+    }
+    return false;
   }
 
   /**

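The struct matching above walks the dotted query column name prefix by prefix. Below is a minimal
standalone sketch of that prefix expansion; the class name is hypothetical and not part of the
patch, and only the splitting behaviour of isColumnMatchesStruct is mirrored:

// Sketch: expand "person.detail.age" into the prefixes matched against the child dimension tree.
public class StructPrefixSketch {
  public static void main(String[] args) {
    String queryColumn = "person.detail.age";
    String[] colSplits = queryColumn.split("\\.");
    StringBuffer tempColName = new StringBuffer(colSplits[0]);
    for (String colSplit : colSplits) {
      // The first segment already seeds the buffer, so it is not appended again.
      if (!tempColName.toString().equalsIgnoreCase(colSplit)) {
        tempColName.append(".").append(colSplit);
      }
      // Each prefix ("person", "person.detail", "person.detail.age") is the name looked up
      // against the current level of child dimensions; a hit with a matching column id wins.
      System.out.println(tempColName);
    }
  }
}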
http://git-wip-us.apache.org/repos/asf/carbondata/blob/afcaecf2/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
index fe65669..6c087d7 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
@@ -20,8 +20,10 @@ package org.apache.carbondata.core.scan.filter;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
 
 public interface GenericQueryType {
@@ -45,4 +47,9 @@ public interface GenericQueryType {
 
   Object getDataBasedOnDataType(ByteBuffer dataBuffer);
 
+  Object getDataBasedOnColumn(ByteBuffer dataBuffer, CarbonDimension parent, CarbonDimension child);
+
+  Object getDataBasedOnColumnList(Map<CarbonDimension, ByteBuffer> childBuffer,
+      CarbonDimension presentColumn);
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/afcaecf2/core/src/main/java/org/apache/carbondata/core/scan/model/ProjectionDimension.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/ProjectionDimension.java b/core/src/main/java/org/apache/carbondata/core/scan/model/ProjectionDimension.java
index 806de85..b5b0000 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/ProjectionDimension.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/ProjectionDimension.java
@@ -42,4 +42,12 @@ public class ProjectionDimension extends ProjectionColumn {
     return dimension;
   }
 
+  public CarbonDimension getParentDimension() {
+    if (null != dimension.getComplexParentDimension()) {
+      return dimension.getComplexParentDimension();
+    } else {
+      throw new RuntimeException("Dimension doesn't have Parent Dimension linked.");
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/afcaecf2/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
index 198608f..c75f2bc 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
@@ -17,9 +17,16 @@
 
 package org.apache.carbondata.core.scan.model;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
@@ -35,6 +42,11 @@ public class QueryModelBuilder {
   private DataTypeConverter dataTypeConverter;
   private boolean forcedDetailRawQuery;
   private boolean readPageByPage;
+  /**
+   * log information
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(QueryModelBuilder.class.getName());
 
   public QueryModelBuilder(CarbonTable table) {
     this.table = table;
@@ -49,23 +61,214 @@ public class QueryModelBuilder {
     for (String projectionColumnName : projectionColumns) {
       CarbonDimension dimension = table.getDimensionByName(factTableName, projectionColumnName);
       if (dimension != null) {
-        projection.addDimension(dimension, i);
-        i++;
+        CarbonDimension complexParentDimension = dimension.getComplexParentDimension();
+        if (null != complexParentDimension && dimension.hasEncoding(Encoding.DICTIONARY)) {
+          if (!isAlreadyExists(complexParentDimension, projection.getDimensions())) {
+            projection.addDimension(complexParentDimension, i);
+            i++;
+          }
+        } else {
+          projection.addDimension(dimension, i);
+          i++;
+        }
       } else {
         CarbonMeasure measure = table.getMeasureByName(factTableName, projectionColumnName);
         if (measure == null) {
-          throw new RuntimeException(projectionColumnName +
-              " column not found in the table " + factTableName);
+          throw new RuntimeException(
+              projectionColumnName + " column not found in the table " + factTableName);
         }
         projection.addMeasure(measure, i);
         i++;
       }
     }
-
+    projection = optimizeProjectionForComplexColumns(projection, projectionColumns, factTableName);
     this.projection = projection;
+    for (ProjectionDimension projectionDimension : projection.getDimensions()) {
+      LOGGER.info("Project Columns: " + projectionDimension.getColumnName());
+    }
+    for (ProjectionMeasure projectionMeasure : projection.getMeasures()) {
+      LOGGER.info("Project Columns: " + projectionMeasure.getColumnName());
+    }
     return this;
   }
 
+  /**
+   * For complex dimensions, check if the dimension already exists in the projection list or not
+   *
+   * @param dimension
+   * @param projectionDimensions
+   * @return
+   */
+  private boolean isAlreadyExists(CarbonDimension dimension,
+      List<ProjectionDimension> projectionDimensions) {
+    boolean exists = false;
+    for (ProjectionDimension projectionDimension : projectionDimensions) {
+      if (dimension.getColName().equals(projectionDimension.getColumnName())) {
+        exists = true;
+        break;
+      }
+    }
+    return exists;
+  }
+
+  private QueryProjection optimizeProjectionForComplexColumns(QueryProjection projection,
+      String[] projectionColumns, String factTableName) {
+    // Get the list of complex column projections.
+    // Two optimization techniques can be applied:
+    // A. Merging on the driver side.
+    // B. Merging on the executor side, in the result collector.
+    // Driver-side merging eliminates one of the CarbonDimensions from the projection,
+    // whereas executor-side merging merges the column output in the result collector.
+    // This routine performs driver-side merging only and leaves executor merging to the
+    // result collector.
+    Map<Integer, List<Integer>> complexColumnMap = new HashMap<>();
+    List<ProjectionDimension> carbonDimensions = projection.getDimensions();
+    // Traverse and check whether the topmost parent of the projection column is already present
+    List<CarbonDimension> projectionDimensionToBeMerged = new ArrayList<>();
+    for (ProjectionDimension projectionDimension : carbonDimensions) {
+      CarbonDimension complexParentDimension =
+          projectionDimension.getDimension().getComplexParentDimension();
+      if (null != complexParentDimension && isAlreadyExists(complexParentDimension,
+          carbonDimensions)) {
+        projectionDimensionToBeMerged.add(projectionDimension.getDimension());
+      }
+    }
+
+    if (projectionDimensionToBeMerged.size() != 0) {
+      projection =
+          removeMergedDimensions(projectionDimensionToBeMerged, projectionColumns, factTableName);
+      carbonDimensions = projection.getDimensions();
+    }
+
+    for (ProjectionDimension cols : carbonDimensions) {
+      // get all the projections that have the parent ordinal set.
+      if (null != cols.getDimension().getComplexParentDimension()) {
+        if (complexColumnMap.get(cols.getDimension().getComplexParentDimension().getOrdinal())
+            != null) {
+          List<Integer> childColumns =
+              complexColumnMap.get(cols.getDimension().getComplexParentDimension().getOrdinal());
+          childColumns.add(cols.getDimension().getOrdinal());
+          complexColumnMap
+              .put(cols.getDimension().getComplexParentDimension().getOrdinal(), childColumns);
+        } else {
+          List<Integer> childColumns = new ArrayList<>();
+          childColumns.add(cols.getDimension().getOrdinal());
+          complexColumnMap
+              .put(cols.getDimension().getComplexParentDimension().getOrdinal(), childColumns);
+        }
+      }
+    }
+
+    // Traverse the map to find whether any of the columns are parents.
+    for (Map.Entry<Integer, List<Integer>> entry : complexColumnMap.entrySet()) {
+      List<Integer> childOrdinals = entry.getValue();
+      if (childOrdinals.size() > 1) {
+        // In case of more than one child, check whether the child columns are in the same path
+        // and have a common parent.
+        Collections.sort(childOrdinals);
+        List<CarbonDimension> mergedDimensions = mergeChildColumns(childOrdinals);
+        if (mergedDimensions.size() > 0) {
+          projection = removeMergedDimensions(mergedDimensions, projectionColumns, factTableName);
+        }
+      }
+    }
+    return projection;
+  }
+
+  /**
+   * Remove the merged dimensions from the projection list
+   *
+   * @param mergedDimensions
+   * @param projectionColumns
+   * @param factTableName
+   * @return
+   */
+  private QueryProjection removeMergedDimensions(List<CarbonDimension> mergedDimensions,
+      String[] projectionColumns, String factTableName) {
+    QueryProjection queryProjection = new QueryProjection();
+    int i = 0;
+    for (String projectionColumnName : projectionColumns) {
+      CarbonDimension dimension = table.getDimensionByName(factTableName, projectionColumnName);
+      if (dimension != null) {
+        if (!mergedDimensions.contains(dimension)) {
+          if (!isAlreadyExists(dimension, queryProjection.getDimensions())) {
+            queryProjection.addDimension(dimension, i);
+            i++;
+          }
+        }
+      } else {
+        CarbonMeasure measure = table.getMeasureByName(factTableName, projectionColumnName);
+        if (measure == null) {
+          throw new RuntimeException(
+              projectionColumnName + " column not found in the table " + factTableName);
+        }
+        queryProjection.addMeasure(measure, i);
+        i++;
+      }
+    }
+    return queryProjection;
+  }
+
+  private List<CarbonDimension> mergeChildColumns(List<Integer> childOrdinals) {
+    // Check whether the child columns are in the same path or not.
+    List<CarbonDimension> mergedChild = new ArrayList<>();
+    List<CarbonDimension> dimList = table.getDimensions();
+    for (int i = 0; i < childOrdinals.size(); i++) {
+      for (int j = i; j < childOrdinals.size(); j++) {
+        CarbonDimension parentDimension = getDimensionBasedOnOrdinal(dimList, childOrdinals.get(i));
+        CarbonDimension childDimension = getDimensionBasedOnOrdinal(dimList, childOrdinals.get(j));
+        if (!mergedChild.contains(childDimension) && checkChildsInSamePath(parentDimension,
+            childDimension)) {
+          mergedChild.add(childDimension);
+        }
+      }
+    }
+    return mergedChild;
+  }
+
+  private boolean checkChildsInSamePath(CarbonDimension parentDimension,
+      CarbonDimension childDimension) {
+    if (parentDimension.getColName().equals(childDimension.getColName())) {
+      return false;
+    } else if (checkForChildColumns(parentDimension, childDimension)) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  private boolean checkForChildColumns(CarbonDimension parentDimension,
+      CarbonDimension childDimension) {
+    boolean output = false;
+    if (parentDimension.getOrdinal() == childDimension.getOrdinal()) {
+      output = true;
+    } else if (parentDimension.getNumberOfChild() > 0) {
+      for (int i = 0; i < parentDimension.getNumberOfChild() && !output; i++) {
+        output =
+            checkForChildColumns(parentDimension.getListOfChildDimensions().get(i), childDimension);
+      }
+    } else {
+      output = false;
+    }
+    return output;
+  }
+
+  private CarbonDimension getDimensionBasedOnOrdinal(List<CarbonDimension> dimList,
+      Integer ordinal) {
+    for (CarbonDimension dims : dimList) {
+      if (dims.getOrdinal() == ordinal) {
+        return dims;
+      } else if (dims.getNumberOfChild() > 0) {
+        CarbonDimension dimensionBasedOnOrdinal =
+            getDimensionBasedOnOrdinal(dims.getListOfChildDimensions(), ordinal);
+        if (null != dimensionBasedOnOrdinal) {
+          return dimensionBasedOnOrdinal;
+        }
+      }
+    }
+    return null;
+  }
+
   public QueryModelBuilder projectAllColumns() {
     QueryProjection projection = new QueryProjection();
     List<CarbonDimension> dimensions = table.getDimensions();

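The driver-side merge done in optimizeProjectionForComplexColumns can be pictured on plain dotted
column paths. Below is a minimal sketch under that simplification (class and method names are
hypothetical, paths stand in for CarbonDimensions): a projected child is dropped whenever one of
its ancestors is also projected, so only the ancestor is read.

import java.util.ArrayList;
import java.util.List;

public class ProjectionMergeSketch {

  // Drop every path whose ancestor (for example "person" for "person.detail.age") is projected.
  static List<String> mergeProjection(List<String> columns) {
    List<String> merged = new ArrayList<>();
    for (String candidate : columns) {
      boolean covered = false;
      for (String other : columns) {
        if (!other.equals(candidate) && candidate.startsWith(other + ".")) {
          covered = true;
          break;
        }
      }
      if (!covered && !merged.contains(candidate)) {
        merged.add(candidate);
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    List<String> projection = new ArrayList<>();
    projection.add("roll");
    projection.add("person");
    projection.add("person.detail.age");
    projection.add("a.f");
    projection.add("a.f.g");
    // Prints [roll, person, a.f]
    System.out.println(mergeProjection(projection));
  }
}

Executor-side merging of the remaining children is left to the result collector, as noted in the
comments of the patch.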
http://git-wip-us.apache.org/repos/asf/carbondata/blob/afcaecf2/core/src/main/java/org/apache/carbondata/core/scan/model/QueryProjection.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryProjection.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryProjection.java
index d4496e0..7a008cc 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryProjection.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryProjection.java
@@ -46,7 +46,6 @@ public class QueryProjection {
    */
   private List<ProjectionMeasure> measures =
       new ArrayList<ProjectionMeasure>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-
   /**
    * Constructor created with database name and table name.
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/afcaecf2/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index c84b0da..ff41b9b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -417,6 +417,18 @@ public final class DataTypeUtil {
     }
   }
 
+  /**
+   * Returns true for fixed length DataTypes.
+   * @param dataType
+   * @return
+   */
+  public static boolean isFixedSizeDataType(DataType dataType) {
+    if (dataType == DataTypes.STRING || DataTypes.isDecimal(dataType)) {
+      return false;
+    } else {
+      return true;
+    }
+  }
 
   /**
    * Below method will be used to convert the data passed to its actual data

http://git-wip-us.apache.org/repos/asf/carbondata/blob/afcaecf2/integration/spark-common-test/src/test/resources/Struct.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/Struct.csv b/integration/spark-common-test/src/test/resources/Struct.csv
new file mode 100644
index 0000000..debfcd0
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/Struct.csv
@@ -0,0 +1,10 @@
+1,11$abc$10.00
+2,12$abcd$10.01
+3,13$abce$10.02
+4,14$abcr$10.03
+5,15$abct$10.04
+6,16$abcn$10.05
+7,17$abcq$10.06
+8,18$abcs$10.07
+9,19$abcm$10.08
+10,20$abck$10.09

http://git-wip-us.apache.org/repos/asf/carbondata/blob/afcaecf2/integration/spark-common-test/src/test/resources/StructofStruct.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/StructofStruct.csv b/integration/spark-common-test/src/test/resources/StructofStruct.csv
new file mode 100644
index 0000000..21398ba
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/StructofStruct.csv
@@ -0,0 +1,10 @@
+1,11&abc&10.00
+2,12&abcd&10.01
+3,13&abce&10.02
+4,14&abcr&10.03
+5,15&abct&10.04
+6,16&abcn&10.05
+7,17&abcq&10.06
+8,18&abcs&10.07
+9,19&abcm&10.08
+10,20&abck&10.09

http://git-wip-us.apache.org/repos/asf/carbondata/blob/afcaecf2/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
new file mode 100644
index 0000000..524289c
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
@@ -0,0 +1,639 @@
+package org.apache.carbondata.integration.spark.testsuite.complexType
+
+import java.sql.Timestamp
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Test class for testing projection with complex data types
+ *
+ */
+
+class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
+
+  test("test Projection PushDown for Struct - Integer type") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll string,person Struct<detail:int>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values('abc',1)")
+    sql("select roll,person,roll,person.detail from table1").show(false)
+    checkAnswer(sql("select roll,person,person.detail from table1"),
+      Seq(Row("abc", Row(1), 1)))
+    checkAnswer(sql("select person,person.detail from table1"),
+      Seq(Row(Row(1), 1)))
+    checkAnswer(sql("select roll,person from table1"), Seq(Row("abc", Row(1))))
+    checkAnswer(sql("select roll from table1"), Seq(Row("abc")))
+  }
+
+  test("test projection pushDown for Array") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll string,person array<int>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values('abc','1$2$3')")
+    sql("select roll,person,roll,person from table1").show(false)
+    checkAnswer(sql("select roll,person from table1"),
+      Seq(Row("abc", mutable.WrappedArray.make(Array(1, 2, 3)))))
+  }
+
+  test("test Projection PushDown for StructofArray - Integer type") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:array<int>>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,'1:2')")
+    checkAnswer(sql("select person.detail[0] from table1"), Seq(Row(1)))
+    checkAnswer(sql("select person.detail[1] from table1"), Seq(Row(2)))
+    checkAnswer(sql("select roll,person from table1"),
+      Seq(Row(1, Row(mutable.WrappedArray.make(Array(1, 2))))))
+    checkAnswer(sql("select roll,person.detail[0],person,person.detail[1] from table1"),
+      Seq(Row(1, 1, Row(mutable.WrappedArray.make(Array(1, 2))), 2)))
+  }
+
+  test("test Projection PushDown for Struct - String type") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:string>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,'abc')")
+    checkExistence(sql("select person from table1"), true, "abc")
+    checkAnswer(sql("select roll,person,person.detail from table1"), Seq(Row(1, Row("abc"), "abc")))
+    checkExistence(sql("select person.detail from table1"), true, "abc")
+    checkAnswer(sql("select roll,person from table1"), Seq(Row(1, Row("abc"))))
+    checkAnswer(sql("select roll,person,roll,person from table1"),
+      Seq(Row(1, Row("abc"), 1, Row("abc"))))
+  }
+
+  test("test Projection PushDown for StructofArray - String type") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:array<string>>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,'abc:bcd')")
+    //    sql("select person from table1").show(false)
+    sql("select person.detail[0] from table1").show(false)
+    checkAnswer(sql("select person.detail[0] from table1"), Seq(Row("abc")))
+    checkAnswer(sql("select person.detail[1] from table1"), Seq(Row("bcd")))
+    checkAnswer(sql("select roll,person from table1"),
+      Seq(Row(1, Row(mutable.WrappedArray.make(Array("abc", "bcd"))))))
+    checkAnswer(sql("select roll,person.detail[0],person,person.detail[1] from table1"),
+      Seq(Row(1, "abc", Row(mutable.WrappedArray.make(Array("abc", "bcd"))), "bcd")))
+  }
+
+  test("test Projection PushDown for Struct - Double type") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:double>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,10.00)")
+    checkExistence(sql("select person from table1"), true, "10.0")
+    checkAnswer(sql("select roll,person,person.detail from table1"), Seq(Row(1, Row(10.0), 10.0)))
+    checkExistence(sql("select person.detail from table1"), true, "10.0")
+    checkAnswer(sql("select roll,person from table1"), Seq(Row(1, Row(10.0))))
+  }
+
+  test("test Projection PushDown for StructofArray - Double type") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:array<double>>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,'10.00:20.00')")
+    checkAnswer(sql("select person.detail[0] from table1"), Seq(Row(10.0)))
+    checkAnswer(sql("select person.detail[1] from table1"), Seq(Row(20.0)))
+    checkAnswer(sql("select roll,person from table1"),
+      Seq(Row(1, Row(mutable.WrappedArray.make(Array(10.0, 20.0))))))
+  }
+
+  test("test Projection PushDown for Struct - Decimal type") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:decimal(3,2)>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,3.4)")
+    checkExistence(sql("select person from table1"), true, "3")
+    checkExistence(sql("select person.detail from table1"), true, "3")
+    checkAnswer(sql("select roll,person.detail from table1"), Seq(Row(1, 3.40)))
+  }
+
+  test("test Projection PushDown for StructofArray - Decimal type") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:array<decimal(3,2)>>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,'3.4:4.2')")
+    checkAnswer(sql("select person.detail[0] from table1"), Seq(Row(3.40)))
+    checkAnswer(sql("select person.detail[1] from table1"), Seq(Row(4.20)))
+    checkAnswer(sql("select roll,person.detail[0] from table1"), Seq(Row(1, 3.40)))
+  }
+
+  test("test Projection PushDown for Struct - timestamp type") {
+    sql("DROP TABLE IF EXISTS table1")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+    sql(
+      "create table table1 (roll int,person Struct<detail:timestamp>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,'2018/01/01')")
+    checkExistence(sql("select person from table1"), true, "2018-01-01 00:00:00.0")
+    sql("select person,roll,person.detail from table1").show(false)
+    checkAnswer(sql("select person,roll,person.detail from table1"),
+      Seq(Row(Row(Timestamp.valueOf("2018-01-01 00:00:00.0")), 1,
+        Timestamp.valueOf("2018-01-01 00:00:00.0"))))
+    checkAnswer(sql("select roll,person from table1"),
+      Seq(Row(1, Row(Timestamp.valueOf("2018-01-01 00:00:00.0")))))
+    checkAnswer(sql("select roll,person.detail from table1"),
+      Seq(Row(1, Timestamp.valueOf("2018-01-01 00:00:00.0"))))
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+  }
+
+  test("test Projection PushDown for StructofArray - timestamp type") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:array<timestamp>>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 select 1,'2018/01/01:2017/01/01'")
+    checkExistence(sql("select person.detail[0] from table1"), true, "2018-01-01 00:00:00.0")
+    checkExistence(sql("select person.detail[1] from table1"), true, "2017-01-01 00:00:00.0")
+    checkAnswer(sql("select roll,person from table1"),
+      Seq(Row(1, Row(mutable.WrappedArray
+        .make(Array(Timestamp.valueOf("2018-01-01 00:00:00.0"),
+          Timestamp.valueOf("2017-01-01 00:00:00.0")))))))
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+  }
+
+  test("test Projection PushDown for Struct - long type") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:long>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,2018888)")
+    checkExistence(sql("select person from table1"), true, "2018888")
+    checkAnswer(sql("select person,roll,person.detail from table1"),
+      Seq(Row(Row(2018888), 1, 2018888)))
+    checkExistence(sql("select person.detail from table1"), true, "2018888")
+    checkAnswer(sql("select roll,person from table1"), Seq(Row(1, Row(2018888))))
+  }
+
+  test("test Projection PushDown for StructofArray - long type") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:array<long>>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,'2018888:2018889')")
+    checkAnswer(sql("select person.detail[0] from table1"), Seq(Row(2018888)))
+    checkAnswer(sql("select person.detail[1] from table1"), Seq(Row(2018889)))
+    checkAnswer(sql("select person,roll from table1"),
+      Seq(Row(Row(mutable.WrappedArray.make(Array(2018888, 2018889))), 1)))
+  }
+
+  test("test Projection PushDown for Struct - short type") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:short>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,20)")
+    checkExistence(sql("select person from table1"), true, "20")
+    sql("select person,person.detail from table1").show(false)
+    checkAnswer(sql("select person,roll,person.detail from table1"), Seq(Row(Row(20), 1, 20)))
+    checkExistence(sql("select person.detail from table1"), true, "20")
+    checkAnswer(sql("select roll,person from table1"), Seq(Row(1, Row(20))))
+  }
+
+  test("test Projection PushDown for StructofArray - short type") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:array<short>>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,'20:30')")
+    checkAnswer(sql("select person.detail[0] from table1"), Seq(Row(20)))
+    checkAnswer(sql("select person.detail[1] from table1"), Seq(Row(30)))
+    checkAnswer(sql("select person,roll from table1"),
+      Seq(Row(Row(mutable.WrappedArray.make(Array(20, 30))), 1)))
+  }
+
+  test("test Projection PushDown for Struct - boolean type") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:boolean>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,true)")
+    checkExistence(sql("select person from table1"), true, "true")
+    sql("select person,person.detail from table1").show(false)
+    checkAnswer(sql("select person,roll,person.detail from table1"), Seq(Row(Row(true), 1, true)))
+    checkExistence(sql("select person.detail from table1"), true, "true")
+    checkAnswer(sql("select roll,person from table1"), Seq(Row(1, Row(true))))
+  }
+
+  test("test Projection PushDown for StructofArray - boolean type") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:array<boolean>>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,'true:false')")
+    checkAnswer(sql("select person.detail[0] from table1"), Seq(Row(true)))
+    checkAnswer(sql("select person.detail[1] from table1"), Seq(Row(false)))
+    checkAnswer(sql("select person,roll from table1"),
+      Seq(Row(Row(mutable.WrappedArray.make(Array(true, false))), 1)))
+  }
+
+  test("test Projection PushDown for StructofStruct - Integer type") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:Struct<age:int>>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,'1')")
+    checkExistence(sql("select person from table1"), true, "1")
+    checkAnswer(sql("select person,roll,person.detail from table1"),
+      Seq(Row(Row(Row(1)), 1, Row(1))))
+    checkExistence(sql("select person.detail.age from table1"), true, "1")
+    checkAnswer(sql("select roll,person from table1"), Seq(Row(1, Row(Row(1)))))
+  }
+
+  test("test Projection PushDown for StructofStruct - String type") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:Struct<age:string>>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,'abc')")
+    checkExistence(sql("select person from table1"), true, "abc")
+    checkAnswer(sql("select person,person.detail from table1"),
+      Seq(Row(Row(Row("abc")), Row("abc"))))
+    checkExistence(sql("select person.detail.age from table1"), true, "abc")
+    checkAnswer(sql("select roll,person from table1"), Seq(Row(1, Row(Row("abc")))))
+  }
+
+  test("test Projection PushDown for StructofStruct - Double type") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:Struct<age:double>>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,10.00)")
+    checkExistence(sql("select person from table1"), true, "10.0")
+    checkAnswer(sql("select person,person.detail from table1"), Seq(Row(Row(Row(10.0)), Row(10.0))))
+    checkExistence(sql("select person.detail.age from table1"), true, "10.0")
+    checkAnswer(sql("select roll,person from table1"), Seq(Row(1, Row(Row(10.0)))))
+  }
+
+  test("test Projection PushDown for StructofStruct - Decimal type") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:Struct<age:decimal(3,2)>>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,3.2)")
+    checkExistence(sql("select person from table1"), true, "3")
+    checkExistence(sql("select person.detail.age from table1"), true, "3")
+  }
+
+  test("test Projection PushDown for StructofStruct - timestamp type") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:Struct<age:timestamp>>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,'2018/01/01')")
+    checkExistence(sql("select person from table1"), true, "2018-01-01 00:00:00.0")
+    checkAnswer(sql("select person,person.detail from table1"),
+      Seq(Row(Row(Row(Timestamp.valueOf("2018-01-01 00:00:00.0"))),
+        Row(Timestamp.valueOf("2018-01-01 00:00:00.0")))))
+    checkExistence(sql("select person.detail.age from table1"), true, "2018-01-01 00:00:00.0")
+    checkAnswer(sql("select roll,person from table1"),
+      Seq(Row(1, Row(Row(Timestamp.valueOf("2018-01-01 00:00:00.0"))))))
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+  }
+
+  test("test Projection PushDown for StructofStruct - long type") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:Struct<age:long>>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,2018888)")
+    checkExistence(sql("select person from table1"), true, "2018888")
+    checkAnswer(sql("select person,person.detail from table1"),
+      Seq(Row(Row(Row(2018888)), Row(2018888))))
+    checkExistence(sql("select person.detail.age from table1"), true, "2018888")
+    checkAnswer(sql("select roll,person from table1"), Seq(Row(1, Row(Row(2018888)))))
+
+  }
+
+  test("test Projection PushDown for StructofStruct - short type") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:Struct<age:short>>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,20)")
+    checkExistence(sql("select person from table1"), true, "20")
+    checkAnswer(sql("select person,person.detail from table1"), Seq(Row(Row(Row(20)), Row(20))))
+    checkExistence(sql("select person.detail.age from table1"), true, "20")
+    checkAnswer(sql("select roll,person from table1"), Seq(Row(1, Row(Row(20)))))
+  }
+
+  test("test Projection PushDown for StructofStruct - boolean type") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:Struct<age:boolean>>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,true)")
+    checkExistence(sql("select person from table1"), true, "true")
+    checkAnswer(sql("select person,person.detail from table1"), Seq(Row(Row(Row(true)), Row(true))))
+    checkExistence(sql("select person.detail.age from table1"), true, "true")
+    checkAnswer(sql("select roll,person from table1"), Seq(Row(1, Row(Row(true)))))
+  }
+
+  test("test StructofArray pushdown") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (person Struct<detail:string,ph:array<int>>) stored by " +
+      "'carbondata' tblproperties('dictionary_include'='person')")
+    sql("insert into table1 values ('abc$2')")
+    sql("select person from table1").show(false)
+    sql("select person.detail, person.ph[0] from table1").show(false)
+  }
+
+  test("test Projection PushDown for Struct - Merge column") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:int,age:string,height:double>) stored " +
+      "by " +
+      "'carbondata'")
+    sql(
+      "load data inpath '" + resourcesPath +
+      "/Struct.csv' into table table1 options('delimiter'=','," +
+      "'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
+      "'complex_delimiter_level_2'='&')")
+    checkAnswer(sql("select person from table1"), Seq(
+      Row(Row(11, "abc", 10.0)),
+      Row(Row(12, "abcd", 10.01)),
+      Row(Row(13, "abce", 10.02)),
+      Row(Row(14, "abcr", 10.03)),
+      Row(Row(15, "abct", 10.04)),
+      Row(Row(16, "abcn", 10.05)),
+      Row(Row(17, "abcq", 10.06)),
+      Row(Row(18, "abcs", 10.07)),
+      Row(Row(19, "abcm", 10.08)),
+      Row(Row(20, "abck", 10.09))
+    ))
+    checkAnswer(sql("select person.detail,person.age,person.height from table1"), Seq(
+      Row(11, "abc", 10.0),
+      Row(12, "abcd", 10.01),
+      Row(13, "abce", 10.02),
+      Row(14, "abcr", 10.03),
+      Row(15, "abct", 10.04),
+      Row(16, "abcn", 10.05),
+      Row(17, "abcq", 10.06),
+      Row(18, "abcs", 10.07),
+      Row(19, "abcm", 10.08),
+      Row(20, "abck", 10.09)))
+    checkAnswer(sql("select person.age,person.detail,person.height from table1"), Seq(
+      Row("abc", 11, 10.0),
+      Row("abcd", 12, 10.01),
+      Row("abce", 13, 10.02),
+      Row("abcr", 14, 10.03),
+      Row("abct", 15, 10.04),
+      Row("abcn", 16, 10.05),
+      Row("abcq", 17, 10.06),
+      Row("abcs", 18, 10.07),
+      Row("abcm", 19, 10.08),
+      Row("abck", 20, 10.09)))
+    checkAnswer(sql("select person.height,person.age,person.detail from table1"), Seq(
+      Row(10.0, "abc", 11),
+      Row(10.01, "abcd", 12),
+      Row(10.02, "abce", 13),
+      Row(10.03, "abcr", 14),
+      Row(10.04, "abct", 15),
+      Row(10.05, "abcn", 16),
+      Row(10.06, "abcq", 17),
+      Row(10.07, "abcs", 18),
+      Row(10.08, "abcm", 19),
+      Row(10.09, "abck", 20)))
+  }
+
+  test("test Projection PushDown for StructofStruct - Merging columns") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person Struct<detail:Struct<age:int,name:string," +
+      "height:double>>) stored " +
+      "by " +
+      "'carbondata'")
+    sql(
+      "load data inpath '" + resourcesPath +
+      "/StructofStruct.csv' into table table1 options('delimiter'=','," +
+      "'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
+      "'complex_delimiter_level_2'='&')")
+    checkAnswer(sql("select person from table1"), Seq(
+      Row(Row(Row(11, "abc", 10.0))),
+      Row(Row(Row(12, "abcd", 10.01))),
+      Row(Row(Row(13, "abce", 10.02))),
+      Row(Row(Row(14, "abcr", 10.03))),
+      Row(Row(Row(15, "abct", 10.04))),
+      Row(Row(Row(16, "abcn", 10.05))),
+      Row(Row(Row(17, "abcq", 10.06))),
+      Row(Row(Row(18, "abcs", 10.07))),
+      Row(Row(Row(19, "abcm", 10.08))),
+      Row(Row(Row(20, "abck", 10.09)))))
+    checkAnswer(sql("select person.detail.age,person.detail.name,person.detail.height from table1"),
+      Seq(
+        Row(11, "abc", 10.0),
+        Row(12, "abcd", 10.01),
+        Row(13, "abce", 10.02),
+        Row(14, "abcr", 10.03),
+        Row(15, "abct", 10.04),
+        Row(16, "abcn", 10.05),
+        Row(17, "abcq", 10.06),
+        Row(18, "abcs", 10.07),
+        Row(19, "abcm", 10.08),
+        Row(20, "abck", 10.09)))
+    checkAnswer(sql("select person.detail.name,person.detail.age,person.detail.height from table1"),
+      Seq(
+        Row("abc", 11, 10.0),
+        Row("abcd", 12, 10.01),
+        Row("abce", 13, 10.02),
+        Row("abcr", 14, 10.03),
+        Row("abct", 15, 10.04),
+        Row("abcn", 16, 10.05),
+        Row("abcq", 17, 10.06),
+        Row("abcs", 18, 10.07),
+        Row("abcm", 19, 10.08),
+        Row("abck", 20, 10.09)))
+    checkAnswer(sql("select person.detail.height,person.detail.name,person.detail.age from table1"),
+      Seq(
+        Row(10.0, "abc", 11),
+        Row(10.01, "abcd", 12),
+        Row(10.02, "abce", 13),
+        Row(10.03, "abcr", 14),
+        Row(10.04, "abct", 15),
+        Row(10.05, "abcn", 16),
+        Row(10.06, "abcq", 17),
+        Row(10.07, "abcs", 18),
+        Row(10.08, "abcm", 19),
+        Row(10.09, "abck", 20)))
+    checkAnswer(sql("select person.detail from table1"),
+      Seq(
+        Row(Row(11, "abc", 10.0)),
+        Row(Row(12, "abcd", 10.01)),
+        Row(Row(13, "abce", 10.02)),
+        Row(Row(14, "abcr", 10.03)),
+        Row(Row(15, "abct", 10.04)),
+        Row(Row(16, "abcn", 10.05)),
+        Row(Row(17, "abcq", 10.06)),
+        Row(Row(18, "abcs", 10.07)),
+        Row(Row(19, "abcm", 10.08)),
+        Row(Row(20, "abck", 10.09))))
+    checkAnswer(sql("select person.detail.age from table1"), Seq(
+      Row(11), Row(12), Row(13), Row(14), Row(15), Row(16), Row(17), Row(18), Row(19), Row(20)))
+  }
+
+  test("test Projection PushDown for more than one Struct column - Integer type") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll string,person Struct<detail:int,age:string>,person1 " +
+      "Struct<detail:int,age:array<string>>) stored by " +
+      "'carbondata'")
+    sql("insert into table1 values('abc','1$abc','2$cde')")
+    sql("select person.detail,person1.age from table1").show(false)
+  }
+
+  test("test Projection PushDown for more than one Struct column Cases -1") {
+    sql("drop table if exists test")
+    sql("create table test (a struct<b:int, c:struct<d:int,e:int>>) stored by 'carbondata'")
+    sql("insert into test select '1$2:3'")
+    checkAnswer(sql("select * from test"), Seq(Row(Row(1, Row(2, 3)))))
+    checkAnswer(sql("select a.b,a.c from test"), Seq(Row(1, Row(2, 3))))
+    checkAnswer(sql("select a.c, a.b from test"), Seq(Row(Row(2, 3), 1)))
+    checkAnswer(sql("select a.c,a,a.b from test"), Seq(Row(Row(2, 3), Row(1, Row(2, 3)), 1)))
+    checkAnswer(sql("select a.c from test"), Seq(Row(Row(2, 3))))
+    checkAnswer(sql("select a.b from test"), Seq(Row(1)))
+    sql("drop table if exists test")
+  }
+
+  test("test Projection PushDown for more than one StructofArray column - Integer type") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (person Struct<detail:array<int>>,person1 Struct<detail:array<int>>) " +
+      "stored by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,2)")
+    sql("select person.detail[0],person1.detail[0] from table1").show(false)
+  }
+
+  test("test Projection PushDown for StructofStruct case1 - Merging columns") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,a struct<b:int,c:string,d:int,e:string,f:struct<g:int," +
+      "h:string,i:int>,j:int>) stored " +
+      "by " +
+      "'carbondata'")
+    sql("insert into table1 values(1,'1$abc$2$efg$3:mno:4$5')")
+    sql("insert into table1 values(2,'1$abc$2$efg$3:mno:4$5')")
+    sql("insert into table1 values(3,'1$abc$2$efg$3:mno:4$5')")
+    checkAnswer(sql("select a.b from table1"), Seq(Row(1), Row(1), Row(1)))
+    checkAnswer(sql("select a.c from table1"), Seq(Row("abc"), Row("abc"), Row("abc")))
+    checkAnswer(sql("select a.d from table1"), Seq(Row(2), Row(2), Row(2)))
+    checkAnswer(sql("select a.e from table1"), Seq(Row("efg"), Row("efg"), Row("efg")))
+    checkAnswer(sql("select a.f from table1"),
+      Seq(Row(Row(3, "mno", 4)), Row(Row(3, "mno", 4)), Row(Row(3, "mno", 4))))
+    checkAnswer(sql("select a.f.g  from table1"), Seq(Row(3), Row(3), Row(3)))
+    checkAnswer(sql("select a.f.h  from table1"), Seq(Row("mno"), Row("mno"), Row("mno")))
+    checkAnswer(sql("select a.f.i  from table1"), Seq(Row(4), Row(4), Row(4)))
+    checkAnswer(sql("select a.f.g,a.f.h,a.f.i  from table1"),
+      Seq(Row(3, "mno", 4), Row(3, "mno", 4), Row(3, "mno", 4)))
+    checkAnswer(sql("select a.b,a.f from table1"),
+      Seq(Row(1, Row(3, "mno", 4)), Row(1, Row(3, "mno", 4)), Row(1, Row(3, "mno", 4))))
+    checkAnswer(sql("select a.c,a.f from table1"),
+      Seq(Row("abc", Row(3, "mno", 4)), Row("abc", Row(3, "mno", 4)), Row("abc", Row(3, "mno", 4))))
+    checkAnswer(sql("select a.d,a.f from table1"),
+      Seq(Row(2, Row(3, "mno", 4)), Row(2, Row(3, "mno", 4)), Row(2, Row(3, "mno", 4))))
+    checkAnswer(sql("select a.j from table1"), Seq(Row(5), Row(5), Row(5)))
+    checkAnswer(sql("select * from table1"),
+      Seq(Row(1, Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5)),
+        Row(2, Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5)),
+        Row(3, Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5))))
+    checkAnswer(sql("select *,a from table1"),
+      Seq(Row(1,
+        Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5),
+        Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5)),
+        Row(2,
+          Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5),
+          Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5)),
+        Row(3,
+          Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5),
+          Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5))))
+  }
+
+  test("test Projection PushDown for StructofStruct for Dictionary Include ") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,a struct<b:int,c:string,d:int,e:string,f:struct<g:int," +
+      "h:string,i:int>,j:int>) stored " +
+      "by " +
+      "'carbondata' tblproperties('dictionary_include'='a')")
+    sql("insert into table1 values(1,'1$abc$2$efg$3:mno:4$5')")
+    sql("insert into table1 values(2,'1$abc$2$efg$3:mno:4$5')")
+    sql("insert into table1 values(3,'1$abc$2$efg$3:mno:4$5')")
+
+    checkAnswer(sql("select a.b from table1"), Seq(Row(1), Row(1), Row(1)))
+    checkAnswer(sql("select a.c from table1"), Seq(Row("abc"), Row("abc"), Row("abc")))
+    checkAnswer(sql("select a.d from table1"), Seq(Row(2), Row(2), Row(2)))
+    checkAnswer(sql("select a.e from table1"), Seq(Row("efg"), Row("efg"), Row("efg")))
+    checkAnswer(sql("select a.f from table1"),
+      Seq(Row(Row(3, "mno", 4)), Row(Row(3, "mno", 4)), Row(Row(3, "mno", 4))))
+    checkAnswer(sql("select a.f.g  from table1"), Seq(Row(3), Row(3), Row(3)))
+    checkAnswer(sql("select a.f.h  from table1"), Seq(Row("mno"), Row("mno"), Row("mno")))
+    checkAnswer(sql("select a.f.i  from table1"), Seq(Row(4), Row(4), Row(4)))
+    checkAnswer(sql("select a.f.g,a.f.h,a.f.i  from table1"),
+      Seq(Row(3, "mno", 4), Row(3, "mno", 4), Row(3, "mno", 4)))
+    checkAnswer(sql("select a.b,a.f from table1"),
+      Seq(Row(1, Row(3, "mno", 4)), Row(1, Row(3, "mno", 4)), Row(1, Row(3, "mno", 4))))
+    checkAnswer(sql("select a.c,a.f from table1"),
+      Seq(Row("abc", Row(3, "mno", 4)), Row("abc", Row(3, "mno", 4)), Row("abc", Row(3, "mno", 4))))
+    checkAnswer(sql("select a.d,a.f from table1"),
+      Seq(Row(2, Row(3, "mno", 4)), Row(2, Row(3, "mno", 4)), Row(2, Row(3, "mno", 4))))
+    checkAnswer(sql("select a.j from table1"), Seq(Row(5), Row(5), Row(5)))
+    checkAnswer(sql("select * from table1"),
+      Seq(Row(1, Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5)),
+        Row(2, Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5)),
+        Row(3, Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5))))
+    checkAnswer(sql("select *,a from table1"),
+      Seq(Row(1,
+        Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5),
+        Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5)),
+        Row(2,
+          Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5),
+          Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5)),
+        Row(3,
+          Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5),
+          Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5))))
+  }
+
+  test("ArrayofArray PushDown") {
+    sql("DROP TABLE IF EXISTS test")
+    sql("create table test(a array<array<int>>) stored by 'carbondata'")
+    sql("insert into test values(1) ")
+    sql("select a[0][0] from test").show(false)
+  }
+
+  test("Struct and ArrayofArray PushDown") {
+    sql("DROP TABLE IF EXISTS test")
+    sql("create table test(a array<array<int>>,b struct<c:array<int>>) stored by 'carbondata'")
+    sql("insert into test values(1,1) ")
+    sql("select b.c[0],a[0][0] from test").show(false)
+  }
+
+}