Posted to commits@carbondata.apache.org by gv...@apache.org on 2016/09/17 20:56:44 UTC

[1/2] incubator-carbondata git commit: Fixed out of memory issue during query execution, as invalid segments were not getting deleted from the Btree

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 0d9970f66 -> 381b08a01


Fixed out of memory issue during query execution, as invalid segments were not getting deleted from the Btree


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/286904ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/286904ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/286904ea

Branch: refs/heads/master
Commit: 286904eaf2454441d200dcb8bba1d0c9d537a1e2
Parents: 0d9970f
Author: kumarvishal <ku...@gmail.com>
Authored: Thu Sep 15 14:11:00 2016 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Sun Sep 18 02:22:19 2016 +0530

----------------------------------------------------------------------
 .../core/carbon/datastore/BlockIndexStore.java  |  60 +++++++++-
 .../carbon/datastore/SegmentTaskIndexStore.java |   6 +-
 .../apache/carbondata/core/util/CarbonUtil.java |  21 ++++
 .../executor/impl/AbstractQueryExecutor.java    |  13 +-
 .../carbondata/scan/model/QueryModel.java       |  16 +++
 .../carbondata/hadoop/CarbonInputFormat.java    | 120 +++++++++++--------
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  20 +++-
 .../CompactionSystemLockFeatureTest.scala       |   4 +-
 .../DataCompactionCardinalityBoundryTest.scala  |   2 +-
 .../datacompaction/DataCompactionLockTest.scala |   2 +-
 .../DataCompactionNoDictionaryTest.scala        |   2 +-
 .../datacompaction/DataCompactionTest.scala     |   4 +-
 .../MajorCompactionIgnoreInMinorTest.scala      |   4 +-
 .../MajorCompactionStopsAfterCompaction.scala   |   4 +-
 .../lcm/status/SegmentStatusManager.java        |  80 ++++++++-----
 15 files changed, 247 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
index 07815c0..4a36373 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.carbon.datastore;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -62,6 +63,12 @@ public class BlockIndexStore {
   private Map<AbsoluteTableIdentifier, Map<TableBlockInfo, AbstractIndex>> tableBlocksMap;
 
   /**
+   * map to maintain segment id to block info list; this map will be used
+   * while removing blocks from memory when a segment is compacted or deleted
+   */
+  private Map<AbsoluteTableIdentifier, Map<String, List<TableBlockInfo>>> segmentIdToBlockListMap;
+
+  /**
    * map of block info to lock object map, while loading the btree this will be filled
    * and removed after loading the tree for that particular block info, this will be useful
    * while loading the tree concurrently so only block level lock will be applied another
@@ -83,6 +90,7 @@ public class BlockIndexStore {
     tableLockMap = new ConcurrentHashMap<AbsoluteTableIdentifier, Object>(
         CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     blockInfoLock = new ConcurrentHashMap<TableBlockInfo, Object>();
+    segmentIdToBlockListMap = new ConcurrentHashMap<>();
   }
 
   /**
@@ -129,6 +137,7 @@ public class BlockIndexStore {
         tableBlockMapTemp = new ConcurrentHashMap<TableBlockInfo, AbstractIndex>();
         tableBlocksMap.put(absoluteTableIdentifier, tableBlockMapTemp);
       }
+      fillSegmentIdToTableInfoMap(tableBlocksInfos, absoluteTableIdentifier);
     }
     AbstractIndex tableBlock = null;
     List<Future<AbstractIndex>> blocksList = new ArrayList<Future<AbstractIndex>>();
@@ -189,6 +198,32 @@ public class BlockIndexStore {
   }
 
   /**
+   * Below method will be used to fill the segment id to block list mapping.
+   * It groups all the table block infos by segment id and fills the map.
+   *
+   * @param tableBlockInfos         table block infos
+   * @param absoluteTableIdentifier absolute table identifier
+   */
+  private void fillSegmentIdToTableInfoMap(List<TableBlockInfo> tableBlockInfos,
+      AbsoluteTableIdentifier absoluteTableIdentifier) {
+    Map<String, List<TableBlockInfo>> map = segmentIdToBlockListMap.get(absoluteTableIdentifier);
+    if (null == map) {
+      map = new ConcurrentHashMap<String, List<TableBlockInfo>>();
+      segmentIdToBlockListMap.put(absoluteTableIdentifier, map);
+    }
+    for (TableBlockInfo info : tableBlockInfos) {
+      List<TableBlockInfo> tempTableBlockInfos = map.get(info.getSegmentId());
+      if (null == tempTableBlockInfos) {
+        tempTableBlockInfos = new ArrayList<>();
+        map.put(info.getSegmentId(), tempTableBlockInfos);
+      }
+      if (!tempTableBlockInfos.contains(info)) {
+        tempTableBlockInfos.add(info);
+      }
+    }
+  }
+
+  /**
    * Below method will be used to fill the loaded blocks to the array
    * which will be used for query execution
    *
@@ -246,10 +281,10 @@ public class BlockIndexStore {
    * deletion of some of the blocks in case of retention or may be some other
    * scenario
    *
-   * @param removeTableBlocksInfos  blocks to be removed
+   * @param segmentsToBeRemoved     list of segments to be removed
    * @param absoluteTableIdentifier absolute table identifier
    */
-  public void removeTableBlocks(List<TableBlockInfo> removeTableBlocksInfos,
+  public void removeTableBlocks(List<String> segmentsToBeRemoved,
       AbsoluteTableIdentifier absoluteTableIdentifier) {
     // get the lock object if lock object is not present then it is not
     // loaded at all
@@ -260,11 +295,26 @@ public class BlockIndexStore {
     }
     Map<TableBlockInfo, AbstractIndex> map = tableBlocksMap.get(absoluteTableIdentifier);
     // if there is no loaded blocks then return
-    if (null == map) {
+    if (null == map || map.isEmpty()) {
       return;
     }
-    for (TableBlockInfo blockInfos : removeTableBlocksInfos) {
-      map.remove(blockInfos);
+    Map<String, List<TableBlockInfo>> segmentIdToBlockInfoMap =
+        segmentIdToBlockListMap.get(absoluteTableIdentifier);
+    if (null == segmentIdToBlockInfoMap || segmentIdToBlockInfoMap.isEmpty()) {
+      return;
+    }
+    synchronized (lockObject) {
+      for (String segmentId : segmentsToBeRemoved) {
+        List<TableBlockInfo> tableBlockInfoList = segmentIdToBlockInfoMap.remove(segmentId);
+        if (null == tableBlockInfoList) {
+          continue;
+        }
+        Iterator<TableBlockInfo> tableBlockInfoIterator = tableBlockInfoList.iterator();
+        while (tableBlockInfoIterator.hasNext()) {
+          TableBlockInfo info = tableBlockInfoIterator.next();
+          map.remove(info);
+        }
+      }
     }
   }
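
The fix hinges on the pattern above: every loaded block is now also indexed by its segment id, so invalidating a segment can evict all of its cached Btree indexes at once. As a self-contained illustration (not CarbonData source; String stand-ins replace TableBlockInfo and AbstractIndex, and all names are invented for the sketch):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class SegmentEvictionSketch {
      // segment id -> blocks it owns (plays the role of segmentIdToBlockListMap)
      private final Map<String, List<String>> segmentToBlocks = new ConcurrentHashMap<>();
      // block -> loaded index (plays the role of tableBlocksMap)
      private final Map<String, Object> blockToIndex = new ConcurrentHashMap<>();

      void register(String segmentId, String blockId, Object index) {
        List<String> blocks = segmentToBlocks.get(segmentId);
        if (blocks == null) {
          blocks = new ArrayList<>();
          segmentToBlocks.put(segmentId, blocks);
        }
        if (!blocks.contains(blockId)) {
          blocks.add(blockId);
        }
        blockToIndex.put(blockId, index);
      }

      // Evicting whole segments drops every cached index they own, which is
      // what frees the memory previously leaked for compacted/deleted segments.
      void evictSegments(List<String> invalidSegments) {
        for (String segmentId : invalidSegments) {
          List<String> blocks = segmentToBlocks.remove(segmentId);
          if (blocks == null) {
            continue;
          }
          for (String block : blocks) {
            blockToIndex.remove(block);
          }
        }
      }

      public static void main(String[] args) {
        SegmentEvictionSketch cache = new SegmentEvictionSketch();
        cache.register("0", "block-a", new Object());
        cache.register("0", "block-b", new Object());
        cache.register("1", "block-c", new Object());
        cache.evictSegments(Arrays.asList("0"));
        System.out.println(cache.blockToIndex.keySet()); // prints [block-c]
      }
    }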
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
index 50d462a..e2218a8 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
@@ -141,8 +141,8 @@ public class SegmentTaskIndexStore {
           synchronized (segmentLoderLockObject) {
             taskIdToSegmentIndexMap = tableSegmentMapTemp.get(segmentId);
             if (null == taskIdToSegmentIndexMap) {
-              // creating a map of take if to table segment
-              taskIdToSegmentIndexMap = new HashMap<String, AbstractIndex>();
+              // creating a map of task id to table segment
+              taskIdToSegmentIndexMap = new ConcurrentHashMap<String, AbstractIndex>();
               Iterator<Entry<String, List<TableBlockInfo>>> iterator =
                   taskIdToTableBlockInfoMap.entrySet().iterator();
               while (iterator.hasNext()) {
@@ -256,7 +256,7 @@ public class SegmentTaskIndexStore {
   private Map<String, List<TableBlockInfo>> mappedAndGetTaskIdToTableBlockInfo(
       Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos) {
     Map<String, List<TableBlockInfo>> taskIdToTableBlockInfoMap =
-        new HashMap<String, List<TableBlockInfo>>();
+        new ConcurrentHashMap<String, List<TableBlockInfo>>();
     Iterator<Entry<String, List<TableBlockInfo>>> iterator =
         segmentToTableBlocksInfos.entrySet().iterator();
     while (iterator.hasNext()) {
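
The two HashMap-to-ConcurrentHashMap switches above matter presumably because these task-id maps are built while several query threads may load segments concurrently; a plain HashMap can lose entries, or corrupt its internal table, under that interleaving. A minimal standalone demo of the safe variant (illustrative only, not CarbonData source):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class ConcurrentMapDemo {
      public static void main(String[] args) throws InterruptedException {
        final Map<String, Integer> map = new ConcurrentHashMap<String, Integer>();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int t = 0; t < 4; t++) {
          final int base = t * 1000;
          pool.submit(new Runnable() {
            public void run() {
              for (int i = 0; i < 1000; i++) {
                map.put("task-" + (base + i), i); // distinct keys per thread
              }
            }
          });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        // Always prints 4000 with ConcurrentHashMap; a plain HashMap gives no
        // such guarantee when written from multiple threads.
        System.out.println(map.size());
      }
    }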

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 82c515c..5168208 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1431,5 +1431,26 @@ public final class CarbonUtil {
     return builder.toString();
   }
 
+  /**
+   * Below method will be used to get the list of segments in
+   * comma-separated string format
+   *
+   * @param segmentList
+   * @return comma separated segment string
+   */
+  public static String getSegmentString(List<String> segmentList) {
+    if (segmentList.isEmpty()) {
+      return "";
+    }
+    StringBuilder segmentStringbuilder = new StringBuilder();
+    for (int i = 0; i < segmentList.size() - 1; i++) {
+      String segmentNo = segmentList.get(i);
+      segmentStringbuilder.append(segmentNo);
+      segmentStringbuilder.append(",");
+    }
+    segmentStringbuilder.append(segmentList.get(segmentList.size() - 1));
+    return segmentStringbuilder.toString();
+  }
+
 }
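
The new helper centralizes the comma-joining that CarbonInputFormat previously did inline. Expected behaviour, sketched as a fragment (assuming java.util imports in scope):

    List<String> segments = Arrays.asList("0", "1", "2.1");
    CarbonUtil.getSegmentString(segments);                // returns "0,1,2.1"
    CarbonUtil.getSegmentString(new ArrayList<String>()); // returns ""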
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
index dab8a23..c31824f 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
@@ -18,12 +18,7 @@
  */
 package org.apache.carbondata.scan.executor.impl;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -101,8 +96,12 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     // query execution
     Collections.sort(queryModel.getTableBlockInfos());
     // get the table blocks
+    BlockIndexStore blockLoaderInstance = BlockIndexStore.getInstance();
+    // remove the invalid table blocks, block which is deleted or compacted
+    blockLoaderInstance.removeTableBlocks(queryModel.getInvalidSegmentIds(),
+        queryModel.getAbsoluteTableIdentifier());
     try {
-      queryProperties.dataBlocks = BlockIndexStore.getInstance()
+      queryProperties.dataBlocks = blockLoaderInstance
           .loadAndGetBlocks(queryModel.getTableBlockInfos(),
               queryModel.getAbsoluteTableIdentifier());
     } catch (IndexBuilderException e) {
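
The ordering here is the heart of the fix: stale segments are evicted from the BlockIndexStore before the query loads its blocks, so the cache never keeps indexes for compacted or deleted data. Condensed from the diff above (error handling elided):

    BlockIndexStore store = BlockIndexStore.getInstance();
    // 1. drop cached indexes for segments invalidated by compaction or delete
    store.removeTableBlocks(queryModel.getInvalidSegmentIds(),
        queryModel.getAbsoluteTableIdentifier());
    // 2. only then load (or reuse) the blocks this query actually needs
    queryProperties.dataBlocks = store.loadAndGetBlocks(
        queryModel.getTableBlockInfos(), queryModel.getAbsoluteTableIdentifier());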

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
index 81eb728..1c819a2 100644
--- a/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
@@ -132,6 +132,13 @@ public class QueryModel implements Serializable {
 
   private QueryStatisticsRecorder statisticsRecorder;
 
+  /**
+   * Invalid table blocks, which need to be removed from
+   * memory; invalid blocks belong to segments that have been deleted
+   * or compacted
+   */
+  private List<String> invalidSegmentIds;
+
   public QueryModel() {
     tableBlockInfos = new ArrayList<TableBlockInfo>();
     queryDimension = new ArrayList<QueryDimension>();
@@ -139,6 +146,7 @@ public class QueryModel implements Serializable {
     sortDimension = new ArrayList<QueryDimension>();
     sortOrder = new byte[0];
     paritionColumns = new ArrayList<String>();
+    invalidSegmentIds = new ArrayList<>();
   }
 
   public static QueryModel createModel(AbsoluteTableIdentifier absoluteTableIdentifier,
@@ -504,4 +512,12 @@ public class QueryModel implements Serializable {
   public void setStatisticsRecorder(QueryStatisticsRecorder statisticsRecorder) {
     this.statisticsRecorder = statisticsRecorder;
   }
+
+  public List<String> getInvalidSegmentIds() {
+    return invalidSegmentIds;
+  }
+
+  public void setInvalidSegmentIds(List<String> invalidSegmentIds) {
+    this.invalidSegmentIds = invalidSegmentIds;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index f6fae6c..2e18629 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -54,6 +54,7 @@ import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsRecorder
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
 import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodedReadSupportImpl;
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
@@ -93,6 +94,9 @@ import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.util.StringUtils;
 
 
+
+
+
 /**
  * Carbon Input format class representing one carbon table
  */
@@ -239,21 +243,29 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   /**
    * Set List of segments to access
    */
-  private void setSegmentsToAccess(Configuration configuration, List<String> segmentNosList) {
-
-    //serialize to comma separated string
-    StringBuilder stringSegmentsBuilder = new StringBuilder();
-    for (int i = 0; i < segmentNosList.size(); i++) {
-      String segmentNo = segmentNosList.get(i);
-      stringSegmentsBuilder.append(segmentNo);
-      if (i < segmentNosList.size() - 1) {
-        stringSegmentsBuilder.append(",");
-      }
-    }
-    configuration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, stringSegmentsBuilder.toString());
+  public static void setSegmentsToAccess(Configuration configuration, List<String> validSegments) {
+    configuration
+        .set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, CarbonUtil.getSegmentString(validSegments));
   }
 
   /**
+   * Below method will be used to set the segment details if
+   * segments are not already set in the configuration
+   *
+   * @param job
+   * @param absoluteTableIdentifier
+   * @throws IOException
+   */
+  private void addSegmentsIfEmpty(JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier)
+      throws IOException {
+    if (getSegmentsFromConfiguration(job).length == 0) {
+      // Get the valid segments from the carbon store.
+      SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments =
+          new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments();
+      setSegmentsToAccess(job.getConfiguration(), validAndInvalidSegments.getValidSegments());
+    }
+  }
+  /**
    * {@inheritDoc}
    * Configurations FileInputFormat.INPUT_DIR
    * are used to get table path to read.
@@ -266,25 +278,17 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     try {
       CarbonTable carbonTable = getCarbonTable(job.getConfiguration());
       Object filterPredicates = getFilterPredicates(job.getConfiguration());
-      if (getValidSegments(job).length == 0) {
-        // Get the valid segments from the carbon store.
-        SegmentStatusManager.ValidSegmentsInfo validSegments =
-            new SegmentStatusManager(getAbsoluteTableIdentifier(job.getConfiguration()))
-                .getValidSegments();
-        if (validSegments.listOfValidSegments.isEmpty()) {
-          return new ArrayList<InputSplit>();
-        }
-        setSegmentsToAccess(job.getConfiguration(), validSegments.listOfValidSegments);
-      }
-
+      AbsoluteTableIdentifier absoluteTableIdentifier =
+          getAbsoluteTableIdentifier(job.getConfiguration());
+      addSegmentsIfEmpty(job, absoluteTableIdentifier);
       if (filterPredicates == null) {
         return getSplitsNonFilter(job);
       } else {
         if (filterPredicates instanceof Expression) {
           //process and resolve the expression.
           CarbonInputFormatUtil.processFilterExpression((Expression) filterPredicates, carbonTable);
-          return getSplits(job, CarbonInputFormatUtil.resolveFilter((Expression) filterPredicates,
-              getAbsoluteTableIdentifier(job.getConfiguration())));
+          return getSplits(job, CarbonInputFormatUtil
+              .resolveFilter((Expression) filterPredicates, absoluteTableIdentifier));
         } else {
           //It means user sets already resolved expression.
           return getSplits(job, (FilterResolverIntf) filterPredicates);
@@ -342,7 +346,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
         getAbsoluteTableIdentifier(job.getConfiguration());
 
     //for each segment fetch blocks matching filter in Driver BTree
-    for (String segmentNo : getValidSegments(job)) {
+    for (String segmentNo : getSegmentsFromConfiguration(job)) {
       List<DataRefNode> dataRefNodes =
           getDataBlocksOfSegment(job, filterExpressionProcessor, absoluteTableIdentifier,
               filterResolver, segmentNo);
@@ -368,11 +372,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     long rowCount = 0;
     AbsoluteTableIdentifier absoluteTableIdentifier =
         getAbsoluteTableIdentifier(job.getConfiguration());
-    SegmentStatusManager.ValidSegmentsInfo validSegments =
-        new SegmentStatusManager(getAbsoluteTableIdentifier(job.getConfiguration()))
-            .getValidSegments();
-    setSegmentsToAccess(job.getConfiguration(), validSegments.listOfValidSegments);
     // no of core to load the blocks in driver
+    addSegmentsIfEmpty(job, absoluteTableIdentifier);
     int numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE;
     try {
       numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
@@ -385,7 +386,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     List<Future<Map<String, AbstractIndex>>> loadedBlocks =
         new ArrayList<Future<Map<String, AbstractIndex>>>();
     //for each segment fetch blocks matching filter in Driver BTree
-    for (String segmentNo : getValidSegments(job)) {
+    for (String segmentNo : getSegmentsFromConfiguration(job)) {
       // submitting the task
       loadedBlocks
           .add(threadPool.submit(new BlocksLoaderThread(job, absoluteTableIdentifier, segmentNo)));
@@ -494,6 +495,39 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return resultFilterredBlocks;
   }
 
+  /**
+   * Below method will be used to get the table block info
+   *
+   * @param job                     job context
+   * @param absoluteTableIdentifier absolute table identifier
+   * @param segmentId               segment id whose blocks are to be listed
+   * @return list of table block
+   * @throws IOException
+   */
+  private List<TableBlockInfo> getTableBlockInfo(JobContext job,
+      AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId) throws IOException {
+    // List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
+    List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
+    // getFileStatusOfSegments(job, new int[]{ segmentId }, fileStatusList);
+
+    // get file location of all files of given segment
+    JobContext newJob =
+        new JobContextImpl(new Configuration(job.getConfiguration()), job.getJobID());
+    newJob.getConfiguration().set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, segmentId + "");
+
+    // identify table blocks
+    for (InputSplit inputSplit : getSplitsInternal(newJob)) {
+      CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
+      BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0,
+          carbonInputSplit.getNumberOfBlocklets());
+      tableBlockInfoList.add(
+          new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+              segmentId, carbonInputSplit.getLocations(), carbonInputSplit.getLength(),
+              blockletInfos));
+    }
+    return tableBlockInfoList;
+  }
+
   private Map<String, AbstractIndex> getSegmentAbstractIndexs(JobContext job,
       AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId)
       throws IOException, IndexBuilderException {
@@ -503,25 +537,10 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     // if segment tree is not loaded, load the segment tree
     if (segmentIndexMap == null) {
       // List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
-      List<TableBlockInfo> tableBlockInfoList = new LinkedList<TableBlockInfo>();
+      List<TableBlockInfo> tableBlockInfoList =
+          getTableBlockInfo(job, absoluteTableIdentifier, segmentId);
       // getFileStatusOfSegments(job, new int[]{ segmentId }, fileStatusList);
 
-      // get file location of all files of given segment
-      JobContext newJob =
-          new JobContextImpl(new Configuration(job.getConfiguration()), job.getJobID());
-      newJob.getConfiguration().set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, segmentId + "");
-
-      // identify table blocks
-      for (InputSplit inputSplit : getSplitsInternal(newJob)) {
-        CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
-        BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0,
-            carbonInputSplit.getNumberOfBlocklets());
-        tableBlockInfoList.add(
-            new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
-                segmentId, carbonInputSplit.getLocations(), carbonInputSplit.getLength(),
-                blockletInfos));
-      }
-
       Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
       segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
 
@@ -624,7 +643,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
 
   @Override protected List<FileStatus> listStatus(JobContext job) throws IOException {
     List<FileStatus> result = new ArrayList<FileStatus>();
-    String[] segmentsToConsider = getValidSegments(job);
+    String[] segmentsToConsider = getSegmentsFromConfiguration(job);
     if (segmentsToConsider.length == 0) {
       throw new IOException("No segments found");
     }
@@ -706,7 +725,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   /**
    * @return updateExtension
    */
-  private String[] getValidSegments(JobContext job) throws IOException {
+  private String[] getSegmentsFromConfiguration(JobContext job)
+      throws IOException {
     String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
     // if no segments
     if (segmentString.trim().isEmpty()) {
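
With this refactor, setSegmentsToAccess and getSegmentsFromConfiguration form a symmetric pair over the INPUT_SEGMENT_NUMBERS property. A rough round trip (fragment; assumes a Hadoop Job as in the diff):

    Configuration conf = job.getConfiguration();
    CarbonInputFormat.setSegmentsToAccess(conf, Arrays.asList("0", "2", "3.1"));
    // conf now holds INPUT_SEGMENT_NUMBERS = "0,2,3.1".
    // getSegmentsFromConfiguration(job) splits it back into {"0", "2", "3.1"},
    // and addSegmentsIfEmpty(job, identifier) consults SegmentStatusManager
    // for the valid segments only when the property is still empty.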

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 0b8b106..f2cfd81 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
@@ -32,9 +33,11 @@ import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.Dictionary
 import org.apache.carbondata.core.carbon.datastore.block.{BlockletInfos, TableBlockInfo}
-import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder}
+import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore
+import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
 import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
+import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.scan.executor.QueryExecutorFactory
 import org.apache.carbondata.scan.expression.Expression
 import org.apache.carbondata.scan.model.QueryModel
@@ -82,15 +85,24 @@ class CarbonScanRDD[V: ClassTag](
 
     val result = new util.ArrayList[Partition](defaultParallelism)
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    val validAndInvalidSegments = new SegmentStatusManager(queryModel.getAbsoluteTableIdentifier)
+        .getValidAndInvalidSegments
     // set filter resolver tree
     try {
       // before applying filter check whether segments are available in the table.
-      val splits = carbonInputFormat.getSplits(job)
-      if (!splits.isEmpty) {
+      if (!validAndInvalidSegments.getValidSegments.isEmpty) {
         val filterResolver = carbonInputFormat
           .getResolvedFilter(job.getConfiguration, filterExpression)
         CarbonInputFormat.setFilterPredicates(job.getConfiguration, filterResolver)
         queryModel.setFilterExpressionResolverTree(filterResolver)
+        CarbonInputFormat
+          .setSegmentsToAccess(job.getConfiguration,
+            validAndInvalidSegments.getValidSegments
+          )
+        SegmentTaskIndexStore.getInstance()
+          .removeTableBlocks(validAndInvalidSegments.getInvalidSegments,
+            queryModel.getAbsoluteTableIdentifier
+          )
       }
     }
     catch {
@@ -102,7 +114,7 @@ class CarbonScanRDD[V: ClassTag](
     val splits = carbonInputFormat.getSplits(job)
     if (!splits.isEmpty) {
       val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
-
+      queryModel.setInvalidSegmentIds(validAndInvalidSegments.getInvalidSegments)
       val blockListTemp = carbonInputSplits.map(inputSplit =>
         new TableBlockInfo(inputSplit.getPath.toString,
           inputSplit.getStart, inputSplit.getSegmentId,
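
Expressed in Java for reference (the Scala change above wires together Java APIs; simplified, with job setup and exception handling elided):

    SegmentStatusManager manager =
        new SegmentStatusManager(queryModel.getAbsoluteTableIdentifier());
    SegmentStatusManager.ValidAndInvalidSegmentsInfo info =
        manager.getValidAndInvalidSegments();
    if (!info.getValidSegments().isEmpty()) {
      // query only the valid segments ...
      CarbonInputFormat.setSegmentsToAccess(job.getConfiguration(),
          info.getValidSegments());
      // ... and evict the invalid ones from the driver-side Btree cache
      SegmentTaskIndexStore.getInstance().removeTableBlocks(
          info.getInvalidSegments(), queryModel.getAbsoluteTableIdentifier());
    }
    // executors use this list to clean their own BlockIndexStore caches
    queryModel.setInvalidSegmentIds(info.getInvalidSegments());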

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
index ae29650..d9e1349 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
@@ -118,7 +118,7 @@ class CompactionSystemLockFeatureTest extends QueryTest with BeforeAndAfterAll {
         )
     )
     // merged segment should not be there
-    val segments = segmentStatusManager.getValidSegments.listOfValidSegments.asScala.toList
+    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
     assert(segments.contains("0.1"))
     assert(!segments.contains("0"))
     assert(!segments.contains("1"))
@@ -130,7 +130,7 @@ class CompactionSystemLockFeatureTest extends QueryTest with BeforeAndAfterAll {
         )
     )
     // merged segment should not be there
-    val segments2 = segmentStatusManager2.getValidSegments.listOfValidSegments.asScala.toList
+    val segments2 = segmentStatusManager2.getValidAndInvalidSegments.getValidSegments.asScala.toList
     assert(segments2.contains("0.1"))
     assert(!segments2.contains("0"))
     assert(!segments2.contains("1"))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
index 7bbee54..4ec00ac 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
@@ -92,7 +92,7 @@ class DataCompactionCardinalityBoundryTest extends QueryTest with BeforeAndAfter
             new CarbonTableIdentifier("default", "cardinalityTest", "1")
           )
       )
-      val segments = segmentStatusManager.getValidSegments().listOfValidSegments.asScala.toList
+      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
 
       if (!segments.contains("0.1")) {
         // wait for 2 seconds for compaction to complete.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
index 48a1231..4a43767 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
@@ -108,7 +108,7 @@ class DataCompactionLockTest extends QueryTest with BeforeAndAfterAll {
       val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
         absoluteTableIdentifier
       )
-      val segments = segmentStatusManager.getValidSegments().listOfValidSegments.asScala.toList
+      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
 
       if (!segments.contains("0.1")) {
         assert(true)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
index 09fb427..17fc1e5 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
@@ -44,7 +44,7 @@ class DataCompactionNoDictionaryTest extends QueryTest with BeforeAndAfterAll {
           new CarbonTableIdentifier(databaseName, tableName.toLowerCase , tableId)
         )
     )
-    val segments = segmentStatusManager.getValidSegments().listOfValidSegments.asScala.toList
+    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
     segments
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
index 7737745..39dba52 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
@@ -90,7 +90,7 @@ class DataCompactionTest extends QueryTest with BeforeAndAfterAll {
             new CarbonTableIdentifier("default", "normalcompaction", "1")
           )
       )
-      val segments = segmentStatusManager.getValidSegments().listOfValidSegments.asScala.toList
+      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
 
       if (!segments.contains("0.1")) {
         // wait for 2 seconds for compaction to complete.
@@ -138,7 +138,7 @@ class DataCompactionTest extends QueryTest with BeforeAndAfterAll {
         )
     )
     // merged segment should not be there
-    val segments   = segmentStatusManager.getValidSegments.listOfValidSegments.asScala.toList
+    val segments   = segmentStatusManager.getValidAndInvalidSegments().getValidSegments.asScala.toList
     assert(!segments.contains("0"))
     assert(!segments.contains("1"))
     assert(!segments.contains("2"))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
index a062153..9fe178f 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -103,7 +103,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
             new CarbonTableIdentifier("default", "ignoremajor", noOfRetries + "")
           )
       )
-      val segments = segmentStatusManager.getValidSegments().listOfValidSegments.asScala.toList
+      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
       segments.foreach(seg =>
         System.out.println( "valid segment is =" + seg)
       )
@@ -135,7 +135,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
         )
     )
     // merged segment should not be there
-    val segments = segmentStatusManager.getValidSegments.listOfValidSegments.asScala.toList
+    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
     assert(segments.contains("0.1"))
     assert(segments.contains("2.1"))
     assert(!segments.contains("2"))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
index 2b0afe6..3e51002 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
@@ -93,7 +93,7 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
             new CarbonTableIdentifier("default", "stopmajor", noOfRetries + "")
           )
       )
-      val segments = segmentStatusManager.getValidSegments().listOfValidSegments.asScala.toList
+      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
       segments.foreach(seg =>
         System.out.println( "valid segment is =" + seg)
       )
@@ -125,7 +125,7 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
         )
     )
     // merged segment should not be there
-    val segments = segmentStatusManager.getValidSegments.listOfValidSegments.asScala.toList
+    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
     assert(segments.contains("0.1"))
     assert(!segments.contains("0.2"))
     assert(!segments.contains("0"))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java b/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
index 22ec689..bc6032a 100644
--- a/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
+++ b/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
@@ -49,7 +49,6 @@ import org.apache.carbondata.lcm.locks.ICarbonLock;
 import org.apache.carbondata.lcm.locks.LockUsage;
 
 import com.google.gson.Gson;
-
 /**
  * Manages Load/Segment status
  */
@@ -64,17 +63,6 @@ public class SegmentStatusManager {
     this.absoluteTableIdentifier = absoluteTableIdentifier;
   }
 
-  public static class ValidSegmentsInfo {
-    public final List<String> listOfValidSegments;
-    public final List<String> listOfValidUpdatedSegments;
-
-    public ValidSegmentsInfo(List<String> listOfValidSegments,
-        List<String> listOfValidUpdatedSegments) {
-      this.listOfValidSegments = listOfValidSegments;
-      this.listOfValidUpdatedSegments = listOfValidUpdatedSegments;
-    }
-  }
-
   /**
    * This will return the lock object used to lock the table status file before updation.
    *
@@ -89,9 +77,9 @@ public class SegmentStatusManager {
    * This method will return last modified time of tablestatus file
    */
   public long getTableStatusLastModifiedTime() throws IOException {
-    String tableStatusPath = CarbonStorePath.getCarbonTablePath(
-        absoluteTableIdentifier.getStorePath(), absoluteTableIdentifier.getCarbonTableIdentifier())
-          .getTableStatusFilePath();
+    String tableStatusPath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier()).getTableStatusFilePath();
     if (!FileFactory.isFileExist(tableStatusPath, FileFactory.getFileType(tableStatusPath))) {
       return 0L;
     } else {
@@ -102,14 +90,16 @@ public class SegmentStatusManager {
 
   /**
    * get valid segment for given table
+   *
    * @return
    * @throws IOException
    */
-  public ValidSegmentsInfo getValidSegments() throws IOException {
+  public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException {
 
     // @TODO: move reading LoadStatus file to separate class
     List<String> listOfValidSegments = new ArrayList<String>(10);
     List<String> listOfValidUpdatedSegments = new ArrayList<String>(10);
+    List<String> listOfInvalidSegments = new ArrayList<String>(10);
     CarbonTablePath carbonTablePath = CarbonStorePath
         .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
             absoluteTableIdentifier.getCarbonTableIdentifier());
@@ -157,8 +147,15 @@ public class SegmentStatusManager {
               listOfValidUpdatedSegments.add(loadMetadataDetails.getLoadName());
             }
             listOfValidSegments.add(loadMetadataDetails.getLoadName());
-
+          } else if ((CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+              .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
+              || CarbonCommonConstants.SEGMENT_COMPACTED
+              .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
+              || CarbonCommonConstants.MARKED_FOR_DELETE
+              .equalsIgnoreCase(loadMetadataDetails.getLoadStatus()))) {
+            listOfInvalidSegments.add(loadMetadataDetails.getLoadName());
           }
+
         }
       }
     } catch (IOException e) {
@@ -176,7 +173,8 @@ public class SegmentStatusManager {
       }
 
     }
-    return new ValidSegmentsInfo(listOfValidSegments, listOfValidUpdatedSegments);
+    return new ValidAndInvalidSegmentsInfo(listOfValidSegments, listOfValidUpdatedSegments,
+        listOfInvalidSegments);
   }
 
   /**
@@ -218,6 +216,7 @@ public class SegmentStatusManager {
 
   /**
    * returns current time
+   *
    * @return
    */
   private String readCurrentTime() {
@@ -244,6 +243,7 @@ public class SegmentStatusManager {
 
   /**
    * updates deletion status
+   *
    * @param loadIds
    * @param tableFolderPath
    * @return
@@ -271,13 +271,10 @@ public class SegmentStatusManager {
         listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
         if (listOfLoadFolderDetailsArray != null && listOfLoadFolderDetailsArray.length != 0) {
           updateDeletionStatus(loadIds, listOfLoadFolderDetailsArray, invalidLoadIds);
-          if(invalidLoadIds.isEmpty())
-          {
+          if (invalidLoadIds.isEmpty()) {
             // All or None , if anything fails then dont write
             writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray);
-          }
-          else
-          {
+          } else {
             return invalidLoadIds;
           }
 
@@ -330,13 +327,11 @@ public class SegmentStatusManager {
         // read existing metadata details in load metadata.
         listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
         if (listOfLoadFolderDetailsArray != null && listOfLoadFolderDetailsArray.length != 0) {
-          updateDeletionStatus(loadDate, listOfLoadFolderDetailsArray,
-              invalidLoadTimestamps, loadStartTime);
-          if(invalidLoadTimestamps.isEmpty()) {
+          updateDeletionStatus(loadDate, listOfLoadFolderDetailsArray, invalidLoadTimestamps,
+              loadStartTime);
+          if (invalidLoadTimestamps.isEmpty()) {
             writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray);
-          }
-          else
-          {
+          } else {
             return invalidLoadTimestamps;
           }
 
@@ -395,6 +390,7 @@ public class SegmentStatusManager {
 
   /**
    * updates deletion status details for each load and returns invalidLoadIds
+   *
    * @param loadIds
    * @param listOfLoadFolderDetailsArray
    * @param invalidLoadIds
@@ -504,17 +500,39 @@ public class SegmentStatusManager {
 
   /**
    * unlocks given file
+   *
    * @param carbonLock
    */
   private void fileUnlock(ICarbonLock carbonLock) {
     if (carbonLock.unlock()) {
       LOG.info("Metadata lock has been successfully released");
     } else {
-      LOG
-          .error("Not able to release the metadata lock");
+      LOG.error("Not able to release the metadata lock");
     }
   }
 
+  public static class ValidAndInvalidSegmentsInfo {
+    private final List<String> listOfValidSegments;
+    private final List<String> listOfValidUpdatedSegments;
+    private final List<String> listOfInvalidSegments;
+
+    private ValidAndInvalidSegmentsInfo(List<String> listOfValidSegments,
+        List<String> listOfValidUpdatedSegments, List<String> listOfInvalidUpdatedSegments) {
+      this.listOfValidSegments = listOfValidSegments;
+      this.listOfValidUpdatedSegments = listOfValidUpdatedSegments;
+      this.listOfInvalidSegments = listOfInvalidUpdatedSegments;
+    }
+
+    public List<String> getInvalidSegments() {
+      return listOfInvalidSegments;
+    }
 
+    public List<String> getValidSegments() {
+      return listOfValidSegments;
+    }
 
+    public List<String> getUpadtedSegments() {
+      return listOfValidUpdatedSegments;
+    }
+  }
 }
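
ValidAndInvalidSegmentsInfo replaces the old public ValidSegmentsInfo; its constructor is private, so callers obtain an instance only via getValidAndInvalidSegments(). A short usage sketch (fragment):

    SegmentStatusManager manager = new SegmentStatusManager(absoluteTableIdentifier);
    SegmentStatusManager.ValidAndInvalidSegmentsInfo info =
        manager.getValidAndInvalidSegments();         // may throw IOException
    List<String> valid = info.getValidSegments();     // loaded, queryable segments
    List<String> invalid = info.getInvalidSegments(); // failed, compacted, or marked for delete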



[2/2] incubator-carbondata git commit: [CARBONDATA-241] delete invalid segments from btree. This closes #158

Posted by gv...@apache.org.
[CARBONDATA-241] delete invalid segments from btree. This closes #158


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/381b08a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/381b08a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/381b08a0

Branch: refs/heads/master
Commit: 381b08a01ab3f05448a79b9eecfb476e224d8a00
Parents: 0d9970f 286904e
Author: Venkata Ramana G <ra...@huawei.com>
Authored: Sun Sep 18 02:26:25 2016 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Sun Sep 18 02:26:25 2016 +0530

----------------------------------------------------------------------
 .../core/carbon/datastore/BlockIndexStore.java  |  60 +++++++++-
 .../carbon/datastore/SegmentTaskIndexStore.java |   6 +-
 .../apache/carbondata/core/util/CarbonUtil.java |  21 ++++
 .../executor/impl/AbstractQueryExecutor.java    |  13 +-
 .../carbondata/scan/model/QueryModel.java       |  16 +++
 .../carbondata/hadoop/CarbonInputFormat.java    | 120 +++++++++++--------
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  20 +++-
 .../CompactionSystemLockFeatureTest.scala       |   4 +-
 .../DataCompactionCardinalityBoundryTest.scala  |   2 +-
 .../datacompaction/DataCompactionLockTest.scala |   2 +-
 .../DataCompactionNoDictionaryTest.scala        |   2 +-
 .../datacompaction/DataCompactionTest.scala     |   4 +-
 .../MajorCompactionIgnoreInMinorTest.scala      |   4 +-
 .../MajorCompactionStopsAfterCompaction.scala   |   4 +-
 .../lcm/status/SegmentStatusManager.java        |  80 ++++++++-----
 15 files changed, 247 insertions(+), 111 deletions(-)
----------------------------------------------------------------------