You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/12/12 09:17:19 UTC

carbondata git commit: [CARBONDATA-1851] Refactor to use only SegmentsToAccess for Aggregatetable, move tableFolderDeletion to TableProcessingOperations

Repository: carbondata
Updated Branches:
  refs/heads/master 4eb37240f -> 34cb55194


[CARBONDATA-1851] Refactor to use only SegmentsToAccess for Aggregatetable, move tableFolderDeletion to TableProcessingOperations

This closes #1616


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/34cb5519
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/34cb5519
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/34cb5519

Branch: refs/heads/master
Commit: 34cb55194c2061efe54acaf4cea67d5b6179034c
Parents: 4eb3724
Author: rahulforallp <ra...@knoldus.in>
Authored: Tue Dec 5 19:48:38 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Tue Dec 12 14:45:41 2017 +0530

----------------------------------------------------------------------
 .../filesystem/AbstractDFSCarbonFile.java       |   4 +-
 .../core/metadata/schema/table/CarbonTable.java |  10 ++
 .../carbondata/core/scan/model/QueryModel.java  |  46 +++---
 .../carbondata/core/util/CarbonProperties.java  |   8 +
 .../carbondata/core/util/SessionParams.java     |   7 +-
 .../hadoop/api/CarbonTableInputFormat.java      |  17 +-
 .../hadoop/util/CarbonInputFormatUtil.java      |   8 +-
 .../org/apache/carbondata/events/Events.scala   |   2 +-
 .../load/DataLoadProcessorStepOnSpark.scala     |   4 +-
 .../spark/rdd/AlterTableLoadPartitionRDD.scala  |   3 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   3 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  20 +--
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  10 +-
 .../carbondata/spark/rdd/UpdateDataLoad.scala   |   6 +-
 .../apache/carbondata/spark/rdd/Compactor.scala |   5 +-
 .../management/CarbonCleanFilesCommand.scala    |   9 +-
 .../CarbonAlterTableDropPartitionCommand.scala  |   7 +-
 .../CarbonAlterTableSplitPartitionCommand.scala |   7 +-
 .../loading/TableProcessingOperations.java      | 164 +++++++++++++++++++
 .../store/CarbonFactDataHandlerModel.java       |   2 +-
 .../processing/util/CarbonLoaderUtil.java       | 100 -----------
 21 files changed, 249 insertions(+), 193 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index 3eb97bc..fcd230a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -63,7 +63,7 @@ public abstract  class AbstractDFSCarbonFile implements CarbonFile {
       fs = path.getFileSystem(this.hadoopConf);
       fileStatus = fs.getFileStatus(path);
     } catch (IOException e) {
-      LOGGER.error("Exception occurred:" + e.getMessage());
+      LOGGER.debug("Exception occurred:" + e.getMessage());
     }
   }
 
@@ -78,7 +78,7 @@ public abstract  class AbstractDFSCarbonFile implements CarbonFile {
       fs = path.getFileSystem(this.hadoopConf);
       fileStatus = fs.getFileStatus(path);
     } catch (IOException e) {
-      LOGGER.error("Exception occurred:" + e.getMessage());
+      LOGGER.debug("Exception occurred:" + e.getMessage());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index e5d8839..4ebc02d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -273,6 +273,16 @@ public class CarbonTable implements Serializable {
   }
 
   /**
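+   * Gets the implicit dimensions of a table.
+   *
+   * @param tableName name of the table whose implicit dimensions are looked up
+   * @return the implicit dimensions mapped to the given table name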
+   */
+  public List<CarbonDimension> getImplicitDimensionByTableName(String tableName) {
+    return tableImplicitDimensionsMap.get(tableName);
+  }
+
+  /**
    * Read all primitive/complex children and set it as list of child carbon dimension to parent
    * dimension
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 67b8681..5e4872b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -122,10 +122,9 @@ public class QueryModel implements Serializable {
   public static QueryModel createModel(AbsoluteTableIdentifier absoluteTableIdentifier,
       CarbonQueryPlan queryPlan, CarbonTable carbonTable, DataTypeConverter converter) {
     QueryModel queryModel = new QueryModel();
-    String factTableName = carbonTable.getTableName();
     queryModel.setAbsoluteTableIdentifier(absoluteTableIdentifier);
 
-    fillQueryModel(queryPlan, carbonTable, queryModel, factTableName);
+    fillQueryModel(queryPlan, carbonTable, queryModel);
 
     queryModel.setForcedDetailRawQuery(queryPlan.isRawDetailQuery());
     queryModel.setQueryId(queryPlan.getQueryId());
@@ -134,7 +133,7 @@ public class QueryModel implements Serializable {
   }
 
   private static void fillQueryModel(CarbonQueryPlan queryPlan, CarbonTable carbonTable,
-      QueryModel queryModel, String factTableName) {
+      QueryModel queryModel) {
     queryModel.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier());
     queryModel.setQueryDimension(queryPlan.getDimensions());
     queryModel.setQueryMeasures(queryPlan.getMeasures());
@@ -142,9 +141,8 @@ public class QueryModel implements Serializable {
       boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
       boolean[] isFilterMeasures =
           new boolean[carbonTable.getNumberOfMeasures(carbonTable.getTableName())];
-      processFilterExpression(queryPlan.getFilterExpression(),
-          carbonTable.getDimensionByTableName(factTableName),
-          carbonTable.getMeasureByTableName(factTableName), isFilterDimensions, isFilterMeasures);
+      processFilterExpression(carbonTable, queryPlan.getFilterExpression(), isFilterDimensions,
+          isFilterMeasures);
       queryModel.setIsFilterDimensions(isFilterDimensions);
       queryModel.setIsFilterMeasures(isFilterMeasures);
     }
@@ -153,8 +151,7 @@ public class QueryModel implements Serializable {
     queryModel.setTable(carbonTable);
   }
 
-  public static void processFilterExpression(Expression filterExpression,
-      List<CarbonDimension> dimensions, List<CarbonMeasure> measures,
+  public static void processFilterExpression(CarbonTable carbonTable, Expression filterExpression,
       final boolean[] isFilterDimensions, final boolean[] isFilterMeasures) {
     if (null != filterExpression) {
       if (null != filterExpression.getChildren() && filterExpression.getChildren().size() == 0) {
@@ -162,24 +159,22 @@ public class QueryModel implements Serializable {
           List<ColumnExpression> listOfCol =
               ((ConditionalExpression) filterExpression).getColumnList();
           for (ColumnExpression expression : listOfCol) {
-            setDimAndMsrColumnNode(dimensions, measures, expression, isFilterDimensions,
-                isFilterMeasures);
+            setDimAndMsrColumnNode(carbonTable, expression, isFilterDimensions, isFilterMeasures);
           }
         }
       }
       for (Expression expression : filterExpression.getChildren()) {
         if (expression instanceof ColumnExpression) {
-          setDimAndMsrColumnNode(dimensions, measures, (ColumnExpression) expression,
-              isFilterDimensions, isFilterMeasures);
+          setDimAndMsrColumnNode(carbonTable, (ColumnExpression) expression, isFilterDimensions,
+              isFilterMeasures);
         } else if (expression instanceof UnknownExpression) {
           UnknownExpression exp = ((UnknownExpression) expression);
           List<ColumnExpression> listOfColExpression = exp.getAllColumnList();
           for (ColumnExpression col : listOfColExpression) {
-            setDimAndMsrColumnNode(dimensions, measures, col, isFilterDimensions, isFilterMeasures);
+            setDimAndMsrColumnNode(carbonTable, col, isFilterDimensions, isFilterMeasures);
           }
         } else {
-          processFilterExpression(expression, dimensions, measures, isFilterDimensions,
-              isFilterMeasures);
+          processFilterExpression(carbonTable, expression, isFilterDimensions, isFilterMeasures);
         }
       }
     }
@@ -195,15 +190,16 @@ public class QueryModel implements Serializable {
     return null;
   }
 
-  private static void setDimAndMsrColumnNode(List<CarbonDimension> dimensions,
-      List<CarbonMeasure> measures, ColumnExpression col, boolean[] isFilterDimensions,
-      boolean[] isFilterMeasures) {
+  private static void setDimAndMsrColumnNode(CarbonTable carbonTable, ColumnExpression col,
+      boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
     CarbonDimension dim;
     CarbonMeasure msr;
     String columnName;
     columnName = col.getColumnName();
-    dim = CarbonUtil.findDimension(dimensions, columnName);
-    msr = getCarbonMetadataMeasure(columnName, measures);
+    dim = CarbonUtil
+        .findDimension(carbonTable.getDimensionByTableName(carbonTable.getTableName()), columnName);
+    msr = getCarbonMetadataMeasure(columnName,
+        carbonTable.getMeasureByTableName(carbonTable.getTableName()));
     col.setDimension(false);
     col.setMeasure(false);
 
@@ -215,13 +211,21 @@ public class QueryModel implements Serializable {
       if (null != isFilterDimensions) {
         isFilterDimensions[dim.getOrdinal()] = true;
       }
-    } else {
+    } else if (msr != null) {
       col.setCarbonColumn(msr);
       col.setMeasure(msr);
       col.setMeasure(true);
       if (null != isFilterMeasures) {
         isFilterMeasures[msr.getOrdinal()] = true;
       }
+    } else {
+      // check if this is an implicit dimension
+      dim = CarbonUtil
+          .findDimension(carbonTable.getImplicitDimensionByTableName(carbonTable.getTableName()),
+              columnName);
+      col.setCarbonColumn(dim);
+      col.setDimension(dim);
+      col.setDimension(true);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 281ee15..fe396cb 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -990,4 +990,12 @@ public final class CarbonProperties {
     return addedProperty;
   }
 
+  /**
+   * Adds all entries of the given external property set to the property set.
+   *
+   * @param externalPropertySet the entries to add
+   */
+  public void addPropertyToPropertySet(Set<String> externalPropertySet) {
+    propertySet.addAll(externalPropertySet);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 5dda9e4..0540ed6 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -80,12 +80,7 @@ public class SessionParams implements Serializable {
    * @return properties value
    */
   public SessionParams addProperty(String key, String value) throws InvalidConfigurationException {
-    boolean isValidConf = validateKeyValue(key, value);
-    if (isValidConf) {
-      LOGGER.audit("The key " + key + " with value " + value + " added in the session param");
-      sProps.put(key, value);
-    }
-    return this;
+    return addProperty(key, value, true);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 15d1304..c16b0aa 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -280,14 +280,6 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     configuration.set(CarbonTableInputFormat.VALIDATE_INPUT_SEGMENT_IDs, validate.toString());
   }
 
-  public static void setAggeragateTableSegments(Configuration configuration, String segments) {
-    configuration.set(CarbonCommonConstants.CARBON_INPUT_SEGMENTS, segments);
-  }
-
-  private static String getAggeragateTableSegments(Configuration configuration) {
-    return configuration.get(CarbonCommonConstants.CARBON_INPUT_SEGMENTS);
-  }
-
   /**
    * get list of segment to access
    */
@@ -330,14 +322,13 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     if (null == carbonTable) {
       throw new IOException("Missing/Corrupt schema file for table.");
     }
-    String aggregateTableSegments = getAggeragateTableSegments(job.getConfiguration());
     TableDataMap blockletMap =
         DataMapStoreManager.getInstance().getDataMap(identifier, BlockletDataMap.NAME,
             BlockletDataMapFactory.class.getName());
     List<String> invalidSegments = new ArrayList<>();
     List<UpdateVO> invalidTimestampsList = new ArrayList<>();
     List<String> streamSegments = null;
-    List<String> filteredSegmentToAccess = null;
+
     if (getValidateSegmentsToAccess(job.getConfiguration())) {
       // get all valid segments and set them into the configuration
       SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
@@ -349,7 +340,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
         return getSplitsOfStreaming(job, identifier, streamSegments);
       }
 
-      filteredSegmentToAccess = getFilteredSegment(job, validSegments);
+      List<String> filteredSegmentToAccess = getFilteredSegment(job, validSegments);
       if (filteredSegmentToAccess.size() == 0) {
         return new ArrayList<>(0);
       } else {
@@ -363,10 +354,10 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
       if (invalidSegments.size() > 0) {
         blockletMap.clear(invalidSegments);
       }
-    } else {
-      filteredSegmentToAccess = Arrays.asList(aggregateTableSegments.split(","));
     }
 
+    // get updated filtered list
+    List<String> filteredSegmentToAccess = Arrays.asList(getSegmentsToAccess(job));
     // Clean the updated segments from memory if the update happens on segments
     List<String> toBeCleanedSegments = new ArrayList<>();
     for (SegmentUpdateDetails segmentUpdateDetail : updateStatusManager

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index bf1b188..514428b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -121,12 +121,8 @@ public class CarbonInputFormatUtil {
 
   public static void processFilterExpression(Expression filterExpression, CarbonTable carbonTable,
       boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
-    List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getTableName());
-    List<CarbonMeasure> measures =
-        carbonTable.getMeasureByTableName(carbonTable.getTableName());
-    QueryModel.processFilterExpression(filterExpression, dimensions, measures,
-        isFilterDimensions, isFilterMeasures);
+    QueryModel.processFilterExpression(carbonTable, filterExpression, isFilterDimensions,
+        isFilterMeasures);
 
     if (null != filterExpression) {
       // Optimize Filter Expression and fit RANGE filters is conditions apply.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index 8e69855..799d8c4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -20,7 +20,7 @@ package org.apache.carbondata.events
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel, CarbonMergerMapping}
 
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index c28426d..6759b20 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -27,7 +27,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.processing.loading.DataLoadProcessBuilder
+import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, TableProcessingOperations}
 import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
 import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
@@ -206,7 +206,7 @@ object DataLoadProcessorStepOnSpark {
         dataWriter.close()
       }
       // clean up the folders and files created locally for data load operation
-      CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
+      TableProcessingOperations.deleteLocalDataLoadFolderLocation(model, false, false)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
index 37ab8c3..1ecab9f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -28,6 +28,7 @@ import org.apache.spark.util.PartitionUtils
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.partition.spliter.RowResultProcessor
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.AlterPartitionResult
@@ -119,7 +120,7 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
                     case e: Exception =>
                         sys.error(s"Exception when executing Row result processor ${e.getMessage}")
                 } finally {
-                    CarbonLoaderUtil
+                    TableProcessingOperations
                       .deleteLocalDataLoadFolderLocation(carbonLoadModel, false, true)
                 }
             }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 82b2a57..fb4634e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -46,6 +46,7 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUt
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger._
 import org.apache.carbondata.processing.splits.TableSplit
@@ -233,7 +234,7 @@ class CarbonMergerRDD[K, V](
         // delete temp location data
         try {
           val isCompactionFlow = true
-          CarbonLoaderUtil
+          TableProcessingOperations
             .deleteLocalDataLoadFolderLocation(carbonLoadModel, isCompactionFlow, false)
         } catch {
           case e: Exception =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index d599c22..cc7f757 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -357,22 +357,14 @@ class CarbonScanRDD(
         CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
       CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob)
     }
+
+    // pass the thread-local validate-segments setting (default: true) to CarbonTableInputFormat
     val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
     if (carbonSessionInfo != null) {
-      val segmentsToScan = carbonSessionInfo.getSessionParams.getProperty(
-        CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
-        identifier.getCarbonTableIdentifier.getDatabaseName + "." +
-        identifier.getCarbonTableIdentifier.getTableName)
-      if (segmentsToScan != null) {
-        CarbonTableInputFormat.setAggeragateTableSegments(conf, segmentsToScan)
-      }
-      val validateSegments = carbonSessionInfo.getSessionParams.getProperty(
-        CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
-        identifier.getCarbonTableIdentifier.getDatabaseName + "." +
-        identifier.getCarbonTableIdentifier.getTableName)
-      if (validateSegments != null) {
-        CarbonTableInputFormat.setValidateSegmentsToAccess(conf, validateSegments.toBoolean)
-      }
+      CarbonTableInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams
+          .getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+                       identifier.getCarbonTableIdentifier.getDatabaseName + "." +
+                       identifier.getCarbonTableIdentifier.getTableName, "true").toBoolean)
     }
     format
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index f948ac8..b27521a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -45,12 +45,12 @@ import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo}
-import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses}
+import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses, TableProcessingOperations}
 import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, CSVRecordReaderIterator}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.splits.TableSplit
-import org.apache.carbondata.processing.util.{CarbonLoaderUtil, CarbonQueryUtil}
+import org.apache.carbondata.processing.util.CarbonQueryUtil
 import org.apache.carbondata.spark.DataLoadResult
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
 
@@ -261,7 +261,7 @@ class NewCarbonDataLoadRDD[K, V](
           throw e
       } finally {
         // clean up the folders and files created locally for data load operation
-        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
+        TableProcessingOperations.deleteLocalDataLoadFolderLocation(model, false, false)
         // in case of failure the same operation will be re-tried several times.
         // So print the data load statistics only in case of non failure case
         if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {
@@ -403,7 +403,7 @@ class NewDataFrameLoaderRDD[K, V](
           throw e
       } finally {
         // clean up the folders and files created locally for data load operation
-        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
+        TableProcessingOperations.deleteLocalDataLoadFolderLocation(model, false, false)
         // in case of failure the same operation will be re-tried several times.
         // So print the data load statistics only in case of non failure case
         if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {
@@ -587,7 +587,7 @@ class PartitionTableDataLoaderRDD[K, V](
           throw e
       } finally {
         // clean up the folders and files created locally for data load operation
-        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
+        TableProcessingOperations.deleteLocalDataLoadFolderLocation(model, false, false)
         // in case of failure the same operation will be re-tried several times.
         // So print the data load statistics only in case of non failure case
         if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
index b1dfc01..4934cbc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
@@ -24,11 +24,9 @@ import org.apache.spark.sql.Row
 
 import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
+import org.apache.carbondata.processing.loading.{DataLoadExecutor, TableProcessingOperations}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.loading.DataLoadExecutor
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
 
 /**
  * Data load in case of update command .
@@ -65,7 +63,7 @@ object UpdateDataLoad {
         LOGGER.error(e)
         throw e
     } finally {
-      CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(carbonLoadModel, false, false)
+      TableProcessingOperations.deleteLocalDataLoadFolderLocation(carbonLoadModel, false, false)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 6fafc95..6da8bd6 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -24,9 +24,9 @@ import org.apache.spark.sql.execution.command.CompactionModel
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
 
 abstract class Compactor(carbonLoadModel: CarbonLoadModel,
     compactionModel: CompactionModel,
@@ -52,7 +52,8 @@ abstract class Compactor(carbonLoadModel: CarbonLoadModel,
     // status.
     // so deleting those folders.
     try {
-      CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
+      TableProcessingOperations
+        .deletePartialLoadDataIfExist(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable, true)
     } catch {
       case e: Exception =>
         LOGGER.error(s"Exception in compaction thread while clean up of stale segments" +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index eacfded..e0530f6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -57,7 +57,7 @@ case class CarbonCleanFilesCommand(
     }
     val cleanFilesPostEvent: CleanFilesPostEvent =
       CleanFilesPostEvent(carbonTable, sparkSession)
-    OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent)
+    OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent, operationContext)
     Seq.empty
   }
 
@@ -76,10 +76,6 @@ case class CarbonCleanFilesCommand(
   private def cleanGarbageData(sparkSession: SparkSession,
       databaseNameOp: Option[String], tableName: String): Unit = {
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
-    val cleanFilesPreEvent: CleanFilesPreEvent =
-      CleanFilesPreEvent(carbonTable,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent)
 
     CarbonStore.cleanFiles(
       CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
@@ -87,9 +83,6 @@ case class CarbonCleanFilesCommand(
       CarbonProperties.getStorePath,
       carbonTable,
       forceTableClean)
-
-    val cleanFilesPostEvent: CleanFilesPostEvent = CleanFilesPostEvent(carbonTable, sparkSession)
-    OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent)
   }
 
   private def cleanGarbageDataInAllTables(sparkSession: SparkSession): Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index 69aa91a..fb515fa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -35,13 +35,12 @@ import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.partition.DropPartitionCallable
 
 case class CarbonAlterTableDropPartitionCommand(
@@ -224,7 +223,9 @@ case class CarbonAlterTableDropPartitionCommand(
     } finally {
       executor.shutdown()
       try {
-        CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
+        TableProcessingOperations
+          .deletePartialLoadDataIfExist(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+            false)
       } catch {
         case e: Exception =>
           LOGGER.error(s"Exception in dropping partition thread while deleting partial load file" +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 338ec5a..1a535fd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -38,13 +38,12 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetad
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.partition.SplitPartitionCallable
 
 /**
@@ -233,7 +232,9 @@ case class CarbonAlterTableSplitPartitionCommand(
     } finally {
       executor.shutdown()
       try {
-        CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
+        TableProcessingOperations
+          .deletePartialLoadDataIfExist(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+            false)
       } catch {
         case e: Exception =>
           LOGGER.error(s"Exception in add/split partition thread while deleting partial load file" +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
new file mode 100644
index 0000000..cb53d6e
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+import org.apache.carbondata.processing.util.CarbonLoaderUtil;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.annotation.DeveloperApi;
+
+/**
+ * Static clean-up helpers for stale segment folders and local temporary data load folders.
+ */
+@DeveloperApi
+public class TableProcessingOperations {
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(TableProcessingOperations.class.getName());
+
+  /**
+   * Deletes segment folders that have no matching entry in the table's load metadata.
+   * @param carbonTable table whose partition directories are scanned
+   * @param isCompactionFlow true deletes only segment ids containing ".", false the others
+   * @throws IOException propagated from the file system calls made during the scan
+   */
+  public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
+      final boolean isCompactionFlow) throws IOException {
+    String metaDataLocation = carbonTable.getMetaDataFilepath();
+    final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(carbonTable.getTablePath(), carbonTable.getCarbonTableIdentifier());
+
+    // delete folders whose metadata does not exist in tablestatus
+    for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
+      final String partitionCount = i + "";
+      String partitionPath = carbonTablePath.getPartitionDir(partitionCount);
+      FileFactory.FileType fileType = FileFactory.getFileType(partitionPath);
+      if (FileFactory.isFileExist(partitionPath, fileType)) {
+        CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
+        CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+          @Override public boolean accept(CarbonFile path) {
+            String segmentId =
+                CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
+            boolean found = false;
+            for (int j = 0; j < details.length; j++) {
+              if (details[j].getLoadName().equals(segmentId) && details[j].getPartitionCount()
+                  .equals(partitionCount)) {
+                found = true;
+                break;
+              }
+            }
+            return !found;
+          }
+        });
+        for (int k = 0; k < listFiles.length; k++) {
+          String segmentId =
+              CarbonTablePath.DataPathUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
+          if (isCompactionFlow) {
+            if (segmentId.contains(".")) {
+              CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+            }
+          } else {
+            if (!segmentId.contains(".")) {
+              CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Deletes the local data load folder location after data load is complete.
+   * The property key is built from the load model and the two flow flags.
+   *
+   * @param loadModel supplies database name, table name, segment id and task no for the key
+   * @param isCompactionFlow COMPACTION keyword will be added to path to make path unique if true
+   * @param isAltPartitionFlow Alter_Partition keyword will be added to path to make path unique if
+   *                           true
+   */
+  public static void deleteLocalDataLoadFolderLocation(CarbonLoadModel loadModel,
+      boolean isCompactionFlow, boolean isAltPartitionFlow) {
+    String tableName = loadModel.getTableName();
+    String databaseName = loadModel.getDatabaseName();
+    String tempLocationKey = CarbonDataProcessorUtil
+        .getTempStoreLocationKey(databaseName, tableName, loadModel.getSegmentId(),
+            loadModel.getTaskNo(), isCompactionFlow, isAltPartitionFlow);
+    deleteLocalDataLoadFolderLocation(tempLocationKey, tableName);
+  }
+
+  /**
+   * Deletes the local data load folders in a background thread after data load is complete.
+   * Throws RuntimeException when no location is stored under the given key.
+   *
+   * @param tempLocationKey temporary location set in carbon properties
+   * @param tableName table name, used only to name the deletion thread pool
+   */
+  public static void deleteLocalDataLoadFolderLocation(String tempLocationKey, String tableName) {
+
+    // form local store location
+    final String localStoreLocations = CarbonProperties.getInstance().getProperty(tempLocationKey);
+    if (localStoreLocations == null) {
+      throw new RuntimeException("Store location not set for the key " + tempLocationKey);
+    }
+    // submit local folder clean up in another thread so that main thread execution is not blocked
+    ExecutorService localFolderDeletionService = Executors
+        .newFixedThreadPool(1, new CarbonThreadFactory("LocalFolderDeletionPool:" + tableName));
+    try {
+      localFolderDeletionService.submit(new Callable<Void>() {
+        @Override public Void call() throws Exception {
+          long startTime = System.currentTimeMillis();
+          String[] locArray = StringUtils.split(localStoreLocations, File.pathSeparator);
+          for (String loc : locArray) {
+            try {
+              CarbonUtil.deleteFoldersAndFiles(new File(loc));
+            } catch (IOException | InterruptedException e) {
+              LOGGER.error(e, "Failed to delete local data load folder location: " + loc);
+            }
+          }
+          LOGGER.info(
+              "Deleted the local store location: " + localStoreLocations + " : Time taken: " + (
+                  System.currentTimeMillis() - startTime));
+          return null;
+        }
+      });
+    } finally {
+      localFolderDeletionService.shutdown(); // already submitted clean-up still runs
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 37b585d..a8ae513 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -305,7 +305,7 @@ public class CarbonFactDataHandlerModel {
     }
     carbonFactDataHandlerModel.setMeasureDataType(measureDataTypes);
     String carbonDataDirectoryPath = CarbonDataProcessorUtil
-        .checkAndCreateCarbonStoreLocation(loadModel.getTablePath(), loadModel.getDatabaseName(),
+        .checkAndCreateCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(),
             tableName, loadModel.getPartitionId(), loadModel.getSegmentId());
     carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
     List<CarbonDimension> dimensionByTableName = carbonTable.getDimensionByTableName(tableName);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 4275603..9e6a73e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.processing.util;
 
 import java.io.BufferedWriter;
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.net.InetAddress;
@@ -26,9 +25,6 @@ import java.net.UnknownHostException;
 import java.nio.charset.Charset;
 import java.text.SimpleDateFormat;
 import java.util.*;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -56,8 +52,6 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -66,7 +60,6 @@ import org.apache.carbondata.processing.merger.NodeBlockRelation;
 import org.apache.carbondata.processing.merger.NodeMultiBlockRelation;
 
 import com.google.gson.Gson;
-import org.apache.commons.lang3.StringUtils;
 
 public final class CarbonLoaderUtil {
 
@@ -129,52 +122,6 @@ public final class CarbonLoaderUtil {
     }
     return true;
   }
-  public static void deletePartialLoadDataIfExist(CarbonLoadModel loadModel,
-      final boolean isCompactionFlow) throws IOException {
-    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
-    String metaDataLocation = carbonTable.getMetaDataFilepath();
-    final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier());
-
-    //delete folder which metadata no exist in tablestatus
-    for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
-      final String partitionCount = i + "";
-      String partitionPath = carbonTablePath.getPartitionDir(partitionCount);
-      FileType fileType = FileFactory.getFileType(partitionPath);
-      if (FileFactory.isFileExist(partitionPath, fileType)) {
-        CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
-        CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
-          @Override public boolean accept(CarbonFile path) {
-            String segmentId =
-                CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
-            boolean found = false;
-            for (int j = 0; j < details.length; j++) {
-              if (details[j].getLoadName().equals(segmentId) && details[j].getPartitionCount()
-                  .equals(partitionCount)) {
-                found = true;
-                break;
-              }
-            }
-            return !found;
-          }
-        });
-        for (int k = 0; k < listFiles.length; k++) {
-          String segmentId =
-              CarbonTablePath.DataPathUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
-          if (isCompactionFlow) {
-            if (segmentId.contains(".")) {
-              deleteStorePath(listFiles[k].getAbsolutePath());
-            }
-          } else {
-            if (!segmentId.contains(".")) {
-              deleteStorePath(listFiles[k].getAbsolutePath());
-            }
-          }
-        }
-      }
-    }
-  }
 
   public static void deleteStorePath(String path) {
     try {
@@ -188,53 +135,6 @@ public final class CarbonLoaderUtil {
     }
   }
 
-
-  /**
-   * This method will delete the local data load folder location after data load is complete
-   *
-   * @param loadModel
-   */
-  public static void deleteLocalDataLoadFolderLocation(CarbonLoadModel loadModel,
-      boolean isCompactionFlow, boolean isAltPartitionFlow) {
-    String databaseName = loadModel.getDatabaseName();
-    String tableName = loadModel.getTableName();
-    String tempLocationKey = CarbonDataProcessorUtil
-        .getTempStoreLocationKey(databaseName, tableName, loadModel.getSegmentId(),
-            loadModel.getTaskNo(), isCompactionFlow, isAltPartitionFlow);
-    // form local store location
-    final String localStoreLocations = CarbonProperties.getInstance().getProperty(tempLocationKey);
-    if (localStoreLocations == null) {
-      throw new RuntimeException("Store location not set for the key " + tempLocationKey);
-    }
-    // submit local folder clean up in another thread so that main thread execution is not blocked
-    ExecutorService localFolderDeletionService = Executors
-        .newFixedThreadPool(1, new CarbonThreadFactory("LocalFolderDeletionPool:" + tableName));
-    try {
-      localFolderDeletionService.submit(new Callable<Void>() {
-        @Override public Void call() throws Exception {
-          long startTime = System.currentTimeMillis();
-          String[] locArray = StringUtils.split(localStoreLocations, File.pathSeparator);
-          for (String loc : locArray) {
-            try {
-              CarbonUtil.deleteFoldersAndFiles(new File(loc));
-            } catch (IOException | InterruptedException e) {
-              LOGGER.error(e,
-                  "Failed to delete local data load folder location: " + loc);
-            }
-          }
-          LOGGER.info("Deleted the local store location: " + localStoreLocations
-                + " : Time taken: " + (System.currentTimeMillis() - startTime));
-          return null;
-        }
-      });
-    } finally {
-      if (null != localFolderDeletionService) {
-        localFolderDeletionService.shutdown();
-      }
-    }
-
-  }
-
   /**
    * This API will write the load level metadata for the loadmanagement module inorder to
    * manage the load and query execution management smoothly.