Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/25 10:21:01 UTC

[1/4] incubator-carbondata git commit: change ScanRdd to use RecordReader

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 5c697e942 -> e05c0d5da


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java b/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
index e3b74ad..4036a8b 100644
--- a/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
+++ b/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
@@ -60,29 +60,23 @@ public class SegmentStatusManager {
   private static final LogService LOG =
       LogServiceFactory.getLogService(SegmentStatusManager.class.getName());
 
-  private AbsoluteTableIdentifier absoluteTableIdentifier;
-
-  public SegmentStatusManager(AbsoluteTableIdentifier absoluteTableIdentifier) {
-    this.absoluteTableIdentifier = absoluteTableIdentifier;
-  }
-
   /**
    * This will return the lock object used to lock the table status file before updation.
    *
    * @return
    */
-  public ICarbonLock getTableStatusLock() {
-    return CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier.getCarbonTableIdentifier(),
+  public static ICarbonLock getTableStatusLock(AbsoluteTableIdentifier identifier) {
+    return CarbonLockFactory.getCarbonLockObj(identifier.getCarbonTableIdentifier(),
         LockUsage.TABLE_STATUS_LOCK);
   }
 
   /**
    * This method will return last modified time of tablestatus file
    */
-  public long getTableStatusLastModifiedTime() throws IOException {
-    String tableStatusPath = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-            absoluteTableIdentifier.getCarbonTableIdentifier()).getTableStatusFilePath();
+  public static long getTableStatusLastModifiedTime(AbsoluteTableIdentifier identifier)
+      throws IOException {
+    String tableStatusPath = CarbonStorePath.getCarbonTablePath(identifier.getStorePath(),
+            identifier.getCarbonTableIdentifier()).getTableStatusFilePath();
     if (!FileFactory.isFileExist(tableStatusPath, FileFactory.getFileType(tableStatusPath))) {
       return 0L;
     } else {
@@ -97,15 +91,15 @@ public class SegmentStatusManager {
    * @return
    * @throws IOException
    */
-  public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException {
+  public static SegmentStatus getSegmentStatus(AbsoluteTableIdentifier identifier)
+      throws IOException {
 
     // @TODO: move reading LoadStatus file to separate class
-    List<String> listOfValidSegments = new ArrayList<String>(10);
-    List<String> listOfValidUpdatedSegments = new ArrayList<String>(10);
-    List<String> listOfInvalidSegments = new ArrayList<String>(10);
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-            absoluteTableIdentifier.getCarbonTableIdentifier());
+    List<String> validSegments = new ArrayList<String>(10);
+    List<String> validUpdatedSegments = new ArrayList<String>(10);
+    List<String> invalidSegments = new ArrayList<String>(10);
+    CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier.getStorePath(),
+        identifier.getCarbonTableIdentifier());
     String dataPath = carbonTablePath.getTableStatusFilePath();
     DataInputStream dataInputStream = null;
     Gson gsonObjectToRead = new Gson();
@@ -114,9 +108,7 @@ public class SegmentStatusManager {
     LoadMetadataDetails[] loadFolderDetailsArray;
     try {
       if (FileFactory.isFileExist(dataPath, FileFactory.getFileType(dataPath))) {
-
         dataInputStream = fileOperation.openForRead();
-
         BufferedReader buffReader =
             new BufferedReader(
                     new InputStreamReader(dataInputStream, CarbonCommonConstants.DEFAULT_CHARSET));
@@ -126,40 +118,31 @@ public class SegmentStatusManager {
         List<LoadMetadataDetails> loadFolderDetails = Arrays.asList(loadFolderDetailsArray);
 
         for (LoadMetadataDetails loadMetadataDetails : loadFolderDetails) {
-          if (CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-              .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
-              || CarbonCommonConstants.MARKED_FOR_UPDATE
-              .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
-              || CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
-              .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
+          String loadStatus = loadMetadataDetails.getLoadStatus();
+          if (CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS.equalsIgnoreCase(loadStatus)
+              || CarbonCommonConstants.MARKED_FOR_UPDATE.equalsIgnoreCase(loadStatus)
+              || CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equalsIgnoreCase(
+                  loadStatus)) {
             // check for merged loads.
             if (null != loadMetadataDetails.getMergedLoadName()) {
-              if (!listOfValidSegments.contains(loadMetadataDetails.getMergedLoadName())) {
-                listOfValidSegments.add(loadMetadataDetails.getMergedLoadName());
+              if (!validSegments.contains(loadMetadataDetails.getMergedLoadName())) {
+                validSegments.add(loadMetadataDetails.getMergedLoadName());
               }
               // if merged load is updated then put it in updated list
-              if (CarbonCommonConstants.MARKED_FOR_UPDATE
-                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
-                listOfValidUpdatedSegments.add(loadMetadataDetails.getMergedLoadName());
+              if (CarbonCommonConstants.MARKED_FOR_UPDATE.equalsIgnoreCase(loadStatus)) {
+                validUpdatedSegments.add(loadMetadataDetails.getMergedLoadName());
               }
               continue;
             }
-
-            if (CarbonCommonConstants.MARKED_FOR_UPDATE
-                .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
-
-              listOfValidUpdatedSegments.add(loadMetadataDetails.getLoadName());
+            if (CarbonCommonConstants.MARKED_FOR_UPDATE.equalsIgnoreCase(loadStatus)) {
+              validUpdatedSegments.add(loadMetadataDetails.getLoadName());
             }
-            listOfValidSegments.add(loadMetadataDetails.getLoadName());
-          } else if ((CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
-              .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
-              || CarbonCommonConstants.SEGMENT_COMPACTED
-              .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
-              || CarbonCommonConstants.MARKED_FOR_DELETE
-              .equalsIgnoreCase(loadMetadataDetails.getLoadStatus()))) {
-            listOfInvalidSegments.add(loadMetadataDetails.getLoadName());
+            validSegments.add(loadMetadataDetails.getLoadName());
+          } else if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equalsIgnoreCase(loadStatus)
+              || CarbonCommonConstants.SEGMENT_COMPACTED.equalsIgnoreCase(loadStatus)
+              || CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(loadStatus)) {
+            invalidSegments.add(loadMetadataDetails.getLoadName());
           }
-
         }
       }
     } catch (IOException e) {
@@ -167,7 +150,6 @@ public class SegmentStatusManager {
       throw e;
     } finally {
       try {
-
         if (null != dataInputStream) {
           dataInputStream.close();
         }
@@ -175,10 +157,9 @@ public class SegmentStatusManager {
         LOG.error(e);
         throw e;
       }
-
     }
-    return new ValidAndInvalidSegmentsInfo(listOfValidSegments, listOfValidUpdatedSegments,
-        listOfInvalidSegments);
+
+    return new SegmentStatus(validSegments, validUpdatedSegments, invalidSegments);
   }
 
   /**
@@ -187,7 +168,7 @@ public class SegmentStatusManager {
    * @param tableFolderPath
    * @return
    */
-  public LoadMetadataDetails[] readLoadMetadata(String tableFolderPath) {
+  public static LoadMetadataDetails[] readLoadMetadata(String tableFolderPath) {
     Gson gsonObjectToRead = new Gson();
     DataInputStream dataInputStream = null;
     BufferedReader buffReader = null;
@@ -223,13 +204,9 @@ public class SegmentStatusManager {
    *
    * @return
    */
-  private String readCurrentTime() {
+  private static String readCurrentTime() {
     SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
-    String date = null;
-
-    date = sdf.format(new Date());
-
-    return date;
+    return sdf.format(new Date());
   }
 
   /**
@@ -240,8 +217,7 @@ public class SegmentStatusManager {
    * @return -1 if first arg is less than second arg, 1 if first arg is greater than second arg,
    * 0 otherwise
    */
-  private Integer compareDateValues(Long loadValue, Long userValue) {
-
+  private static Integer compareDateValues(Long loadValue, Long userValue) {
     return loadValue.compareTo(userValue);
   }
 
@@ -252,10 +228,9 @@ public class SegmentStatusManager {
    * @param tableFolderPath
    * @return
    */
-  public List<String> updateDeletionStatus(List<String> loadIds, String tableFolderPath)
-      throws Exception {
-    CarbonTableIdentifier carbonTableIdentifier =
-        absoluteTableIdentifier.getCarbonTableIdentifier();
+  public static List<String> updateDeletionStatus(AbsoluteTableIdentifier identifier,
+      List<String> loadIds, String tableFolderPath) throws Exception {
+    CarbonTableIdentifier carbonTableIdentifier = identifier.getCarbonTableIdentifier();
     ICarbonLock carbonDeleteSegmentLock =
         CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, LockUsage.DELETE_SEGMENT_LOCK);
     ICarbonLock carbonTableStatusLock =
@@ -267,9 +242,8 @@ public class SegmentStatusManager {
       if (carbonDeleteSegmentLock.lockWithRetries()) {
         LOG.info("Delete segment lock has been successfully acquired");
 
-        CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-                absoluteTableIdentifier.getCarbonTableIdentifier());
+        CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
+            identifier.getStorePath(), identifier.getCarbonTableIdentifier());
         String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
         LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
         if (!FileFactory.isFileExist(dataLoadLocation, FileFactory.getFileType(dataLoadLocation))) {
@@ -335,10 +309,9 @@ public class SegmentStatusManager {
    * @param tableFolderPath
    * @return
    */
-  public List<String> updateDeletionStatus(String loadDate, String tableFolderPath,
-      Long loadStartTime) throws Exception {
-    CarbonTableIdentifier carbonTableIdentifier =
-        absoluteTableIdentifier.getCarbonTableIdentifier();
+  public static List<String> updateDeletionStatus(AbsoluteTableIdentifier identifier,
+      String loadDate, String tableFolderPath, Long loadStartTime) throws Exception {
+    CarbonTableIdentifier carbonTableIdentifier = identifier.getCarbonTableIdentifier();
     ICarbonLock carbonDeleteSegmentLock =
         CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, LockUsage.DELETE_SEGMENT_LOCK);
     ICarbonLock carbonTableStatusLock =
@@ -350,9 +323,8 @@ public class SegmentStatusManager {
       if (carbonDeleteSegmentLock.lockWithRetries()) {
         LOG.info("Delete segment lock has been successfully acquired");
 
-        CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-                absoluteTableIdentifier.getCarbonTableIdentifier());
+        CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
+            identifier.getStorePath(), identifier.getCarbonTableIdentifier());
         String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
         LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
 
@@ -422,7 +394,7 @@ public class SegmentStatusManager {
    * @param listOfLoadFolderDetailsArray
    * @throws IOException
    */
-  public void writeLoadDetailsIntoFile(String dataLoadLocation,
+  public static void writeLoadDetailsIntoFile(String dataLoadLocation,
       LoadMetadataDetails[] listOfLoadFolderDetailsArray) throws IOException {
     AtomicFileOperations fileWrite =
         new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
@@ -458,7 +430,7 @@ public class SegmentStatusManager {
    * @param invalidLoadIds
    * @return invalidLoadIds
    */
-  public List<String> updateDeletionStatus(List<String> loadIds,
+  public static List<String> updateDeletionStatus(List<String> loadIds,
       LoadMetadataDetails[] listOfLoadFolderDetailsArray, List<String> invalidLoadIds) {
     for (String loadId : loadIds) {
       boolean loadFound = false;
@@ -502,7 +474,7 @@ public class SegmentStatusManager {
    * @param invalidLoadTimestamps
    * @return invalidLoadTimestamps
    */
-  public List<String> updateDeletionStatus(String loadDate,
+  public static List<String> updateDeletionStatus(String loadDate,
       LoadMetadataDetails[] listOfLoadFolderDetailsArray, List<String> invalidLoadTimestamps,
       Long loadStartTime) {
     // For each load timestamp loop through data and if the
@@ -543,7 +515,7 @@ public class SegmentStatusManager {
    *
    * @param streams - streams to close.
    */
-  private void closeStreams(Closeable... streams) {
+  private static void closeStreams(Closeable... streams) {
     // Added if to avoid NullPointerException in case one stream is being passed as null
     if (null != streams) {
       for (Closeable stream : streams) {
@@ -566,7 +538,7 @@ public class SegmentStatusManager {
    * @return
    */
 
-  public List<LoadMetadataDetails> updateLatestTableStatusDetails(
+  public static List<LoadMetadataDetails> updateLatestTableStatusDetails(
       LoadMetadataDetails[] oldMetadata, LoadMetadataDetails[] newMetadata) {
 
     List<LoadMetadataDetails> newListMetadata =
@@ -584,7 +556,7 @@ public class SegmentStatusManager {
    *
    * @param loadMetadata
    */
-  public void updateSegmentMetadataDetails(LoadMetadataDetails loadMetadata) {
+  public static void updateSegmentMetadataDetails(LoadMetadataDetails loadMetadata) {
     // update status only if the segment is not marked for delete
     if (!CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(loadMetadata.getLoadStatus())) {
       loadMetadata.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
@@ -593,28 +565,28 @@ public class SegmentStatusManager {
   }
 
 
-  public static class ValidAndInvalidSegmentsInfo {
-    private final List<String> listOfValidSegments;
-    private final List<String> listOfValidUpdatedSegments;
-    private final List<String> listOfInvalidSegments;
-
-    private ValidAndInvalidSegmentsInfo(List<String> listOfValidSegments,
-        List<String> listOfValidUpdatedSegments, List<String> listOfInvalidUpdatedSegments) {
-      this.listOfValidSegments = listOfValidSegments;
-      this.listOfValidUpdatedSegments = listOfValidUpdatedSegments;
-      this.listOfInvalidSegments = listOfInvalidUpdatedSegments;
-    }
+  public static class SegmentStatus {
+    private final List<String> validSegments;
+    private final List<String> validUpdatedSegments;
+    private final List<String> invalidSegments;
 
-    public List<String> getInvalidSegments() {
-      return listOfInvalidSegments;
+    private SegmentStatus(List<String> validSegments, List<String> validUpdatedSegments,
+        List<String> invalidSegments) {
+      this.validSegments = validSegments;
+      this.validUpdatedSegments = validUpdatedSegments;
+      this.invalidSegments = invalidSegments;
     }
 
     public List<String> getValidSegments() {
-      return listOfValidSegments;
+      return validSegments;
     }
 
     public List<String> getUpadtedSegments() {
-      return listOfValidUpdatedSegments;
+      return validUpdatedSegments;
+    }
+
+    public List<String> getInvalidSegments() {
+      return invalidSegments;
     }
   }
 }
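
[Editor's note] For reference, a minimal caller-side sketch of the SegmentStatusManager API after this change (not part of the commit; it assumes an AbsoluteTableIdentifier named "identifier" is already available). The instance methods above become static, so callers no longer construct a SegmentStatusManager object:

  // hypothetical caller sketch, assuming "identifier" is an AbsoluteTableIdentifier
  SegmentStatusManager.SegmentStatus status =
      SegmentStatusManager.getSegmentStatus(identifier);
  List<String> validSegments = status.getValidSegments();
  List<String> invalidSegments = status.getInvalidSegments();
  long lastModified = SegmentStatusManager.getTableStatusLastModifiedTime(identifier);
  ICarbonLock statusLock = SegmentStatusManager.getTableStatusLock(identifier);
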


[3/4] incubator-carbondata git commit: change ScanRdd to use RecordReader

Posted by ra...@apache.org.
change ScanRdd to use RecordReader

fix style


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/5f6a56ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/5f6a56ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/5f6a56ca

Branch: refs/heads/master
Commit: 5f6a56cac26be7e5307a01296ce9351b60be0e66
Parents: 5c697e9
Author: jackylk <ja...@huawei.com>
Authored: Fri Nov 25 18:07:20 2016 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Nov 25 15:49:53 2016 +0530

----------------------------------------------------------------------
 .../carbon/datastore/block/Distributable.java   |   8 +-
 .../carbon/datastore/block/TableBlockInfo.java  |   3 +-
 .../carbon/datastore/block/TableTaskInfo.java   |   2 +-
 .../scan/filter/FilterExpressionProcessor.java  |  29 +-
 .../examples/DataFrameAPIExample.scala          |   5 +-
 .../carbondata/hadoop/CarbonInputFormat.java    | 334 +++++-----------
 .../carbondata/hadoop/CarbonInputSplit.java     | 112 +++++-
 .../hadoop/CarbonMultiBlockSplit.java           | 105 +++++
 .../carbondata/hadoop/CarbonRecordReader.java   |  34 +-
 .../hadoop/readsupport/CarbonReadSupport.java   |   2 +-
 .../AbstractDictionaryDecodedReadSupport.java   |   2 +-
 .../impl/ArrayWritableReadSupport.java          |   2 +-
 .../readsupport/impl/RawDataReadSupport.java    |   6 +-
 .../hadoop/util/CarbonInputFormatUtil.java      |  14 +
 .../hadoop/ft/CarbonInputMapperTest.java        |  15 +-
 .../carbondata/spark/load/CarbonLoaderUtil.java |  99 +----
 .../spark/merger/CarbonDataMergerUtil.java      |  11 +-
 .../readsupport/SparkRowReadSupportImpl.java    |   9 +-
 .../carbondata/spark/util/LoadMetadataUtil.java |   4 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  20 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 151 +++-----
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 387 ++++++++-----------
 .../apache/carbondata/spark/rdd/Compactor.scala |  13 +-
 .../carbondata/spark/util/QueryPlanUtil.scala   |  56 ---
 .../sql/CarbonDatasourceHadoopRelation.scala    |  20 +-
 .../spark/sql/CarbonDatasourceRelation.scala    |   3 +-
 .../org/apache/spark/sql/CarbonOperators.scala  | 191 ---------
 .../scala/org/apache/spark/sql/CarbonScan.scala | 186 +++++++++
 .../execution/command/carbonTableSchema.scala   |  20 +-
 .../spark/sql/hive/DistributionUtil.scala       |  21 +-
 .../AllDataTypesTestCaseAggregate.scala         |  20 +-
 .../CompactionSystemLockFeatureTest.scala       |  20 +-
 .../DataCompactionBoundaryConditionsTest.scala  |   3 -
 .../DataCompactionCardinalityBoundryTest.scala  |  10 +-
 .../datacompaction/DataCompactionLockTest.scala |  11 +-
 .../DataCompactionMinorThresholdTest.scala      |   6 +-
 .../DataCompactionNoDictionaryTest.scala        |  10 +-
 .../datacompaction/DataCompactionTest.scala     |  14 +-
 .../MajorCompactionIgnoreInMinorTest.scala      |  32 +-
 .../MajorCompactionStopsAfterCompaction.scala   |  14 +-
 .../dataretention/DataRetentionTestCase.scala   |   5 +-
 .../lcm/status/SegmentStatusManager.java        | 160 ++++----
 42 files changed, 964 insertions(+), 1205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/Distributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/Distributable.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/Distributable.java
index 817aafc..99d4459 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/Distributable.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/Distributable.java
@@ -16,10 +16,12 @@
  */
 package org.apache.carbondata.core.carbon.datastore.block;
 
+import java.io.IOException;
+
 /**
- * Abstract class which is maintains the locations of node.
+ * interface to get the locations of node. Used for making task distribution based on locality
  */
-public abstract class Distributable implements Comparable<Distributable> {
+public interface Distributable extends Comparable<Distributable> {
 
-  public abstract String[] getLocations();
+  String[] getLocations() throws IOException;
 }
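
[Editor's note] With Distributable now an interface, a block-like class only needs getLocations() and compareTo(). A hypothetical implementor sketch (not part of the commit; ExampleBlock and its field are illustrative, and java.io.IOException is assumed imported):

  // hypothetical implementor sketch
  public class ExampleBlock implements Distributable {
    private final String[] hosts;

    public ExampleBlock(String[] hosts) {
      this.hosts = hosts;
    }

    @Override public String[] getLocations() throws IOException {
      // locations used for locality-aware task distribution
      return hosts;
    }

    @Override public int compareTo(Distributable other) {
      // real implementations order blocks, e.g. by segment and task id
      return 0;
    }
  }
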

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
index f8da9af..4bf0047 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
@@ -28,8 +28,7 @@ import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
  * class will be used to pass the block detail detail will be passed form driver
  * to all the executor to load the b+ tree
  */
-public class TableBlockInfo extends Distributable
-    implements Serializable, Comparable<Distributable> {
+public class TableBlockInfo implements Distributable, Serializable {
 
   /**
    * serialization id

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfo.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfo.java
index 1f8caf0..7ce3a14 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfo.java
@@ -27,7 +27,7 @@ import java.util.TreeMap;
 /**
  * This class is responsible for maintaining the mapping of tasks of a node.
  */
-public class TableTaskInfo extends Distributable {
+public class TableTaskInfo implements Distributable {
 
   private final List<TableBlockInfo> tableBlockInfoList;
   private final String taskId;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/core/src/main/java/org/apache/carbondata/scan/filter/FilterExpressionProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/scan/filter/FilterExpressionProcessor.java
index 1541867..44a3a31 100644
--- a/core/src/main/java/org/apache/carbondata/scan/filter/FilterExpressionProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/scan/filter/FilterExpressionProcessor.java
@@ -29,7 +29,6 @@ import org.apache.carbondata.core.carbon.datastore.DataRefNode;
 import org.apache.carbondata.core.carbon.datastore.DataRefNodeFinder;
 import org.apache.carbondata.core.carbon.datastore.IndexKey;
 import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
-import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.carbon.datastore.impl.btree.BTreeDataRefNodeFinder;
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
@@ -127,12 +126,10 @@ public class FilterExpressionProcessor implements FilterProcessor {
     FilterExecuter filterExecuter =
         FilterUtil.getFilterExecuterTree(filterResolver, tableSegment.getSegmentProperties(),null);
     while (startBlock != endBlock) {
-      addBlockBasedOnMinMaxValue(filterExecuter, listOfDataBlocksToScan, startBlock,
-          tableSegment.getSegmentProperties());
+      addBlockBasedOnMinMaxValue(filterExecuter, listOfDataBlocksToScan, startBlock);
       startBlock = startBlock.getNextDataRefNode();
     }
-    addBlockBasedOnMinMaxValue(filterExecuter, listOfDataBlocksToScan, endBlock,
-        tableSegment.getSegmentProperties());
+    addBlockBasedOnMinMaxValue(filterExecuter, listOfDataBlocksToScan, endBlock);
     LOGGER.info("Total Time in retrieving the data reference node" + "after scanning the btree " + (
         System.currentTimeMillis() - startTimeInMillis)
         + " Total number of data reference node for executing filter(s) " + listOfDataBlocksToScan
@@ -147,11 +144,9 @@ public class FilterExpressionProcessor implements FilterProcessor {
    * @param filterResolver
    * @param listOfDataBlocksToScan
    * @param dataRefNode
-   * @param segmentProperties
    */
   private void addBlockBasedOnMinMaxValue(FilterExecuter filterExecuter,
-      List<DataRefNode> listOfDataBlocksToScan, DataRefNode dataRefNode,
-      SegmentProperties segmentProperties) {
+      List<DataRefNode> listOfDataBlocksToScan, DataRefNode dataRefNode) {
 
     BitSet bitSet = filterExecuter
         .isScanRequired(dataRefNode.getColumnsMaxValue(), dataRefNode.getColumnsMinValue());
@@ -174,7 +169,7 @@ public class FilterExpressionProcessor implements FilterProcessor {
   private FilterResolverIntf getFilterResolvertree(Expression expressionTree,
       AbsoluteTableIdentifier tableIdentifier) throws FilterUnsupportedException {
     FilterResolverIntf filterEvaluatorTree =
-        createFilterResolverTree(expressionTree, tableIdentifier, null);
+        createFilterResolverTree(expressionTree, tableIdentifier);
     traverseAndResolveTree(filterEvaluatorTree, tableIdentifier);
     return filterEvaluatorTree;
   }
@@ -212,24 +207,22 @@ public class FilterExpressionProcessor implements FilterProcessor {
    * @return
    */
   private FilterResolverIntf createFilterResolverTree(Expression expressionTree,
-      AbsoluteTableIdentifier tableIdentifier, Expression intermediateExpression) {
+      AbsoluteTableIdentifier tableIdentifier) {
     ExpressionType filterExpressionType = expressionTree.getFilterExpressionType();
     BinaryExpression currentExpression = null;
     switch (filterExpressionType) {
       case OR:
         currentExpression = (BinaryExpression) expressionTree;
         return new LogicalFilterResolverImpl(
-            createFilterResolverTree(currentExpression.getLeft(), tableIdentifier,
-                currentExpression),
-            createFilterResolverTree(currentExpression.getRight(), tableIdentifier,
-                currentExpression),currentExpression);
+            createFilterResolverTree(currentExpression.getLeft(), tableIdentifier),
+            createFilterResolverTree(currentExpression.getRight(), tableIdentifier),
+            currentExpression);
       case AND:
         currentExpression = (BinaryExpression) expressionTree;
         return new LogicalFilterResolverImpl(
-            createFilterResolverTree(currentExpression.getLeft(), tableIdentifier,
-                currentExpression),
-            createFilterResolverTree(currentExpression.getRight(), tableIdentifier,
-                currentExpression), currentExpression);
+            createFilterResolverTree(currentExpression.getLeft(), tableIdentifier),
+            createFilterResolverTree(currentExpression.getRight(), tableIdentifier),
+            currentExpression);
       case EQUALS:
       case IN:
         return getFilterResolverBasedOnExpressionType(ExpressionType.EQUALS,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/examples/spark/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
index 49fb0da..97fa152 100644
--- a/examples/spark/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
+++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
@@ -24,7 +24,7 @@ object DataFrameAPIExample {
 
   def main(args: Array[String]) {
     val cc = ExampleUtils.createCarbonContext("DataFrameAPIExample")
-    ExampleUtils.writeSampleCarbonFile(cc, "carbon1")
+    ExampleUtils.writeSampleCarbonFile(cc, "carbon1", 1000)
 
     // use datasource api to read
     val in = cc.read
@@ -42,7 +42,8 @@ object DataFrameAPIExample {
     println(s"count after 2 load: $count")
 
     // use SQL to read
-    cc.sql("SELECT count(*) FROM carbon1 WHERE c3 > 500").show
+    cc.sql("SELECT c1, count(c3) FROM carbon1 where c3 > 500 group by c1 limit 10").show
+
     cc.sql("DROP TABLE IF EXISTS carbon1")
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 4a51f52..a44a78c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -25,12 +25,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.carbon.datastore.DataRefNode;
@@ -50,7 +44,6 @@ import org.apache.carbondata.core.carbon.path.CarbonTablePath;
 import org.apache.carbondata.core.carbon.querystatistics.*;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
@@ -61,7 +54,6 @@ import org.apache.carbondata.hadoop.util.SchemaReader;
 import org.apache.carbondata.lcm.status.SegmentStatusManager;
 import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
 import org.apache.carbondata.scan.expression.Expression;
-import org.apache.carbondata.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.scan.filter.FilterExpressionProcessor;
 import org.apache.carbondata.scan.filter.FilterUtil;
 import org.apache.carbondata.scan.filter.resolver.FilterResolverIntf;
@@ -74,6 +66,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
@@ -130,6 +123,11 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
   }
 
+  public static void setTablePath(Configuration configuration, String tablePath)
+      throws IOException {
+    configuration.set(FileInputFormat.INPUT_DIR, tablePath);
+  }
+
   /**
    * It sets unresolved filter expression.
    *
@@ -137,26 +135,10 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
    * @param filterExpression
    */
   public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
-    try {
-      String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
-      configuration.set(FILTER_PREDICATE, filterString);
-    } catch (Exception e) {
-      throw new RuntimeException("Error while setting filter expression to Job", e);
+    if (filterExpression == null) {
+      return;
     }
-  }
-
-  /**
-   * It sets the resolved filter expression
-   *
-   * @param configuration
-   * @param filterExpression
-   */
-  public static void setFilterPredicates(Configuration configuration,
-      FilterResolverIntf filterExpression) {
     try {
-      if (filterExpression == null) {
-        return;
-      }
       String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
       configuration.set(FILTER_PREDICATE, filterString);
     } catch (Exception e) {
@@ -164,7 +146,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     }
   }
 
-  public static void setColumnProjection(CarbonProjection projection, Configuration configuration) {
+  public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
     if (projection == null || projection.isEmpty()) {
       return;
     }
@@ -178,6 +160,10 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     configuration.set(COLUMN_PROJECTION, columnString);
   }
 
+  public static String getColumnProjection(Configuration configuration) {
+    return configuration.get(COLUMN_PROJECTION);
+  }
+
   public static void setCarbonReadSupport(Class<? extends CarbonReadSupport> readSupportClass,
       Configuration configuration) {
     if (readSupportClass != null) {
@@ -191,31 +177,14 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   }
 
   /**
-   * Set List of segments to access
+   * Set list of segments to access
    */
   public static void setSegmentsToAccess(Configuration configuration, List<String> validSegments) {
-    configuration
-        .set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, CarbonUtil.getSegmentString(validSegments));
+    configuration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS,
+        CarbonUtil.getSegmentString(validSegments));
   }
 
   /**
-   * Below method will be used to set the segments details if
-   * segments are not added in the configuration
-   *
-   * @param job
-   * @param absoluteTableIdentifier
-   * @throws IOException
-   */
-  private void addSegmentsIfEmpty(JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier)
-      throws IOException {
-    if (getSegmentsFromConfiguration(job).length == 0) {
-      // Get the valid segments from the carbon store.
-      SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments =
-          new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments();
-      setSegmentsToAccess(job.getConfiguration(), validAndInvalidSegments.getValidSegments());
-    }
-  }
-  /**
    * {@inheritDoc}
    * Configurations FileInputFormat.INPUT_DIR
    * are used to get table path to read.
@@ -224,42 +193,46 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
    * @return List<InputSplit> list of CarbonInputSplit
    * @throws IOException
    */
-  @Override public List<InputSplit> getSplits(JobContext job) throws IOException {
-    try {
-      CarbonTable carbonTable = getCarbonTable(job.getConfiguration());
-      Object filterPredicates = getFilterPredicates(job.getConfiguration());
-      AbsoluteTableIdentifier absoluteTableIdentifier =
-          getAbsoluteTableIdentifier(job.getConfiguration());
-      addSegmentsIfEmpty(job, absoluteTableIdentifier);
-      if (filterPredicates == null) {
-        return getSplitsNonFilter(job);
-      } else {
-        if (filterPredicates instanceof Expression) {
-          //process and resolve the expression.
-          CarbonInputFormatUtil.processFilterExpression((Expression) filterPredicates, carbonTable);
-          return getSplits(job, CarbonInputFormatUtil
-              .resolveFilter((Expression) filterPredicates, absoluteTableIdentifier));
-        } else {
-          //It means user sets already resolved expression.
-          return getSplits(job, (FilterResolverIntf) filterPredicates);
-        }
+  @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+    AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
+    List<String> invalidSegments = new ArrayList<>();
+
+    // get all valid segments and set them into the configuration
+    if (getSegmentsToAccess(job).length == 0) {
+      SegmentStatusManager.SegmentStatus segments =
+          SegmentStatusManager.getSegmentStatus(identifier);
+      setSegmentsToAccess(job.getConfiguration(), segments.getValidSegments());
+      if (segments.getValidSegments().size() == 0) {
+        return new ArrayList<>(0);
+      }
+
+      // remove entry in the segment index if there are invalid segments
+      invalidSegments.addAll(segments.getInvalidSegments());
+      if (invalidSegments.size() > 0) {
+        SegmentTaskIndexStore.getInstance().removeTableBlocks(invalidSegments, identifier);
       }
-    } catch (Exception ex) {
-      throw new IOException(ex);
     }
-  }
 
-  /**
-   * the method will return the blocks to be scanned with blocklets info
-   *
-   * @param job
-   * @return
-   * @throws IOException
-   * @throws IndexBuilderException
-   */
-  private List<InputSplit> getSplitsNonFilter(JobContext job)
-      throws IOException, IndexBuilderException {
-    return getSplits(job, null);
+    // process and resolve the expression
+    Expression filter = getFilterPredicates(job.getConfiguration());
+    CarbonTable carbonTable = getCarbonTable(job.getConfiguration());
+    CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+    FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier);
+    List<InputSplit> splits;
+    try {
+      // do block filtering and get split
+      splits = getSplits(job, filterInterface);
+    } catch (IndexBuilderException e) {
+      throw new IOException(e);
+    }
+    // pass the invalid segment to task side in order to remove index entry in task side
+    if (invalidSegments.size() > 0) {
+      for (InputSplit split : splits) {
+        ((CarbonInputSplit) split).setInvalidSegments(invalidSegments);
+      }
+    }
+    return splits;
   }
 
   private List<InputSplit> getSplitsInternal(JobContext job) throws IOException {
@@ -296,7 +269,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
         getAbsoluteTableIdentifier(job.getConfiguration());
 
     //for each segment fetch blocks matching filter in Driver BTree
-    for (String segmentNo : getSegmentsFromConfiguration(job)) {
+    for (String segmentNo : getSegmentsToAccess(job)) {
       List<DataRefNode> dataRefNodes =
           getDataBlocksOfSegment(job, filterExpressionProcessor, absoluteTableIdentifier,
               filterResolver, segmentNo);
@@ -311,98 +284,23 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return result;
   }
 
-  /**
-   * get total number of rows. Same as count(*)
-   *
-   * @throws IOException
-   * @throws IndexBuilderException
-   */
-  public long getRowCount(JobContext job) throws IOException, IndexBuilderException {
-
-    long rowCount = 0;
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        getAbsoluteTableIdentifier(job.getConfiguration());
-    // no of core to load the blocks in driver
-    addSegmentsIfEmpty(job, absoluteTableIdentifier);
-    int numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE;
-    try {
-      numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT));
-    } catch (NumberFormatException e) {
-      numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE;
-    }
-    // creating a thread pool
-    ExecutorService threadPool = Executors.newFixedThreadPool(numberOfCores);
-    List<Future<Map<String, AbstractIndex>>> loadedBlocks =
-        new ArrayList<Future<Map<String, AbstractIndex>>>();
-    //for each segment fetch blocks matching filter in Driver BTree
-    for (String segmentNo : getSegmentsFromConfiguration(job)) {
-      // submitting the task
-      loadedBlocks
-          .add(threadPool.submit(new BlocksLoaderThread(job, absoluteTableIdentifier, segmentNo)));
-    }
-    threadPool.shutdown();
-    try {
-      threadPool.awaitTermination(1, TimeUnit.HOURS);
-    } catch (InterruptedException e) {
-      throw new IndexBuilderException(e);
-    }
-    try {
-      // adding all the rows of the blocks to get the total row
-      // count
-      for (Future<Map<String, AbstractIndex>> block : loadedBlocks) {
-        for (AbstractIndex abstractIndex : block.get().values()) {
-          rowCount += abstractIndex.getTotalNumberOfRows();
-        }
-      }
-    } catch (InterruptedException | ExecutionException e) {
-      throw new IndexBuilderException(e);
-    }
-    return rowCount;
-  }
-
-  /**
-   * {@inheritDoc}
-   * Configurations FileInputFormat.INPUT_DIR, CarbonInputFormat.INPUT_SEGMENT_NUMBERS
-   * are used to get table path to read.
-   *
-   * @return
-   * @throws IOException
-   */
-  public FilterResolverIntf getResolvedFilter(Configuration configuration,
-      Expression filterExpression)
-      throws IOException, IndexBuilderException, QueryExecutionException {
-    if (filterExpression == null) {
-      return null;
-    }
-    FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
-    AbsoluteTableIdentifier absoluteTableIdentifier = getAbsoluteTableIdentifier(configuration);
-    //get resolved filter
-    try {
-      return filterExpressionProcessor.getFilterResolver(filterExpression, absoluteTableIdentifier);
-    } catch (FilterUnsupportedException e) {
-      throw new QueryExecutionException(e.getMessage());
-    }
-  }
-
-  private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
-      throws IOException {
+  private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) {
     String dirs = configuration.get(INPUT_DIR, "");
     String[] inputPaths = StringUtils.split(dirs);
     if (inputPaths.length == 0) {
-      throw new IOException("No input paths specified in job");
+      throw new InvalidPathException("No input paths specified in job");
     }
     return AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
   }
 
-  private Object getFilterPredicates(Configuration configuration) {
+  private Expression getFilterPredicates(Configuration configuration) {
     try {
       String filterExprString = configuration.get(FILTER_PREDICATE);
       if (filterExprString == null) {
         return null;
       }
-      Object filterExprs = ObjectSerializationUtil.convertStringToObject(filterExprString);
-      return filterExprs;
+      Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
+      return (Expression) filter;
     } catch (IOException e) {
       throw new RuntimeException("Error while reading filter expression", e);
     }
@@ -452,16 +350,13 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
    * Below method will be used to get the table block info
    *
    * @param job                     job context
-   * @param absoluteTableIdentifier absolute table identifier
    * @param segmentId               number of segment id
    * @return list of table block
    * @throws IOException
    */
-  private List<TableBlockInfo> getTableBlockInfo(JobContext job,
-      AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId) throws IOException {
-    // List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
+  private List<TableBlockInfo> getTableBlockInfo(JobContext job, String segmentId)
+      throws IOException {
     List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
-    // getFileStatusOfSegments(job, new int[]{ segmentId }, fileStatusList);
 
     // get file location of all files of given segment
     JobContext newJob =
@@ -489,10 +384,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
 
     // if segment tree is not loaded, load the segment tree
     if (segmentIndexMap == null) {
-      // List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
       List<TableBlockInfo> tableBlockInfoList =
-          getTableBlockInfo(job, absoluteTableIdentifier, segmentId);
-      // getFileStatusOfSegments(job, new int[]{ segmentId }, fileStatusList);
+          getTableBlockInfo(job, segmentId);
 
       Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
       segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
@@ -535,30 +428,33 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return blocks;
   }
 
-  @Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
+  @Override
+  public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
       TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
     Configuration configuration = taskAttemptContext.getConfiguration();
     CarbonTable carbonTable = getCarbonTable(configuration);
-    QueryModel queryModel;
-    try {
-      CarbonQueryPlan queryPlan =
-          CarbonInputFormatUtil.createQueryPlan(carbonTable, configuration.get(COLUMN_PROJECTION));
-      queryModel =
-          QueryModel.createModel(getAbsoluteTableIdentifier(configuration), queryPlan, carbonTable);
-      Object filterPredicates = getFilterPredicates(configuration);
-      if (filterPredicates != null) {
-        if (filterPredicates instanceof Expression) {
-          CarbonInputFormatUtil.processFilterExpression((Expression) filterPredicates, carbonTable);
-          queryModel.setFilterExpressionResolverTree(CarbonInputFormatUtil
-              .resolveFilter((Expression) filterPredicates,
-                  getAbsoluteTableIdentifier(configuration)));
-        } else {
-          queryModel.setFilterExpressionResolverTree((FilterResolverIntf) filterPredicates);
-        }
+    AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(configuration);
+
+    // query plan includes projection column
+    String projection = getColumnProjection(configuration);
+    CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
+    QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable);
+
+    // set the filter to the query model in order to filter blocklet before scan
+    Expression filter = getFilterPredicates(configuration);
+    CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+    FilterResolverIntf filterIntf = CarbonInputFormatUtil.resolveFilter(filter, identifier);
+    queryModel.setFilterExpressionResolverTree(filterIntf);
+
+    // update the file level index store if there are invalid segment
+    if (inputSplit instanceof CarbonMultiBlockSplit) {
+      CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit;
+      List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments();
+      if (invalidSegments.size() > 0) {
+        queryModel.setInvalidSegmentIds(invalidSegments);
       }
-    } catch (Exception e) {
-      throw new IOException(e);
     }
+
     CarbonReadSupport readSupport = getReadSupportClass(configuration);
     return new CarbonRecordReader<T>(queryModel, readSupport);
   }
@@ -586,17 +482,20 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return readSupport;
   }
 
-  @Override protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
+  @Override
+  protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
     return super.computeSplitSize(blockSize, minSize, maxSize);
   }
 
-  @Override protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
+  @Override
+  protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
     return super.getBlockIndex(blkLocations, offset);
   }
 
-  @Override protected List<FileStatus> listStatus(JobContext job) throws IOException {
+  @Override
+  protected List<FileStatus> listStatus(JobContext job) throws IOException {
     List<FileStatus> result = new ArrayList<FileStatus>();
-    String[] segmentsToConsider = getSegmentsFromConfiguration(job);
+    String[] segmentsToConsider = getSegmentsToAccess(job);
     if (segmentsToConsider.length == 0) {
       throw new IOException("No segments found");
     }
@@ -605,7 +504,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return result;
   }
 
-  @Override protected boolean isSplitable(JobContext context, Path filename) {
+  @Override
+  protected boolean isSplitable(JobContext context, Path filename) {
     try {
       // Don't split the file if it is local file system
       FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
@@ -626,7 +526,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       throw new IOException("No partitions/data found");
     }
 
-    PathFilter inputFilter = getDataFileFilter(job);
+    PathFilter inputFilter = getDataFileFilter();
     CarbonTablePath tablePath = getTablePath(job.getConfiguration());
 
     // get tokens for all the required FileSystem for table path
@@ -658,10 +558,9 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   }
 
   /**
-   * @param job
    * @return the PathFilter for Fact Files.
    */
-  public PathFilter getDataFileFilter(JobContext job) {
+  private PathFilter getDataFileFilter() {
     return new CarbonPathFilter(getUpdateExtension());
   }
 
@@ -676,27 +575,14 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   }
 
   /**
-   * @return updateExtension
+   * return valid segment to access
    */
-  private String[] getSegmentsFromConfiguration(JobContext job)
-      throws IOException {
+  private String[] getSegmentsToAccess(JobContext job) throws IOException {
     String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
-    // if no segments
     if (segmentString.trim().isEmpty()) {
       return new String[0];
     }
-
-    String[] segments = segmentString.split(",");
-    String[] segmentIds = new String[segments.length];
-    int i = 0;
-    try {
-      for (; i < segments.length; i++) {
-        segmentIds[i] = segments[i];
-      }
-    } catch (NumberFormatException e) {
-      throw new IOException("segment no:" + segments[i] + " should be integer");
-    }
-    return segmentIds;
+    return segmentString.split(",");
   }
 
   /**
@@ -709,28 +595,4 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return new String[] { "0" };
   }
 
-  /**
-   * Thread class to load the blocks
-   */
-  private class BlocksLoaderThread implements Callable<Map<String, AbstractIndex>> {
-    // job
-    private JobContext job;
-
-    // table identifier
-    private AbsoluteTableIdentifier absoluteTableIdentifier;
-
-    // segment id
-    private String segmentId;
-
-    private BlocksLoaderThread(JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier,
-        String segmentId) {
-      this.job = job;
-      this.absoluteTableIdentifier = absoluteTableIdentifier;
-      this.segmentId = segmentId;
-    }
-
-    @Override public Map<String, AbstractIndex> call() throws Exception {
-      return getSegmentAbstractIndexs(job, absoluteTableIdentifier, segmentId);
-    }
-  }
 }
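
[Editor's note] A minimal job-setup sketch against the reworked CarbonInputFormat API above (not part of the commit; tablePath, projection and filterExpression are placeholders supplied by the caller):

  // hypothetical driver-side configuration sketch
  Job job = Job.getInstance(new Configuration());
  CarbonInputFormat.setTablePath(job.getConfiguration(), tablePath);
  CarbonInputFormat.setColumnProjection(job.getConfiguration(), projection);
  CarbonInputFormat.setFilterPredicates(job.getConfiguration(), filterExpression);
  // if no segments are set explicitly, getSplits() now resolves valid segments
  // itself via SegmentStatusManager.getSegmentStatus(identifier)
  List<InputSplit> splits = new CarbonInputFormat<>().getSplits(job);
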

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index ffcd663..132ee43 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -22,8 +22,13 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos;
+import org.apache.carbondata.core.carbon.datastore.block.Distributable;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.internal.index.Block;
 
 import org.apache.hadoop.fs.Path;
@@ -33,23 +38,36 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 /**
  * Carbon input split to allow distributed read of CarbonInputFormat.
  */
-public class CarbonInputSplit extends FileSplit implements Serializable, Writable, Block {
+public class CarbonInputSplit extends FileSplit implements Distributable, Serializable, Writable,
+    Block {
 
   private static final long serialVersionUID = 3520344046772190207L;
   private String segmentId;
-  /**
+  public String taskId;
+
+  /*
+   * Invalid segments that need to be removed in task side index
+   */
+  private List<String> invalidSegments;
+
+  /*
    * Number of BlockLets in a block
    */
-  private int numberOfBlocklets = 0;
+  private int numberOfBlocklets;
 
-  public CarbonInputSplit() {
-    super(null, 0, 0, new String[0]);
+  public  CarbonInputSplit() {
+    segmentId = null;
+    taskId = "0";
+    numberOfBlocklets = 0;
+    invalidSegments = new ArrayList<>();
   }
 
-  public CarbonInputSplit(String segmentId, Path path, long start, long length,
+  private CarbonInputSplit(String segmentId, Path path, long start, long length,
       String[] locations) {
     super(path, start, length, locations);
     this.segmentId = segmentId;
+    this.taskId = CarbonTablePath.DataFileUtil.getTaskNo(path.getName());
+    this.invalidSegments = new ArrayList<>();
   }
 
   public CarbonInputSplit(String segmentId, Path path, long start, long length,
@@ -67,16 +85,33 @@ public class CarbonInputSplit extends FileSplit implements Serializable, Writabl
     return segmentId;
   }
 
-  @Override public void readFields(DataInput in) throws IOException {
-
+  @Override
+  public void readFields(DataInput in) throws IOException {
     super.readFields(in);
     this.segmentId = in.readUTF();
-
+    int numInvalidSegment = in.readInt();
+    invalidSegments = new ArrayList<>(numInvalidSegment);
+    for (int i = 0; i < numInvalidSegment; i++) {
+      invalidSegments.add(in.readUTF());
+    }
   }
 
-  @Override public void write(DataOutput out) throws IOException {
+  @Override
+  public void write(DataOutput out) throws IOException {
     super.write(out);
     out.writeUTF(segmentId);
+    out.writeInt(invalidSegments.size());
+    for (String invalidSegment: invalidSegments) {
+      out.writeUTF(invalidSegment);
+    }
+  }
+
+  public List<String> getInvalidSegments(){
+    return invalidSegments;
+  }
+
+  public void setInvalidSegments(List<String> invalidSegments) {
+    this.invalidSegments = invalidSegments;
   }
 
   /**
@@ -88,6 +123,63 @@ public class CarbonInputSplit extends FileSplit implements Serializable, Writabl
   }
 
   @Override
+  public int compareTo(Distributable o) {
+    CarbonInputSplit other = (CarbonInputSplit)o;
+    int compareResult = 0;
+    // get the segment id
+    // converr seg ID to double.
+
+    double seg1 = Double.parseDouble(segmentId);
+    double seg2 = Double.parseDouble(other.getSegmentId());
+    if (seg1 - seg2 < 0) {
+      return -1;
+    }
+    if (seg1 - seg2 > 0) {
+      return 1;
+    }
+
+    // next compare the task ids of the two files; if the task ids are equal,
+    // compare the part numbers of the two blocks
+    String filePath1 = this.getPath().getName();
+    String filePath2 = other.getPath().getName();
+    if (CarbonTablePath.isCarbonDataFile(filePath1)) {
+      int firstTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath1));
+      int otherTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath2));
+      if (firstTaskId != otherTaskId) {
+        return firstTaskId - otherTaskId;
+      }
+      // compare the part no of both block info
+      int firstPartNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getPartNo(filePath1));
+      int secondPartNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getPartNo(filePath2));
+      compareResult = firstPartNo - secondPartNo;
+    } else {
+      compareResult = filePath1.compareTo(filePath2);
+    }
+    return compareResult;
+  }
+
+  public static List<TableBlockInfo> createBlocks(List<CarbonInputSplit> splitList) {
+    List<TableBlockInfo> tableBlockInfoList = new ArrayList<>();
+    for (CarbonInputSplit split : splitList) {
+      BlockletInfos blockletInfos = new BlockletInfos(split.getNumberOfBlocklets(), 0,
+          split.getNumberOfBlocklets());
+      try {
+        tableBlockInfoList.add(
+            new TableBlockInfo(split.getPath().toString(), split.getStart(), split.getSegmentId(),
+                split.getLocations(), split.getLength(), blockletInfos));
+      } catch (IOException e) {
+        throw new RuntimeException("fail to get location of split: " + split, e);
+      }
+    }
+    return tableBlockInfoList;
+  }
+
+  @Override
   public String getBlockPath() {
     return getPath().getName();
   }

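The ordering above (segment id numerically, then task number and part number for carbon data files) together with the new createBlocks helper lets a caller go from planned splits to a sorted block list in two calls. A minimal sketch, assuming the splits were already produced by CarbonInputFormat (the wrapper class name is illustrative; the CarbonData types and methods are the ones added in this patch):

    import java.util.Collections;
    import java.util.List;

    import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
    import org.apache.carbondata.hadoop.CarbonInputSplit;

    public class SplitToBlockSketch {
      /** Converts planned splits into TableBlockInfos and sorts them, as CarbonMergerRDD does below. */
      public static List<TableBlockInfo> toSortedBlocks(List<CarbonInputSplit> splits) {
        // each split becomes one TableBlockInfo that scans all of its blocklets
        List<TableBlockInfo> blocks = CarbonInputSplit.createBlocks(splits);
        Collections.sort(blocks);
        return blocks;
      }
    }
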
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
new file mode 100644
index 0000000..a13d6ba
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * This class wraps multiple blocks belonging to the same node into one split,
+ * so a single scan task reads multiple blocks. This is an optimization for concurrent queries.
+ */
+public class CarbonMultiBlockSplit extends InputSplit implements Writable {
+
+  /*
+   * Splits (HDFS blocks) for the task to scan.
+   */
+  private List<CarbonInputSplit> splitList;
+
+  /*
+   * The single location (node) that all wrapped splits belong to
+   */
+  private String location;
+
+  public CarbonMultiBlockSplit() {
+    splitList = null;
+    location = null;
+  }
+
+  public CarbonMultiBlockSplit(AbsoluteTableIdentifier identifier, List<CarbonInputSplit> splitList,
+      String location) throws IOException {
+    this.splitList = splitList;
+    this.location = location;
+  }
+
+  /**
+   * Return all splits for scan
+   * @return split list for scan
+   */
+  public List<CarbonInputSplit> getAllSplits() {
+    return splitList;
+  }
+
+  @Override
+  public long getLength() throws IOException, InterruptedException {
+    long total = 0;
+    for (InputSplit split: splitList) {
+      total += split.getLength();
+    }
+    return total;
+  }
+
+  @Override
+  public String[] getLocations() throws IOException, InterruptedException {
+    return new String[]{location};
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // write number of splits and then write all splits
+    out.writeInt(splitList.size());
+    for (CarbonInputSplit split: splitList) {
+      split.write(out);
+    }
+    out.writeUTF(location);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // read all splits
+    int numSplit = in.readInt();
+    splitList = new ArrayList<>(numSplit);
+    for (int i = 0; i < numSplit; i++) {
+      CarbonInputSplit split = new CarbonInputSplit();
+      split.readFields(in);
+      splitList.add(split);
+    }
+    location = in.readUTF();
+  }
+
+}

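The split is a Writable, so it can be serialized to executors like any other Hadoop split. A small round-trip sketch under assumed placeholder values (store path, table name and node name are illustrative only; an empty split list keeps the example self-contained):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.ArrayList;

    import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
    import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
    import org.apache.carbondata.hadoop.CarbonInputSplit;
    import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;

    public class MultiBlockSplitRoundTrip {
      public static void main(String[] args) throws IOException, InterruptedException {
        // placeholder identifier; values here are not taken from any real store
        AbsoluteTableIdentifier identifier = new AbsoluteTableIdentifier("/tmp/carbon/store",
            new CarbonTableIdentifier("default", "t1", "1"));
        CarbonMultiBlockSplit split =
            new CarbonMultiBlockSplit(identifier, new ArrayList<CarbonInputSplit>(), "node1");

        // write the split the way the framework would when shipping it to an executor
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        split.write(new DataOutputStream(bytes));

        // read it back into a fresh instance; the wrapped node location survives the trip
        CarbonMultiBlockSplit copy = new CarbonMultiBlockSplit();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy.getLocations()[0]);   // prints node1
      }
    }
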
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index 443922c..91e6e46 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -25,7 +25,6 @@ import java.util.Map;
 
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
@@ -33,7 +32,6 @@ import org.apache.carbondata.scan.executor.QueryExecutor;
 import org.apache.carbondata.scan.executor.QueryExecutorFactory;
 import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
 import org.apache.carbondata.scan.model.QueryModel;
-import org.apache.carbondata.scan.result.BatchResult;
 import org.apache.carbondata.scan.result.iterator.ChunkRowIterator;
 
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -59,22 +57,28 @@ public class CarbonRecordReader<T> extends RecordReader<Void, T> {
     this.queryExecutor = QueryExecutorFactory.getQueryExecutor();
   }
 
-  @Override public void initialize(InputSplit split, TaskAttemptContext context)
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
       throws IOException, InterruptedException {
-    CarbonInputSplit carbonInputSplit = (CarbonInputSplit) split;
-    List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
-    BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0,
-        carbonInputSplit.getNumberOfBlocklets());
-    tableBlockInfoList.add(
-        new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
-            carbonInputSplit.getSegmentId(), carbonInputSplit.getLocations(),
-            carbonInputSplit.getLength(), blockletInfos));
+    // The input split can contain a single HDFS block or multiple blocks, so first collect
+    // all the blocks and then set them in the query model.
+    List<CarbonInputSplit> splitList;
+    if (inputSplit instanceof CarbonInputSplit) {
+      splitList = new ArrayList<>(1);
+      splitList.add((CarbonInputSplit) inputSplit);
+    } else if (inputSplit instanceof CarbonMultiBlockSplit) {
+      // contains multiple blocks; this is an optimization for concurrent queries
+      CarbonMultiBlockSplit multiBlockSplit = (CarbonMultiBlockSplit) inputSplit;
+      splitList = multiBlockSplit.getAllSplits();
+    } else {
+      throw new RuntimeException("unsupported input split type: " + inputSplit);
+    }
+    List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
     queryModel.setTableBlockInfos(tableBlockInfoList);
-    readSupport
-        .intialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());
+    readSupport.initialize(queryModel.getProjectionColumns(),
+        queryModel.getAbsoluteTableIdentifier());
     try {
-      carbonIterator =
-          new ChunkRowIterator((CarbonIterator<BatchResult>) queryExecutor.execute(queryModel));
+      carbonIterator = new ChunkRowIterator(queryExecutor.execute(queryModel));
     } catch (QueryExecutionException e) {
       throw new InterruptedException(e.getMessage());
     }

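Since initialize now accepts either split type, the task-side consumption loop is just the standard Hadoop RecordReader contract regardless of how many blocks the split carries. A hedged sketch (the reader and task context are assumed to be provided by the framework or by calling code not shown here):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class ReadAllRowsSketch {
      /** Drains one split through a record reader, single-block or multi-block alike. */
      public static <T> List<T> readAll(RecordReader<Void, T> reader, InputSplit split,
          TaskAttemptContext context) throws IOException, InterruptedException {
        List<T> rows = new ArrayList<T>();
        reader.initialize(split, context);
        try {
          while (reader.nextKeyValue()) {
            rows.add(reader.getCurrentValue());
          }
        } finally {
          reader.close();
        }
        return rows;
      }
    }
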
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
index d3419c2..1b7f577 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
@@ -31,7 +31,7 @@ public interface CarbonReadSupport<T> {
    *
    * @param carbonColumns
    */
-  public void intialize(CarbonColumn[] carbonColumns,
+  public void initialize(CarbonColumn[] carbonColumns,
       AbsoluteTableIdentifier absoluteTableIdentifier);
 
   public T readRow(Object[] data);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java
index 6832789..fa8ba6e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java
@@ -51,7 +51,7 @@ public abstract class AbstractDictionaryDecodedReadSupport<T> implements CarbonR
    * @param carbonColumns
    * @param absoluteTableIdentifier
    */
-  @Override public void intialize(CarbonColumn[] carbonColumns,
+  @Override public void initialize(CarbonColumn[] carbonColumns,
       AbsoluteTableIdentifier absoluteTableIdentifier) {
     this.carbonColumns = carbonColumns;
     dictionaries = new Dictionary[carbonColumns.length];

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/ArrayWritableReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/ArrayWritableReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/ArrayWritableReadSupport.java
index dae83a1..4fbcb05 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/ArrayWritableReadSupport.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/ArrayWritableReadSupport.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.io.ArrayWritable;
 
 public class ArrayWritableReadSupport implements CarbonReadSupport<ArrayWritable> {
 
-  @Override public void intialize(CarbonColumn[] carbonColumns,
+  @Override public void initialize(CarbonColumn[] carbonColumns,
       AbsoluteTableIdentifier absoluteTableIdentifier) {
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
index 578ab43..59b45ad 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
@@ -24,7 +24,8 @@ import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
 
 public class RawDataReadSupport implements CarbonReadSupport<Object[]> {
 
-  @Override public void intialize(CarbonColumn[] carbonColumns,
+  @Override
+  public void initialize(CarbonColumn[] carbonColumns,
       AbsoluteTableIdentifier absoluteTableIdentifier) {
   }
 
@@ -34,7 +35,8 @@ public class RawDataReadSupport implements CarbonReadSupport<Object[]> {
    * @param data
    * @return
    */
-  @Override public Object[] readRow(Object[] data) {
+  @Override
+  public Object[] readRow(Object[] data) {
     return data;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 0d94da4..75e004d 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -19,12 +19,14 @@
 
 package org.apache.carbondata.hadoop.util;
 
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.hadoop.CarbonInputFormat;
 import org.apache.carbondata.scan.expression.Expression;
 import org.apache.carbondata.scan.filter.FilterExpressionProcessor;
 import org.apache.carbondata.scan.filter.resolver.FilterResolverIntf;
@@ -33,6 +35,11 @@ import org.apache.carbondata.scan.model.QueryDimension;
 import org.apache.carbondata.scan.model.QueryMeasure;
 import org.apache.carbondata.scan.model.QueryModel;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
 /**
  * Utility class
  */
@@ -81,6 +88,13 @@ public class CarbonInputFormatUtil {
     return plan;
   }
 
+  public static <V> CarbonInputFormat<V> createCarbonInputFormat(AbsoluteTableIdentifier identifier,
+      Job job) throws IOException {
+    CarbonInputFormat<V> carbonInputFormat = new CarbonInputFormat<>();
+    FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
+    return carbonInputFormat;
+  }
+
   private static void addQueryMeasure(CarbonQueryPlan plan, int order, CarbonMeasure measure) {
     QueryMeasure queryMeasure = new QueryMeasure(measure.getColName());
     queryMeasure.setQueryOrder(order);

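A minimal driver-side sketch of the new helper, under assumed placeholder values for the store path and table, restricting the scan to one segment via the same configuration property CarbonMergerRDD sets below:

    import java.io.IOException;
    import java.util.List;

    import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
    import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
    import org.apache.carbondata.hadoop.CarbonInputFormat;
    import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;

    public class PlanSplitsSketch {
      public static List<InputSplit> planSplits() throws IOException, InterruptedException {
        // store path, database, table name and segment number are illustrative only
        AbsoluteTableIdentifier identifier = new AbsoluteTableIdentifier("/tmp/carbon/store",
            new CarbonTableIdentifier("default", "t1", "1"));
        Job job = Job.getInstance(new Configuration());
        CarbonInputFormat<Object[]> format =
            CarbonInputFormatUtil.createCarbonInputFormat(identifier, job);
        job.getConfiguration().set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, "0");
        return format.getSplits(job);
      }
    }
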
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
index 1c21a50..8693646 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 public class CarbonInputMapperTest extends TestCase {
@@ -61,8 +60,8 @@ public class CarbonInputMapperTest extends TestCase {
     try {
       String outPath = "target/output";
       runJob(outPath, null, null);
-      Assert.assertTrue("Count lines are not matching", countTheLines(outPath) == 1000);
-      Assert.assertTrue("Column count are not matching", countTheColumns(outPath) == 7);
+      Assert.assertEquals("Count lines are not matching", 1000, countTheLines(outPath));
+      Assert.assertEquals("Column count are not matching", 7, countTheColumns(outPath));
     } catch (Exception e) {
       Assert.assertTrue("failed", false);
       e.printStackTrace();
@@ -79,8 +78,8 @@ public class CarbonInputMapperTest extends TestCase {
       carbonProjection.addColumn("salary");
       runJob(outPath, carbonProjection, null);
 
-      Assert.assertTrue("Count lines are not matching", countTheLines(outPath) == 1000);
-      Assert.assertTrue("Column count are not matching", countTheColumns(outPath) == 3);
+      Assert.assertEquals("Count lines are not matching", 1000, countTheLines(outPath));
+      Assert.assertEquals("Column count are not matching", 3, countTheColumns(outPath));
     } catch (Exception e) {
       Assert.assertTrue("failed", false);
     }
@@ -97,8 +96,8 @@ public class CarbonInputMapperTest extends TestCase {
           new EqualToExpression(new ColumnExpression("country", DataType.STRING),
               new LiteralExpression("france", DataType.STRING));
       runJob(outPath, carbonProjection, expression);
-      Assert.assertTrue("Count lines are not matching", countTheLines(outPath) == 101);
-      Assert.assertTrue("Column count are not matching", countTheColumns(outPath) == 3);
+      Assert.assertEquals("Count lines are not matching", 101, countTheLines(outPath));
+      Assert.assertEquals("Column count are not matching", 3, countTheColumns(outPath));
     } catch (Exception e) {
       Assert.assertTrue("failed", false);
     }
@@ -171,7 +170,7 @@ public class CarbonInputMapperTest extends TestCase {
     job.setOutputFormatClass(TextOutputFormat.class);
     AbsoluteTableIdentifier abs = StoreCreator.getAbsoluteTableIdentifier();
     if (projection != null) {
-      CarbonInputFormat.setColumnProjection(projection, job.getConfiguration());
+      CarbonInputFormat.setColumnProjection(job.getConfiguration(), projection);
     }
     if (filter != null) {
       CarbonInputFormat.setFilterPredicates(job.getConfiguration(), filter);

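For reference, the projection setter now takes the configuration first. A small sketch of wiring a projection and an equal-to filter onto a job; column names, the no-argument CarbonProjection constructor and the import paths of the expression classes are assumptions, since only the class names appear in this test:

    import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
    import org.apache.carbondata.hadoop.CarbonInputFormat;
    import org.apache.carbondata.hadoop.CarbonProjection;
    // expression import paths are assumed for illustration
    import org.apache.carbondata.scan.expression.ColumnExpression;
    import org.apache.carbondata.scan.expression.Expression;
    import org.apache.carbondata.scan.expression.LiteralExpression;
    import org.apache.carbondata.scan.expression.conditional.EqualToExpression;

    import org.apache.hadoop.mapreduce.Job;

    public class ProjectionAndFilterSketch {
      public static void configure(Job job) {
        // project a subset of columns (names are illustrative)
        CarbonProjection projection = new CarbonProjection();
        projection.addColumn("id");
        projection.addColumn("country");
        projection.addColumn("salary");
        CarbonInputFormat.setColumnProjection(job.getConfiguration(), projection);

        // push the predicate country = 'france' down to the scan
        Expression filter = new EqualToExpression(
            new ColumnExpression("country", DataType.STRING),
            new LiteralExpression("france", DataType.STRING));
        CarbonInputFormat.setFilterPredicates(job.getConfiguration(), filter);
      }
    }
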
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index 3d670a2..f2a1f9f 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -50,9 +50,7 @@ import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.carbon.ColumnIdentifier;
-import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos;
 import org.apache.carbondata.core.carbon.datastore.block.Distributable;
-import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
@@ -236,9 +234,7 @@ public final class CarbonLoaderUtil {
       final boolean isCompactionFlow) throws IOException {
     CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
     String metaDataLocation = carbonTable.getMetaDataFilepath();
-    SegmentStatusManager segmentStatusManager =
-        new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier());
-    final LoadMetadataDetails[] details = segmentStatusManager.readLoadMetadata(metaDataLocation);
+    final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
     CarbonTablePath carbonTablePath = CarbonStorePath
         .getCarbonTablePath(loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
 
@@ -404,10 +400,7 @@ public final class CarbonLoaderUtil {
             absoluteTableIdentifier.getCarbonTableIdentifier());
 
     String tableStatusPath = carbonTablePath.getTableStatusFilePath();
-
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
-
-    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+    ICarbonLock carbonLock = SegmentStatusManager.getTableStatusLock(absoluteTableIdentifier);
 
     try {
       if (carbonLock.lockWithRetries()) {
@@ -416,7 +409,7 @@ public final class CarbonLoaderUtil {
                 + " for table status updation");
 
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
-            segmentStatusManager.readLoadMetadata(metaDataFilepath);
+            SegmentStatusManager.readLoadMetadata(metaDataFilepath);
 
         String loadEnddate = readCurrentTime();
         loadMetadataDetails.setTimestamp(loadEnddate);
@@ -434,7 +427,7 @@ public final class CarbonLoaderUtil {
         }
         listOfLoadFolderDetails.add(loadMetadataDetails);
 
-        segmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
+        SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
             .toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()]));
 
         status = true;
@@ -579,9 +572,13 @@ public final class CarbonLoaderUtil {
     List<NodeBlockRelation> flattenedList =
         new ArrayList<NodeBlockRelation>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     for (Distributable blockInfo : blockInfos) {
-      for (String eachNode : blockInfo.getLocations()) {
-        NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
-        flattenedList.add(nbr);
+      try {
+        for (String eachNode : blockInfo.getLocations()) {
+          NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
+          flattenedList.add(nbr);
+        }
+      } catch (IOException e) {
+        throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e);
       }
     }
     // sort the flattened data.
@@ -646,7 +643,7 @@ public final class CarbonLoaderUtil {
   /**
    * Assigning the blocks of a node to tasks.
    *
-   * @param nodeBlocksMap
+   * @param nodeBlocksMap mapping from node name to the list of blocks on that node
    * @param noOfTasksPerNode
    * @return
    */
@@ -912,10 +909,14 @@ public final class CarbonLoaderUtil {
       // put the blocks in the set
       uniqueBlocks.add(blockInfo);
 
-      for (String eachNode : blockInfo.getLocations()) {
-        NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
-        flattenedList.add(nbr);
-        nodeList.add(eachNode);
+      try {
+        for (String eachNode : blockInfo.getLocations()) {
+          NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
+          flattenedList.add(nbr);
+          nodeList.add(eachNode);
+        }
+      } catch (IOException e) {
+        throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e);
       }
     }
   }
@@ -954,66 +955,6 @@ public final class CarbonLoaderUtil {
   }
 
   /**
-   * method to distribute the blocklets of a block in multiple blocks
-   * @param blockInfoList
-   * @param defaultParallelism
-   * @return
-     */
-  public static List<Distributable> distributeBlockLets(List<TableBlockInfo> blockInfoList,
-      int defaultParallelism) {
-    String blockletDistributionString = CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.ENABLE_BLOCKLET_DISTRIBUTION,
-            CarbonCommonConstants.ENABLE_BLOCKLET_DISTRIBUTION_DEFAULTVALUE);
-    boolean isBlockletDistributionEnabled = Boolean.parseBoolean(blockletDistributionString);
-    LOGGER.info("No.Of Blocks before Blocklet distribution: " + blockInfoList.size());
-    List<Distributable> tableBlockInfos = new ArrayList<Distributable>();
-    if (blockInfoList.size() < defaultParallelism && isBlockletDistributionEnabled) {
-      for (TableBlockInfo tableBlockInfo : blockInfoList) {
-        int noOfBlockLets = tableBlockInfo.getBlockletInfos().getNoOfBlockLets();
-        LOGGER.info(
-            "No.Of blocklet : " + noOfBlockLets + ".Minimum blocklets required for distribution : "
-                + minBlockLetsReqForDistribution);
-        if (noOfBlockLets < minBlockLetsReqForDistribution) {
-          tableBlockInfos.add(tableBlockInfo);
-          continue;
-        }
-        TableBlockInfo tableBlockInfo1 = null;
-        int rem = noOfBlockLets % minBlockLetsReqForDistribution;
-        int count = noOfBlockLets / minBlockLetsReqForDistribution;
-        if (rem > 0) {
-          count = count + 1;
-        }
-        for (int i = 0; i < count; i++) {
-          BlockletInfos blockletInfos = new BlockletInfos();
-          blockletInfos.setStartBlockletNumber(i * minBlockLetsReqForDistribution);
-          blockletInfos.setNumberOfBlockletToScan(minBlockLetsReqForDistribution);
-          blockletInfos.setNoOfBlockLets(blockletInfos.getNoOfBlockLets());
-          tableBlockInfo1 =
-              new TableBlockInfo(tableBlockInfo.getFilePath(), tableBlockInfo.getBlockOffset(),
-                  tableBlockInfo.getSegmentId(), tableBlockInfo.getLocations(),
-                  tableBlockInfo.getBlockLength(), blockletInfos);
-          tableBlockInfos.add(tableBlockInfo1);
-        }
-        //if rem is greater than 0 then for the last block
-        if (rem > 0) {
-          tableBlockInfo1.getBlockletInfos().setNumberOfBlockletToScan(rem);
-        }
-      }
-    }
-    if (tableBlockInfos.size() == 0) {
-      {
-        for (TableBlockInfo tableBlockInfo : blockInfoList) {
-          tableBlockInfos.add(tableBlockInfo);
-        }
-        LOGGER.info("No.Of Blocks after Blocklet distribution: " + tableBlockInfos.size());
-        return tableBlockInfos;
-      }
-    }
-    LOGGER.info("No.Of Blocks after Blocklet distribution: " + tableBlockInfos.size());
-    return tableBlockInfos;
-  }
-
-  /**
    * This will update the old table status details before clean files to the latest table status.
    * @param oldList
    * @param newList

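Because CarbonInputSplit now implements Distributable, splits can be handed directly to the node/block mapping in this class. A hedged grouping sketch (the -1 argument simply mirrors the existing callers and is not given any new meaning here):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import org.apache.carbondata.core.carbon.datastore.block.Distributable;
    import org.apache.carbondata.hadoop.CarbonInputSplit;
    import org.apache.carbondata.spark.load.CarbonLoaderUtil;

    public class NodeGroupingSketch {
      /** Groups splits by the node hosting them, as CarbonMergerRDD does below. */
      public static Map<String, List<Distributable>> groupByNode(List<CarbonInputSplit> splits) {
        List<Distributable> blocks = new ArrayList<Distributable>(splits);
        // -1 mirrors the existing callers of nodeBlockMapping
        return CarbonLoaderUtil.nodeBlockMapping(blocks, -1);
      }
    }
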
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
index 6e8ab0c..9be4d47 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
@@ -133,12 +133,7 @@ public final class CarbonDataMergerUtil {
     boolean tableStatusUpdationStatus = false;
     AbsoluteTableIdentifier absoluteTableIdentifier =
         carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-
-    SegmentStatusManager segmentStatusManager =
-        new SegmentStatusManager(absoluteTableIdentifier);
-
-    ICarbonLock carbonLock =
-        segmentStatusManager.getTableStatusLock();
+    ICarbonLock carbonLock = SegmentStatusManager.getTableStatusLock(absoluteTableIdentifier);
 
     try {
       if (carbonLock.lockWithRetries()) {
@@ -151,7 +146,7 @@ public final class CarbonDataMergerUtil {
 
         String statusFilePath = carbonTablePath.getTableStatusFilePath();
 
-        LoadMetadataDetails[] loadDetails = segmentStatusManager.readLoadMetadata(metaDataFilepath);
+        LoadMetadataDetails[] loadDetails = SegmentStatusManager.readLoadMetadata(metaDataFilepath);
 
         String mergedLoadNumber = MergedLoadName.substring(
             MergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER)
@@ -195,7 +190,7 @@ public final class CarbonDataMergerUtil {
         updatedDetailsList.add(loadMetadataDetails);
 
         try {
-          segmentStatusManager.writeLoadDetailsIntoFile(statusFilePath,
+          SegmentStatusManager.writeLoadDetailsIntoFile(statusFilePath,
               updatedDetailsList.toArray(new LoadMetadataDetails[updatedDetailsList.size()]));
           tableStatusUpdationStatus = true;
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index af8d5e6..bb8fc5c 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -33,10 +33,10 @@ import org.apache.spark.unsafe.types.UTF8String;
 
 public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSupport<Row> {
 
-  @Override public void intialize(CarbonColumn[] carbonColumns,
+  @Override public void initialize(CarbonColumn[] carbonColumns,
       AbsoluteTableIdentifier absoluteTableIdentifier) {
-    super.intialize(carbonColumns, absoluteTableIdentifier);
-    //can intialize and generate schema here.
+    super.initialize(carbonColumns, absoluteTableIdentifier);
+    //can initialize and generate schema here.
   }
 
   @Override public Row readRow(Object[] data) {
@@ -52,6 +52,9 @@ public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSuppor
           case TIMESTAMP:
             data[i] = new Timestamp((long) data[i] / 1000);
             break;
+          case LONG:
+            data[i] = data[i];
+            break;
           default:
         }
       } else if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
index 6b56545..11cf9f8 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
@@ -44,9 +44,7 @@ public final class LoadMetadataUtil {
         .getCarbonTable(loadModel.getDatabaseName() + '_' + loadModel.getTableName());
 
     String metaDataLocation = table.getMetaDataFilepath();
-    SegmentStatusManager segmentStatusManager =
-        new SegmentStatusManager(table.getAbsoluteTableIdentifier());
-    LoadMetadataDetails[] details = segmentStatusManager.readLoadMetadata(metaDataLocation);
+    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
     if (details != null && details.length != 0) {
       for (LoadMetadataDetails oneRow : details) {
         if ((CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oneRow.getLoadStatus())

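With readLoadMetadata now static, this kind of check no longer needs a SegmentStatusManager instance. A simplified sketch of the marked-for-delete test above (the LoadMetadataDetails import path is an assumption; only the class name appears in this patch):

    import java.io.IOException;

    import org.apache.carbondata.core.constants.CarbonCommonConstants;
    // import path assumed for illustration
    import org.apache.carbondata.core.load.LoadMetadataDetails;
    import org.apache.carbondata.lcm.status.SegmentStatusManager;

    public class CleanFilesCheckSketch {
      /** True if any load is marked for delete, i.e. a clean-files pass has work to do. */
      public static boolean hasLoadsMarkedForDelete(String metaDataLocation) throws IOException {
        LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
        if (details != null) {
          for (LoadMetadataDetails detail : details) {
            if (CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(detail.getLoadStatus())) {
              return true;
            }
          }
        }
        return false;
      }
    }
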
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 4edfa6b..8445440 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -93,9 +93,8 @@ object CarbonDataRDDFactory {
     // Delete the records based on data
     val table = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
       .getCarbonTable(databaseName + "_" + tableName)
-    val segmentStatusManager = new SegmentStatusManager(table.getAbsoluteTableIdentifier)
     val loadMetadataDetailsArray =
-      segmentStatusManager.readLoadMetadata(table.getMetaDataFilepath()).toList
+      SegmentStatusManager.readLoadMetadata(table.getMetaDataFilepath).toList
     val resultMap = new CarbonDeleteLoadByDateRDD(
       sc.sparkContext,
       new DeletedLoadResultImpl(),
@@ -1055,10 +1054,7 @@ object CarbonDataRDDFactory {
 
   def readLoadMetadataDetails(model: CarbonLoadModel, storePath: String): Unit = {
     val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetaDataFilepath
-    val segmentStatusManager =
-      new SegmentStatusManager(
-        model.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier)
-    val details = segmentStatusManager.readLoadMetadata(metadataPath)
+    val details = SegmentStatusManager.readLoadMetadata(metadataPath)
     model.setLoadMetadataDetails(details.toList.asJava)
   }
 
@@ -1070,9 +1066,7 @@ object CarbonDataRDDFactory {
     if (LoadMetadataUtil.isLoadDeletionRequired(carbonLoadModel)) {
       val loadMetadataFilePath = CarbonLoaderUtil
         .extractLoadMetadataFileLocation(carbonLoadModel)
-      val segmentStatusManager = new SegmentStatusManager(table.getAbsoluteTableIdentifier)
-      val details = segmentStatusManager
-        .readLoadMetadata(loadMetadataFilePath)
+      val details = SegmentStatusManager.readLoadMetadata(loadMetadataFilePath)
       val carbonTableStatusLock = CarbonLockFactory
         .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
           LockUsage.TABLE_STATUS_LOCK)
@@ -1089,17 +1083,16 @@ object CarbonDataRDDFactory {
             LOGGER.info("Table status lock has been successfully acquired.")
 
             // read latest table status again.
-            val latestMetadata = segmentStatusManager.readLoadMetadata(loadMetadataFilePath)
+            val latestMetadata = SegmentStatusManager.readLoadMetadata(loadMetadataFilePath)
 
             // update the metadata details from old to new status.
             val latestStatus = CarbonLoaderUtil
-              .updateLoadMetadataFromOldToNew(details, latestMetadata)
+                .updateLoadMetadataFromOldToNew(details, latestMetadata)
 
             CarbonLoaderUtil.writeLoadMetadata(
               carbonLoadModel.getCarbonDataLoadSchema,
               carbonLoadModel.getDatabaseName,
-              carbonLoadModel.getTableName, latestStatus
-            )
+              carbonLoadModel.getTableName, latestStatus)
           } else {
             val errorMsg = "Clean files request is failed for " +
                            s"${ carbonLoadModel.getDatabaseName }." +
@@ -1109,7 +1102,6 @@ object CarbonDataRDDFactory {
             LOGGER.audit(errorMsg)
             LOGGER.error(errorMsg)
             throw new Exception(errorMsg + " Please try after some time.")
-
           }
         } finally {
           CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 6c2e993..4e820c6 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -21,8 +21,11 @@ import java.util
 import java.util.{Collections, List}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.util.Random
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
@@ -32,22 +35,19 @@ import org.apache.spark.sql.hive.DistributionUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.carbon.datastore.block.{Distributable, SegmentProperties,
-TableBlockInfo, TableTaskInfo, TaskBlockInfo}
+import org.apache.carbondata.core.carbon.datastore.block.{Distributable, SegmentProperties, TableBlockInfo, TaskBlockInfo}
 import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter
-import org.apache.carbondata.core.carbon.path.CarbonTablePath
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CarbonUtilException}
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
-import org.apache.carbondata.integration.spark.merger.{CarbonCompactionExecutor,
-CarbonCompactionUtil, RowResultMerger}
+import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.integration.spark.merger.{CarbonCompactionExecutor, CarbonCompactionUtil, RowResultMerger}
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
 import org.apache.carbondata.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.spark.MergeResult
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
 import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.QueryPlanUtil
 
 
 class CarbonMergerRDD[K, V](
@@ -103,17 +103,17 @@ class CarbonMergerRDD[K, V](
       var mergeNumber = ""
       var exec: CarbonCompactionExecutor = null
       try {
-        var dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
         val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
 
         // get destination segment properties as sent from driver which is of last segment.
 
-        val segmentProperties = new SegmentProperties(carbonMergerMapping.maxSegmentColumnSchemaList
-          .asJava,
+        val segmentProperties = new SegmentProperties(
+          carbonMergerMapping.maxSegmentColumnSchemaList.asJava,
           carbonMergerMapping.maxSegmentColCardinality)
 
         // sorting the table block info List.
-        val tableBlockInfoList = carbonSparkPartition.tableBlockInfos
+        val splitList = carbonSparkPartition.split.value.getAllSplits
+        val tableBlockInfoList = CarbonInputSplit.createBlocks(splitList)
 
         Collections.sort(tableBlockInfoList)
 
@@ -214,86 +214,46 @@ class CarbonMergerRDD[K, V](
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val theSplit = split.asInstanceOf[CarbonSparkPartition]
-    theSplit.locations.filter(_ != "localhost")
+    theSplit.split.value.getLocations.filter(_ != "localhost")
   }
 
   override def getPartitions: Array[Partition] = {
-
     val startTime = System.currentTimeMillis()
     val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(
       storePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
     )
-    val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) =
-      QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier)
+    val jobConf: JobConf = new JobConf(new Configuration)
+    val job: Job = new Job(jobConf)
+    val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
     var defaultParallelism = sparkContext.defaultParallelism
     val result = new util.ArrayList[Partition](defaultParallelism)
 
     // mapping of the node and block list.
-    var nodeMapping: util.Map[String, util.List[Distributable]] = new
+    var nodeBlockMapping: util.Map[String, util.List[Distributable]] = new
         util.HashMap[String, util.List[Distributable]]
 
-    var noOfBlocks = 0
-
-    val taskInfoList = new util.ArrayList[Distributable]
-
-    var blocksOfLastSegment: List[TableBlockInfo] = null
+    val noOfBlocks = 0
+    var carbonInputSplits = mutable.Seq[CarbonInputSplit]()
 
     // for each valid segment.
     for (eachSeg <- carbonMergerMapping.validSegments) {
 
       // map for keeping the relation of a task and its blocks.
-      val taskIdMapping: util.Map[String, util.List[TableBlockInfo]] = new
-          util.HashMap[String, util.List[TableBlockInfo]]
-
       job.getConfiguration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg)
 
       // get splits
-      val splits = carbonInputFormat.getSplits(job)
-      val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
-
-      // take the blocks of one segment.
-      val blocksOfOneSegment = carbonInputSplits.map(inputSplit =>
-        new TableBlockInfo(inputSplit.getPath.toString,
-          inputSplit.getStart, inputSplit.getSegmentId,
-          inputSplit.getLocations, inputSplit.getLength
-        )
-      )
-
-      // keep on assigning till last one is reached.
-      if (null != blocksOfOneSegment && blocksOfOneSegment.nonEmpty) {
-        blocksOfLastSegment = blocksOfOneSegment.asJava
-      }
-
-      // populate the task and its block mapping.
-      blocksOfOneSegment.foreach(tableBlockInfo => {
-        val taskNo = CarbonTablePath.DataFileUtil.getTaskNo(tableBlockInfo.getFilePath)
-        val blockList = taskIdMapping.get(taskNo)
-        if (null == blockList) {
-          val blockListTemp = new util.ArrayList[TableBlockInfo]()
-          blockListTemp.add(tableBlockInfo)
-          taskIdMapping.put(taskNo, blockListTemp)
-        } else {
-          blockList.add(tableBlockInfo)
-        }
-      }
-      )
-
-      noOfBlocks += blocksOfOneSegment.size
-      taskIdMapping.asScala.foreach(
-        entry =>
-          taskInfoList.add(new TableTaskInfo(entry._1, entry._2).asInstanceOf[Distributable])
-      )
+      val splits = format.getSplits(job)
+      carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
     }
 
     // prepare the details required to extract the segment properties using last segment.
-    if (null != blocksOfLastSegment && blocksOfLastSegment.size > 0) {
-      val lastBlockInfo = blocksOfLastSegment.get(blocksOfLastSegment.size - 1)
-
+    if (null != carbonInputSplits && carbonInputSplits.nonEmpty) {
+      val carbonInputSplit = carbonInputSplits.last
       var dataFileFooter: DataFileFooter = null
 
       try {
-        dataFileFooter = CarbonUtil.readMetadatFile(lastBlockInfo.getFilePath,
-          lastBlockInfo.getBlockOffset, lastBlockInfo.getBlockLength)
+        dataFileFooter = CarbonUtil.readMetadatFile(carbonInputSplit.getPath.toString(),
+          carbonInputSplit.getStart, carbonInputSplit.getLength)
       } catch {
         case e: CarbonUtilException =>
           logError("Exception in preparing the data file footer for compaction " + e.getMessage)
@@ -306,16 +266,17 @@ class CarbonMergerRDD[K, V](
         .toList
     }
     // send complete list of blocks to the mapping util.
-    nodeMapping = CarbonLoaderUtil.nodeBlockMapping(taskInfoList, -1)
+    nodeBlockMapping = CarbonLoaderUtil.nodeBlockMapping(
+      carbonInputSplits.map(_.asInstanceOf[Distributable]).asJava, -1)
 
     val confExecutors = confExecutorsTemp.toInt
-    val requiredExecutors = if (nodeMapping.size > confExecutors) {
+    val requiredExecutors = if (nodeBlockMapping.size > confExecutors) {
       confExecutors
-    } else { nodeMapping.size() }
+    } else { nodeBlockMapping.size() }
     CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
-    logInfo("No.of Executors required=" + requiredExecutors
-            + " , spark.executor.instances=" + confExecutors
-            + ", no.of.nodes where data present=" + nodeMapping.size())
+    logInfo("No.of Executors required=" + requiredExecutors +
+            " , spark.executor.instances=" + confExecutors +
+            ", no.of.nodes where data present=" + nodeBlockMapping.size())
     var nodes = DistributionUtil.getNodeList(sparkContext)
     var maxTimes = 30
     while (nodes.length < requiredExecutors && maxTimes > 0) {
@@ -327,24 +288,23 @@ class CarbonMergerRDD[K, V](
     defaultParallelism = sparkContext.defaultParallelism
     var i = 0
 
-    val nodeTaskBlocksMap: util.Map[String, util.List[NodeInfo]] = new util.HashMap[String, util
-    .List[NodeInfo]]()
+    val nodeTaskBlocksMap = new util.HashMap[String, util.List[NodeInfo]]()
 
     // Create Spark Partition for each task and assign blocks
-    nodeMapping.asScala.foreach { entry =>
-
-      val taskBlockList: List[NodeInfo] = new util.ArrayList[NodeInfo](0)
-      nodeTaskBlocksMap.put(entry._1, taskBlockList)
-
-      val list = new util.ArrayList[TableBlockInfo]
-      entry._2.asScala.foreach(taskInfo => {
-        val blocksPerNode = taskInfo.asInstanceOf[TableTaskInfo]
-        list.addAll(blocksPerNode.getTableBlockInfoList)
-        taskBlockList
-          .add(new NodeInfo(blocksPerNode.getTaskId, blocksPerNode.getTableBlockInfoList.size))
-      })
-      if (list.size() != 0) {
-        result.add(new CarbonSparkPartition(id, i, Seq(entry._1).toArray, list))
+    nodeBlockMapping.asScala.foreach { case (nodeName, blockList) =>
+      val taskBlockList = new util.ArrayList[NodeInfo](0)
+      nodeTaskBlocksMap.put(nodeName, taskBlockList)
+      var blockletCount = 0
+      blockList.asScala.foreach { taskInfo =>
+        val blocksPerNode = taskInfo.asInstanceOf[CarbonInputSplit]
+        blockletCount = blockletCount + blocksPerNode.getNumberOfBlocklets
+        taskBlockList.add(
+          NodeInfo(blocksPerNode.taskId, blocksPerNode.getNumberOfBlocklets))
+      }
+      if (blockletCount != 0) {
+        val multiBlockSplit = new CarbonMultiBlockSplit(absoluteTableIdentifier,
+          carbonInputSplits.asJava, nodeName)
+        result.add(new CarbonSparkPartition(id, i, multiBlockSplit))
         i += 1
       }
     }
@@ -360,17 +320,14 @@ class CarbonMergerRDD[K, V](
 
     val noOfNodes = nodes.length
     val noOfTasks = result.size
-    logInfo(s"Identified  no.of.Blocks: $noOfBlocks,"
-            + s"parallelism: $defaultParallelism , no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks"
-    )
-    logInfo("Time taken to identify Blocks to scan: " + (System
-                                                           .currentTimeMillis() - startTime)
-    )
-    for (j <- 0 until result.size) {
-      val cp = result.get(j).asInstanceOf[CarbonSparkPartition]
-      logInfo(s"Node: " + cp.locations.toSeq.mkString(",")
-              + ", No.Of Blocks: " + cp.tableBlockInfos.size
-      )
+    logInfo(s"Identified  no.of.Blocks: $noOfBlocks," +
+            s"parallelism: $defaultParallelism , no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks")
+    logInfo("Time taken to identify Blocks to scan : " + (System.currentTimeMillis() - startTime))
+    for (j <- 0 until result.size) {
+      val multiBlockSplit = result.get(j).asInstanceOf[CarbonSparkPartition].split.value
+      val splitList = multiBlockSplit.getAllSplits
+      logInfo(s"Node: ${multiBlockSplit.getLocations.mkString(",")}, No.Of Blocks: " +
+              s"${CarbonInputSplit.createBlocks(splitList).size}")
     }
     result.toArray(new Array[Partition](result.size))
   }



[2/4] incubator-carbondata git commit: change ScanRdd to use RecordReader

Posted by ra...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 7798e5c..e8d7399 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -17,49 +17,43 @@
 
 package org.apache.carbondata.spark.rdd
 
+import java.text.SimpleDateFormat
 import java.util
+import java.util.Date
 
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.InputSplit
-import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.hadoop.mapreduce.{InputSplit, Job, JobID}
+import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, TaskContext, TaskKilledException}
+import org.apache.spark.mapred.CarbonHadoopMapReduceUtil
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.hive.DistributionUtil
 
-import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.cache.dictionary.Dictionary
-import org.apache.carbondata.core.carbon.datastore.block.{BlockletInfos, TableBlockInfo}
-import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
+import org.apache.carbondata.core.carbon.datastore.block.Distributable
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
-import org.apache.carbondata.lcm.status.SegmentStatusManager
-import org.apache.carbondata.scan.executor.QueryExecutor
-import org.apache.carbondata.scan.executor.QueryExecutorFactory
+import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.readsupport.impl.RawDataReadSupport
 import org.apache.carbondata.scan.expression.Expression
-import org.apache.carbondata.scan.model.QueryModel
-import org.apache.carbondata.scan.result.BatchResult
-import org.apache.carbondata.scan.result.iterator.ChunkRowIterator
-import org.apache.carbondata.spark.RawValue
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.util.QueryPlanUtil
 
-
-class CarbonSparkPartition(rddId: Int, val idx: Int,
-    val locations: Array[String],
-    val tableBlockInfos: util.List[TableBlockInfo])
+class CarbonSparkPartition(
+    val rddId: Int,
+    val idx: Int,
+    @transient val multiBlockSplit: CarbonMultiBlockSplit)
   extends Partition {
 
+  val split = new SerializableWritable[CarbonMultiBlockSplit](multiBlockSplit)
+
   override val index: Int = idx
 
-  // val serializableHadoopSplit = new SerializableWritable[Array[String]](locations)
-  override def hashCode(): Int = {
-    41 * (41 + rddId) + idx
-  }
+  override def hashCode(): Int = 41 * (41 + rddId) + idx
 }
 
 /**
@@ -68,169 +62,135 @@ class CarbonSparkPartition(rddId: Int, val idx: Int,
  * level filtering in driver side.
  */
 class CarbonScanRDD[V: ClassTag](
-    sc: SparkContext,
-    queryModel: QueryModel,
+    @transient sc: SparkContext,
+    columnProjection: Seq[Attribute],
     filterExpression: Expression,
-    keyClass: RawValue[V],
-    @transient conf: Configuration,
-    tableCreationTime: Long,
-    schemaLastUpdatedTime: Long,
-    baseStoreLocation: String)
-  extends RDD[V](sc, Nil) {
+    identifier: AbsoluteTableIdentifier,
+    @transient carbonTable: CarbonTable)
+  extends RDD[V](sc, Nil)
+    with CarbonHadoopMapReduceUtil
+    with Logging {
+
+  private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
+  private val jobTrackerId: String = {
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    formatter.format(new Date())
+  }
 
+  @transient private val jobId = new JobID(jobTrackerId, id)
+  @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   override def getPartitions: Array[Partition] = {
-    var defaultParallelism = sparkContext.defaultParallelism
-    val statisticRecorder = CarbonTimeStatisticsFactory.createDriverRecorder()
-    val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) =
-      QueryPlanUtil.createCarbonInputFormat(queryModel.getAbsoluteTableIdentifier)
+    val job = Job.getInstance(new Configuration())
+    val format = prepareInputFormatForDriver(job.getConfiguration)
 
     // initialise query_id for job
-    job.getConfiguration.set("query.id", queryModel.getQueryId)
-
-    val result = new util.ArrayList[Partition](defaultParallelism)
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    val validAndInvalidSegments = new SegmentStatusManager(queryModel.getAbsoluteTableIdentifier)
-      .getValidAndInvalidSegments
-    // set filter resolver tree
-    try {
-      // before applying filter check whether segments are available in the table.
-      if (!validAndInvalidSegments.getValidSegments.isEmpty) {
-        val filterResolver = carbonInputFormat
-          .getResolvedFilter(job.getConfiguration, filterExpression)
-        CarbonInputFormat.setFilterPredicates(job.getConfiguration, filterResolver)
-        queryModel.setFilterExpressionResolverTree(filterResolver)
-        CarbonInputFormat
-          .setSegmentsToAccess(job.getConfiguration,
-            validAndInvalidSegments.getValidSegments
-          )
-        SegmentTaskIndexStore.getInstance()
-          .removeTableBlocks(validAndInvalidSegments.getInvalidSegments,
-            queryModel.getAbsoluteTableIdentifier
-          )
-      }
-    } catch {
-      case e: Exception =>
-        LOGGER.error(e)
-        sys.error(s"Exception occurred in query execution :: ${ e.getMessage }")
-    }
+    job.getConfiguration.set("query.id", queryId)
+
     // get splits
-    val splits = carbonInputFormat.getSplits(job)
+    val splits = format.getSplits(job)
+    val result = distributeSplits(splits)
+    result
+  }
+
+  private def distributeSplits(splits: util.List[InputSplit]): Array[Partition] = {
+    // this function distributes the splits based on the following logic:
+    // 1. data locality, to balance the splits across all available nodes
+    // 2. if the number of split for one
+
+    var statistic = new QueryStatistic()
+    val statisticRecorder = CarbonTimeStatisticsFactory.createDriverRecorder()
+    val parallelism = sparkContext.defaultParallelism
+    val result = new util.ArrayList[Partition](parallelism)
+    var noOfBlocks = 0
+    var noOfNodes = 0
+    var noOfTasks = 0
+
     if (!splits.isEmpty) {
-      val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
-      queryModel.setInvalidSegmentIds(validAndInvalidSegments.getInvalidSegments)
-      val blockListTemp = carbonInputSplits.map(inputSplit =>
-        new TableBlockInfo(inputSplit.getPath.toString,
-          inputSplit.getStart, inputSplit.getSegmentId,
-          inputSplit.getLocations, inputSplit.getLength,
-          new BlockletInfos(inputSplit.getNumberOfBlocklets, 0, inputSplit.getNumberOfBlocklets)
-        )
-      )
-      var activeNodes = Array[String]()
-      if (blockListTemp.nonEmpty) {
-        activeNodes = DistributionUtil
-          .ensureExecutorsAndGetNodeList(blockListTemp.toArray, sparkContext)
-      }
-      defaultParallelism = sparkContext.defaultParallelism
-      val blockList = CarbonLoaderUtil.
-        distributeBlockLets(blockListTemp.asJava, defaultParallelism).asScala
-
-      if (blockList.nonEmpty) {
-        var statistic = new QueryStatistic()
-        // group blocks to nodes, tasks
-        val nodeBlockMapping =
-        CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism,
-          activeNodes.toList.asJava
-        )
-        statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
-        statisticRecorder.recordStatisticsForDriver(statistic, queryModel.getQueryId())
-        statistic = new QueryStatistic()
-        var i = 0
-        // Create Spark Partition for each task and assign blocks
-        nodeBlockMapping.asScala.foreach { entry =>
-          entry._2.asScala.foreach { blocksPerTask => {
-            val tableBlockInfo = blocksPerTask.asScala.map(_.asInstanceOf[TableBlockInfo])
-            if (blocksPerTask.size() != 0) {
-              result
-                .add(new CarbonSparkPartition(id, i, Seq(entry._1).toArray, tableBlockInfo.asJava))
-              i += 1
-            }
+      // create a list of distributable blocks from the splits
+      val blockList = splits.asScala.map(_.asInstanceOf[Distributable])
+
+      // get the list of executors and map blocks to executors based on locality
+      val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext)
+
+      // divide the blocks among the tasks of the nodes as per the data locality
+      val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1,
+        parallelism, activeNodes.toList.asJava)
+
+      statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
+      statisticRecorder.recordStatisticsForDriver(statistic, queryId)
+      statistic = new QueryStatistic()
+
+      var i = 0
+      // Create Spark Partition for each task and assign blocks
+      nodeBlockMapping.asScala.foreach { case (node, blockList) =>
+        blockList.asScala.foreach { blocksPerTask =>
+          val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit])
+          if (blocksPerTask.size() != 0) {
+            val multiBlockSplit = new CarbonMultiBlockSplit(identifier, splits.asJava, node)
+            val partition = new CarbonSparkPartition(id, i, multiBlockSplit)
+            result.add(partition)
+            i += 1
           }
-          }
-        }
-        val noOfBlocks = blockList.size
-        val noOfNodes = nodeBlockMapping.size
-        val noOfTasks = result.size()
-        logInfo(s"Identified  no.of.Blocks: $noOfBlocks,"
-                + s"parallelism: $defaultParallelism , " +
-                s"no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks"
-        )
-        statistic.addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION,
-          System.currentTimeMillis)
-        statisticRecorder.recordStatisticsForDriver(statistic, queryModel.getQueryId())
-        statisticRecorder.logStatisticsAsTableDriver()
-        result.asScala.foreach { r =>
-          val cp = r.asInstanceOf[CarbonSparkPartition]
-          logInfo(s"Node: ${ cp.locations.toSeq.mkString(",") }" +
-                  s", No.Of Blocks:  ${ cp.tableBlockInfos.size() }"
-          )
         }
-      } else {
-        logInfo("No blocks identified to scan")
       }
-    } else {
-      logInfo("No valid segments found to scan")
+
+      noOfBlocks = splits.size
+      noOfNodes = nodeBlockMapping.size
+      noOfTasks = result.size()
+
+      statistic = new QueryStatistic()
+      statistic.addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION,
+        System.currentTimeMillis)
+      statisticRecorder.recordStatisticsForDriver(statistic, queryId)
+      statisticRecorder.logStatisticsAsTableDriver()
     }
+    logInfo(
+      s"""
+         | Identified no.of.blocks: $noOfBlocks,
+         | no.of.tasks: $noOfTasks,
+         | no.of.nodes: $noOfNodes,
+         | parallelism: $parallelism
+       """.stripMargin)
     result.toArray(new Array[Partition](result.size()))
   }
 
-  override def compute(thepartition: Partition, context: TaskContext): Iterator[V] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    val iter = new Iterator[V] {
-      var rowIterator: CarbonIterator[Array[Any]] = _
-      var queryStartTime: Long = 0
-      val queryExecutor = QueryExecutorFactory.getQueryExecutor()
-      try {
-        context.addTaskCompletionListener(context => {
-          clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
-          logStatistics()
-          queryExecutor.finish
-        })
-        val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition]
-        if (!carbonSparkPartition.tableBlockInfos.isEmpty) {
-          queryModel.setQueryId(queryModel.getQueryId + "_" + carbonSparkPartition.idx)
-          // fill table block info
-          queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos)
-          queryStartTime = System.currentTimeMillis
-          val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
-          logInfo("*************************" + carbonPropertiesFilePath)
-          if (null == carbonPropertiesFilePath) {
-            System.setProperty("carbon.properties.filepath",
-              System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
-          }
-          // execute query
-          rowIterator = new ChunkRowIterator(
-            queryExecutor.execute(queryModel).
-              asInstanceOf[CarbonIterator[BatchResult]]).asInstanceOf[CarbonIterator[Array[Any]]]
+  override def compute(split: Partition, context: TaskContext): Iterator[V] = {
+    val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
+    if (null == carbonPropertiesFilePath) {
+      System.setProperty("carbon.properties.filepath",
+        System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties"
+      )
+    }
 
-        }
-      } catch {
-        case e: Exception =>
-          LOGGER.error(e)
-          if (null != e.getMessage) {
-            sys.error(s"Exception occurred in query execution :: ${ e.getMessage }")
-          } else {
-            sys.error("Exception occurred in query execution.Please check logs.")
-          }
-      }
+    val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
+    val attemptContext = newTaskAttemptContext(new Configuration(), attemptId)
+    val format = prepareInputFormatForExecutor(attemptContext.getConfiguration)
+    val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
+    val reader = format.createRecordReader(inputSplit, attemptContext)
+    reader.initialize(inputSplit, attemptContext)
+
+    val queryStartTime = System.currentTimeMillis
 
-      var havePair = false
-      var finished = false
-      var recordCount = 0
+    new Iterator[V] {
+      private var havePair = false
+      private var finished = false
+      private var count = 0
+
+      context.addTaskCompletionListener { context =>
+        logStatistics(queryStartTime, count)
+        reader.close()
+      }
 
       override def hasNext: Boolean = {
+        if (context.isInterrupted) {
+          throw new TaskKilledException
+        }
         if (!finished && !havePair) {
-          finished = (null == rowIterator) || (!rowIterator.hasNext)
+          finished = !reader.nextKeyValue
+          if (finished) {
+            reader.close()
+          }
           havePair = !finished
         }
         !finished
@@ -241,68 +201,55 @@ class CarbonScanRDD[V: ClassTag](
           throw new java.util.NoSuchElementException("End of stream")
         }
         havePair = false
-        recordCount += 1
-        keyClass.getValue(rowIterator.next())
+        val value: V = reader.getCurrentValue
+        count += 1
+        value
       }
+    }
+  }
 
-      def clearDictionaryCache(columnToDictionaryMap: java.util.Map[String, Dictionary]) = {
-        if (null != columnToDictionaryMap) {
-          org.apache.carbondata.spark.util.CarbonQueryUtil
-            .clearColumnDictionaryCache(columnToDictionaryMap)
-        }
-      }
+  private def prepareInputFormatForDriver(conf: Configuration): CarbonInputFormat[V] = {
+    CarbonInputFormat.setCarbonTable(conf, carbonTable)
+    createInputFormat(conf)
+  }
 
-      def logStatistics(): Unit = {
-        if (null != queryModel.getStatisticsRecorder) {
-          var queryStatistic = new QueryStatistic()
-          queryStatistic
-            .addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
-              System.currentTimeMillis - queryStartTime
-            )
-          queryModel.getStatisticsRecorder.recordStatistics(queryStatistic)
-          // result size
-          queryStatistic = new QueryStatistic()
-          queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount)
-          queryModel.getStatisticsRecorder.recordStatistics(queryStatistic)
-          // print executor query statistics for each task_id
-          queryModel.getStatisticsRecorder.logStatisticsAsTableExecutor()
-        }
-      }
+  private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[V] = {
+    CarbonInputFormat.setCarbonReadSupport(classOf[RawDataReadSupport], conf)
+    createInputFormat(conf)
+  }
+
+  private def createInputFormat(conf: Configuration): CarbonInputFormat[V] = {
+    val format = new CarbonInputFormat[V]
+    CarbonInputFormat.setTablePath(conf, identifier.getTablePath)
+    CarbonInputFormat.setFilterPredicates(conf, filterExpression)
+    val projection = new CarbonProjection
+    columnProjection.foreach { attr =>
+      projection.addColumn(attr.name)
     }
+    CarbonInputFormat.setColumnProjection(conf, projection)
+    format
+  }
 
-    iter
+  def logStatistics(queryStartTime: Long, recordCount: Int): Unit = {
+    var queryStatistic = new QueryStatistic()
+    queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
+      System.currentTimeMillis - queryStartTime)
+    val statisticRecorder = CarbonTimeStatisticsFactory.createExecutorRecorder(queryId)
+    statisticRecorder.recordStatistics(queryStatistic)
+    // result size
+    queryStatistic = new QueryStatistic()
+    queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount)
+    statisticRecorder.recordStatistics(queryStatistic)
+    // print executor query statistics for each task_id
+    statisticRecorder.logStatisticsAsTableExecutor()
   }
 
   /**
    * Get the preferred locations where to launch this task.
    */
-  override def getPreferredLocations(partition: Partition): Seq[String] = {
-    val theSplit = partition.asInstanceOf[CarbonSparkPartition]
-    val firstOptionLocation = theSplit.locations.filter(_ != "localhost")
-    val tableBlocks = theSplit.tableBlockInfos
-    // node name and count mapping
-    val blockMap = new util.LinkedHashMap[String, Integer]()
-
-    tableBlocks.asScala.foreach(tableBlock => tableBlock.getLocations.foreach(
-      location => {
-        if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
-          val currentCount = blockMap.get(location)
-          if (currentCount == null) {
-            blockMap.put(location, 1)
-          } else {
-            blockMap.put(location, currentCount + 1)
-          }
-        }
-      }
-    )
-    )
-
-    val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
-      nodeCount1.getValue > nodeCount2.getValue
-    }
-    )
-
-    val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
-    firstOptionLocation ++ sortedNodesList
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    val theSplit = split.asInstanceOf[CarbonSparkPartition]
+    val firstOptionLocation = theSplit.split.value.getLocations.filter(_ != "localhost")
+    firstOptionLocation
   }
 }
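
A minimal sketch of how the reworked CarbonScanRDD is constructed and consumed: the driver builds a CarbonInputFormat from the CarbonTable and filter, turns the resulting splits into CarbonSparkPartition instances, and each executor task iterates a RecordReader over its CarbonMultiBlockSplit. The helper below is illustrative only and is not part of this patch; the import paths mirror the packages used elsewhere in this change, and Array[Any] is the value type used by CarbonScan.

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.catalyst.expressions.Attribute

    import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
    import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
    import org.apache.carbondata.scan.expression.Expression
    import org.apache.carbondata.spark.rdd.CarbonScanRDD

    // hypothetical helper showing how the new constructor is wired
    def buildScanRdd(
        sqlContext: SQLContext,
        projection: Seq[Attribute],          // columns to read
        filter: Expression,                  // pushed-down Carbon filter, may be null
        identifier: AbsoluteTableIdentifier, // locates the table under the store path
        carbonTable: CarbonTable): CarbonScanRDD[Array[Any]] = {
      // "queryId" is read from the SparkConf by the RDD, falling back to System.nanoTime
      new CarbonScanRDD[Array[Any]](
        sqlContext.sparkContext, // @transient, only used on the driver
        projection,
        filter,
        identifier,
        carbonTable)             // @transient, used on the driver to prepare the input format
    }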

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 9c9be8d..5fdbc5d 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -73,17 +73,8 @@ object Compactor {
       maxSegmentColumnSchemaList = null
     )
     carbonLoadModel.setStorePath(carbonMergerMapping.storePath)
-    val segmentStatusManager = new SegmentStatusManager(new AbsoluteTableIdentifier
-    (CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION),
-      new CarbonTableIdentifier(carbonLoadModel.getDatabaseName,
-        carbonLoadModel.getTableName,
-        carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId
-      )
-    )
-    )
-    carbonLoadModel.setLoadMetadataDetails(segmentStatusManager
-      .readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava
-    )
+    carbonLoadModel.setLoadMetadataDetails(
+      SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
     var execInstance = "1"
     // in case of non dynamic executor allocation, number of executors are fixed.
     if (sc.sparkContext.getConf.contains("spark.executor.instances")) {
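
SegmentStatusManager is now used through static methods, so callers no longer construct a manager around an AbsoluteTableIdentifier. A small read-side sketch of the calls used in this patch, assuming a resolved CarbonTable; the helper and value names are illustrative.

    import scala.collection.JavaConverters._

    import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
    import org.apache.carbondata.lcm.status.SegmentStatusManager

    def printSegmentSummary(carbonTable: CarbonTable): Unit = {
      // load metadata is read straight from the table's metadata path
      val loadDetails = SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath)
      // valid segments are resolved from the table identifier
      val validSegments = SegmentStatusManager
        .getSegmentStatus(carbonTable.getAbsoluteTableIdentifier)
        .getValidSegments.asScala
      println(s"loads: ${loadDetails.length}, valid segments: ${validSegments.mkString(",")}")
    }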

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
deleted file mode 100644
index c55c807..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import scala.reflect.ClassTag
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.Job
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-
-import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
-import org.apache.carbondata.hadoop.CarbonInputFormat
-
-
-/**
- * All the utility functions for carbon plan creation
- */
-object QueryPlanUtil {
-
-  /**
-   * createCarbonInputFormat from query model
-   */
-  def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
-  (CarbonInputFormat[Array[Object]], Job) = {
-    val carbonInputFormat = new CarbonInputFormat[Array[Object]]()
-    val jobConf: JobConf = new JobConf(new Configuration)
-    val job: Job = new Job(jobConf)
-    FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
-    (carbonInputFormat, job)
-  }
-
-  def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier,
-      conf: Configuration) : CarbonInputFormat[V] = {
-    val carbonInputFormat = new CarbonInputFormat[V]()
-    val job: Job = new Job(conf)
-    FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
-    carbonInputFormat
-  }
-}
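
With QueryPlanUtil removed, a CarbonInputFormat is either created through CarbonInputFormatUtil.createCarbonInputFormat(identifier, job), as CarbonHadoopFSRDD now does, or built directly against a Configuration, as CarbonScanRDD does. A minimal sketch of the latter pattern; the column names are placeholders and the helper itself is not part of this patch.

    import org.apache.hadoop.conf.Configuration

    import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
    import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonProjection}

    def newInputFormat(
        conf: Configuration,
        identifier: AbsoluteTableIdentifier): CarbonInputFormat[Array[Object]] = {
      val format = new CarbonInputFormat[Array[Object]]
      // setTablePath replaces the FileInputFormat.addInputPath call QueryPlanUtil used
      CarbonInputFormat.setTablePath(conf, identifier.getTablePath)
      val projection = new CarbonProjection
      projection.addColumn("imei")                 // placeholder columns
      projection.addColumn("deviceInformationId")
      CarbonInputFormat.setColumnProjection(conf, projection)
      format
    }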

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index c9d2a0f..ca0ad58 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -38,14 +38,13 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
-import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.carbon.path.CarbonTablePath
 import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonProjection}
-import org.apache.carbondata.hadoop.util.SchemaReader
+import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader}
 import org.apache.carbondata.scan.expression.logical.AndExpression
 import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 import org.apache.carbondata.spark.util.CarbonScalaUtil.CarbonSparkUtil
-import org.apache.carbondata.spark.util.QueryPlanUtil
 
 
 private[sql] case class CarbonDatasourceHadoopRelation(
@@ -104,7 +103,7 @@ private[sql] case class CarbonDatasourceHadoopRelation(
 
     val projection = new CarbonProjection
     requiredColumns.foreach(projection.addColumn)
-    CarbonInputFormat.setColumnProjection(projection, conf)
+    CarbonInputFormat.setColumnProjection(conf, projection)
     CarbonInputFormat.setCarbonReadSupport(classOf[SparkRowReadSupportImpl], conf)
 
     new CarbonHadoopFSRDD[Row](sqlContext.sparkContext,
@@ -145,12 +144,11 @@ class CarbonHadoopFSRDD[V: ClassTag](
     context: TaskContext): Iterator[V] = {
     val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
     val hadoopAttemptContext = newTaskAttemptContext(conf.value, attemptId)
-    val inputFormat = QueryPlanUtil.createCarbonInputFormat(identifier,
-      hadoopAttemptContext.getConfiguration
-    )
+    val job: Job = new Job(hadoopAttemptContext.getConfiguration)
+    val format = CarbonInputFormatUtil.createCarbonInputFormat(identifier, job)
     hadoopAttemptContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getTablePath)
     val reader =
-      inputFormat.createRecordReader(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
+      format.createRecordReader(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
         hadoopAttemptContext
       )
     reader.initialize(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
@@ -186,11 +184,9 @@ class CarbonHadoopFSRDD[V: ClassTag](
 
   override protected def getPartitions: Array[Partition] = {
     val jobContext = newJobContext(conf.value, jobId)
-    val carbonInputFormat = QueryPlanUtil.createCarbonInputFormat(identifier,
-      jobContext.getConfiguration
-    )
+    val format = CarbonInputFormatUtil.createCarbonInputFormat(identifier, new Job(conf.value))
     jobContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getTablePath)
-    val splits = carbonInputFormat.getSplits(jobContext).toArray
+    val splits = format.getSplits(jobContext).toArray
     val carbonInputSplits = splits
       .map(f => new SerializableWritable(f.asInstanceOf[CarbonInputSplit]))
     carbonInputSplits.zipWithIndex.map(f => new CarbonHadoopFSPartition(id, f._2, f._1))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index 069e106..a06d5cb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@ -272,9 +272,8 @@ case class CarbonRelation(
   private var sizeInBytesLocalValue = 0L
 
   def sizeInBytes: Long = {
-    val tableStatusNewLastUpdatedTime = new SegmentStatusManager(
+    val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
       tableMeta.carbonTable.getAbsoluteTableIdentifier)
-        .getTableStatusLastModifiedTime
     if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
       val tablePath = CarbonStorePath.getCarbonTablePath(
         tableMeta.storePath,
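
sizeInBytes now refreshes its cached value only when the static getTableStatusLastModifiedTime call reports a newer tablestatus file. A tiny sketch of that staleness check, with an assumed cached timestamp; the helper name is illustrative.

    import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
    import org.apache.carbondata.lcm.status.SegmentStatusManager

    def isTableStatusStale(carbonTable: CarbonTable, cachedTime: Long): Boolean = {
      val latest = SegmentStatusManager.getTableStatusLastModifiedTime(
        carbonTable.getAbsoluteTableIdentifier)
      // recompute the cached size only when the tablestatus file has changed
      latest != cachedTime
    }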

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
deleted file mode 100644
index c105cae..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.util.ArrayList
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.LeafNode
-import org.apache.spark.sql.hive.CarbonMetastoreCatalog
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.scan.model._
-import org.apache.carbondata.spark.{CarbonFilters, RawValue, RawValueImpl}
-import org.apache.carbondata.spark.rdd.CarbonScanRDD
-
-case class CarbonScan(
-    var attributesRaw: Seq[Attribute],
-    relationRaw: CarbonRelation,
-    dimensionPredicatesRaw: Seq[Expression],
-    useUnsafeCoversion: Boolean = true)(@transient val ocRaw: SQLContext) extends LeafNode {
-  val carbonTable = relationRaw.metaData.carbonTable
-  val selectedDims = scala.collection.mutable.MutableList[QueryDimension]()
-  val selectedMsrs = scala.collection.mutable.MutableList[QueryMeasure]()
-  @transient val carbonCatalog = ocRaw.catalog.asInstanceOf[CarbonMetastoreCatalog]
-
-  val attributesNeedToDecode = new java.util.LinkedHashSet[AttributeReference]()
-  val unprocessedExprs = new ArrayBuffer[Expression]()
-
-  val buildCarbonPlan: CarbonQueryPlan = {
-    val plan: CarbonQueryPlan = new CarbonQueryPlan(relationRaw.databaseName, relationRaw.tableName)
-
-    plan.setSortedDimemsions(new ArrayList[QueryDimension])
-
-    plan.setOutLocationPath(
-      CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION_HDFS))
-    plan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
-    processFilterExpressions(plan)
-    plan
-  }
-
-  def processFilterExpressions(plan: CarbonQueryPlan) {
-    if (dimensionPredicatesRaw.nonEmpty) {
-      val expressionVal = CarbonFilters.processExpression(
-        dimensionPredicatesRaw,
-        attributesNeedToDecode,
-        unprocessedExprs,
-        carbonTable)
-      expressionVal match {
-        case Some(ce) =>
-          // adding dimension used in expression in querystats
-          plan.setFilterExpression(ce)
-        case _ =>
-      }
-    }
-    processExtraAttributes(plan)
-  }
-
-  private def processExtraAttributes(plan: CarbonQueryPlan) {
-    if (attributesNeedToDecode.size() > 0) {
-      val attributeOut = new ArrayBuffer[Attribute]() ++ attributesRaw
-
-      attributesNeedToDecode.asScala.foreach { attr =>
-        if (!attributesRaw.exists(_.name.equalsIgnoreCase(attr.name))) {
-          attributeOut += attr
-        }
-      }
-      attributesRaw = attributeOut
-    }
-
-    val dimensions = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
-    val measures = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
-    val dimAttr = new Array[Attribute](dimensions.size())
-    val msrAttr = new Array[Attribute](measures.size())
-    attributesRaw.foreach { attr =>
-      val carbonDimension =
-        carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
-      if(carbonDimension != null) {
-        dimAttr(dimensions.indexOf(carbonDimension)) = attr
-      } else {
-        val carbonMeasure =
-          carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
-        if(carbonMeasure != null) {
-          msrAttr(measures.indexOf(carbonMeasure)) = attr
-        }
-      }
-    }
-
-    attributesRaw = dimAttr.filter(f => f != null) ++ msrAttr.filter(f => f != null)
-
-    var queryOrder: Integer = 0
-    attributesRaw.foreach { attr =>
-      val carbonDimension =
-        carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
-      if (carbonDimension != null) {
-        val dim = new QueryDimension(attr.name)
-        dim.setQueryOrder(queryOrder)
-        queryOrder = queryOrder + 1
-        selectedDims += dim
-      } else {
-        val carbonMeasure =
-          carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
-        if (carbonMeasure != null) {
-          val m1 = new QueryMeasure(attr.name)
-          m1.setQueryOrder(queryOrder)
-          queryOrder = queryOrder + 1
-          selectedMsrs += m1
-        }
-      }
-    }
-
-    // Fill the selected dimensions & measures obtained from
-    // attributes to query plan  for detailed query
-    selectedDims.foreach(plan.addDimension)
-    selectedMsrs.foreach(plan.addMeasure)
-  }
-
-
-  def inputRdd: CarbonScanRDD[Array[Any]] = {
-
-    val conf = new Configuration()
-    val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-    val model = QueryModel.createModel(
-      absoluteTableIdentifier, buildCarbonPlan, carbonTable)
-    val kv: RawValue[Array[Any]] = new RawValueImpl
-    // setting queryid
-    buildCarbonPlan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
-
-    val tableCreationTime = carbonCatalog
-        .getTableCreationTime(relationRaw.databaseName, relationRaw.tableName)
-    val schemaLastUpdatedTime = carbonCatalog
-        .getSchemaLastUpdatedTime(relationRaw.databaseName, relationRaw.tableName)
-    val big = new CarbonScanRDD(
-      ocRaw.sparkContext,
-      model,
-      buildCarbonPlan.getFilterExpression,
-      kv,
-      conf,
-      tableCreationTime,
-      schemaLastUpdatedTime,
-      carbonCatalog.storePath)
-    big
-  }
-
-
-  override def outputsUnsafeRows: Boolean =
-    (attributesNeedToDecode.size() == 0) && useUnsafeCoversion
-
-  override def doExecute(): RDD[InternalRow] = {
-    val outUnsafeRows: Boolean = (attributesNeedToDecode.size() == 0) && useUnsafeCoversion
-    inputRdd.mapPartitions { iter =>
-      val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
-      new Iterator[InternalRow] {
-        override def hasNext: Boolean = iter.hasNext
-
-        override def next(): InternalRow =
-          if (outUnsafeRows) {
-            unsafeProjection(new GenericMutableRow(iter.next()))
-          } else {
-            new GenericMutableRow(iter.next())
-          }
-      }
-    }
-  }
-
-  def output: Seq[Attribute] = {
-    attributesRaw
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
new file mode 100644
index 0000000..6580c4f
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.ArrayList
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.LeafNode
+import org.apache.spark.sql.hive.CarbonMetastoreCatalog
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.scan.model._
+import org.apache.carbondata.spark.CarbonFilters
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+
+case class CarbonScan(
+    var attributesRaw: Seq[Attribute],
+    relationRaw: CarbonRelation,
+    dimensionPredicatesRaw: Seq[Expression],
+    useUnsafeCoversion: Boolean = true)(@transient val ocRaw: SQLContext) extends LeafNode {
+  val carbonTable = relationRaw.metaData.carbonTable
+  val selectedDims = scala.collection.mutable.MutableList[QueryDimension]()
+  val selectedMsrs = scala.collection.mutable.MutableList[QueryMeasure]()
+  @transient val carbonCatalog = ocRaw.catalog.asInstanceOf[CarbonMetastoreCatalog]
+
+  val attributesNeedToDecode = new java.util.LinkedHashSet[AttributeReference]()
+  val unprocessedExprs = new ArrayBuffer[Expression]()
+
+  val buildCarbonPlan: CarbonQueryPlan = {
+    val plan: CarbonQueryPlan = new CarbonQueryPlan(relationRaw.databaseName, relationRaw.tableName)
+
+    plan.setSortedDimemsions(new ArrayList[QueryDimension])
+
+    plan.setOutLocationPath(
+      CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION_HDFS))
+    plan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
+    processFilterExpressions(plan)
+    plan
+  }
+
+  def processFilterExpressions(plan: CarbonQueryPlan) {
+    if (dimensionPredicatesRaw.nonEmpty) {
+      val expressionVal = CarbonFilters.processExpression(
+        dimensionPredicatesRaw,
+        attributesNeedToDecode,
+        unprocessedExprs,
+        carbonTable)
+      expressionVal match {
+        case Some(ce) =>
+          // adding dimension used in expression in querystats
+          plan.setFilterExpression(ce)
+        case _ =>
+      }
+    }
+    processExtraAttributes(plan)
+  }
+
+  private def processExtraAttributes(plan: CarbonQueryPlan) {
+    if (attributesNeedToDecode.size() > 0) {
+      val attributeOut = new ArrayBuffer[Attribute]() ++ attributesRaw
+
+      attributesNeedToDecode.asScala.foreach { attr =>
+        if (!attributesRaw.exists(_.name.equalsIgnoreCase(attr.name))) {
+          attributeOut += attr
+        }
+      }
+      attributesRaw = attributeOut
+    }
+
+    val dimensions = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
+    val measures = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
+    val dimAttr = new Array[Attribute](dimensions.size())
+    val msrAttr = new Array[Attribute](measures.size())
+    attributesRaw.foreach { attr =>
+      val carbonDimension =
+        carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+      if(carbonDimension != null) {
+        dimAttr(dimensions.indexOf(carbonDimension)) = attr
+      } else {
+        val carbonMeasure =
+          carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
+        if(carbonMeasure != null) {
+          msrAttr(measures.indexOf(carbonMeasure)) = attr
+        }
+      }
+    }
+
+    attributesRaw = dimAttr.filter(f => f != null) ++ msrAttr.filter(f => f != null)
+
+    var queryOrder: Integer = 0
+    attributesRaw.foreach { attr =>
+      val carbonDimension =
+        carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+      if (carbonDimension != null) {
+        val dim = new QueryDimension(attr.name)
+        dim.setQueryOrder(queryOrder)
+        queryOrder = queryOrder + 1
+        selectedDims += dim
+      } else {
+        val carbonMeasure =
+          carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
+        if (carbonMeasure != null) {
+          val m1 = new QueryMeasure(attr.name)
+          m1.setQueryOrder(queryOrder)
+          queryOrder = queryOrder + 1
+          selectedMsrs += m1
+        }
+      }
+    }
+
+    // Fill the selected dimensions & measures obtained from
+    // attributes to query plan  for detailed query
+    selectedDims.foreach(plan.addDimension)
+    selectedMsrs.foreach(plan.addMeasure)
+  }
+
+  def inputRdd: CarbonScanRDD[Array[Any]] = {
+
+    val conf = new Configuration()
+    val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+
+    // setting queryid
+    buildCarbonPlan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
+
+    val tableCreationTime = carbonCatalog
+        .getTableCreationTime(relationRaw.databaseName, relationRaw.tableName)
+    val schemaLastUpdatedTime = carbonCatalog
+        .getSchemaLastUpdatedTime(relationRaw.databaseName, relationRaw.tableName)
+    new CarbonScanRDD(
+      ocRaw.sparkContext,
+      attributesRaw,
+      buildCarbonPlan.getFilterExpression,
+      absoluteTableIdentifier,
+      carbonTable
+    )
+  }
+
+  override def outputsUnsafeRows: Boolean =
+    (attributesNeedToDecode.size() == 0) && useUnsafeCoversion
+
+  override def doExecute(): RDD[InternalRow] = {
+    val outUnsafeRows: Boolean = (attributesNeedToDecode.size() == 0) && useUnsafeCoversion
+    inputRdd.mapPartitions { iter =>
+      val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
+      new Iterator[InternalRow] {
+        override def hasNext: Boolean = iter.hasNext
+
+        override def next(): InternalRow = {
+          val value = iter.next
+          if (outUnsafeRows) {
+            unsafeProjection(new GenericMutableRow(value))
+          } else {
+            new GenericMutableRow(value)
+          }
+        }
+      }
+    }
+  }
+
+  def output: Seq[Attribute] = {
+    attributesRaw
+  }
+
+}
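
As a usage sketch, the CarbonScan physical node above is what a query strategy would instantiate for a Carbon relation: the projected attributes and pushed-down predicates go into the first parameter list, the SQLContext into the second, and execute() drives doExecute(), which maps the CarbonScanRDD rows to (unsafe) InternalRows. The wiring below is illustrative, assumes already-resolved Catalyst expressions, and is not part of this patch.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{CarbonRelation, CarbonScan, SQLContext}
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}

    // hypothetical strategy-side wiring
    def planCarbonScan(
        projectList: Seq[NamedExpression],
        predicates: Seq[Expression],
        relation: CarbonRelation,
        sqlContext: SQLContext): RDD[InternalRow] = {
      val scan = CarbonScan(projectList.map(_.toAttribute), relation, predicates)(sqlContext)
      scan.execute() // delegates to doExecute()
    }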

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index a6b4ec5..74b0dd2 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -924,10 +924,9 @@ private[sql] case class DeleteLoadsById(
     }
     val path = carbonTable.getMetaDataFilepath
 
-    val segmentStatusManager =
-      new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
     try {
-      val invalidLoadIds = segmentStatusManager.updateDeletionStatus(loadids.asJava, path).asScala
+      val invalidLoadIds = SegmentStatusManager.updateDeletionStatus(
+        carbonTable.getAbsoluteTableIdentifier, loadids.asJava, path).asScala
 
       if (invalidLoadIds.isEmpty) {
 
@@ -986,8 +985,6 @@ private[sql] case class DeleteLoadsByLoadDate(
 
     val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
       .getCarbonTable(dbName + '_' + tableName)
-    val segmentStatusManager = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
-
     if (null == carbonTable) {
       var relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
         .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
@@ -995,8 +992,9 @@ private[sql] case class DeleteLoadsByLoadDate(
     val path = carbonTable.getMetaDataFilepath()
 
     try {
-      val invalidLoadTimestamps = segmentStatusManager
-        .updateDeletionStatus(loadDate, path, timeObj.asInstanceOf[java.lang.Long]).asScala
+      val invalidLoadTimestamps = SegmentStatusManager.updateDeletionStatus(
+        carbonTable.getAbsoluteTableIdentifier, loadDate, path,
+        timeObj.asInstanceOf[java.lang.Long]).asScala
       if (invalidLoadTimestamps.isEmpty) {
         LOGGER.audit(s"Delete segment by date is successfull for $dbName.$tableName.")
       }
@@ -1328,12 +1326,8 @@ private[sql] case class ShowLoads(
     if (carbonTable == null) {
       sys.error(s"$databaseName.$tableName is not found")
     }
-    val path = carbonTable.getMetaDataFilepath()
-
-    val segmentStatusManager = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
-
-    val loadMetadataDetailsArray = segmentStatusManager.readLoadMetadata(path)
-
+    val path = carbonTable.getMetaDataFilepath
+    val loadMetadataDetailsArray = SegmentStatusManager.readLoadMetadata(path)
     if (loadMetadataDetailsArray.nonEmpty) {
 
       val parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP)
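
Both delete-load commands now call the static updateDeletionStatus overloads directly on SegmentStatusManager, passing the table identifier and metadata path explicitly. A compact sketch of the two call shapes used above; the surrounding command handling and auditing are omitted and the helper names are illustrative.

    import scala.collection.JavaConverters._

    import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
    import org.apache.carbondata.lcm.status.SegmentStatusManager

    // delete by segment id list; the call returns the ids that could not be marked deleted
    def deleteById(carbonTable: CarbonTable, loadIds: Seq[String]): Boolean = {
      val invalidLoadIds = SegmentStatusManager.updateDeletionStatus(
        carbonTable.getAbsoluteTableIdentifier,
        loadIds.asJava,
        carbonTable.getMetaDataFilepath).asScala
      invalidLoadIds.isEmpty
    }

    // delete by load date; the parsed timestamp is passed alongside the raw date string
    def deleteByDate(carbonTable: CarbonTable, loadDate: String, time: java.lang.Long): Boolean = {
      val invalidLoadTimestamps = SegmentStatusManager.updateDeletionStatus(
        carbonTable.getAbsoluteTableIdentifier,
        loadDate,
        carbonTable.getMetaDataFilepath,
        time).asScala
      invalidLoadTimestamps.isEmpty
    }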

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index 0c13293..25c36c5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.spark.sql.hive
 
-
 import java.net.{InetAddress, InterfaceAddress, NetworkInterface}
 
 import scala.collection.JavaConverters._
@@ -44,12 +43,9 @@ object DistributionUtil {
    * localhost for retriving executor list
    */
   def getNodeList(sparkContext: SparkContext): Array[String] = {
-
-    val arr =
-      sparkContext.getExecutorMemoryStatus.map {
-        kv =>
-          kv._1.split(":")(0)
-      }.toSeq
+    val arr = sparkContext.getExecutorMemoryStatus.map { kv =>
+      kv._1.split(":")(0)
+    }.toSeq
     val localhostIPs = getLocalhostIPs
     val selectedLocalIPList = localhostIPs.filter(arr.contains(_))
 
@@ -109,10 +105,9 @@ object DistributionUtil {
    * @param sparkContext
    * @return
    */
-  def ensureExecutorsAndGetNodeList(blockList: Array[Distributable],
-      sparkContext: SparkContext):
-  Array[String] = {
-    val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.toSeq.asJava)
+  def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable],
+    sparkContext: SparkContext): Seq[String] = {
+    val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava)
     var confExecutorsTemp: String = null
     if (sparkContext.getConf.contains("spark.executor.instances")) {
       confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
@@ -131,7 +126,9 @@ object DistributionUtil {
     }
     val requiredExecutors = if (nodeMapping.size > confExecutors) {
       confExecutors
-    } else { nodeMapping.size() }
+    } else {
+      nodeMapping.size()
+    }
 
     val startTime = System.currentTimeMillis()
     CarbonContext.ensureExecutors(sparkContext, requiredExecutors)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
index cc00c47..f02d4e7 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
@@ -35,10 +35,9 @@ import org.apache.carbondata.core.util.CarbonProperties
 class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
-
+    clean
     val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
       .getCanonicalPath
-
     sql("create table if not exists Carbon_automation_test (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string, gamePointId int,contractNumber int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('DICTIONARY_INCLUDE'='Latest_MONTH,Latest_DAY,deviceInformationId')");
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
@@ -52,11 +51,14 @@ class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
 
   }
 
+  def clean {
+    sql("drop table if exists Carbon_automation_test")
+    sql("drop table if exists Carbon_automation_hive")
+    sql("drop table if exists Carbon_automation_test_hive")
+  }
+
   override def afterAll {
-    sql("drop table Carbon_automation_test")
-    sql("drop table Carbon_automation_hive")
-    sql("drop table Carbon_automation_test_hive")
-
+    clean
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
   }
@@ -425,10 +427,10 @@ class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
   })
 
   //TC_103
-  test("select variance(deviceInformationId) as a   from Carbon_automation_test")({
+  test("select variance(deviceInformationId) as a   from carbon_automation_test")({
     checkAnswer(
-      sql("select variance(deviceInformationId) as a   from Carbon_automation_test"),
-      sql("select variance(deviceInformationId) as a   from Carbon_automation_hive"))
+      sql("select variance(deviceInformationId) as a   from carbon_automation_test"),
+      sql("select variance(deviceInformationId) as a   from carbon_automation_hive"))
   })
    //TC_105
   test("select var_samp(deviceInformationId) as a  from Carbon_automation_test")({

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
index 7343a81..924d91a 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
@@ -113,26 +113,22 @@ class CompactionSystemLockFeatureTest extends QueryTest with BeforeAndAfterAll {
     sql("clean files for table table2")
 
     // check for table 1.
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
+    val identifier1 = new AbsoluteTableIdentifier(
           CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-          new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table1", "rrr")
-        )
-    )
+          new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table1", "rrr"))
     // merged segment should not be there
-    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+    val segments = SegmentStatusManager.getSegmentStatus(identifier1)
+        .getValidSegments.asScala.toList
     assert(segments.contains("0.1"))
     assert(!segments.contains("0"))
     assert(!segments.contains("1"))
     // check for table 2.
-    val segmentStatusManager2: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
+    val identifier2 = new AbsoluteTableIdentifier(
           CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-          new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table2", "rrr1")
-        )
-    )
+          new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table2", "rrr1"))
     // merged segment should not be there
-    val segments2 = segmentStatusManager2.getValidAndInvalidSegments.getValidSegments.asScala.toList
+    val segments2 = SegmentStatusManager.getSegmentStatus(identifier2)
+        .getValidSegments.asScala.toList
     assert(segments2.contains("0.1"))
     assert(!segments2.contains("0"))
     assert(!segments2.contains("1"))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
index 80a2320..f5039a7 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
@@ -43,9 +43,6 @@ class DataCompactionBoundaryConditionsTest extends QueryTest with BeforeAndAfter
   val carbonTableIdentifier: CarbonTableIdentifier =
     new CarbonTableIdentifier("default", "boundarytest".toLowerCase(), "1")
 
-  val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-      AbsoluteTableIdentifier(storeLocation, carbonTableIdentifier))
-
   override def beforeAll {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "2,2")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
index d780efe..f77ec9b 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
@@ -86,13 +86,13 @@ class DataCompactionCardinalityBoundryTest extends QueryTest with BeforeAndAfter
     var noOfRetries = 0
     while (status && noOfRetries < 10) {
 
-      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-          AbsoluteTableIdentifier(
+      val identifier = new AbsoluteTableIdentifier(
             CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-            new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "cardinalityTest", "1")
+            new CarbonTableIdentifier(
+              CarbonCommonConstants.DATABASE_DEFAULT_NAME, "cardinalityTest", "1")
           )
-      )
-      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+      val segments = SegmentStatusManager.getSegmentStatus(identifier)
+          .getValidSegments.asScala.toList
 
       if (!segments.contains("0.1")) {
         // wait for 2 seconds for compaction to complete.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
index eb889d6..7ec6431 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
@@ -105,16 +105,11 @@ class DataCompactionLockTest extends QueryTest with BeforeAndAfterAll {
     * Compaction should fail as lock is being held purposefully
     */
   test("check if compaction is failed or not.") {
-
-      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
-        absoluteTableIdentifier
-      )
-      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
-
+      val segments = SegmentStatusManager.getSegmentStatus(absoluteTableIdentifier)
+          .getValidSegments.asScala.toList
       if (!segments.contains("0.1")) {
         assert(true)
-      }
-      else {
+      } else {
         assert(false)
       }
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala
index fbb39d8..15ed78b 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala
@@ -45,8 +45,7 @@ class DataCompactionMinorThresholdTest extends QueryTest with BeforeAndAfterAll
   val carbonTableIdentifier: CarbonTableIdentifier =
     new CarbonTableIdentifier("default", "minorthreshold".toLowerCase(), "1")
 
-  val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-      AbsoluteTableIdentifier(storeLocation, carbonTableIdentifier))
+  val identifier = new AbsoluteTableIdentifier(storeLocation, carbonTableIdentifier)
 
   override def beforeAll {
     CarbonProperties.getInstance()
@@ -96,7 +95,8 @@ class DataCompactionMinorThresholdTest extends QueryTest with BeforeAndAfterAll
 
     sql("clean files for table minorthreshold")
 
-    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+    val segments = SegmentStatusManager.getSegmentStatus(identifier)
+        .getValidSegments.asScala.toList
 
     assert(segments.contains("0.2"))
     assert(!segments.contains("0.1"))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
index c7be22f..570bb72 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
@@ -38,14 +38,10 @@ class DataCompactionNoDictionaryTest extends QueryTest with BeforeAndAfterAll {
 
   // return segment details
   def getSegments(databaseName : String, tableName : String, tableId : String): List[String] = {
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
+    val identifier = new AbsoluteTableIdentifier(
           CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-          new CarbonTableIdentifier(databaseName, tableName.toLowerCase , tableId)
-        )
-    )
-    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
-    segments
+          new CarbonTableIdentifier(databaseName, tableName.toLowerCase , tableId))
+    SegmentStatusManager.getSegmentStatus(identifier).getValidSegments.asScala.toList
   }
 
   val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
index 3eef8b7..137cebc 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
@@ -84,13 +84,12 @@ class DataCompactionTest extends QueryTest with BeforeAndAfterAll {
     var noOfRetries = 0
     while (status && noOfRetries < 10) {
 
-      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-          AbsoluteTableIdentifier(
+      val identifier = new AbsoluteTableIdentifier(
             CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
             new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "normalcompaction", "1")
           )
-      )
-      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+      val segments = SegmentStatusManager.getSegmentStatus(identifier)
+          .getValidSegments.asScala.toList
 
       if (!segments.contains("0.1")) {
         // wait for 2 seconds for compaction to complete.
@@ -131,15 +130,14 @@ class DataCompactionTest extends QueryTest with BeforeAndAfterAll {
     // delete merged segments
     sql("clean files for table normalcompaction")
 
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
+    val identifier = new AbsoluteTableIdentifier(
           CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
           new CarbonTableIdentifier(
             CarbonCommonConstants.DATABASE_DEFAULT_NAME, "normalcompaction", "uniqueid")
         )
-    )
     // merged segment should not be there
-    val segments   = segmentStatusManager.getValidAndInvalidSegments().getValidSegments.asScala.toList
+    val segments = SegmentStatusManager.getSegmentStatus(identifier)
+        .getValidSegments.asScala.toList
     assert(!segments.contains("0"))
     assert(!segments.contains("1"))
     assert(!segments.contains("2"))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
index 99e3d56..a1664a6 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -97,14 +97,13 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
     var noOfRetries = 0
     while (!status && noOfRetries < 10) {
 
-      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-          AbsoluteTableIdentifier(
+      val identifier = new AbsoluteTableIdentifier(
             CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
             new CarbonTableIdentifier(
               CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", noOfRetries + "")
           )
-      )
-      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+      val segments = SegmentStatusManager.getSegmentStatus(identifier)
+          .getValidSegments.asScala.toList
       segments.foreach(seg =>
         System.out.println( "valid segment is =" + seg)
       )
@@ -129,15 +128,14 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
     // delete merged segments
     sql("clean files for table ignoremajor")
 
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
+    val identifier = new AbsoluteTableIdentifier(
           CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
           new CarbonTableIdentifier(
             CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
         )
-    )
     // merged segment should not be there
-    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+    val segments = SegmentStatusManager.getSegmentStatus(identifier)
+        .getValidSegments.asScala.toList
     assert(segments.contains("0.1"))
     assert(segments.contains("2.1"))
     assert(!segments.contains("2"))
@@ -156,13 +154,6 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
     catch {
       case _:Throwable => assert(true)
     }
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
-          CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-          new CarbonTableIdentifier(
-            CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
-        )
-    )
     val carbontablePath = CarbonStorePath
       .getCarbonTablePath(CarbonProperties.getInstance
         .getProperty(CarbonCommonConstants.STORE_LOCATION),
@@ -170,7 +161,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
           CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
       )
       .getMetadataDirectoryPath
-    var segs = segmentStatusManager.readLoadMetadata(carbontablePath)
+    val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
 
     // status should remain as compacted.
     assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.SEGMENT_COMPACTED))
@@ -185,13 +176,6 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
       "DELETE SEGMENTS FROM TABLE ignoremajor where STARTTIME before" +
         " '2222-01-01 19:35:01'"
     )
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
-          CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-          new CarbonTableIdentifier(
-            CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
-        )
-    )
     val carbontablePath = CarbonStorePath
       .getCarbonTablePath(CarbonProperties.getInstance
         .getProperty(CarbonCommonConstants.STORE_LOCATION),
@@ -199,7 +183,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
           CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
       )
       .getMetadataDirectoryPath
-    var segs = segmentStatusManager.readLoadMetadata(carbontablePath)
+    val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
 
     // status should remain as compacted for segment 2.
     assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.SEGMENT_COMPACTED))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
index 3745e11..25087a7 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
@@ -87,14 +87,13 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
     var noOfRetries = 0
     while (!status && noOfRetries < 10) {
 
-      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-          AbsoluteTableIdentifier(
+      val identifier = new AbsoluteTableIdentifier(
             CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
             new CarbonTableIdentifier(
               CarbonCommonConstants.DATABASE_DEFAULT_NAME, "stopmajor", noOfRetries + "")
           )
-      )
-      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+      val segments = SegmentStatusManager.getSegmentStatus(identifier)
+          .getValidSegments.asScala.toList
       segments.foreach(seg =>
         System.out.println( "valid segment is =" + seg)
       )
@@ -119,14 +118,13 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
     // delete merged segments
     sql("clean files for table stopmajor")
 
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
+    val identifier = new AbsoluteTableIdentifier(
           CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
           new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "stopmajor", "rrr")
         )
-    )
     // merged segment should not be there
-    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+    val segments = SegmentStatusManager.getSegmentStatus(identifier)
+        .getValidSegments.asScala.toList
     assert(segments.contains("0.1"))
     assert(!segments.contains("0.2"))
     assert(!segments.contains("0"))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
index bed6428..6d3cdec 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
@@ -55,7 +55,6 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll {
       AbsoluteTableIdentifier(storeLocation,
         new CarbonTableIdentifier(
           CarbonCommonConstants.DATABASE_DEFAULT_NAME, "DataRetentionTable".toLowerCase(), "300"))
-  val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(absoluteTableIdentifierForRetention)
   val carbonTablePath = CarbonStorePath
     .getCarbonTablePath(absoluteTableIdentifierForRetention.getStorePath,
       absoluteTableIdentifierForRetention.getCarbonTableIdentifier).getMetadataDirectoryPath
@@ -133,8 +132,8 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   test("RetentionTest_DeleteSegmentsByLoadTime") {
-    val segments: Array[LoadMetadataDetails] = segmentStatusManager
-      .readLoadMetadata(carbonTablePath)
+    val segments: Array[LoadMetadataDetails] =
+      SegmentStatusManager.readLoadMetadata(carbonTablePath)
     // check segment length, it should be 2 (loads)
     if (segments.length != 2) {
       assert(false)
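
The LoadMetadataDetails checks change the same way: readLoadMetadata is now invoked statically on SegmentStatusManager with the table's metadata directory path. A small sketch under the same assumptions as above, where metadataPath stands in for the path the tests compute via CarbonStorePath:

    // metadataPath: the table's metadata directory, e.g. obtained from
    // CarbonStorePath.getCarbonTablePath(storePath, tableIdentifier).getMetadataDirectoryPath
    val loads: Array[LoadMetadataDetails] = SegmentStatusManager.readLoadMetadata(metadataPath)
    // Inspect per-segment load status, as the compaction and retention tests above do.
    loads.foreach(load => println(load.getLoadStatus))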


[4/4] incubator-carbondata git commit: [CARBONDATA-308] Use CarbonInputFormat in CarbonScanRDD compute This closes #262

Posted by ra...@apache.org.
[CARBONDATA-308] Use CarbonInputFormat in CarbonScanRDD compute. This closes #262


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/e05c0d5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/e05c0d5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/e05c0d5d

Branch: refs/heads/master
Commit: e05c0d5da95e503e532aca5833c99f2f51021dc4
Parents: 5c697e9 5f6a56c
Author: ravipesala <ra...@gmail.com>
Authored: Fri Nov 25 15:50:37 2016 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Nov 25 15:50:37 2016 +0530

----------------------------------------------------------------------
 .../carbon/datastore/block/Distributable.java   |   8 +-
 .../carbon/datastore/block/TableBlockInfo.java  |   3 +-
 .../carbon/datastore/block/TableTaskInfo.java   |   2 +-
 .../scan/filter/FilterExpressionProcessor.java  |  29 +-
 .../examples/DataFrameAPIExample.scala          |   5 +-
 .../carbondata/hadoop/CarbonInputFormat.java    | 334 +++++-----------
 .../carbondata/hadoop/CarbonInputSplit.java     | 112 +++++-
 .../hadoop/CarbonMultiBlockSplit.java           | 105 +++++
 .../carbondata/hadoop/CarbonRecordReader.java   |  34 +-
 .../hadoop/readsupport/CarbonReadSupport.java   |   2 +-
 .../AbstractDictionaryDecodedReadSupport.java   |   2 +-
 .../impl/ArrayWritableReadSupport.java          |   2 +-
 .../readsupport/impl/RawDataReadSupport.java    |   6 +-
 .../hadoop/util/CarbonInputFormatUtil.java      |  14 +
 .../hadoop/ft/CarbonInputMapperTest.java        |  15 +-
 .../carbondata/spark/load/CarbonLoaderUtil.java |  99 +----
 .../spark/merger/CarbonDataMergerUtil.java      |  11 +-
 .../readsupport/SparkRowReadSupportImpl.java    |   9 +-
 .../carbondata/spark/util/LoadMetadataUtil.java |   4 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  20 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 151 +++-----
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 387 ++++++++-----------
 .../apache/carbondata/spark/rdd/Compactor.scala |  13 +-
 .../carbondata/spark/util/QueryPlanUtil.scala   |  56 ---
 .../sql/CarbonDatasourceHadoopRelation.scala    |  20 +-
 .../spark/sql/CarbonDatasourceRelation.scala    |   3 +-
 .../org/apache/spark/sql/CarbonOperators.scala  | 191 ---------
 .../scala/org/apache/spark/sql/CarbonScan.scala | 186 +++++++++
 .../execution/command/carbonTableSchema.scala   |  20 +-
 .../spark/sql/hive/DistributionUtil.scala       |  21 +-
 .../AllDataTypesTestCaseAggregate.scala         |  20 +-
 .../CompactionSystemLockFeatureTest.scala       |  20 +-
 .../DataCompactionBoundaryConditionsTest.scala  |   3 -
 .../DataCompactionCardinalityBoundryTest.scala  |  10 +-
 .../datacompaction/DataCompactionLockTest.scala |  11 +-
 .../DataCompactionMinorThresholdTest.scala      |   6 +-
 .../DataCompactionNoDictionaryTest.scala        |  10 +-
 .../datacompaction/DataCompactionTest.scala     |  14 +-
 .../MajorCompactionIgnoreInMinorTest.scala      |  32 +-
 .../MajorCompactionStopsAfterCompaction.scala   |  14 +-
 .../dataretention/DataRetentionTestCase.scala   |   5 +-
 .../lcm/status/SegmentStatusManager.java        | 160 ++++----
 42 files changed, 964 insertions(+), 1205 deletions(-)
----------------------------------------------------------------------