Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/13 19:21:14 UTC

[9/9] carbondata git commit: [CARBONDATA-1520] [PreAgg] Support pre-aggregate table load

[CARBONDATA-1520] [PreAgg] Support pre-aggregate table load

This closes #1446


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cc0e6f1e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cc0e6f1e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cc0e6f1e

Branch: refs/heads/master
Commit: cc0e6f1e77b39d712de0e6101b2d24e57c5b47cb
Parents: f7f516e
Author: kunal642 <ku...@gmail.com>
Authored: Thu Oct 26 17:09:54 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Nov 14 00:48:16 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   6 +
 .../core/indexstore/UnsafeMemoryDMStore.java    |  14 +-
 .../blockletindex/BlockletDataMap.java          |  41 +++--
 .../core/indexstore/row/DataMapRow.java         |   6 +-
 .../core/indexstore/row/DataMapRowImpl.java     |   4 +-
 .../core/indexstore/row/UnsafeDataMapRow.java   |   8 +-
 .../core/indexstore/schema/CarbonRowSchema.java | 124 +++++++++++++
 .../core/indexstore/schema/DataMapSchema.java   | 124 -------------
 .../metadata/converter/SchemaConverter.java     |   2 +-
 .../core/metadata/schema/table/CarbonTable.java |   5 +
 .../metadata/schema/table/DataMapSchema.java    |   4 +-
 .../schema/table/RelationIdentifier.java        |   4 +
 .../core/metadata/schema/table/TableInfo.java   |   2 +-
 .../core/scan/executor/util/QueryUtil.java      |  44 +++--
 .../carbondata/core/util/SessionParams.java     |  13 ++
 .../core/writer/CarbonIndexFileMergeWriter.java |   2 +-
 .../carbondata/events/OperationListenerBus.java |   3 +-
 .../hadoop/api/CarbonTableInputFormat.java      |  19 +-
 .../src/test/resources/sample.csv               |   2 +
 .../dataload/TestLoadDataGeneral.scala          |  10 +-
 .../TestLoadDataWithAutoLoadMerge.scala         |   2 +-
 .../TestLoadDataWithYarnLocalDirs.scala         |   2 +-
 .../preaggregate/TestPreAggregateLoad.scala     | 172 +++++++++++++++++++
 .../deleteTable/TestDeleteTableNewDDL.scala     |   2 +
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  13 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   8 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |  24 ++-
 .../command/management/LoadTableCommand.scala   |   4 +
 .../CreatePreAggregateTableCommand.scala        |   5 +
 .../DropPreAggregateTablePostListener.scala     |  49 ------
 .../preaaggregate/PreAggregateListeners.scala   |  81 +++++++++
 .../preaaggregate/PreAggregateUtil.scala        |  23 ++-
 .../spark/sql/hive/CarbonAnalysisRules.scala    |  57 +++++-
 .../spark/sql/hive/CarbonSessionState.scala     |   7 +-
 .../execution/command/CarbonHiveCommands.scala  |   3 +
 .../impl/DictionaryFieldConverterImpl.java      |   8 +-
 .../converter/impl/FieldEncoderFactory.java     |  38 +++-
 .../loading/model/CarbonLoadModel.java          |  13 ++
 38 files changed, 685 insertions(+), 263 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 014478f..e27e5bd 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -57,6 +57,12 @@ public final class CarbonCommonConstants {
   public static final String CARBON_INPUT_SEGMENTS = "carbon.input.segments.";
 
   /**
+   * Key prefix used to control whether the configured input segments are validated.
+   * Set to false during an aggregate table load, where segment validation is not required.
+   */
+  public static final String VALIDATE_CARBON_INPUT_SEGMENTS = "validate.carbon.input.segments.";
+
+  /**
    * location of the carbon member, hierarchy and fact files
    */
   @CarbonProperty
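
For orientation: like CARBON_INPUT_SEGMENTS above, this key is a prefix that gets
suffixed with "databaseName.tableName", and the CarbonScanRDD change later in this
patch reads it back with a default of "true". A minimal sketch, assuming a session
params instance is in scope and a hypothetical child table "default.sales_agg":

    String key = CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + "default.sales_agg";
    // An aggregate table load stores "false" under this key; readers fall back
    // to "true" (validate) when the property is absent:
    boolean validate = Boolean.parseBoolean(sessionParams.getProperty(key, "true"));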

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index bf1678a..450796a 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -18,7 +18,7 @@ package org.apache.carbondata.core.indexstore;
 
 import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.UnsafeDataMapRow;
-import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
 import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
@@ -44,7 +44,7 @@ public class UnsafeMemoryDMStore {
 
   private boolean isMemoryFreed;
 
-  private DataMapSchema[] schema;
+  private CarbonRowSchema[] schema;
 
   private int[] pointers;
 
@@ -52,7 +52,7 @@ public class UnsafeMemoryDMStore {
 
   private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
 
-  public UnsafeMemoryDMStore(DataMapSchema[] schema) throws MemoryException {
+  public UnsafeMemoryDMStore(CarbonRowSchema[] schema) throws MemoryException {
     this.schema = schema;
     this.allocatedSize = capacity;
     this.memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize);
@@ -101,7 +101,7 @@ public class UnsafeMemoryDMStore {
     pointers[rowCount++] = pointer;
   }
 
-  private void addToUnsafe(DataMapSchema schema, DataMapRow row, int index) {
+  private void addToUnsafe(CarbonRowSchema schema, DataMapRow row, int index) {
     switch (schema.getSchemaType()) {
       case FIXED:
         DataType dataType = schema.getDataType();
@@ -154,8 +154,8 @@ public class UnsafeMemoryDMStore {
         runningLength += data.length;
         break;
       case STRUCT:
-        DataMapSchema[] childSchemas =
-            ((DataMapSchema.StructDataMapSchema) schema).getChildSchemas();
+        CarbonRowSchema[] childSchemas =
+            ((CarbonRowSchema.StructCarbonRowSchema) schema).getChildSchemas();
         DataMapRow struct = row.getRow(index);
         for (int i = 0; i < childSchemas.length; i++) {
           addToUnsafe(childSchemas[i], struct, i);
@@ -200,7 +200,7 @@ public class UnsafeMemoryDMStore {
     return runningLength;
   }
 
-  public DataMapSchema[] getSchema() {
+  public CarbonRowSchema[] getSchema() {
     return schema;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 7829034..43e265d 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -45,7 +45,7 @@ import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
 import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
-import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
@@ -132,7 +132,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
       String filePath) {
     int[] minMaxLen = segmentProperties.getColumnsValueSize();
     List<BlockletInfo> blockletList = fileFooter.getBlockletList();
-    DataMapSchema[] schema = unsafeMemoryDMStore.getSchema();
+    CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
     for (int index = 0; index < blockletList.size(); index++) {
       DataMapRow row = new DataMapRowImpl(schema);
       int ordinal = 0;
@@ -256,9 +256,10 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return updatedValues;
   }
 
-  private DataMapRow addMinMax(int[] minMaxLen, DataMapSchema dataMapSchema, byte[][] minValues) {
-    DataMapSchema[] minSchemas =
-        ((DataMapSchema.StructDataMapSchema) dataMapSchema).getChildSchemas();
+  private DataMapRow addMinMax(int[] minMaxLen, CarbonRowSchema carbonRowSchema,
+      byte[][] minValues) {
+    CarbonRowSchema[] minSchemas =
+        ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas();
     DataMapRow minRow = new DataMapRowImpl(minSchemas);
     int minOrdinal = 0;
     // min value adding
@@ -269,46 +270,48 @@ public class BlockletDataMap implements DataMap, Cacheable {
   }
 
   private void createSchema(SegmentProperties segmentProperties) throws MemoryException {
-    List<DataMapSchema> indexSchemas = new ArrayList<>();
+    List<CarbonRowSchema> indexSchemas = new ArrayList<>();
 
     // Index key
-    indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataTypes.BYTE_ARRAY));
+    indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
     int[] minMaxLen = segmentProperties.getColumnsValueSize();
     // do it 2 times, one for min and one for max.
     for (int k = 0; k < 2; k++) {
-      DataMapSchema[] mapSchemas = new DataMapSchema[minMaxLen.length];
+      CarbonRowSchema[] mapSchemas = new CarbonRowSchema[minMaxLen.length];
       for (int i = 0; i < minMaxLen.length; i++) {
         if (minMaxLen[i] <= 0) {
-          mapSchemas[i] = new DataMapSchema.VariableDataMapSchema(DataTypes.BYTE_ARRAY);
+          mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY);
         } else {
-          mapSchemas[i] = new DataMapSchema.FixedDataMapSchema(DataTypes.BYTE_ARRAY, minMaxLen[i]);
+          mapSchemas[i] =
+              new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, minMaxLen[i]);
         }
       }
-      DataMapSchema mapSchema = new DataMapSchema.StructDataMapSchema(
-          DataTypes.createDefaultStructType(), mapSchemas);
+      CarbonRowSchema mapSchema =
+          new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(),
+              mapSchemas);
       indexSchemas.add(mapSchema);
     }
 
     // for number of rows.
-    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataTypes.INT));
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.INT));
 
     // for table block path
-    indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataTypes.BYTE_ARRAY));
+    indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
 
     // for number of pages.
-    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataTypes.SHORT));
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
 
     // for version number.
-    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataTypes.SHORT));
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
 
     // for schema updated time.
-    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataTypes.LONG));
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
 
     //for blocklet info
-    indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataTypes.BYTE_ARRAY));
+    indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
 
     unsafeMemoryDMStore =
-        new UnsafeMemoryDMStore(indexSchemas.toArray(new DataMapSchema[indexSchemas.size()]));
+        new UnsafeMemoryDMStore(indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]));
   }
 
   @Override
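
Taken together, createSchema above pins down the physical layout of every index row
held in the unsafe store. Reading the additions in order, the ordinals come out as
(a summary derived from the code, not part of the patch):

    0  index key            variable, byte[]
    1  min values           struct of per-column byte[] entries
    2  max values           struct, same shape as min
    3  number of rows       fixed, int
    4  table block path     variable, byte[]
    5  number of pages      fixed, short
    6  version number       fixed, short
    7  schema updated time  fixed, long
    8  blocklet info        variable, byte[]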

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
index 631e0ad..b764bdf 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
@@ -16,7 +16,7 @@
  */
 package org.apache.carbondata.core.indexstore.row;
 
-import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
 
 /**
  * It is just a normal row to store data. Implementation classes could be safe and unsafe.
@@ -24,9 +24,9 @@ import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
  */
 public abstract class DataMapRow {
 
-  protected DataMapSchema[] schemas;
+  protected CarbonRowSchema[] schemas;
 
-  public DataMapRow(DataMapSchema[] schemas) {
+  public DataMapRow(CarbonRowSchema[] schemas) {
     this.schemas = schemas;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
index 032b29e..0bb4a5c 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
@@ -16,7 +16,7 @@
  */
 package org.apache.carbondata.core.indexstore.row;
 
-import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 
 /**
@@ -26,7 +26,7 @@ public class DataMapRowImpl extends DataMapRow {
 
   private Object[] data;
 
-  public DataMapRowImpl(DataMapSchema[] schemas) {
+  public DataMapRowImpl(CarbonRowSchema[] schemas) {
     super(schemas);
     this.data = new Object[schemas.length];
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
index 2c76990..932865d 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
@@ -17,7 +17,7 @@
 
 package org.apache.carbondata.core.indexstore.row;
 
-import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
 import org.apache.carbondata.core.memory.MemoryBlock;
 
 import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
@@ -32,7 +32,7 @@ public class UnsafeDataMapRow extends DataMapRow {
 
   private int pointer;
 
-  public UnsafeDataMapRow(DataMapSchema[] schemas, MemoryBlock block, int pointer) {
+  public UnsafeDataMapRow(CarbonRowSchema[] schemas, MemoryBlock block, int pointer) {
     super(schemas);
     this.block = block;
     this.pointer = pointer;
@@ -84,8 +84,8 @@ public class UnsafeDataMapRow extends DataMapRow {
   }
 
   @Override public DataMapRow getRow(int ordinal) {
-    DataMapSchema[] childSchemas =
-        ((DataMapSchema.StructDataMapSchema) schemas[ordinal]).getChildSchemas();
+    CarbonRowSchema[] childSchemas =
+        ((CarbonRowSchema.StructCarbonRowSchema) schemas[ordinal]).getChildSchemas();
     return new UnsafeDataMapRow(childSchemas, block, pointer + getPosition(ordinal));
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
new file mode 100644
index 0000000..813be4a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.schema;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * It has three variants right now: fixed, variable and struct.
+ */
+public abstract class CarbonRowSchema {
+
+  protected DataType dataType;
+
+  public CarbonRowSchema(DataType dataType) {
+    this.dataType = dataType;
+  }
+
+  /**
+   * Returns the data type of this column.
+   *
+   * @return the data type
+   */
+  public DataType getDataType() {
+    return dataType;
+  }
+
+  /**
+   * Returns the configured length for a fixed schema, otherwise the data type's default size.
+   *
+   * @return length in bytes
+   */
+  public abstract int getLength();
+
+  /**
+   * Returns the schema type: FIXED, VARIABLE or STRUCT.
+   * @return the schema type
+   */
+  public abstract DataMapSchemaType getSchemaType();
+
+  /*
+   * Always has a fixed length; the length cannot be updated later.
+   * Usage examples: all primitive types such as short, int etc.
+   */
+  public static class FixedCarbonRowSchema extends CarbonRowSchema {
+
+    private int length;
+
+    public FixedCarbonRowSchema(DataType dataType) {
+      super(dataType);
+    }
+
+    public FixedCarbonRowSchema(DataType dataType, int length) {
+      super(dataType);
+      this.length = length;
+    }
+
+    @Override public int getLength() {
+      if (length == 0) {
+        return dataType.getSizeInBytes();
+      } else {
+        return length;
+      }
+    }
+
+    @Override public DataMapSchemaType getSchemaType() {
+      return DataMapSchemaType.FIXED;
+    }
+  }
+
+  public static class VariableCarbonRowSchema extends CarbonRowSchema {
+
+    public VariableCarbonRowSchema(DataType dataType) {
+      super(dataType);
+    }
+
+    @Override public int getLength() {
+      return dataType.getSizeInBytes();
+    }
+
+    @Override public DataMapSchemaType getSchemaType() {
+      return DataMapSchemaType.VARIABLE;
+    }
+  }
+
+  public static class StructCarbonRowSchema extends CarbonRowSchema {
+
+    private CarbonRowSchema[] childSchemas;
+
+    public StructCarbonRowSchema(DataType dataType, CarbonRowSchema[] childSchemas) {
+      super(dataType);
+      this.childSchemas = childSchemas;
+    }
+
+    @Override public int getLength() {
+      return dataType.getSizeInBytes();
+    }
+
+    public CarbonRowSchema[] getChildSchemas() {
+      return childSchemas;
+    }
+
+    @Override public DataMapSchemaType getSchemaType() {
+      return DataMapSchemaType.STRUCT;
+    }
+  }
+
+  public enum DataMapSchemaType {
+    FIXED, VARIABLE, STRUCT
+  }
+}
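
A minimal usage sketch of the class above (not part of the patch), using only the
DataTypes factory already referenced by BlockletDataMap; the assertions restate the
FIXED length fallback implemented in getLength():

    CarbonRowSchema fixedLen = new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.INT);
    CarbonRowSchema varLen = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY);
    CarbonRowSchema struct = new CarbonRowSchema.StructCarbonRowSchema(
        DataTypes.createDefaultStructType(), new CarbonRowSchema[] { fixedLen, varLen });
    // A FIXED schema built without an explicit length falls back to the type's size:
    assert fixedLen.getLength() == DataTypes.INT.getSizeInBytes();
    assert struct.getSchemaType() == CarbonRowSchema.DataMapSchemaType.STRUCT;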

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java
deleted file mode 100644
index 80c68ac..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore.schema;
-
-import org.apache.carbondata.core.metadata.datatype.DataType;
-
-/**
- * It just have 2 types right now, either fixed or variable.
- */
-public abstract class DataMapSchema {
-
-  protected DataType dataType;
-
-  public DataMapSchema(DataType dataType) {
-    this.dataType = dataType;
-  }
-
-  /**
-   * Either fixed or variable length.
-   *
-   * @return
-   */
-  public DataType getDataType() {
-    return dataType;
-  }
-
-  /**
-   * Gives length in case of fixed schema other wise returns length
-   *
-   * @return
-   */
-  public abstract int getLength();
-
-  /**
-   * schema type
-   * @return
-   */
-  public abstract DataMapSchemaType getSchemaType();
-
-  /*
- * It has always fixed length, length cannot be updated later.
- * Usage examples : all primitive types like short, int etc
- */
-  public static class FixedDataMapSchema extends DataMapSchema {
-
-    private int length;
-
-    public FixedDataMapSchema(DataType dataType) {
-      super(dataType);
-    }
-
-    public FixedDataMapSchema(DataType dataType, int length) {
-      super(dataType);
-      this.length = length;
-    }
-
-    @Override public int getLength() {
-      if (length == 0) {
-        return dataType.getSizeInBytes();
-      } else {
-        return length;
-      }
-    }
-
-    @Override public DataMapSchemaType getSchemaType() {
-      return DataMapSchemaType.FIXED;
-    }
-  }
-
-  public static class VariableDataMapSchema extends DataMapSchema {
-
-    public VariableDataMapSchema(DataType dataType) {
-      super(dataType);
-    }
-
-    @Override public int getLength() {
-      return dataType.getSizeInBytes();
-    }
-
-    @Override public DataMapSchemaType getSchemaType() {
-      return DataMapSchemaType.VARIABLE;
-    }
-  }
-
-  public static class StructDataMapSchema extends DataMapSchema {
-
-    private DataMapSchema[] childSchemas;
-
-    public StructDataMapSchema(DataType dataType, DataMapSchema[] childSchemas) {
-      super(dataType);
-      this.childSchemas = childSchemas;
-    }
-
-    @Override public int getLength() {
-      return dataType.getSizeInBytes();
-    }
-
-    public DataMapSchema[] getChildSchemas() {
-      return childSchemas;
-    }
-
-    @Override public DataMapSchemaType getSchemaType() {
-      return DataMapSchemaType.STRUCT;
-    }
-  }
-
-  public enum DataMapSchemaType {
-    FIXED, VARIABLE, STRUCT
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java
index bfbb6f7..af86253 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java
@@ -106,7 +106,7 @@ public interface SchemaConverter {
    * method to convert thrift datamap schema object to wrapper
    * data map object
    * @param thriftchildSchema
-   * @return DataMapSchema
+   * @return CarbonRowSchema
    */
   DataMapSchema fromExternalToWrapperDataMapSchema(
       org.apache.carbondata.format.DataMapSchema thriftchildSchema);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index e63f4e3..4a6fb8b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -702,4 +702,9 @@ public class CarbonTable implements Serializable {
     this.dimensionOrdinalMax = dimensionOrdinalMax;
   }
 
+  public boolean isPreAggregateTable() {
+    return tableInfo.getParentRelationIdentifiers() != null && !tableInfo
+        .getParentRelationIdentifiers().isEmpty();
+  }
+
 }
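
A table is a pre-aggregate (child) table exactly when its TableInfo carries at least
one parent relation identifier. A hedged sketch of how a load path can branch on the
new flag (the load-model accessors are the ones used elsewhere in this patch):

    CarbonTable table = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
    if (table.isPreAggregateTable()) {
      // child table load: target only the parent segment that triggered it,
      // with segment validation switched off via the new session keys
    }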

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
index 5f92ec8..9c71e37 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
@@ -28,6 +28,8 @@ import java.util.Map;
  */
 public class DataMapSchema implements Serializable, Writable {
 
+  private static final long serialVersionUID = 6577149126264181553L;
+
   private String className;
 
   private RelationIdentifier relationIdentifier;
@@ -100,7 +102,7 @@ public class DataMapSchema implements Serializable, Writable {
     this.className = in.readUTF();
     boolean isRelationIdnentifierExists = in.readBoolean();
     if (isRelationIdnentifierExists) {
-      this.relationIdentifier = new RelationIdentifier(null, null, null);
+      this.relationIdentifier = new RelationIdentifier();
       this.relationIdentifier.readFields(in);
     }
     boolean isChildSchemaExists = in.readBoolean();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/RelationIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/RelationIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/RelationIdentifier.java
index 9a70b8b..2a2d937 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/RelationIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/RelationIdentifier.java
@@ -38,6 +38,10 @@ public class RelationIdentifier implements Serializable, Writable {
     this.tableId = tableId;
   }
 
+  public RelationIdentifier() {
+    this(null, null, null);
+  }
+
   public String getDatabaseName() {
     return databaseName;
   }
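
The new no-argument constructor exists for the Writable deserialization path shown in
DataMapSchema.readFields above: construct an empty identifier, then populate it from
the stream.

    RelationIdentifier relationIdentifier = new RelationIdentifier(); // fields start null
    relationIdentifier.readFields(in);                                // filled from DataInput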

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index 3acd6d6..44d8126 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -261,7 +261,7 @@ public class TableInfo implements Serializable, Writable {
     out.writeUTF(metaDataFilepath);
     out.writeUTF(storePath);
     boolean isChildSchemaExists =
-        null != dataMapSchemaList && dataMapSchemaList.size() > 0 ? true : false;
+        null != dataMapSchemaList && dataMapSchemaList.size() > 0;
     out.writeBoolean(isChildSchemaExists);
     if (isChildSchemaExists) {
       out.writeShort(dataMapSchemaList.size());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index b090e59..4d13462 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -353,14 +354,15 @@ public class QueryUtil {
     CacheProvider cacheProvider = CacheProvider.getInstance();
     Cache<DictionaryColumnUniqueIdentifier, Dictionary> forwardDictionaryCache = cacheProvider
         .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath());
-
     List<Dictionary> columnDictionaryList =
         forwardDictionaryCache.getAll(dictionaryColumnUniqueIdentifiers);
     Map<String, Dictionary> columnDictionaryMap = new HashMap<>(columnDictionaryList.size());
     for (int i = 0; i < dictionaryColumnUniqueIdentifiers.size(); i++) {
       // TODO: null check for column dictionary, if cache size is less it
       // might return null here, in that case throw exception
-      columnDictionaryMap.put(dictionaryColumnIdList.get(i), columnDictionaryList.get(i));
+      columnDictionaryMap
+          .put(dictionaryColumnUniqueIdentifiers.get(i).getColumnIdentifier().getColumnId(),
+              columnDictionaryList.get(i));
     }
     return columnDictionaryMap;
   }
@@ -376,27 +378,47 @@ public class QueryUtil {
       List<String> dictionaryColumnIdList, CarbonTableIdentifier carbonTableIdentifier,
       TableProvider tableProvider) throws IOException {
     CarbonTable carbonTable = tableProvider.getCarbonTable(carbonTableIdentifier);
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath(), carbonTableIdentifier);
     List<DictionaryColumnUniqueIdentifier> dictionaryColumnUniqueIdentifiers =
         new ArrayList<>(dictionaryColumnIdList.size());
     for (String columnId : dictionaryColumnIdList) {
       CarbonDimension dimension = CarbonMetadata.getInstance()
           .getCarbonDimensionBasedOnColIdentifier(carbonTable, columnId);
       if (dimension != null) {
+        CarbonTableIdentifier newCarbonTableIdentifier;
+        ColumnIdentifier columnIdentifier;
+        if (null != dimension.getColumnSchema().getParentColumnTableRelations() && !dimension
+            .getColumnSchema().getParentColumnTableRelations().isEmpty()) {
+          newCarbonTableIdentifier = getTableIdentifierForColumn(dimension);
+          columnIdentifier = new ColumnIdentifier(
+              dimension.getColumnSchema().getParentColumnTableRelations().get(0).getColumnId(),
+              dimension.getColumnProperties(), dimension.getDataType());
+        } else {
+          newCarbonTableIdentifier = carbonTableIdentifier;
+          columnIdentifier = dimension.getColumnIdentifier();
+        }
+        CarbonTablePath newCarbonTablePath = CarbonStorePath
+            .getCarbonTablePath(carbonTable.getStorePath(), newCarbonTableIdentifier);
+
         dictionaryColumnUniqueIdentifiers.add(
-            new DictionaryColumnUniqueIdentifier(
-                carbonTableIdentifier,
-                dimension.getColumnIdentifier(),
-                dimension.getDataType(),
-                carbonTablePath
-            )
-        );
+            new DictionaryColumnUniqueIdentifier(newCarbonTableIdentifier, columnIdentifier,
+                dimension.getDataType(), newCarbonTablePath));
       }
     }
     return dictionaryColumnUniqueIdentifiers;
   }
 
+  public static CarbonTableIdentifier getTableIdentifierForColumn(CarbonDimension carbonDimension) {
+    String parentTableName =
+        carbonDimension.getColumnSchema().getParentColumnTableRelations().get(0)
+            .getRelationIdentifier().getTableName();
+    String parentDatabaseName =
+        carbonDimension.getColumnSchema().getParentColumnTableRelations().get(0)
+            .getRelationIdentifier().getDatabaseName();
+    String parentTableId = carbonDimension.getColumnSchema().getParentColumnTableRelations().get(0)
+        .getRelationIdentifier().getTableId();
+    return new CarbonTableIdentifier(parentDatabaseName, parentTableName, parentTableId);
+  }
+
   /**
    * Below method will be used to get the measure
    * block indexes to be read from the file
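
Net effect of the QueryUtil change: a child-table column that carries a parent relation
resolves its dictionary against the parent table's identifier and path, so parent and
child share one dictionary. A condensed sketch of the decision made in the loop above:

    ColumnSchema schema = dimension.getColumnSchema();
    boolean inherited = schema.getParentColumnTableRelations() != null
        && !schema.getParentColumnTableRelations().isEmpty();
    CarbonTableIdentifier target = inherited
        ? QueryUtil.getTableIdentifierForColumn(dimension)  // parent table
        : carbonTableIdentifier;                            // the table itself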

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 1a91272..5dda9e4 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -66,6 +66,13 @@ public class SessionParams implements Serializable {
     return sProps.get(key);
   }
 
+  public String getProperty(String key, String defaultValue) {
+    if (!sProps.containsKey(key)) {
+      return defaultValue;
+    }
+    return sProps.get(key);
+  }
+
   /**
    * This method will be used to add a new property
    *
@@ -172,6 +179,8 @@ public class SessionParams implements Serializable {
           if (!isValid) {
             throw new InvalidConfigurationException("Invalid CARBON_INPUT_SEGMENT_IDs");
           }
+        } else if (key.startsWith(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS)) {
+          isValid = true;
         } else {
           throw new InvalidConfigurationException(
               "The key " + key + " not supported for dynamic configuration.");
@@ -180,6 +189,10 @@ public class SessionParams implements Serializable {
     return isValid;
   }
 
+  public void removeProperty(String property) {
+    sProps.remove(property);
+  }
+
   /**
    * clear the set properties
    */
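
A short sketch of the two new accessors in the load flow (the key name is hypothetical):
the default-valued read avoids a containsKey/get pair, and removeProperty lets the
load path clear the per-table keys once a child-table load completes.

    String key = CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + "default.sales";
    String validate = sessionParams.getProperty(key, "true"); // "true" when unset
    sessionParams.removeProperty(key);                        // cleanup after the load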

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index 8d9bddc..e19ab24 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -49,7 +49,7 @@ public class CarbonIndexFileMergeWriter {
         return file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT);
       }
     });
-    if (indexFiles.length > 0) {
+    if (indexFiles != null && indexFiles.length > 0) {
       SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
       fileStore.readAllIIndexOfSegment(segmentPath);
       openThriftWriter(segmentPath + "/" +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java b/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java
index 01ecb04..321ddd5 100644
--- a/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java
+++ b/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java
@@ -53,7 +53,7 @@ public class OperationListenerBus {
    * @param eventClass
    * @param operationEventListener
    */
-  public void addListener(Class<? extends Event> eventClass,
+  public OperationListenerBus addListener(Class<? extends Event> eventClass,
       OperationEventListener operationEventListener) {
 
     String eventType = eventClass.getName();
@@ -63,6 +63,7 @@ public class OperationListenerBus {
       eventMap.put(eventType, operationEventListeners);
     }
     operationEventListeners.add(operationEventListener);
+    return INSTANCE;
   }
 
   /**
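
Returning INSTANCE turns listener registration into a fluent chain, so the
pre-aggregate listeners added in PreAggregateListeners.scala can be wired in one
expression. A sketch assuming the singleton's usual getInstance() accessor; the event
and listener names below are illustrative, not from this patch:

    OperationListenerBus.getInstance()
        .addListener(SomeLoadPreEvent.class, new SomePreListener())    // hypothetical
        .addListener(SomeLoadPostEvent.class, new SomePostListener()); // hypothetical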

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 57359fc..92ef6da 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -263,6 +263,14 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     configuration.set(CarbonTableInputFormat.VALIDATE_INPUT_SEGMENT_IDs, validate.toString());
   }
 
+  public static void setAggeragateTableSegments(Configuration configuration, String segments) {
+    configuration.set(CarbonCommonConstants.CARBON_INPUT_SEGMENTS, segments);
+  }
+
+  private static String getAggeragateTableSegments(Configuration configuration) {
+    return configuration.get(CarbonCommonConstants.CARBON_INPUT_SEGMENTS);
+  }
+
   /**
    * get list of segment to access
    */
@@ -300,6 +308,11 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   @Override public List<InputSplit> getSplits(JobContext job) throws IOException {
     AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
     SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
+    CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
+    if (null == carbonTable) {
+      throw new IOException("Missing/Corrupt schema file for table.");
+    }
+    String aggregateTableSegments = getAggeragateTableSegments(job.getConfiguration());
     TableDataMap blockletMap =
         DataMapStoreManager.getInstance().getDataMap(identifier, BlockletDataMap.NAME,
             BlockletDataMapFactory.class.getName());
@@ -352,6 +365,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
       if (invalidSegments.size() > 0) {
         blockletMap.clear(invalidSegments);
       }
+    } else {
+      filteredSegmentToAccess = Arrays.asList(aggregateTableSegments.split(","));
     }
 
     // Clean the updated segments from memory if the update happens on segments
@@ -376,12 +391,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
 
     // process and resolve the expression
     Expression filter = getFilterPredicates(job.getConfiguration());
-    CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
     TableProvider tableProvider = new SingleTableProvider(carbonTable);
     // this will be null in case of corrupt schema file.
-    if (null == carbonTable) {
-      throw new IOException("Missing/Corrupt schema file for table.");
-    }
     PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName());
     CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null);
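
getSplits now prefers an explicitly configured aggregate-segment list over segment
validation, and the carbonTable null check moves up so the table is available before
any segment handling. A hedged driver-side sketch (the segment id is illustrative):

    CarbonTableInputFormat.setAggeragateTableSegments(conf, "3"); // parent's new segment
    CarbonTableInputFormat.setValidateSegmentsToAccess(conf, false);
    // getSplits(job) will then plan splits for segment 3 only, skipping validation.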
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark-common-test/src/test/resources/sample.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/sample.csv b/integration/spark-common-test/src/test/resources/sample.csv
index 7c57de7..06985e8 100644
--- a/integration/spark-common-test/src/test/resources/sample.csv
+++ b/integration/spark-common-test/src/test/resources/sample.csv
@@ -3,3 +3,5 @@ id,name,city,age
 2,eason,shenzhen,27
 3,jarry,wuhan,35
 3,jarry,Bangalore,35
+4,kunal,Delhi,26
+4,vishal,Bangalore,29

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index b90a5ea..23d1292 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -65,7 +65,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
     sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
     checkAnswer(
       sql("SELECT COUNT(*) FROM loadtest"),
-      Seq(Row(4))
+      Seq(Row(6))
     )
   }
 
@@ -74,7 +74,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
     sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
     checkAnswer(
       sql("SELECT COUNT(*) FROM loadtest"),
-      Seq(Row(8))
+      Seq(Row(10))
     )
   }
 
@@ -83,7 +83,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
     sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
     checkAnswer(
       sql("SELECT COUNT(*) FROM loadtest"),
-      Seq(Row(12))
+      Seq(Row(14))
     )
   }
 
@@ -92,7 +92,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
     sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
     checkAnswer(
       sql("SELECT COUNT(*) FROM loadtest"),
-      Seq(Row(16))
+      Seq(Row(18))
     )
   }
 
@@ -101,7 +101,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
     sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest options ('delimiter'='\\017')")
     checkAnswer(
       sql("SELECT COUNT(*) FROM loadtest"),
-      Seq(Row(20))
+      Seq(Row(22))
     )
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithAutoLoadMerge.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithAutoLoadMerge.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithAutoLoadMerge.scala
index 5211e9e..51e84d4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithAutoLoadMerge.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithAutoLoadMerge.scala
@@ -44,7 +44,7 @@ class TestLoadDataWithAutoLoadMerge extends QueryTest with BeforeAndAfterAll {
     sql(s"LOAD DATA LOCAL INPATH '$testData' into table automerge")
     checkAnswer(
       sql("SELECT COUNT(*) FROM automerge"),
-      Seq(Row(4))
+      Seq(Row(6))
     )
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala
index e92a7fd..ff415ae 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala
@@ -87,7 +87,7 @@ class TestLoadDataWithYarnLocalDirs extends QueryTest with BeforeAndAfterAll {
     disableMultipleDir
 
     checkAnswer(sql("select id from carbontable_yarnLocalDirs"),
-      Seq(Row(1), Row(2), Row(3), Row(3)))
+      Seq(Row(1), Row(2), Row(3), Row(3), Row(4), Row(4)))
 
     cleanUpYarnLocalDir
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
new file mode 100644
index 0000000..0c65577
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestPreAggregateLoad extends QueryTest with BeforeAndAfterAll {
+  
+  val testData = s"$resourcesPath/sample.csv"
+  
+  override def beforeAll(): Unit = {
+    sql("DROP TABLE IF EXISTS maintable")
+  }
+
+  private def createAllAggregateTables(parentTableName: String): Unit = {
+    sql(
+      s"""create table ${ parentTableName }_preagg_sum stored BY 'carbondata' tblproperties
+         |('parent'='$parentTableName') as select id,sum(age) from $parentTableName group by id"""
+        .stripMargin)
+    sql(
+      s"""create table ${ parentTableName }_preagg_avg stored BY 'carbondata' tblproperties
+         |('parent'='$parentTableName') as select id,avg(age) from $parentTableName group by id"""
+        .stripMargin)
+    sql(
+      s"""create table ${ parentTableName }_preagg_count stored BY 'carbondata' tblproperties
+         |('parent'='$parentTableName') as select id,count(age) from $parentTableName group by id"""
+        .stripMargin)
+    sql(
+      s"""create table ${ parentTableName }_preagg_min stored BY 'carbondata' tblproperties
+         |('parent'='$parentTableName') as select id,min(age) from $parentTableName group by id"""
+        .stripMargin)
+    sql(
+      s"""create table ${ parentTableName }_preagg_max stored BY 'carbondata' tblproperties
+         |('parent'='$parentTableName') as select id,max(age) from $parentTableName group by id"""
+        .stripMargin)
+  }
+
+  test("test load into main table with pre-aggregate table") {
+    sql("DROP TABLE IF EXISTS maintable")
+    sql(
+      """
+        | CREATE TABLE maintable(id int, name string, city string, age int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    createAllAggregateTables("maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    checkAnswer(sql(s"select * from maintable_preagg_sum"),
+      Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+    checkAnswer(sql(s"select * from maintable_preagg_avg"),
+      Seq(Row(1, 31, 1), Row(2, 27, 1), Row(3, 70, 2), Row(4, 55, 2)))
+    checkAnswer(sql(s"select * from maintable_preagg_count"),
+      Seq(Row(1, 1), Row(2, 1), Row(3, 2), Row(4, 2)))
+    checkAnswer(sql(s"select * from maintable_preagg_min"),
+      Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 26)))
+    checkAnswer(sql(s"select * from maintable_preagg_max"),
+      Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 29)))
+    sql("drop table if exists maintable")
+  }
+
+  test("test load into main table with pre-aggregate table with dictionary_include") {
+    sql("drop table if exists maintable")
+    sql(
+      """
+        | CREATE TABLE maintable(id int, name string, city string, age int)
+        | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('dictionary_include'='id')
+      """.stripMargin)
+    createAllAggregateTables("maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    checkAnswer(sql(s"select * from maintable_preagg_sum"),
+      Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+    checkAnswer(sql(s"select * from maintable_preagg_avg"),
+      Seq(Row(1, 31, 1), Row(2, 27, 1), Row(3, 70, 2), Row(4, 55, 2)))
+    checkAnswer(sql(s"select * from maintable_preagg_count"),
+      Seq(Row(1, 1), Row(2, 1), Row(3, 2), Row(4, 2)))
+    checkAnswer(sql(s"select * from maintable_preagg_min"),
+      Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 26)))
+    checkAnswer(sql(s"select * from maintable_preagg_max"),
+      Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 29)))
+    sql("drop table if exists maintable")
+  }
+
+  test("test load into main table with pre-aggregate table with single_pass") {
+    sql("drop table if exists maintable")
+    sql(
+      """
+        | CREATE TABLE maintable(id int, name string, city string, age int)
+        | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('dictionary_include'='id')
+      """.stripMargin)
+    createAllAggregateTables("maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable options('single_pass'='true')")
+    checkAnswer(sql(s"select * from maintable_preagg_sum"),
+      Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+    checkAnswer(sql(s"select * from maintable_preagg_avg"),
+      Seq(Row(1, 31, 1), Row(2, 27, 1), Row(3, 70, 2), Row(4, 55, 2)))
+    checkAnswer(sql(s"select * from maintable_preagg_count"),
+      Seq(Row(1, 1), Row(2, 1), Row(3, 2), Row(4, 2)))
+    checkAnswer(sql(s"select * from maintable_preagg_min"),
+      Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 26)))
+    checkAnswer(sql(s"select * from maintable_preagg_max"),
+      Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 29)))
+    sql("drop table if exists maintable")
+  }
+
+  test("test load into main table with incremental load") {
+    sql("drop table if exists maintable")
+    sql(
+      """
+        | CREATE TABLE maintable(id int, name string, city string, age int)
+        | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('dictionary_include'='id')
+      """.stripMargin)
+    createAllAggregateTables("maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    checkAnswer(sql(s"select * from maintable_preagg_sum"),
+      Seq(Row(1, 31),
+        Row(2, 27),
+        Row(3, 70),
+        Row(4, 55),
+        Row(1, 31),
+        Row(2, 27),
+        Row(3, 70),
+        Row(4, 55)))
+    checkAnswer(sql(s"select * from maintable_preagg_avg"),
+      Seq(Row(1, 31, 1),
+        Row(2, 27, 1),
+        Row(3, 70, 2),
+        Row(4, 55, 2),
+        Row(1, 31, 1),
+        Row(2, 27, 1),
+        Row(3, 70, 2),
+        Row(4, 55, 2)))
+    checkAnswer(sql(s"select * from maintable_preagg_count"),
+      Seq(Row(1, 1), Row(2, 1), Row(3, 2), Row(4, 2), Row(1, 1), Row(2, 1), Row(3, 2), Row(4, 2)))
+    checkAnswer(sql(s"select * from maintable_preagg_min"),
+      Seq(Row(1, 31),
+        Row(2, 27),
+        Row(3, 35),
+        Row(4, 26),
+        Row(1, 31),
+        Row(2, 27),
+        Row(3, 35),
+        Row(4, 26)))
+    checkAnswer(sql(s"select * from maintable_preagg_max"),
+      Seq(Row(1, 31),
+        Row(2, 27),
+        Row(3, 35),
+        Row(4, 29),
+        Row(1, 31),
+        Row(2, 27),
+        Row(3, 35),
+        Row(4, 29)))
+    sql("drop table if exists maintable")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/deleteTable/TestDeleteTableNewDDL.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/deleteTable/TestDeleteTableNewDDL.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/deleteTable/TestDeleteTableNewDDL.scala
index 485b94b..b82a0af 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/deleteTable/TestDeleteTableNewDDL.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/deleteTable/TestDeleteTableNewDDL.scala
@@ -182,6 +182,7 @@ class TestDeleteTableNewDDL extends QueryTest with BeforeAndAfterAll {
 
   test("drop table and create table with dictionary exclude string scenario") {
     try {
+
       sql("create database test")
       sql(
         "CREATE table test.dropTableTest3 (ID int, date String, country String, name " +
@@ -242,6 +243,7 @@ class TestDeleteTableNewDDL extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists dropTableTest4")
     sql("drop table if exists table1")
     sql("drop table if exists table2")
+    sql("drop database if exists test cascade")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 5e46417..52a31a9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -41,7 +41,7 @@ import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.model.QueryModel
 import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder}
 import org.apache.carbondata.core.statusmanager.FileFormat
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, DataTypeUtil, TaskMetricsMap}
+import org.apache.carbondata.core.util._
 import org.apache.carbondata.hadoop._
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
@@ -357,6 +357,17 @@ class CarbonScanRDD(
       CarbonTableInputFormat
         .setSegmentsToAccess(conf, segmentNumbersFromProperty.split(",").toList.asJava)
     }
+    val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+    if (carbonSessionInfo != null) {
+      CarbonTableInputFormat.setAggeragateTableSegments(conf, carbonSessionInfo.getSessionParams
+        .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+                     identifier.getCarbonTableIdentifier.getDatabaseName + "." +
+                     identifier.getCarbonTableIdentifier.getTableName, ""))
+      CarbonTableInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams
+          .getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+                       identifier.getCarbonTableIdentifier.getDatabaseName + "." +
+                       identifier.getCarbonTableIdentifier.getTableName, "true").toBoolean)
+    }
     format
   }
 

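For reference, the two setters used here are plain hadoop Configuration writers; a minimal usage sketch under illustrative values (the segment id and the implied database.table are assumptions, not part of this commit):

    import org.apache.hadoop.conf.Configuration
    import org.apache.carbondata.hadoop.api.CarbonTableInputFormat

    val conf = new Configuration()
    // scan only segment 2 and skip segment-status validation, as done
    // above when the session carries pre-aggregate load properties
    CarbonTableInputFormat.setAggeragateTableSegments(conf, "2")
    CarbonTableInputFormat.setValidateSegmentsToAccess(conf, false)
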
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 1083669..9899be1 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -265,8 +265,7 @@ object CarbonDataRDDFactory {
       result: Option[DictionaryServer],
       overwriteTable: Boolean,
       dataFrame: Option[DataFrame] = None,
-      updateModel: Option[UpdateTableModel] = None
-  ): Unit = {
+      updateModel: Option[UpdateTableModel] = None): Unit = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val operationContext = new OperationContext
     // for handling of the segment Merging.
@@ -350,7 +349,7 @@ object CarbonDataRDDFactory {
               if (value == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
                 loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
               } else if (value == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
-                         !loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
+                  !loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
                 loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
               }
           }
@@ -464,7 +463,8 @@ object CarbonDataRDDFactory {
         throw new Exception(status(0)._2._2.errorMsg)
       }
       // if segment is empty then fail the data load
-      if (!CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
+      if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isPreAggregateTable &&
+          !CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
         // update the load entry in table status file for changing the status to failure
         CommonUtil.updateTableStatusForFailure(carbonLoadModel)
         LOGGER.info("********starting clean up**********")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 18f76d1..2671aad 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -34,10 +34,11 @@ import org.apache.spark.sql.types._
 
 import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
 import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, ColumnIdentifier}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, ColumnIdentifier}
 import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.scan.executor.util.QueryUtil
 import org.apache.carbondata.core.util.DataTypeUtil
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
@@ -271,11 +272,24 @@ case class CarbonDictionaryDecoder(
       case (tableName, columnIdentifier, carbonDimension) =>
         if (columnIdentifier != null) {
           try {
+            val (newCarbonTableIdentifier, newColumnIdentifier) =
+              if (null != carbonDimension.getColumnSchema.getParentColumnTableRelations &&
+                  !carbonDimension.getColumnSchema.getParentColumnTableRelations.isEmpty) {
+                (QueryUtil.getTableIdentifierForColumn(carbonDimension),
+                  new ColumnIdentifier(carbonDimension.getColumnSchema
+                    .getParentColumnTableRelations.get(0).getColumnId,
+                    carbonDimension.getColumnProperties,
+                    carbonDimension.getDataType))
+              } else {
+                (atiMap(tableName).getCarbonTableIdentifier, columnIdentifier)
+              }
             val dictionaryColumnUniqueIdentifier = new DictionaryColumnUniqueIdentifier(
-              atiMap(tableName).getCarbonTableIdentifier,
-              columnIdentifier, carbonDimension.getDataType,
-              CarbonStorePath.getCarbonTablePath(atiMap(tableName)))
-            allDictIdentifiers += dictionaryColumnUniqueIdentifier;
+              newCarbonTableIdentifier,
+              newColumnIdentifier, carbonDimension.getDataType,
+              CarbonStorePath
+                .getCarbonTablePath(atiMap(tableName).getStorePath, newCarbonTableIdentifier))
+            allDictIdentifiers += dictionaryColumnUniqueIdentifier
             new ForwardDictionaryWrapper(
               storePath,
               dictionaryColumnUniqueIdentifier)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
index 222c30d..b28ec10 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
@@ -152,6 +152,10 @@ case class LoadTableCommand(
           LOGGER.audit(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
           carbonLoadModel.setUseOnePass(false)
         }
+        // if the table is an aggregate table then disable single pass.
+        if (carbonLoadModel.isAggLoadRequest) {
+          carbonLoadModel.setUseOnePass(false)
+        }
         // Create table and metadata folders if not exist
         val carbonTablePath = CarbonStorePath
           .getCarbonTablePath(storePath, table.getCarbonTableIdentifier)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index e42e933..b952285 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -115,6 +115,11 @@ case class CreatePreAggregateTableCommand(
             .buildChildSchema("", tableInfo.getDatabaseName, queryString, "AGGREGATION")
           // updating the parent table about the child table
           PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
+          val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable)
+          if (loadAvailable) {
+            sparkSession.sql(s"insert into ${ cm.databaseName }.${ cm.tableName } $queryString")
+          }
         } catch {
           case e: Exception =>
             val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))

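When the parent table already has loads, the new branch backfills the freshly created child through an insert-select. Assuming the child was declared over the query used by the tests earlier (the actual statement uses the captured queryString), it would look roughly like:

    // hypothetical backfill; table names and query are illustrative
    sparkSession.sql(
      "insert into default.maintable_preagg_sum " +
        "select id, sum(age) from maintable group by id")
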
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/DropPreAggregateTablePostListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/DropPreAggregateTablePostListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/DropPreAggregateTablePostListener.scala
deleted file mode 100644
index 7127c46..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/DropPreAggregateTablePostListener.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.preaaggregate
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.execution.command.CarbonDropTableCommand
-
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
-import org.apache.carbondata.events.{DropTablePostEvent, Event, OperationContext, OperationEventListener}
-
-class DropPreAggregateTablePostListener extends OperationEventListener {
-
-  /**
-   * Called on a specified event occurrence
-   *
-   * @param event
-   */
-  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
-    val dropPostEvent = event.asInstanceOf[DropTablePostEvent]
-    val carbonTable = dropPostEvent.carbonTable
-    val sparkSession = dropPostEvent.sparkSession
-    if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList != null &&
-        !carbonTable.get.getTableInfo.getDataMapSchemaList.isEmpty) {
-      val childSchemas = carbonTable.get.getTableInfo.getDataMapSchemaList
-      for (childSchema: DataMapSchema <- childSchemas.asScala) {
-        CarbonDropTableCommand(ifExistsSet = true,
-          Some(childSchema.getRelationIdentifier.getDatabaseName),
-          childSchema.getRelationIdentifier.getTableName).run(sparkSession)
-      }
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
new file mode 100644
index 0000000..b507856
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.preaaggregate
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.CarbonSession
+import org.apache.spark.sql.execution.command.CarbonDropTableCommand
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.events.{DropTablePostEvent, Event, LoadTablePostExecutionEvent, OperationContext, OperationEventListener}
+
+object DropPreAggregateTablePostListener extends OperationEventListener {
+
+  /**
+   * Called when the drop-table post event fires: drops every pre-aggregate
+   * child of the dropped parent table.
+   *
+   * @param event drop table post event carrying the parent carbon table
+   */
+  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    val dropPostEvent = event.asInstanceOf[DropTablePostEvent]
+    val carbonTable = dropPostEvent.carbonTable
+    val sparkSession = dropPostEvent.sparkSession
+    if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList != null &&
+        !carbonTable.get.getTableInfo.getDataMapSchemaList.isEmpty) {
+      val childSchemas = carbonTable.get.getTableInfo.getDataMapSchemaList
+      for (childSchema: DataMapSchema <- childSchemas.asScala) {
+        CarbonDropTableCommand(ifExistsSet = true,
+          Some(childSchema.getRelationIdentifier.getDatabaseName),
+          childSchema.getRelationIdentifier.getTableName).run(sparkSession)
+      }
+    }
+
+  }
+}
+
+object LoadPostAggregateListener extends OperationEventListener {
+  /**
+   * Called after a parent-table load succeeds: loads every pre-aggregate
+   * child table from the newly created parent segment.
+   *
+   * @param event load table post execution event carrying the load model
+   */
+  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    val loadEvent = event.asInstanceOf[LoadTablePostExecutionEvent]
+    val sparkSession = loadEvent.sparkSession
+    val carbonLoadModel = loadEvent.carbonLoadModel
+    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    if (!table.getTableInfo.getDataMapSchemaList.isEmpty) {
+      for (dataMapSchema: DataMapSchema <- table.getTableInfo.getDataMapSchemaList.asScala) {
+        CarbonSession
+          .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+                     carbonLoadModel.getDatabaseName + "." +
+                     carbonLoadModel.getTableName,
+            carbonLoadModel.getSegmentId)
+        CarbonSession.threadSet(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+                                carbonLoadModel.getDatabaseName + "." +
+                                carbonLoadModel.getTableName, "false")
+        val childTableName = dataMapSchema.getRelationIdentifier.getTableName
+        val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
+        val selectQuery = dataMapSchema.getProperties.get("CHILD_SELECT QUERY")
+        sparkSession.sql(s"insert into $childDatabaseName.$childTableName $selectQuery")
+      }
+    }
+  }
+}

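Read together with the CarbonScanRDD change earlier, the two threadSet calls scope the parent-table scan inside each child insert-select to the segment that was just loaded. One loop iteration amounts to roughly the following (database, table, and segment values are illustrative):

    // parent segment "2" was just loaded; restrict the parent scan to it
    CarbonSession.threadSet(
      CarbonCommonConstants.CARBON_INPUT_SEGMENTS + "default.maintable", "2")
    CarbonSession.threadSet(
      CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + "default.maintable", "false")
    // then load the child from only that slice of the parent
    sparkSession.sql(
      "insert into default.maintable_preagg_sum " +
        "select id, sum(age) from maintable group by id")
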
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index fd0e543..b35b525 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX}
 import org.apache.spark.sql.types.DataType
 
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.format.TableInfo
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -428,4 +429,24 @@ object PreAggregateUtil {
           thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
     }
   }
+
+  def getChildCarbonTable(databaseName: String, tableName: String)
+    (sparkSession: SparkSession): Option[CarbonTable] = {
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    metaStore.getTableFromMetadataCache(databaseName, tableName) match {
+      case Some(tableMeta) => Some(tableMeta.carbonTable)
+      case None => try {
+        Some(metaStore.lookupRelation(Some(databaseName), tableName)(sparkSession)
+          .asInstanceOf[CarbonRelation].metaData.carbonTable)
+      } catch {
+        case _: Exception =>
+          None
+      }
+    }
+  }
+
+  def checkMainTableLoad(carbonTable: CarbonTable): Boolean = {
+    SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).nonEmpty
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index f61ab84..ba7e1eb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -20,7 +20,8 @@ package org.apache.spark.sql.hive
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, ExprId, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -57,8 +58,16 @@ object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
           .DEFAULT_MAX_NUMBER_OF_COLUMNS
         )
     }
-    if (child.output.size >= relation.carbonRelation.output.size) {
-      val newChildOutput = child.output.zipWithIndex.map { columnWithIndex =>
+    val isAggregateTable = !relation.carbonRelation.tableMeta.carbonTable.getTableInfo
+      .getParentRelationIdentifiers.isEmpty
+    // transform the logical plan if the load targets a pre-aggregate table.
+    val childPlan = if (isAggregateTable) {
+      transformAggregatePlan(child)
+    } else {
+      child
+    }
+    if (childPlan.output.size >= relation.carbonRelation.output.size) {
+      val newChildOutput = childPlan.output.zipWithIndex.map { columnWithIndex =>
         columnWithIndex._1 match {
           case attr: Alias =>
             Alias(attr.child, s"col${ columnWithIndex._2 }")(attr.exprId)
@@ -67,16 +76,54 @@ object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
           case attr => attr
         }
       }
-      val newChild: LogicalPlan = if (newChildOutput == child.output) {
+      val newChild: LogicalPlan = if (newChildOutput == childPlan.output) {
         p.child
       } else {
-        Project(newChildOutput, child)
+        Project(newChildOutput, childPlan)
       }
       InsertIntoCarbonTable(relation, p.partition, newChild, p.overwrite, p.ifNotExists)
     } else {
       sys.error("Cannot insert into target table because column number are different")
     }
   }
+
+  /**
+   * Transforms every average(col1) aggregate in the logical plan into the
+   * pair sum(col1) and count(col1), so that the average can be rebuilt
+   * exactly from pre-aggregated data.
+   *
+   * @param logicalPlan plan of the query feeding the pre-aggregate table
+   * @return plan with each average expanded into a sum and a count
+   */
+  private def transformAggregatePlan(logicalPlan: LogicalPlan): LogicalPlan = {
+    logicalPlan transform {
+      case aggregate@Aggregate(_, aExp, _) =>
+        val newExpressions = aExp flatMap {
+          case alias@Alias(attrExpression: AggregateExpression, _) =>
+            attrExpression.aggregateFunction flatMap {
+              case Average(attr: AttributeReference) =>
+                Seq(Alias(attrExpression
+                  .copy(aggregateFunction = Sum(attr.withName(attr.name)),
+                    resultId = NamedExpression.newExprId),
+                  attr.name)(),
+                  Alias(attrExpression
+                    .copy(aggregateFunction = Count(attr.withName(attr.name)),
+                      resultId = NamedExpression.newExprId), attr.name)())
+              case Average(Cast(attr: AttributeReference, _)) =>
+                Seq(Alias(attrExpression
+                  .copy(aggregateFunction = Sum(attr.withName(attr.name)),
+                    resultId = NamedExpression.newExprId),
+                  attr.name)(),
+                  Alias(attrExpression
+                    .copy(aggregateFunction = Count(attr.withName(attr.name)),
+                      resultId = NamedExpression.newExprId), attr.name)())
+              case _: DeclarativeAggregate => Seq(alias)
+              case _ => Nil
+            }
+          case namedExpr: NamedExpression => Seq(namedExpr)
+        }
+        aggregate.copy(aggregateExpressions = newExpressions)
+      case plan: LogicalPlan => plan
+    }
+  }
 }
 
 case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {

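The reason average is split rather than stored directly: an average of per-load averages is not exact, while (sum, count) pairs merge exactly and reproduce it. A plain-Scala illustration using the test values for id = 3 (sum 70, count 2 per load):

    // (sum(age), count(age)) stored per load in the avg child table
    val loads = Seq((70.0, 2L), (70.0, 2L))
    val (s, c) = loads.reduce((a, b) => (a._1 + b._1, a._2 + b._2))
    val avg = s / c   // 35.0, identical to avg(age) over the raw rows
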
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 97ea7f8..d17dd11 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.SparkOptimizer
-import org.apache.spark.sql.execution.command.preaaggregate.DropPreAggregateTablePostListener
+import org.apache.spark.sql.execution.command.preaaggregate.{DropPreAggregateTablePostListener, LoadPostAggregateListener}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
 import org.apache.spark.sql.internal.SQLConf
@@ -35,7 +35,7 @@ import org.apache.spark.sql.parser.CarbonSparkSqlParser
 
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.events.{DropTablePostEvent, Event, OperationListenerBus}
+import org.apache.carbondata.events.{DropTablePostEvent, LoadTablePostExecutionEvent, OperationListenerBus}
 
 /**
  * This class will have carbon catalog and refresh the relation from cache if the carbontable in
@@ -130,7 +130,8 @@ object CarbonSessionState {
 
   def init(): Unit = {
     OperationListenerBus.getInstance()
-      .addListener(classOf[DropTablePostEvent], new DropPreAggregateTablePostListener)
+      .addListener(classOf[DropTablePostEvent], DropPreAggregateTablePostListener)
+      .addListener(classOf[LoadTablePostExecutionEvent], LoadPostAggregateListener)
   }
 
 }

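Since the listeners are now objects, registration needs no instantiation; any additional OperationEventListener can be attached the same way. A hypothetical sketch (the listener name and body are illustrative):

    import org.apache.carbondata.events._

    object AuditLoadListener extends OperationEventListener {
      override def onEvent(event: Event, operationContext: OperationContext): Unit = {
        // illustrative side effect only
        println(s"post-load event received: $event")
      }
    }

    OperationListenerBus.getInstance()
      .addListener(classOf[LoadTablePostExecutionEvent], AuditLoadListener)
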
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index f2c8a0a..7d25efd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.execution.command._
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, SessionParams}
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
@@ -83,6 +84,8 @@ object CarbonSetCommand {
           "property should be in \" carbon.input.segments.<database_name>" +
           ".<table_name>=<seg_id list> \" format.")
       }
+    } else if (key.startsWith(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS)) {
+      sessionParams.addProperty(key.toLowerCase(), value)
     }
   }
 }

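With this branch in place, the validation switch can also be driven per session through SQL SET, like the input-segments property whose format is quoted in the error message above. A hedged example, assuming VALIDATE_CARBON_INPUT_SEGMENTS resolves to the literal prefix "validate.carbon.input.segments.":

    // restrict queries on default.maintable to segments 1 and 2 ...
    spark.sql("SET carbon.input.segments.default.maintable=1,2")
    // ... and opt out of segment-status validation (assumed key literal)
    spark.sql("SET validate.carbon.input.segments.default.maintable=false")
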
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
index 2671393..7045101 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
@@ -37,7 +37,6 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.processing.loading.DataField;
 import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
 import org.apache.carbondata.processing.loading.dictionary.DictionaryServerClientDictionary;
@@ -68,15 +67,12 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
       CarbonTableIdentifier carbonTableIdentifier, String nullFormat, int index,
       DictionaryClient client, boolean useOnePass, String storePath,
-      Map<Object, Integer> localCache, boolean isEmptyBadRecord) throws IOException {
+      Map<Object, Integer> localCache, boolean isEmptyBadRecord,
+      DictionaryColumnUniqueIdentifier identifier) throws IOException {
     this.index = index;
     this.carbonDimension = (CarbonDimension) dataField.getColumn();
     this.nullFormat = nullFormat;
     this.isEmptyBadRecord = isEmptyBadRecord;
-    DictionaryColumnUniqueIdentifier identifier =
-        new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
-            dataField.getColumn().getColumnIdentifier(), dataField.getColumn().getDataType(),
-            CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier));
 
     // if use one pass, use DictionaryServerClientDictionary
     if (useOnePass) {