Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/02/26 12:19:07 UTC

[7/9] carbondata git commit: [CARBONDATA-2187][PARTITION] Partition restructure for new folder structure and supporting partition location feature

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
index 984efdb..f4450e3 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
@@ -29,6 +29,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
 import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
@@ -714,15 +715,15 @@ public class CarbonUtilTest {
   }
 
   @Test public void testToGetSegmentString() {
-    List<String> list = new ArrayList<>();
-    list.add("1");
-    list.add("2");
+    List<Segment> list = new ArrayList<>();
+    list.add(new Segment("1", null));
+    list.add(new Segment("2", null));
     String segments = CarbonUtil.convertToString(list);
     assertEquals(segments, "1,2");
   }
 
   @Test public void testToGetSegmentStringWithEmptySegmentList() {
-    List<String> list = new ArrayList<>();
+    List<Segment> list = new ArrayList<>();
     String segments = CarbonUtil.convertToString(list);
     assertEquals(segments, "");
   }
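
The hunk above moves CarbonUtil.convertToString from a list of plain segment-id strings to a list of Segment objects while keeping the same comma-separated output. A minimal sketch of that behaviour outside the test harness, using only the Segment constructor and CarbonUtil call visible in the hunk (the class name SegmentStringSketch is illustrative, not part of this commit):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.carbondata.core.datamap.Segment;
    import org.apache.carbondata.core.util.CarbonUtil;

    public class SegmentStringSketch {
      public static void main(String[] args) {
        // Segments are now identified by a Segment object (segment number plus an
        // optional segment file name) instead of a bare string id.
        List<Segment> segments = new ArrayList<>();
        segments.add(new Segment("1", null));
        segments.add(new Segment("2", null));
        // Serialises to the comma-separated form stored in job configurations.
        System.out.println(CarbonUtil.convertToString(segments)); // prints 1,2
      }
    }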

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 5ab6605..a4c6e4a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.block.BlockletInfos;
 import org.apache.carbondata.core.datastore.block.Distributable;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -260,8 +261,12 @@ public class CarbonInputSplit extends FileSplit
     return invalidSegments;
   }
 
-  public void setInvalidSegments(List<String> invalidSegments) {
-    this.invalidSegments = invalidSegments;
+  public void setInvalidSegments(List<Segment> invalidSegments) {
+    List<String> invalidSegmentIds = new ArrayList<>();
+    for (Segment segment: invalidSegments) {
+      invalidSegmentIds.add(segment.getSegmentNo());
+    }
+    this.invalidSegments = invalidSegmentIds;
   }
 
   public void setInvalidTimestampRange(List<UpdateVO> invalidTimestamps) {
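
CarbonInputSplit keeps storing invalid segments as plain ids; only the setter now accepts Segment objects and strips them down to their segment numbers. The same conversion in isolation, as a sketch that uses only the Segment API shown in this diff (the helper and class names are illustrative):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.carbondata.core.datamap.Segment;

    public class InvalidSegmentIdSketch {
      // Mirrors the loop inside setInvalidSegments above: Segment objects in,
      // bare segment numbers retained for the split.
      static List<String> toSegmentNumbers(List<Segment> invalidSegments) {
        List<String> ids = new ArrayList<>();
        for (Segment segment : invalidSegments) {
          ids.add(segment.getSegmentNo());
        }
        return ids;
      }

      public static void main(String[] args) {
        List<Segment> invalid = Segment.toSegmentList(new String[] {"3", "4.1"});
        System.out.println(toSegmentNumbers(invalid)); // prints [3, 4.1]
      }
    }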

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index ce97169..7ea11bd 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -18,11 +18,22 @@
 package org.apache.carbondata.hadoop.api;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.metadata.PartitionMapFileStore;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.locks.CarbonLockFactory;
+import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.locks.LockUsage;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
@@ -52,6 +63,8 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(CarbonOutputCommitter.class.getName());
 
+  private ICarbonLock segmentLock;
+
   public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
     super(outputPath, context);
   }
@@ -66,9 +79,16 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
     super.setupJob(context);
     boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration());
     CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
-    CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, overwriteSet);
-    CarbonLoaderUtil.checkAndCreateCarbonDataLocation(loadModel.getSegmentId(),
-        loadModel.getCarbonDataLoadSchema().getCarbonTable());
+    if (loadModel.getSegmentId() == null) {
+      CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, overwriteSet);
+    }
+    // Take segment lock
+    segmentLock = CarbonLockFactory.getCarbonLockObj(
+        loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(),
+        CarbonTablePath.addSegmentPrefix(loadModel.getSegmentId()) + LockUsage.LOCK);
+    if (!segmentLock.lockWithRetries()) {
+      throw new RuntimeException("Already segment is locked for loading, not supposed happen");
+    }
     CarbonTableOutputFormat.setLoadModel(context.getConfiguration(), loadModel);
   }
 
@@ -93,42 +113,59 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
     boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration());
     CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
     LoadMetadataDetails newMetaEntry = loadModel.getCurrentLoadMetadataDetail();
-    String segmentPath =
-        CarbonTablePath.getSegmentPath(loadModel.getTablePath(), loadModel.getSegmentId());
+    String readPath = CarbonTablePath.getSegmentFilesLocation(loadModel.getTablePath())
+        + CarbonCommonConstants.FILE_SEPARATOR
+        + loadModel.getSegmentId() + "_" + loadModel.getFactTimeStamp() + ".tmp";
     // Merge all partition files into a single file.
-    new PartitionMapFileStore().mergePartitionMapFiles(segmentPath,
-        loadModel.getFactTimeStamp() + "");
-    CarbonLoaderUtil.populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS,
-        loadModel.getFactTimeStamp(), true);
+    String segmentFileName = loadModel.getSegmentId() + "_" + loadModel.getFactTimeStamp();
+    SegmentFileStore.SegmentFile segmentFile = SegmentFileStore
+        .mergeSegmentFiles(readPath, segmentFileName,
+            CarbonTablePath.getSegmentFilesLocation(loadModel.getTablePath()));
+    if (segmentFile != null) {
+      // Move all files from temp directory of each segment to partition directory
+      SegmentFileStore.moveFromTempFolder(segmentFile,
+          loadModel.getSegmentId() + "_" + loadModel.getFactTimeStamp() + ".tmp",
+          loadModel.getTablePath());
+      newMetaEntry.setSegmentFile(segmentFileName + CarbonTablePath.SEGMENT_EXT);
+    }
+    CarbonLoaderUtil
+        .populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS, loadModel.getFactTimeStamp(),
+            true);
     CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
     long segmentSize = CarbonLoaderUtil
         .addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), carbonTable);
     if (segmentSize > 0 || overwriteSet) {
       Object operationContext = getOperationContext();
       if (operationContext != null) {
+        ((OperationContext) operationContext)
+            .setProperty("current.segmentfile", newMetaEntry.getSegmentFile());
         LoadEvents.LoadTablePreStatusUpdateEvent event =
             new LoadEvents.LoadTablePreStatusUpdateEvent(carbonTable.getCarbonTableIdentifier(),
                 loadModel);
-        LoadEvents.LoadTablePostStatusUpdateEvent postStatusUpdateEvent =
-            new LoadEvents.LoadTablePostStatusUpdateEvent(loadModel);
         try {
           OperationListenerBus.getInstance().fireEvent(event, (OperationContext) operationContext);
-          OperationListenerBus.getInstance().fireEvent(postStatusUpdateEvent,
-              (OperationContext) operationContext);
         } catch (Exception e) {
           throw new IOException(e);
         }
       }
       String uniqueId = null;
       if (overwriteSet) {
-        uniqueId = overwritePartitions(loadModel);
+        if (segmentSize == 0) {
+          newMetaEntry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
+        }
+        uniqueId = overwritePartitions(loadModel, newMetaEntry);
+      } else {
+        CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false);
       }
-      CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false);
       if (operationContext != null) {
+        LoadEvents.LoadTablePostStatusUpdateEvent postStatusUpdateEvent =
+            new LoadEvents.LoadTablePostStatusUpdateEvent(loadModel);
         LoadEvents.LoadTableMergePartitionEvent loadTableMergePartitionEvent =
-            new LoadEvents.LoadTableMergePartitionEvent(segmentPath);
+            new LoadEvents.LoadTableMergePartitionEvent(readPath);
         try {
           OperationListenerBus.getInstance()
+              .fireEvent(postStatusUpdateEvent, (OperationContext) operationContext);
+          OperationListenerBus.getInstance()
               .fireEvent(loadTableMergePartitionEvent, (OperationContext) operationContext);
         } catch (Exception e) {
           throw new IOException(e);
@@ -138,29 +175,23 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
           context.getConfiguration().get(CarbonTableOutputFormat.UPADTE_TIMESTAMP, null);
       String segmentsToBeDeleted =
           context.getConfiguration().get(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, "");
-      List<String> segmentDeleteList = Arrays.asList(segmentsToBeDeleted.split(","));
-      Set<String> segmentSet = new HashSet<>(
+      List<Segment> segmentDeleteList = Segment.toSegmentList(segmentsToBeDeleted.split(","));
+      Set<Segment> segmentSet = new HashSet<>(
           new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
               .getValidAndInvalidSegments().getValidSegments());
       if (updateTime != null) {
-        CarbonUpdateUtil.updateTableMetadataStatus(
-            segmentSet,
-            carbonTable,
-            updateTime,
-            true,
+        CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, updateTime, true,
             segmentDeleteList);
       } else if (uniqueId != null) {
-        // Update the loadstatus with update time to clear cache from driver.
-        CarbonUpdateUtil.updateTableMetadataStatus(
-            segmentSet,
-            carbonTable,
-            uniqueId,
-            true,
-            new ArrayList<String>());
+        CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, uniqueId, true,
+            segmentDeleteList);
       }
     } else {
       CarbonLoaderUtil.updateTableStatusForFailure(loadModel);
     }
+    if (segmentLock != null) {
+      segmentLock.unlock();
+    }
   }
 
   /**
@@ -171,41 +202,31 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
    * @return
    * @throws IOException
    */
-  private String overwritePartitions(CarbonLoadModel loadModel) throws IOException {
+  private String overwritePartitions(CarbonLoadModel loadModel, LoadMetadataDetails newMetaEntry)
+      throws IOException {
     CarbonTable table = loadModel.getCarbonDataLoadSchema().getCarbonTable();
-    String currentSegmentPath =
-        CarbonTablePath.getSegmentPath(loadModel.getTablePath(), loadModel.getSegmentId());
-    PartitionMapFileStore partitionMapFileStore = new PartitionMapFileStore();
-    partitionMapFileStore.readAllPartitionsOfSegment(currentSegmentPath);
-    List<List<String>> partitionsToDrop =
-        new ArrayList<List<String>>(partitionMapFileStore.getPartitionMap().values());
-    if (partitionsToDrop.size() > 0) {
-      List<String> validSegments =
+    SegmentFileStore fileStore = new SegmentFileStore(loadModel.getTablePath(),
+        loadModel.getSegmentId() + "_" + loadModel.getFactTimeStamp()
+            + CarbonTablePath.SEGMENT_EXT);
+    List<PartitionSpec> partitionSpecs = fileStore.getPartitionSpecs();
+
+    if (partitionSpecs != null && partitionSpecs.size() > 0) {
+      List<Segment> validSegments =
           new SegmentStatusManager(table.getAbsoluteTableIdentifier()).getValidAndInvalidSegments()
               .getValidSegments();
       String uniqueId = String.valueOf(System.currentTimeMillis());
-      try {
-        // First drop the partitions from partition mapper files of each segment
-        for (String segment : validSegments) {
-          new PartitionMapFileStore()
-              .dropPartitions(CarbonTablePath.getSegmentPath(table.getTablePath(), segment),
-                  new ArrayList<List<String>>(partitionsToDrop), uniqueId, false);
+      List<String> tobeUpdatedSegs = new ArrayList<>();
+      List<String> tobeDeletedSegs = new ArrayList<>();
+      // First drop the partitions from partition mapper files of each segment
+      for (Segment segment : validSegments) {
+        new SegmentFileStore(table.getTablePath(), segment.getSegmentFileName())
+            .dropPartitions(segment, partitionSpecs, uniqueId, tobeDeletedSegs, tobeUpdatedSegs);
 
-        }
-      } catch (Exception e) {
-        // roll back the drop partitions from carbon store
-        for (String segment : validSegments) {
-          new PartitionMapFileStore()
-              .commitPartitions(CarbonTablePath.getSegmentPath(table.getTablePath(), segment),
-                  uniqueId, false, table.getTablePath(), partitionsToDrop.get(0));
-        }
       }
+      newMetaEntry.setUpdateStatusFileName(uniqueId);
       // Commit the removed partitions in carbon store.
-      for (String segment : validSegments) {
-        new PartitionMapFileStore()
-            .commitPartitions(CarbonTablePath.getSegmentPath(table.getTablePath(), segment),
-                uniqueId, true, table.getTablePath(), partitionsToDrop.get(0));
-      }
+      CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false, "",
+          Segment.toSegmentList(tobeDeletedSegs), Segment.toSegmentList(tobeUpdatedSegs));
       return uniqueId;
     }
     return null;
@@ -221,17 +242,57 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
   }
 
   /**
-   * Update the tablestatus as fail if any fail happens.
+   * Update the tablestatus as fail if any fail happens.And also clean up the temp folders if any
+   * are existed.
    *
    * @param context
    * @param state
    * @throws IOException
    */
   @Override public void abortJob(JobContext context, JobStatus.State state) throws IOException {
-    super.abortJob(context, state);
-    CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
-    CarbonLoaderUtil.updateTableStatusForFailure(loadModel);
-    LOGGER.error("Loading failed with job status : " + state);
+    try {
+      super.abortJob(context, state);
+      CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
+      CarbonLoaderUtil.updateTableStatusForFailure(loadModel);
+      String segmentFileName = loadModel.getSegmentId() + "_" + loadModel.getFactTimeStamp();
+      LoadMetadataDetails metadataDetail = loadModel.getCurrentLoadMetadataDetail();
+      if (metadataDetail != null) {
+        // In case the segment file is already created for this job then just link it so that it
+        // will be used while cleaning.
+        if (!metadataDetail.getSegmentStatus().equals(SegmentStatus.SUCCESS)) {
+          String readPath = CarbonTablePath.getSegmentFilesLocation(loadModel.getTablePath())
+              + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName
+              + CarbonTablePath.SEGMENT_EXT;
+          if (FileFactory.getCarbonFile(readPath).exists()) {
+            metadataDetail.setSegmentFile(segmentFileName + CarbonTablePath.SEGMENT_EXT);
+          }
+        }
+      }
+      // Clean the temp files
+      CarbonFile segTmpFolder = FileFactory.getCarbonFile(
+          CarbonTablePath.getSegmentFilesLocation(loadModel.getTablePath())
+              + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName + ".tmp");
+      // delete temp segment folder
+      if (segTmpFolder.exists()) {
+        FileFactory.deleteAllCarbonFilesOfDir(segTmpFolder);
+      }
+      CarbonFile segmentFilePath = FileFactory.getCarbonFile(
+          CarbonTablePath.getSegmentFilesLocation(loadModel.getTablePath())
+              + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName
+              + CarbonTablePath.SEGMENT_EXT);
+      // Delete the temp data folders of this job if exists
+      if (segmentFilePath.exists()) {
+        SegmentFileStore fileStore = new SegmentFileStore(loadModel.getTablePath(),
+            segmentFileName + CarbonTablePath.SEGMENT_EXT);
+        SegmentFileStore.removeTempFolder(fileStore.getLocationMap(), segmentFileName + ".tmp",
+            loadModel.getTablePath());
+      }
+      LOGGER.error("Loading failed with job status : " + state);
+    } finally {
+      if (segmentLock != null) {
+        segmentLock.unlock();
+      }
+    }
   }
 
 }
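
The committer changes above introduce a per-segment lock lifecycle: the lock is acquired with retries in setupJob, and released at the end of commitJob and in the finally block of abortJob, so a failed or aborted load does not leave the segment locked. A sketch of that acquire/work/release pattern against the ICarbonLock interface used in the diff; how the lock object is obtained from CarbonLockFactory is deliberately left out, and the class and method names here are illustrative:

    import java.io.IOException;

    import org.apache.carbondata.core.locks.ICarbonLock;

    public class SegmentLockLifecycleSketch {
      // Acquire the per-segment lock before writing, release it unconditionally.
      static void runGuarded(ICarbonLock segmentLock, Runnable loadWork) throws IOException {
        if (!segmentLock.lockWithRetries()) {
          throw new IOException("Segment is already locked for loading by another job");
        }
        try {
          loadWork.run(); // write the data files and segment file for this segment
        } finally {
          segmentLock.unlock(); // mirrors the unlock in commitJob and abortJob above
        }
      }
    }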

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index a1887f0..96b0b21 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -30,15 +30,16 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -256,7 +257,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   /**
    * Set list of segments to access
    */
-  public static void setSegmentsToAccess(Configuration configuration, List<String> validSegments) {
+  public static void setSegmentsToAccess(Configuration configuration, List<Segment> validSegments) {
     configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.convertToString(validSegments));
   }
 
@@ -270,7 +271,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
         .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." + tbName, "*");
     if (!segmentNumbersFromProperty.trim().equals("*")) {
       CarbonTableInputFormat
-          .setSegmentsToAccess(conf, Arrays.asList(segmentNumbersFromProperty.split(",")));
+          .setSegmentsToAccess(conf, Segment.toSegmentList(segmentNumbersFromProperty.split(",")));
     }
   }
 
@@ -292,25 +293,28 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   /**
    * set list of partitions to prune
    */
-  public static void setPartitionsToPrune(Configuration configuration, List<String> partitions) {
+  public static void setPartitionsToPrune(Configuration configuration,
+      List<PartitionSpec> partitions) {
     if (partitions == null) {
       return;
     }
     try {
-      String partitionString = ObjectSerializationUtil.convertObjectToString(partitions);
+      String partitionString =
+          ObjectSerializationUtil.convertObjectToString(new ArrayList<>(partitions));
       configuration.set(PARTITIONS_TO_PRUNE, partitionString);
     } catch (Exception e) {
-      throw new RuntimeException("Error while setting patition information to Job", e);
+      throw new RuntimeException("Error while setting patition information to Job" + partitions, e);
     }
   }
 
   /**
    * get list of partitions to prune
    */
-  public static List<String> getPartitionsToPrune(Configuration configuration) throws IOException {
+  public static List<PartitionSpec> getPartitionsToPrune(Configuration configuration)
+      throws IOException {
     String partitionString = configuration.get(PARTITIONS_TO_PRUNE);
     if (partitionString != null) {
-      return (List<String>) ObjectSerializationUtil.convertStringToObject(partitionString);
+      return (List<PartitionSpec>) ObjectSerializationUtil.convertStringToObject(partitionString);
     }
     return null;
   }
@@ -345,22 +349,24 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     TableDataMap blockletMap =
         DataMapStoreManager.getInstance().getDataMap(identifier, BlockletDataMap.NAME,
             BlockletDataMapFactory.class.getName());
-    List<String> invalidSegments = new ArrayList<>();
+    List<Segment> invalidSegments = new ArrayList<>();
     List<UpdateVO> invalidTimestampsList = new ArrayList<>();
-    List<String> streamSegments = null;
+    List<Segment> streamSegments = null;
+    // get all valid segments and set them into the configuration
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
+    SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
+        segmentStatusManager.getValidAndInvalidSegments();
 
     if (getValidateSegmentsToAccess(job.getConfiguration())) {
-      // get all valid segments and set them into the configuration
-      SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
-      SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
-          segmentStatusManager.getValidAndInvalidSegments();
-      List<String> validSegments = segments.getValidSegments();
+      List<Segment> validSegments = segments.getValidSegments();
       streamSegments = segments.getStreamSegments();
       if (validSegments.size() == 0) {
         return getSplitsOfStreaming(job, identifier, streamSegments);
       }
 
-      List<String> filteredSegmentToAccess = getFilteredSegment(job, validSegments);
+
+
+      List<Segment> filteredSegmentToAccess = getFilteredSegment(job, segments.getValidSegments());
       if (filteredSegmentToAccess.size() == 0) {
         return new ArrayList<>(0);
       } else {
@@ -368,31 +374,36 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
       }
       // remove entry in the segment index if there are invalid segments
       invalidSegments.addAll(segments.getInvalidSegments());
-      for (String invalidSegmentId : invalidSegments) {
-        invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
+      for (Segment invalidSegmentId : invalidSegments) {
+        invalidTimestampsList
+            .add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId.getSegmentNo()));
       }
       if (invalidSegments.size() > 0) {
         blockletMap.clear(invalidSegments);
       }
     }
-
+    ArrayList<Segment> validAndInProgressSegments = new ArrayList<>(segments.getValidSegments());
+    // Add in progress segments also to filter it as in case of aggregate table load it loads
+    // data from in progress table.
+    validAndInProgressSegments.addAll(segments.getListOfInProgressSegments());
     // get updated filtered list
-    List<String> filteredSegmentToAccess = Arrays.asList(getSegmentsToAccess(job));
+    List<Segment> filteredSegmentToAccess =
+        getFilteredSegment(job, new ArrayList<>(validAndInProgressSegments));
     // Clean the updated segments from memory if the update happens on segments
-    List<String> toBeCleanedSegments = new ArrayList<>();
+    List<Segment> toBeCleanedSegments = new ArrayList<>();
     for (SegmentUpdateDetails segmentUpdateDetail : updateStatusManager
         .getUpdateStatusDetails()) {
       boolean refreshNeeded =
           DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
               .isRefreshNeeded(segmentUpdateDetail.getSegmentName(), updateStatusManager);
       if (refreshNeeded) {
-        toBeCleanedSegments.add(segmentUpdateDetail.getSegmentName());
+        toBeCleanedSegments.add(new Segment(segmentUpdateDetail.getSegmentName(), null));
       }
     }
     // Clean segments if refresh is needed
-    for (String segment : filteredSegmentToAccess) {
+    for (Segment segment : filteredSegmentToAccess) {
       if (DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
-          .isRefreshNeeded(segment)) {
+          .isRefreshNeeded(segment.getSegmentNo())) {
         toBeCleanedSegments.add(segment);
       }
     }
@@ -446,20 +457,28 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
    * Return segment list after filtering out valid segments and segments set by user by
    * `INPUT_SEGMENT_NUMBERS` in job configuration
    */
-  private List<String> getFilteredSegment(JobContext job, List<String> validSegments) {
-    String[] segmentsToAccess = getSegmentsToAccess(job);
-    Set<String> segmentToAccessSet = new HashSet<>(Arrays.asList(segmentsToAccess));
-    List<String> filteredSegmentToAccess = new ArrayList<>();
-    if (segmentsToAccess.length == 0 || segmentsToAccess[0].equalsIgnoreCase("*")) {
+  private List<Segment> getFilteredSegment(JobContext job, List<Segment> validSegments) {
+    Segment[] segmentsToAccess = getSegmentsToAccess(job);
+    List<Segment> segmentToAccessSet =
+        new ArrayList<>(new HashSet<>(Arrays.asList(segmentsToAccess)));
+    List<Segment> filteredSegmentToAccess = new ArrayList<>();
+    if (segmentsToAccess.length == 0 || segmentsToAccess[0].getSegmentNo().equalsIgnoreCase("*")) {
       filteredSegmentToAccess.addAll(validSegments);
     } else {
-      for (String validSegment : validSegments) {
-        if (segmentToAccessSet.contains(validSegment)) {
-          filteredSegmentToAccess.add(validSegment);
+      for (Segment validSegment : validSegments) {
+        int index = segmentToAccessSet.indexOf(validSegment);
+        if (index > -1) {
+          // In case of in progress reading segment, segment file name is set to the property itself
+          if (segmentToAccessSet.get(index).getSegmentFileName() != null
+              && validSegment.getSegmentFileName() == null) {
+            filteredSegmentToAccess.add(segmentToAccessSet.get(index));
+          } else {
+            filteredSegmentToAccess.add(validSegment);
+          }
         }
       }
       if (!filteredSegmentToAccess.containsAll(segmentToAccessSet)) {
-        List<String> filteredSegmentToAccessTemp = new ArrayList<>(filteredSegmentToAccess);
+        List<Segment> filteredSegmentToAccessTemp = new ArrayList<>(filteredSegmentToAccess);
         filteredSegmentToAccessTemp.removeAll(segmentToAccessSet);
         LOG.info(
             "Segments ignored are : " + Arrays.toString(filteredSegmentToAccessTemp.toArray()));
@@ -472,15 +491,15 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
    * use file list in .carbonindex file to get the split of streaming.
    */
   public List<InputSplit> getSplitsOfStreaming(JobContext job, AbsoluteTableIdentifier identifier,
-      List<String> streamSegments) throws IOException {
+      List<Segment> streamSegments) throws IOException {
     List<InputSplit> splits = new ArrayList<InputSplit>();
     if (streamSegments != null && !streamSegments.isEmpty()) {
 
       CarbonTablePath tablePath = CarbonStorePath.getCarbonTablePath(identifier);
       long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
       long maxSize = getMaxSplitSize(job);
-      for (String segmentId : streamSegments) {
-        String segmentDir = tablePath.getSegmentDir("0", segmentId);
+      for (Segment segment : streamSegments) {
+        String segmentDir = tablePath.getSegmentDir("0", segment.getSegmentNo());
         FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
         if (FileFactory.isFileExist(segmentDir, fileType)) {
           String indexName = CarbonTablePath.getCarbonStreamIndexFileName();
@@ -508,20 +527,20 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
                   long bytesRemaining = length;
                   while (((double) bytesRemaining) / splitSize > 1.1) {
                     int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
-                    splits.add(makeSplit(segmentId, path, length - bytesRemaining, splitSize,
-                        blkLocations[blkIndex].getHosts(),
+                    splits.add(makeSplit(segment.getSegmentNo(), path, length - bytesRemaining,
+                        splitSize, blkLocations[blkIndex].getHosts(),
                         blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
                     bytesRemaining -= splitSize;
                   }
                   if (bytesRemaining != 0) {
                     int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
-                    splits.add(makeSplit(segmentId, path, length - bytesRemaining, bytesRemaining,
-                        blkLocations[blkIndex].getHosts(),
+                    splits.add(makeSplit(segment.getSegmentNo(), path, length - bytesRemaining,
+                        bytesRemaining, blkLocations[blkIndex].getHosts(),
                         blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
                   }
                 } else {
                   //Create empty hosts array for zero length files
-                  splits.add(makeSplit(segmentId, path, 0, length, new String[0],
+                  splits.add(makeSplit(segment.getSegmentNo(), path, 0, length, new String[0],
                       FileFormat.ROW_V1));
                 }
               }
@@ -558,11 +577,11 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
       List<Integer> oldPartitionIdList, PartitionInfo partitionInfo)
       throws IOException {
     AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
-    List<String> invalidSegments = new ArrayList<>();
+    List<Segment> invalidSegments = new ArrayList<>();
     List<UpdateVO> invalidTimestampsList = new ArrayList<>();
 
-    List<String> segmentList = new ArrayList<>();
-    segmentList.add(targetSegment);
+    List<Segment> segmentList = new ArrayList<>();
+    segmentList.add(new Segment(targetSegment, null));
     setSegmentsToAccess(job.getConfiguration(), segmentList);
     try {
 
@@ -647,7 +666,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
    * @throws IOException
    */
   private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver,
-      List<String> validSegments, BitSet matchedPartitions, PartitionInfo partitionInfo,
+      List<Segment> validSegments, BitSet matchedPartitions, PartitionInfo partitionInfo,
       List<Integer> oldPartitionIdList) throws IOException {
 
     List<InputSplit> result = new LinkedList<InputSplit>();
@@ -683,8 +702,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
         }
         // When iud is done then only get delete delta files for a block
         try {
-          deleteDeltaFilePath =
-              updateStatusManager.getDeleteDeltaFilePath(inputSplit.getPath().toString());
+          deleteDeltaFilePath = updateStatusManager
+              .getDeleteDeltaFilePath(inputSplit.getPath().toString(), inputSplit.getSegmentId());
         } catch (Exception e) {
           throw new IOException(e);
         }
@@ -713,7 +732,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
    */
   private List<org.apache.carbondata.hadoop.CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
       AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
-      BitSet matchedPartitions, List<String> segmentIds, PartitionInfo partitionInfo,
+      BitSet matchedPartitions, List<Segment> segmentIds, PartitionInfo partitionInfo,
       List<Integer> oldPartitionIdList) throws IOException {
 
     QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
@@ -727,7 +746,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
         .getDataMap(absoluteTableIdentifier, BlockletDataMap.NAME,
             BlockletDataMapFactory.class.getName());
     DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
-    List<String> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
+    List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
     List<ExtendedBlocklet> prunedBlocklets;
     if (dataMapJob != null) {
       DistributableDataMapFormat datamapDstr =
@@ -891,12 +910,13 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   /**
    * return valid segment to access
    */
-  public String[] getSegmentsToAccess(JobContext job) {
+  private Segment[] getSegmentsToAccess(JobContext job) {
     String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
     if (segmentString.trim().isEmpty()) {
-      return new String[0];
+      return new Segment[0];
     }
-    return segmentString.split(",");
+    List<Segment> segments = Segment.toSegmentList(segmentString.split(","));
+    return segments.toArray(new Segment[segments.size()]);
   }
 
   /**
@@ -907,7 +927,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
    * @throws IOException
    */
   public BlockMappingVO getBlockRowCount(Job job, AbsoluteTableIdentifier identifier,
-      List<String> partitions) throws IOException {
+      List<PartitionSpec> partitions) throws IOException {
     TableDataMap blockletMap = DataMapStoreManager.getInstance()
         .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class.getName());
     SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
@@ -917,7 +937,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     Map<String, Long> segmentAndBlockCountMapping = new HashMap<>();
 
     // TODO: currently only batch segment is supported, add support for streaming table
-    List<String> filteredSegment = getFilteredSegment(job, allSegments.getValidSegments());
+    List<Segment> filteredSegment = getFilteredSegment(job, allSegments.getValidSegments());
 
     List<ExtendedBlocklet> blocklets = blockletMap.prune(filteredSegment, null, partitions);
     for (ExtendedBlocklet blocklet : blocklets) {
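
With these changes the input format passes segment and partition information around as typed objects: setSegmentsToAccess stores the segments as a comma-separated string in the job configuration, getSegmentsToAccess parses it back via Segment.toSegmentList, and partitions to prune travel as a serialized PartitionSpec list. A sketch of the segment round trip, assuming a plain Hadoop Configuration and only the static methods visible in this hunk (the class name is illustrative):

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.carbondata.core.datamap.Segment;
    import org.apache.carbondata.core.util.CarbonUtil;
    import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;

    public class SegmentsToAccessSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Restrict the read to segments 0, 1 and 2; stored under INPUT_SEGMENT_NUMBERS
        // as "0,1,2" and parsed back internally with Segment.toSegmentList.
        List<Segment> toRead = Segment.toSegmentList(new String[] {"0", "1", "2"});
        CarbonTableInputFormat.setSegmentsToAccess(conf, toRead);
        System.out.println(CarbonUtil.convertToString(toRead)); // prints 0,1,2
      }
    }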

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 440720e..f93f849 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -234,6 +234,8 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
     loadModel.setTaskNo(taskAttemptContext.getConfiguration().get(
         "carbon.outputformat.taskno",
         String.valueOf(System.nanoTime())));
+    loadModel.setDataWritePath(
+        taskAttemptContext.getConfiguration().get("carbon.outputformat.writepath"));
     final String[] tempStoreLocations = getTempStoreLocations(taskAttemptContext);
     final CarbonOutputIteratorWrapper iteratorWrapper = new CarbonOutputIteratorWrapper();
     final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor();
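
The output format now also picks up an optional write path from the task configuration, which is how the partition location feature named in the commit subject routes data for a partition to an external directory. A sketch of a job setting that property, using the key exactly as it appears in the hunk and a hypothetical external location:

    import org.apache.hadoop.conf.Configuration;

    public class PartitionWritePathSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Hypothetical external partition location; any reachable path would do.
        conf.set("carbon.outputformat.writepath", "hdfs://namenode/warehouse/custom/country=us");
      }
    }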

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
index 96eec6f..7c1808f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
@@ -24,8 +24,10 @@ import java.util.List;
 
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
@@ -49,14 +51,14 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
 
   private String dataMapName;
 
-  private List<String> validSegments;
+  private List<Segment> validSegments;
 
   private String className;
 
-  private List<String> partitions;
+  private List<PartitionSpec> partitions;
 
-  public DistributableDataMapFormat(AbsoluteTableIdentifier identifier,
-      String dataMapName, List<String> validSegments, List<String> partitions, String className) {
+  public DistributableDataMapFormat(AbsoluteTableIdentifier identifier, String dataMapName,
+      List<Segment> validSegments, List<PartitionSpec> partitions, String className) {
     this.identifier = identifier;
     this.dataMapName = dataMapName;
     this.validSegments = validSegments;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
index 86f974c..31a08fc 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -87,12 +87,12 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
       absoluteTableIdentifier)
 
     // merged segment should not be there
-    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.map(_.getSegmentNo).toList
     assert(segments.contains("0.1"))
     assert(segments.contains("2.1"))
     assert(!segments.contains("2"))
     assert(!segments.contains("3"))
-    val cacheClient = new CacheClient();
+    val cacheClient = new CacheClient()
     val segmentIdentifier = new TableSegmentUniqueIdentifier(absoluteTableIdentifier, "2")
     val wrapper: SegmentTaskIndexWrapper = cacheClient.getSegmentAccessClient.
       getIfPresent(segmentIdentifier)
@@ -181,7 +181,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
       absoluteTableIdentifier)
 
     // merged segment should not be there
-    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.map(_.getSegmentNo).toList
     assert(!segments.contains("0.1"))
     assert(segments.contains("0.2"))
     assert(!segments.contains("2"))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
index bb1f829..329161f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
@@ -123,7 +123,7 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
       absoluteTableIdentifier)
 
     // merged segment should not be there
-    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.map(_.getSegmentNo).toList
     assert(segments.contains("0.1"))
     assert(!segments.contains("0.2"))
     assert(!segments.contains("0"))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 74216ac..c3f5d0a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.indexstore.schema.FilterType
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -42,15 +42,15 @@ class C2DataMapFactory() extends DataMapFactory {
 
   override def fireEvent(event: Event): Unit = ???
 
-  override def clear(segmentId: String): Unit = {}
+  override def clear(segmentId: Segment): Unit = {}
 
   override def clear(): Unit = {}
 
   override def getDataMaps(distributable: DataMapDistributable): java.util.List[DataMap] = ???
 
-  override def getDataMaps(segmentId: String): util.List[DataMap] = ???
+  override def getDataMaps(segmentId: Segment): util.List[DataMap] = ???
 
-  override def createWriter(segmentId: String): DataMapWriter = DataMapWriterSuite.dataMapWriterC2Mock
+  override def createWriter(segmentId: Segment): DataMapWriter = DataMapWriterSuite.dataMapWriterC2Mock
 
   override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, FilterType.EQUALTO)
 
@@ -59,7 +59,7 @@ class C2DataMapFactory() extends DataMapFactory {
    *
    * @return
    */
-  override def toDistributable(segmentId: String): util.List[DataMapDistributable] = {
+  override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = {
     ???
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index 7067ef8..f2cdd67 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -29,7 +29,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.indexstore.schema.FilterType
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -270,15 +270,15 @@ class WaitingDataMap() extends DataMapFactory {
 
   override def fireEvent(event: Event): Unit = ???
 
-  override def clear(segmentId: String): Unit = {}
+  override def clear(segmentId: Segment): Unit = {}
 
   override def clear(): Unit = {}
 
   override def getDataMaps(distributable: DataMapDistributable): java.util.List[DataMap] = ???
 
-  override def getDataMaps(segmentId: String): util.List[DataMap] = ???
+  override def getDataMaps(segmentId: Segment): util.List[DataMap] = ???
 
-  override def createWriter(segmentId: String): DataMapWriter = {
+  override def createWriter(segmentId: Segment): DataMapWriter = {
     new DataMapWriter {
       override def onPageAdded(blockletId: Int, pageId: Int, pages: Array[ColumnPage]): Unit = { }
 
@@ -300,5 +300,5 @@ class WaitingDataMap() extends DataMapFactory {
 
   override def getMeta: DataMapMeta = new DataMapMeta(List("o_country").asJava, FilterType.EQUALTO)
 
-  override def toDistributable(segmentId: String): util.List[DataMapDistributable] = ???
+  override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = ???
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
index 2e2c1f0..7b93766 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
@@ -204,23 +204,6 @@ class StandardPartitionBadRecordLoggerTest extends QueryTest with BeforeAndAfter
     )
   }
 
-  test("test load ddl command") {
-    sql(
-      """CREATE TABLE IF NOT EXISTS dataloadOptionTests(ID BigInt, date Timestamp, country
-           String,
-          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
-      """)
-    val csvFilePath = s"$resourcesPath/badrecords/emptyTimeStampValue.csv"
-    try {
-      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE dataloadOptionTests OPTIONS"
-          + "('bad_records_action'='FORCA', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
-    } catch {
-      case ex: Exception =>
-        assert("option BAD_RECORDS_ACTION can have only either FORCE or IGNORE or REDIRECT"
-          .equals(ex.getMessage))
-    }
-  }
-
   def drop(): Unit = {
     sql("drop table IF EXISTS sales")
     sql("drop table IF EXISTS serializable_values")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
index 0dbf1e4..7d0959c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
@@ -21,14 +21,12 @@ import java.util.concurrent.{Callable, ExecutorService, Executors}
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.{BeforeAndAfterAll, Ignore}
+import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterAll {
   var executorService: ExecutorService = _
@@ -48,20 +46,6 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
   }
 
-  def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Int): Unit = {
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
-    val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
-      carbonTable.getTablePath)
-    val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
-    val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
-    val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
-      override def accept(file: CarbonFile): Boolean = {
-        return file.getName.endsWith(".partitionmap")
-      }
-    })
-    assert(dataFiles.length == partitions)
-  }
-
   test("data loading for global sort partition table for one partition column") {
     sql(
       """
@@ -74,8 +58,6 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
       """.stripMargin)
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'GLOBAL_SORT_PARTITIONS'='1')""")
 
-    validateDataFiles("default_partitionone", "0", 1)
-
     checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionone order by empno"),
       sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
 
@@ -93,8 +75,6 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
       """.stripMargin)
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiontwo OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
 
-    validateDataFiles("default_partitiontwo", "0", 1)
-
     checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitiontwo order by empno"),
       sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
 
@@ -112,7 +92,6 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
       """.stripMargin)
     sql(s"""insert into staticpartitionone PARTITION(empno='1') select empname,designation,doj,workgroupcategory,workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate,attendance,utilization,salary from originTable""")
 
-    validateDataFiles("default_staticpartitionone", "0", 1)
   }
 
   test("single pass loading for global sort partition table for one partition column") {
@@ -127,7 +106,6 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
       """.stripMargin)
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE singlepasspartitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'SINGLE_PASS'='true')""")
 
-    validateDataFiles("default_singlepasspartitionone", "0", 1)
   }
 
   test("data loading for global sort partition table for one static partition column with load syntax") {
@@ -193,23 +171,27 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
         | CREATE TABLE partitionmultiplethreeconcurrent (empno int, doj Timestamp,
         |  workgroupcategoryname String, deptno int, deptname String,
         |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
-        |  utilization int,salary int)
-        | PARTITIONED BY (workgroupcategory int, empname String, designation String)
+        |  utilization int,salary int,workgroupcategory int,designation String)
+        | PARTITIONED BY (empname String)
         | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('DICTIONARY_INCLUDE'='empname,designation,deptname', 'SORT_SCOPE'='GLOBAL_SORT')
+        | TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
       """.stripMargin)
 
     val tasks = new util.ArrayList[Callable[String]]()
-    tasks.add(new QueryTask(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmultiplethreeconcurrent OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')"""))
-    tasks.add(new QueryTask(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmultiplethreeconcurrent OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')"""))
-    tasks.add(new QueryTask(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmultiplethreeconcurrent OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')"""))
+    var i = 0
+    val count = 5
+    while (i < count) {
+      tasks.add(new QueryTask(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE  partitionmultiplethreeconcurrent partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')"""))
+      i = i + 1
+    }
     val results = executorService.invokeAll(tasks)
     for (i <- 0 until tasks.size()) {
       val res = results.get(i).get
       assert("PASS".equals(res))
     }
     executorService.shutdown()
-    checkAnswer(sql("select count(*) from partitionmultiplethreeconcurrent"), Seq(Row(30)))
+    checkAnswer(sql("select count(*) from partitionmultiplethreeconcurrent"), Seq(Row(10 * count)))
+    assert(sql("show segments for table partitionmultiplethreeconcurrent").count() == count)
   }
 
   class QueryTask(query: String) extends Callable[String] {
@@ -306,10 +288,9 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
     sql(s"""insert into staticpartitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""")
     sql(s"""insert into staticpartitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""")
     sql(s"""insert into staticpartitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""")
-    sql(s"""insert overwrite table staticpartitiondateinsert PARTITION(projectenddate='2016-06-29',doj='2010-12-29 00:00:00') select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary from originTable where projectenddate=cast('2016-06-29' as Date)""")
-    //    sql(s"""insert overwrite table partitiondateinsert  select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""")
+    sql(s"""insert overwrite table staticpartitiondateinsert PARTITION(projectenddate='2016-06-29',doj='2010-12-29 00:00:00') select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary from originTable where projectenddate=cast('2016-06-29' as timestamp)""")
     checkAnswer(sql("select * from staticpartitiondateinsert where projectenddate=cast('2016-06-29' as Date)"),
-      sql("select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable where projectenddate=cast('2016-06-29' as Date)"))
+      sql("select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,cast(projectenddate as date),doj from originTable where projectenddate=cast('2016-06-29' as timestamp)"))
   }
 
 
@@ -436,10 +417,10 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
   test("test overwrite with timestamp partition column") {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
-    sql("DROP TABLE IF EXISTS origintable")
+    sql("DROP TABLE IF EXISTS origintablenew")
     sql(
       """
-        | CREATE TABLE origintable
+        | CREATE TABLE origintablenew
         | (id Int,
         | vin String,
         | logdate Timestamp,
@@ -452,7 +433,7 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
 
     sql(
       s"""
-       LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' into table origintable
+       LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' into table origintablenew
        """)
 
     sql("DROP TABLE IF EXISTS partitiontable0")
@@ -504,9 +485,9 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
       """.stripMargin))
 
     sql("insert into table partitiontable0 partition(logdate='2018-02-15 00:00:00') " +
-              "select id,vin,phonenumber,country,area,salary from origintable")
+              "select id,vin,phonenumber,country,area,salary from origintablenew")
     sql("insert into table partitiontable0_hive partition(logdate='2018-02-15 00:00:00') " +
-        "select id,vin,phonenumber,country,area,salary from origintable")
+        "select id,vin,phonenumber,country,area,salary from origintablenew")
     checkAnswer(sql(
       s"""
          | SELECT logdate,id,vin,phonenumber,country,area,salary
@@ -530,10 +511,10 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
   test("test overwrite with date partition column") {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")
-    sql("DROP TABLE IF EXISTS origintable")
+    sql("DROP TABLE IF EXISTS origintablenew")
     sql(
       """
-        | CREATE TABLE origintable
+        | CREATE TABLE origintablenew
         | (id Int,
         | vin String,
         | logdate date,
@@ -546,7 +527,7 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
 
     sql(
       s"""
-       LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' into table origintable
+       LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' into table origintablenew
        """)
 
     sql("DROP TABLE IF EXISTS partitiontable0")
@@ -598,9 +579,9 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
       """.stripMargin))
 
     sql("insert into table partitiontable0 partition(logdate='2018-02-15') " +
-        "select id,vin,phonenumber,country,area,salary from origintable")
+        "select id,vin,phonenumber,country,area,salary from origintablenew")
     sql("insert into table partitiontable0_hive partition(logdate='2018-02-15') " +
-        "select id,vin,phonenumber,country,area,salary from origintable")
+        "select id,vin,phonenumber,country,area,salary from origintablenew")
     checkAnswer(sql(
       s"""
          | SELECT logdate,id,vin,phonenumber,country,area,salary
@@ -621,7 +602,319 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
       .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy")
   }
 
+  test("partition with date column issue") {
+    try {
+      CarbonProperties.getInstance().addProperty(
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FAIL.name())
+      sql("drop table if exists partdatecarb")
+      sql(
+        "create table partdatecarb(name string, age Int) partitioned by(dob date) stored by 'carbondata'")
+
+      sql("insert into partdatecarb partition(dob='2016-06-28') select 'name1',121")
+      checkAnswer(sql("select name,age,cast(dob as string) from partdatecarb"),
+        Seq(Row("name1", 121, "2016-06-28")))
+    } finally {
+      CarbonProperties.getInstance().addProperty(
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FORCE.name())
+    }
+  }
+
+  test("partition with time column issue") {
+    try {
+      CarbonProperties.getInstance().addProperty(
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FAIL.name())
+      sql("drop table if exists partdatecarb1")
+      sql(
+        "create table partdatecarb1(name string, age Int) partitioned by(dob timestamp) stored by 'carbondata'")
+
+      sql("insert into partdatecarb1 partition(dob='2016-06-28 00:00:00') select 'name1',121")
+      checkAnswer(sql("select name,age,cast(dob as string) from partdatecarb1"),
+        Seq(Row("name1", 121, "2016-06-28 00:00:00")))
+    } finally {
+      CarbonProperties.getInstance().addProperty(
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FORCE.name())
+    }
+  }
+
+  test("partition with int issue and dictionary exclude") {
+    try {
+      CarbonProperties.getInstance().addProperty(
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FAIL.name())
+      sql("drop table if exists partdatecarb2")
+      sql(
+        "create table partdatecarb2(name string, dob string) partitioned by(age Int) stored by 'carbondata' TBLPROPERTIES('DICTIONARY_EXCLUDE'='age')")
+
+      sql("insert into partdatecarb2 partition(age='12') select 'name1','2016-06-28'")
+      checkAnswer(sql("select name,age,cast(dob as string) from partdatecarb2"),
+        Seq(Row("name1", 12, "2016-06-28")))
+    } finally {
+      CarbonProperties.getInstance().addProperty(
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FORCE.name())
+    }
+  }
 
+  test("partition with int issue and dictionary include") {
+    sql("drop table if exists partdatecarb3")
+    intercept[Exception] {
+      sql(
+        "create table partdatecarb3(name string, dob string) partitioned by(age Int) stored by 'carbondata' TBLPROPERTIES('DICTIONARY_INCLUDE'='age')")
+    }
+  }
+
+  test("data loading for all dimensions with table for two partition column") {
+    sql("drop table if exists partitiontwoalldims")
+    sql(
+      """
+        | CREATE TABLE partitiontwoalldims (empno String, designation String,
+        |  workgroupcategory String, workgroupcategoryname String, deptno String, deptname String,
+        |  projectcode String, projectjoindate Timestamp, projectenddate Timestamp,attendance String,
+        |  utilization String,salary String)
+        | PARTITIONED BY (doj Timestamp, empname String)
+        | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiontwoalldims OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql("select count(*) from partitiontwoalldims"), Seq(Row(10)))
+  }
+
+  test("partition with different order column issue") {
+    try {
+      CarbonProperties.getInstance().addProperty(
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FAIL.name())
+
+      sql("drop table if exists partdatecarb4_hive")
+      sql(
+        "create table partdatecarb4_hive(name string, age Int) partitioned by(country string, state string, city string)")
+
+      sql("insert into partdatecarb4_hive partition(state,city,country='india') select 'name1',12,'KA', 'BGLR'")
+      sql("insert into partdatecarb4_hive partition(state,city,country='india') select 'name1',12,'KA', 'BGLR'")
+
+      sql("drop table if exists partdatecarb4")
+      sql(
+        "create table partdatecarb4(name string, age Int) partitioned by(country string, state string, city string) stored by 'carbondata'")
+
+      sql("insert into partdatecarb4 partition(state,city,country='india') select 'name1',12,'KA', 'BGLR'")
+      sql("insert into partdatecarb4 partition(city,state,country='india') select 'name1',12, 'BGLR','KA'")
+      sql("select * from partdatecarb4").show()
+      checkAnswer(sql("select * from partdatecarb4"), sql("select * from partdatecarb4_hive"))
+      intercept[Exception] {
+        sql(
+          "insert into partdatecarb4 partition(state,city='3',country) select 'name1',12,'cc', 'dd'")
+      }
+    } finally {
+      CarbonProperties.getInstance().addProperty(
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FORCE.name())
+    }
+  }
+
+  test("data loading for decimal column partition table") {
+
+    sql(
+      """
+        | CREATE TABLE partitiondecimal (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int, doj Timestamp, empname String)
+        | PARTITIONED BY (salary decimal)
+        | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondecimal OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitiondecimal order by empno"),
+      sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+  }
+
+  test("data loading for decimal column static partition table") {
+
+    sql(
+      """
+        | CREATE TABLE partitiondecimalstatic (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int, doj Timestamp, empname String)
+        | PARTITIONED BY (salary decimal)
+        | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondecimalstatic partition(salary='1.0') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    checkAnswer(sql("select count(salary) from partitiondecimalstatic"), Seq(Row(10)))
+  }
+
+  test("query after select on partition table") {
+
+    sql(
+      """
+        | CREATE TABLE partitiondatadelete (designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int,empno int, projectenddate Timestamp,attendance int,
+        |  utilization int, doj Timestamp, empname String,salary int)
+        | PARTITIONED BY (projectjoindate Timestamp)
+        | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondatadelete OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    sql(s"delete from partitiondatadelete where projectjoindate='2012-11-14 00:00:00'")
+    checkAnswer(sql(s"select count(*) from partitiondatadelete where where projectjoindate='2012-11-14 00:00:00'"), Seq(Row(0)))
+  }
+
+  test("partition colunm test without partition column in fileheader of load command") {
+    sql("DROP TABLE IF EXISTS partitiontablewithoutpartcolumninfileheader")
+
+    sql("CREATE TABLE partitiontablewithoutpartcolumninfileheader (CUST_ID int,ACTIVE_EMUI_VERSION string, DOB timestamp, DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10), DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) partitioned by(CUST_NAME String) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('DICTIONARY_INCLUDE'='CUST_ID,ACTIVE_EMUI_VERSION,DOB,DOJ,BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1,DECIMAL_COLUMN2,Double_COLUMN1,Double_COLUMN2,INTEGER_COLUMN1') ")
+    sql(s"""LOAD DATA INPATH '$resourcesPath/data_with_all_types.csv' into table partitiontablewithoutpartcolumninfileheader partition(cust_name='ravi') OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='CUST_ID,CUST_NAME1,ACTIVE_EMUI_VERSION,DOB,DOJ,BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1,DECIMAL_COLUMN2,Double_COLUMN1,Double_COLUMN2,INTEGER_COLUMN1')""")
+
+    checkAnswer(sql("select count(*) from partitiontablewithoutpartcolumninfileheader"), Seq(Row(10)))
+    sql("DROP TABLE IF EXISTS partitiontablewithoutpartcolumninfileheader")
+  }
+
+  test("data loading with wrong format in static partition table") {
+    sql("DROP TABLE IF EXISTS partitionwrongformat")
+    sql(
+      """
+        | CREATE TABLE partitionwrongformat (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectenddate Timestamp,attendance int,
+        |  utilization int, doj Timestamp, empname String)
+        | PARTITIONED BY (projectjoindate Timestamp, salary decimal)
+        | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    intercept[MalformedCarbonCommandException] {
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionwrongformat partition(projectjoindate='2016-12-01',salary='gg') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    }
+
+    intercept[MalformedCarbonCommandException] {
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionwrongformat partition(projectjoindate='2016',salary='1.0') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    }
+
+  }
+
+  test("data loading with default partition in static partition table") {
+    sql("DROP TABLE IF EXISTS partitiondefaultpartition")
+    sql(
+      """
+        | CREATE TABLE partitiondefaultpartition (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectenddate Timestamp,attendance int,
+        |  utilization int, doj Timestamp, empname String)
+        | PARTITIONED BY (projectjoindate Timestamp, salary decimal)
+        | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondefaultpartition partition(projectjoindate='__HIVE_DEFAULT_PARTITION__',salary='1.0') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql("select count(salary) from partitiondefaultpartition"), Seq(Row(10)))
+    checkExistence(sql("show partitions partitiondefaultpartition"), true, "__HIVE_DEFAULT_PARTITION__")
+  }
+
+  test("data loading with default partition in static partition table with fail badrecord") {
+    sql("DROP TABLE IF EXISTS partitiondefaultpartitionfail")
+    sql(
+      """
+        | CREATE TABLE partitiondefaultpartitionfail (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectenddate Timestamp,attendance int,
+        |  utilization int, doj Timestamp, empname String)
+        | PARTITIONED BY (projectjoindate Timestamp, salary decimal)
+        | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondefaultpartitionfail partition(projectjoindate='__HIVE_DEFAULT_PARTITION__',salary='1.0') OPTIONS('bad_records_logger_enable'='true', 'bad_records_action'='fail','DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql("select count(*) from partitiondefaultpartitionfail"), Seq(Row(10)))
+    checkExistence(sql("show partitions partitiondefaultpartitionfail"), true, "__HIVE_DEFAULT_PARTITION__")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondefaultpartitionfail partition(projectjoindate='2016-12-01',salary='__HIVE_DEFAULT_PARTITION__') OPTIONS('bad_records_logger_enable'='true', 'bad_records_action'='fail','DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql("select count(*) from partitiondefaultpartitionfail"), Seq(Row(20)))
+  }
+
+  test("data loading with int partition issue") {
+    sql("DROP TABLE IF EXISTS intissue")
+    sql("create table intissue(a int) partitioned by (b int) stored by 'carbondata'")
+    sql("insert into intissue values(1,1)")
+    checkAnswer(sql("select * from intissue"), Seq(Row(1,1)))
+  }
+
+  test("data loading with int partition issue with global sort") {
+    sql("DROP TABLE IF EXISTS intissuesort")
+    sql("create table intissuesort(a int) partitioned by (b int) stored by 'carbondata' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')")
+    sql("insert into intissuesort values(1,1)")
+    checkAnswer(sql("select * from intissuesort"), Seq(Row(1,1)))
+  }
+
+  test("data loading with decimal column fail issue") {
+    sql("DROP TABLE IF EXISTS partitiondecimalfailissue")
+    sql("CREATE TABLE IF NOT EXISTS partitiondecimalfailissue (ID Int, date Timestamp, country String, name String, phonetype String, serialname String) partitioned by (salary Decimal(17,2)) STORED BY 'org.apache.carbondata.format'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/decimalDataWithHeader.csv' into table partitiondecimalfailissue")
+    sql(s"select * from partitiondecimalfailissue").show()
+    sql(s"insert into partitiondecimalfailissue partition(salary='13000000.7878788') select ID,date,country,name,phonetype,serialname from partitiondecimalfailissue" )
+    sql(s"select * from partitiondecimalfailissue").show(100)
+  }
+
+  test("data loading with decimalissue partition issue") {
+    sql("DROP TABLE IF EXISTS decimalissue")
+    sql("create table decimalissue(a int) partitioned by (b decimal(2,2)) stored by 'carbondata'")
+    sql("insert into decimalissue values(23,21.2)")
+    checkAnswer(sql("select * from decimalissue"), Seq(Row(23,null)))
+  }
+
+  test("data loading scalar query partition issue") {
+    sql("DROP TABLE IF EXISTS scalarissue")
+    sql("create table scalarissue(a int) partitioned by (salary double) stored by 'carbondata'")
+    sql("insert into scalarissue values(23,21.2)")
+    sql("DROP TABLE IF EXISTS scalarissue_hive")
+    sql("create table scalarissue_hive(a int,salary double) using parquet partitioned by (salary) ")
+    sql("set hive.exec.dynamic.partition.mode=nonstrict")
+    sql("insert into scalarissue_hive values(23,21.2)")
+    intercept[Exception] {
+      sql(s"select * from scalarissue_hive where salary = (select max(salary) from scalarissue_hive)").show()
+    }
+    intercept[Exception] {
+      sql(s"select * from scalarissue where salary = (select max(salary) from scalarissue)").show()
+    }
+  }
+
+  test("global sort badrecords fail on partition column message") {
+    sql("DROP TABLE IF EXISTS badrecordsPartitionfailmessage")
+    sql("create table badrecordsPartitionfailmessage(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')")
+    val ex = intercept[Exception] {
+      sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartitionfailmessage options('bad_records_action'='fail')")
+    }
+    assert(ex.getMessage.startsWith("DataLoad failure: Data load failed due to bad record"))
+  }
+
+  test("multiple compaction on partition table") {
+    sql("DROP TABLE IF EXISTS comp_dt2")
+    sql("create table comp_dt2(id int,name string) partitioned by (dt date,c4 int) stored by 'carbondata'")
+    sql("insert into comp_dt2 select 1,'A','2001-01-01',1")
+    sql("insert into comp_dt2 select 2,'B','2001-01-01',1")
+    sql("insert into comp_dt2 select 3,'C','2002-01-01',2")
+    sql("insert into comp_dt2 select 4,'D','2002-01-01',null")
+    assert(sql("select * from comp_dt2").collect().length == 4)
+    sql("Alter table comp_dt2 compact 'minor'")
+    assert(sql("select * from comp_dt2").collect().length == 4)
+    sql("clean files for table comp_dt2")
+    assert(sql("select * from comp_dt2").collect().length == 4)
+    sql("insert into comp_dt2 select 5,'E','2003-01-01',3")
+    sql("insert into comp_dt2 select 6,'F','2003-01-01',3")
+    sql("insert into comp_dt2 select 7,'G','2003-01-01',4")
+    sql("insert into comp_dt2 select 8,'H','2004-01-01',''")
+    assert(sql("select * from comp_dt2").collect().length == 8)
+    sql("Alter table comp_dt2 compact 'minor'")
+    sql("clean files for table comp_dt2")
+    assert(sql("select * from comp_dt2").collect().length == 8)
+    assert(sql("select * from comp_dt2").collect().length == 8)
+    sql("insert into comp_dt2 select 9,'H','2001-01-01',1")
+    sql("insert into comp_dt2 select 10,'I','2002-01-01',null")
+    sql("insert into comp_dt2 select 11,'J','2003-01-01',4")
+    sql("insert into comp_dt2 select 12,'K','2003-01-01',5")
+    assert(sql("select * from comp_dt2").collect().length == 12)
+    sql("Alter table comp_dt2 compact 'minor'")
+    assert(sql("show segments for table comp_dt2").collect().length == 8)
+    assert(sql("select * from comp_dt2").collect().length == 12)
+    sql("clean files for table comp_dt2")
+    assert(sql("select * from comp_dt2").collect().length == 12)
+    sql("insert into comp_dt2 select 13,'L','2004-01-01', 6")
+    assert(sql("select * from comp_dt2").collect().length == 13)
+    sql("Alter table comp_dt2 compact 'major'")
+    assert(sql("select * from comp_dt2").collect().length == 13)
+    assert(sql("show segments for table comp_dt2").collect().length == 3)
+    assert(sql("select * from comp_dt2").collect().length == 13)
+    sql("clean files for table comp_dt2")
+  }
 
   override def afterAll = {
     CarbonProperties.getInstance()
@@ -632,7 +925,7 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
         CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION ,
       CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
-//    dropTable
+    dropTable
     if (executorService != null && !executorService.isShutdown) {
       executorService.shutdownNow()
     }
@@ -680,5 +973,8 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
     sql("drop table if exists noLoadTable")
     sql("drop table if exists carbon_test")
     sql("drop table if exists carbon_test_hive")
+    sql("drop table if exists partitiondecimal")
+    sql("drop table if exists partitiondecimalstatic")
+    sql("drop table if exists partitiondatadelete")
   }
 }