You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/11 11:56:12 UTC

[3/3] carbondata git commit: [CARBONDATA-1284]Implement hive based schema storage in carbon

[CARBONDATA-1284]Implement hive based schema storage in carbon

This closes #1149


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2a9debfc
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2a9debfc
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2a9debfc

Branch: refs/heads/metadata
Commit: 2a9debfccfdc5633e5e00e95b860efb8b368ea12
Parents: 4b69c9d
Author: ravipesala <ra...@gmail.com>
Authored: Sat Jul 8 17:14:04 2017 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Tue Jul 11 19:48:57 2017 +0800

----------------------------------------------------------------------
 .../dictionary/ManageDictionaryAndBTree.java    |  39 +-
 .../core/constants/CarbonCommonConstants.java   |   4 +
 .../carbondata/core/locks/CarbonLockUtil.java   |  11 +-
 .../apache/carbondata/core/util/CarbonUtil.java | 127 +++
 .../core/util/path/CarbonTablePath.java         |   4 +
 .../partition/TestDDLForPartitionTable.scala    |   1 +
 .../spark/rdd/AlterTableAddColumnRDD.scala      |   7 +
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   2 +
 .../apache/carbondata/spark/rdd/Compactor.scala |   2 -
 .../spark/rdd/DataManagementFunc.scala          |   1 -
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |   4 -
 .../execution/command/carbonTableSchema.scala   |   3 -
 .../spark/rdd/CarbonDataRDDFactory.scala        |  47 +-
 .../spark/DictionaryDetailHelper.scala          |  18 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  44 +-
 .../carbondata/spark/util/CarbonSparkUtil.scala |  15 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |   8 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   6 +-
 .../org/apache/spark/sql/CarbonSource.scala     |  93 +-
 .../execution/command/AlterTableCommands.scala  |  53 +-
 .../execution/command/CarbonHiveCommands.scala  |  14 +-
 .../sql/execution/command/DDLStrategy.scala     |  14 +-
 .../sql/execution/command/IUDCommands.scala     |   5 +-
 .../execution/command/carbonTableSchema.scala   |  87 +-
 .../spark/sql/hive/CarbonFileMetastore.scala    | 531 ++++++++++
 .../spark/sql/hive/CarbonHiveMetaStore.scala    | 287 ++++++
 .../apache/spark/sql/hive/CarbonMetaStore.scala | 170 ++++
 .../apache/spark/sql/hive/CarbonMetastore.scala | 960 -------------------
 .../apache/spark/sql/hive/CarbonRelation.scala  | 292 ++++++
 .../org/apache/spark/util/AlterTableUtil.scala  |  57 +-
 .../org/apache/spark/util/CleanFiles.scala      |   6 +-
 .../apache/spark/util/DeleteSegmentByDate.scala |   8 +-
 .../apache/spark/util/DeleteSegmentById.scala   |   6 +-
 .../org/apache/spark/util/ShowSegments.scala    |   6 +-
 .../spark/util/AllDictionaryTestCase.scala      |  10 +
 .../util/ExternalColumnDictionaryTestCase.scala |  10 +
 .../carbondata/CarbonDataSourceSuite.scala      |   1 +
 .../processing/merger/CarbonCompactionUtil.java |  11 +-
 38 files changed, 1744 insertions(+), 1220 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
index a50bf15..bae9189 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
@@ -67,25 +67,28 @@ public class ManageDictionaryAndBTree {
     String metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath();
     CarbonFile metadataDir = FileFactory
         .getCarbonFile(metadataDirectoryPath, FileFactory.getFileType(metadataDirectoryPath));
-    // sort index file is created with dictionary size appended to it. So all the files
-    // with a given column ID need to be listed
-    CarbonFile[] listFiles = metadataDir.listFiles(new CarbonFileFilter() {
-      @Override public boolean accept(CarbonFile path) {
-        if (path.getName().startsWith(columnSchema.getColumnUniqueId())) {
-          return true;
+    if (metadataDir.exists()) {
+      // sort index file is created with dictionary size appended to it. So all the files
+      // with a given column ID need to be listed
+      CarbonFile[] listFiles = metadataDir.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile path) {
+          if (path.getName().startsWith(columnSchema.getColumnUniqueId())) {
+            return true;
+          }
+          return false;
+        }
+      });
+      for (CarbonFile file : listFiles) {
+        // try catch is inside for loop because even if one deletion fails, other files
+        // still need to be deleted
+        try {
+          FileFactory.deleteFile(file.getCanonicalPath(),
+              FileFactory.getFileType(file.getCanonicalPath()));
+        } catch (IOException e) {
+          LOGGER.error("Failed to delete dictionary or sortIndex file for column "
+              + columnSchema.getColumnName() + "with column ID "
+              + columnSchema.getColumnUniqueId());
         }
-        return false;
-      }
-    });
-    for (CarbonFile file : listFiles) {
-      // try catch is inside for loop because even if one deletion fails, other files
-      // still need to be deleted
-      try {
-        FileFactory
-            .deleteFile(file.getCanonicalPath(), FileFactory.getFileType(file.getCanonicalPath()));
-      } catch (IOException e) {
-        LOGGER.error("Failed to delete dictionary or sortIndex file for column " + columnSchema
-            .getColumnName() + "with column ID " + columnSchema.getColumnUniqueId());
       }
     }
     // remove dictionary cache

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 6393131..55a292e 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1264,6 +1264,10 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_BAD_RECORDS_ACTION_DEFAULT = "FORCE";
 
+  public static final String ENABLE_HIVE_SCHEMA_META_STORE = "spark.carbon.hive.schema.store";
+
+  public static final String ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT = "false";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
index eaaaf94..60a7564 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
@@ -19,7 +19,7 @@ package org.apache.carbondata.core.locks;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 
 /**
  * This class contains all carbon lock utilities
@@ -66,15 +66,12 @@ public class CarbonLockUtil {
    * Given a lock type this method will return a new lock object if not acquired by any other
    * operation
    *
-   * @param carbonTable
+   * @param identifier
    * @param lockType
    * @return
    */
-  public static ICarbonLock getLockObject(CarbonTable carbonTable,
-                            String lockType) {
-    ICarbonLock carbonLock = CarbonLockFactory
-            .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier().getCarbonTableIdentifier(),
-                    lockType);
+  public static ICarbonLock getLockObject(CarbonTableIdentifier identifier, String lockType) {
+    ICarbonLock carbonLock = CarbonLockFactory.getCarbonLockObj(identifier, lockType);
     LOGGER.info("Trying to acquire lock: " + carbonLock);
     if (carbonLock.lockWithRetries()) {
       LOGGER.info("Successfully acquired the lock " + carbonLock);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 8298600..b9c164a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -31,8 +31,10 @@ import java.nio.charset.Charset;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogService;
@@ -58,6 +60,7 @@ import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
@@ -70,9 +73,11 @@ import org.apache.carbondata.core.service.PathService;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.writer.ThriftWriter;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.DataChunk3;
 
+import com.google.gson.Gson;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -1720,5 +1725,127 @@ public final class CarbonUtil {
   public static boolean isValidBadStorePath(String badRecordsLocation) {
     return !(null == badRecordsLocation || badRecordsLocation.length() == 0);
   }
+
+  public static String convertToMultiGsonStrings(TableInfo tableInfo, String seperator,
+      String quote, String prefix) {
+    Gson gson = new Gson();
+    String schemaString = gson.toJson(tableInfo);
+    int schemaLen = schemaString.length();
+    int splitLen = 4000;
+    int parts = schemaLen / splitLen;
+    if (schemaLen % splitLen > 0) {
+      parts++;
+    }
+    StringBuilder builder =
+        new StringBuilder(prefix).append(quote).append("carbonSchemaPartsNo").append(quote)
+            .append(seperator).append("'").append(parts).append("',");
+    int runningLen = 0;
+    int endLen = splitLen;
+    for (int i = 0; i < parts; i++) {
+      if (i == parts - 1) {
+        endLen = schemaLen % splitLen;
+      }
+      builder.append(quote).append("carbonSchema").append(i).append(quote).append(seperator);
+      builder.append("'").append(schemaString.substring(runningLen, runningLen + endLen))
+          .append("'");
+      if (i < parts - 1) {
+        builder.append(",");
+      }
+      runningLen += splitLen;
+    }
+    return builder.toString();
+  }
+
+  public static Map<String, String> convertToMultiStringMap(TableInfo tableInfo) {
+    Map<String, String> map = new HashMap<>();
+    Gson gson = new Gson();
+    String schemaString = gson.toJson(tableInfo);
+    int schemaLen = schemaString.length();
+    int splitLen = 4000;
+    int parts = schemaLen / splitLen;
+    if (schemaLen % splitLen > 0) {
+      parts++;
+    }
+    map.put("carbonSchemaPartsNo", parts + "");
+    int runningLen = 0;
+    int endLen = splitLen;
+    for (int i = 0; i < parts; i++) {
+      if (i == parts - 1) {
+        endLen = schemaLen % splitLen;
+      }
+      map.put("carbonSchema" + i, schemaString.substring(runningLen, runningLen + endLen));
+      runningLen += splitLen;
+    }
+    return map;
+  }
+
+
+  public static TableInfo convertGsonToTableInfo(Map<String, String> properties) {
+    Gson gson = new Gson();
+    String partsNo = properties.get("carbonSchemaPartsNo");
+    if (partsNo == null) {
+      return null;
+    }
+    int no = Integer.parseInt(partsNo);
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < no; i++) {
+      String part = properties.get("carbonSchema" + i);
+      if (part == null) {
+        throw new RuntimeException("Some thing wrong in getting schema from hive metastore");
+      }
+      builder.append(part);
+    }
+    TableInfo tableInfo = gson.fromJson(builder.toString(), TableInfo.class);
+    return tableInfo;
+  }
+
+  /**
+   * This method will read the schema file from a given path
+   *
+   * @param schemaFilePath
+   * @return
+   */
+  public static org.apache.carbondata.format.TableInfo readSchemaFile(String schemaFilePath)
+      throws IOException {
+    TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+      public org.apache.thrift.TBase<org.apache.carbondata.format.TableInfo,
+          org.apache.carbondata.format.TableInfo._Fields> create() {
+        return new org.apache.carbondata.format.TableInfo();
+      }
+    };
+    ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase);
+    thriftReader.open();
+    org.apache.carbondata.format.TableInfo tableInfo =
+        (org.apache.carbondata.format.TableInfo) thriftReader.read();
+    thriftReader.close();
+    return tableInfo;
+  }
+
+  public static void writeThriftTableToSchemaFile(String schemaFilePath,
+      org.apache.carbondata.format.TableInfo tableInfo) throws IOException {
+    ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
+    try {
+      thriftWriter.open();
+      thriftWriter.write(tableInfo);
+    } finally {
+      thriftWriter.close();
+    }
+  }
+
+  public static void createDatabaseDirectory(String dbName, String storePath) throws IOException {
+    String databasePath = storePath + File.separator + dbName.toLowerCase();
+    FileFactory.FileType fileType = FileFactory.getFileType(databasePath);
+    FileFactory.mkdirs(databasePath, fileType);
+  }
+
+  public static void dropDatabaseDirectory(String dbName, String storePath)
+      throws IOException, InterruptedException {
+    String databasePath = storePath + File.separator + dbName;
+    FileFactory.FileType fileType = FileFactory.getFileType(databasePath);
+    if (FileFactory.isFileExist(databasePath, fileType)) {
+      CarbonFile dbPath = FileFactory.getCarbonFile(databasePath, fileType);
+      CarbonUtil.deleteFoldersAndFiles(dbPath);
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 101419d..5824d76 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -432,6 +432,10 @@ public class CarbonTablePath extends Path {
     return tablePath + File.separator + FACT_DIR;
   }
 
+  public CarbonTableIdentifier getCarbonTableIdentifier() {
+    return carbonTableIdentifier;
+  }
+
   @Override public boolean equals(Object o) {
     if (!(o instanceof CarbonTablePath)) {
       return false;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
index 5b77a24..c38b249 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
@@ -128,6 +128,7 @@ class TestDDLForPartitionTable  extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test describe formatted for partition column") {
+    sql("drop table if exists des")
     sql(
       """create table des(a int, b string) partitioned by (c string) stored by 'carbondata'
         |tblproperties ('partition_type'='list','list_info'='1,2')""".stripMargin)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
index 7eea95d..2fb72f7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
@@ -22,6 +22,7 @@ import org.apache.spark.rdd.RDD
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
@@ -80,6 +81,12 @@ class AlterTableAddColumnRDD[K, V](sc: SparkContext,
               CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
           }
           CarbonProperties.getInstance.addProperty(CarbonCommonConstants.LOCK_TYPE, lockType)
+          // Create table and metadata folders if not exist
+          val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+          val fileType = FileFactory.getFileType(metadataDirectoryPath)
+          if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
+            FileFactory.mkdirs(metadataDirectoryPath, fileType)
+          }
           GlobalDictionaryUtil
             .loadDefaultDictionaryValueForNewColumn(carbonTablePath,
               columnSchema,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 815dba3..bc5ca06 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -265,6 +265,8 @@ class CarbonMergerRDD[K, V](
     val jobConf: JobConf = new JobConf(new Configuration)
     val job: Job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
+    CarbonInputFormat.setTableInfo(job.getConfiguration,
+      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
     var updateDetails: UpdateVO = null
     // initialise query_id for job
     job.getConfiguration.set("query.id", queryId)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 1a237f6..4cebcd8 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -39,7 +39,6 @@ object Compactor {
     val storePath = compactionCallableModel.storePath
     val storeLocation = compactionCallableModel.storeLocation
     val carbonTable = compactionCallableModel.carbonTable
-    val cubeCreationTime = compactionCallableModel.cubeCreationTime
     val loadsToMerge = compactionCallableModel.loadsToMerge
     val sc = compactionCallableModel.sqlContext
     val carbonLoadModel = compactionCallableModel.carbonLoadModel
@@ -57,7 +56,6 @@ object Compactor {
       storePath,
       carbonTable.getMetaDataFilepath,
       mergedLoadName,
-      cubeCreationTime,
       databaseName,
       factTableName,
       validSegments,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index 954303f..5ab8160 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -250,7 +250,6 @@ object DataManagementFunc {
       carbonLoadModel,
       storeLocation,
       compactionModel.carbonTable,
-      compactionModel.tableCreationTime,
       loadsToMerge,
       sqlContext,
       compactionModel.compactionType

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 129c642..7cbe9a4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -388,8 +388,6 @@ class NewDataFrameLoaderRDD[K, V](
     result: DataLoadResult[K, V],
     carbonLoadModel: CarbonLoadModel,
     loadCount: Integer,
-    tableCreationTime: Long,
-    schemaLastUpdatedTime: Long,
     prev: DataLoadCoalescedRDD[Row]) extends CarbonRDD[(K, V)](prev) {
 
   override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
@@ -583,8 +581,6 @@ class PartitionTableDataLoaderRDD[K, V](
     result: DataLoadResult[K, V],
     carbonLoadModel: CarbonLoadModel,
     loadCount: Integer,
-    tableCreationTime: Long,
-    schemaLastUpdatedTime: Long,
     prev: RDD[Row]) extends CarbonRDD[(K, V)](prev) {
 
   override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 6174f7c..500e18e 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -90,7 +90,6 @@ case class CarbonMergerMapping(storeLocation: String,
     hdfsStoreLocation: String,
     metadataFilePath: String,
     var mergedLoadName: String,
-    tableCreationTime: Long,
     databaseName: String,
     factTableName: String,
     validSegments: Array[String],
@@ -117,14 +116,12 @@ case class UpdateTableModel(isUpdate: Boolean,
 case class CompactionModel(compactionSize: Long,
     compactionType: CompactionType,
     carbonTable: CarbonTable,
-    tableCreationTime: Long,
     isDDLTrigger: Boolean)
 
 case class CompactionCallableModel(storePath: String,
     carbonLoadModel: CarbonLoadModel,
     storeLocation: String,
     carbonTable: CarbonTable,
-    cubeCreationTime: Long,
     loadsToMerge: util.List[LoadMetadataDetails],
     sqlContext: SQLContext,
     compactionType: CompactionType)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 3579b8a..31b05bc 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -104,8 +104,6 @@ object CarbonDataRDDFactory {
     LOGGER.audit(s"Compaction request received for table " +
         s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    val tableCreationTime = CarbonEnv.get.carbonMetastore
-        .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
 
     if (null == carbonLoadModel.getLoadMetadataDetails) {
       CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
@@ -118,7 +116,6 @@ object CarbonDataRDDFactory {
     val compactionModel = CompactionModel(compactionSize,
       compactionType,
       carbonTable,
-      tableCreationTime,
       isCompactionTriggerByDDl
     )
 
@@ -272,23 +269,19 @@ object CarbonDataRDDFactory {
           if (!isConcurrentCompactionAllowed) {
             LOGGER.info("System level compaction lock is enabled.")
             val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
-            var tableForCompaction = CarbonCompactionUtil
-                .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata.tablesMeta.toArray,
+            var table: CarbonTable = CarbonCompactionUtil
+                .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata.
+                  tablesMeta.map(_.carbonTable).toArray,
                   skipCompactionTables.toList.asJava)
-            while (null != tableForCompaction) {
+            while (null != table) {
               LOGGER.info("Compaction request has been identified for table " +
-                  s"${ tableForCompaction.carbonTable.getDatabaseName }." +
-                  s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
-              val table: CarbonTable = tableForCompaction.carbonTable
+                  s"${ table.getDatabaseName }." +
+                  s"${ table.getFactTableName }")
               val metadataPath = table.getMetaDataFilepath
               val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
 
               val newCarbonLoadModel = new CarbonLoadModel()
               DataManagementFunc.prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
-              val tableCreationTime = CarbonEnv.get.carbonMetastore
-                  .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
-                    newCarbonLoadModel.getTableName
-                  )
 
               val compactionSize = CarbonDataMergerUtil
                   .getCompactionSize(CompactionType.MAJOR_COMPACTION)
@@ -296,7 +289,6 @@ object CarbonDataRDDFactory {
               val newcompactionModel = CompactionModel(compactionSize,
                 compactionType,
                 table,
-                tableCreationTime,
                 compactionModel.isDDLTrigger
               )
               // proceed for compaction
@@ -309,8 +301,8 @@ object CarbonDataRDDFactory {
               } catch {
                 case e: Exception =>
                   LOGGER.error("Exception in compaction thread for table " +
-                      s"${ tableForCompaction.carbonTable.getDatabaseName }." +
-                      s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+                      s"${ table.getDatabaseName }." +
+                      s"${ table.getFactTableName }")
                 // not handling the exception. only logging as this is not the table triggered
                 // by user.
               } finally {
@@ -319,16 +311,16 @@ object CarbonDataRDDFactory {
                     .deleteCompactionRequiredFile(metadataPath, compactionType)) {
                   // if the compaction request file is not been able to delete then
                   // add those tables details to the skip list so that it wont be considered next.
-                  skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
+                  skipCompactionTables.+=:(table.getCarbonTableIdentifier)
                   LOGGER.error("Compaction request file can not be deleted for table " +
-                      s"${ tableForCompaction.carbonTable.getDatabaseName }." +
-                      s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+                      s"${ table.getDatabaseName }." +
+                      s"${ table.getFactTableName }")
                 }
               }
               // ********* check again for all the tables.
-              tableForCompaction = CarbonCompactionUtil
+              table = CarbonCompactionUtil
                   .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata
-                      .tablesMeta.toArray, skipCompactionTables.asJava
+                      .tablesMeta.map(_.carbonTable).toArray, skipCompactionTables.asJava
                   )
             }
             // giving the user his error for telling in the beeline if his triggered table
@@ -360,7 +352,7 @@ object CarbonDataRDDFactory {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val isAgg = false
     // for handling of the segment Merging.
-    def handleSegmentMerging(tableCreationTime: Long): Unit = {
+    def handleSegmentMerging(): Unit = {
       LOGGER.info(s"compaction need status is" +
           s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
       if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) {
@@ -371,7 +363,6 @@ object CarbonDataRDDFactory {
         val compactionModel = CompactionModel(compactionSize,
           CompactionType.MINOR_COMPACTION,
           carbonTable,
-          tableCreationTime,
           isCompactionTriggerByDDl
         )
         var storeLocation = ""
@@ -479,10 +470,6 @@ object CarbonDataRDDFactory {
       // reading the start time of data load.
       val loadStartTime = CarbonUpdateUtil.readCurrentTime();
       carbonLoadModel.setFactTimeStamp(loadStartTime)
-      val tableCreationTime = CarbonEnv.get.carbonMetastore
-          .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
-      val schemaLastUpdatedTime = CarbonEnv.get.carbonMetastore
-          .getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
 
       // get partition way from configuration
       // val isTableSplitPartition = CarbonProperties.getInstance().getProperty(
@@ -634,8 +621,6 @@ object CarbonDataRDDFactory {
             new DataLoadResultImpl(),
             carbonLoadModel,
             currentLoadCount,
-            tableCreationTime,
-            schemaLastUpdatedTime,
             newRdd).collect()
         } catch {
           case ex: Exception =>
@@ -748,8 +733,6 @@ object CarbonDataRDDFactory {
             new DataLoadResultImpl(),
             carbonLoadModel,
             currentLoadCount,
-            tableCreationTime,
-            schemaLastUpdatedTime,
             rdd).collect()
         } catch {
           case ex: Exception =>
@@ -974,7 +957,7 @@ object CarbonDataRDDFactory {
         }
         try {
           // compaction handling
-          handleSegmentMerging(tableCreationTime)
+          handleSegmentMerging()
         } catch {
           case e: Exception =>
             throw new Exception(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
index 32ba6cf..779ace1 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
@@ -36,15 +36,17 @@ class DictionaryDetailHelper extends DictionaryDetailService {
     // Metadata folder
     val metadataDirectory = FileFactory.getCarbonFile(dictfolderPath, fileType)
     // need list all dictionary file paths with exists flag
-    val carbonFiles = metadataDirectory.listFiles(new CarbonFileFilter {
-      @Override def accept(pathname: CarbonFile): Boolean = {
-        CarbonTablePath.isDictionaryFile(pathname)
-      }
-    })
-    // 2 put dictionary file names to fileNamesMap
     val fileNamesMap = new HashMap[String, Int]
-    for (i <- 0 until carbonFiles.length) {
-      fileNamesMap.put(carbonFiles(i).getName, i)
+    if (metadataDirectory.exists()) {
+      val carbonFiles = metadataDirectory.listFiles(new CarbonFileFilter {
+        @Override def accept(pathname: CarbonFile): Boolean = {
+          CarbonTablePath.isDictionaryFile(pathname)
+        }
+      })
+      // 2 put dictionary file names to fileNamesMap
+      for (i <- 0 until carbonFiles.length) {
+        fileNamesMap.put(carbonFiles(i).getName, i)
+      }
     }
     // 3 lookup fileNamesMap, if file name is in fileNamesMap, file is exists, or not.
     primDimensions.zipWithIndex.foreach { f =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 5c20808..fc813d1 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -105,8 +105,6 @@ object CarbonDataRDDFactory {
     LOGGER.audit(s"Compaction request received for table " +
         s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    val tableCreationTime = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
-        .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
 
     if (null == carbonLoadModel.getLoadMetadataDetails) {
       CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
@@ -124,7 +122,6 @@ object CarbonDataRDDFactory {
     val compactionModel = CompactionModel(compactionSize,
       compactionType,
       carbonTable,
-      tableCreationTime,
       isCompactionTriggerByDDl
     )
 
@@ -285,22 +282,18 @@ object CarbonDataRDDFactory {
             val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
             var tableForCompaction = CarbonCompactionUtil
               .getNextTableToCompact(CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
-                .metadata.tablesMeta.toArray,
+                .listAllTables(sqlContext.sparkSession).toArray,
                 skipCompactionTables.toList.asJava)
             while (null != tableForCompaction) {
               LOGGER.info("Compaction request has been identified for table " +
-                  s"${ tableForCompaction.carbonTable.getDatabaseName }." +
-                  s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
-              val table: CarbonTable = tableForCompaction.carbonTable
+                  s"${ tableForCompaction.getDatabaseName }." +
+                  s"${ tableForCompaction.getFactTableName}")
+              val table: CarbonTable = tableForCompaction
               val metadataPath = table.getMetaDataFilepath
               val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
 
               val newCarbonLoadModel = new CarbonLoadModel()
               DataManagementFunc.prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
-              val tableCreationTime = CarbonEnv.getInstance(sqlContext.sparkSession)
-                .carbonMetastore.getTableCreationTime(newCarbonLoadModel.getDatabaseName,
-                newCarbonLoadModel.getTableName
-              )
 
               val compactionSize = CarbonDataMergerUtil
                   .getCompactionSize(CompactionType.MAJOR_COMPACTION)
@@ -308,7 +301,6 @@ object CarbonDataRDDFactory {
               val newcompactionModel = CompactionModel(compactionSize,
                 compactionType,
                 table,
-                tableCreationTime,
                 compactionModel.isDDLTrigger
               )
               // proceed for compaction
@@ -321,8 +313,8 @@ object CarbonDataRDDFactory {
               } catch {
                 case e: Exception =>
                   LOGGER.error("Exception in compaction thread for table " +
-                      s"${ tableForCompaction.carbonTable.getDatabaseName }." +
-                      s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+                      s"${ tableForCompaction.getDatabaseName }." +
+                      s"${ tableForCompaction.getFactTableName }")
                 // not handling the exception. only logging as this is not the table triggered
                 // by user.
               } finally {
@@ -331,17 +323,17 @@ object CarbonDataRDDFactory {
                     .deleteCompactionRequiredFile(metadataPath, compactionType)) {
                   // if the compaction request file is not been able to delete then
                   // add those tables details to the skip list so that it wont be considered next.
-                  skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
+                  skipCompactionTables.+=:(tableForCompaction.getCarbonTableIdentifier)
                   LOGGER.error("Compaction request file can not be deleted for table " +
-                      s"${ tableForCompaction.carbonTable.getDatabaseName }." +
-                      s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+                      s"${ tableForCompaction.getDatabaseName }." +
+                      s"${ tableForCompaction.getFactTableName }")
                 }
               }
               // ********* check again for all the tables.
               tableForCompaction = CarbonCompactionUtil
                 .getNextTableToCompact(CarbonEnv.getInstance(sqlContext.sparkSession)
-                  .carbonMetastore.metadata
-                  .tablesMeta.toArray, skipCompactionTables.asJava
+                  .carbonMetastore.listAllTables(sqlContext.sparkSession).toArray,
+                  skipCompactionTables.asJava
                 )
             }
           }
@@ -373,7 +365,7 @@ object CarbonDataRDDFactory {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val isAgg = false
     // for handling of the segment Merging.
-    def handleSegmentMerging(tableCreationTime: Long): Unit = {
+    def handleSegmentMerging(): Unit = {
       LOGGER.info(s"compaction need status is" +
           s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
       if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) {
@@ -384,7 +376,6 @@ object CarbonDataRDDFactory {
         val compactionModel = CompactionModel(compactionSize,
           CompactionType.MINOR_COMPACTION,
           carbonTable,
-          tableCreationTime,
           isCompactionTriggerByDDl
         )
         var storeLocation = ""
@@ -492,11 +483,6 @@ object CarbonDataRDDFactory {
       // reading the start time of data load.
       val loadStartTime = CarbonUpdateUtil.readCurrentTime();
       carbonLoadModel.setFactTimeStamp(loadStartTime)
-      val tableCreationTime = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
-          .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
-      val schemaLastUpdatedTime = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
-          .getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
-
       // get partition way from configuration
       // val isTableSplitPartition = CarbonProperties.getInstance().getProperty(
       // CarbonCommonConstants.TABLE_SPLIT_PARTITION,
@@ -645,8 +631,6 @@ object CarbonDataRDDFactory {
             new DataLoadResultImpl(),
             carbonLoadModel,
             currentLoadCount,
-            tableCreationTime,
-            schemaLastUpdatedTime,
             newRdd).collect()
 
         } catch {
@@ -760,8 +744,6 @@ object CarbonDataRDDFactory {
             new DataLoadResultImpl(),
             carbonLoadModel,
             currentLoadCount,
-            tableCreationTime,
-            schemaLastUpdatedTime,
             rdd).collect()
         } catch {
           case ex: Exception =>
@@ -998,7 +980,7 @@ object CarbonDataRDDFactory {
         }
         try {
           // compaction handling
-          handleSegmentMerging(tableCreationTime)
+          handleSegmentMerging()
         } catch {
           case e: Exception =>
             throw new Exception(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index 2f65fbc..d1d3015 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -19,11 +19,13 @@ package org.apache.carbondata.spark.util
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.hive.{CarbonMetaData, DictionaryMap}
+import org.apache.spark.sql.hive.{CarbonMetaData, CarbonRelation, DictionaryMap}
 
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.processing.merger.TableMeta
 
 case class TransformHolder(rdd: Any, mataData: CarbonMetaData)
 
@@ -42,4 +44,13 @@ object CarbonSparkUtil {
       }
     CarbonMetaData(dimensionsAttr, measureAttr, carbonTable, DictionaryMap(dictionary.toMap))
   }
+
+  def createCarbonRelation(tableInfo: TableInfo, tablePath: String): CarbonRelation = {
+    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+    val table = CarbonTable.buildFromTableInfo(tableInfo)
+    val meta = new TableMeta(identifier.getCarbonTableIdentifier, identifier.getStorePath, table)
+    CarbonRelation(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName,
+      CarbonSparkUtil.createSparkMeta(table), meta)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index d1baf79..7411e6e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -54,11 +54,9 @@ case class CarbonDatasourceHadoopRelation(
   ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
 
   @transient lazy val carbonRelation: CarbonRelation =
-    CarbonEnv.getInstance(sparkSession).carbonMetastore
-        .lookupRelation(
-          Some(identifier.getCarbonTableIdentifier.getDatabaseName),
-          identifier.getCarbonTableIdentifier.getTableName)(sparkSession)
-    .asInstanceOf[CarbonRelation]
+    CarbonEnv.getInstance(sparkSession).carbonMetastore.
+    createCarbonRelation(parameters, identifier, sparkSession)
+
 
   @transient lazy val carbonTable: CarbonTable = carbonRelation.tableMeta.carbonTable
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 925b82b..d19eb39 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
 import java.util.Map
 import java.util.concurrent.ConcurrentHashMap
 
-import org.apache.spark.sql.hive.{CarbonMetastore, CarbonSessionCatalog}
+import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonMetaStoreFactory, CarbonSessionCatalog}
 import org.apache.spark.sql.internal.CarbonSQLConf
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -34,7 +34,7 @@ import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
  */
 class CarbonEnv {
 
-  var carbonMetastore: CarbonMetastore = _
+  var carbonMetastore: CarbonMetaStore = _
 
   var sessionParams: SessionParams = _
 
@@ -64,7 +64,7 @@ class CarbonEnv {
         val storePath =
         CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
         LOGGER.info(s"carbon env initial: $storePath")
-        new CarbonMetastore(sparkSession.conf, storePath)
+        CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf, storePath)
       }
       CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
       initialized = true

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 1c16143..498ea03 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -17,20 +17,23 @@
 
 package org.apache.spark.sql
 
+import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 
 import org.apache.commons.lang.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
-import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field}
+import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, TableModel, TableNewProcessor}
 import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DecimalType, StructType}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
@@ -125,7 +128,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
     }
     val path = if (sqlContext.sparkSession.sessionState.catalog.listTables(dbName)
       .exists(_.table.equalsIgnoreCase(tableName))) {
-        getPathForTable(sqlContext.sparkSession, dbName, tableName)
+        getPathForTable(sqlContext.sparkSession, dbName, tableName, parameters)
     } else {
         createTableIfNotExists(sqlContext.sparkSession, parameters, dataSchema)
     }
@@ -148,20 +151,22 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
     val dbName: String = parameters.getOrElse("dbName",
       CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
     val tableName: String = parameters.getOrElse("tableName", "").toLowerCase
+
     try {
-      CarbonEnv.getInstance(sparkSession).carbonMetastore
-        .lookupRelation(Option(dbName), tableName)(sparkSession)
-      CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
+      if (parameters.contains("carbonSchemaPartsNo")) {
+        getPathForTable(sparkSession, dbName, tableName, parameters)
+      } else {
+        CarbonEnv.getInstance(sparkSession).carbonMetastore
+          .lookupRelation(Option(dbName), tableName)(sparkSession)
+        CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
+      }
     } catch {
       case ex: NoSuchTableException =>
-        val sqlParser = new CarbonSpark2SqlParser
-        val fields = sqlParser.getFields(dataSchema)
-        val map = scala.collection.mutable.Map[String, String]()
-        parameters.foreach { case (key, value) => map.put(key, value.toLowerCase()) }
-        val options = new CarbonOption(parameters)
-        val bucketFields = sqlParser.getBucketFields(map, fields, options)
-        val cm = sqlParser.prepareTableModel(ifNotExistPresent = false, Option(dbName),
-          tableName, fields, Nil, map, bucketFields)
+        val cm: TableModel = CarbonSource.createTableInfoFromParams(
+          parameters,
+          dataSchema,
+          dbName,
+          tableName)
         CreateTable(cm, false).run(sparkSession)
         CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
       case ex: Exception =>
@@ -171,13 +176,14 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
 
   /**
    * Returns the path of the table
+   *
    * @param sparkSession
    * @param dbName
    * @param tableName
    * @return
    */
   private def getPathForTable(sparkSession: SparkSession, dbName: String,
-      tableName : String): String = {
+      tableName : String, parameters: Map[String, String]): String = {
 
     if (StringUtils.isBlank(tableName)) {
       throw new MalformedCarbonCommandException("The Specified Table Name is Blank")
@@ -186,9 +192,13 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       throw new MalformedCarbonCommandException("Table Name Should not have spaces ")
     }
     try {
-      CarbonEnv.getInstance(sparkSession).carbonMetastore
-        .lookupRelation(Option(dbName), tableName)(sparkSession)
-      CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
+      if (parameters.contains("tablePath")) {
+        parameters.get("tablePath").get
+      } else {
+        CarbonEnv.getInstance(sparkSession).carbonMetastore
+          .lookupRelation(Option(dbName), tableName)(sparkSession)
+        CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
+      }
     } catch {
       case ex: Exception =>
         throw new Exception(s"Do not have $dbName and $tableName", ex)
@@ -196,3 +206,50 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
   }
 
 }
+
+object CarbonSource {
+
+  def createTableInfoFromParams(parameters: Map[String, String],
+      dataSchema: StructType,
+      dbName: String,
+      tableName: String): TableModel = {
+    val sqlParser = new CarbonSpark2SqlParser
+    val fields = sqlParser.getFields(dataSchema)
+    val map = scala.collection.mutable.Map[String, String]()
+    parameters.foreach { case (key, value) => map.put(key, value.toLowerCase()) }
+    val options = new CarbonOption(parameters)
+    val bucketFields = sqlParser.getBucketFields(map, fields, options)
+    sqlParser.prepareTableModel(ifNotExistPresent = false, Option(dbName),
+      tableName, fields, Nil, map, bucketFields)
+  }
+
+  /**
+   * Update spark catalog table with schema information in case of schema storage is hive metastore
+   * @param tableDesc
+   * @param sparkSession
+   * @return
+   */
+  def updateCatalogTableWithCarbonSchema(tableDesc: CatalogTable,
+      sparkSession: SparkSession): CatalogTable = {
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val storageFormat = tableDesc.storage
+    val properties = storageFormat.properties
+    if (metaStore.isReadFromHiveMetaStore && !properties.contains("carbonSchemaPartsNo")) {
+      val dbName: String = properties.getOrElse("dbName",
+        CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
+      val tableName: String = properties.getOrElse("tableName", "").toLowerCase
+      val model = createTableInfoFromParams(properties, tableDesc.schema, dbName, tableName)
+      val tableInfo: TableInfo = TableNewProcessor(model)
+      val (tablePath, carbonSchemaString) =
+      metaStore.createTableFromThrift(tableInfo, dbName, tableName)(sparkSession)
+      val map = CarbonUtil.convertToMultiStringMap(tableInfo)
+      properties.foreach(e => map.put(e._1, e._2))
+      map.put("tablePath", tablePath)
+      // updating params
+      val updatedFormat = storageFormat.copy(properties = map.asScala.toMap)
+      tableDesc.copy(storage = updatedFormat)
+    } else {
+      tableDesc
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
index b08c113..0d5a821 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
@@ -63,16 +63,15 @@ private[sql] case class AlterTableAddColumns(
       // completion of 1st operation but as look up relation is called before it will have the
       // older carbon table and this can lead to inconsistent state in the system. Therefor look
       // up relation should be called after acquiring the lock
-      carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      carbonTable = metastore
         .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
         .tableMeta.carbonTable
       // get the latest carbon table and check for column existence
       // read the latest schema file
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
         carbonTable.getCarbonTableIdentifier)
-      val tableMetadataFile = carbonTablePath.getSchemaFilePath
-      val thriftTableInfo: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore
-        .readSchemaFile(tableMetadataFile)
+      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
       val wrapperTableInfo = schemaConverter
         .fromExternalToWrapperTableInfo(thriftTableInfo,
@@ -104,8 +103,8 @@ private[sql] case class AlterTableAddColumns(
       LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
       LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName")
     } catch {
-      case e: Exception => LOGGER
-        .error("Alter table add columns failed :" + e.getMessage)
+      case e: Exception =>
+        LOGGER.error(e, "Alter table add columns failed")
         if (newCols.nonEmpty) {
           LOGGER.info("Cleaning up the dictionary files as alter table add operation failed")
           new AlterTableDropColumnRDD(sparkSession.sparkContext,
@@ -147,9 +146,9 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
     val newTableName = newTableIdentifier.table.toLowerCase
     LOGGER.audit(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
     LOGGER.info(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val relation: CarbonRelation =
-      CarbonEnv.getInstance(sparkSession).carbonMetastore
-        .lookupRelation(oldTableIdentifier.database, oldTableName)(sparkSession)
+      metastore.lookupRelation(oldTableIdentifier.database, oldTableName)(sparkSession)
         .asInstanceOf[CarbonRelation]
     if (relation == null) {
       LOGGER.audit(s"Rename table request has failed. " +
@@ -168,15 +167,14 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
       locks = AlterTableUtil
         .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)(
             sparkSession)
-      carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
-        .lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
+      carbonTable = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
         .asInstanceOf[CarbonRelation].tableMeta.carbonTable
       // get the latest carbon table and check for column existence
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
         carbonTable.getCarbonTableIdentifier)
-      val tableMetadataFile = carbonTablePath.getSchemaFilePath
-      val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.getInstance(sparkSession)
-        .carbonMetastore.readSchemaFile(tableMetadataFile)
+      val tableMetadataFile = carbonTablePath.getPath
+      val tableInfo: org.apache.carbondata.format.TableInfo =
+        metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
       val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
       schemaEvolutionEntry.setTableName(newTableName)
       timeStamp = System.currentTimeMillis()
@@ -193,15 +191,13 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
         }
       }
       val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
-        newTableName,
-        carbonTable.getCarbonTableIdentifier.getTableId)
-      val newTablePath = CarbonEnv.getInstance(sparkSession).carbonMetastore
-        .updateTableSchema(newTableIdentifier,
+        newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
+      val newTablePath = metastore.updateTableSchema(newTableIdentifier,
+          carbonTable.getCarbonTableIdentifier,
           tableInfo,
           schemaEvolutionEntry,
           carbonTable.getStorePath)(sparkSession)
-      CarbonEnv.getInstance(sparkSession).carbonMetastore
-        .removeTableFromMetadata(oldDatabaseName, oldTableName)
+      metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
       sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
         .runSqlHive(
           s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName")
@@ -213,8 +209,8 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
       LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
       LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
     } catch {
-      case e: Exception => LOGGER
-        .error("Rename table failed: " + e.getMessage)
+      case e: Exception =>
+        LOGGER.error(e, "Rename table failed: " + e.getMessage)
         if (carbonTable != null) {
           AlterTableUtil
             .revertRenameTableChanges(oldTableIdentifier,
@@ -279,7 +275,8 @@ private[sql] case class AlterTableDropColumns(
     try {
       locks = AlterTableUtil
         .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
-      carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      carbonTable = metastore
         .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
         .tableMeta.carbonTable
       val partitionInfo = carbonTable.getPartitionInfo(tableName)
@@ -329,9 +326,8 @@ private[sql] case class AlterTableDropColumns(
       // read the latest schema file
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
         carbonTable.getCarbonTableIdentifier)
-      val tableMetadataFile = carbonTablePath.getSchemaFilePath
-      val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.getInstance(sparkSession)
-        .carbonMetastore.readSchemaFile(tableMetadataFile)
+      val tableInfo: org.apache.carbondata.format.TableInfo =
+        metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
       // maintain the deleted columns for schema evolution history
       var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
       val columnSchemaList = tableInfo.fact_table.table_columns.asScala
@@ -393,7 +389,8 @@ private[sql] case class AlterTableDataTypeChange(
     try {
       locks = AlterTableUtil
         .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
-      carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      carbonTable = metastore
         .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
         .tableMeta.carbonTable
       val columnName = alterTableDataTypeChangeModel.columnName
@@ -415,9 +412,7 @@ private[sql] case class AlterTableDataTypeChange(
       // read the latest schema file
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
         carbonTable.getCarbonTableIdentifier)
-      val tableMetadataFile = carbonTablePath.getSchemaFilePath
-      val tableInfo: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore
-        .readSchemaFile(tableMetadataFile)
+      val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
       // maintain the added column for schema evolution history
       var addColumnSchema: ColumnSchema = null
       var deletedColumnSchema: ColumnSchema = null

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
index d2022be..609f39b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
@@ -18,9 +18,10 @@
 package org.apache.spark.sql.hive.execution.command
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.{CarbonDropTableCommand, DropDatabaseCommand, ResetCommand, RunnableCommand, SetCommand}
 
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 
 case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
   extends RunnableCommand {
@@ -29,16 +30,19 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val dbName = command.databaseName
+    var tablesInDB: Seq[TableIdentifier] = null
+    if (sparkSession.sessionState.catalog.listDatabases().exists(_.equalsIgnoreCase(dbName))) {
+      tablesInDB = sparkSession.sessionState.catalog.listTables(dbName)
+    }
     // DropHiveDB command will fail if cascade is false and one or more table exists in database
     val rows = command.run(sparkSession)
-    if (command.cascade) {
-      val tablesInDB = CarbonEnv.getInstance(sparkSession).carbonMetastore.getAllTables()
-        .filter(_.database.exists(_.equalsIgnoreCase(dbName)))
+    if (command.cascade && tablesInDB != null) {
       tablesInDB.foreach { tableName =>
         CarbonDropTableCommand(true, tableName.database, tableName.table).run(sparkSession)
       }
     }
-    CarbonEnv.getInstance(sparkSession).carbonMetastore.dropDatabaseDirectory(dbName.toLowerCase)
+    CarbonUtil.dropDatabaseDirectory(dbName.toLowerCase, CarbonEnv.getInstance(sparkSession)
+      .carbonMetastore.storePath)
     rows
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
index 6087736..760cb06 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -16,13 +16,14 @@
  */
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, InsertIntoCarbonTable, ShowLoadsCommand, SparkSession}
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
 import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
 
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 /**
@@ -61,7 +62,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
       _, child: LogicalPlan, _, _) =>
         ExecutedCommandExec(LoadTableByInsert(relation, child)) :: Nil
       case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
-        CarbonEnv.getInstance(sparkSession).carbonMetastore.createDatabaseDirectory(dbName)
+        CarbonUtil.createDatabaseDirectory(dbName, CarbonEnv.getInstance(sparkSession).
+          carbonMetastore.storePath)
         ExecutedCommandExec(createDb) :: Nil
       case drop@DropDatabaseCommand(dbName, ifExists, isCascade) =>
         ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil
@@ -127,6 +129,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         ExecutedCommandExec(CarbonSetCommand(set)) :: Nil
       case reset@ResetCommand =>
         ExecutedCommandExec(CarbonResetCommand()) :: Nil
+      case org.apache.spark.sql.execution.datasources.CreateTable(tableDesc, mode, None)
+        if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER
+           && tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource") =>
+        val updatedCatalog =
+          CarbonSource.updateCatalogTableWithCarbonSchema(tableDesc, sparkSession)
+        val cmd =
+          CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists = mode == SaveMode.Ignore)
+        ExecutedCommandExec(cmd) :: Nil
       case _ => Nil
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
index 2fccd0c..2c1de52 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.statusmanager.{SegmentStatusManager, SegmentUp
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
+import org.apache.carbondata.hadoop.CarbonInputFormat
 import org.apache.carbondata.processing.exception.MultipleMatchingException
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType}
 import org.apache.carbondata.spark.DeleteDelataResultImpl
@@ -107,7 +108,7 @@ private[sql] case class ProjectForDeleteCommand(
           CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
 
       case e: Exception =>
-        LOG.error("Exception in Delete data operation " + e.getMessage)
+        LOG.error(e, "Exception in Delete data operation " + e.getMessage)
         // ****** start clean up.
         // In case of failure , clean all related delete delta files
         CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
@@ -548,7 +549,7 @@ object deleteExecution {
 
     val (carbonInputFormat, job) =
       QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier)
-
+    CarbonInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo)
     val keyRdd = deleteRdd.map({ row =>
       val tupleId: String = row
         .getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index ce66733..88e89ad 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -29,14 +29,13 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.hive.{CarbonMetastore, CarbonRelation}
+import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.util.FileUtils
 import org.codehaus.jackson.map.ObjectMapper
 
 import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
@@ -48,6 +47,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format
 import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
@@ -186,23 +186,24 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
     } else {
       // Add Database to catalog and persist
       val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      // Need to fill partitioner class when we support partition
-      val tablePath = catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession)
+      val (tablePath, carbonSchemaString) =
+        catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession)
       if (createDSTable) {
         try {
           val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
           cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f)
           cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f)
+
           sparkSession.sql(
             s"""CREATE TABLE $dbName.$tbName
                 |(${ fields.map(f => f.rawSchema).mkString(",") })
                 |USING org.apache.spark.sql.CarbonSource""".stripMargin +
-            s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath "$tablePath") """)
+            s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
+            s""""$tablePath" $carbonSchemaString) """)
         } catch {
           case e: Exception =>
             val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
             // call the drop table to delete the created table.
-
             CarbonEnv.getInstance(sparkSession).carbonMetastore
               .dropTable(catalog.storePath, identifier)(sparkSession)
 
@@ -234,9 +235,9 @@ case class DeleteLoadsById(
   def run(sparkSession: SparkSession): Seq[Row] = {
 
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
-    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .getTableFromMetadata(getDB.getDatabaseName(databaseNameOp, sparkSession), tableName)
-      .map(_.carbonTable).getOrElse(null)
+    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
+      lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+      tableMeta.carbonTable
     CarbonStore.deleteLoadById(
       loadids,
       getDB.getDatabaseName(databaseNameOp, sparkSession),
@@ -259,9 +260,9 @@ case class DeleteLoadsByLoadDate(
 
   def run(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
-    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .getTableFromMetadata(getDB.getDatabaseName(databaseNameOp, sparkSession), tableName)
-      .map(_.carbonTable).getOrElse(null)
+    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
+      lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+      tableMeta.carbonTable
     CarbonStore.deleteLoadByDate(
       loadDate,
       getDB.getDatabaseName(databaseNameOp, sparkSession),
@@ -279,13 +280,13 @@ object LoadTable {
       sqlContext: SQLContext,
       model: DictionaryLoadModel,
       noDictDimension: Array[CarbonDimension]): Unit = {
-
+    val sparkSession = sqlContext.sparkSession
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
       model.table)
-    val schemaFilePath = carbonTablePath.getSchemaFilePath
 
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     // read TableInfo
-    val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath)
+    val tableInfo: format.TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
 
     // modify TableInfo
     val columns = tableInfo.getFact_table.getTable_columns
@@ -294,23 +295,21 @@ object LoadTable {
         columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
       }
     }
+    val entry = tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+    entry.setTime_stamp(System.currentTimeMillis())
 
     // write TableInfo
-    CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)
-
-    val catalog = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
+    metastore.updateTableSchema(carbonTablePath.getCarbonTableIdentifier,
+      carbonTablePath.getCarbonTableIdentifier,
+      tableInfo, entry, carbonTablePath.getPath)(sparkSession)
 
     // update the schema modified time
-    catalog.updateSchemasUpdatedTime(catalog.touchSchemaFileSystemTime(
+    metastore.updateAndTouchSchemasUpdatedTime(
       carbonLoadModel.getDatabaseName,
-      carbonLoadModel.getTableName))
-
-    // update Metadata
-    catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
-      model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
+      carbonLoadModel.getTableName)
 
     // update CarbonDataLoadSchema
-    val carbonTable = catalog.lookupRelation(Option(model.table.getDatabaseName),
+    val carbonTable = metastore.lookupRelation(Option(model.table.getDatabaseName),
       model.table.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].tableMeta
       .carbonTable
     carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
@@ -628,6 +627,14 @@ case class LoadTable(
           LOGGER.audit(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
           carbonLoadModel.setUseOnePass(false)
         }
+        // Create table and metadata folders if not exist
+        val carbonTablePath = CarbonStorePath
+          .getCarbonTablePath(storePath, table.getCarbonTableIdentifier)
+        val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+        val fileType = FileFactory.getFileType(metadataDirectoryPath)
+        if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
+          FileFactory.mkdirs(metadataDirectoryPath, fileType)
+        }
         if (carbonLoadModel.getUseOnePass) {
           val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
           val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
@@ -691,8 +698,7 @@ case class LoadTable(
             server,
             dataFrame,
             updateModel)
-        }
-        else {
+        } else {
           val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
             val fields = dataFrame.get.schema.fields
             import org.apache.spark.sql.functions.udf
@@ -726,6 +732,7 @@ case class LoadTable(
           } else {
             (dataFrame, dataFrame)
           }
+
           GlobalDictionaryUtil
             .generateGlobalDictionary(
               sparkSession.sqlContext,
@@ -794,9 +801,9 @@ private[sql] case class DeleteLoadByDate(
 
   def run(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
-    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .getTableFromMetadata(getDB.getDatabaseName(databaseNameOp, sparkSession), tableName)
-      .map(_.carbonTable).getOrElse(null)
+    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
+      lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+      tableMeta.carbonTable
     CarbonStore.deleteLoadByDate(
       loadDate,
       getDB.getDatabaseName(databaseNameOp, sparkSession),
@@ -818,9 +825,7 @@ case class CleanFiles(
     val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val relation = catalog
       .lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]
-    val carbonTable = catalog
-      .getTableFromMetadata(getDB.getDatabaseName(databaseNameOp, sparkSession), tableName)
-      .map(_.carbonTable).getOrElse(null)
+    val carbonTable = relation.tableMeta.carbonTable
     CarbonStore.cleanFiles(
       getDB.getDatabaseName(databaseNameOp, sparkSession),
       tableName,
@@ -839,9 +844,9 @@ case class ShowLoads(
 
   def run(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
-    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .getTableFromMetadata(getDB.getDatabaseName(databaseNameOp, sparkSession), tableName)
-      .map(_.carbonTable).getOrElse(null)
+    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
+      lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+      tableMeta.carbonTable
     CarbonStore.showSegments(
       getDB.getDatabaseName(databaseNameOp, sparkSession),
       tableName,
@@ -867,15 +872,11 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
     catalog.checkSchemasModifiedTimeAndReloadTables()
     val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
     try {
-      val carbonTable = catalog.getTableFromMetadata(dbName, tableName).map(_.carbonTable).orNull
-       locksToBeAcquired foreach {
-        lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTable, lock)
+      locksToBeAcquired foreach {
+        lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock)
       }
       LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
-      if (null != carbonTable) {
-        // clear driver B-tree and dictionary cache
-        ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
-      }
+
       CarbonEnv.getInstance(sparkSession).carbonMetastore
         .dropTable(storePath, identifier)(sparkSession)
       LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")