Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/25 17:37:41 UTC

[04/15] carbondata git commit: [CARBONDATA-1286] Change Query related RDD to use TableInfo

[CARBONDATA-1286] Change Query related RDD to use TableInfo

This closes #1146


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ac5aee18
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ac5aee18
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ac5aee18

Branch: refs/heads/master
Commit: ac5aee187f4f1c3332226435ac9d1aa7c396ae29
Parents: 0158968
Author: jackylk <ja...@huawei.com>
Authored: Sat Jul 8 17:34:44 2017 +0800
Committer: Raghunandan S <ca...@gmail.com>
Committed: Tue Jul 25 17:35:09 2017 +0800

----------------------------------------------------------------------
 .../core/metadata/CarbonMetadata.java           |   3 +-
 .../ThriftWrapperSchemaConverterImpl.java       |   8 +-
 .../core/metadata/datatype/DataType.java        |  44 +++++++-
 .../core/metadata/encoder/Encoding.java         |  20 ++++
 .../core/metadata/schema/SchemaEvolution.java   |   1 +
 .../core/metadata/schema/table/CarbonTable.java |  96 ++++-------------
 .../core/metadata/schema/table/TableInfo.java   | 106 +++++++++++++++----
 .../core/metadata/schema/table/TableSchema.java |  29 ++++-
 .../core/metadata/schema/table/Writable.java    |  31 ++++++
 .../metadata/schema/table/WritableUtil.java     |  45 ++++++++
 .../schema/table/column/ColumnSchema.java       |  84 ++++++++++++---
 .../dictionary/client/DictionaryClientTest.java |   3 +-
 ...ncrementalColumnDictionaryGeneratorTest.java |   3 +-
 .../ServerDictionaryGeneratorTest.java          |   3 +-
 .../generator/TableDictionaryGeneratorTest.java |   3 +-
 .../core/metadata/CarbonMetadataTest.java       |   6 +-
 .../ThriftWrapperSchemaConverterImplTest.java   |   9 +-
 .../metadata/schema/table/CarbonTableTest.java  |   3 +-
 .../table/CarbonTableWithComplexTypesTest.java  |   3 +-
 .../carbondata/hadoop/CarbonInputFormat.java    |  90 ++++++++--------
 .../hadoop/ft/CarbonInputMapperTest.java        |   2 +-
 .../hadoop/test/util/StoreCreator.java          |   1 -
 .../testsuite/createTable/TestTableIdTest.scala |  75 -------------
 .../apache/carbondata/spark/rdd/CarbonRDD.scala |  17 +++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  14 +--
 .../spark/rdd/DataManagementFunc.scala          |   1 -
 .../execution/command/carbonTableSchema.scala   |   1 -
 .../scala/org/apache/spark/sql/CarbonScan.scala |   4 +-
 .../execution/command/carbonTableSchema.scala   |   2 -
 .../sql/CarbonDatasourceHadoopRelation.scala    |  42 +++++---
 .../spark/sql/CarbonDictionaryDecoder.scala     |  15 ++-
 .../execution/CarbonLateDecodeStrategy.scala    |  19 ++--
 .../sql/execution/command/IUDCommands.scala     |   8 +-
 .../execution/command/carbonTableSchema.scala   |   2 -
 .../processing/model/CarbonLoadModel.java       |   7 --
 .../carbondata/processing/StoreCreator.java     |   1 -
 36 files changed, 481 insertions(+), 320 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
index 079540f..75fe78b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
@@ -78,8 +78,7 @@ public final class CarbonMetadata {
     CarbonTable carbonTable = tableInfoMap.get(convertToLowerCase(tableInfo.getTableUniqueName()));
     if (null == carbonTable || carbonTable.getTableLastUpdatedTime() < tableInfo
         .getLastUpdatedTime()) {
-      carbonTable = new CarbonTable();
-      carbonTable.loadCarbonTable(tableInfo);
+      carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
       tableInfoMap.put(convertToLowerCase(tableInfo.getTableUniqueName()), carbonTable);
     }
   }
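
The CarbonMetadata change above shows the new construction pattern introduced by this commit: a CarbonTable is now produced by a static factory instead of the removed public constructor plus loadCarbonTable(). A minimal caller-side sketch in Java, assuming carbondata-core on the classpath and an already populated TableInfo; the wrapper class and method here are illustrative, not part of the commit:

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    import org.apache.carbondata.core.metadata.schema.table.TableInfo;

    public class CarbonTableFactoryExample {
      // Build a CarbonTable the new way; the old new CarbonTable() + loadCarbonTable(tableInfo)
      // sequence no longer compiles because the constructor is now private.
      public static CarbonTable fromTableInfo(TableInfo tableInfo) {
        return CarbonTable.buildFromTableInfo(tableInfo);
      }
    }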

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 235a7ba..745f477 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -272,12 +272,8 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
 
     org.apache.carbondata.format.TableSchema thriftFactTable =
         fromWrapperToExternalTableSchema(wrapperTableInfo.getFactTable());
-    List<org.apache.carbondata.format.TableSchema> thriftAggTables =
-        new ArrayList<org.apache.carbondata.format.TableSchema>();
-    for (TableSchema wrapperAggTableSchema : wrapperTableInfo.getAggregateTableList()) {
-      thriftAggTables.add(fromWrapperToExternalTableSchema(wrapperAggTableSchema));
-    }
-    return new org.apache.carbondata.format.TableInfo(thriftFactTable, thriftAggTables);
+    return new org.apache.carbondata.format.TableInfo(thriftFactTable, new ArrayList<org.apache
+        .carbondata.format.TableSchema>());
   }
 
   /* (non-Javadoc)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
index 9371451..e97cce0 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
@@ -46,8 +46,8 @@ public enum DataType {
   // size of the value of this data type, negative value means variable length
   private int sizeInBytes;
 
-  DataType(int value ,String name, int sizeInBytes) {
-    this.precedenceOrder = value;
+  DataType(int precedenceOrder, String name, int sizeInBytes) {
+    this.precedenceOrder = precedenceOrder;
     this.name = name;
     this.sizeInBytes = sizeInBytes;
   }
@@ -74,4 +74,44 @@ public enum DataType {
     }
     return (int) (Math.log(getSizeInBytes()) / Math.log(2));
   }
+
+  public static DataType valueOf(int ordinal) {
+    if (ordinal == STRING.ordinal()) {
+      return STRING;
+    } else if (ordinal == DATE.ordinal()) {
+      return DATE;
+    } else if (ordinal == TIMESTAMP.ordinal()) {
+      return TIMESTAMP;
+    } else if (ordinal == BOOLEAN.ordinal()) {
+      return BOOLEAN;
+    } else if (ordinal == SHORT.ordinal()) {
+      return SHORT;
+    } else if (ordinal == INT.ordinal()) {
+      return INT;
+    } else if (ordinal == FLOAT.ordinal()) {
+      return FLOAT;
+    } else if (ordinal == LONG.ordinal()) {
+      return LONG;
+    } else if (ordinal == DOUBLE.ordinal()) {
+      return DOUBLE;
+    } else if (ordinal == NULL.ordinal()) {
+      return NULL;
+    } else if (ordinal == DECIMAL.ordinal()) {
+      return DECIMAL;
+    } else if (ordinal == ARRAY.ordinal()) {
+      return ARRAY;
+    } else if (ordinal == STRUCT.ordinal()) {
+      return STRUCT;
+    } else if (ordinal == MAP.ordinal()) {
+      return MAP;
+    } else if (ordinal == BYTE.ordinal()) {
+      return BYTE;
+    } else if (ordinal == BYTE_ARRAY.ordinal()) {
+      return BYTE_ARRAY;
+    } else if (ordinal == SHORT_INT.ordinal()) {
+      return SHORT_INT;
+    } else {
+      throw new RuntimeException("create DataType with invalid ordinal: " + ordinal);
+    }
+  }
 }
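
The new valueOf(int) lookup on DataType (and the analogous one on Encoding below) exists so that the ordinal written during ColumnSchema serialization can be resolved back to an enum constant on read. A small round-trip sketch, assuming carbondata-core on the classpath; writeEnum and readEnum are illustrative helpers, not commit code:

    import java.io.*;
    import org.apache.carbondata.core.metadata.datatype.DataType;

    public class DataTypeOrdinalExample {
      // Write the enum as its ordinal, the same convention ColumnSchema.write() uses.
      static void writeEnum(DataOutput out, DataType type) throws IOException {
        out.writeShort(type.ordinal());
      }

      // Read the ordinal back and resolve it; an unknown ordinal raises a RuntimeException.
      static DataType readEnum(DataInput in) throws IOException {
        return DataType.valueOf(in.readShort());
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        writeEnum(new DataOutputStream(buffer), DataType.TIMESTAMP);
        DataType roundTripped =
            readEnum(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(roundTripped);  // TIMESTAMP
      }
    }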

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java b/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java
index fe24975..e3d7a9a 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java
@@ -27,4 +27,24 @@ public enum Encoding {
   BIT_PACKED,
   DIRECT_DICTIONARY,
   IMPLICIT;
+
+  public static Encoding valueOf(int ordinal) {
+    if (ordinal == DICTIONARY.ordinal()) {
+      return DICTIONARY;
+    } else if (ordinal == DELTA.ordinal()) {
+      return DELTA;
+    } else if (ordinal == RLE.ordinal()) {
+      return RLE;
+    } else if (ordinal == INVERTED_INDEX.ordinal()) {
+      return INVERTED_INDEX;
+    } else if (ordinal == BIT_PACKED.ordinal()) {
+      return BIT_PACKED;
+    } else if (ordinal == DIRECT_DICTIONARY.ordinal()) {
+      return DIRECT_DICTIONARY;
+    } else if (ordinal == IMPLICIT.ordinal()) {
+      return IMPLICIT;
+    } else {
+      throw new RuntimeException("create Encoding with invalid ordinal: " + ordinal);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaEvolution.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaEvolution.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaEvolution.java
index 4bdbe8d..6960736 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaEvolution.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaEvolution.java
@@ -47,4 +47,5 @@ public class SchemaEvolution implements Serializable {
   public void setSchemaEvolutionEntryList(List<SchemaEvolutionEntry> schemaEvolutionEntryList) {
     this.schemaEvolutionEntryList = schemaEvolutionEntryList;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index cec8007..01b3022 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -20,8 +20,6 @@ package org.apache.carbondata.core.metadata.schema.table;
 import java.io.Serializable;
 import java.util.*;
 
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
@@ -40,15 +38,14 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 public class CarbonTable implements Serializable {
 
   /**
-   * serialization id
+   * the cached table info
    */
-  private static final long serialVersionUID = 8696507171227156445L;
+  private TableInfo tableInfo;
 
   /**
-   * Attribute for Carbon table LOGGER
+   * serialization id
    */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonTable.class.getName());
+  private static final long serialVersionUID = 8696507171227156445L;
 
   /**
    * Absolute table identifier
@@ -103,11 +100,6 @@ public class CarbonTable implements Serializable {
   private String tableUniqueName;
 
   /**
-   * Aggregate tables name
-   */
-  private List<String> aggregateTablesName;
-
-  /**
    * metadata file path (check if it is really required )
    */
   private String metaDataFilepath;
@@ -132,13 +124,12 @@ public class CarbonTable implements Serializable {
    */
   private int numberOfNoDictSortColumns;
 
-  public CarbonTable() {
+  private CarbonTable() {
     this.tableDimensionsMap = new HashMap<String, List<CarbonDimension>>();
     this.tableImplicitDimensionsMap = new HashMap<String, List<CarbonDimension>>();
     this.tableMeasuresMap = new HashMap<String, List<CarbonMeasure>>();
     this.tableBucketMap = new HashMap<>();
     this.tablePartitionMap = new HashMap<>();
-    this.aggregateTablesName = new ArrayList<String>();
     this.createOrderColumn = new HashMap<String, List<CarbonColumn>>();
     this.tablePrimitiveDimensionsMap = new HashMap<String, List<CarbonDimension>>();
   }
@@ -146,39 +137,26 @@ public class CarbonTable implements Serializable {
   /**
    * @param tableInfo
    */
-  public void loadCarbonTable(TableInfo tableInfo) {
-    this.blockSize = getTableBlockSizeInMB(tableInfo);
-    this.tableLastUpdatedTime = tableInfo.getLastUpdatedTime();
-    this.tableUniqueName = tableInfo.getTableUniqueName();
-    this.metaDataFilepath = tableInfo.getMetaDataFilepath();
-    //setting unique table identifier
-    CarbonTableIdentifier carbontableIdentifier =
-        new CarbonTableIdentifier(tableInfo.getDatabaseName(),
-            tableInfo.getFactTable().getTableName(), tableInfo.getFactTable().getTableId());
-    this.absoluteTableIdentifier =
-        new AbsoluteTableIdentifier(tableInfo.getStorePath(), carbontableIdentifier);
-
-    fillDimensionsAndMeasuresForTables(tableInfo.getFactTable());
-    fillCreateOrderColumn(tableInfo.getFactTable().getTableName());
-    List<TableSchema> aggregateTableList = tableInfo.getAggregateTableList();
-    for (TableSchema aggTable : aggregateTableList) {
-      this.aggregateTablesName.add(aggTable.getTableName());
-      fillDimensionsAndMeasuresForTables(aggTable);
-      if (aggTable.getBucketingInfo() != null) {
-        tableBucketMap.put(aggTable.getTableName(), aggTable.getBucketingInfo());
-      }
-      if (aggTable.getPartitionInfo() != null) {
-        tablePartitionMap.put(aggTable.getTableName(), aggTable.getPartitionInfo());
-      }
-    }
+  public static CarbonTable buildFromTableInfo(TableInfo tableInfo) {
+    CarbonTable table = new CarbonTable();
+    table.tableInfo = tableInfo;
+    table.blockSize = tableInfo.getTableBlockSizeInMB();
+    table.tableLastUpdatedTime = tableInfo.getLastUpdatedTime();
+    table.tableUniqueName = tableInfo.getTableUniqueName();
+    table.metaDataFilepath = tableInfo.getMetaDataFilepath();
+    table.absoluteTableIdentifier = tableInfo.getOrCreateAbsoluteTableIdentifier();
+
+    table.fillDimensionsAndMeasuresForTables(tableInfo.getFactTable());
+    table.fillCreateOrderColumn(tableInfo.getFactTable().getTableName());
     if (tableInfo.getFactTable().getBucketingInfo() != null) {
-      tableBucketMap.put(tableInfo.getFactTable().getTableName(),
+      table.tableBucketMap.put(tableInfo.getFactTable().getTableName(),
           tableInfo.getFactTable().getBucketingInfo());
     }
     if (tableInfo.getFactTable().getPartitionInfo() != null) {
-      tablePartitionMap.put(tableInfo.getFactTable().getTableName(),
+      table.tablePartitionMap.put(tableInfo.getFactTable().getTableName(),
           tableInfo.getFactTable().getPartitionInfo());
     }
+    return table;
   }
 
   /**
@@ -208,29 +186,6 @@ public class CarbonTable implements Serializable {
     this.createOrderColumn.put(tableName, columns);
   }
 
-  /**
-   * This method will return the table size. Default table block size will be considered
-   * in case not specified by the user
-   *
-   * @param tableInfo
-   * @return
-   */
-  private int getTableBlockSizeInMB(TableInfo tableInfo) {
-    String tableBlockSize = null;
-    // In case of old store there will not be any map for table properties so table properties
-    // will be null
-    Map<String, String> tableProperties = tableInfo.getFactTable().getTableProperties();
-    if (null != tableProperties) {
-      tableBlockSize = tableProperties.get(CarbonCommonConstants.TABLE_BLOCKSIZE);
-    }
-    if (null == tableBlockSize) {
-      tableBlockSize = CarbonCommonConstants.BLOCK_SIZE_DEFAULT_VAL;
-      LOGGER.info("Table block size not specified for " + tableInfo.getTableUniqueName()
-          + ". Therefore considering the default value "
-          + CarbonCommonConstants.BLOCK_SIZE_DEFAULT_VAL + " MB");
-    }
-    return Integer.parseInt(tableBlockSize);
-  }
 
   /**
    * Fill allDimensions and allMeasures for carbon table
@@ -410,13 +365,6 @@ public class CarbonTable implements Serializable {
   }
 
   /**
-   * @return list of aggregate TablesName
-   */
-  public List<String> getAggregateTablesName() {
-    return aggregateTablesName;
-  }
-
-  /**
    * @return the tableLastUpdatedTime
    */
   public long getTableLastUpdatedTime() {
@@ -699,8 +647,6 @@ public class CarbonTable implements Serializable {
     return sort_columsList;
   }
 
-
-
   public int getNumberOfSortColumns() {
     return numberOfSortColumns;
   }
@@ -708,4 +654,8 @@ public class CarbonTable implements Serializable {
   public int getNumberOfNoDictSortColumns() {
     return numberOfNoDictSortColumns;
   }
+
+  public TableInfo getTableInfo() {
+    return tableInfo;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index 328928d..78cd97b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -16,17 +16,30 @@
  */
 package org.apache.carbondata.core.metadata.schema.table;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Map;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 
 /**
  * Store the information about the table.
  * it stores the fact table as well as aggregate table present in the schema
  */
-public class TableInfo implements Serializable {
+public class TableInfo implements Serializable, Writable {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(TableInfo.class.getName());
 
   /**
    * serialization version
@@ -49,11 +62,6 @@ public class TableInfo implements Serializable {
   private TableSchema factTable;
 
   /**
-   * list of aggregate table
-   */
-  private List<TableSchema> aggregateTableList;
-
-  /**
    * last updated time to update the table if any changes
    */
   private long lastUpdatedTime;
@@ -68,8 +76,10 @@ public class TableInfo implements Serializable {
    */
   private String storePath;
 
+  // this identifier is a lazy field which is created the first time it is used
+  private AbsoluteTableIdentifier identifier;
+
   public TableInfo() {
-    aggregateTableList = new ArrayList<TableSchema>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
   }
 
   /**
@@ -87,20 +97,6 @@ public class TableInfo implements Serializable {
   }
 
   /**
-   * @return the aggregateTableList
-   */
-  public List<TableSchema> getAggregateTableList() {
-    return aggregateTableList;
-  }
-
-  /**
-   * @param aggregateTableList the aggregateTableList to set
-   */
-  public void setAggregateTableList(List<TableSchema> aggregateTableList) {
-    this.aggregateTableList = aggregateTableList;
-  }
-
-  /**
    * @return the databaseName
    */
   public String getDatabaseName() {
@@ -206,4 +202,68 @@ public class TableInfo implements Serializable {
     }
     return true;
   }
+
+  /**
+   * This method will return the table size. Default table block size will be considered
+   * in case not specified by the user
+   */
+  int getTableBlockSizeInMB() {
+    String tableBlockSize = null;
+    // In case of old store there will not be any map for table properties so table properties
+    // will be null
+    Map<String, String> tableProperties = getFactTable().getTableProperties();
+    if (null != tableProperties) {
+      tableBlockSize = tableProperties.get(CarbonCommonConstants.TABLE_BLOCKSIZE);
+    }
+    if (null == tableBlockSize) {
+      tableBlockSize = CarbonCommonConstants.BLOCK_SIZE_DEFAULT_VAL;
+      LOGGER.info("Table block size not specified for " + getTableUniqueName()
+          + ". Therefore considering the default value "
+          + CarbonCommonConstants.BLOCK_SIZE_DEFAULT_VAL + " MB");
+    }
+    return Integer.parseInt(tableBlockSize);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(databaseName);
+    out.writeUTF(tableUniqueName);
+    factTable.write(out);
+    out.writeLong(lastUpdatedTime);
+    out.writeUTF(metaDataFilepath);
+    out.writeUTF(storePath);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.databaseName = in.readUTF();
+    this.tableUniqueName = in.readUTF();
+    this.factTable = new TableSchema();
+    this.factTable.readFields(in);
+    this.lastUpdatedTime = in.readLong();
+    this.metaDataFilepath = in.readUTF();
+    this.storePath = in.readUTF();
+  }
+
+  public AbsoluteTableIdentifier getOrCreateAbsoluteTableIdentifier() {
+    if (identifier == null) {
+      CarbonTableIdentifier carbontableIdentifier =
+          new CarbonTableIdentifier(databaseName, factTable.getTableName(), factTable.getTableId());
+      identifier = new AbsoluteTableIdentifier(storePath, carbontableIdentifier);
+    }
+    return identifier;
+  }
+
+  public byte[] serialize() throws IOException {
+    ByteArrayOutputStream bao = new ByteArrayOutputStream();
+    this.write(new DataOutputStream(bao));
+    return bao.toByteArray();
+  }
+
+  public static TableInfo deserialize(byte[] bytes) throws IOException {
+    TableInfo tableInfo = new TableInfo();
+    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
+    tableInfo.readFields(in);
+    return tableInfo;
+  }
 }
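
With the Writable support added above, a TableInfo can be shipped as a byte array, which is what the new CarbonRDDWithTableInfo in the Spark module relies on. A usage sketch, assuming a fully populated TableInfo; note that write() calls writeUTF on every string field, so databaseName, tableUniqueName, metaDataFilepath and storePath must all be non-null before serializing:

    import java.io.IOException;
    import org.apache.carbondata.core.metadata.schema.table.TableInfo;

    public class TableInfoRoundTrip {
      // Serialize a populated TableInfo and rebuild an equivalent copy from the bytes.
      public static TableInfo copyViaBytes(TableInfo original) throws IOException {
        byte[] bytes = original.serialize();
        return TableInfo.deserialize(bytes);
      }
    }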

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
index f9d848e..a396d19 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
@@ -16,6 +16,9 @@
  */
 package org.apache.carbondata.core.metadata.schema.table;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
@@ -30,7 +33,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 /**
  * Persisting the table information
  */
-public class TableSchema implements Serializable {
+public class TableSchema implements Serializable, Writable {
 
   /**
    * serialization version
@@ -198,4 +201,28 @@ public class TableSchema implements Serializable {
   public void setPartitionInfo(PartitionInfo partitionInfo) {
     this.partitionInfo = partitionInfo;
   }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(tableId);
+    out.writeUTF(tableName);
+    out.writeInt(listOfColumns.size());
+    for (ColumnSchema column : listOfColumns) {
+      column.write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.tableId = in.readUTF();
+    this.tableName = in.readUTF();
+    int listSize = in.readInt();
+    this.listOfColumns = new ArrayList<>(listSize);
+    for (int i = 0; i < listSize; i++) {
+      ColumnSchema schema = new ColumnSchema();
+      schema.readFields(in);
+      this.listOfColumns.add(schema);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/Writable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/Writable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/Writable.java
new file mode 100644
index 0000000..94a3b78
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/Writable.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema.table;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+// The same interface as hadoop.io.Writable. The interface is ported here to avoid a
+// dependency on the hadoop package
+public interface Writable {
+
+  void write(DataOutput out) throws IOException;
+
+  void readFields(DataInput in) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/WritableUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/WritableUtil.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/WritableUtil.java
new file mode 100644
index 0000000..dbcb1a7
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/WritableUtil.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema.table;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class WritableUtil {
+
+  public static void writeByteArray(DataOutput out, byte[] bytes) throws IOException {
+    if (bytes == null) {
+      out.writeInt(-1);
+    } else {
+      out.writeInt(bytes.length);
+      out.write(bytes);
+    }
+  }
+
+  public static byte[] readByteArray(DataInput in) throws IOException {
+    int length = in.readInt();
+    if (length == -1) {
+      return null;
+    } else {
+      byte[] b = new byte[length];
+      in.readFully(b);
+      return b;
+    }
+  }
+}
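
WritableUtil encodes a null byte array as length -1, so a reader can distinguish "no default value" from an empty one. A self-contained sketch using only the JDK and this helper:

    import java.io.*;
    import org.apache.carbondata.core.metadata.schema.table.WritableUtil;

    public class WritableUtilExample {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        WritableUtil.writeByteArray(out, new byte[] {1, 2, 3});  // length-prefixed payload
        WritableUtil.writeByteArray(out, null);                  // written as length -1

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
        byte[] first = WritableUtil.readByteArray(in);   // {1, 2, 3}
        byte[] second = WritableUtil.readByteArray(in);  // null
        System.out.println(first.length + ", " + (second == null));
      }
    }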

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
index f5b8116..3680d53 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
@@ -16,17 +16,24 @@
  */
 package org.apache.carbondata.core.metadata.schema.table.column;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+import org.apache.carbondata.core.metadata.schema.table.WritableUtil;
 
 /**
  * Store the information about the column meta data present the table
  */
-public class ColumnSchema implements Serializable {
+public class ColumnSchema implements Serializable, Writable {
 
   /**
    * serialization version
@@ -37,6 +44,7 @@ public class ColumnSchema implements Serializable {
    * dataType
    */
   private DataType dataType;
+
   /**
    * Name of the column. If it is a complex data type, we follow a naming rule
    * grand_parent_column.parent_column.child_column
@@ -360,17 +368,6 @@ public class ColumnSchema implements Serializable {
   }
 
   /**
-   * @param property
-   * @return
-   */
-  public String getColumnProperty(String property) {
-    if (null != columnProperties) {
-      return columnProperties.get(property);
-    }
-    return null;
-  }
-
-  /**
    * return columnproperties
    */
   public Map<String, String> getColumnProperties() {
@@ -421,4 +418,67 @@ public class ColumnSchema implements Serializable {
   public void setSortColumn(boolean sortColumn) {
     isSortColumn = sortColumn;
   }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeShort(dataType.ordinal());
+    out.writeUTF(columnName);
+    out.writeUTF(columnUniqueId);
+    out.writeUTF(columnReferenceId);
+    if (encodingList == null) {
+      out.writeShort(0);
+    } else {
+      out.writeShort(encodingList.size());
+      for (Encoding encoding : encodingList) {
+        out.writeShort(encoding.ordinal());
+      }
+    }
+    out.writeBoolean(isDimensionColumn);
+    out.writeInt(scale);
+    out.writeInt(precision);
+    out.writeInt(schemaOrdinal);
+    out.writeInt(numberOfChild);
+    WritableUtil.writeByteArray(out, defaultValue);
+    if (columnProperties == null) {
+      out.writeShort(0);
+    } else {
+      out.writeShort(columnProperties.size());
+      for (Map.Entry<String, String> entry : columnProperties.entrySet()) {
+        out.writeUTF(entry.getKey());
+        out.writeUTF(entry.getValue());
+      }
+    }
+    out.writeBoolean(invisible);
+    out.writeBoolean(isSortColumn);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int ordinal = in.readShort();
+    this.dataType = DataType.valueOf(ordinal);
+    this.columnName = in.readUTF();
+    this.columnUniqueId = in.readUTF();
+    this.columnReferenceId = in.readUTF();
+    int encodingListSize = in.readShort();
+    this.encodingList = new ArrayList<>(encodingListSize);
+    for (int i = 0; i < encodingListSize; i++) {
+      ordinal = in.readShort();
+      encodingList.add(Encoding.valueOf(ordinal));
+    }
+    this.isDimensionColumn = in.readBoolean();
+    this.scale = in.readInt();
+    this.precision = in.readInt();
+    this.schemaOrdinal = in.readInt();
+    this.numberOfChild = in.readInt();
+    this.defaultValue = WritableUtil.readByteArray(in);
+    int mapSize = in.readShort();
+    this.columnProperties = new HashMap<>(mapSize);
+    for (int i = 0; i < mapSize; i++) {
+      String key = in.readUTF();
+      String value = in.readUTF();
+      this.columnProperties.put(key, value);
+    }
+    this.invisible = in.readBoolean();
+    this.isSortColumn = in.readBoolean();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java b/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java
index 60d3c26..dc3b232 100644
--- a/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java
@@ -88,8 +88,7 @@ public class DictionaryClientTest {
     tableInfo.setDatabaseName("test");
     storePath = System.getProperty("java.io.tmpdir") + "/tmp";
     tableInfo.setStorePath(storePath);
-    CarbonTable carbonTable = new CarbonTable();
-    carbonTable.loadCarbonTable(tableInfo);
+    CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
 
     // Add the created table to metadata
     metadata.addCarbonTable(carbonTable);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/test/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGeneratorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGeneratorTest.java b/core/src/test/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGeneratorTest.java
index 49db6ce..0bac01a 100644
--- a/core/src/test/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGeneratorTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGeneratorTest.java
@@ -166,8 +166,7 @@ public class IncrementalColumnDictionaryGeneratorTest {
     System.out.print(dictPath.mkdirs());
 
     tableInfo.setStorePath(storePath);
-    CarbonTable carbonTable = new CarbonTable();
-    carbonTable.loadCarbonTable(tableInfo);
+    CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
 
     // Add the table to metadata
     metadata.addCarbonTable(carbonTable);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/test/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGeneratorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGeneratorTest.java b/core/src/test/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGeneratorTest.java
index b66331f..d8df99a 100644
--- a/core/src/test/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGeneratorTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGeneratorTest.java
@@ -83,8 +83,7 @@ public class ServerDictionaryGeneratorTest {
     tableInfo.setDatabaseName("test");
     storePath = System.getProperty("java.io.tmpdir") + "/tmp";
     tableInfo.setStorePath(storePath);
-    CarbonTable carbonTable = new CarbonTable();
-    carbonTable.loadCarbonTable(tableInfo);
+    CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
 
     // Add the created table to metadata
     metadata.addCarbonTable(carbonTable);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java b/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java
index 0cb47c4..8a68b72 100644
--- a/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java
@@ -83,8 +83,7 @@ public class TableDictionaryGeneratorTest {
     tableInfo.setDatabaseName("test");
     storePath = System.getProperty("java.io.tmpdir") + "/tmp";
     tableInfo.setStorePath(storePath);
-    CarbonTable carbonTable = new CarbonTable();
-    carbonTable.loadCarbonTable(tableInfo);
+    CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
 
     // Add the created table to metadata
     metadata.addCarbonTable(carbonTable);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
index 56d14c2..3af0bdb 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
@@ -160,7 +160,7 @@ public class CarbonMetadataTest {
   }
 
   @Test public void testGetCarbonDimensionBasedOnColIdentifier() {
-    CarbonTable carbonTable = new CarbonTable();
+    CarbonTable carbonTable = CarbonTable.buildFromTableInfo(getTableInfo(1000L));
     String columnIdentifier = "1";
     final List<CarbonDimension> carbonDimensions = new ArrayList();
     ColumnSchema colSchema1 = new ColumnSchema();
@@ -186,7 +186,7 @@ public class CarbonMetadataTest {
 
   @Test
   public void testGetCarbonDimensionBasedOnColIdentifierWhenChildDimensionColumnEqualsColumnIdentifier() {
-    CarbonTable carbonTable = new CarbonTable();
+    CarbonTable carbonTable = CarbonTable.buildFromTableInfo(getTableInfo(1000L));
     String columnIdentifier = "9";
     final List<CarbonDimension> carbonDimensions = new ArrayList();
     ColumnSchema colSchema1 = new ColumnSchema();
@@ -226,7 +226,7 @@ public class CarbonMetadataTest {
   }
 
   @Test public void testGetCarbonDimensionBasedOnColIdentifierNullCase() {
-    CarbonTable carbonTable = new CarbonTable();
+    CarbonTable carbonTable = CarbonTable.buildFromTableInfo(getTableInfo(1000L));
     String columnIdentifier = "3";
     final List<CarbonDimension> carbonDimensions = new ArrayList();
     ColumnSchema colSchema1 = new ColumnSchema();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
index e8851e0..3961d9c 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
@@ -1500,10 +1500,6 @@ public class ThriftWrapperSchemaConverterImplTest {
       @Mock public TableSchema getFactTable() {
         return wrapperTableSchema;
       }
-
-      @Mock public List<TableSchema> getAggregateTableList() {
-        return tableSchemas;
-      }
     };
 
     new MockUp<TableSchema>() {
@@ -1529,12 +1525,11 @@ public class ThriftWrapperSchemaConverterImplTest {
     };
     org.apache.carbondata.format.TableSchema thriftFactTable =
         new org.apache.carbondata.format.TableSchema("tableId", thriftColumnSchemas, schemaEvol);
-    List<org.apache.carbondata.format.TableSchema> thriftAggTables = new ArrayList<>();
-    thriftAggTables.add(thriftFactTable);
     org.apache.carbondata.format.TableInfo actualResult = thriftWrapperSchemaConverter
         .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName);
     org.apache.carbondata.format.TableInfo expectedResult =
-        new org.apache.carbondata.format.TableInfo(thriftFactTable, thriftAggTables);
+        new org.apache.carbondata.format.TableInfo(thriftFactTable, new ArrayList<org.apache
+            .carbondata.format.TableSchema>());
     assertEquals(expectedResult, actualResult);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
index 9ed8b56..f5ffe51 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
@@ -36,8 +36,7 @@ public class CarbonTableTest extends TestCase {
   private CarbonTable carbonTable;
 
   @BeforeClass public void setUp() {
-    carbonTable = new CarbonTable();
-    carbonTable.loadCarbonTable(getTableInfo(1000L));
+    carbonTable = CarbonTable.buildFromTableInfo(getTableInfo(1000L));
   }
 
   @AfterClass public void tearDown() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
index 9c3ab6a..69cab49 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
@@ -35,8 +35,7 @@ public class CarbonTableWithComplexTypesTest extends TestCase {
   private CarbonTable carbonTable;
 
   @BeforeClass public void setUp() {
-    carbonTable = new CarbonTable();
-    carbonTable.loadCarbonTable(getTableInfo(1000L));
+    carbonTable = CarbonTable.buildFromTableInfo(getTableInfo(1000L));
   }
 
   @AfterClass public void tearDown() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 00e420c..6d14424 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.mutate.data.BlockMappingVO;
 import org.apache.carbondata.core.scan.expression.Expression;
@@ -99,51 +100,50 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   private static final String FILTER_PREDICATE =
       "mapreduce.input.carboninputformat.filter.predicate";
   private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
-  private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table";
+  private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
   private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
 
+  // a cache for the carbon table, to be used on the task side
+  private CarbonTable carbonTable;
+
   /**
-   * It is optional, if user does not set then it reads from store
-   *
-   * @param configuration
-   * @param carbonTable
-   * @throws IOException
+   * Set the `tableInfo` in `configuration`
    */
-  public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable)
+  public static void setTableInfo(Configuration configuration, TableInfo tableInfo)
       throws IOException {
-    if (null != carbonTable) {
-      configuration.set(CARBON_TABLE, ObjectSerializationUtil.convertObjectToString(carbonTable));
+    if (null != tableInfo) {
+      configuration.set(TABLE_INFO, ObjectSerializationUtil.convertObjectToString(tableInfo));
     }
   }
 
-  public static CarbonTable getCarbonTable(Configuration configuration) throws IOException {
-    String carbonTableStr = configuration.get(CARBON_TABLE);
-    if (carbonTableStr == null) {
-      populateCarbonTable(configuration);
-      // read it from schema file in the store
-      carbonTableStr = configuration.get(CARBON_TABLE);
-      return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
-    }
-    return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
+  /**
+   * Get TableInfo object from `configuration`
+   */
+  private TableInfo getTableInfo(Configuration configuration) throws IOException {
+    String tableInfoStr = configuration.get(TABLE_INFO);
+    return (TableInfo) ObjectSerializationUtil.convertStringToObject(tableInfoStr);
   }
 
   /**
-   * this method will read the schema from the physical file and populate into CARBON_TABLE
-   * @param configuration
-   * @throws IOException
+   * Get the cached CarbonTable or create it by TableInfo in `configuration`
    */
-  private static void populateCarbonTable(Configuration configuration) throws IOException {
-    String dirs = configuration.get(INPUT_DIR, "");
-    String[] inputPaths = StringUtils.split(dirs);
-    if (inputPaths.length == 0) {
-      throw new InvalidPathException("No input paths specified in job");
+  private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+    if (carbonTable == null) {
+      // carbon table should be created either from deserialized table info (schema saved in
+      // hive metastore) or by reading schema in HDFS (schema saved in HDFS)
+      TableInfo tableInfo = getTableInfo(configuration);
+      CarbonTable carbonTable;
+      if (tableInfo != null) {
+        carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
+      } else {
+        carbonTable = SchemaReader.readCarbonTableFromStore(
+            getAbsoluteTableIdentifier(configuration));
+      }
+      this.carbonTable = carbonTable;
+      return carbonTable;
+    } else {
+      return this.carbonTable;
     }
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
-    // read the schema file to get the absoluteTableIdentifier having the correct table id
-    // persisted in the schema
-    CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
-    setCarbonTable(configuration, carbonTable);
   }
 
   public static void setTablePath(Configuration configuration, String tablePath)
@@ -210,13 +210,17 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
    * Set list of files to access
    */
   public static void setFilesToAccess(Configuration configuration, List<String> validFiles) {
-    configuration
-        .set(CarbonInputFormat.INPUT_FILES, CarbonUtil.getSegmentString(validFiles));
+    configuration.set(CarbonInputFormat.INPUT_FILES, CarbonUtil.getSegmentString(validFiles));
   }
 
-  private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
+  private AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
       throws IOException {
-    return getCarbonTable(configuration).getAbsoluteTableIdentifier();
+    String dirs = configuration.get(INPUT_DIR, "");
+    String[] inputPaths = StringUtils.split(dirs);
+    if (inputPaths.length == 0) {
+      throw new InvalidPathException("No input paths specified in job");
+    }
+    return AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
   }
 
   /**
@@ -263,12 +267,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
 
       // process and resolve the expression
       Expression filter = getFilterPredicates(job.getConfiguration());
-      CarbonTable carbonTable = getCarbonTable(job.getConfiguration());
-      // this will be null in case of corrupt schema file.
-      if (null == carbonTable) {
-        throw new IOException("Missing/Corrupt schema file for table.");
-      }
-
+      CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
       CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
       BitSet matchedPartitions = null;
       PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName());
@@ -358,9 +357,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     Boolean  isIUDTable = false;
 
     AbsoluteTableIdentifier absoluteTableIdentifier =
-            getCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
-    SegmentUpdateStatusManager updateStatusManager =
-            new SegmentUpdateStatusManager(absoluteTableIdentifier);
+              getCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
+    SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
 
     isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
 
@@ -726,7 +724,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
       throws IOException {
     Configuration configuration = taskAttemptContext.getConfiguration();
-    CarbonTable carbonTable = getCarbonTable(configuration);
+    CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
     // getting the table absoluteTableIdentifier from the carbonTable
     // to avoid unnecessary deserialization
     AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
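
Taken together, the CarbonInputFormat changes move the driver-side contract from serializing a whole CarbonTable to serializing only the TableInfo into the Hadoop configuration; executors then rebuild the CarbonTable lazily through getOrCreateCarbonTable, falling back to reading the schema file from the store when no TableInfo is set. A hedged sketch of driver-side setup, with the surrounding Job wiring being illustrative:

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.carbondata.core.metadata.schema.table.TableInfo;
    import org.apache.carbondata.hadoop.CarbonInputFormat;

    public class DriverSideSetupExample {
      public static void configure(Job job, TableInfo tableInfo, String tablePath)
          throws IOException {
        // The serialized TableInfo travels with the configuration; if it is absent,
        // CarbonInputFormat reads the schema from the store path instead.
        CarbonInputFormat.setTableInfo(job.getConfiguration(), tableInfo);
        CarbonInputFormat.setTablePath(job.getConfiguration(), tablePath);
      }
    }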

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
index 9aa1188..86a3326 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
@@ -74,8 +74,8 @@ public class CarbonInputMapperTest extends TestCase {
       Assert.assertEquals("Count lines are not matching", 1000, countTheLines(outPath));
       Assert.assertEquals("Column count are not matching", 7, countTheColumns(outPath));
     } catch (Exception e) {
-      Assert.assertTrue("failed", false);
       e.printStackTrace();
+      Assert.assertTrue("failed", false);
       throw e;
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index 88c182e..8cad313 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -270,7 +270,6 @@ public class StoreCreator {
             + absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
     tableInfo.setLastUpdatedTime(System.currentTimeMillis());
     tableInfo.setFactTable(tableSchema);
-    tableInfo.setAggregateTableList(new ArrayList<TableSchema>());
     CarbonTablePath carbonTablePath = CarbonStorePath
         .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
             absoluteTableIdentifier.getCarbonTableIdentifier());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestTableIdTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestTableIdTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestTableIdTest.scala
deleted file mode 100644
index a6790c8..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestTableIdTest.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.createTable
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.Job
-import org.junit.Assert
-import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.hadoop.CarbonInputFormat
-import org.apache.spark.sql.test.util.QueryTest
-
-/**
- * test functionality related the case change for database name
- */
-class TestTableIdTest extends QueryTest with BeforeAndAfterAll {
-
-  override def beforeAll: Unit = {
-    sql("drop table if exists carbontable")
-  }
-
-  def validateTableId: Unit = {
-    val carbonInputFormat: CarbonInputFormat[Array[Object]] = new CarbonInputFormat[Array[Object]]
-    val jobConf: JobConf = new JobConf(new Configuration)
-    val job: Job = Job.getInstance(jobConf)
-    val storePath: String = storeLocation.replaceAll("\\\\", "/")
-    job.getConfiguration
-      .set("mapreduce.input.fileinputformat.inputdir",
-        storePath + "/default/carbontable")
-    val carbonTable: CarbonTable = CarbonInputFormat.getCarbonTable(job.getConfiguration)
-    val getAbsoluteTableIdentifier = classOf[CarbonInputFormat[Array[Object]]]
-      .getDeclaredMethod("getAbsoluteTableIdentifier", classOf[Configuration])
-    getAbsoluteTableIdentifier.setAccessible(true)
-    val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteTableIdentifier
-      .invoke(carbonInputFormat, job.getConfiguration).asInstanceOf[AbsoluteTableIdentifier]
-
-    Assert
-      .assertEquals(carbonTable.getCarbonTableIdentifier.getTableId,
-        absoluteTableIdentifier.getCarbonTableIdentifier.getTableId)
-  }
-
-  test("test create table with database case name change") {
-
-    try {
-      // table creation should be successful
-      sql("create table carbontable(a int, b string)stored by 'carbondata'")
-      assert(true)
-    } catch {
-      case ex: Exception =>
-        assert(false)
-    }
-    validateTableId
-  }
-
-  override def afterAll {
-    sql("drop table if exists carbontable")
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index 48e97ed..c908c08 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -17,11 +17,14 @@
 
 package org.apache.carbondata.spark.rdd
 
+import java.io.{ByteArrayInputStream, DataInputStream}
+
 import scala.reflect.ClassTag
 
 import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util.{CarbonSessionInfo, CarbonTaskInfo, SessionParams, ThreadLocalSessionInfo, ThreadLocalTaskInfo}
 
 /**
@@ -47,3 +50,17 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
     internalCompute(split, context)
   }
 }
+
+/**
+ * This RDD contains TableInfo object which is serialized and deserialized in driver and executor
+ */
+abstract class CarbonRDDWithTableInfo[T: ClassTag](
+    @transient sc: SparkContext,
+    @transient private var deps: Seq[Dependency[_]],
+    serializedTableInfo: Array[Byte]) extends CarbonRDD[T](sc, deps) {
+
+  def this(@transient oneParent: RDD[_], serializedTableInfo: Array[Byte]) =
+    this (oneParent.context, List(new OneToOneDependency(oneParent)), serializedTableInfo)
+
+  def getTableInfo: TableInfo = TableInfo.deserialize(serializedTableInfo)
+}
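
For readers tracing the new base class: the byte array is what Spark ships with each task, while getTableInfo rebuilds the schema lazily wherever it is called. A minimal sketch of that round trip, assuming only the serialize()/deserialize() pair and the getTableInfo accessor visible in this patch (all other names are illustrative):

    // driver side: capture the schema once and flatten it for the task closure
    val tableInfo: TableInfo = carbonTable.getTableInfo
    val bytes: Array[Byte]   = tableInfo.serialize()

    // executor side (e.g. inside internalCompute): rebuild the schema from the bytes
    val restored: TableInfo  = TableInfo.deserialize(bytes)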

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 3868342..85c4bc4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark._
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.hive.DistributionUtil
 
@@ -34,7 +33,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.block.Distributable
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.model.QueryModel
 import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder}
@@ -53,8 +52,9 @@ class CarbonScanRDD(
     columnProjection: CarbonProjection,
     filterExpression: Expression,
     identifier: AbsoluteTableIdentifier,
-    @transient carbonTable: CarbonTable)
-  extends CarbonRDD[InternalRow](sc, Nil) {
+    serializedTableInfo: Array[Byte],
+    @transient tableInfo: TableInfo)
+  extends CarbonRDDWithTableInfo[InternalRow](sc, Nil, serializedTableInfo) {
 
   private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
   private val jobTrackerId: String = {
@@ -65,7 +65,7 @@ class CarbonScanRDD(
 
   private val readSupport = SparkReadSupport.readSupportClass
 
-  private val bucketedTable = carbonTable.getBucketingInfo(carbonTable.getFactTableName)
+  private val bucketedTable = tableInfo.getFactTable.getBucketingInfo
 
   @transient private val jobId = new JobID(jobTrackerId, id)
   @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -174,6 +174,7 @@ class CarbonScanRDD(
   }
 
   override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+
     val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
     if (null == carbonPropertiesFilePath) {
       System.setProperty("carbon.properties.filepath",
@@ -246,12 +247,13 @@ class CarbonScanRDD(
   }
 
   private def prepareInputFormatForDriver(conf: Configuration): CarbonInputFormat[Object] = {
-    CarbonInputFormat.setCarbonTable(conf, carbonTable)
+    CarbonInputFormat.setTableInfo(conf, tableInfo)
     createInputFormat(conf)
   }
 
   private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[Object] = {
     CarbonInputFormat.setCarbonReadSupport(conf, readSupport)
+    CarbonInputFormat.setTableInfo(conf, getTableInfo)
     createInputFormat(conf)
   }
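
Condensed, the split above means the same input format is configured from two different sources depending on where the code runs; driverConf and executorConf below are illustrative Configuration instances, and only setTableInfo, setCarbonReadSupport and the TableInfo byte round trip from this patch are assumed:

    // driver: the @transient TableInfo built at RDD construction time is still available
    CarbonInputFormat.setTableInfo(driverConf, tableInfo)

    // executor: only serializedTableInfo travelled with the task, so the schema is
    // rebuilt (what getTableInfo does) before the input format is configured
    CarbonInputFormat.setCarbonReadSupport(executorConf, readSupport)
    CarbonInputFormat.setTableInfo(executorConf, TableInfo.deserialize(serializedTableInfo))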
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index 664cbae..954303f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -266,7 +266,6 @@ object DataManagementFunc {
   def prepareCarbonLoadModel(storePath: String,
       table: CarbonTable,
       newCarbonLoadModel: CarbonLoadModel): Unit = {
-    newCarbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
     newCarbonLoadModel.setTableName(table.getFactTableName)
     val dataLoadSchema = new CarbonDataLoadSchema(table)
     // Need to fill dimension relation

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index ee77f35..6174f7c 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -526,7 +526,6 @@ class TableNewProcessor(cm: TableModel) {
     tableInfo.setTableUniqueName(cm.databaseName + "_" + cm.tableName)
     tableInfo.setLastUpdatedTime(System.currentTimeMillis())
     tableInfo.setFactTable(tableSchema)
-    tableInfo.setAggregateTableList(new util.ArrayList[TableSchema]())
     tableInfo
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
index 17d6065..3bd92e9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
@@ -122,12 +122,14 @@ case class CarbonScan(
     columnProjection.foreach { attr =>
       projection.addColumn(attr.name)
     }
+
     new CarbonScanRDD(
       ocRaw.sparkContext,
       projection,
       buildCarbonPlan.getFilterExpression,
       carbonTable.getAbsoluteTableIdentifier,
-      carbonTable
+      carbonTable.getTableInfo.serialize(),
+      carbonTable.getTableInfo
     )
   }
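
Note that the constructor now receives the schema twice on purpose: the serialized form is what reaches executors, while the @transient object keeps driver-side work (bucketing info, input-format setup) free of an extra deserialization. A call site therefore derives both from the same table, roughly as follows (sc, projection and filter are placeholders):

    val info = carbonTable.getTableInfo
    new CarbonScanRDD(sc, projection, filter,
      carbonTable.getAbsoluteTableIdentifier,
      info.serialize(),   // shipped to executors with each task
      info)               // used only on the driver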
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 44d5efb..da6dd98 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -128,7 +128,6 @@ private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) e
 
 
     val table = relation.tableMeta.carbonTable
-    carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
     carbonLoadModel.setTableName(table.getFactTableName)
     val dataLoadSchema = new CarbonDataLoadSchema(table)
     // Need to fill dimension relation
@@ -404,7 +403,6 @@ case class LoadTable(
       carbonLoadModel.setStorePath(relation.tableMeta.storePath)
 
       val table = relation.tableMeta.carbonTable
-      carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
       carbonLoadModel.setTableName(table.getFactTableName)
       val dataLoadSchema = new CarbonDataLoadSchema(table)
       // Need to fill dimension relation

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index d28044f..d1baf79 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import java.io.{ByteArrayOutputStream, DataOutputStream}
+
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.rdd.RDD
@@ -28,15 +30,13 @@ import org.apache.spark.sql.types.StructType
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
-import org.apache.carbondata.core.util.{CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.{CarbonSessionInfo, ThreadLocalSessionInfo}
 import org.apache.carbondata.hadoop.CarbonProjection
-import org.apache.carbondata.hadoop.util.SchemaReader
-import org.apache.carbondata.processing.merger.TableMeta
 import org.apache.carbondata.spark.CarbonFilters
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
-import org.apache.carbondata.spark.util.CarbonSparkUtil
 
 case class CarbonDatasourceHadoopRelation(
     sparkSession: SparkSession,
@@ -46,15 +46,22 @@ case class CarbonDatasourceHadoopRelation(
     isSubquery: ArrayBuffer[Boolean] = new ArrayBuffer[Boolean]())
   extends BaseRelation with InsertableRelation {
 
-  lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)
-  lazy val carbonTable = carbonRelation.tableMeta.carbonTable
-  lazy val carbonRelation: CarbonRelation = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    .lookupRelation(Some(absIdentifier.getCarbonTableIdentifier.getDatabaseName),
-      absIdentifier.getCarbonTableIdentifier.getTableName)(sparkSession)
+  lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)
+  lazy val databaseName: String = carbonTable.getDatabaseName
+  lazy val tableName: String = carbonTable.getFactTableName
+  lazy val carbonSessionInfo : CarbonSessionInfo =
+    CarbonEnv.getInstance(sparkSession).carbonSessionInfo
+  ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
+
+  @transient lazy val carbonRelation: CarbonRelation =
+    CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .lookupRelation(
+          Some(identifier.getCarbonTableIdentifier.getDatabaseName),
+          identifier.getCarbonTableIdentifier.getTableName)(sparkSession)
     .asInstanceOf[CarbonRelation]
 
-  val carbonSessionInfo : CarbonSessionInfo = CarbonEnv.getInstance(sparkSession).carbonSessionInfo
-  ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
+  @transient lazy val carbonTable: CarbonTable = carbonRelation.tableMeta.carbonTable
+
   override def sqlContext: SQLContext = sparkSession.sqlContext
 
   override def schema: StructType = tableSchema.getOrElse(carbonRelation.schema)
@@ -67,15 +74,20 @@ case class CarbonDatasourceHadoopRelation(
     val projection = new CarbonProjection
     requiredColumns.foreach(projection.addColumn)
 
-    new CarbonScanRDD(sqlContext.sparkContext, projection, filterExpression.orNull,
-      absIdentifier, carbonTable)
+    new CarbonScanRDD(
+      sqlContext.sparkContext,
+      projection,
+      filterExpression.orNull,
+      identifier,
+      carbonTable.getTableInfo.serialize(),
+      carbonTable.getTableInfo)
   }
 
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0)
 
   override def toString: String = {
-    "CarbonDatasourceHadoopRelation [ " + "Database name :" + carbonTable.getDatabaseName +
-    ", " + "Table name :" + carbonTable.getFactTableName + ", Schema :" + tableSchema + " ]"
+    "CarbonDatasourceHadoopRelation [ " + "Database name :" + databaseName +
+    ", " + "Table name :" + tableName + ", Schema :" + tableSchema + " ]"
   }
 
   override def sizeInBytes: Long = carbonRelation.sizeInBytes
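
Marking carbonRelation and carbonTable as @transient lazy vals is what keeps the relation itself lightweight to serialize: the metastore lookup never travels with a task and is only re-resolved (on the driver) if the field is touched again, while executors rely entirely on the serialized TableInfo handed to CarbonScanRDD above. A purely illustrative sketch of that Scala idiom (Heavy and lookup stand in for CarbonTable and the metastore call):

    object TransientLazySketch {
      class Heavy(val name: String)                      // stands in for CarbonTable
      def lookup(path: String): Heavy = new Heavy(path)  // stands in for the metastore lookup

      case class Rel(path: String) {
        // skipped by Java serialization; recomputed on first access after deserialization
        @transient lazy val heavy: Heavy = lookup(path)
      }
    }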

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index bd1c8b1..33091aa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.util.DataTypeUtil
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
-import org.apache.carbondata.spark.rdd.CarbonRDD
+import org.apache.carbondata.spark.rdd.{CarbonRDD, CarbonRDDWithTableInfo}
 
 /**
  * It decodes the data.
@@ -444,10 +444,9 @@ class CarbonDecoderRDD(
     aliasMap: CarbonAliasDecoderRelation,
     prev: RDD[InternalRow],
     output: Seq[Attribute],
-    sparkSession: SparkSession)
-  extends CarbonRDD[InternalRow](prev) {
-
-  private val storepath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath
+    storePath: String,
+    serializedTableInfo: Array[Byte])
+  extends CarbonRDDWithTableInfo[InternalRow](prev, serializedTableInfo) {
 
   def canBeDecoded(attr: Attribute): Boolean = {
     profile match {
@@ -516,13 +515,13 @@ class CarbonDecoderRDD(
 
   override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val absoluteTableIdentifiers = relations.map { relation =>
-      val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
-      (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
+      val tableInfo = getTableInfo
+      (tableInfo.getFactTable.getTableName, tableInfo.getOrCreateAbsoluteTableIdentifier)
     }.toMap
 
     val cacheProvider: CacheProvider = CacheProvider.getInstance
     val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
-      cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storepath)
+      cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storePath)
     val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
       forwardDictionaryCache)
     val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
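
With this change the executor no longer consults the metastore at all: the table name and AbsoluteTableIdentifier are read from the TableInfo shipped with the RDD, and the forward-dictionary cache is opened against the storePath passed in explicitly from the driver. Roughly, assuming only the accessors visible in this hunk:

    val tableInfo  = getTableInfo                           // rebuilt from serializedTableInfo
    val tableName  = tableInfo.getFactTable.getTableName
    val identifier = tableInfo.getOrCreateAbsoluteTableIdentifier
    val cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
      CacheProvider.getInstance.createCache(CacheType.FORWARD_DICTIONARY, storePath)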

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index a206bef..1cc6668 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -17,9 +17,6 @@
 
 package org.apache.spark.sql.execution
 
-import java.text.SimpleDateFormat
-import java.util.Date
-
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
@@ -34,14 +31,13 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.optimizer.CarbonDecoderRelation
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
-import org.apache.spark.sql.types.{AtomicType, DoubleType, IntegerType, StringType, TimestampType}
+import org.apache.spark.sql.types.{AtomicType, IntegerType, StringType}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.TimeStampDirectDictionaryGenerator
 import org.apache.carbondata.core.metadata.schema.BucketingInfo
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.{CarbonAliasDecoderRelation}
+import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
@@ -99,8 +95,15 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       relation.addAttribute(newAttr)
       newAttr
     }
-    new CarbonDecoderRDD(Seq(relation), IncludeProfile(attrs),
-      CarbonAliasDecoderRelation(), rdd, output, SparkSession.getActiveSession.get)
+
+    new CarbonDecoderRDD(
+      Seq(relation),
+      IncludeProfile(attrs),
+      CarbonAliasDecoderRelation(),
+      rdd,
+      output,
+      CarbonEnv.getInstance(SparkSession.getActiveSession.get).carbonMetastore.storePath,
+      table.carbonTable.getTableInfo.serialize())
   }
 
   private[this] def toCatalystRDD(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
index 0894f23..2fccd0c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -795,8 +795,8 @@ object UpdateExecution {
 
     def isDestinationRelation(relation: CarbonDatasourceHadoopRelation): Boolean = {
 
-      val tableName = relation.absIdentifier.getCarbonTableIdentifier.getTableName
-      val dbName = relation.absIdentifier.getCarbonTableIdentifier.getDatabaseName
+      val tableName = relation.identifier.getCarbonTableIdentifier.getTableName
+      val dbName = relation.identifier.getCarbonTableIdentifier.getDatabaseName
       (tableIdentifier.size > 1 &&
         tableIdentifier(0) == dbName &&
         tableIdentifier(1) == tableName) ||
@@ -841,8 +841,8 @@ object UpdateExecution {
     val header = getHeader(carbonRelation, plan)
 
     LoadTable(
-      Some(carbonRelation.absIdentifier.getCarbonTableIdentifier.getDatabaseName),
-      carbonRelation.absIdentifier.getCarbonTableIdentifier.getTableName,
+      Some(carbonRelation.identifier.getCarbonTableIdentifier.getDatabaseName),
+      carbonRelation.identifier.getCarbonTableIdentifier.getTableName,
       null,
       Seq(),
       Map(("fileheader" -> header)),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 0391cb3..136078c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -126,7 +126,6 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
     val carbonLoadModel = new CarbonLoadModel()
 
     val table = relation.tableMeta.carbonTable
-    carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
     carbonLoadModel.setTableName(table.getFactTableName)
     val dataLoadSchema = new CarbonDataLoadSchema(table)
     // Need to fill dimension relation
@@ -495,7 +494,6 @@ case class LoadTable(
       carbonLoadModel.setStorePath(relation.tableMeta.storePath)
 
       val table = relation.tableMeta.carbonTable
-      carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
       carbonLoadModel.setTableName(table.getFactTableName)
       val dataLoadSchema = new CarbonDataLoadSchema(table)
       // Need to fill dimension relation

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
index 21ef6c4..ca9bb47 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
@@ -488,13 +488,6 @@ public class CarbonLoadModel implements Serializable {
   }
 
   /**
-   * @param aggTables the aggTables to set
-   */
-  public void setAggTables(String[] aggTables) {
-    this.aggTables = aggTables;
-  }
-
-  /**
    * @param storePath The storePath to set.
    */
   public void setStorePath(String storePath) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac5aee18/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 91cc195..a7c2057 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -261,7 +261,6 @@ public class StoreCreator {
             + absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
     tableInfo.setLastUpdatedTime(System.currentTimeMillis());
     tableInfo.setFactTable(tableSchema);
-    tableInfo.setAggregateTableList(new ArrayList<TableSchema>());
 
     CarbonTablePath carbonTablePath = CarbonStorePath
         .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),