Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/27 12:42:07 UTC

[7/7] carbondata git commit: Rebase datamap branch onto master

Rebase datamap branch onto master

This closes #1196


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/79feac96
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/79feac96
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/79feac96

Branch: refs/heads/master
Commit: 79feac96ae789851c5ad7306a7acaaba25d8e6c9
Parents: b681244
Author: Raghunandan S <ra...@gmail.com>
Authored: Thu Jul 27 20:38:48 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Jul 27 20:41:22 2017 +0800

----------------------------------------------------------------------
 .../core/indexstore/UnsafeMemoryDMStore.java    |  31 +-
 .../blockletindex/BlockletDataMap.java          |  11 +-
 .../core/indexstore/row/DataMapRow.java         |   4 +-
 .../core/indexstore/row/DataMapRowImpl.java     |   4 +
 .../core/indexstore/row/UnsafeDataMapRow.java   |  40 +-
 .../core/memory/UnsafeMemoryManager.java        |  19 +-
 .../datatype/DecimalConverterFactory.java       |   2 +-
 .../hadoop/api/CarbonTableInputFormat.java      |  83 ++--
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   2 +-
 .../execution/command/carbonTableSchema.scala   |   6 +-
 .../spark/sql/hive/CarbonFileMetastore.scala    | 392 +++++++++----------
 11 files changed, 301 insertions(+), 293 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index 8246f99..13951dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -19,9 +19,10 @@ package org.apache.carbondata.core.indexstore;
 import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.UnsafeDataMapRow;
 import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
-import org.apache.carbondata.core.memory.MemoryAllocator;
-import org.apache.carbondata.core.memory.MemoryAllocatorFactory;
 import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 
 import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
 import static org.apache.carbondata.core.memory.CarbonUnsafe.unsafe;
@@ -39,8 +40,6 @@ public class UnsafeMemoryDMStore {
 
   private int runningLength;
 
-  private MemoryAllocator memoryAllocator;
-
   private boolean isMemoryFreed;
 
   private DataMapSchema[] schema;
@@ -49,11 +48,13 @@ public class UnsafeMemoryDMStore {
 
   private int rowCount;
 
-  public UnsafeMemoryDMStore(DataMapSchema[] schema) {
+  private final long taskId = null != ThreadLocalTaskInfo.getCarbonTaskInfo() ?
+      ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId() : System.nanoTime();
+
+  public UnsafeMemoryDMStore(DataMapSchema[] schema) throws MemoryException {
     this.schema = schema;
-    this.memoryAllocator = MemoryAllocatorFactory.INSATANCE.getMemoryAllocator();
     this.allocatedSize = capacity;
-    this.memoryBlock = memoryAllocator.allocate(allocatedSize);
+    this.memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize);
     this.pointers = new int[1000];
   }
 
@@ -63,13 +64,13 @@ public class UnsafeMemoryDMStore {
    *
    * @param rowSize
    */
-  private void ensureSize(int rowSize) {
+  private void ensureSize(int rowSize) throws MemoryException {
     if (runningLength + rowSize >= allocatedSize) {
       MemoryBlock allocate =
-          MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(allocatedSize + capacity);
+          UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize + capacity);
       unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
           allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
-      memoryAllocator.free(memoryBlock);
+      UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
       allocatedSize = allocatedSize + capacity;
       memoryBlock = allocate;
     }
@@ -86,7 +87,7 @@ public class UnsafeMemoryDMStore {
    * @param indexRow
    * @return
    */
-  public void addIndexRowToUnsafe(DataMapRow indexRow) {
+  public void addIndexRowToUnsafe(DataMapRow indexRow) throws MemoryException {
     // First calculate the required memory to keep the row in unsafe
     int rowSize = indexRow.getTotalSizeInBytes();
     // Check whether allocated memory is sufficient or not.
@@ -168,13 +169,13 @@ public class UnsafeMemoryDMStore {
     return new UnsafeDataMapRow(schema, memoryBlock, pointers[index]);
   }
 
-  public void finishWriting() {
+  public void finishWriting() throws MemoryException {
     if (runningLength < allocatedSize) {
       MemoryBlock allocate =
-          MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(runningLength);
+          UnsafeMemoryManager.allocateMemoryWithRetry(taskId, runningLength);
       unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
           allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
-      memoryAllocator.free(memoryBlock);
+      UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
       memoryBlock = allocate;
     }
     // Compact pointers.
@@ -187,7 +188,7 @@ public class UnsafeMemoryDMStore {
 
   public void freeMemory() {
     if (!isMemoryFreed) {
-      memoryAllocator.free(memoryBlock);
+      UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
       isMemoryFreed = true;
     }
   }
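
The hunk above replaces the standalone MemoryAllocator with task-scoped allocation through
UnsafeMemoryManager, so every block a task allocates can later be reclaimed by task id. A
minimal usage sketch of that pattern, assuming only the calls visible in the diff (the
wrapper class and sizes are hypothetical):

    import org.apache.carbondata.core.memory.MemoryBlock;
    import org.apache.carbondata.core.memory.MemoryException;
    import org.apache.carbondata.core.memory.UnsafeMemoryManager;
    import org.apache.carbondata.core.util.ThreadLocalTaskInfo;

    public final class TaskScopedAllocationSketch {
      public static void main(String[] args) throws MemoryException {
        // Derive the task id the same way UnsafeMemoryDMStore now does: prefer the
        // thread-local task info, fall back to a unique nanoTime value.
        long taskId = null != ThreadLocalTaskInfo.getCarbonTaskInfo()
            ? ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId()
            : System.nanoTime();
        // allocateMemoryWithRetry throws MemoryException when memory cannot be
        // obtained, which is why the constructor and ensureSize now declare it.
        MemoryBlock block = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, 8 * 1024);
        try {
          // ... write index rows into the block ...
        } finally {
          // Free through the same manager, keyed by the owning task.
          UnsafeMemoryManager.INSTANCE.freeMemory(taskId, block);
        }
      }
    }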

http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 79aa091..680852d 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
 import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
@@ -110,7 +111,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
       if (unsafeMemoryDMStore != null) {
         unsafeMemoryDMStore.finishWriting();
       }
-    } catch (IOException e) {
+    } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
@@ -156,11 +157,11 @@ public class BlockletDataMap implements DataMap, Cacheable {
         DataOutput dataOutput = new DataOutputStream(stream);
         blockletInfo.write(dataOutput);
         serializedData = stream.toByteArray();
-      } catch (IOException e) {
+        row.setByteArray(serializedData, ordinal);
+        unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+      } catch (Exception e) {
         throw new RuntimeException(e);
       }
-      row.setByteArray(serializedData, ordinal);
-      unsafeMemoryDMStore.addIndexRowToUnsafe(row);
     }
   }
 
@@ -176,7 +177,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return minRow;
   }
 
-  private void createSchema(SegmentProperties segmentProperties) {
+  private void createSchema(SegmentProperties segmentProperties) throws MemoryException {
     List<DataMapSchema> indexSchemas = new ArrayList<>();
 
     // Index key

http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
index defe766..631e0ad 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
@@ -62,6 +62,8 @@ public abstract class DataMapRow {
 
   public abstract double getDouble(int ordinal);
 
+  public abstract int getLengthInBytes(int ordinal);
+
   public int getTotalSizeInBytes() {
     int len = 0;
     for (int i = 0; i < schemas.length; i++) {
@@ -75,7 +77,7 @@ public abstract class DataMapRow {
       case FIXED:
         return schemas[ordinal].getLength();
       case VARIABLE:
-        return getByteArray(ordinal).length + 2;
+        return getLengthInBytes(ordinal) + 2;
       case STRUCT:
         return getRow(ordinal).getTotalSizeInBytes();
       default:
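
The switch from getByteArray(ordinal).length to getLengthInBytes(ordinal) matters most for
the unsafe-backed row, where getByteArray must copy the payload out of the memory block just
to measure it; the new method can read the stored length directly. A hypothetical worked
example of the VARIABLE accounting (illustration only, not CarbonData code):

    // A row with one FIXED int column (4 bytes) and one VARIABLE column whose
    // payload is 7 bytes totals 4 + (7 + 2) = 13 bytes: VARIABLE fields carry a
    // 2-byte length prefix, which is the "+ 2" in getSizeInBytes above.
    static int totalSize(int fixedLen, int variablePayloadLen) {
      return fixedLen + (variablePayloadLen + 2);
    }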

http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
index adec346..32d15d3 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
@@ -35,6 +35,10 @@ public class DataMapRowImpl extends DataMapRow {
     return (byte[]) data[ordinal];
   }
 
+  @Override public int getLengthInBytes(int ordinal) {
+    return ((byte[]) data[ordinal]).length;
+  }
+
   @Override public DataMapRow getRow(int ordinal) {
     return (DataMapRow) data[ordinal];
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
index ef78514..c398115 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
@@ -55,6 +55,31 @@ public class UnsafeDataMapRow extends DataMapRow {
     return data;
   }
 
+  @Override public int getLengthInBytes(int ordinal) {
+    int length;
+    int position = getPosition(ordinal);
+    switch (schemas[ordinal].getSchemaType()) {
+      case VARIABLE:
+        length = unsafe.getShort(block.getBaseObject(), block.getBaseOffset() + pointer + position);
+        break;
+      default:
+        length = schemas[ordinal].getLength();
+    }
+    return length;
+  }
+
+  private int getLengthInBytes(int ordinal, int position) {
+    int length;
+    switch (schemas[ordinal].getSchemaType()) {
+      case VARIABLE:
+        length = unsafe.getShort(block.getBaseObject(), block.getBaseOffset() + pointer + position);
+        break;
+      default:
+        length = schemas[ordinal].getLength();
+    }
+    return length;
+  }
+
   @Override public DataMapRow getRow(int ordinal) {
     DataMapSchema[] childSchemas =
         ((DataMapSchema.StructDataMapSchema) schemas[ordinal]).getChildSchemas();
@@ -123,10 +148,23 @@ public class UnsafeDataMapRow extends DataMapRow {
     throw new UnsupportedOperationException("Not supported to set on unsafe row");
   }
 
+  private int getSizeInBytes(int ordinal, int position) {
+    switch (schemas[ordinal].getSchemaType()) {
+      case FIXED:
+        return schemas[ordinal].getLength();
+      case VARIABLE:
+        return getLengthInBytes(ordinal, position) + 2;
+      case STRUCT:
+        return getRow(ordinal).getTotalSizeInBytes();
+      default:
+        throw new UnsupportedOperationException("wrong type");
+    }
+  }
+
   private int getPosition(int ordinal) {
     int position = 0;
     for (int i = 0; i < ordinal; i++) {
-      position += getSizeInBytes(i);
+      position += getSizeInBytes(i, position);
     }
     return position;
   }
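
Threading the running position through getSizeInBytes/getLengthInBytes lets each 2-byte
VARIABLE prefix be read at the offset already accumulated, instead of re-deriving every
field's position from scratch. A self-contained model of the same length-prefixed walk over
a heap buffer (hypothetical helper, not CarbonData API):

    import java.nio.ByteBuffer;

    final class LengthPrefixedWalk {
      // fixedLenOrMinus1[i] is the declared length of a FIXED field, or -1 for a
      // VARIABLE field stored as [2-byte length][payload]. The walk accumulates
      // sizes using only the prefixes and never copies a payload.
      static int positionOf(ByteBuffer buf, int ordinal, int[] fixedLenOrMinus1) {
        int position = 0;
        for (int i = 0; i < ordinal; i++) {
          if (fixedLenOrMinus1[i] >= 0) {
            position += fixedLenOrMinus1[i];                             // FIXED
          } else {
            position += Short.toUnsignedInt(buf.getShort(position)) + 2; // VARIABLE
          }
        }
        return position;
      }
    }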

http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
index 991bc90..d433b5e 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
@@ -90,33 +90,28 @@ public class UnsafeMemoryManager {
     if (memoryUsed + memoryRequested <= totalMemory) {
       MemoryBlock allocate = allocator.allocate(memoryRequested);
       memoryUsed += allocate.size();
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug(
-            "Working Memory block (" + allocate + ") is created with size " + allocate.size()
-                + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed)
-                + "Bytes");
-      }
       Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
       if (null == listOfMemoryBlock) {
         listOfMemoryBlock = new HashSet<>();
         taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock);
       }
       listOfMemoryBlock.add(allocate);
+      LOGGER.info("Memory block (" + allocate + ") is created with size " + allocate.size()
+          + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed)
+          + "Bytes");
       return allocate;
     }
     return null;
   }
 
-  public synchronized void freeMemory(long taskId,MemoryBlock memoryBlock) {
+  public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) {
     taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
     allocator.free(memoryBlock);
     memoryUsed -= memoryBlock.size();
     memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug(
-          "Freeing memory of size: " + memoryBlock.size() + ": Current available memory is: " + (
-              totalMemory - memoryUsed));
-    }
+    LOGGER.info(
+        "Freeing memory of size: " + memoryBlock.size() + ", available memory: " + (totalMemory
+            - memoryUsed));
   }
 
   public void freeMemoryAll(long taskId) {
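
For context, the map touched above (taskIdToMemoryBlockMap) is what makes freeMemoryAll(taskId)
possible: each allocation is registered under its owning task so surviving blocks can be freed
wholesale when the task ends. A stripped-down model of that bookkeeping, assuming nothing
beyond what the hunk shows (Block stands in for MemoryBlock):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    final class PerTaskLedger<Block> {
      private final Map<Long, Set<Block>> taskIdToBlocks = new HashMap<>();

      // Register on allocate, as allocateMemory does above.
      synchronized void onAllocate(long taskId, Block block) {
        taskIdToBlocks.computeIfAbsent(taskId, k -> new HashSet<>()).add(block);
      }

      // Deregister on free, as freeMemory does above.
      synchronized void onFree(long taskId, Block block) {
        Set<Block> owned = taskIdToBlocks.get(taskId);
        if (owned != null) {
          owned.remove(block);
        }
      }

      // Drain on task end; the caller frees each returned block.
      synchronized Set<Block> drain(long taskId) {
        Set<Block> owned = taskIdToBlocks.remove(taskId);
        return owned == null ? Collections.emptySet() : owned;
      }
    }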

http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
index 555df1c..459eb24 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
@@ -29,7 +29,7 @@ import org.apache.carbondata.core.util.DataTypeUtil;
  */
 public final class DecimalConverterFactory {
 
-  public static DecimalConverterFactory INSTANCE = new DecimalConverterFactory();
+  public static final DecimalConverterFactory INSTANCE = new DecimalConverterFactory();
 
   private int[] minBytesForPrecision = minBytesForPrecision();
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index e73c04a..9e6e284 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
 import org.apache.carbondata.core.mutate.UpdateVO;
@@ -99,59 +100,56 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   private static final String FILTER_PREDICATE =
       "mapreduce.input.carboninputformat.filter.predicate";
   private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
-  private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table";
+  private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
   private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
 
+  // a cache for the CarbonTable; it will be used on the task side
+  private CarbonTable carbonTable;
+
   /**
-   * It is optional, if user does not set then it reads from store
-   *
-   * @param configuration
-   * @param carbonTable
-   * @throws IOException
+   * Set the `tableInfo` in `configuration`
    */
-  public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable)
+  public static void setTableInfo(Configuration configuration, TableInfo tableInfo)
       throws IOException {
-    if (null != carbonTable) {
-      configuration.set(CARBON_TABLE, ObjectSerializationUtil.convertObjectToString(carbonTable));
+    if (null != tableInfo) {
+      configuration.set(TABLE_INFO, ObjectSerializationUtil.convertObjectToString(tableInfo));
     }
   }
 
-  public static CarbonTable getCarbonTable(Configuration configuration) throws IOException {
-    String carbonTableStr = configuration.get(CARBON_TABLE);
-    if (carbonTableStr == null) {
-      populateCarbonTable(configuration);
-      // read it from schema file in the store
-      carbonTableStr = configuration.get(CARBON_TABLE);
-      return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
-    }
-    return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
+  /**
+   * Get TableInfo object from `configuration`
+   */
+  private TableInfo getTableInfo(Configuration configuration) throws IOException {
+    String tableInfoStr = configuration.get(TABLE_INFO);
+    return (TableInfo) ObjectSerializationUtil.convertStringToObject(tableInfoStr);
   }
 
   /**
-   * this method will read the schema from the physical file and populate into CARBON_TABLE
-   *
-   * @param configuration
-   * @throws IOException
+   * Get the cached CarbonTable or create it by TableInfo in `configuration`
    */
-  private static void populateCarbonTable(Configuration configuration) throws IOException {
-    String dirs = configuration.get(INPUT_DIR, "");
-    String[] inputPaths = StringUtils.split(dirs);
-    if (inputPaths.length == 0) {
-      throw new InvalidPathException("No input paths specified in job");
+  private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+    if (carbonTable == null) {
+      // the CarbonTable should be created either from the deserialized TableInfo (schema
+      // saved in the Hive metastore) or by reading the schema file from HDFS
+      TableInfo tableInfo = getTableInfo(configuration);
+      CarbonTable carbonTable;
+      if (tableInfo != null) {
+        carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
+      } else {
+        carbonTable = SchemaReader.readCarbonTableFromStore(
+            getAbsoluteTableIdentifier(configuration));
+      }
+      this.carbonTable = carbonTable;
+      return carbonTable;
+    } else {
+      return this.carbonTable;
     }
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
-    // read the schema file to get the absoluteTableIdentifier having the correct table id
-    // persisted in the schema
-    CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
-    setCarbonTable(configuration, carbonTable);
   }
 
   public static void setTablePath(Configuration configuration, String tablePath)
       throws IOException {
     configuration.set(FileInputFormat.INPUT_DIR, tablePath);
   }
-
   /**
    * It sets unresolved filter expression.
    *
@@ -213,9 +211,14 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     configuration.set(INPUT_FILES, CarbonUtil.getSegmentString(validFiles));
   }
 
-  private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
+  private AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
       throws IOException {
-    return getCarbonTable(configuration).getAbsoluteTableIdentifier();
+    String dirs = configuration.get(INPUT_DIR, "");
+    String[] inputPaths = StringUtils.split(dirs);
+    if (inputPaths.length == 0) {
+      throw new InvalidPathException("No input paths specified in job");
+    }
+    return AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
   }
 
   /**
@@ -262,7 +265,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
 
     // process and resolve the expression
     Expression filter = getFilterPredicates(job.getConfiguration());
-    CarbonTable carbonTable = getCarbonTable(job.getConfiguration());
+    CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
     // this will be null in case of corrupt schema file.
     if (null == carbonTable) {
       throw new IOException("Missing/Corrupt schema file for table.");
@@ -277,7 +280,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
       if (null != partitionInfo) {
         Partitioner partitioner = PartitionUtil.getPartitioner(partitionInfo);
         matchedPartitions = new FilterExpressionProcessor()
-            .getFilteredPartitions(filter, partitionInfo, partitioner);
+            .getFilteredPartitions(filter, partitionInfo);
         if (matchedPartitions.cardinality() == 0) {
           // no partition is required
           return new ArrayList<InputSplit>();
@@ -320,7 +323,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     Boolean isIUDTable = false;
 
     AbsoluteTableIdentifier absoluteTableIdentifier =
-        getCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
+        getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
     SegmentUpdateStatusManager updateStatusManager =
         new SegmentUpdateStatusManager(absoluteTableIdentifier);
 
@@ -432,7 +435,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
       throws IOException {
     Configuration configuration = taskAttemptContext.getConfiguration();
-    CarbonTable carbonTable = getCarbonTable(configuration);
+    CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
     // getting the table absoluteTableIdentifier from the carbonTable
     // to avoid unnecessary deserialization
     AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
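
The refactor above stops serializing the whole CarbonTable into the job configuration and
ships only its TableInfo; on the task side, getOrCreateCarbonTable rebuilds and caches the
table, falling back to reading the schema file from the store when no TableInfo was set. A
driver-side sketch using the two calls the diff confirms (the wrapper class is hypothetical):

    import java.io.IOException;

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;

    import org.apache.hadoop.conf.Configuration;

    final class TableInfoShipping {
      // Driver side: attach the serialized TableInfo so tasks can resolve the
      // CarbonTable without re-reading the schema from HDFS.
      static void prepare(Configuration jobConf, CarbonTable table) throws IOException {
        CarbonTableInputFormat.setTableInfo(jobConf, table.getTableInfo());
      }
    }

CarbonMergerRDD.scala below does exactly this when building its Job configuration.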

http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 1a8183c..add0578 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -265,7 +265,7 @@ class CarbonMergerRDD[K, V](
     val jobConf: JobConf = new JobConf(new Configuration)
     val job: Job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
-    CarbonInputFormat.setTableInfo(job.getConfiguration,
+    CarbonTableInputFormat.setTableInfo(job.getConfiguration,
       carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
     var updateDetails: UpdateVO = null
     // initialise query_id for job

http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index f3baf58..d34b91d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -268,7 +268,7 @@ case class DeleteLoadsById(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
-        lookupRelation(databaseNameOp,  tableName)(sparkSession).asInstanceOf[CarbonRelation].
+        lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
         tableMeta.carbonTable
     CarbonStore.deleteLoadById(
       loadids,
@@ -293,7 +293,7 @@ case class DeleteLoadsByLoadDate(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
-        lookupRelation(databaseNameOp,  tableName)(sparkSession).asInstanceOf[CarbonRelation].
+        lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
         tableMeta.carbonTable
     CarbonStore.deleteLoadByDate(
       loadDate,
@@ -847,7 +847,7 @@ case class ShowLoads(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
-        lookupRelation(databaseNameOp,  tableName)(sparkSession).asInstanceOf[CarbonRelation].
+        lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
         tableMeta.carbonTable
     CarbonStore.showSegments(
       getDB.getDatabaseName(databaseNameOp, sparkSession),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/79feac96/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 549841b..c9eaf6d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -22,22 +22,23 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.sql.{RuntimeConfig, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, RuntimeConfig, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.impl.FileFactory.FileType
 import org.apache.carbondata.core.fileoperations.FileWriteOperation
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema
+import org.apache.carbondata.core.metadata.schema.table
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
@@ -62,7 +63,7 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
   }
 }
 
-class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends CarbonMetaStore {
+class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
 
   @transient
   val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
@@ -77,7 +78,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     System.nanoTime() + ""
   }
 
-  lazy val metadata = loadMetadata(storePath, nextQueryId)
+  val metadata = MetaData(new ArrayBuffer[TableMeta]())
 
 
   /**
@@ -90,9 +91,22 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
   override def createCarbonRelation(parameters: Map[String, String],
       absIdentifier: AbsoluteTableIdentifier,
       sparkSession: SparkSession): CarbonRelation = {
-    lookupRelation(TableIdentifier(absIdentifier.getCarbonTableIdentifier.getTableName,
-      Some(absIdentifier.getCarbonTableIdentifier.getDatabaseName)))(sparkSession)
-      .asInstanceOf[CarbonRelation]
+    val database = absIdentifier.getCarbonTableIdentifier.getDatabaseName
+    val tableName = absIdentifier.getCarbonTableIdentifier.getTableName
+    val tables = getTableFromMetadataCache(database, tableName)
+    tables match {
+      case Some(t) =>
+        CarbonRelation(database, tableName,
+          CarbonSparkUtil.createSparkMeta(t.carbonTable), t)
+      case None =>
+        readCarbonSchema(absIdentifier) match {
+          case Some(meta) =>
+            CarbonRelation(database, tableName,
+              CarbonSparkUtil.createSparkMeta(meta.carbonTable), meta)
+          case None =>
+            throw new NoSuchTableException(database, tableName)
+        }
+    }
   }
 
   def lookupRelation(dbName: Option[String], tableName: String)
@@ -100,20 +114,21 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
   }
 
-  def lookupRelation(tableIdentifier: TableIdentifier)
+  override def lookupRelation(tableIdentifier: TableIdentifier)
     (sparkSession: SparkSession): LogicalPlan = {
-    checkSchemasModifiedTimeAndReloadTables()
     val database = tableIdentifier.database.getOrElse(
-      sparkSession.catalog.currentDatabase
-    )
-    val tables = getTableFromMetadata(database, tableIdentifier.table, true)
-    tables match {
-      case Some(t) =>
-        CarbonRelation(database, tableIdentifier.table,
-          CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head)
-      case None =>
-        throw new NoSuchTableException(database, tableIdentifier.table)
+      sparkSession.catalog.currentDatabase)
+    val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
+      case SubqueryAlias(_,
+      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
+      _) =>
+        carbonDatasourceHadoopRelation.carbonRelation
+      case LogicalRelation(
+      carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
+        carbonDatasourceHadoopRelation.carbonRelation
+      case _ => throw new NoSuchTableException(database, tableIdentifier.table)
     }
+    relation
   }
 
   /**
@@ -123,11 +138,10 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    * @param tableName
    * @return
    */
-  def getTableFromMetadata(database: String,
-      tableName: String, readStore: Boolean = false): Option[TableMeta] = {
+  def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta] = {
     metadata.tablesMeta
       .find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
-                 c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
+        c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
   }
 
   def tableExists(
@@ -136,99 +150,48 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    tableExists(TableIdentifier(table, databaseOp))(sparkSession)
   }
 
-  def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
-    checkSchemasModifiedTimeAndReloadTables()
-    val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
-    val tables = metadata.tablesMeta.filter(
-      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
-           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
-    tables.nonEmpty
-  }
-
-  def loadMetadata(metadataPath: String, queryId: String): MetaData = {
-    val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
-    val statistic = new QueryStatistic()
-    // creating zookeeper instance once.
-    // if zookeeper is configured as carbon lock type.
-    val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null)
-    if (null != zookeeperurl) {
-      CarbonProperties.getInstance
-        .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
-    }
-    if (metadataPath == null) {
-      return null
-    }
-    // if no locktype is configured and store type is HDFS set HDFS lock as default
-    if (null == CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.LOCK_TYPE) &&
-        FileType.HDFS == FileFactory.getFileType(metadataPath)) {
-      CarbonProperties.getInstance
-        .addProperty(CarbonCommonConstants.LOCK_TYPE,
-          CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS
-        )
-      LOGGER.info("Default lock type HDFSLOCK is configured")
+  override def tableExists(tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession): Boolean = {
+    try {
+      lookupRelation(tableIdentifier)(sparkSession)
+    } catch {
+      case e: Exception =>
+        return false
     }
-    val fileType = FileFactory.getFileType(metadataPath)
-    val metaDataBuffer = new ArrayBuffer[TableMeta]
-    fillMetaData(metadataPath, fileType, metaDataBuffer)
-    updateSchemasUpdatedTime(readSchemaFileSystemTime("", ""))
-    statistic.addStatistics(QueryStatisticsConstants.LOAD_META,
-      System.currentTimeMillis())
-    recorder.recordStatisticsForDriver(statistic, queryId)
-    MetaData(metaDataBuffer)
+    true
   }
 
-  private def fillMetaData(basePath: String, fileType: FileType,
-      metaDataBuffer: ArrayBuffer[TableMeta]): Unit = {
-    val databasePath = basePath // + "/schemas"
-    try {
-      if (FileFactory.isFileExist(databasePath, fileType)) {
-        val file = FileFactory.getCarbonFile(databasePath, fileType)
-        val databaseFolders = file.listFiles()
-
-        databaseFolders.foreach(databaseFolder => {
-          if (databaseFolder.isDirectory) {
-            val dbName = databaseFolder.getName
-            val tableFolders = databaseFolder.listFiles()
-
-            tableFolders.foreach(tableFolder => {
-              if (tableFolder.isDirectory) {
-                val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName,
-                  tableFolder.getName, UUID.randomUUID().toString)
-                val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath,
-                  carbonTableIdentifier)
-                val tableMetadataFile = carbonTablePath.getSchemaFilePath
-
-                if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
-                  val tableName = tableFolder.getName
-                  val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName
-                  val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
-                  val schemaConverter = new ThriftWrapperSchemaConverterImpl
-                  val wrapperTableInfo = schemaConverter
-                    .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath)
-                  val schemaFilePath = CarbonStorePath
-                    .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
-                  wrapperTableInfo.setStorePath(storePath)
-                  wrapperTableInfo
-                    .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
-                  CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
-                  val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
-                  metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath,
-                    carbonTable)
-                }
-              }
-            })
-          }
-        })
-      } else {
-        // Create folders and files.
-        FileFactory.mkdirs(databasePath, fileType)
-      }
-    } catch {
-      case s: java.io.FileNotFoundException =>
-        s.printStackTrace()
-        // Create folders and files.
-        FileFactory.mkdirs(databasePath, fileType)
+  private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[TableMeta] = {
+    val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
+    val tableName = identifier.getCarbonTableIdentifier.getTableName
+    val storePath = identifier.getStorePath
+    val carbonTableIdentifier = new CarbonTableIdentifier(dbName.toLowerCase(),
+      tableName.toLowerCase(), UUID.randomUUID().toString)
+    val carbonTablePath =
+      CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+    val tableMetadataFile = carbonTablePath.getSchemaFilePath
+    val fileType = FileFactory.getFileType(tableMetadataFile)
+    if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+      val tableUniqueName = dbName + "_" + tableName
+      val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
+      val schemaConverter = new ThriftWrapperSchemaConverterImpl
+      val wrapperTableInfo = schemaConverter
+        .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
+      val schemaFilePath = CarbonStorePath
+        .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
+      wrapperTableInfo.setStorePath(storePath)
+      wrapperTableInfo
+        .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
+      CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+      val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+      val tableMeta = new TableMeta(carbonTable.getCarbonTableIdentifier,
+        identifier.getStorePath,
+        identifier.getTablePath,
+        carbonTable)
+      metadata.tablesMeta += tableMeta
+      Some(tableMeta)
+    } else {
+      None
     }
   }
 
@@ -238,28 +201,36 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    * @param newTableIdentifier
    * @param thriftTableInfo
    * @param schemaEvolutionEntry
-   * @param carbonStorePath
+   * @param tablePath
    * @param sparkSession
    */
   def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       schemaEvolutionEntry: SchemaEvolutionEntry,
-      carbonStorePath: String)
-    (sparkSession: SparkSession): String = {
+      tablePath: String) (sparkSession: SparkSession): String = {
+    val absoluteTableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     if (schemaEvolutionEntry != null) {
       thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
     }
     val wrapperTableInfo = schemaConverter
       .fromExternalToWrapperTableInfo(thriftTableInfo,
-          newTableIdentifier.getDatabaseName,
-          newTableIdentifier.getTableName,
-          carbonStorePath)
-    createSchemaThriftFile(wrapperTableInfo,
+        newTableIdentifier.getDatabaseName,
+        newTableIdentifier.getTableName,
+        absoluteTableIdentifier.getStorePath)
+    val identifier =
+      new CarbonTableIdentifier(newTableIdentifier.getDatabaseName,
+        newTableIdentifier.getTableName,
+        wrapperTableInfo.getFactTable.getTableId)
+    val path = createSchemaThriftFile(wrapperTableInfo,
       thriftTableInfo,
-      newTableIdentifier.getDatabaseName,
-      newTableIdentifier.getTableName)(sparkSession)
+      identifier)
+    addTableCache(wrapperTableInfo,
+      AbsoluteTableIdentifier.from(absoluteTableIdentifier.getStorePath,
+        newTableIdentifier.getDatabaseName,
+        newTableIdentifier.getTableName))
+    path
   }
 
   /**
@@ -267,25 +238,27 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    *
    * @param carbonTableIdentifier
    * @param thriftTableInfo
-   * @param carbonStorePath
+   * @param tablePath
    * @param sparkSession
    */
   def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      carbonStorePath: String)
-    (sparkSession: SparkSession): String = {
+      tablePath: String)(sparkSession: SparkSession): String = {
+    val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     val wrapperTableInfo = schemaConverter
       .fromExternalToWrapperTableInfo(thriftTableInfo,
         carbonTableIdentifier.getDatabaseName,
         carbonTableIdentifier.getTableName,
-        carbonStorePath)
+        tableIdentifier.getStorePath)
     val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
     evolutionEntries.remove(evolutionEntries.size() - 1)
-    createSchemaThriftFile(wrapperTableInfo,
+    wrapperTableInfo.setStorePath(tableIdentifier.getStorePath)
+    val path = createSchemaThriftFile(wrapperTableInfo,
       thriftTableInfo,
-      carbonTableIdentifier.getDatabaseName,
-      carbonTableIdentifier.getTableName)(sparkSession)
+      tableIdentifier.getCarbonTableIdentifier)
+    addTableCache(wrapperTableInfo, tableIdentifier)
+    path
   }
 
 
@@ -296,24 +269,38 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    * Load CarbonTable from wrapper tableInfo
    *
    */
-  def createTableFromThrift(
-      tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
-      dbName: String, tableName: String)(sparkSession: SparkSession): (String, String) = {
-    if (tableExists(tableName, Some(dbName))(sparkSession)) {
-      sys.error(s"Table [$tableName] already exists under Database [$dbName]")
-    }
-    val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
+  def saveToDisk(tableInfo: schema.table.TableInfo, tablePath: String) {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val dbName = tableInfo.getDatabaseName
+    val tableName = tableInfo.getFactTable.getTableName
     val thriftTableInfo = schemaConverter
       .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
-    thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
-      .add(schemaEvolutionEntry)
-    val carbonTablePath = createSchemaThriftFile(tableInfo,
+    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+    tableInfo.setStorePath(identifier.getStorePath)
+    createSchemaThriftFile(tableInfo,
       thriftTableInfo,
-      dbName,
-      tableName)(sparkSession)
+      identifier.getCarbonTableIdentifier)
     LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
-    (carbonTablePath, "")
+  }
+
+  /**
+   * Generates schema string from TableInfo
+   */
+  override def generateTableSchemaString(tableInfo: schema.table.TableInfo,
+      tablePath: String): String = {
+    val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
+    val schemaMetadataPath =
+      CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
+    tableInfo.setMetaDataFilepath(schemaMetadataPath)
+    tableInfo.setStorePath(tableIdentifier.getStorePath)
+    val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
+    schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
+    tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
+    removeTableFromMetadata(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName)
+    CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
+    addTableCache(tableInfo, tableIdentifier)
+    CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",")
   }
 
   /**
@@ -321,23 +308,16 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    *
    * @param tableInfo
    * @param thriftTableInfo
-   * @param dbName
-   * @param tableName
-   * @param sparkSession
    * @return
    */
-  private def createSchemaThriftFile(
-      tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
-      thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      dbName: String, tableName: String)
-    (sparkSession: SparkSession): String = {
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
-      tableInfo.getFactTable.getTableId)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+  private def createSchemaThriftFile(tableInfo: schema.table.TableInfo,
+      thriftTableInfo: TableInfo,
+      carbonTableIdentifier: CarbonTableIdentifier): String = {
+    val carbonTablePath = CarbonStorePath.
+      getCarbonTablePath(tableInfo.getStorePath, carbonTableIdentifier)
     val schemaFilePath = carbonTablePath.getSchemaFilePath
     val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
     tableInfo.setMetaDataFilepath(schemaMetadataPath)
-    tableInfo.setStorePath(storePath)
     val fileType = FileFactory.getFileType(schemaMetadataPath)
     if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
       FileFactory.mkdirs(schemaMetadataPath, fileType)
@@ -346,13 +326,20 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     thriftWriter.open(FileWriteOperation.OVERWRITE)
     thriftWriter.write(thriftTableInfo)
     thriftWriter.close()
-    removeTableFromMetadata(dbName, tableName)
+    updateSchemasUpdatedTime(touchSchemaFileSystemTime(tableInfo.getStorePath))
+    carbonTablePath.getPath
+  }
+
+  protected def addTableCache(tableInfo: table.TableInfo,
+      absoluteTableIdentifier: AbsoluteTableIdentifier) = {
+    val identifier = absoluteTableIdentifier.getCarbonTableIdentifier
+    CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
+    removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName)
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
-      CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName))
+    val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getStorePath,
+      absoluteTableIdentifier.getTablePath,
+      CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName))
     metadata.tablesMeta += tableMeta
-    updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
-    carbonTablePath.getPath
   }
 
   /**
@@ -362,13 +349,15 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    * @param tableName
    */
   def removeTableFromMetadata(dbName: String, tableName: String): Unit = {
-    val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName, tableName)
+    val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadataCache(dbName, tableName)
     metadataToBeRemoved match {
       case Some(tableMeta) =>
         metadata.tablesMeta -= tableMeta
         CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
       case None =>
-        LOGGER.debug(s"No entry for table $tableName in database $dbName")
+        if (LOGGER.isDebugEnabled) {
+          LOGGER.debug(s"No entry for table $tableName in database $dbName")
+        }
     }
   }
 
@@ -402,23 +391,23 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
 
 
   def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
-    val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
-    val tableName = tableIdentifier.table.toLowerCase
-
-    val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath,
-      new CarbonTableIdentifier(dbName, tableName, "")).getPath
-
-    val fileType = FileFactory.getFileType(tablePath)
-    FileFactory.isFileExist(tablePath, fileType)
+    try {
+      val tablePath = lookupRelation(tableIdentifier)(sparkSession).
+        asInstanceOf[CarbonRelation].tableMeta.tablePath
+      val fileType = FileFactory.getFileType(tablePath)
+      FileFactory.isFileExist(tablePath, fileType)
+    } catch {
+      case e: Exception =>
+        false
+    }
   }
 
-  def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
+  def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
     (sparkSession: SparkSession) {
     val dbName = tableIdentifier.database.get
     val tableName = tableIdentifier.table
-
-    val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
-      new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
+    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+    val metadataFilePath = CarbonStorePath.getCarbonTablePath(identifier).getMetadataDirectoryPath
     val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
     if (null != carbonTable) {
       // clear driver B-tree and dictionary cache
@@ -429,26 +418,18 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     if (FileFactory.isFileExist(metadataFilePath, fileType)) {
       // while drop we should refresh the schema modified time so that if any thing has changed
       // in the other beeline need to update.
-      checkSchemasModifiedTimeAndReloadTables
-
-      val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName,
-        tableIdentifier.table)
-      metadataToBeRemoved match {
-        case Some(tableMeta) =>
-          metadata.tablesMeta -= tableMeta
-          CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
-          updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
-        case None =>
-          LOGGER.info(s"Metadata does not contain entry for table $tableName in database $dbName")
-      }
+      checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
+
+      removeTableFromMetadata(dbName, tableName)
+      updateSchemasUpdatedTime(touchSchemaFileSystemTime(identifier.getStorePath))
       CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
       // discard cached table info in cachedDataSourceTables
       sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
     }
   }
 
-  private def getTimestampFileAndType(databaseName: String, tableName: String) = {
-    val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
+  private def getTimestampFileAndType(basePath: String) = {
+    val timestampFile = basePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
     val timestampFileType = FileFactory.getFileType(timestampFile)
     (timestampFile, timestampFileType)
   }
@@ -462,37 +443,20 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     tableModifiedTimeStore.put("default", timeStamp)
   }
 
-  def updateAndTouchSchemasUpdatedTime(databaseName: String, tableName: String) {
-    updateSchemasUpdatedTime(touchSchemaFileSystemTime(databaseName, tableName))
+  def updateAndTouchSchemasUpdatedTime(basePath: String) {
+    updateSchemasUpdatedTime(touchSchemaFileSystemTime(basePath))
   }
 
-  /**
-   * This method will read the timestamp of empty schema file
-   *
-   * @param databaseName
-   * @param tableName
-   * @return
-   */
-  private def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
-    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
-    } else {
-      System.currentTimeMillis()
-    }
-  }
 
   /**
    * This method will check and create an empty schema timestamp file
    *
-   * @param databaseName
-   * @param tableName
    * @return
    */
-  private def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
+  private def touchSchemaFileSystemTime(basePath: String): Long = {
+    val (timestampFile, timestampFileType) = getTimestampFileAndType(basePath)
     if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName")
+      LOGGER.audit(s"Creating timestamp file for $basePath")
       FileFactory.createNewFile(timestampFile, timestampFileType)
     }
     val systemTime = System.currentTimeMillis()
@@ -501,8 +465,9 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     systemTime
   }
 
-  def checkSchemasModifiedTimeAndReloadTables() {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType("", "")
+  def checkSchemasModifiedTimeAndReloadTables(storePath: String) {
+    val (timestampFile, timestampFileType) =
+      getTimestampFileAndType(storePath)
     if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
       if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
         getLastModifiedTime ==
@@ -513,7 +478,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
   }
 
   private def refreshCache() {
-    metadata.tablesMeta = loadMetadata(storePath, nextQueryId).tablesMeta
+    metadata.tablesMeta.clear()
   }
 
   override def isReadFromHiveMetaStore: Boolean = false
@@ -527,4 +492,3 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     CarbonUtil.readSchemaFile(tableMetadataFile)
   }
 }
-
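
The net effect of this last file's rewrite: the metastore no longer eagerly scans the whole
store path at startup (loadMetadata/fillMetaData are gone). Instead, createCarbonRelation
consults the in-memory cache, reads the single table's schema file on a miss, and only then
fails with NoSuchTableException. A language-neutral sketch of that resolution order
(hypothetical types, written in Java for consistency with the examples above):

    import java.util.Optional;

    final class LazySchemaResolution<Meta> {
      interface Cache<M> { Optional<M> get(String db, String table); void put(M meta); }
      interface SchemaFileReader<M> { Optional<M> read(String db, String table); }

      private final Cache<Meta> cache;
      private final SchemaFileReader<Meta> reader;

      LazySchemaResolution(Cache<Meta> cache, SchemaFileReader<Meta> reader) {
        this.cache = cache;
        this.reader = reader;
      }

      // Mirrors createCarbonRelation above: cache hit, else read that table's own
      // schema file and cache it, else report the table as missing.
      Meta resolve(String db, String table) {
        return cache.get(db, table).orElseGet(() ->
            reader.read(db, table)
                .map(m -> { cache.put(m); return m; })
                .orElseThrow(() -> new IllegalStateException(
                    "NoSuchTableException: " + db + "." + table)));
      }
    }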