You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/04/20 16:13:47 UTC
[1/2] carbondata git commit: [CARBONDATA-2361] Refactoring
ReadCommittedStatus
Repository: carbondata
Updated Branches:
refs/heads/master 2fc0ad306 -> c58eb43ba
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 30a620c..6e7f2d6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -80,7 +80,7 @@ public class DataMapWriterListener {
}
List<String> columns = factory.getMeta().getIndexedColumns();
List<DataMapWriter> writers = registry.get(columns);
- DataMapWriter writer = factory.createWriter(new Segment(segmentId, null), dataWritePath);
+ DataMapWriter writer = factory.createWriter(new Segment(segmentId, null, null), dataWritePath);
if (writers != null) {
writers.add(writer);
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 5bc85f8..2ac351c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -231,8 +231,8 @@ public final class CarbonDataMergerUtil {
.setUpdateStatusFileName(CarbonUpdateUtil.getUpdateStatusFileName(timestamp));
}
// Update segment file name to status file
- int segmentFileIndex =
- segmentFilesToBeUpdated.indexOf(Segment.toSegment(loadDetail.getLoadName()));
+ int segmentFileIndex = segmentFilesToBeUpdated
+ .indexOf(Segment.toSegment(loadDetail.getLoadName(), null));
if (segmentFileIndex > -1) {
loadDetail.setSegmentFile(
segmentFilesToBeUpdated.get(segmentFileIndex).getSegmentFileName());
@@ -857,9 +857,9 @@ public final class CarbonDataMergerUtil {
//check if this load is an already merged load.
if (null != segment.getMergedLoadName()) {
- segments.add(Segment.toSegment(segment.getMergedLoadName()));
+ segments.add(Segment.toSegment(segment.getMergedLoadName(), null));
} else {
- segments.add(Segment.toSegment(segment.getLoadName()));
+ segments.add(Segment.toSegment(segment.getLoadName(), null));
}
}
return segments;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index aabe91a..2b4748f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -295,7 +295,8 @@ public final class CarbonLoaderUtil {
// if the segment is in the list of segments marked for delete then update the status.
if (segmentsToBeDeleted.contains(new Segment(detail.getLoadName(), null))) {
detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
- } else if (segmentFilesTobeUpdated.contains(Segment.toSegment(detail.getLoadName()))) {
+ } else if (segmentFilesTobeUpdated
+ .contains(Segment.toSegment(detail.getLoadName(), null))) {
detail.setSegmentFile(
detail.getLoadName() + "_" + newMetaEntry.getUpdateStatusFileName()
+ CarbonTablePath.SEGMENT_EXT);
[2/2] carbondata git commit: [CARBONDATA-2361] Refactoring
ReadCommittedStatus
Posted by ja...@apache.org.
[CARBONDATA-2361] Refactoring ReadCommittedStatus
Move ReadCommittedStatus into Segment class
This closes #2179
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c58eb43b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c58eb43b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c58eb43b
Branch: refs/heads/master
Commit: c58eb43bacf4dd8918e647d507a74798f26fd3bb
Parents: 2fc0ad3
Author: sounakr <so...@gmail.com>
Authored: Wed Apr 18 09:08:27 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Sat Apr 21 00:13:29 2018 +0800
----------------------------------------------------------------------
.../apache/carbondata/core/datamap/Segment.java | 63 +++++++++++++++++---
.../carbondata/core/datamap/TableDataMap.java | 31 ++++------
.../core/datamap/dev/DataMapFactory.java | 5 +-
.../datamap/dev/expr/AndDataMapExprWrapper.java | 10 ++--
.../datamap/dev/expr/DataMapExprWrapper.java | 4 +-
.../dev/expr/DataMapExprWrapperImpl.java | 7 +--
.../datamap/dev/expr/OrDataMapExprWrapper.java | 10 ++--
.../core/indexstore/BlockletDetailsFetcher.java | 13 +---
.../indexstore/SegmentPropertiesFetcher.java | 4 +-
.../blockletindex/BlockletDataMapFactory.java | 45 ++++++--------
.../core/metadata/SegmentFileStore.java | 4 +-
.../core/mutate/CarbonUpdateUtil.java | 5 +-
.../LatestFilesReadCommittedScope.java | 1 +
.../ReadCommittedIndexFileSnapShot.java | 3 +-
.../core/readcommitter/ReadCommittedScope.java | 3 +-
.../TableStatusReadCommittedScope.java | 8 +++
.../statusmanager/SegmentStatusManager.java | 35 +++++++----
.../apache/carbondata/core/util/CarbonUtil.java | 2 +-
.../examples/MinMaxIndexDataMapFactory.java | 10 +---
.../lucene/LuceneCoarseGrainDataMapFactory.java | 9 +--
.../lucene/LuceneFineGrainDataMapFactory.java | 7 +--
.../hadoop/api/CarbonFileInputFormat.java | 9 ++-
.../hadoop/api/CarbonInputFormat.java | 16 +++--
.../hadoop/api/CarbonOutputCommitter.java | 5 +-
.../hadoop/api/CarbonTableInputFormat.java | 40 +++++++------
.../hadoop/api/DistributableDataMapFormat.java | 7 +--
.../testsuite/datamap/CGDataMapTestCase.scala | 4 +-
.../testsuite/datamap/DataMapWriterSuite.scala | 4 +-
.../testsuite/datamap/FGDataMapTestCase.scala | 4 +-
.../testsuite/datamap/TestDataMapStatus.scala | 6 +-
.../TestInsertAndOtherCommandConcurrent.scala | 4 +-
.../carbondata/spark/rdd/StreamHandoffRDD.scala | 2 +-
.../org/apache/spark/util/PartitionUtils.scala | 7 ++-
.../spark/rdd/AggregateDataMapCompactor.scala | 2 +-
.../spark/rdd/CarbonTableCompactor.scala | 1 +
.../preaaggregate/PreAggregateListeners.scala | 2 +-
.../datasources/SparkCarbonFileFormat.scala | 11 ++--
.../spark/sql/optimizer/CarbonFilters.scala | 2 +-
.../datamap/DataMapWriterListener.java | 2 +-
.../processing/merger/CarbonDataMergerUtil.java | 8 +--
.../processing/util/CarbonLoaderUtil.java | 3 +-
41 files changed, 222 insertions(+), 196 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index a2a2a41..0eb82ec 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -16,11 +16,14 @@
*/
package org.apache.carbondata.core.datamap;
+import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -36,9 +39,47 @@ public class Segment implements Serializable {
private String segmentFileName;
+ /**
+ * Points to the Read Committed Scope of the segment. This is a flavor of
+ * transactional isolation level which only allows snapshot reads of the
+ * data and makes non-committed data invisible to the reader.
+ */
+ private ReadCommittedScope readCommittedScope;
+
+ /**
+ * ReadCommittedScope will be null, so getCommittedIndexFile will not work and will throw
+ * a NullPointerException. If getCommittedIndexFile needs to be called, then
+ * use the other constructor and pass a proper ReadCommittedScope.
+ * @param segmentNo
+ * @param segmentFileName
+ */
public Segment(String segmentNo, String segmentFileName) {
this.segmentNo = segmentNo;
this.segmentFileName = segmentFileName;
+ this.readCommittedScope = null;
+ }
+
+ /**
+ *
+ * @param segmentNo
+ * @param segmentFileName
+ * @param readCommittedScope
+ */
+ public Segment(String segmentNo, String segmentFileName, ReadCommittedScope readCommittedScope) {
+ this.segmentNo = segmentNo;
+ this.segmentFileName = segmentFileName;
+ this.readCommittedScope = readCommittedScope;
+ }
+
+ /**
+ *
+ * @return map of Absolute path of index file as key and null as value -- without mergeIndex
+ * map of AbsolutePath with fileName of MergeIndex parent file as key and mergeIndexFileName
+ * as value -- with mergeIndex
+ * @throws IOException
+ */
+ public Map<String, String> getCommittedIndexFile() throws IOException {
+ return readCommittedScope.getCommittedIndexFile(this);
}
public String getSegmentNo() {
@@ -49,35 +90,39 @@ public class Segment implements Serializable {
return segmentFileName;
}
- public static List<Segment> toSegmentList(String[] segmentIds) {
+ public static List<Segment> toSegmentList(String[] segmentIds,
+ ReadCommittedScope readCommittedScope) {
List<Segment> list = new ArrayList<>(segmentIds.length);
for (String segmentId : segmentIds) {
- list.add(toSegment(segmentId));
+ list.add(toSegment(segmentId, readCommittedScope));
}
return list;
}
- public static List<Segment> toSegmentList(List<String> segmentIds) {
+ public static List<Segment> toSegmentList(List<String> segmentIds,
+ ReadCommittedScope readCommittedScope) {
List<Segment> list = new ArrayList<>(segmentIds.size());
for (String segmentId : segmentIds) {
- list.add(toSegment(segmentId));
+ list.add(toSegment(segmentId, readCommittedScope));
}
return list;
}
/**
- * SegmentId can be combination of segmentNo and segmentFileName
+ * readCommittedScope provides read snapshot isolation.
* @param segmentId
+ * @param readCommittedScope
* @return
*/
- public static Segment toSegment(String segmentId) {
+ public static Segment toSegment(String segmentId, ReadCommittedScope readCommittedScope) {
+ // SegmentId can be combination of segmentNo and segmentFileName.
String[] split = segmentId.split("#");
if (split.length > 1) {
- return new Segment(split[0], split[1]);
+ return new Segment(split[0], split[1], readCommittedScope);
} else if (split.length > 0) {
- return new Segment(split[0], null);
+ return new Segment(split[0], null, readCommittedScope);
}
- return new Segment(segmentId, null);
+ return new Segment(segmentId, null, readCommittedScope);
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 8143b1c..2dc6317 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.events.Event;
import org.apache.carbondata.events.OperationContext;
@@ -80,29 +79,26 @@ public final class TableDataMap extends OperationEventListener {
*
* @param segments
* @param filterExp
- * @param readCommittedScope
* @return
*/
public List<ExtendedBlocklet> prune(List<Segment> segments, FilterResolverIntf filterExp,
- List<PartitionSpec> partitions, ReadCommittedScope readCommittedScope) throws IOException {
+ List<PartitionSpec> partitions) throws IOException {
List<ExtendedBlocklet> blocklets = new ArrayList<>();
SegmentProperties segmentProperties;
for (Segment segment : segments) {
List<Blocklet> pruneBlocklets = new ArrayList<>();
// if filter is not passed then return all the blocklets
if (filterExp == null) {
- pruneBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, partitions,
- readCommittedScope);
+ pruneBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, partitions);
} else {
- List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment, readCommittedScope);
- segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment,
- readCommittedScope);
+ List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment);
+ segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment);
for (DataMap dataMap : dataMaps) {
pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions));
}
}
blocklets.addAll(addSegmentId(
- blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment, readCommittedScope),
+ blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment),
segment.getSegmentNo()));
}
return blocklets;
@@ -143,19 +139,16 @@ public final class TableDataMap extends OperationEventListener {
*
* @param distributable
* @param filterExp
- * @param readCommittedScope
* @return
*/
public List<ExtendedBlocklet> prune(DataMapDistributable distributable,
- FilterResolverIntf filterExp, List<PartitionSpec> partitions,
- ReadCommittedScope readCommittedScope) throws IOException {
+ FilterResolverIntf filterExp, List<PartitionSpec> partitions) throws IOException {
List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>();
List<Blocklet> blocklets = new ArrayList<>();
- List<DataMap> dataMaps = dataMapFactory.getDataMaps(distributable, readCommittedScope);
+ List<DataMap> dataMaps = dataMapFactory.getDataMaps(distributable);
for (DataMap dataMap : dataMaps) {
blocklets.addAll(dataMap.prune(filterExp,
- segmentPropertiesFetcher.getSegmentProperties(distributable.getSegment(),
- readCommittedScope),
+ segmentPropertiesFetcher.getSegmentProperties(distributable.getSegment()),
partitions));
}
BlockletSerializer serializer = new BlockletSerializer();
@@ -167,7 +160,7 @@ public final class TableDataMap extends OperationEventListener {
}
for (Blocklet blocklet : blocklets) {
ExtendedBlocklet detailedBlocklet = blockletDetailsFetcher
- .getExtendedBlocklet(blocklet, distributable.getSegment(), readCommittedScope);
+ .getExtendedBlocklet(blocklet, distributable.getSegment());
if (dataMapFactory.getDataMapType() == DataMapLevel.FG) {
String blockletwritePath =
writePath + CarbonCommonConstants.FILE_SEPARATOR + System.nanoTime();
@@ -221,16 +214,14 @@ public final class TableDataMap extends OperationEventListener {
*
* @param segments
* @param filterExp
- * @param readCommittedScope
* @return
* @throws IOException
*/
- public List<Segment> pruneSegments(List<Segment> segments, FilterResolverIntf filterExp,
- ReadCommittedScope readCommittedScope)
+ public List<Segment> pruneSegments(List<Segment> segments, FilterResolverIntf filterExp)
throws IOException {
List<Segment> prunedSegments = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
for (Segment segment : segments) {
- List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment, readCommittedScope);
+ List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment);
for (DataMap dataMap : dataMaps) {
if (dataMap.isScanRequired(filterExp)) {
// If any one task in a given segment contains the data that means the segment need to
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index 70f2772..a315ce6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -26,7 +26,6 @@ import org.apache.carbondata.core.datamap.DataMapMeta;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.events.Event;
/**
@@ -48,12 +47,12 @@ public interface DataMapFactory<T extends DataMap> {
/**
* Get the datamap for segmentid
*/
- List<T> getDataMaps(Segment segment, ReadCommittedScope readCommittedScope) throws IOException;
+ List<T> getDataMaps(Segment segment) throws IOException;
/**
* Get datamaps for distributable object.
*/
- List<T> getDataMaps(DataMapDistributable distributable, ReadCommittedScope readCommittedScope)
+ List<T> getDataMaps(DataMapDistributable distributable)
throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
index 850e08a..c573dcb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
@@ -24,7 +24,6 @@ import org.apache.carbondata.core.datamap.DataMapLevel;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
/**
@@ -46,11 +45,10 @@ public class AndDataMapExprWrapper implements DataMapExprWrapper {
}
@Override
- public List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune,
- ReadCommittedScope readCommittedScope) throws IOException {
- List<ExtendedBlocklet> leftPrune = left.prune(segments, partitionsToPrune, readCommittedScope);
- List<ExtendedBlocklet> rightPrune =
- right.prune(segments, partitionsToPrune, readCommittedScope);
+ public List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune)
+ throws IOException {
+ List<ExtendedBlocklet> leftPrune = left.prune(segments, partitionsToPrune);
+ List<ExtendedBlocklet> rightPrune = right.prune(segments, partitionsToPrune);
List<ExtendedBlocklet> andBlocklets = new ArrayList<>();
for (ExtendedBlocklet blocklet : leftPrune) {
if (rightPrune.contains(blocklet)) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
index b5fb173..14cfc33 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
@@ -24,7 +24,6 @@ import org.apache.carbondata.core.datamap.DataMapLevel;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
/**
@@ -37,8 +36,7 @@ public interface DataMapExprWrapper extends Serializable {
* It gets the blocklets from each leaf node datamap and applies expressions on the blocklets
* using the list of segments. It is used in the case of a non-distributable datamap.
*/
- List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune,
- ReadCommittedScope readCommittedScope)
+ List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune)
throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
index ffd4f80..f9518ba 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
@@ -27,7 +27,6 @@ import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
public class DataMapExprWrapperImpl implements DataMapExprWrapper {
@@ -47,9 +46,9 @@ public class DataMapExprWrapperImpl implements DataMapExprWrapper {
}
@Override
- public List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune,
- ReadCommittedScope readCommittedScope) throws IOException {
- return dataMap.prune(segments, expression, partitionsToPrune, readCommittedScope);
+ public List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune)
+ throws IOException {
+ return dataMap.prune(segments, expression, partitionsToPrune);
}
@Override public List<ExtendedBlocklet> pruneBlocklets(List<ExtendedBlocklet> blocklets)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
index 0667d0a..93dd242 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
@@ -26,7 +26,6 @@ import org.apache.carbondata.core.datamap.DataMapLevel;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
/**
@@ -48,11 +47,10 @@ public class OrDataMapExprWrapper implements DataMapExprWrapper {
}
@Override
- public List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune,
- ReadCommittedScope readCommittedScope) throws IOException {
- List<ExtendedBlocklet> leftPrune = left.prune(segments, partitionsToPrune, readCommittedScope);
- List<ExtendedBlocklet> rightPrune =
- right.prune(segments, partitionsToPrune, readCommittedScope);
+ public List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune)
+ throws IOException {
+ List<ExtendedBlocklet> leftPrune = left.prune(segments, partitionsToPrune);
+ List<ExtendedBlocklet> rightPrune = right.prune(segments, partitionsToPrune);
Set<ExtendedBlocklet> andBlocklets = new HashSet<>();
andBlocklets.addAll(leftPrune);
andBlocklets.addAll(rightPrune);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
index cf283f2..58c11db 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
@@ -20,7 +20,6 @@ import java.io.IOException;
import java.util.List;
import org.apache.carbondata.core.datamap.Segment;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
/**
* Fetches the detailed blocklet which has more information to execute the query
@@ -32,12 +31,10 @@ public interface BlockletDetailsFetcher {
*
* @param blocklets
* @param segment
- * @param readCommittedScope
* @return
* @throws IOException
*/
- List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment,
- ReadCommittedScope readCommittedScope)
+ List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment)
throws IOException;
/**
@@ -45,21 +42,17 @@ public interface BlockletDetailsFetcher {
*
* @param blocklet
* @param segment
- * @param readCommittedScope
* @return
* @throws IOException
*/
- ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment,
- ReadCommittedScope readCommittedScope) throws IOException;
+ ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment) throws IOException;
/**
* Get all the blocklets in a segment
*
* @param segment
- * @param readCommittedScope
* @return
*/
- List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions,
- ReadCommittedScope readCommittedScope)
+ List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions)
throws IOException;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
index d464083..b7fb98c 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
/**
* Fetches the detailed segmentProperties which has more information to execute the query
@@ -31,10 +30,9 @@ public interface SegmentPropertiesFetcher {
/**
* get the Segment properties based on the SegmentID.
* @param segment
- * @param readCommittedScope
* @return
* @throws IOException
*/
- SegmentProperties getSegmentProperties(Segment segment, ReadCommittedScope readCommittedScope)
+ SegmentProperties getSegmentProperties(Segment segment)
throws IOException;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 4c8ac0c..caac733 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -45,7 +45,6 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.events.Event;
@@ -84,24 +83,20 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
throw new UnsupportedOperationException("not implemented");
}
- @Override public List<CoarseGrainDataMap> getDataMaps(Segment segment,
- ReadCommittedScope readCommittedScope) throws IOException {
+ @Override public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
- getTableBlockIndexUniqueIdentifiers(segment, readCommittedScope);
+ getTableBlockIndexUniqueIdentifiers(segment);
return cache.getAll(tableBlockIndexUniqueIdentifiers);
}
- private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment,
- ReadCommittedScope readCommittedScope) throws IOException {
- if (readCommittedScope == null) {
- throw new IOException("readCommittedScope is null. Internal error");
- }
+ private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment)
+ throws IOException {
List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
segmentMap.get(segment.getSegmentNo());
if (tableBlockIndexUniqueIdentifiers == null) {
tableBlockIndexUniqueIdentifiers = new ArrayList<>();
- Map<String, String> indexFiles = readCommittedScope.getCommittedIndexFile(segment);
- for (Map.Entry<String, String> indexFileEntry: indexFiles.entrySet()) {
+ Map<String, String> indexFiles = segment.getCommittedIndexFile();
+ for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
Path indexFile = new Path(indexFileEntry.getKey());
tableBlockIndexUniqueIdentifiers.add(
new TableBlockIndexUniqueIdentifier(indexFile.getParent().toString(),
@@ -118,8 +113,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
* default datamap.
*/
@Override
- public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment,
- ReadCommittedScope readCommittedScope)
+ public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment)
throws IOException {
List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>();
// If it is already detailed blocklet then type cast and return same
@@ -130,7 +124,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
return detailedBlocklets;
}
List<TableBlockIndexUniqueIdentifier> identifiers =
- getTableBlockIndexUniqueIdentifiers(segment, readCommittedScope);
+ getTableBlockIndexUniqueIdentifiers(segment);
// Retrieve each blocklets detail information from blocklet datamap
for (Blocklet blocklet : blocklets) {
detailedBlocklets.add(getExtendedBlocklet(identifiers, blocklet));
@@ -139,14 +133,13 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
}
@Override
- public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment,
- ReadCommittedScope readCommittedScope)
+ public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment)
throws IOException {
if (blocklet instanceof ExtendedBlocklet) {
return (ExtendedBlocklet) blocklet;
}
List<TableBlockIndexUniqueIdentifier> identifiers =
- getTableBlockIndexUniqueIdentifiers(segment, readCommittedScope);
+ getTableBlockIndexUniqueIdentifiers(segment);
return getExtendedBlocklet(identifiers, blocklet);
}
@@ -221,13 +214,12 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
@Override
public void clear() {
for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) {
- clear(new Segment(segmentId, null));
+ clear(new Segment(segmentId, null, null));
}
}
@Override
- public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable,
- ReadCommittedScope readCommittedScope)
+ public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
throws IOException {
BlockletDataMapDistributable mapDistributable = (BlockletDataMapDistributable) distributable;
List<TableBlockIndexUniqueIdentifier> identifiers = new ArrayList<>();
@@ -267,9 +259,8 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
}
- @Override public SegmentProperties getSegmentProperties(Segment segment,
- ReadCommittedScope readCommittedScope) throws IOException {
- List<CoarseGrainDataMap> dataMaps = getDataMaps(segment, readCommittedScope);
+ @Override public SegmentProperties getSegmentProperties(Segment segment) throws IOException {
+ List<CoarseGrainDataMap> dataMaps = getDataMaps(segment);
assert (dataMaps.size() > 0);
CoarseGrainDataMap coarseGrainDataMap = dataMaps.get(0);
assert (coarseGrainDataMap instanceof BlockletDataMap);
@@ -277,13 +268,13 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
return dataMap.getSegmentProperties();
}
- @Override public List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions,
- ReadCommittedScope readCommittedScope) throws IOException {
+ @Override public List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions)
+ throws IOException {
List<Blocklet> blocklets = new ArrayList<>();
- List<CoarseGrainDataMap> dataMaps = getDataMaps(segment, readCommittedScope);
+ List<CoarseGrainDataMap> dataMaps = getDataMaps(segment);
for (CoarseGrainDataMap dataMap : dataMaps) {
blocklets.addAll(
- dataMap.prune(null, getSegmentProperties(segment, readCommittedScope), partitions));
+ dataMap.prune(null, getSegmentProperties(segment), partitions));
}
return blocklets;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index d609c56..dff496b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -586,8 +586,8 @@ public class SegmentFileStore {
new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
.getValidAndInvalidSegments().getValidSegments());
CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, uniqueId, true,
- Segment.toSegmentList(toBeDeleteSegments), Segment.toSegmentList(toBeUpdatedSegments),
- uuid);
+ Segment.toSegmentList(toBeDeleteSegments, null),
+ Segment.toSegmentList(toBeUpdatedSegments, null), uuid);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 93e5580..1d0ef44 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -248,7 +248,8 @@ public class CarbonUpdateUtil {
// update end timestamp for each time.
loadMetadata.setUpdateDeltaEndTimestamp(updatedTimeStamp);
}
- if (segmentFilesTobeUpdated.contains(Segment.toSegment(loadMetadata.getLoadName()))) {
+ if (segmentFilesTobeUpdated
+ .contains(Segment.toSegment(loadMetadata.getLoadName(), null))) {
loadMetadata.setSegmentFile(loadMetadata.getLoadName() + "_" + updatedTimeStamp
+ CarbonTablePath.SEGMENT_EXT);
}
@@ -553,7 +554,7 @@ public class CarbonUpdateUtil {
}
}
if (updateSegmentFile) {
- segmentFilesToBeUpdated.add(Segment.toSegment(segment.getLoadName()));
+ segmentFilesToBeUpdated.add(Segment.toSegment(segment.getLoadName(), null));
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
index 631d12c..6afa280 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
@@ -42,6 +42,7 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
private String carbonFilePath;
private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot;
private LoadMetadataDetails[] loadMetadataDetails;
+
public LatestFilesReadCommittedScope(String path) {
this.carbonFilePath = path;
try {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java
index 4491a29..3e8e04f 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java
@@ -17,6 +17,7 @@
package org.apache.carbondata.core.readcommitter;
+import java.io.Serializable;
import java.util.List;
import java.util.Map;
@@ -29,7 +30,7 @@ import org.apache.carbondata.common.annotations.InterfaceStability;
*/
@InterfaceAudience.Internal
@InterfaceStability.Evolving
-public class ReadCommittedIndexFileSnapShot {
+public class ReadCommittedIndexFileSnapShot implements Serializable {
/**
* Segment Numbers are mapped with list of Index Files.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
index f6ba78e..9ae462b 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
@@ -17,6 +17,7 @@
package org.apache.carbondata.core.readcommitter;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Map;
import org.apache.carbondata.common.annotations.InterfaceAudience;
@@ -29,7 +30,7 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
*/
@InterfaceAudience.Internal
@InterfaceStability.Stable
-public interface ReadCommittedScope {
+public interface ReadCommittedScope extends Serializable {
public LoadMetadataDetails[] getSegmentList() throws IOException;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index 4f54241..bc4a90d 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -35,7 +35,9 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
@InterfaceAudience.Internal
@InterfaceStability.Stable
public class TableStatusReadCommittedScope implements ReadCommittedScope {
+
private LoadMetadataDetails[] loadMetadataDetails;
+
private AbsoluteTableIdentifier identifier;
public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier) throws IOException {
@@ -43,6 +45,12 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope {
takeCarbonIndexFileSnapShot();
}
+ public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
+ LoadMetadataDetails[] loadMetadataDetails) throws IOException {
+ this.identifier = identifier;
+ this.loadMetadataDetails = loadMetadataDetails;
+ }
+
@Override public LoadMetadataDetails[] getSegmentList() throws IOException {
try {
if (loadMetadataDetails == null) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index be53f2b..a4011bc 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -47,6 +47,8 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DeleteLoadFolders;
@@ -98,15 +100,20 @@ public class SegmentStatusManager {
* @throws IOException
*/
public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException {
- return getValidAndInvalidSegments(null);
+ return getValidAndInvalidSegments(null, null);
+ }
+
+ public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
+ LoadMetadataDetails[] loadMetadataDetails) throws IOException {
+ return getValidAndInvalidSegments(loadMetadataDetails, null);
}
/**
* get valid segment for given load status details.
- *
*/
public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
- LoadMetadataDetails[] loadMetadataDetails) throws IOException {
+ LoadMetadataDetails[] loadMetadataDetails, ReadCommittedScope readCommittedScope)
+ throws IOException {
// @TODO: move reading LoadStatus file to separate class
List<Segment> listOfValidSegments = new ArrayList<>(10);
@@ -120,6 +127,10 @@ public class SegmentStatusManager {
loadMetadataDetails = readTableStatusFile(
CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()));
}
+
+ if (readCommittedScope == null) {
+ readCommittedScope = new TableStatusReadCommittedScope(identifier, loadMetadataDetails);
+ }
//just directly iterate Array
for (LoadMetadataDetails segment : loadMetadataDetails) {
if (SegmentStatus.SUCCESS == segment.getSegmentStatus()
@@ -129,7 +140,8 @@ public class SegmentStatusManager {
|| SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) {
// check for merged loads.
if (null != segment.getMergedLoadName()) {
- Segment seg = new Segment(segment.getMergedLoadName(), segment.getSegmentFile());
+ Segment seg = new Segment(segment.getMergedLoadName(), segment.getSegmentFile(),
+ readCommittedScope);
if (!listOfValidSegments.contains(seg)) {
listOfValidSegments.add(seg);
}
@@ -142,24 +154,25 @@ public class SegmentStatusManager {
if (SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus()) {
- listOfValidUpdatedSegments
- .add(new Segment(segment.getLoadName(), segment.getSegmentFile()));
+ listOfValidUpdatedSegments.add(
+ new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope));
}
if (SegmentStatus.STREAMING == segment.getSegmentStatus()
|| SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) {
- listOfStreamSegments
- .add(new Segment(segment.getLoadName(), segment.getSegmentFile()));
+ listOfStreamSegments.add(
+ new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope));
continue;
}
- listOfValidSegments.add(new Segment(segment.getLoadName(), segment.getSegmentFile()));
+ listOfValidSegments.add(
+ new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope));
} else if ((SegmentStatus.LOAD_FAILURE == segment.getSegmentStatus()
|| SegmentStatus.COMPACTED == segment.getSegmentStatus()
|| SegmentStatus.MARKED_FOR_DELETE == segment.getSegmentStatus())) {
listOfInvalidSegments.add(new Segment(segment.getLoadName(), segment.getSegmentFile()));
} else if (SegmentStatus.INSERT_IN_PROGRESS == segment.getSegmentStatus() ||
SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == segment.getSegmentStatus()) {
- listOfInProgressSegments
- .add(new Segment(segment.getLoadName(), segment.getSegmentFile()));
+ listOfInProgressSegments.add(
+ new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope));
}
}
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 09692e6..91b35f5 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2440,7 +2440,7 @@ public final class CarbonUtil {
}
for (String value : values) {
if (!value.equalsIgnoreCase("*")) {
- Segment segment = Segment.toSegment(value);
+ Segment segment = Segment.toSegment(value, null);
Float aFloatValue = Float.parseFloat(segment.getSegmentNo());
if (aFloatValue < 0 || aFloatValue > Float.MAX_VALUE) {
throw new InvalidConfigurationException(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
index 01aaffa..7e43610 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
@@ -37,7 +37,6 @@ import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.events.Event;
@@ -98,13 +97,11 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
* getDataMaps Factory method Initializes the Min Max Data Map and returns.
*
* @param segment
- * @param readCommittedScope
* @return
* @throws IOException
*/
@Override
- public List<CoarseGrainDataMap> getDataMaps(Segment segment,
- ReadCommittedScope readCommittedScope)
+ public List<CoarseGrainDataMap> getDataMaps(Segment segment)
throws IOException {
List<CoarseGrainDataMap> dataMapList = new ArrayList<>();
// Form a dataMap of Type MinMaxIndexDataMap.
@@ -144,10 +141,9 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
@Override public void clear() {
}
- @Override public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable,
- ReadCommittedScope readCommittedScope)
+ @Override public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
throws IOException {
- return getDataMaps(distributable.getSegment(), readCommittedScope);
+ return getDataMaps(distributable.getSegment());
}
@Override public void fireEvent(Event event) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
index f63656b..e8c740d 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
@@ -30,7 +30,6 @@ import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.DataMapModel;
import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
/**
* FG level of lucene DataMap
@@ -44,8 +43,7 @@ public class LuceneCoarseGrainDataMapFactory extends LuceneDataMapFactoryBase<Co
* Get the datamap for segmentid
*/
@Override
- public List<CoarseGrainDataMap> getDataMaps(Segment segment,
- ReadCommittedScope readCommittedScope) throws IOException {
+ public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
List<CoarseGrainDataMap> lstDataMap = new ArrayList<>();
CoarseGrainDataMap dataMap = new LuceneCoarseGrainDataMap(analyzer);
try {
@@ -64,10 +62,9 @@ public class LuceneCoarseGrainDataMapFactory extends LuceneDataMapFactoryBase<Co
* Get datamaps for distributable object.
*/
@Override
- public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable,
- ReadCommittedScope readCommittedScope)
+ public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
throws IOException {
- return getDataMaps(distributable.getSegment(), readCommittedScope);
+ return getDataMaps(distributable.getSegment());
}
@Override
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
index 151e674..9026fbc 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
@@ -28,7 +28,6 @@ import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.DataMapModel;
import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainDataMap;
import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
/**
* CG level of lucene DataMap
@@ -39,8 +38,7 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
/**
* Get the datamap for segmentid
*/
- @Override public List<FineGrainDataMap> getDataMaps(Segment segment,
- ReadCommittedScope readCommittedScope) throws IOException {
+ @Override public List<FineGrainDataMap> getDataMaps(Segment segment) throws IOException {
List<FineGrainDataMap> lstDataMap = new ArrayList<>();
FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer);
try {
@@ -59,8 +57,7 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
* Get datamaps for distributable object.
*/
@Override
- public List<FineGrainDataMap> getDataMaps(DataMapDistributable distributable,
- ReadCommittedScope readCommittedScope)
+ public List<FineGrainDataMap> getDataMaps(DataMapDistributable distributable)
throws IOException {
List<FineGrainDataMap> lstDataMap = new ArrayList<>();
FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 214e534..3dac3bb 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -131,7 +131,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
if (FileFactory.isFileExist(segmentDir, fileType)) {
// if external table Segments are found, add it to the List
List<Segment> externalTableSegments = new ArrayList<Segment>();
- Segment seg = new Segment("null", null);
+ Segment seg = new Segment("null", null, readCommittedScope);
externalTableSegments.add(seg);
Map<String, String> indexFiles =
@@ -142,8 +142,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
}
// do block filtering and get split
List<InputSplit> splits =
- getSplits(job, filterInterface, externalTableSegments, null, partitionInfo, null,
- readCommittedScope);
+ getSplits(job, filterInterface, externalTableSegments, null, partitionInfo, null);
return splits;
}
@@ -161,7 +160,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
*/
private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver,
List<Segment> validSegments, BitSet matchedPartitions, PartitionInfo partitionInfo,
- List<Integer> oldPartitionIdList, ReadCommittedScope readCommittedScope) throws IOException {
+ List<Integer> oldPartitionIdList) throws IOException {
numSegments = validSegments.size();
List<InputSplit> result = new LinkedList<InputSplit>();
@@ -175,7 +174,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
// for each segment fetch blocks matching filter in Driver BTree
List<CarbonInputSplit> dataBlocksOfSegment =
getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions,
- validSegments, partitionInfo, oldPartitionIdList, readCommittedScope);
+ validSegments, partitionInfo, oldPartitionIdList);
numBlocks = dataBlocksOfSegment.size();
for (CarbonInputSplit inputSplit : dataBlocksOfSegment) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index b1d7603..8016d90 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -42,7 +42,6 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.mutate.UpdateVO;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.SingleTableProvider;
import org.apache.carbondata.core.scan.filter.TableProvider;
@@ -238,8 +237,8 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
String segmentNumbersFromProperty = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." + tbName, "*");
if (!segmentNumbersFromProperty.trim().equals("*")) {
- CarbonInputFormat
- .setSegmentsToAccess(conf, Segment.toSegmentList(segmentNumbersFromProperty.split(",")));
+ CarbonInputFormat.setSegmentsToAccess(conf,
+ Segment.toSegmentList(segmentNumbersFromProperty.split(","), null));
}
}
@@ -249,7 +248,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
public static void setQuerySegment(Configuration conf, String segmentList) {
if (!segmentList.trim().equals("*")) {
CarbonInputFormat
- .setSegmentsToAccess(conf, Segment.toSegmentList(segmentList.split(",")));
+ .setSegmentsToAccess(conf, Segment.toSegmentList(segmentList.split(","), null));
}
}
@@ -335,10 +334,9 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
/**
* get data blocks of given segment
*/
- protected List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
- CarbonTable carbonTable, FilterResolverIntf resolver,
- BitSet matchedPartitions, List<Segment> segmentIds, PartitionInfo partitionInfo,
- List<Integer> oldPartitionIdList, ReadCommittedScope readCommittedScope) throws IOException {
+ protected List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job, CarbonTable carbonTable,
+ FilterResolverIntf resolver, BitSet matchedPartitions, List<Segment> segmentIds,
+ PartitionInfo partitionInfo, List<Integer> oldPartitionIdList) throws IOException {
QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
QueryStatistic statistic = new QueryStatistic();
@@ -362,7 +360,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
// Apply expression on the blocklets.
prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
} else {
- prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune, readCommittedScope);
+ prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune);
}
List<CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 0add279..6f65d7d 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -179,7 +179,7 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
context.getConfiguration().get(CarbonTableOutputFormat.UPADTE_TIMESTAMP, null);
String segmentsToBeDeleted =
context.getConfiguration().get(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, "");
- List<Segment> segmentDeleteList = Segment.toSegmentList(segmentsToBeDeleted.split(","));
+ List<Segment> segmentDeleteList = Segment.toSegmentList(segmentsToBeDeleted.split(","), null);
Set<Segment> segmentSet = new HashSet<>(
new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
.getValidAndInvalidSegments().getValidSegments());
@@ -230,7 +230,8 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
newMetaEntry.setUpdateStatusFileName(uniqueId);
// Commit the removed partitions in carbon store.
CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false, uuid,
- Segment.toSegmentList(tobeDeletedSegs), Segment.toSegmentList(tobeUpdatedSegs));
+ Segment.toSegmentList(tobeDeletedSegs, null),
+ Segment.toSegmentList(tobeUpdatedSegs, null));
return uniqueId;
}
return null;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 06ada3d..7b5b8d1 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -149,8 +149,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
List<Segment> streamSegments = null;
// get all valid segments and set them into the configuration
SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
- SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
- segmentStatusManager.getValidAndInvalidSegments(loadMetadataDetails);
+ SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
+ .getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
// to check whether only streaming segments access is enabled or not,
// if access streaming segment is true then data will be read from streaming segments
boolean accessStreamingSegments = getAccessStreamingSegments(job.getConfiguration());
@@ -158,12 +158,12 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
if (!accessStreamingSegments) {
List<Segment> validSegments = segments.getValidSegments();
streamSegments = segments.getStreamSegments();
- streamSegments = getFilteredSegment(job, streamSegments, true);
+ streamSegments = getFilteredSegment(job, streamSegments, true, readCommittedScope);
if (validSegments.size() == 0) {
return getSplitsOfStreaming(job, identifier, streamSegments);
}
List<Segment> filteredSegmentToAccess =
- getFilteredSegment(job, segments.getValidSegments(), true);
+ getFilteredSegment(job, segments.getValidSegments(), true, readCommittedScope);
if (filteredSegmentToAccess.size() == 0) {
return getSplitsOfStreaming(job, identifier, streamSegments);
} else {
@@ -171,7 +171,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
}
} else {
List<Segment> filteredNormalSegments =
- getFilteredNormalSegments(job, segments.getValidSegments(), getSegmentsToAccess(job));
+ getFilteredNormalSegments(job, segments.getValidSegments(),
+ getSegmentsToAccess(job, readCommittedScope));
streamSegments = segments.getStreamSegments();
if (filteredNormalSegments.size() == 0) {
return getSplitsOfStreaming(job, identifier, streamSegments);
@@ -195,7 +196,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
validAndInProgressSegments.addAll(segments.getListOfInProgressSegments());
// get updated filtered list
List<Segment> filteredSegmentToAccess =
- getFilteredSegment(job, new ArrayList<>(validAndInProgressSegments), false);
+ getFilteredSegment(job, new ArrayList<>(validAndInProgressSegments), false,
+ readCommittedScope);
// Clean the updated segments from memory if the update happens on segments
List<Segment> toBeCleanedSegments = new ArrayList<>();
for (SegmentUpdateDetails segmentUpdateDetail : updateStatusManager
@@ -291,8 +293,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
* `INPUT_SEGMENT_NUMBERS` in job configuration
*/
private List<Segment> getFilteredSegment(JobContext job, List<Segment> validSegments,
- boolean validationRequired) {
- Segment[] segmentsToAccess = getSegmentsToAccess(job);
+ boolean validationRequired, ReadCommittedScope readCommittedScope) {
+ Segment[] segmentsToAccess = getSegmentsToAccess(job, readCommittedScope);
List<Segment> segmentToAccessSet =
new ArrayList<>(new HashSet<>(Arrays.asList(segmentsToAccess)));
List<Segment> filteredSegmentToAccess = new ArrayList<>();
@@ -418,16 +420,16 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
List<Segment> invalidSegments = new ArrayList<>();
List<UpdateVO> invalidTimestampsList = new ArrayList<>();
- List<Segment> segmentList = new ArrayList<>();
- segmentList.add(new Segment(targetSegment, null));
- setSegmentsToAccess(job.getConfiguration(), segmentList);
-
try {
carbonTable = getOrCreateCarbonTable(job.getConfiguration());
ReadCommittedScope readCommittedScope =
getReadCommitted(job, carbonTable.getAbsoluteTableIdentifier());
this.readCommittedScope = readCommittedScope;
+ List<Segment> segmentList = new ArrayList<>();
+ segmentList.add(new Segment(targetSegment, null, readCommittedScope));
+ setSegmentsToAccess(job.getConfiguration(), segmentList);
+
// process and resolve the expression
Expression filter = getFilterPredicates(job.getConfiguration());
CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
@@ -522,7 +524,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
// for each segment fetch blocks matching filter in Driver BTree
List<org.apache.carbondata.hadoop.CarbonInputSplit> dataBlocksOfSegment =
getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions,
- validSegments, partitionInfo, oldPartitionIdList, readCommittedScope);
+ validSegments, partitionInfo, oldPartitionIdList);
numBlocks = dataBlocksOfSegment.size();
for (org.apache.carbondata.hadoop.CarbonInputSplit inputSplit : dataBlocksOfSegment) {
@@ -557,12 +559,12 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
/**
* return valid segment to access
*/
- public Segment[] getSegmentsToAccess(JobContext job) {
+ public Segment[] getSegmentsToAccess(JobContext job, ReadCommittedScope readCommittedScope) {
String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
if (segmentString.trim().isEmpty()) {
return new Segment[0];
}
- List<Segment> segments = Segment.toSegmentList(segmentString.split(","));
+ List<Segment> segments = Segment.toSegmentList(segmentString.split(","), readCommittedScope);
return segments.toArray(new Segment[segments.size()]);
}
@@ -580,15 +582,17 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(
table, loadMetadataDetails);
SegmentStatusManager.ValidAndInvalidSegmentsInfo allSegments =
- new SegmentStatusManager(identifier).getValidAndInvalidSegments(loadMetadataDetails);
+ new SegmentStatusManager(identifier)
+ .getValidAndInvalidSegments(loadMetadataDetails, readCommittedScope);
Map<String, Long> blockRowCountMapping = new HashMap<>();
Map<String, Long> segmentAndBlockCountMapping = new HashMap<>();
// TODO: currently only batch segment is supported, add support for streaming table
- List<Segment> filteredSegment = getFilteredSegment(job, allSegments.getValidSegments(), false);
+ List<Segment> filteredSegment =
+ getFilteredSegment(job, allSegments.getValidSegments(), false, readCommittedScope);
List<ExtendedBlocklet> blocklets =
- blockletMap.prune(filteredSegment, null, partitions, readCommittedScope);
+ blockletMap.prune(filteredSegment, null, partitions);
for (ExtendedBlocklet blocklet : blocklets) {
String blockName = blocklet.getPath();
blockName = CarbonTablePath.getCarbonDataFileName(blockName);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
index deeeabe..213c5a5 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
@@ -30,8 +30,6 @@ import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
-import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
@@ -108,11 +106,8 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
DataMapDistributableWrapper distributable = (DataMapDistributableWrapper) inputSplit;
TableDataMap dataMap = DataMapStoreManager.getInstance()
.getDataMap(table, distributable.getDistributable().getDataMapSchema());
- ReadCommittedScope readCommittedScope =
- new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier());
List<ExtendedBlocklet> blocklets = dataMap.prune(distributable.getDistributable(),
- dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions,
- readCommittedScope);
+ dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions);
for (ExtendedBlocklet blocklet : blocklets) {
blocklet.setDataMapUniqueId(distributable.getUniqueId());
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 6087727..ae21416 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -70,7 +70,7 @@ class CGDataMapFactory extends CoarseGrainDataMapFactory {
/**
* Get the datamap for segmentid
*/
- override def getDataMaps(segment: Segment, readCommitted: ReadCommittedScope): java.util.List[CoarseGrainDataMap] = {
+ override def getDataMaps(segment: Segment): java.util.List[CoarseGrainDataMap] = {
val file = FileFactory.getCarbonFile(
CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
@@ -88,7 +88,7 @@ class CGDataMapFactory extends CoarseGrainDataMapFactory {
/**
* Get datamaps for distributable object.
*/
- override def getDataMaps(distributable: DataMapDistributable, readCommitted: ReadCommittedScope): java.util.List[CoarseGrainDataMap] = {
+ override def getDataMaps(distributable: DataMapDistributable): java.util.List[CoarseGrainDataMap] = {
val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
val dataMap: CoarseGrainDataMap = new CGDataMap()
dataMap.init(new DataMapModel(mapDistributable.getFilePath))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 5fa5209..329c888 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -55,9 +55,9 @@ class C2DataMapFactory() extends CoarseGrainDataMapFactory {
override def clear(): Unit = {}
- override def getDataMaps(distributable: DataMapDistributable, readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = ???
+ override def getDataMaps(distributable: DataMapDistributable): util.List[CoarseGrainDataMap] = ???
- override def getDataMaps(segment: Segment, readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = ???
+ override def getDataMaps(segment: Segment): util.List[CoarseGrainDataMap] = ???
override def createWriter(segment: Segment, dataWritePath: String): DataMapWriter =
DataMapWriterSuite.dataMapWriterC2Mock(identifier, segment, dataWritePath)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index 551f9e1..d33191c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -72,7 +72,7 @@ class FGDataMapFactory extends FineGrainDataMapFactory {
/**
* Get the datamap for segmentid
*/
- override def getDataMaps(segment: Segment, readCommitted: ReadCommittedScope): java.util.List[FineGrainDataMap] = {
+ override def getDataMaps(segment: Segment): java.util.List[FineGrainDataMap] = {
val file = FileFactory
.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
@@ -89,7 +89,7 @@ class FGDataMapFactory extends FineGrainDataMapFactory {
/**
* Get datamap for distributable object.
*/
- override def getDataMaps(distributable: DataMapDistributable, readCommitted: ReadCommittedScope): java.util.List[FineGrainDataMap]= {
+ override def getDataMaps(distributable: DataMapDistributable): java.util.List[FineGrainDataMap]= {
val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
val dataMap: FineGrainDataMap = new FGDataMap()
dataMap.init(new DataMapModel(mapDistributable.getFilePath))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
index 280c20d..e19c4b7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
@@ -185,13 +185,11 @@ class TestDataMap() extends CoarseGrainDataMapFactory {
override def clear(): Unit = {}
- override def getDataMaps(distributable: DataMapDistributable,
- readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = {
+ override def getDataMaps(distributable: DataMapDistributable): util.List[CoarseGrainDataMap] = {
???
}
- override def getDataMaps(segment: Segment,
- readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = {
+ override def getDataMaps(segment: Segment): util.List[CoarseGrainDataMap] = {
???
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index 5c9709c..55e9bac 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -293,9 +293,9 @@ class WaitingDataMap() extends CoarseGrainDataMapFactory {
override def clear(): Unit = {}
- override def getDataMaps(distributable: DataMapDistributable, readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = ???
+ override def getDataMaps(distributable: DataMapDistributable): util.List[CoarseGrainDataMap] = ???
- override def getDataMaps(segment: Segment, readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = ???
+ override def getDataMaps(segment: Segment): util.List[CoarseGrainDataMap] = ???
override def createWriter(segment: Segment, writeDirectoryPath: String): DataMapWriter = {
new DataMapWriter(identifier, segment, writeDirectoryPath) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index f69e237..1f3decc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -203,7 +203,7 @@ class StreamHandoffRDD[K, V](
val job = Job.getInstance(FileFactory.getConfiguration)
val inputFormat = new CarbonTableInputFormat[Array[Object]]()
val segmentList = new util.ArrayList[Segment](1)
- segmentList.add(Segment.toSegment(handOffSegmentId))
+ segmentList.add(Segment.toSegment(handOffSegmentId, null))
val splits = inputFormat.getSplitsOfStreaming(
job,
carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 22a0112..15e82fe 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.schema.PartitionInfo
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.CarbonInputSplit
@@ -211,9 +212,11 @@ object PartitionUtils {
identifier.getTablePath,
alterPartitionModel.segmentId,
alterPartitionModel.carbonLoadModel.getFactTimeStamp.toString)
- val segmentFiles = Seq(new Segment(alterPartitionModel.segmentId, file)).asJava
+ val segmentFiles = Seq(new Segment(alterPartitionModel.segmentId, file, null))
+ .asJava
if (!CarbonUpdateUtil.updateTableMetadataStatus(
- new util.HashSet[Segment](Seq(new Segment(alterPartitionModel.segmentId, null)).asJava),
+ new util.HashSet[Segment](Seq(new Segment(alterPartitionModel.segmentId,
+ null, null)).asJava),
carbonTable,
alterPartitionModel.carbonLoadModel.getFactTimeStamp.toString,
true,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
index dd2b097..1487277 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
@@ -52,7 +52,7 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
// therefore the segment file name should be loadName#segmentFileName.segment
val segments = loadMetaDataDetails.asScala.map {
loadDetail =>
- new Segment(loadDetail.getLoadName, loadDetail.getSegmentFile).toString
+ new Segment(loadDetail.getLoadName, loadDetail.getSegmentFile, null).toString
}
if (segments.nonEmpty) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 91e2335..626a172 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCa
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.readcommitter.{ReadCommittedScope, TableStatusReadCommittedScope}
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events._
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index c86fe5a..86a235a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -451,7 +451,7 @@ object LoadPostAggregateListener extends OperationEventListener {
val segment = if (currentSegmentFile != null) {
new Segment(carbonLoadModel.getSegmentId, currentSegmentFile.toString)
} else {
- Segment.toSegment(carbonLoadModel.getSegmentId)
+ Segment.toSegment(carbonLoadModel.getSegmentId, null)
}
(TableIdentifier(table.getTableName, Some(table.getDatabaseName)), segment.toString)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
index 788a820..942a21b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
@@ -216,15 +216,16 @@ class SparkCarbonFileFormat extends FileFormat
val model = format.createQueryModel(split, attemptContext)
- var segments = new java.util.ArrayList[Segment]()
- val seg = new Segment("null", null)
- segments.add(seg)
var partition : java.util.List[PartitionSpec] = new java.util.ArrayList[PartitionSpec]()
-
val segmentPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null")
val readCommittedScope = new LatestFilesReadCommittedScope(
identifier.getTablePath + "/Fact/Part0/Segment_null/")
+
+ var segments = new java.util.ArrayList[Segment]()
+ val seg = new Segment("null", null, readCommittedScope)
+ segments.add(seg)
+
val indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(segmentPath)
if (indexFiles.size() == 0) {
throw new SparkException("Index file not present to read the carbondata file")
@@ -236,7 +237,7 @@ class SparkCarbonFileFormat extends FileFormat
.choose(tab, model.getFilterExpressionResolverTree)
// TODO : handle the partition for CarbonFileLevelFormat
- val prunedBlocklets = dataMapExprWrapper.prune(segments, null, readCommittedScope)
+ val prunedBlocklets = dataMapExprWrapper.prune(segments, null)
val detailInfo = prunedBlocklets.get(0).getDetailInfo
detailInfo.readColumnSchema(detailInfo.getColumnSchemaBinary)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index e565e68..2290941 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -528,7 +528,7 @@ object CarbonFilters {
// In case of compaction multiple segments will be passed as CARBON_INPUT_SEGMENTS.
// Therefore partitionSpec will be extracted from all segments.
val segments = segmentNumbersFromProperty.split(",").flatMap { a =>
- val segment = Segment.toSegment(a)
+ val segment = Segment.toSegment(a, null)
val segmentFile = new SegmentFileStore(table.getTablePath, segment.getSegmentFileName)
segmentFile.getPartitionSpecs.asScala
}