You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2016/09/17 20:56:44 UTC
[1/2] incubator-carbondata git commit: Fixed out of memory issue
during query execution, as invalid segments are not getting deleted from Btree
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 0d9970f66 -> 381b08a01
Fixed out of memory issue during query execution, as invalid segments are not getting deleted from Btree
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/286904ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/286904ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/286904ea
Branch: refs/heads/master
Commit: 286904eaf2454441d200dcb8bba1d0c9d537a1e2
Parents: 0d9970f
Author: kumarvishal <ku...@gmail.com>
Authored: Thu Sep 15 14:11:00 2016 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Sun Sep 18 02:22:19 2016 +0530
----------------------------------------------------------------------
.../core/carbon/datastore/BlockIndexStore.java | 60 +++++++++-
.../carbon/datastore/SegmentTaskIndexStore.java | 6 +-
.../apache/carbondata/core/util/CarbonUtil.java | 21 ++++
.../executor/impl/AbstractQueryExecutor.java | 13 +-
.../carbondata/scan/model/QueryModel.java | 16 +++
.../carbondata/hadoop/CarbonInputFormat.java | 120 +++++++++++--------
.../carbondata/spark/rdd/CarbonScanRDD.scala | 20 +++-
.../CompactionSystemLockFeatureTest.scala | 4 +-
.../DataCompactionCardinalityBoundryTest.scala | 2 +-
.../datacompaction/DataCompactionLockTest.scala | 2 +-
.../DataCompactionNoDictionaryTest.scala | 2 +-
.../datacompaction/DataCompactionTest.scala | 4 +-
.../MajorCompactionIgnoreInMinorTest.scala | 4 +-
.../MajorCompactionStopsAfterCompaction.scala | 4 +-
.../lcm/status/SegmentStatusManager.java | 80 ++++++++-----
15 files changed, 247 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
index 07815c0..4a36373 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.carbon.datastore;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -62,6 +63,12 @@ public class BlockIndexStore {
private Map<AbsoluteTableIdentifier, Map<TableBlockInfo, AbstractIndex>> tableBlocksMap;
/**
+ * map to maintain segment id to block info map, this map will be used
+ * while removing the block from memory when segment is compacted or deleted
+ */
+ private Map<AbsoluteTableIdentifier, Map<String, List<TableBlockInfo>>> segmentIdToBlockListMap;
+
+ /**
* map of block info to lock object map, while loading the btree this will be filled
* and removed after loading the tree for that particular block info, this will be useful
* while loading the tree concurrently so only block level lock will be applied another
@@ -83,6 +90,7 @@ public class BlockIndexStore {
tableLockMap = new ConcurrentHashMap<AbsoluteTableIdentifier, Object>(
CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
blockInfoLock = new ConcurrentHashMap<TableBlockInfo, Object>();
+ segmentIdToBlockListMap = new ConcurrentHashMap<>();
}
/**
@@ -129,6 +137,7 @@ public class BlockIndexStore {
tableBlockMapTemp = new ConcurrentHashMap<TableBlockInfo, AbstractIndex>();
tableBlocksMap.put(absoluteTableIdentifier, tableBlockMapTemp);
}
+ fillSegmentIdToTableInfoMap(tableBlocksInfos, absoluteTableIdentifier);
}
AbstractIndex tableBlock = null;
List<Future<AbstractIndex>> blocksList = new ArrayList<Future<AbstractIndex>>();
@@ -189,6 +198,32 @@ public class BlockIndexStore {
}
/**
+ * Below method will be used to fill segment id to its block mapping map.
+ * It will group all the table block infos based on segment id and fill
+ * the map accordingly.
+ * @param tableBlockInfos table block infos
+ * @param absoluteTableIdentifier absolute table identifier
+ */
+ private void fillSegmentIdToTableInfoMap(List<TableBlockInfo> tableBlockInfos,
+ AbsoluteTableIdentifier absoluteTableIdentifier) {
+ Map<String, List<TableBlockInfo>> map = segmentIdToBlockListMap.get(absoluteTableIdentifier);
+ if (null == map) {
+ map = new ConcurrentHashMap<String, List<TableBlockInfo>>();
+ segmentIdToBlockListMap.put(absoluteTableIdentifier, map);
+ }
+ for (TableBlockInfo info : tableBlockInfos) {
+ List<TableBlockInfo> tempTableBlockInfos = map.get(info.getSegmentId());
+ if (null == tempTableBlockInfos) {
+ tempTableBlockInfos = new ArrayList<>();
+ map.put(info.getSegmentId(), tempTableBlockInfos);
+ }
+ if (!tempTableBlockInfos.contains(info)) {
+ tempTableBlockInfos.add(info);
+ }
+ }
+ }
+
+ /**
* Below method will be used to fill the loaded blocks to the array
* which will be used for query execution
*
@@ -246,10 +281,10 @@ public class BlockIndexStore {
* deletion of some of the blocks in case of retention or may be some other
* scenario
*
- * @param removeTableBlocksInfos blocks to be removed
+ * @param segmentsToBeRemoved list of segments to be removed
* @param absoluteTableIdentifier absolute table identifier
*/
- public void removeTableBlocks(List<TableBlockInfo> removeTableBlocksInfos,
+ public void removeTableBlocks(List<String> segmentsToBeRemoved,
AbsoluteTableIdentifier absoluteTableIdentifier) {
// get the lock object if lock object is not present then it is not
// loaded at all
@@ -260,11 +295,26 @@ public class BlockIndexStore {
}
Map<TableBlockInfo, AbstractIndex> map = tableBlocksMap.get(absoluteTableIdentifier);
// if there is no loaded blocks then return
- if (null == map) {
+ if (null == map || map.isEmpty()) {
return;
}
- for (TableBlockInfo blockInfos : removeTableBlocksInfos) {
- map.remove(blockInfos);
+ Map<String, List<TableBlockInfo>> segmentIdToBlockInfoMap =
+ segmentIdToBlockListMap.get(absoluteTableIdentifier);
+ if (null == segmentIdToBlockInfoMap || segmentIdToBlockInfoMap.isEmpty()) {
+ return;
+ }
+ synchronized (lockObject) {
+ for (String segmentId : segmentsToBeRemoved) {
+ List<TableBlockInfo> tableBlockInfoList = segmentIdToBlockInfoMap.remove(segmentId);
+ if (null == tableBlockInfoList) {
+ continue;
+ }
+ Iterator<TableBlockInfo> tableBlockInfoIterator = tableBlockInfoList.iterator();
+ while (tableBlockInfoIterator.hasNext()) {
+ TableBlockInfo info = tableBlockInfoIterator.next();
+ map.remove(info);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
index 50d462a..e2218a8 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
@@ -141,8 +141,8 @@ public class SegmentTaskIndexStore {
synchronized (segmentLoderLockObject) {
taskIdToSegmentIndexMap = tableSegmentMapTemp.get(segmentId);
if (null == taskIdToSegmentIndexMap) {
- // creating a map of take if to table segment
- taskIdToSegmentIndexMap = new HashMap<String, AbstractIndex>();
+ // creating a map of task id to table segment
+ taskIdToSegmentIndexMap = new ConcurrentHashMap<String, AbstractIndex>();
Iterator<Entry<String, List<TableBlockInfo>>> iterator =
taskIdToTableBlockInfoMap.entrySet().iterator();
while (iterator.hasNext()) {
@@ -256,7 +256,7 @@ public class SegmentTaskIndexStore {
private Map<String, List<TableBlockInfo>> mappedAndGetTaskIdToTableBlockInfo(
Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos) {
Map<String, List<TableBlockInfo>> taskIdToTableBlockInfoMap =
- new HashMap<String, List<TableBlockInfo>>();
+ new ConcurrentHashMap<String, List<TableBlockInfo>>();
Iterator<Entry<String, List<TableBlockInfo>>> iterator =
segmentToTableBlocksInfos.entrySet().iterator();
while (iterator.hasNext()) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 82c515c..5168208 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1431,5 +1431,26 @@ public final class CarbonUtil {
return builder.toString();
}
+ /**
+ * Below method will be used to get the list of segments in
+ * comma separated string format
+ *
+ * @param segmentList
+ * @return comma separated segment string
+ */
+ public static String getSegmentString(List<String> segmentList) {
+ if (segmentList.isEmpty()) {
+ return "";
+ }
+ StringBuilder segmentStringbuilder = new StringBuilder();
+ for (int i = 0; i < segmentList.size() - 1; i++) {
+ String segmentNo = segmentList.get(i);
+ segmentStringbuilder.append(segmentNo);
+ segmentStringbuilder.append(",");
+ }
+ segmentStringbuilder.append(segmentList.get(segmentList.size() - 1));
+ return segmentStringbuilder.toString();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
index dab8a23..c31824f 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
@@ -18,12 +18,7 @@
*/
package org.apache.carbondata.scan.executor.impl;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -101,8 +96,12 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
// query execution
Collections.sort(queryModel.getTableBlockInfos());
// get the table blocks
+ BlockIndexStore blockLoaderInstance = BlockIndexStore.getInstance();
+ // remove the invalid table blocks, block which is deleted or compacted
+ blockLoaderInstance.removeTableBlocks(queryModel.getInvalidSegmentIds(),
+ queryModel.getAbsoluteTableIdentifier());
try {
- queryProperties.dataBlocks = BlockIndexStore.getInstance()
+ queryProperties.dataBlocks = blockLoaderInstance
.loadAndGetBlocks(queryModel.getTableBlockInfos(),
queryModel.getAbsoluteTableIdentifier());
} catch (IndexBuilderException e) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
index 81eb728..1c819a2 100644
--- a/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
@@ -132,6 +132,13 @@ public class QueryModel implements Serializable {
private QueryStatisticsRecorder statisticsRecorder;
+ /**
+ * Invalid table blocks, which need to be removed from
+ * memory; invalid blocks can be segments which are deleted
+ * or compacted
+ */
+ private List<String> invalidSegmentIds;
+
public QueryModel() {
tableBlockInfos = new ArrayList<TableBlockInfo>();
queryDimension = new ArrayList<QueryDimension>();
@@ -139,6 +146,7 @@ public class QueryModel implements Serializable {
sortDimension = new ArrayList<QueryDimension>();
sortOrder = new byte[0];
paritionColumns = new ArrayList<String>();
+ invalidSegmentIds = new ArrayList<>();
}
public static QueryModel createModel(AbsoluteTableIdentifier absoluteTableIdentifier,
@@ -504,4 +512,12 @@ public class QueryModel implements Serializable {
public void setStatisticsRecorder(QueryStatisticsRecorder statisticsRecorder) {
this.statisticsRecorder = statisticsRecorder;
}
+
+ public List<String> getInvalidSegmentIds() {
+ return invalidSegmentIds;
+ }
+
+ public void setInvalidSegmentIds(List<String> invalidSegmentIds) {
+ this.invalidSegmentIds = invalidSegmentIds;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index f6fae6c..2e18629 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -54,6 +54,7 @@ import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsRecorder
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodedReadSupportImpl;
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
@@ -93,6 +94,9 @@ import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.util.StringUtils;
+
+
+
/**
* Carbon Input format class representing one carbon table
*/
@@ -239,21 +243,29 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
/**
* Set List of segments to access
*/
- private void setSegmentsToAccess(Configuration configuration, List<String> segmentNosList) {
-
- //serialize to comma separated string
- StringBuilder stringSegmentsBuilder = new StringBuilder();
- for (int i = 0; i < segmentNosList.size(); i++) {
- String segmentNo = segmentNosList.get(i);
- stringSegmentsBuilder.append(segmentNo);
- if (i < segmentNosList.size() - 1) {
- stringSegmentsBuilder.append(",");
- }
- }
- configuration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, stringSegmentsBuilder.toString());
+ public static void setSegmentsToAccess(Configuration configuration, List<String> validSegments) {
+ configuration
+ .set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, CarbonUtil.getSegmentString(validSegments));
}
/**
+ * Below method will be used to set the segments details if
+ * segments are not added in the configuration
+ *
+ * @param job
+ * @param absoluteTableIdentifier
+ * @throws IOException
+ */
+ private void addSegmentsIfEmpty(JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier)
+ throws IOException {
+ if (getSegmentsFromConfiguration(job).length == 0) {
+ // Get the valid segments from the carbon store.
+ SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments =
+ new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments();
+ setSegmentsToAccess(job.getConfiguration(), validAndInvalidSegments.getValidSegments());
+ }
+ }
+ /**
* {@inheritDoc}
* Configurations FileInputFormat.INPUT_DIR
* are used to get table path to read.
@@ -266,25 +278,17 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
try {
CarbonTable carbonTable = getCarbonTable(job.getConfiguration());
Object filterPredicates = getFilterPredicates(job.getConfiguration());
- if (getValidSegments(job).length == 0) {
- // Get the valid segments from the carbon store.
- SegmentStatusManager.ValidSegmentsInfo validSegments =
- new SegmentStatusManager(getAbsoluteTableIdentifier(job.getConfiguration()))
- .getValidSegments();
- if (validSegments.listOfValidSegments.isEmpty()) {
- return new ArrayList<InputSplit>();
- }
- setSegmentsToAccess(job.getConfiguration(), validSegments.listOfValidSegments);
- }
-
+ AbsoluteTableIdentifier absoluteTableIdentifier =
+ getAbsoluteTableIdentifier(job.getConfiguration());
+ addSegmentsIfEmpty(job, absoluteTableIdentifier);
if (filterPredicates == null) {
return getSplitsNonFilter(job);
} else {
if (filterPredicates instanceof Expression) {
//process and resolve the expression.
CarbonInputFormatUtil.processFilterExpression((Expression) filterPredicates, carbonTable);
- return getSplits(job, CarbonInputFormatUtil.resolveFilter((Expression) filterPredicates,
- getAbsoluteTableIdentifier(job.getConfiguration())));
+ return getSplits(job, CarbonInputFormatUtil
+ .resolveFilter((Expression) filterPredicates, absoluteTableIdentifier));
} else {
//It means user sets already resolved expression.
return getSplits(job, (FilterResolverIntf) filterPredicates);
@@ -342,7 +346,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
getAbsoluteTableIdentifier(job.getConfiguration());
//for each segment fetch blocks matching filter in Driver BTree
- for (String segmentNo : getValidSegments(job)) {
+ for (String segmentNo : getSegmentsFromConfiguration(job)) {
List<DataRefNode> dataRefNodes =
getDataBlocksOfSegment(job, filterExpressionProcessor, absoluteTableIdentifier,
filterResolver, segmentNo);
@@ -368,11 +372,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
long rowCount = 0;
AbsoluteTableIdentifier absoluteTableIdentifier =
getAbsoluteTableIdentifier(job.getConfiguration());
- SegmentStatusManager.ValidSegmentsInfo validSegments =
- new SegmentStatusManager(getAbsoluteTableIdentifier(job.getConfiguration()))
- .getValidSegments();
- setSegmentsToAccess(job.getConfiguration(), validSegments.listOfValidSegments);
// no of core to load the blocks in driver
+ addSegmentsIfEmpty(job, absoluteTableIdentifier);
int numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE;
try {
numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
@@ -385,7 +386,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
List<Future<Map<String, AbstractIndex>>> loadedBlocks =
new ArrayList<Future<Map<String, AbstractIndex>>>();
//for each segment fetch blocks matching filter in Driver BTree
- for (String segmentNo : getValidSegments(job)) {
+ for (String segmentNo : getSegmentsFromConfiguration(job)) {
// submitting the task
loadedBlocks
.add(threadPool.submit(new BlocksLoaderThread(job, absoluteTableIdentifier, segmentNo)));
@@ -494,6 +495,39 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
return resultFilterredBlocks;
}
+ /**
+ * Below method will be used to get the table block info
+ *
+ * @param job job context
+ * @param absoluteTableIdentifier absolute table identifier
+ * @param segmentId number of segment id
+ * @return list of table block
+ * @throws IOException
+ */
+ private List<TableBlockInfo> getTableBlockInfo(JobContext job,
+ AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId) throws IOException {
+ // List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
+ List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
+ // getFileStatusOfSegments(job, new int[]{ segmentId }, fileStatusList);
+
+ // get file location of all files of given segment
+ JobContext newJob =
+ new JobContextImpl(new Configuration(job.getConfiguration()), job.getJobID());
+ newJob.getConfiguration().set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, segmentId + "");
+
+ // identify table blocks
+ for (InputSplit inputSplit : getSplitsInternal(newJob)) {
+ CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
+ BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0,
+ carbonInputSplit.getNumberOfBlocklets());
+ tableBlockInfoList.add(
+ new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+ segmentId, carbonInputSplit.getLocations(), carbonInputSplit.getLength(),
+ blockletInfos));
+ }
+ return tableBlockInfoList;
+ }
+
private Map<String, AbstractIndex> getSegmentAbstractIndexs(JobContext job,
AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId)
throws IOException, IndexBuilderException {
@@ -503,25 +537,10 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
// if segment tree is not loaded, load the segment tree
if (segmentIndexMap == null) {
// List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
- List<TableBlockInfo> tableBlockInfoList = new LinkedList<TableBlockInfo>();
+ List<TableBlockInfo> tableBlockInfoList =
+ getTableBlockInfo(job, absoluteTableIdentifier, segmentId);
// getFileStatusOfSegments(job, new int[]{ segmentId }, fileStatusList);
- // get file location of all files of given segment
- JobContext newJob =
- new JobContextImpl(new Configuration(job.getConfiguration()), job.getJobID());
- newJob.getConfiguration().set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, segmentId + "");
-
- // identify table blocks
- for (InputSplit inputSplit : getSplitsInternal(newJob)) {
- CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
- BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0,
- carbonInputSplit.getNumberOfBlocklets());
- tableBlockInfoList.add(
- new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
- segmentId, carbonInputSplit.getLocations(), carbonInputSplit.getLength(),
- blockletInfos));
- }
-
Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
@@ -624,7 +643,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
@Override protected List<FileStatus> listStatus(JobContext job) throws IOException {
List<FileStatus> result = new ArrayList<FileStatus>();
- String[] segmentsToConsider = getValidSegments(job);
+ String[] segmentsToConsider = getSegmentsFromConfiguration(job);
if (segmentsToConsider.length == 0) {
throw new IOException("No segments found");
}
@@ -706,7 +725,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
/**
* @return updateExtension
*/
- private String[] getValidSegments(JobContext job) throws IOException {
+ private String[] getSegmentsFromConfiguration(JobContext job)
+ throws IOException {
String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
// if no segments
if (segmentString.trim().isEmpty()) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 0b8b106..f2cfd81 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
@@ -32,9 +33,11 @@ import org.apache.carbondata.common.CarbonIterator
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.dictionary.Dictionary
import org.apache.carbondata.core.carbon.datastore.block.{BlockletInfos, TableBlockInfo}
-import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder}
+import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore
+import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
+import org.apache.carbondata.lcm.status.SegmentStatusManager
import org.apache.carbondata.scan.executor.QueryExecutorFactory
import org.apache.carbondata.scan.expression.Expression
import org.apache.carbondata.scan.model.QueryModel
@@ -82,15 +85,24 @@ class CarbonScanRDD[V: ClassTag](
val result = new util.ArrayList[Partition](defaultParallelism)
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ val validAndInvalidSegments = new SegmentStatusManager(queryModel.getAbsoluteTableIdentifier)
+ .getValidAndInvalidSegments
// set filter resolver tree
try {
// before applying filter check whether segments are available in the table.
- val splits = carbonInputFormat.getSplits(job)
- if (!splits.isEmpty) {
+ if (!validAndInvalidSegments.getValidSegments.isEmpty) {
val filterResolver = carbonInputFormat
.getResolvedFilter(job.getConfiguration, filterExpression)
CarbonInputFormat.setFilterPredicates(job.getConfiguration, filterResolver)
queryModel.setFilterExpressionResolverTree(filterResolver)
+ CarbonInputFormat
+ .setSegmentsToAccess(job.getConfiguration,
+ validAndInvalidSegments.getValidSegments
+ )
+ SegmentTaskIndexStore.getInstance()
+ .removeTableBlocks(validAndInvalidSegments.getInvalidSegments,
+ queryModel.getAbsoluteTableIdentifier
+ )
}
}
catch {
@@ -102,7 +114,7 @@ class CarbonScanRDD[V: ClassTag](
val splits = carbonInputFormat.getSplits(job)
if (!splits.isEmpty) {
val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
-
+ queryModel.setInvalidSegmentIds(validAndInvalidSegments.getInvalidSegments)
val blockListTemp = carbonInputSplits.map(inputSplit =>
new TableBlockInfo(inputSplit.getPath.toString,
inputSplit.getStart, inputSplit.getSegmentId,
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
index ae29650..d9e1349 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
@@ -118,7 +118,7 @@ class CompactionSystemLockFeatureTest extends QueryTest with BeforeAndAfterAll {
)
)
// merged segment should not be there
- val segments = segmentStatusManager.getValidSegments.listOfValidSegments.asScala.toList
+ val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
assert(segments.contains("0.1"))
assert(!segments.contains("0"))
assert(!segments.contains("1"))
@@ -130,7 +130,7 @@ class CompactionSystemLockFeatureTest extends QueryTest with BeforeAndAfterAll {
)
)
// merged segment should not be there
- val segments2 = segmentStatusManager2.getValidSegments.listOfValidSegments.asScala.toList
+ val segments2 = segmentStatusManager2.getValidAndInvalidSegments.getValidSegments.asScala.toList
assert(segments2.contains("0.1"))
assert(!segments2.contains("0"))
assert(!segments2.contains("1"))
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
index 7bbee54..4ec00ac 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
@@ -92,7 +92,7 @@ class DataCompactionCardinalityBoundryTest extends QueryTest with BeforeAndAfter
new CarbonTableIdentifier("default", "cardinalityTest", "1")
)
)
- val segments = segmentStatusManager.getValidSegments().listOfValidSegments.asScala.toList
+ val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
if (!segments.contains("0.1")) {
// wait for 2 seconds for compaction to complete.
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
index 48a1231..4a43767 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
@@ -108,7 +108,7 @@ class DataCompactionLockTest extends QueryTest with BeforeAndAfterAll {
val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
absoluteTableIdentifier
)
- val segments = segmentStatusManager.getValidSegments().listOfValidSegments.asScala.toList
+ val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
if (!segments.contains("0.1")) {
assert(true)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
index 09fb427..17fc1e5 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
@@ -44,7 +44,7 @@ class DataCompactionNoDictionaryTest extends QueryTest with BeforeAndAfterAll {
new CarbonTableIdentifier(databaseName, tableName.toLowerCase , tableId)
)
)
- val segments = segmentStatusManager.getValidSegments().listOfValidSegments.asScala.toList
+ val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
segments
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
index 7737745..39dba52 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
@@ -90,7 +90,7 @@ class DataCompactionTest extends QueryTest with BeforeAndAfterAll {
new CarbonTableIdentifier("default", "normalcompaction", "1")
)
)
- val segments = segmentStatusManager.getValidSegments().listOfValidSegments.asScala.toList
+ val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
if (!segments.contains("0.1")) {
// wait for 2 seconds for compaction to complete.
@@ -138,7 +138,7 @@ class DataCompactionTest extends QueryTest with BeforeAndAfterAll {
)
)
// merged segment should not be there
- val segments = segmentStatusManager.getValidSegments.listOfValidSegments.asScala.toList
+ val segments = segmentStatusManager.getValidAndInvalidSegments().getValidSegments.asScala.toList
assert(!segments.contains("0"))
assert(!segments.contains("1"))
assert(!segments.contains("2"))
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
index a062153..9fe178f 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -103,7 +103,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
new CarbonTableIdentifier("default", "ignoremajor", noOfRetries + "")
)
)
- val segments = segmentStatusManager.getValidSegments().listOfValidSegments.asScala.toList
+ val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
segments.foreach(seg =>
System.out.println( "valid segment is =" + seg)
)
@@ -135,7 +135,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
)
)
// merged segment should not be there
- val segments = segmentStatusManager.getValidSegments.listOfValidSegments.asScala.toList
+ val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
assert(segments.contains("0.1"))
assert(segments.contains("2.1"))
assert(!segments.contains("2"))
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
index 2b0afe6..3e51002 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
@@ -93,7 +93,7 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
new CarbonTableIdentifier("default", "stopmajor", noOfRetries + "")
)
)
- val segments = segmentStatusManager.getValidSegments().listOfValidSegments.asScala.toList
+ val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
segments.foreach(seg =>
System.out.println( "valid segment is =" + seg)
)
@@ -125,7 +125,7 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
)
)
// merged segment should not be there
- val segments = segmentStatusManager.getValidSegments.listOfValidSegments.asScala.toList
+ val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
assert(segments.contains("0.1"))
assert(!segments.contains("0.2"))
assert(!segments.contains("0"))
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/286904ea/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java b/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
index 22ec689..bc6032a 100644
--- a/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
+++ b/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
@@ -49,7 +49,6 @@ import org.apache.carbondata.lcm.locks.ICarbonLock;
import org.apache.carbondata.lcm.locks.LockUsage;
import com.google.gson.Gson;
-
/**
* Manages Load/Segment status
*/
@@ -64,17 +63,6 @@ public class SegmentStatusManager {
this.absoluteTableIdentifier = absoluteTableIdentifier;
}
- public static class ValidSegmentsInfo {
- public final List<String> listOfValidSegments;
- public final List<String> listOfValidUpdatedSegments;
-
- public ValidSegmentsInfo(List<String> listOfValidSegments,
- List<String> listOfValidUpdatedSegments) {
- this.listOfValidSegments = listOfValidSegments;
- this.listOfValidUpdatedSegments = listOfValidUpdatedSegments;
- }
- }
-
/**
* This will return the lock object used to lock the table status file before updation.
*
@@ -89,9 +77,9 @@ public class SegmentStatusManager {
* This method will return last modified time of tablestatus file
*/
public long getTableStatusLastModifiedTime() throws IOException {
- String tableStatusPath = CarbonStorePath.getCarbonTablePath(
- absoluteTableIdentifier.getStorePath(), absoluteTableIdentifier.getCarbonTableIdentifier())
- .getTableStatusFilePath();
+ String tableStatusPath = CarbonStorePath
+ .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+ absoluteTableIdentifier.getCarbonTableIdentifier()).getTableStatusFilePath();
if (!FileFactory.isFileExist(tableStatusPath, FileFactory.getFileType(tableStatusPath))) {
return 0L;
} else {
@@ -102,14 +90,16 @@ public class SegmentStatusManager {
/**
* get valid segment for given table
+ *
* @return
* @throws IOException
*/
- public ValidSegmentsInfo getValidSegments() throws IOException {
+ public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException {
// @TODO: move reading LoadStatus file to separate class
List<String> listOfValidSegments = new ArrayList<String>(10);
List<String> listOfValidUpdatedSegments = new ArrayList<String>(10);
+ List<String> listOfInvalidSegments = new ArrayList<String>(10);
CarbonTablePath carbonTablePath = CarbonStorePath
.getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
absoluteTableIdentifier.getCarbonTableIdentifier());
@@ -157,8 +147,15 @@ public class SegmentStatusManager {
listOfValidUpdatedSegments.add(loadMetadataDetails.getLoadName());
}
listOfValidSegments.add(loadMetadataDetails.getLoadName());
-
+ } else if ((CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+ .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
+ || CarbonCommonConstants.SEGMENT_COMPACTED
+ .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
+ || CarbonCommonConstants.MARKED_FOR_DELETE
+ .equalsIgnoreCase(loadMetadataDetails.getLoadStatus()))) {
+ listOfInvalidSegments.add(loadMetadataDetails.getLoadName());
}
+
}
}
} catch (IOException e) {
@@ -176,7 +173,8 @@ public class SegmentStatusManager {
}
}
- return new ValidSegmentsInfo(listOfValidSegments, listOfValidUpdatedSegments);
+ return new ValidAndInvalidSegmentsInfo(listOfValidSegments, listOfValidUpdatedSegments,
+ listOfInvalidSegments);
}
/**
@@ -218,6 +216,7 @@ public class SegmentStatusManager {
/**
* returns current time
+ *
* @return
*/
private String readCurrentTime() {
@@ -244,6 +243,7 @@ public class SegmentStatusManager {
/**
* updates deletion status
+ *
* @param loadIds
* @param tableFolderPath
* @return
@@ -271,13 +271,10 @@ public class SegmentStatusManager {
listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
if (listOfLoadFolderDetailsArray != null && listOfLoadFolderDetailsArray.length != 0) {
updateDeletionStatus(loadIds, listOfLoadFolderDetailsArray, invalidLoadIds);
- if(invalidLoadIds.isEmpty())
- {
+ if (invalidLoadIds.isEmpty()) {
// All or None , if anything fails then dont write
writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray);
- }
- else
- {
+ } else {
return invalidLoadIds;
}
@@ -330,13 +327,11 @@ public class SegmentStatusManager {
// read existing metadata details in load metadata.
listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
if (listOfLoadFolderDetailsArray != null && listOfLoadFolderDetailsArray.length != 0) {
- updateDeletionStatus(loadDate, listOfLoadFolderDetailsArray,
- invalidLoadTimestamps, loadStartTime);
- if(invalidLoadTimestamps.isEmpty()) {
+ updateDeletionStatus(loadDate, listOfLoadFolderDetailsArray, invalidLoadTimestamps,
+ loadStartTime);
+ if (invalidLoadTimestamps.isEmpty()) {
writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray);
- }
- else
- {
+ } else {
return invalidLoadTimestamps;
}
@@ -395,6 +390,7 @@ public class SegmentStatusManager {
/**
* updates deletion status details for each load and returns invalidLoadIds
+ *
* @param loadIds
* @param listOfLoadFolderDetailsArray
* @param invalidLoadIds
@@ -504,17 +500,39 @@ public class SegmentStatusManager {
/**
* unlocks given file
+ *
* @param carbonLock
*/
private void fileUnlock(ICarbonLock carbonLock) {
if (carbonLock.unlock()) {
LOG.info("Metadata lock has been successfully released");
} else {
- LOG
- .error("Not able to release the metadata lock");
+ LOG.error("Not able to release the metadata lock");
}
}
+ public static class ValidAndInvalidSegmentsInfo {
+ private final List<String> listOfValidSegments;
+ private final List<String> listOfValidUpdatedSegments;
+ private final List<String> listOfInvalidSegments;
+
+ private ValidAndInvalidSegmentsInfo(List<String> listOfValidSegments,
+ List<String> listOfValidUpdatedSegments, List<String> listOfInvalidUpdatedSegments) {
+ this.listOfValidSegments = listOfValidSegments;
+ this.listOfValidUpdatedSegments = listOfValidUpdatedSegments;
+ this.listOfInvalidSegments = listOfInvalidUpdatedSegments;
+ }
+
+ public List<String> getInvalidSegments() {
+ return listOfInvalidSegments;
+ }
+ public List<String> getValidSegments() {
+ return listOfValidSegments;
+ }
+ public List<String> getUpadtedSegments() {
+ return listOfValidUpdatedSegments;
+ }
+ }
}
[2/2] incubator-carbondata git commit: [CARBONDATA-241] delete
invalid segments from btree. This closes #158
Posted by gv...@apache.org.
[CARBONDATA-241] delete invalid segments from btree. This closes #158
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/381b08a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/381b08a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/381b08a0
Branch: refs/heads/master
Commit: 381b08a01ab3f05448a79b9eecfb476e224d8a00
Parents: 0d9970f 286904e
Author: Venkata Ramana G <ra...@huawei.com>
Authored: Sun Sep 18 02:26:25 2016 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Sun Sep 18 02:26:25 2016 +0530
----------------------------------------------------------------------
.../core/carbon/datastore/BlockIndexStore.java | 60 +++++++++-
.../carbon/datastore/SegmentTaskIndexStore.java | 6 +-
.../apache/carbondata/core/util/CarbonUtil.java | 21 ++++
.../executor/impl/AbstractQueryExecutor.java | 13 +-
.../carbondata/scan/model/QueryModel.java | 16 +++
.../carbondata/hadoop/CarbonInputFormat.java | 120 +++++++++++--------
.../carbondata/spark/rdd/CarbonScanRDD.scala | 20 +++-
.../CompactionSystemLockFeatureTest.scala | 4 +-
.../DataCompactionCardinalityBoundryTest.scala | 2 +-
.../datacompaction/DataCompactionLockTest.scala | 2 +-
.../DataCompactionNoDictionaryTest.scala | 2 +-
.../datacompaction/DataCompactionTest.scala | 4 +-
.../MajorCompactionIgnoreInMinorTest.scala | 4 +-
.../MajorCompactionStopsAfterCompaction.scala | 4 +-
.../lcm/status/SegmentStatusManager.java | 80 ++++++++-----
15 files changed, 247 insertions(+), 111 deletions(-)
----------------------------------------------------------------------