Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/12/24 14:13:04 UTC

[2/3] incubator-carbondata git commit: extract command to common

extract command to common

fix

fix testcase

remove redundant QueryTest

fix comment


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/b50866b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/b50866b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/b50866b3

Branch: refs/heads/master
Commit: b50866b3a2e3eb7d704c6fa9f6e6fcdfc5e5de21
Parents: 956eb33
Author: jackylk <ja...@huawei.com>
Authored: Thu Dec 22 15:09:53 2016 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Sat Dec 24 22:10:22 2016 +0800

----------------------------------------------------------------------
 .../core/carbon/AbsoluteTableIdentifier.java    |   6 +
 .../core/carbon/path/CarbonStorePath.java       |   6 +-
 .../core/carbon/path/CarbonTablePath.java       |  35 +-
 .../carbondata/examples/CarbonExample.scala     |   1 +
 .../carbondata/spark/load/CarbonLoaderUtil.java |  19 +-
 .../spark/load/DeleteLoadFolders.java           |  37 +-
 .../carbondata/spark/util/LoadMetadataUtil.java |   7 +-
 .../org/apache/carbondata/api/CarbonStore.scala | 176 ++++++++++
 .../spark/rdd/DataManagementFunc.scala          | 163 ++-------
 .../spark/rdd/CarbonDataRDDFactory.scala        |   4 +-
 .../execution/command/carbonTableSchema.scala   | 349 ++++---------------
 .../apache/spark/sql/hive/CarbonMetastore.scala |   6 +-
 .../spark/load/CarbonLoaderUtilTest.java        |   1 -
 .../spark/rdd/CarbonDataRDDFactory.scala        |   4 +-
 .../execution/command/carbonTableSchema.scala   | 275 +--------------
 .../apache/spark/sql/hive/CarbonMetastore.scala |  10 +-
 .../org/apache/spark/util/CleanFiles.scala      |  15 +-
 .../org/apache/spark/util/Compaction.scala      |  11 +-
 .../apache/spark/util/DeleteSegmentByDate.scala |  14 +-
 .../apache/spark/util/DeleteSegmentById.scala   |  14 +-
 .../org/apache/spark/util/ShowSegments.scala    |  38 +-
 .../org/apache/spark/util/TableAPIUtil.scala    |  23 +-
 .../spark2/src/test/resources/data_alltypes.csv |  10 +
 .../AllDataTypesTestCaseAggregate.scala         |  10 +-
 .../carbondata/CarbonDataSourceSuite.scala      |   3 +-
 .../spark/carbondata/util/QueryTest.scala       |  66 ----
 .../vectorreader/VectorReaderTestCase.scala     |   6 +-
 .../sql/common/util/CarbonSessionTest.scala     |  74 ----
 .../spark/sql/common/util/QueryTest.scala       | 111 +++++-
 .../apache/spark/util/CarbonCommandSuite.scala  |  78 +++++
 .../lcm/status/SegmentStatusManager.java        |   4 +
 31 files changed, 608 insertions(+), 968 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java
index cf7d92f..1424ba5 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
 
 /**
  * identifier which will have store path and carbon table identifier
@@ -65,6 +66,11 @@ public class AbsoluteTableIdentifier implements Serializable {
     return carbonTableIdentifier;
   }
 
+  public static AbsoluteTableIdentifier from(String dbName, String tableName) {
+    CarbonTableIdentifier identifier = new CarbonTableIdentifier(dbName, tableName, "");
+    return new AbsoluteTableIdentifier(CarbonUtil.getCarbonStorePath(), identifier);
+  }
+
   public static AbsoluteTableIdentifier fromTablePath(String tablePath) {
     String formattedTablePath = tablePath.replace('\\', '/');
     String[] names = formattedTablePath.split("/");
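
The new from() factory builds an identifier from just a database and table name, resolving the store location internally through CarbonUtil.getCarbonStorePath(). A minimal usage sketch in Scala; the database and table names below are illustrative, not taken from this commit:

  import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier

  // "default" and "t1" are placeholder names; the store path comes from
  // CarbonUtil.getCarbonStorePath(), and the tableId field is left empty ("").
  val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from("default", "t1")
  val tableId = identifier.getCarbonTableIdentifier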

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonStorePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonStorePath.java b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonStorePath.java
index 214c633..afe4d9a 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonStorePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonStorePath.java
@@ -45,10 +45,14 @@ public class CarbonStorePath extends Path {
     CarbonTablePath carbonTablePath = new CarbonTablePath(tableIdentifier,
         storePath + File.separator + tableIdentifier.getDatabaseName() + File.separator
             + tableIdentifier.getTableName());
-
     return carbonTablePath;
   }
 
+  public static CarbonTablePath getCarbonTablePath(String storePath,
+      String dbName, String tableName) {
+    return new CarbonTablePath(storePath, dbName, tableName);
+  }
+
   public static CarbonTablePath getCarbonTablePath(AbsoluteTableIdentifier identifier) {
     CarbonTableIdentifier id = identifier.getCarbonTableIdentifier();
     return new CarbonTablePath(id, identifier.getTablePath());

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
index f90073e..54e7266 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
@@ -63,6 +63,12 @@ public class CarbonTablePath extends Path {
     this.tablePath = tablePathString;
   }
 
+  public CarbonTablePath(String storePath, String dbName, String tableName) {
+    super(storePath + File.separator + dbName + File.separator + tableName);
+    this.carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "");
+    this.tablePath = storePath + File.separator + dbName + File.separator + tableName;
+  }
+
   /**
    * The method returns the folder path containing the carbon file.
    *
@@ -249,22 +255,6 @@ public class CarbonTablePath extends Path {
   }
 
   /**
-   * Gets absolute path of data file of given aggregate table
-   *
-   * @param aggTableID          unique aggregate table identifier
-   * @param partitionId         unique partition identifier
-   * @param segmentId           unique partition identifier
-   * @param filePartNo          data file part number
-   * @param factUpdateTimeStamp unique identifier to identify an update
-   * @return absolute path of data file stored in carbon data format
-   */
-  public String getCarbonAggDataFilePath(String aggTableID, String partitionId, String segmentId,
-      Integer filePartNo, Integer taskNo, String factUpdateTimeStamp) {
-    return getAggSegmentDir(aggTableID, partitionId, segmentId) + File.separator
-        + getCarbonDataFileName(filePartNo, taskNo, factUpdateTimeStamp);
-  }
-
-  /**
    * Gets data file name only with out path
    *
    * @param filePartNo          data file part number
@@ -297,15 +287,6 @@ public class CarbonTablePath extends Path {
     return getFactDir() + File.separator + PARTITION_PREFIX + partitionId;
   }
 
-  private String getAggSegmentDir(String aggTableID, String partitionId, String segmentId) {
-    return getAggPartitionDir(aggTableID, partitionId) + File.separator + SEGMENT_PREFIX
-        + segmentId;
-  }
-
-  private String getAggPartitionDir(String aggTableID, String partitionId) {
-    return getAggregateTableDir(aggTableID) + File.separator + PARTITION_PREFIX + partitionId;
-  }
-
   private String getMetaDataDir() {
     return tablePath + File.separator + METADATA_DIR;
   }
@@ -314,10 +295,6 @@ public class CarbonTablePath extends Path {
     return tablePath + File.separator + FACT_DIR;
   }
 
-  private String getAggregateTableDir(String aggTableId) {
-    return tablePath + File.separator + AGGREGATE_TABLE_PREFIX + aggTableId;
-  }
-
   @Override public boolean equals(Object o) {
     if (!(o instanceof CarbonTablePath)) {
       return false;
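
Both the new getCarbonTablePath(storePath, dbName, tableName) overload and the new three-argument CarbonTablePath constructor compose the table path as storePath/dbName/tableName. A minimal sketch, assuming an illustrative local store path:

  import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}

  // All three values are placeholders; the resulting table path is
  // /tmp/carbon/store/default/t1, same as the three-argument constructor builds.
  val tablePath: CarbonTablePath =
    CarbonStorePath.getCarbonTablePath("/tmp/carbon/store", "default", "t1")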

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
index c2e135a..273de95 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
@@ -21,6 +21,7 @@ import java.io.File
 
 import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{CleanFiles, ShowSegments}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index 492ceee..0d2ab6f 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -47,7 +47,6 @@ import org.apache.carbondata.core.cache.CacheType;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.carbon.ColumnIdentifier;
 import org.apache.carbondata.core.carbon.datastore.block.Distributable;
@@ -86,7 +85,6 @@ import com.google.gson.Gson;
 import org.apache.spark.SparkConf;
 import org.apache.spark.util.Utils;
 
-
 public final class CarbonLoaderUtil {
 
   private static final LogService LOGGER =
@@ -449,11 +447,10 @@ public final class CarbonLoaderUtil {
     return status;
   }
 
-  public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,
-      String tableName, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(schema.getCarbonTable().getStorePath(),
-            schema.getCarbonTable().getCarbonTableIdentifier());
+  public static void writeLoadMetadata(String storeLocation, String dbName, String tableName,
+      List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
+    CarbonTablePath carbonTablePath =
+        CarbonStorePath.getCarbonTablePath(storeLocation, dbName, tableName);
     String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
 
     DataOutputStream dataOutputStream;
@@ -464,11 +461,9 @@ public final class CarbonLoaderUtil {
         new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
 
     try {
-
       dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
       brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
               Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-
       String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
       brWriter.write(metadataInstance);
     } finally {
@@ -478,12 +473,10 @@ public final class CarbonLoaderUtil {
         }
       } catch (Exception e) {
         LOGGER.error("error in  flushing ");
-
       }
       CarbonUtil.closeStreams(brWriter);
       writeOperation.close();
     }
-
   }
 
   public static String readCurrentTime() {
@@ -495,10 +488,10 @@ public final class CarbonLoaderUtil {
     return date;
   }
 
-  public static String extractLoadMetadataFileLocation(CarbonLoadModel loadModel) {
+  public static String extractLoadMetadataFileLocation(String dbName, String tableName) {
     CarbonTable carbonTable =
         org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
-            .getCarbonTable(loadModel.getDatabaseName() + '_' + loadModel.getTableName());
+            .getCarbonTable(dbName + '_' + tableName);
     return carbonTable.getMetaDataFilepath();
   }
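
With the CarbonDataLoadSchema parameter gone, callers now pass the store location and names directly. A hedged sketch of the reworked calls; the values are illustrative, and the empty details list only shows the shape of the call:

  import java.util.Collections
  import org.apache.carbondata.core.load.LoadMetadataDetails
  import org.apache.carbondata.spark.load.CarbonLoaderUtil

  // Overwrites the table status file under the given store location;
  // store path and names are placeholders.
  val details: java.util.List[LoadMetadataDetails] = Collections.emptyList()
  CarbonLoaderUtil.writeLoadMetadata("/tmp/carbon/store", "default", "t1", details)

  // Resolves the table from CarbonMetadata by "<dbName>_<tableName>".
  val metadataPath = CarbonLoaderUtil.extractLoadMetadataFileLocation("default", "t1")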
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
index 2b3979f..0663abc 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
@@ -37,6 +37,7 @@ import java.util.List;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.carbon.path.CarbonStorePath;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -45,7 +46,6 @@ import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
 import org.apache.carbondata.core.load.LoadMetadataDetails;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
 
 public final class DeleteLoadFolders {
 
@@ -58,23 +58,13 @@ public final class DeleteLoadFolders {
 
   /**
    * returns segment path
-   *
-   * @param loadModel
-   * @param storeLocation
-   * @param partitionId
-   * @param oneLoad
-   * @return
    */
-  private static String getSegmentPath(CarbonLoadModel loadModel, String storeLocation,
+  private static String getSegmentPath(String dbName, String tableName, String storeLocation,
       int partitionId, LoadMetadataDetails oneLoad) {
-
-    String path = null;
+    CarbonTablePath carbon = new CarbonStorePath(storeLocation).getCarbonTablePath(
+        new CarbonTableIdentifier(dbName, tableName, ""));
     String segmentId = oneLoad.getLoadName();
-
-    path = new CarbonStorePath(storeLocation).getCarbonTablePath(
-        loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier())
-        .getCarbonDataDirectoryPath("" + partitionId, segmentId);
-    return path;
+    return carbon.getCarbonDataDirectoryPath("" + partitionId, segmentId);
   }
 
   private static boolean physicalFactAndMeasureMetadataDeletion(String path) {
@@ -221,32 +211,21 @@ public final class DeleteLoadFolders {
     }
   }
 
-  /**
-   * @param loadModel
-   * @param storeLocation
-   * @param isForceDelete
-   * @param details
-   * @return
-   *
-   */
-  public static boolean deleteLoadFoldersFromFileSystem(CarbonLoadModel loadModel,
+  public static boolean deleteLoadFoldersFromFileSystem(String dbName, String tableName,
       String storeLocation, boolean isForceDelete, LoadMetadataDetails[] details) {
     List<LoadMetadataDetails> deletedLoads =
         new ArrayList<LoadMetadataDetails>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
     boolean isDeleted = false;
-
     if (details != null && details.length != 0) {
       for (LoadMetadataDetails oneLoad : details) {
         if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
-          String path = getSegmentPath(loadModel, storeLocation, 0, oneLoad);
+          String path = getSegmentPath(dbName, tableName, storeLocation, 0, oneLoad);
           boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
           if (deletionStatus) {
             isDeleted = true;
             oneLoad.setVisibility("false");
             deletedLoads.add(oneLoad);
-            LOGGER.info("Info: " +
-                " Deleted the load " + oneLoad.getLoadName());
+            LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
           }
         }
       }
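
The deletion entry point likewise trades the CarbonLoadModel for plain names. A sketch under the assumption that the table status file already exists; the metadata path and names are placeholders:

  import org.apache.carbondata.lcm.status.SegmentStatusManager
  import org.apache.carbondata.spark.load.DeleteLoadFolders

  // details would normally come from the table's metadata directory.
  val details = SegmentStatusManager.readLoadMetadata("/tmp/carbon/store/default/t1/Metadata")
  val deleted: Boolean = DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
    "default", "t1", "/tmp/carbon/store", false, details)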

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
index 11cf9f8..1c21fab 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
@@ -28,20 +28,19 @@
  */
 package org.apache.carbondata.spark.util;
 
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.load.LoadMetadataDetails;
 import org.apache.carbondata.lcm.status.SegmentStatusManager;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
 
 public final class LoadMetadataUtil {
   private LoadMetadataUtil() {
 
   }
 
-  public static boolean isLoadDeletionRequired(CarbonLoadModel loadModel) {
-    CarbonTable table = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
-        .getCarbonTable(loadModel.getDatabaseName() + '_' + loadModel.getTableName());
+  public static boolean isLoadDeletionRequired(String dbName, String tableName) {
+    CarbonTable table = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName);
 
     String metaDataLocation = table.getMetaDataFilepath();
     LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
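
The check now keys off CarbonMetadata directly. A sketch with placeholder names, assuming the table was registered during load:

  import org.apache.carbondata.spark.util.LoadMetadataUtil

  // True when at least one load in the table status file is eligible for deletion.
  val needsCleanup: Boolean = LoadMetadataUtil.isLoadDeletionRequired("default", "t1")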

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
new file mode 100644
index 0000000..6fb2df1
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.api
+
+import java.lang.Long
+import java.text.SimpleDateFormat
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
+import org.apache.spark.sql.types.TimestampType
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.lcm.status.SegmentStatusManager
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.rdd.DataManagementFunc
+
+object CarbonStore {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def showSegments(
+      dbName: String,
+      tableName: String,
+      limit: Option[String]): Seq[Row] = {
+    val tableUniqueName = dbName + "_" + tableName
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+    val path = carbonTable.getMetaDataFilepath
+    val loadMetadataDetailsArray = SegmentStatusManager.readLoadMetadata(path)
+    if (loadMetadataDetailsArray.nonEmpty) {
+      val parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP)
+      var loadMetadataDetailsSortedArray = loadMetadataDetailsArray.sortWith { (l1, l2) =>
+        java.lang.Double.parseDouble(l1.getLoadName) > java.lang.Double.parseDouble(l2.getLoadName)
+      }
+      if (limit.isDefined) {
+        loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray
+          .filter(load => load.getVisibility.equalsIgnoreCase("true"))
+        val limitLoads = limit.get
+        try {
+          val lim = Integer.parseInt(limitLoads)
+          loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray.slice(0, lim)
+        } catch {
+          case _: NumberFormatException => sys.error(s" Entered limit is not a valid Number")
+        }
+      }
+
+      loadMetadataDetailsSortedArray
+          .filter(_.getVisibility.equalsIgnoreCase("true"))
+          .map { load =>
+            Row(
+              load.getLoadName,
+              load.getLoadStatus,
+              new java.sql.Timestamp(parser.parse(load.getLoadStartTime).getTime),
+              new java.sql.Timestamp(parser.parse(load.getTimestamp).getTime)
+            )
+          }.toSeq
+    } else {
+      Seq.empty
+    }
+  }
+
+  def cleanFiles(
+      dbName: String,
+      tableName: String,
+      storePath: String): Unit = {
+    LOGGER.audit(s"The clean files request has been received for $dbName.$tableName")
+    try {
+      DataManagementFunc.cleanFiles(dbName, tableName, storePath)
+      LOGGER.audit(s"Clean files operation is success for $dbName.$tableName.")
+    } catch {
+      case ex: Exception =>
+        sys.error(ex.getMessage)
+    }
+    Seq.empty
+  }
+
+  // validates load ids
+  private def validateLoadIds(loadids: Seq[String]): Unit = {
+    if (loadids.isEmpty) {
+      val errorMessage = "Error: Segment id(s) should not be empty."
+      throw new MalformedCarbonCommandException(errorMessage)
+    }
+  }
+
+  def deleteLoadById(
+      loadids: Seq[String],
+      dbName: String,
+      tableName: String): Unit = {
+
+    LOGGER.audit(s"Delete segment by Id request has been received for $dbName.$tableName")
+    validateLoadIds(loadids)
+
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)
+    val path = carbonTable.getMetaDataFilepath
+
+    try {
+      val invalidLoadIds = SegmentStatusManager.updateDeletionStatus(
+        carbonTable.getAbsoluteTableIdentifier, loadids.asJava, path).asScala
+      if (invalidLoadIds.isEmpty) {
+        LOGGER.audit(s"Delete segment by Id is successfull for $dbName.$tableName.")
+      } else {
+        sys.error(s"Delete segment by Id is failed. Invalid ID is: ${invalidLoadIds.mkString(",")}")
+      }
+    } catch {
+      case ex: Exception =>
+        sys.error(ex.getMessage)
+    }
+    Seq.empty
+  }
+
+  def deleteLoadByDate(
+      timestamp: String,
+      dbName: String,
+      tableName: String): Unit = {
+    LOGGER.audit(s"Delete segment by Id request has been received for $dbName.$tableName")
+
+    val time = validateTimeFormat(timestamp)
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)
+    val path = carbonTable.getMetaDataFilepath
+
+    try {
+      val invalidLoadTimestamps =
+        SegmentStatusManager.updateDeletionStatus(
+          carbonTable.getAbsoluteTableIdentifier,
+          timestamp,
+          path,
+          time).asScala
+      if (invalidLoadTimestamps.isEmpty) {
+        LOGGER.audit(s"Delete segment by date is successful for $dbName.$tableName.")
+      } else {
+        sys.error("Delete segment by date is failed. No matching segment found.")
+      }
+    } catch {
+      case ex: Exception =>
+        sys.error(ex.getMessage)
+    }
+  }
+
+  // this function is for test only
+  def isSegmentValid(
+      dbName: String,
+      tableName: String,
+      segmentId: String): Boolean = {
+    val identifier = AbsoluteTableIdentifier.from(dbName, tableName)
+    val status = SegmentStatusManager.getSegmentStatus(identifier)
+    status.isValid(segmentId)
+  }
+
+  private def validateTimeFormat(timestamp: String): Long = {
+    val timeObj = Cast(Literal(timestamp), TimestampType).eval()
+    if (null == timeObj) {
+      val errorMessage = "Error: Invalid load start time format: " + timestamp
+      throw new MalformedCarbonCommandException(errorMessage)
+    }
+    timeObj.asInstanceOf[Long]
+  }
+
+}
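
Taken together, the object gives both Spark integrations one engine-agnostic entry point for segment management. A minimal sketch of driving it directly, assuming the table is already registered in CarbonMetadata (names, limit and store path are illustrative):

  import org.apache.spark.sql.Row
  import org.apache.carbondata.api.CarbonStore

  // List visible segments, newest first, capped at 10 (the limit arrives as a String).
  val segments: Seq[Row] = CarbonStore.showSegments("default", "t1", Some("10"))

  // Mark segments "0" and "1" as deleted, then physically remove invisible loads.
  CarbonStore.deleteLoadById(Seq("0", "1"), "default", "t1")
  CarbonStore.cleanFiles("default", "t1", "/tmp/carbon/store")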

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index 28a9140..c2f06a4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -21,22 +21,18 @@ import java.util
 import java.util.concurrent._
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
 
-import org.apache.spark.SparkContext
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
-import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
+import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.load.LoadMetadataDetails
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.spark._
 import org.apache.carbondata.spark.load._
 import org.apache.carbondata.spark.merger.{CarbonDataMergerUtil, CompactionCallable, CompactionType}
 import org.apache.carbondata.spark.util.{CommonUtil, LoadMetadataUtil}
@@ -48,104 +44,6 @@ object DataManagementFunc {
 
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
-  def deleteLoadByDate(
-      sqlContext: SQLContext,
-      schema: CarbonDataLoadSchema,
-      databaseName: String,
-      tableName: String,
-      storePath: String,
-      dateField: String,
-      dateFieldActualName: String,
-      dateValue: String) {
-
-    val sc = sqlContext
-    // Delete the records based on data
-    val table = CarbonMetadata.getInstance.getCarbonTable(databaseName + "_" + tableName)
-    val loadMetadataDetailsArray =
-      SegmentStatusManager.readLoadMetadata(table.getMetaDataFilepath).toList
-    val resultMap = new CarbonDeleteLoadByDateRDD(
-      sc.sparkContext,
-      new DeletedLoadResultImpl(),
-      databaseName,
-      table.getDatabaseName,
-      dateField,
-      dateFieldActualName,
-      dateValue,
-      table.getFactTableName,
-      tableName,
-      storePath,
-      loadMetadataDetailsArray).collect.groupBy(_._1)
-
-    var updatedLoadMetadataDetailsList = new ListBuffer[LoadMetadataDetails]()
-    if (resultMap.nonEmpty) {
-      if (resultMap.size == 1) {
-        if (resultMap.contains("")) {
-          LOGGER.error("Delete by Date request is failed")
-          sys.error("Delete by Date request is failed, potential causes " +
-              "Empty store or Invalid column type, For more details please refer logs.")
-        }
-      }
-      val updatedloadMetadataDetails = loadMetadataDetailsArray.map { elem => {
-        var statusList = resultMap.get(elem.getLoadName)
-        // check for the merged load folder.
-        if (statusList.isEmpty && null != elem.getMergedLoadName) {
-          statusList = resultMap.get(elem.getMergedLoadName)
-        }
-
-        if (statusList.isDefined) {
-          elem.setModificationOrdeletionTimesStamp(CarbonLoaderUtil.readCurrentTime())
-          // if atleast on CarbonCommonConstants.MARKED_FOR_UPDATE status exist,
-          // use MARKED_FOR_UPDATE
-          if (statusList.get
-              .forall(status => status._2 == CarbonCommonConstants.MARKED_FOR_DELETE)) {
-            elem.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE)
-          } else {
-            elem.setLoadStatus(CarbonCommonConstants.MARKED_FOR_UPDATE)
-            updatedLoadMetadataDetailsList += elem
-          }
-          elem
-        } else {
-          elem
-        }
-      }
-
-      }
-
-      // Save the load metadata
-      val carbonLock = CarbonLockFactory
-          .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-            LockUsage.METADATA_LOCK
-          )
-      try {
-        if (carbonLock.lockWithRetries()) {
-          LOGGER.info("Successfully got the table metadata file lock")
-          if (updatedLoadMetadataDetailsList.nonEmpty) {
-            // TODO: Load Aggregate tables after retention.
-          }
-
-          // write
-          CarbonLoaderUtil.writeLoadMetadata(
-            schema,
-            databaseName,
-            table.getDatabaseName,
-            updatedloadMetadataDetails.asJava
-          )
-        }
-      } finally {
-        if (carbonLock.unlock()) {
-          LOGGER.info("unlock the table metadata file successfully")
-        } else {
-          LOGGER.error("Unable to unlock the metadata lock")
-        }
-      }
-    } else {
-      LOGGER.error("Delete by Date request is failed")
-      LOGGER.audit(s"The delete load by date is failed for $databaseName.$tableName")
-      sys.error("Delete by Date request is failed, potential causes " +
-          "Empty store or Invalid column type, For more details please refer logs.")
-    }
-  }
-
   def executeCompaction(carbonLoadModel: CarbonLoadModel,
       storePath: String,
       compactionModel: CompactionModel,
@@ -224,7 +122,7 @@ object DataManagementFunc {
    *
    * @param futureList
    */
-  def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]],
+  private def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]],
       loadsToMerge: util
       .List[LoadMetadataDetails],
       executor: ExecutorService,
@@ -289,21 +187,29 @@ object DataManagementFunc {
   }
 
   def deleteLoadsAndUpdateMetadata(
-      carbonLoadModel: CarbonLoadModel,
-      table: CarbonTable,
+      dbName: String,
+      tableName: String,
       storePath: String,
       isForceDeletion: Boolean): Unit = {
-    if (LoadMetadataUtil.isLoadDeletionRequired(carbonLoadModel)) {
-      val loadMetadataFilePath = CarbonLoaderUtil
-          .extractLoadMetadataFileLocation(carbonLoadModel)
+    if (LoadMetadataUtil.isLoadDeletionRequired(dbName, tableName)) {
+      val loadMetadataFilePath =
+        CarbonLoaderUtil.extractLoadMetadataFileLocation(dbName, tableName)
       val details = SegmentStatusManager.readLoadMetadata(loadMetadataFilePath)
-      val carbonTableStatusLock = CarbonLockFactory
-          .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-            LockUsage.TABLE_STATUS_LOCK)
+      val carbonTableStatusLock =
+        CarbonLockFactory.getCarbonLockObj(
+          new CarbonTableIdentifier(dbName, tableName, ""),
+          LockUsage.TABLE_STATUS_LOCK
+        )
 
       // Delete marked loads
-      val isUpdationRequired = DeleteLoadFolders
-          .deleteLoadFoldersFromFileSystem(carbonLoadModel, storePath, isForceDeletion, details)
+      val isUpdationRequired =
+        DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
+          dbName,
+          tableName,
+          storePath,
+          isForceDeletion,
+          details
+        )
 
       if (isUpdationRequired) {
         try {
@@ -318,14 +224,10 @@ object DataManagementFunc {
             val latestStatus = CarbonLoaderUtil
                 .updateLoadMetadataFromOldToNew(details, latestMetadata)
 
-            CarbonLoaderUtil.writeLoadMetadata(
-              carbonLoadModel.getCarbonDataLoadSchema,
-              carbonLoadModel.getDatabaseName,
-              carbonLoadModel.getTableName, latestStatus)
+            CarbonLoaderUtil.writeLoadMetadata(storePath, dbName, tableName, latestStatus)
           } else {
             val errorMsg = "Clean files request is failed for " +
-                s"${ carbonLoadModel.getDatabaseName }." +
-                s"${ carbonLoadModel.getTableName }" +
+                s"$dbName.$tableName" +
                 ". Not able to acquire the table status lock due to other operation " +
                 "running in the background."
             LOGGER.audit(errorMsg)
@@ -340,29 +242,24 @@ object DataManagementFunc {
   }
 
   def cleanFiles(
-      sc: SparkContext,
-      carbonLoadModel: CarbonLoadModel,
-      storePath: String) {
-    val table = CarbonMetadata.getInstance.getCarbonTable(
-      carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName)
-    val carbonCleanFilesLock = CarbonLockFactory.getCarbonLockObj(
-      table.getAbsoluteTableIdentifier.getCarbonTableIdentifier, LockUsage.CLEAN_FILES_LOCK)
+      dbName: String,
+      tableName: String,
+      storePath: String): Unit = {
+    val identifier = new CarbonTableIdentifier(dbName, tableName, "")
+    val carbonCleanFilesLock =
+      CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.CLEAN_FILES_LOCK)
     try {
       if (carbonCleanFilesLock.lockWithRetries()) {
         LOGGER.info("Clean files lock has been successfully acquired.")
-        deleteLoadsAndUpdateMetadata(carbonLoadModel,
-          table,
-          storePath,
-          isForceDeletion = true)
+        deleteLoadsAndUpdateMetadata(dbName, tableName, storePath, isForceDeletion = true)
       } else {
         val errorMsg = "Clean files request is failed for " +
-            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+            s"$dbName.$tableName" +
             ". Not able to acquire the clean files lock due to another clean files " +
             "operation is running in the background."
         LOGGER.audit(errorMsg)
         LOGGER.error(errorMsg)
         throw new Exception(errorMsg + " Please try after some time.")
-
       }
     } finally {
       CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
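
After this refactor the housekeeping paths need neither a CarbonLoadModel nor a SparkContext. A sketch of the two reworked entry points with placeholder values; note that cleanFiles takes the CLEAN_FILES_LOCK itself and forces deletion:

  import org.apache.carbondata.spark.rdd.DataManagementFunc

  // Remove loads already marked for deletion, without forcing.
  DataManagementFunc.deleteLoadsAndUpdateMetadata(
    "default", "t1", "/tmp/carbon/store", isForceDeletion = false)

  // Acquires the clean-files lock, then deletes with isForceDeletion = true.
  DataManagementFunc.cleanFiles("default", "t1", "/tmp/carbon/store")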

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 53a5f67..93194c8 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -433,8 +433,8 @@ object CarbonDataRDDFactory {
                      s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       }
       // Check if any load need to be deleted before loading new data
-      DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, storePath,
-        isForceDeletion = false)
+      DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName,
+        carbonLoadModel.getTableName, storePath, isForceDeletion = false)
       if (null == carbonLoadModel.getLoadMetadataDetails) {
         CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
       }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 041b65f..d82290e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -18,14 +18,13 @@
 package org.apache.spark.sql.execution.command
 
 import java.io.File
-import java.text.SimpleDateFormat
 
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{RunnableCommand, SparkPlan}
 import org.apache.spark.sql.hive.CarbonMetastore
@@ -33,9 +32,9 @@ import org.apache.spark.sql.types.TimestampType
 import org.apache.spark.util.FileUtils
 import org.codehaus.jackson.map.ObjectMapper
 
+import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
-import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
 import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
@@ -49,9 +48,23 @@ import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DataManagementFunc$, DictionaryLoadModel}
+import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil}
 
+object Checker {
+  def validateTableExists(
+      dbName: Option[String],
+      tableName: String,
+      sqlContext: SQLContext): Unit = {
+    val identifier = TableIdentifier(tableName, dbName)
+    if (!CarbonEnv.get.carbonMetastore.tableExists(identifier)(sqlContext)) {
+      val err = s"table $dbName.$tableName not found"
+      LogServiceFactory.getLogService(this.getClass.getName).error(err)
+      throw new IllegalArgumentException(err)
+    }
+  }
+}
+
 /**
  * Command for the compaction in alter table command
  *
@@ -179,61 +192,14 @@ private[sql] case class DeleteLoadsById(
     databaseNameOp: Option[String],
     tableName: String) extends RunnableCommand {
 
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
   def run(sqlContext: SQLContext): Seq[Row] = {
-
-    val databaseName = getDB.getDatabaseName(databaseNameOp, sqlContext)
-    LOGGER.audit(s"Delete segment by Id request has been received for $databaseName.$tableName")
-
-    // validate load ids first
-    validateLoadIds
-    val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
-
-    val identifier = TableIdentifier(tableName, Option(dbName))
-    val relation = CarbonEnv.get.carbonMetastore.lookupRelation1(
-      identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
-    if (relation == null) {
-      LOGGER.audit(s"Delete segment by Id is failed. Table $dbName.$tableName does not exist")
-      sys.error(s"Table $dbName.$tableName does not exist")
-    }
-
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)
-
-    if (null == carbonTable) {
-      CarbonEnv.get.carbonMetastore
-        .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
-    }
-    val path = carbonTable.getMetaDataFilepath
-
-    try {
-      val invalidLoadIds = SegmentStatusManager.updateDeletionStatus(
-        carbonTable.getAbsoluteTableIdentifier, loadids.asJava, path).asScala
-
-      if (invalidLoadIds.isEmpty) {
-
-        LOGGER.audit(s"Delete segment by Id is successfull for $databaseName.$tableName.")
-      }
-      else {
-        sys.error("Delete segment by Id is failed. Invalid ID is:" +
-                  s" ${ invalidLoadIds.mkString(",") }")
-      }
-    } catch {
-      case ex: Exception =>
-        sys.error(ex.getMessage)
-    }
-
+    Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
+    CarbonStore.deleteLoadById(
+      loadids,
+      getDB.getDatabaseName(databaseNameOp, sqlContext),
+      tableName
+    )
     Seq.empty
-
-  }
-
-  // validates load ids
-  private def validateLoadIds: Unit = {
-    if (loadids.isEmpty) {
-      val errorMessage = "Error: Segment id(s) should not be empty."
-      throw new MalformedCarbonCommandException(errorMessage)
-
-    }
   }
 }
 
@@ -243,90 +209,14 @@ private[sql] case class DeleteLoadsByLoadDate(
     dateField: String,
     loadDate: String) extends RunnableCommand {
 
-  val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.tablemodel.tableSchema")
-
   def run(sqlContext: SQLContext): Seq[Row] = {
-
-    LOGGER.audit("The delete segment by load date request has been received.")
-    val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
-    val identifier = TableIdentifier(tableName, Option(dbName))
-    val relation = CarbonEnv.get.carbonMetastore
-      .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
-    if (relation == null) {
-      LOGGER
-        .audit(s"Delete segment by load date is failed. Table $dbName.$tableName does not " +
-               s"exist")
-      sys.error(s"Table $dbName.$tableName does not exist")
-    }
-
-    val timeObj = Cast(Literal(loadDate), TimestampType).eval()
-    if (null == timeObj) {
-      val errorMessage = "Error: Invalid load start time format " + loadDate
-      throw new MalformedCarbonCommandException(errorMessage)
-    }
-
-    val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
-      .getCarbonTable(dbName + '_' + tableName)
-    if (null == carbonTable) {
-      var relation = CarbonEnv.get.carbonMetastore
-        .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
-    }
-    val path = carbonTable.getMetaDataFilepath()
-
-    try {
-      val invalidLoadTimestamps = SegmentStatusManager.updateDeletionStatus(
-        carbonTable.getAbsoluteTableIdentifier, loadDate, path,
-        timeObj.asInstanceOf[java.lang.Long]).asScala
-      if (invalidLoadTimestamps.isEmpty) {
-        LOGGER.audit(s"Delete segment by date is successfull for $dbName.$tableName.")
-      }
-      else {
-        sys.error("Delete segment by date is failed. No matching segment found.")
-      }
-    } catch {
-      case ex: Exception =>
-        sys.error(ex.getMessage)
-    }
+    Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
+    CarbonStore.deleteLoadByDate(
+      loadDate,
+      getDB.getDatabaseName(databaseNameOp, sqlContext),
+      tableName
+    )
     Seq.empty
-
-  }
-
-}
-
-object LoadTable {
-
-  def updateTableMetadata(carbonLoadModel: CarbonLoadModel,
-      sqlContext: SQLContext,
-      model: DictionaryLoadModel,
-      noDictDimension: Array[CarbonDimension]): Unit = {
-
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
-      model.table)
-    val schemaFilePath = carbonTablePath.getSchemaFilePath
-
-    // read TableInfo
-    val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath)
-
-    // modify TableInfo
-    val columns = tableInfo.getFact_table.getTable_columns
-    for (i <- 0 until columns.size) {
-      if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) {
-        columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
-      }
-    }
-
-    // write TableInfo
-    CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)
-
-    // update Metadata
-    val catalog = CarbonEnv.get.carbonMetastore
-    catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
-      model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
-
-    // update CarbonDataLoadSchema
-    val carbonTable = catalog.lookupRelation1(Option(model.table.getDatabaseName),
-      model.table.getTableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
-    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
   }
 
 }
@@ -511,7 +401,7 @@ case class LoadTable(
         carbonLoadModel.setCsvHeader(fileHeader)
         carbonLoadModel.setColDictFilePath(columnDict)
         carbonLoadModel.setDirectLoad(true)
-        GlobalDictionaryUtil.updateTableMetadataFunc = LoadTable.updateTableMetadata
+        GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
         GlobalDictionaryUtil
           .generateGlobalDictionary(sqlContext, carbonLoadModel, relation.tableMeta.storePath,
             dataFrame)
@@ -565,6 +455,40 @@ case class LoadTable(
     Seq.empty
   }
 
+  private def updateTableMetadata(carbonLoadModel: CarbonLoadModel,
+      sqlContext: SQLContext,
+      model: DictionaryLoadModel,
+      noDictDimension: Array[CarbonDimension]): Unit = {
+
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
+      model.table)
+    val schemaFilePath = carbonTablePath.getSchemaFilePath
+
+    // read TableInfo
+    val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath)
+
+    // modify TableInfo
+    val columns = tableInfo.getFact_table.getTable_columns
+    for (i <- 0 until columns.size) {
+      if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) {
+        columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
+      }
+    }
+
+    // write TableInfo
+    CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)
+
+    // update Metadata
+    val catalog = CarbonEnv.get.carbonMetastore
+    catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
+      model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
+
+    // update CarbonDataLoadSchema
+    val carbonTable = catalog.lookupRelation1(Option(model.table.getDatabaseName),
+      model.table.getTableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
+    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
+  }
+
   private def validateDateFormat(dateFormat: String, table: CarbonTable): Unit = {
     val dimensions = table.getDimensionByTableName(tableName).asScala
     if (dateFormat != null) {
@@ -651,60 +575,14 @@ private[sql] case class ShowLoads(
     limit: Option[String],
     override val output: Seq[Attribute]) extends RunnableCommand {
 
-
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    val databaseName = getDB.getDatabaseName(databaseNameOp, sqlContext)
-    val tableUniqueName = databaseName + "_" + tableName
-    // Here using checkSchemasModifiedTimeAndReloadTables in tableExists to reload metadata if
-    // schema is changed by other process, so that tableInfoMap woulb be refilled.
-    val tableExists = CarbonEnv.get.carbonMetastore
-      .tableExists(TableIdentifier(tableName, databaseNameOp))(sqlContext)
-    if (!tableExists) {
-      sys.error(s"$databaseName.$tableName is not found")
-    }
-    val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
-      .getCarbonTable(tableUniqueName)
-    if (carbonTable == null) {
-      sys.error(s"$databaseName.$tableName is not found")
-    }
-    val path = carbonTable.getMetaDataFilepath
-    val loadMetadataDetailsArray = SegmentStatusManager.readLoadMetadata(path)
-    if (loadMetadataDetailsArray.nonEmpty) {
-
-      val parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP)
-
-      var loadMetadataDetailsSortedArray = loadMetadataDetailsArray.sortWith(
-        (l1, l2) => java.lang.Double.parseDouble(l1.getLoadName) > java.lang.Double
-          .parseDouble(l2.getLoadName)
-      )
-
-
-      if (limit.isDefined) {
-        loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray
-          .filter(load => load.getVisibility.equalsIgnoreCase("true"))
-        val limitLoads = limit.get
-        try {
-          val lim = Integer.parseInt(limitLoads)
-          loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray.slice(0, lim)
-        } catch {
-          case ex: NumberFormatException => sys.error(s" Entered limit is not a valid Number")
-        }
-
-      }
-
-      loadMetadataDetailsSortedArray.filter(load => load.getVisibility.equalsIgnoreCase("true"))
-        .map(load =>
-          Row(
-            load.getLoadName,
-            load.getLoadStatus,
-            new java.sql.Timestamp(parser.parse(load.getLoadStartTime).getTime),
-            new java.sql.Timestamp(parser.parse(load.getTimestamp).getTime))).toSeq
-    } else {
-      Seq.empty
-
-    }
+    Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
+    CarbonStore.showSegments(
+      getDB.getDatabaseName(databaseNameOp, sqlContext),
+      tableName,
+      limit
+    )
   }
-
 }
 
 private[sql] case class DescribeCommandFormatted(
@@ -786,92 +664,17 @@ private[sql] case class DescribeCommandFormatted(
   }
 }
 
-private[sql] case class DeleteLoadByDate(
-    databaseNameOp: Option[String],
-    tableName: String,
-    dateField: String,
-    dateValue: String
-) extends RunnableCommand {
-
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  def run(sqlContext: SQLContext): Seq[Row] = {
-    val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
-    LOGGER.audit(s"The delete load by date request has been received for $dbName.$tableName")
-    val identifier = TableIdentifier(tableName, Option(dbName))
-    val relation = CarbonEnv.get.carbonMetastore
-      .lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation]
-    var level: String = ""
-    val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata
-      .getInstance().getCarbonTable(dbName + '_' + tableName)
-    if (relation == null) {
-      LOGGER.audit(s"The delete load by date is failed. Table $dbName.$tableName does not exist")
-      sys.error(s"Table $dbName.$tableName does not exist")
-    }
-    val matches: Seq[AttributeReference] = relation.dimensionsAttr.filter(
-      filter => filter.name.equalsIgnoreCase(dateField) &&
-                filter.dataType.isInstanceOf[TimestampType]).toList
-    if (matches.isEmpty) {
-      LOGGER.audit("The delete load by date is failed. " +
-                   s"Table $dbName.$tableName does not contain date field: $dateField")
-      sys.error(s"Table $dbName.$tableName does not contain date field $dateField")
-    } else {
-      level = matches.asJava.get(0).name
-    }
-    val actualColName = relation.metaData.carbonTable.getDimensionByName(tableName, level)
-      .getColName
-    DataManagementFunc.deleteLoadByDate(
-      sqlContext,
-      new CarbonDataLoadSchema(carbonTable),
-      dbName,
-      tableName,
-      CarbonEnv.get.carbonMetastore.storePath,
-      level,
-      actualColName,
-      dateValue)
-    LOGGER.audit(s"The delete load by date $dateValue is successful for $dbName.$tableName.")
-    Seq.empty
-  }
-
-}
-
 private[sql] case class CleanFiles(
     databaseNameOp: Option[String],
     tableName: String) extends RunnableCommand {
 
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
   def run(sqlContext: SQLContext): Seq[Row] = {
-    val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
-    LOGGER.audit(s"The clean files request has been received for $dbName.$tableName")
-    val identifier = TableIdentifier(tableName, Option(dbName))
-    val relation = CarbonEnv.get.carbonMetastore
-      .lookupRelation1(identifier)(sqlContext).
-      asInstanceOf[CarbonRelation]
-    if (relation == null) {
-      LOGGER.audit(s"The clean files request is failed. Table $dbName.$tableName does not exist")
-      sys.error(s"Table $dbName.$tableName does not exist")
-    }
-
-    val carbonLoadModel = new CarbonLoadModel()
-    carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
-    carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
-    val table = relation.tableMeta.carbonTable
-    carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
-    carbonLoadModel.setTableName(table.getFactTableName)
-    carbonLoadModel.setStorePath(relation.tableMeta.storePath)
-    val dataLoadSchema = new CarbonDataLoadSchema(table)
-    carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-    try {
-      DataManagementFunc.cleanFiles(
-        sqlContext.sparkContext,
-        carbonLoadModel,
-        relation.tableMeta.storePath)
-      LOGGER.audit(s"Clean files request is successfull for $dbName.$tableName.")
-    } catch {
-      case ex: Exception =>
-        sys.error(ex.getMessage)
-    }
+    Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
+    CarbonStore.cleanFiles(
+      getDB.getDatabaseName(databaseNameOp, sqlContext),
+      tableName,
+      sqlContext.asInstanceOf[CarbonContext].storePath
+    )
     Seq.empty
   }
 }
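
Every rewritten command now follows the same two-step shape: validate through the shared Checker, then delegate to CarbonStore. A sketch of that pattern as a hypothetical helper; the name dropSegmentZero and the table names are illustrative, and a Carbon-enabled SQLContext is assumed:

  import org.apache.spark.sql.SQLContext
  import org.apache.spark.sql.execution.command.Checker
  import org.apache.carbondata.api.CarbonStore

  def dropSegmentZero(sqlContext: SQLContext): Unit = {
    // Throws IllegalArgumentException if default.t1 is not in the metastore.
    Checker.validateTableExists(Some("default"), "t1", sqlContext)
    CarbonStore.deleteLoadById(Seq("0"), "default", "t1")
  }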

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 62803c7..9cdbf86 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -148,12 +148,12 @@ class CarbonMetastore(hiveContext: HiveContext, val storePath: String,
                  c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
   }
 
-  def tableExists(tableIdentifier: TableIdentifier)(sqlContext: SQLContext): Boolean = {
+  def tableExists(identifier: TableIdentifier)(sqlContext: SQLContext): Boolean = {
     checkSchemasModifiedTimeAndReloadTables()
-    val database = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
+    val database = identifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
     val tables = metadata.tablesMeta.filter(
       c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
-           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
+           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(identifier.table))
     tables.nonEmpty
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java b/integration/spark/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
index 0161a45..ed4f95b 100644
--- a/integration/spark/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
+++ b/integration/spark/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
@@ -28,7 +28,6 @@ import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.carbondata.core.carbon.datastore.block.Distributable;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 79b1953..de07707 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -431,8 +431,8 @@ object CarbonDataRDDFactory {
                      s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       }
       // Check if any load need to be deleted before loading new data
-      DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, storePath,
-        isForceDeletion = false)
+      DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName,
+        carbonLoadModel.getTableName, storePath, isForceDeletion = false)
       if (null == carbonLoadModel.getLoadMetadataDetails) {
         CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
       }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 10fffd9..68ad4d6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -17,17 +17,13 @@
 
 package org.apache.spark.sql.execution.command
 
-import java.text.SimpleDateFormat
-
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.hive.{CarbonMetastore, CarbonRelation}
-import org.apache.spark.sql.types.TimestampType
 import org.apache.spark.util.FileUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -40,12 +36,11 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel}
+import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, GlobalDictionaryUtil}
 
 /**
@@ -58,7 +53,6 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) {
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   def run(sparkSession: SparkSession): Seq[Row] = {
-    // TODO : Implement it.
     val tableName = alterTableModel.tableName
     val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
     if (null == org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
@@ -142,124 +136,6 @@ case class CreateTable(cm: TableModel) {
   }
 }
 
-case class DeleteLoadsById(
-    loadids: Seq[String],
-    databaseNameOp: Option[String],
-    tableName: String) {
-
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  def run(sparkSession: SparkSession): Seq[Row] = {
-
-    val databaseName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
-    LOGGER.audit(s"Delete segment by Id request has been received for $databaseName.$tableName")
-
-    // validate load ids first
-    validateLoadIds
-    val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
-    val identifier = TableIdentifier(tableName, Option(dbName))
-    val relation = CarbonEnv.get.carbonMetastore.lookupRelation(
-      identifier, None)(sparkSession).asInstanceOf[CarbonRelation]
-    if (relation == null) {
-      LOGGER.audit(s"Delete segment by Id is failed. Table $dbName.$tableName does not exist")
-      sys.error(s"Table $dbName.$tableName does not exist")
-    }
-
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)
-
-    if (null == carbonTable) {
-      CarbonEnv.get.carbonMetastore
-        .lookupRelation(identifier, None)(sparkSession).asInstanceOf[CarbonRelation]
-    }
-    val path = carbonTable.getMetaDataFilepath
-
-    try {
-      val invalidLoadIds = SegmentStatusManager.updateDeletionStatus(
-        carbonTable.getAbsoluteTableIdentifier, loadids.asJava, path).asScala
-
-      if (invalidLoadIds.isEmpty) {
-
-        LOGGER.audit(s"Delete segment by Id is successfull for $databaseName.$tableName.")
-      }
-      else {
-        sys.error("Delete segment by Id is failed. Invalid ID is:" +
-                  s" ${ invalidLoadIds.mkString(",") }")
-      }
-    } catch {
-      case ex: Exception =>
-        sys.error(ex.getMessage)
-    }
-
-    Seq.empty
-
-  }
-
-  // validates load ids
-  private def validateLoadIds: Unit = {
-    if (loadids.isEmpty) {
-      val errorMessage = "Error: Segment id(s) should not be empty."
-      throw new MalformedCarbonCommandException(errorMessage)
-
-    }
-  }
-}
-
-case class DeleteLoadsByLoadDate(
-    databaseNameOp: Option[String],
-    tableName: String,
-    dateField: String,
-    loadDate: String) {
-
-  val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.TableModel.tableSchema")
-
-  def run(sparkSession: SparkSession): Seq[Row] = {
-
-    LOGGER.audit("The delete segment by load date request has been received.")
-    val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
-    val identifier = TableIdentifier(tableName, Option(dbName))
-    val relation = CarbonEnv.get.carbonMetastore
-      .lookupRelation(identifier, None)(sparkSession).asInstanceOf[CarbonRelation]
-    if (relation == null) {
-      LOGGER
-        .audit(s"Delete segment by load date is failed. Table $dbName.$tableName does not " +
-               s"exist")
-      sys.error(s"Table $dbName.$tableName does not exist")
-    }
-
-    val timeObj = Cast(Literal(loadDate), TimestampType).eval()
-    if (null == timeObj) {
-      val errorMessage = "Error: Invalid load start time format " + loadDate
-      throw new MalformedCarbonCommandException(errorMessage)
-    }
-
-    val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
-      .getCarbonTable(dbName + '_' + tableName)
-
-    if (null == carbonTable) {
-      var relation = CarbonEnv.get.carbonMetastore
-        .lookupRelation(identifier, None)(sparkSession).asInstanceOf[CarbonRelation]
-    }
-    val path = carbonTable.getMetaDataFilepath()
-
-    try {
-      val invalidLoadTimestamps = SegmentStatusManager.updateDeletionStatus(
-        carbonTable.getAbsoluteTableIdentifier, loadDate, path,
-        timeObj.asInstanceOf[java.lang.Long]).asScala
-      if (invalidLoadTimestamps.isEmpty) {
-        LOGGER.audit(s"Delete segment by date is successfull for $dbName.$tableName.")
-      }
-      else {
-        sys.error("Delete segment by date is failed. No matching segment found.")
-      }
-    } catch {
-      case ex: Exception =>
-        sys.error(ex.getMessage)
-    }
-    Seq.empty
-
-  }
-
-}
 
 object LoadTable {
 
@@ -557,152 +433,3 @@ case class LoadTable(
     }
   }
 }
-
-private[sql] case class DeleteLoadByDate(
-    databaseNameOp: Option[String],
-    tableName: String,
-    dateField: String,
-    dateValue: String) {
-
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  def run(sparkSession: SparkSession): Seq[Row] = {
-    val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
-    LOGGER.audit(s"The delete load by date request has been received for $dbName.$tableName")
-    val identifier = TableIdentifier(tableName, Option(dbName))
-    val relation = CarbonEnv.get.carbonMetastore
-      .lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
-    var level: String = ""
-    val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata
-      .getInstance().getCarbonTable(dbName + '_' + tableName)
-    if (relation == null) {
-      LOGGER.audit(s"The delete load by date is failed. Table $dbName.$tableName does not exist")
-      sys.error(s"Table $dbName.$tableName does not exist")
-    }
-    val matches: Seq[AttributeReference] = relation.dimensionsAttr.filter(
-      filter => filter.name.equalsIgnoreCase(dateField) &&
-                filter.dataType.isInstanceOf[TimestampType]).toList
-    if (matches.isEmpty) {
-      LOGGER.audit("The delete load by date is failed. " +
-                   s"Table $dbName.$tableName does not contain date field: $dateField")
-      sys.error(s"Table $dbName.$tableName does not contain date field $dateField")
-    } else {
-      level = matches.asJava.get(0).name
-    }
-    val actualColName = relation.metaData.carbonTable.getDimensionByName(tableName, level)
-      .getColName
-    DataManagementFunc.deleteLoadByDate(
-      sparkSession.sqlContext,
-      new CarbonDataLoadSchema(carbonTable),
-      dbName,
-      tableName,
-      CarbonEnv.get.carbonMetastore.storePath,
-      level,
-      actualColName,
-      dateValue)
-    LOGGER.audit(s"The delete load by date $dateValue is successful for $dbName.$tableName.")
-    Seq.empty
-  }
-
-}
-
-case class CleanFiles(
-    databaseNameOp: Option[String],
-    tableName: String) {
-
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  def run(sparkSession: SparkSession): Seq[Row] = {
-    val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
-    LOGGER.audit(s"The clean files request has been received for $dbName.$tableName")
-    val identifier = TableIdentifier(tableName, Option(dbName))
-    val relation = CarbonEnv.get.carbonMetastore
-      .lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
-    if (relation == null) {
-      LOGGER.audit(s"The clean files request is failed. Table $dbName.$tableName does not exist")
-      sys.error(s"Table $dbName.$tableName does not exist")
-    }
-
-    val carbonLoadModel = new CarbonLoadModel()
-    carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
-    carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
-    val table = relation.tableMeta.carbonTable
-    carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
-    carbonLoadModel.setTableName(table.getFactTableName)
-    carbonLoadModel.setStorePath(relation.tableMeta.storePath)
-    val dataLoadSchema = new CarbonDataLoadSchema(table)
-    carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-    try {
-      DataManagementFunc.cleanFiles(
-        sparkSession.sqlContext.sparkContext,
-        carbonLoadModel,
-        relation.tableMeta.storePath)
-      LOGGER.audit(s"Clean files request is successfull for $dbName.$tableName.")
-    } catch {
-      case ex: Exception =>
-        sys.error(ex.getMessage)
-    }
-    Seq.empty
-  }
-}
-
-case class ShowLoads(
-    databaseNameOp: Option[String],
-    tableName: String,
-    limit: Option[String],
-    val output: Seq[Attribute]) {
-
-  def run(sparkSession: SparkSession): Seq[Row] = {
-    val databaseName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
-    val tableUniqueName = databaseName + "_" + tableName
-    // Here using checkSchemasModifiedTimeAndReloadTables in tableExists to reload metadata if
-    // schema is changed by other process, so that tableInfoMap woulb be refilled.
-    val tableExists = CarbonEnv.get.carbonMetastore
-        .tableExists(TableIdentifier(tableName, databaseNameOp))(sparkSession)
-    if (!tableExists) {
-      sys.error(s"$databaseName.$tableName is not found")
-    }
-    val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
-        .getCarbonTable(tableUniqueName)
-    if (carbonTable == null) {
-      sys.error(s"$databaseName.$tableName is not found")
-    }
-    val path = carbonTable.getMetaDataFilepath
-    val loadMetadataDetailsArray = SegmentStatusManager.readLoadMetadata(path)
-    if (loadMetadataDetailsArray.nonEmpty) {
-
-      val parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP)
-
-      var loadMetadataDetailsSortedArray = loadMetadataDetailsArray.sortWith(
-        (l1, l2) => java.lang.Double.parseDouble(l1.getLoadName) > java.lang.Double
-            .parseDouble(l2.getLoadName)
-      )
-
-
-      if (limit.isDefined) {
-        loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray
-            .filter(load => load.getVisibility.equalsIgnoreCase("true"))
-        val limitLoads = limit.get
-        try {
-          val lim = Integer.parseInt(limitLoads)
-          loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray.slice(0, lim)
-        } catch {
-          case ex: NumberFormatException => sys.error(s" Entered limit is not a valid Number")
-        }
-
-      }
-
-      loadMetadataDetailsSortedArray.filter(load => load.getVisibility.equalsIgnoreCase("true"))
-          .map(load =>
-            Row(
-              load.getLoadName,
-              load.getLoadStatus,
-              new java.sql.Timestamp(parser.parse(load.getLoadStartTime).getTime),
-              new java.sql.Timestamp(parser.parse(load.getTimestamp).getTime))).toSeq
-    } else {
-      Seq.empty
-
-    }
-  }
-
-}
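
[Editor's note] The commands deleted above are not dropped outright: their logic moves into the new org.apache.carbondata.api.CarbonStore object (CarbonStore.scala in the file list, not reproduced in this excerpt). From the call sites elsewhere in this commit its surface can be inferred as roughly the sketch below; these signatures are reconstructed from usage, not copied from the new source file:

import org.apache.spark.sql.Row

object CarbonStore {
  // signatures inferred from the call sites in this commit; bodies omitted
  def showSegments(dbName: String, tableName: String, limit: Option[String]): Seq[Row] = ???
  def deleteLoadById(loadids: Seq[String], dbName: String, tableName: String): Unit = ???
  def deleteLoadByDate(timestamp: String, dbName: String, tableName: String): Unit = ???
  def cleanFiles(dbName: String, tableName: String, storePath: String): Unit = ???
}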

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 9467804..9638b8f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -176,12 +176,14 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
                  c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
   }
 
-  def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
+  def tableExists(
+      table: String,
+      databaseOp: Option[String] = None)(sparkSession: SparkSession): Boolean = {
     checkSchemasModifiedTimeAndReloadTables()
-    val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+    val database = databaseOp.getOrElse(sparkSession.catalog.currentDatabase)
     val tables = metadata.tablesMeta.filter(
       c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
-           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
+           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(table))
     tables.nonEmpty
   }
 
@@ -294,7 +296,7 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
       tableInfo: org.apache.carbondata.core.carbon.metadata.schema.table.TableInfo,
       dbName: String, tableName: String)
     (sparkSession: SparkSession): String = {
-    if (tableExists(TableIdentifier(tableName, Some(dbName)))(sparkSession)) {
+    if (tableExists(tableName, Some(dbName))(sparkSession)) {
       sys.error(s"Table [$tableName] already exists under Database [$dbName]")
     }
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
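
[Editor's note] The new overload saves callers from constructing a TableIdentifier just to probe for existence, e.g. (with metastore standing in for a CarbonMetastore instance):

// before: metastore.tableExists(TableIdentifier("t1", Some("mydb")))(sparkSession)
metastore.tableExists("t1", Some("mydb"))(sparkSession)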

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index f493af1..c84882e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.util
 
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.execution.command.{CleanFiles => TableCleanFiles}
+
+import org.apache.carbondata.api.CarbonStore
 
 /**
  * clean files api
@@ -26,21 +27,23 @@ import org.apache.spark.sql.execution.command.{CleanFiles => TableCleanFiles}
  // scalastyle:off
 object CleanFiles {
 
-  def cleanFiles(spark: SparkSession, dbName: Option[String], tableName: String): Unit = {
-    TableCleanFiles(dbName, tableName).run(spark)
+  def cleanFiles(spark: SparkSession, dbName: String, tableName: String,
+      storePath: String): Unit = {
+    TableAPIUtil.validateTableExists(spark, dbName, tableName)
+    CarbonStore.cleanFiles(dbName, tableName, storePath)
   }
 
   def main(args: Array[String]): Unit = {
 
     if (args.length < 2) {
-      System.err.println("Usage: TableCleanFiles <store path> <table name>");
+      System.err.println("Usage: CleanFiles <store path> <table name>");
       System.exit(1)
     }
 
     val storePath = TableAPIUtil.escape(args(0))
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
-    val spark = TableAPIUtil.spark(storePath, s"TableCleanFiles: $dbName.$tableName")
+    val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName")
     CarbonEnv.init(spark.sqlContext)
-    cleanFiles(spark, Option(dbName), tableName)
+    cleanFiles(spark, dbName, tableName, storePath)
   }
 }
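
[Editor's note] All of the renamed standalone tools (CleanFiles, Compaction, DeleteSegmentByDate, DeleteSegmentById, ShowSegments) now share this pattern: args(0) is the store path and args(1) a possibly database-qualified table name. A usage sketch, with a hypothetical store path and table:

import org.apache.spark.sql.CarbonEnv
import org.apache.spark.util.{CleanFiles, TableAPIUtil}

// same effect as: CleanFiles.main(Array(storePath, "default.t1"))
val storePath = "hdfs://localhost:9000/carbon/store"  // hypothetical
val spark = TableAPIUtil.spark(storePath, "CleanFiles: default.t1")
CarbonEnv.init(spark.sqlContext)
CleanFiles.cleanFiles(spark, "default", "t1", storePath)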

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
index 90310d3..1e891fd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
@@ -25,22 +25,23 @@ import org.apache.spark.sql.execution.command.{AlterTableCompaction, AlterTableM
  // scalastyle:off
 object Compaction {
 
-  def compaction(spark: SparkSession, dbName: Option[String], tableName: String,
+  def compaction(spark: SparkSession, dbName: String, tableName: String,
       compactionType: String): Unit = {
-    AlterTableCompaction(AlterTableModel(dbName, tableName, compactionType, "")).run(spark)
+    TableAPIUtil.validateTableExists(spark, dbName, tableName)
+    AlterTableCompaction(AlterTableModel(Some(dbName), tableName, compactionType, "")).run(spark)
   }
 
   def main(args: Array[String]): Unit = {
     if (args.length < 3) {
-      System.err.println("Usage: TableCompaction <store path> <table name> <major|minor>");
+      System.err.println("Usage: Compaction <store path> <table name> <major|minor>");
       System.exit(1)
     }
 
     val storePath = TableAPIUtil.escape(args(0))
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
     val compactionType = TableAPIUtil.escape(args(2))
-    val spark = TableAPIUtil.spark(storePath, s"TableCompaction: $dbName.$tableName")
+    val spark = TableAPIUtil.spark(storePath, s"Compaction: $dbName.$tableName")
     CarbonEnv.init(spark.sqlContext)
-    compaction(spark, Option(dbName), tableName, compactionType)
+    compaction(spark, dbName, tableName, compactionType)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index f77a16e..ae95bf6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -17,7 +17,8 @@
  package org.apache.spark.util
 
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.execution.command.DeleteLoadsByLoadDate
+
+import org.apache.carbondata.api.CarbonStore
 
 /**
  * delete segments before some date
@@ -25,23 +26,24 @@ import org.apache.spark.sql.execution.command.DeleteLoadsByLoadDate
 // scalastyle:off
 object DeleteSegmentByDate {
 
-  def deleteSegmentByDate(spark: SparkSession, dbName: Option[String], tableName: String,
+  def deleteSegmentByDate(spark: SparkSession, dbName: String, tableName: String,
       dateValue: String): Unit = {
-    DeleteLoadsByLoadDate(dbName, tableName, "", dateValue).run(spark)
+    TableAPIUtil.validateTableExists(spark, dbName, tableName)
+    CarbonStore.deleteLoadByDate(dateValue, dbName, tableName)
   }
 
   def main(args: Array[String]): Unit = {
     if (args.length < 3) {
       System.err.println(
-        "Usage: TableDeleteSegmentByDate <store path> <table name> <before date value>");
+        "Usage: DeleteSegmentByDate <store path> <table name> <before date value>");
       System.exit(1)
     }
 
     val storePath = TableAPIUtil.escape(args(0))
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
     val dateValue = TableAPIUtil.escape(args(2))
-    val spark = TableAPIUtil.spark(storePath, s"TableCleanFiles: $dbName.$tableName")
+    val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentByDate: $dbName.$tableName")
     CarbonEnv.init(spark.sqlContext)
-    deleteSegmentByDate(spark, Option(dbName), tableName, dateValue)
+    deleteSegmentByDate(spark, dbName, tableName, dateValue)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index c3e8626..d5a6861 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -17,7 +17,8 @@
 package org.apache.spark.util
 
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.execution.command.DeleteLoadsById
+
+import org.apache.carbondata.api.CarbonStore
 
 /**
  * delete segments by id list
@@ -29,24 +30,25 @@ object DeleteSegmentById {
     segmentIds.split(",").toSeq
   }
 
-  def deleteSegmentById(spark: SparkSession, dbName: Option[String], tableName: String,
+  def deleteSegmentById(spark: SparkSession, dbName: String, tableName: String,
       segmentIds: Seq[String]): Unit = {
-    DeleteLoadsById(segmentIds, dbName, tableName).run(spark)
+    TableAPIUtil.validateTableExists(spark, dbName, tableName)
+    CarbonStore.deleteLoadById(segmentIds, dbName, tableName)
   }
 
   def main(args: Array[String]): Unit = {
 
     if (args.length < 3) {
       System.err.println(
-        "Usage: TableDeleteSegmentByID <store path> <table name> <segment id list>");
+        "Usage: DeleteSegmentByID <store path> <table name> <segment id list>");
       System.exit(1)
     }
 
     val storePath = TableAPIUtil.escape(args(0))
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
     val segmentIds = extractSegmentIds(TableAPIUtil.escape(args(2)))
-    val spark = TableAPIUtil.spark(storePath, s"TableDeleteSegmentById: $dbName.$tableName")
+    val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentById: $dbName.$tableName")
     CarbonEnv.init(spark.sqlContext)
-    deleteSegmentById(spark, Option(dbName), tableName, segmentIds)
+    deleteSegmentById(spark, dbName, tableName, segmentIds)
   }
 }
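
[Editor's note] extractSegmentIds splits the comma-separated id list verbatim; invalid ids are still rejected downstream when the deletion status is updated. For example (spark, database, and table names here are hypothetical):

// "0,1,2" => Seq("0", "1", "2")
val ids = DeleteSegmentById.extractSegmentIds("0,1,2")
DeleteSegmentById.deleteSegmentById(spark, "default", "t1", ids)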

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
index 5ddffcd..1a02c8c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
@@ -17,41 +17,41 @@
 
 package org.apache.spark.util
 
+import java.text.SimpleDateFormat
+
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.execution.command.ShowLoads
-import org.apache.spark.sql.types.{StringType, TimestampType}
+
+import org.apache.carbondata.api.CarbonStore
 
 // scalastyle:off
 object ShowSegments {
 
-  def showSegments(spark: SparkSession, dbName: Option[String], tableName: String,
+  def showSegments(spark: SparkSession, dbName: String, tableName: String,
       limit: Option[String]): Seq[Row] = {
-    val output =  Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
-      AttributeReference("Status", StringType, nullable = false)(),
-      AttributeReference("Load Start Time", TimestampType, nullable = false)(),
-      AttributeReference("Load End Time", TimestampType, nullable = false)())
-    ShowLoads(dbName, tableName, limit: Option[String], output).run(spark)
+    TableAPIUtil.validateTableExists(spark, dbName, tableName)
+    CarbonStore.showSegments(dbName, tableName, limit)
   }
 
   def showString(rows: Seq[Row]): String = {
+    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.s")
     val sb = new StringBuilder
-    sb.append("+-----------------+---------------+---------------------+---------------------+\n")
-      .append("|SegmentSequenceId|Status         |Load Start Time      |Load End Time        |\n")
-      .append("+-----------------+---------------+---------------------+---------------------+\n")
+    sb.append("+------------+------------------+----------------------+----------------------+\n")
+      .append("|SegmentId   |Status            |Load Start Time       |Load End Time         |\n")
+      .append("+------------+------------------+----------------------+----------------------+\n")
       rows.foreach{row =>
         sb.append("|")
-          .append(StringUtils.rightPad(row.getString(0), 17))
+          .append(StringUtils.rightPad(row.getString(0), 12))
           .append("|")
-          .append(StringUtils.rightPad(row.getString(1).substring(0, 15), 15))
+          .append(StringUtils.rightPad(row.getString(1), 18))
           .append("|")
-          .append(row.getAs[java.sql.Timestamp](2).formatted("yyyy-MM-dd HH:mm:ss.s"))
+          .append(sdf.format(row.getAs[java.sql.Timestamp](2)))
           .append("|")
-          .append(row.getAs[java.sql.Timestamp](3).formatted("yyyy-MM-dd HH:mm:ss.s"))
+          .append(sdf.format(row.getAs[java.sql.Timestamp](3)))
           .append("|\n")
       }
-    sb.append("+-----------------+---------------+---------------------+---------------------+\n")
+    sb.append("+------------+------------------+----------------------+----------------------+\n")
     sb.toString
   }
 
@@ -74,9 +74,9 @@ object ShowSegments {
     } else {
       None
     }
-    val spark = TableAPIUtil.spark(storePath, s"TableCleanFiles: $dbName.$tableName")
+    val spark = TableAPIUtil.spark(storePath, s"ShowSegments: $dbName.$tableName")
     CarbonEnv.init(spark.sqlContext)
-    val rows = showSegments(spark, Option(dbName), tableName, limit)
+    val rows = showSegments(spark, dbName, tableName, limit)
     System.out.println(showString(rows))
   }
 }
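
[Editor's note] With the widened columns and an explicit SimpleDateFormat, showString renders a table like the sketch below. The segment rows are illustrative only; note also that the pattern "yyyy-MM-dd HH:mm:ss.s" repeats the seconds field after the dot (the trailing 's' is seconds, not milliseconds), so the fractional part mirrors the seconds value:

+------------+------------------+----------------------+----------------------+
|SegmentId   |Status            |Load Start Time       |Load End Time         |
+------------+------------------+----------------------+----------------------+
|1           |Success           |2016-12-24 22:10:22.22|2016-12-24 22:10:45.45|
|0           |Compacted         |2016-12-24 22:08:11.11|2016-12-24 22:09:33.33|
+------------+------------------+----------------------+----------------------+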

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
index 6954981..cb444d6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
@@ -17,14 +17,20 @@
 
 package org.apache.spark.util
 
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
 
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 /**
  * table api util
  */
 object TableAPIUtil {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
   def parseSchemaName(tableName: String): (String, String) = {
     if (tableName.contains(".")) {
      val parts = tableName.split("\\.")
@@ -44,11 +50,22 @@ object TableAPIUtil {
   }
 
   def spark(storePath: String, appName: String): SparkSession = {
+    // CarbonEnv depends on CarbonProperty to get the store path, so set it here
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
     SparkSession
         .builder
         .appName(appName)
-        .master("local")
-        .config(CarbonCommonConstants.STORE_LOCATION, storePath)
         .getOrCreate()
   }
+
+  def validateTableExists(
+      spark: SparkSession,
+      dbName: String,
+      tableName: String): Unit = {
+    if (!CarbonEnv.get.carbonMetastore.tableExists(tableName, Some(dbName))(spark)) {
+      val err = s"table $dbName.$tableName not found"
+      LOGGER.error(err)
+      throw new MalformedCarbonCommandException(err)
+    }
+  }
 }
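
[Editor's note] A subtlety in parseSchemaName above: String.split takes a regular expression, so the dot separator must be escaped as "\\." (an unescaped "." matches every character and, after trailing empty strings are dropped, split returns an empty array). A quick illustration:

// yields ("mydb", "t1"); the unqualified fallback branch is outside this excerpt
val (db, table) = TableAPIUtil.parseSchemaName("mydb.t1")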