You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/27 03:28:52 UTC
[32/49] carbondata git commit: [REBASE] resolve conflict after
rebasing to master
[REBASE] resolve conflict after rebasing to master
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a6bf77ff
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a6bf77ff
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a6bf77ff
Branch: refs/heads/carbonstore-rebase4
Commit: a6bf77ff2a2bbf36e139581f075a020425754c5b
Parents: 43e34fc
Author: Jacky Li <ja...@qq.com>
Authored: Tue Feb 27 08:51:25 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Tue Feb 27 08:51:25 2018 +0800
----------------------------------------------------------------------
.../carbondata/core/datamap/TableDataMap.java | 2 +-
.../core/datamap/dev/AbstractDataMapWriter.java | 5 ++--
.../core/datamap/dev/DataMapFactory.java | 2 +-
.../core/indexstore/BlockletDetailsFetcher.java | 2 +-
.../indexstore/SegmentPropertiesFetcher.java | 3 +-
.../blockletindex/BlockletDataMap.java | 2 +-
.../blockletindex/BlockletDataMapFactory.java | 21 ++++++-------
.../core/metadata/SegmentFileStore.java | 2 +-
.../RowLevelRangeGrtThanFiterExecuterImpl.java | 1 +
...elRangeGrtrThanEquaToFilterExecuterImpl.java | 1 +
...velRangeLessThanEqualFilterExecuterImpl.java | 1 +
.../RowLevelRangeLessThanFiterExecuterImpl.java | 1 +
.../SegmentUpdateStatusManager.java | 26 ++++------------
.../apache/carbondata/core/util/CarbonUtil.java | 16 ++++------
.../testsuite/datamap/CGDataMapTestCase.scala | 26 ++++++++--------
.../testsuite/datamap/DataMapWriterSuite.scala | 19 ++++++------
.../testsuite/datamap/FGDataMapTestCase.scala | 31 +++++++++-----------
.../iud/DeleteCarbonTableTestCase.scala | 2 +-
.../TestInsertAndOtherCommandConcurrent.scala | 14 +++++----
.../StandardPartitionTableCleanTestCase.scala | 12 ++++----
.../spark/rdd/NewCarbonDataLoadRDD.scala | 2 +-
.../carbondata/spark/util/DataLoadingUtil.scala | 4 +--
.../CreatePreAggregateTableCommand.scala | 2 +-
.../apache/spark/sql/hive/CarbonRelation.scala | 3 +-
.../datamap/DataMapWriterListener.java | 2 +-
.../loading/model/CarbonLoadModel.java | 2 +-
.../processing/merger/CarbonDataMergerUtil.java | 15 +++-------
.../merger/CompactionResultSortProcessor.java | 6 ++--
.../merger/RowResultMergerProcessor.java | 6 ++--
.../partition/spliter/RowResultProcessor.java | 3 +-
.../util/CarbonDataProcessorUtil.java | 4 +--
.../processing/util/CarbonLoaderUtil.java | 2 +-
32 files changed, 104 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index eed650e3..2a6ceaa 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -143,7 +143,7 @@ public final class TableDataMap extends OperationEventListener {
blocklets.addAll(
dataMap.prune(
filterExp,
- segmentPropertiesFetcher.getSegmentProperties(distributable.getSegmentId()),
+ segmentPropertiesFetcher.getSegmentProperties(distributable.getSegment()),
partitions));
}
BlockletSerializer serializer = new BlockletSerializer();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
index bcc9bad..de6dcb1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.datamap.dev;
import java.io.IOException;
+import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -35,10 +36,10 @@ public abstract class AbstractDataMapWriter {
protected String writeDirectoryPath;
- public AbstractDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId,
+ public AbstractDataMapWriter(AbsoluteTableIdentifier identifier, Segment segment,
String writeDirectoryPath) {
this.identifier = identifier;
- this.segmentId = segmentId;
+ this.segmentId = segment.getSegmentNo();
this.writeDirectoryPath = writeDirectoryPath;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index df5670d..50ac279 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -39,7 +39,7 @@ public interface DataMapFactory<T extends DataMap> {
/**
* Return a new write for this datamap
*/
- AbstractDataMapWriter createWriter(Segment segment);
+ AbstractDataMapWriter createWriter(Segment segment, String writeDirectoryPath);
/**
* Get the datamap for segmentid
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
index 5a5fc1e..dd592c0 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
@@ -53,5 +53,5 @@ public interface BlockletDetailsFetcher {
* @param segment
* @return
*/
- List<Blocklet> getAllBlocklets(Segment segment, List<String> partitions) throws IOException;
+ List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
index ec2ae93..6f94be5 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.indexstore;
import java.io.IOException;
+import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
/**
@@ -32,5 +33,5 @@ public interface SegmentPropertiesFetcher {
* @return
* @throws IOException
*/
- SegmentProperties getSegmentProperties(String segmentId) throws IOException;
+ SegmentProperties getSegmentProperties(Segment segment) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index ef1bd33..ce6193b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -660,7 +660,7 @@ public class BlockletDataMap extends AbstractCoarseGrainDataMap implements Cache
}
@Override
- public List<Blocklet> prune(FilterResolverIntf filterExp, List<PartitionSpec> partitions) {
+ public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties, List<PartitionSpec> partitions) {
if (unsafeMemoryDMStore.getRowCount() == 0) {
return new ArrayList<>();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 5ca3ac5..ee849bd 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -38,13 +38,11 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.SegmentFileStore;
-import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.util.DataFileFooterConverter;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.events.Event;
@@ -57,8 +55,7 @@ import org.apache.hadoop.fs.RemoteIterator;
* Table map for blocklet
*/
public class BlockletDataMapFactory extends AbstractCoarseGrainDataMapFactory
- implements BlockletDetailsFetcher,
- SegmentPropertiesFetcher {
+ implements BlockletDetailsFetcher, SegmentPropertiesFetcher {
private AbsoluteTableIdentifier identifier;
@@ -75,12 +72,12 @@ public class BlockletDataMapFactory extends AbstractCoarseGrainDataMapFactory
}
@Override
- public DataMapWriter createWriter(Segment segment) {
+ public AbstractDataMapWriter createWriter(Segment segment, String writeDirectoryPath) {
throw new UnsupportedOperationException("not implemented");
}
@Override
- public List<DataMap> getDataMaps(Segment segment) throws IOException {
+ public List<AbstractCoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
getTableBlockIndexUniqueIdentifiers(segment);
return cache.getAll(tableBlockIndexUniqueIdentifiers);
@@ -262,8 +259,8 @@ public class BlockletDataMapFactory extends AbstractCoarseGrainDataMapFactory
return null;
}
- @Override public SegmentProperties getSegmentProperties(String segmentId) throws IOException {
- List<AbstractCoarseGrainDataMap> dataMaps = getDataMaps(segmentId);
+ @Override public SegmentProperties getSegmentProperties(Segment segment) throws IOException {
+ List<AbstractCoarseGrainDataMap> dataMaps = getDataMaps(segment);
assert (dataMaps.size() > 0);
AbstractCoarseGrainDataMap coarseGrainDataMap = dataMaps.get(0);
assert (coarseGrainDataMap instanceof BlockletDataMap);
@@ -271,12 +268,12 @@ public class BlockletDataMapFactory extends AbstractCoarseGrainDataMapFactory
return dataMap.getSegmentProperties();
}
- @Override public List<Blocklet> getAllBlocklets(String segmentId, List<String> partitions)
+ @Override public List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions)
throws IOException {
List<Blocklet> blocklets = new ArrayList<>();
- List<AbstractCoarseGrainDataMap> dataMaps = getDataMaps(segmentId);
+ List<AbstractCoarseGrainDataMap> dataMaps = getDataMaps(segment);
for (AbstractCoarseGrainDataMap dataMap : dataMaps) {
- blocklets.addAll(dataMap.prune(null, getSegmentProperties(segmentId), partitions));
+ blocklets.addAll(dataMap.prune(null, getSegmentProperties(segment), partitions));
}
return blocklets;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index b5f5a25..f48cc6d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -432,7 +432,7 @@ public class SegmentFileStore {
boolean forceDelete) throws IOException {
LoadMetadataDetails[] details =
- SegmentStatusManager.readLoadMetadata(table.getMetaDataFilepath());
+ SegmentStatusManager.readLoadMetadata(table.getMetadataPath());
// scan through each segment.
for (LoadMetadataDetails segment : details) {
// if this segment is valid then only we will go for deletion of related
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
index 1f63a81..e35bb8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
import org.apache.carbondata.core.scan.filter.FilterUtil;
import org.apache.carbondata.core.scan.filter.intf.RowIntf;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
index 9140a11..d48abf9 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
import org.apache.carbondata.core.scan.filter.FilterUtil;
import org.apache.carbondata.core.scan.filter.intf.RowIntf;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
index 120671f..d89a488 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
import org.apache.carbondata.core.scan.filter.FilterUtil;
import org.apache.carbondata.core.scan.filter.intf.RowIntf;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index 547ecaa..a00c7db 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
import org.apache.carbondata.core.scan.filter.FilterUtil;
import org.apache.carbondata.core.scan.filter.intf.RowIntf;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 25ce0c8..6ec6fa2 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -233,10 +233,7 @@ public class SegmentUpdateStatusManager {
* @throws Exception
*/
public String[] getDeleteDeltaFilePath(String blockFilePath, String segmentId) throws Exception {
-// int tableFactPathLength = CarbonTablePath.getFactDir(identifier.getTablePath()).length() + 1;
-// String blockId = blockFilePath.substring(tableFactPathLength);
-
- String blockId = CarbonUtil.getBlockId(absoluteTableIdentifier, blockFilePath, segmentId);
+ String blockId = CarbonUtil.getBlockId(identifier, blockFilePath, segmentId);
String tupleId;
if (isPartitionTable) {
tupleId = CarbonTablePath.getShortBlockIdForPartitionTable(blockId);
@@ -249,13 +246,8 @@ public class SegmentUpdateStatusManager {
/**
* Returns all delta file paths of specified block
- *
- * @param tupleId
- * @param extension
- * @return
- * @throws Exception
*/
- public List<String> getDeltaFiles(String tupleId, String extension) throws Exception {
+ private List<String> getDeltaFiles(String tupleId, String extension) throws Exception {
try {
String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID);
String completeBlockName = CarbonTablePath.addDataPartPrefix(
@@ -263,11 +255,11 @@ public class SegmentUpdateStatusManager {
+ CarbonCommonConstants.FACT_FILE_EXT);
String blockPath;
if (isPartitionTable) {
- blockPath = absoluteTableIdentifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR
+ blockPath = identifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR
+ CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.PART_ID)
.replace("#", "/") + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
} else {
- String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment);
+ String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment);
blockPath =
carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
}
@@ -391,16 +383,10 @@ public class SegmentUpdateStatusManager {
* @return
*/
public CarbonFile[] getDeleteDeltaFilesList(final Segment segmentId, final String blockName) {
-
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier());
-
- String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId.getSegmentNo());
-
+ String segmentPath =
+ CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId.getSegmentNo());
CarbonFile segDir =
FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
-
for (SegmentUpdateDetails block : updateDetails) {
if ((block.getBlockName().equalsIgnoreCase(blockName)) &&
(block.getSegmentName().equalsIgnoreCase(segmentId.getSegmentNo()))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 5ec0158..c9b4337 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2361,21 +2361,19 @@ public final class CarbonUtil {
}
// Get the total size of carbon data and the total size of carbon index
- public static HashMap<String, Long> getDataSizeAndIndexSize(CarbonTablePath carbonTablePath,
+ public static HashMap<String, Long> getDataSizeAndIndexSize(String tablePath,
Segment segment) throws IOException {
if (segment.getSegmentFileName() != null) {
- SegmentFileStore fileStore =
- new SegmentFileStore(carbonTablePath.getPath(), segment.getSegmentFileName());
+ SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
return getDataSizeAndIndexSize(fileStore);
} else {
- return getDataSizeAndIndexSize(carbonTablePath, segment.getSegmentNo());
+ return getDataSizeAndIndexSize(tablePath, segment.getSegmentNo());
}
}
// Get the total size of segment.
- public static long getSizeOfSegment(CarbonTablePath carbonTablePath,
- Segment segment) throws IOException {
- HashMap<String, Long> dataSizeAndIndexSize = getDataSizeAndIndexSize(carbonTablePath, segment);
+ public static long getSizeOfSegment(String tablePath, Segment segment) throws IOException {
+ HashMap<String, Long> dataSizeAndIndexSize = getDataSizeAndIndexSize(tablePath, segment);
long size = 0;
for (Long eachSize: dataSizeAndIndexSize.values()) {
size += eachSize;
@@ -2585,9 +2583,7 @@ public final class CarbonUtil {
String blockName = filePath.substring(filePath.lastIndexOf("/") + 1, filePath.length());
String tablePath = identifier.getTablePath();
if (filePath.startsWith(tablePath)) {
- String factDir =
- CarbonStorePath.getCarbonTablePath(tablePath, identifier.getCarbonTableIdentifier())
- .getFactDir();
+ String factDir = CarbonTablePath.getFactDir(tablePath);
if (filePath.startsWith(factDir)) {
blockId = "Part0" + CarbonCommonConstants.FILE_SEPARATOR + "Segment_" + segmentId
+ CarbonCommonConstants.FILE_SEPARATOR + blockName;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 4b6f231..1cbbcb4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -27,14 +27,14 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory}
import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
import org.apache.carbondata.core.datastore.FileReader
import org.apache.carbondata.core.datastore.block.SegmentProperties
import org.apache.carbondata.core.datastore.compression.SnappyCompressor
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.indexstore.Blocklet
+import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec}
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
import org.apache.carbondata.core.scan.expression.Expression
@@ -62,16 +62,16 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
/**
* Return a new write for this datamap
*/
- override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
- new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName)
+ override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = {
+ new CGDataMapWriter(identifier, segment, dataWritePath, dataMapName)
}
/**
* Get the datamap for segmentid
*/
- override def getDataMaps(segmentId: String): java.util.List[AbstractCoarseGrainDataMap] = {
+ override def getDataMaps(segment: Segment): java.util.List[AbstractCoarseGrainDataMap] = {
val file = FileFactory.getCarbonFile(
- CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+ CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
val files = file.listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
@@ -108,9 +108,9 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
*
* @return
*/
- override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = {
+ override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = {
val file = FileFactory.getCarbonFile(
- CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+ CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
val files = file.listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
@@ -125,7 +125,7 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
/**
* Clears datamap of the segment
*/
- override def clear(segmentId: String): Unit = {
+ override def clear(segment: Segment): Unit = {
}
@@ -175,7 +175,7 @@ class CGDataMap extends AbstractCoarseGrainDataMap {
override def prune(
filterExp: FilterResolverIntf,
segmentProperties: SegmentProperties,
- partitions: java.util.List[String]): java.util.List[Blocklet] = {
+ partitions: java.util.List[PartitionSpec]): java.util.List[Blocklet] = {
val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
val expression = filterExp.getFilterExpression
getEqualToExpression(expression, buffer)
@@ -184,7 +184,7 @@ class CGDataMap extends AbstractCoarseGrainDataMap {
}
val meta = findMeta(value(0).getBytes)
meta.map { f=>
- new Blocklet(f._1, f._2+"")
+ new Blocklet(f._1, f._2 + "")
}.asJava
}
@@ -219,10 +219,10 @@ class CGDataMap extends AbstractCoarseGrainDataMap {
}
class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
- segmentId: String,
+ segment: Segment,
dataWritePath: String,
dataMapName: String)
- extends AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
+ extends AbstractDataMapWriter(identifier, segment, dataWritePath) {
var currentBlockId: String = null
val cgwritepath = dataWritePath + "/" +
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 2f8a1d1..7e93959 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -20,21 +20,19 @@ package org.apache.carbondata.spark.testsuite.datamap
import java.util
import scala.collection.JavaConverters._
+
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter}
import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
-import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter
+import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap}
import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
import org.apache.carbondata.core.datastore.page.ColumnPage
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.scan.filter.intf.ExpressionType
-import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events.Event
@@ -49,15 +47,16 @@ class C2DataMapFactory() extends AbstractCoarseGrainDataMapFactory {
override def fireEvent(event: Event): Unit = ???
- override def clear(segmentId: Segment): Unit = {}
+ override def clear(segment: Segment): Unit = {}
override def clear(): Unit = {}
- override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = ???
+ override def getDataMaps(distributable: DataMapDistributable): util.List[AbstractCoarseGrainDataMap] = ???
- override def getDataMaps(segmentId: Segment): util.List[DataMap] = ???
+ override def getDataMaps(segment: Segment): util.List[AbstractCoarseGrainDataMap] = ???
- override def createWriter(segmentId: Segment): AbstractDataMapWriter = DataMapWriterSuite.dataMapWriterC2Mock
+ override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter =
+ DataMapWriterSuite.dataMapWriterC2Mock(identifier, segment, dataWritePath)
override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, List(ExpressionType.EQUALS).asJava)
@@ -175,9 +174,9 @@ object DataMapWriterSuite {
var callbackSeq: Seq[String] = Seq[String]()
- def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segmentId: String,
+ def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segment: Segment,
dataWritePath: String) =
- new AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
+ new AbstractDataMapWriter(identifier, segment, dataWritePath) {
override def onPageAdded(
blockletId: Int,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index d1bb65f..9c8cc15 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -27,14 +27,14 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.datamap.dev.fgdatamap.{AbstractFineGrainDataMap, AbstractFineGrainDataMapFactory}
import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
import org.apache.carbondata.core.datastore.FileReader
import org.apache.carbondata.core.datastore.block.SegmentProperties
import org.apache.carbondata.core.datastore.compression.SnappyCompressor
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.indexstore.FineGrainBlocklet
+import org.apache.carbondata.core.indexstore.{Blocklet, FineGrainBlocklet, PartitionSpec}
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
import org.apache.carbondata.core.scan.expression.Expression
@@ -62,16 +62,16 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
/**
* Return a new writer for this datamap
*/
- override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
- new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName)
+ override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = {
+ new FGDataMapWriter(identifier, segment, dataWritePath, dataMapName)
}
/**
* Get the datamap for the given segment id
*/
- override def getDataMaps(segmentId: String): java.util.List[AbstractFineGrainDataMap] = {
+ override def getDataMaps(segment: Segment): java.util.List[AbstractFineGrainDataMap] = {
val file = FileFactory
- .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+ .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
val files = file.listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
@@ -99,9 +99,9 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
*
* @return
*/
- override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = {
- val file = FileFactory
- .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+ override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = {
+ val file = FileFactory.getCarbonFile(
+ CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
val files = file.listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
@@ -112,7 +112,6 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
}.toList.asJava
}
-
/**
*
* @param event
@@ -124,7 +123,7 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
/**
* Clears datamap of the segment
*/
- override def clear(segmentId: String): Unit = {
+ override def clear(segment: Segment): Unit = {
}
/**
@@ -173,7 +172,7 @@ class FGDataMap extends AbstractFineGrainDataMap {
override def prune(
filterExp: FilterResolverIntf,
segmentProperties: SegmentProperties,
- partitions: java.util.List[String]): java.util.List[FineGrainBlocklet] = {
+ partitions: java.util.List[PartitionSpec]): java.util.List[Blocklet] = {
val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
val expression = filterExp.getFilterExpression
getEqualToExpression(expression, buffer)
@@ -187,7 +186,7 @@ class FGDataMap extends AbstractFineGrainDataMap {
}
private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), Long, Int),
- value: Array[Byte]): Option[FineGrainBlocklet] = {
+ value: Array[Byte]): Option[Blocklet] = {
val bytes = FileReader.readByteArray(filePath, meta._4, meta._5)
val outputStream = new ByteArrayInputStream(compressor.unCompressByte(bytes))
val obj = new ObjectInputStream(outputStream)
@@ -211,12 +210,10 @@ class FGDataMap extends AbstractFineGrainDataMap {
pg.setRowId(f._2(p._2).toArray)
pg
}
- pages
Some(new FineGrainBlocklet(meta._1, meta._2.toString, pages.toList.asJava))
} else {
None
}
-
}
private def findMeta(value: Array[Byte]) = {
@@ -249,8 +246,8 @@ class FGDataMap extends AbstractFineGrainDataMap {
}
class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
- segmentId: String, dataWriterPath: String, dataMapName: String)
- extends AbstractDataMapWriter(identifier, segmentId, dataWriterPath) {
+ segment: Segment, dataWriterPath: String, dataMapName: String)
+ extends AbstractDataMapWriter(identifier, segment, dataWriterPath) {
var currentBlockId: String = null
val fgwritepath = dataWriterPath + "/" + System.nanoTime() + ".datamap"
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index 22aa385..f92649a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -194,7 +194,7 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("delete from update_status_files where age=5").show()
val carbonTable = CarbonEnv
.getCarbonTable(Some("iud_db"), "update_status_files")(sqlContext.sparkSession)
- val metaPath = carbonTable.getMetaDataFilepath
+ val metaPath = carbonTable.getMetadataPath
val files = FileFactory.getCarbonFile(metaPath)
assert(files.listFiles().length == 2)
sql("drop table update_status_files")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index 5550358..b39c44c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -269,7 +269,11 @@ object Global {
class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory {
- override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = { }
+ private var identifier: AbsoluteTableIdentifier = _
+
+ override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = {
+ this.identifier = identifier
+ }
override def fireEvent(event: Event): Unit = ???
@@ -277,12 +281,12 @@ class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory {
override def clear(): Unit = {}
- override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = ???
+ override def getDataMaps(distributable: DataMapDistributable): util.List[AbstractCoarseGrainDataMap] = ???
- override def getDataMaps(segmentId: Segment): util.List[DataMap] = ???
+ override def getDataMaps(segment: Segment): util.List[AbstractCoarseGrainDataMap] = ???
- override def createWriter(segmentId: Segment): AbstractDataMapWriter = {
- new AbstractDataMapWriter {
+ override def createWriter(segment: Segment, writeDirectoryPath: String): AbstractDataMapWriter = {
+ new AbstractDataMapWriter(identifier, segment, writeDirectoryPath) {
override def onPageAdded(blockletId: Int, pageId: Int, pages: Array[ColumnPage]): Unit = { }
override def onBlockletEnd(blockletId: Int): Unit = { }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
index f238d2b..cfc6983 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
@@ -52,14 +52,12 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA
def validateDataFiles(tableUniqueName: String, segmentId: String, partition: Int, indexes: Int): Unit = {
val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
- val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
- carbonTable.getTablePath)
- val partitions = CarbonFilters
- .getPartitions(Seq.empty,
- sqlContext.sparkSession,
- TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName)))
+ val partitions = CarbonFilters.getPartitions(
+ Seq.empty,
+ sqlContext.sparkSession,
+ TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName)))
assert(partitions.get.length == partition)
- val details = SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath)
+ val details = SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(carbonTable.getTablePath))
val segLoad = details.find(_.getLoadName.equals(segmentId)).get
val seg = new SegmentFileStore(carbonTable.getTablePath, segLoad.getSegmentFile)
assert(seg.getIndexFiles.size == indexes)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 8ba2767..97e3061 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -291,7 +291,7 @@ class NewCarbonDataLoadRDD[K, V](
val fileList: java.util.List[String] = new java.util.ArrayList[String](
CarbonCommonConstants.CONSTANT_SIZE_TEN)
CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, fileList, ",")
- model = carbonLoadModel.getCopyWithPartition(
+ model = carbonLoadModel.getCopyWithPartition("0",
carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
StandardLogService.setThreadName(StandardLogService
.getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index cf35c12..49e4420 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -441,8 +441,8 @@ object DataLoadingUtil {
private def isUpdationRequired(isForceDeletion: Boolean,
carbonTable: CarbonTable,
- absoluteTableIdentifier: AbsoluteTableIdentifier) = {
- val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath)
+ absoluteTableIdentifier: AbsoluteTableIdentifier): (Array[LoadMetadataDetails], Boolean) = {
+ val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
// Delete marked loads
val isUpdationRequired =
DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 59c43aa..4d0a4c5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -184,7 +184,7 @@ case class CreatePreAggregateTableCommand(
CarbonFilters.getCurrentPartitions(sparkSession,
TableIdentifier(parentTable.getTableName,
Some(parentTable.getDatabaseName))).map(_.asJava).orNull)
- val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetaDataFilepath)
+ val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {
throw new UnsupportedOperationException(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index c9833d0..5771503 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -222,9 +222,8 @@ case class CarbonRelation(
// for each segment calculate the size
segments.foreach {validSeg =>
if (validSeg.getSegmentFileName != null) {
- val fileStore = new SegmentFileStore(tablePath, validSeg.getSegmentFileName)
size = size + CarbonUtil.getSizeOfSegment(
- carbonTablePath, new Segment(validSeg.getSegmentNo, validSeg.getSegmentFileName))
+ tablePath, new Segment(validSeg.getSegmentNo, validSeg.getSegmentFileName))
} else {
size = size + FileFactory.getDirectorySize(
CarbonTablePath.getSegmentPath(tablePath, validSeg.getSegmentNo))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 5083ab5..1104229 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -74,7 +74,7 @@ public class DataMapWriterListener {
}
List<String> columns = factory.getMeta().getIndexedColumns();
List<AbstractDataMapWriter> writers = registry.get(columns);
- AbstractDataMapWriter writer = factory.createWriter(new Segment(segmentId, null));
+ AbstractDataMapWriter writer = factory.createWriter(new Segment(segmentId, null), dataWritePath);
if (writers != null) {
writers.add(writer);
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index a17178a..638ad39 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -478,7 +478,7 @@ public class CarbonLoadModel implements Serializable {
* @param delimiter
* @return
*/
- public CarbonLoadModel getCopyWithPartition(String uniqueId, List<String> filesForPartition,
+ public CarbonLoadModel getCopyWithPartition(String uniqueId,
String header, String delimiter) {
CarbonLoadModel copyObj = new CarbonLoadModel();
copyObj.tableName = tableName;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 89326a3..d2faef5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -44,7 +44,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
@@ -612,10 +611,10 @@ public final class CarbonDataMergerUtil {
// variable to store one segment size across partition.
long sizeOfOneSegmentAcrossPartition;
if (segment.getSegmentFile() != null) {
- sizeOfOneSegmentAcrossPartition = CarbonUtil
- .getSizeOfSegment(carbonTablePath, new Segment(segId, segment.getSegmentFile()));
+ sizeOfOneSegmentAcrossPartition = CarbonUtil.getSizeOfSegment(
+ tablePath, new Segment(segId, segment.getSegmentFile()));
} else {
- sizeOfOneSegmentAcrossPartition = getSizeOfSegment(tablePath, tableIdentifier, segId);
+ sizeOfOneSegmentAcrossPartition = getSizeOfSegment(tablePath, segId);
}
// if size of a segment is greater than the Major compaction size. then ignore it.
@@ -1006,14 +1005,8 @@ public final class CarbonDataMergerUtil {
/**
* This method traverses Update Delta Files inside the seg and return true
* if UpdateDelta Files are more than IUD Compaction threshold.
- *
- * @param seg
- * @param identifier
- * @param segmentUpdateStatusManager
- * @param numberDeltaFilesThreshold
- * @return
*/
- public static Boolean checkUpdateDeltaFilesInSeg(Segment seg,
+ private static Boolean checkUpdateDeltaFilesInSeg(Segment seg,
AbsoluteTableIdentifier absoluteTableIdentifier,
SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index b71612a..ea11e22 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -406,14 +406,12 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
+ carbonLoadModel.getFactTimeStamp() + ".tmp";
} else {
carbonStoreLocation = CarbonDataProcessorUtil
- .createCarbonStoreLocation(carbonTable.getTablePath(), carbonLoadModel.getDatabaseName(),
- tableName, carbonLoadModel.getPartitionId(), carbonLoadModel.getSegmentId());
+ .createCarbonStoreLocation(carbonLoadModel.getDatabaseName(), tableName, carbonLoadModel.getSegmentId());
}
CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
.getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName,
tempStoreLocation, carbonStoreLocation);
- setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonTable,
- carbonFactDataHandlerModel);
+ setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonFactDataHandlerModel);
dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel,
CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
try {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index b41829f..278d5bb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -77,14 +77,12 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
.getFactTimeStamp() + ".tmp";
} else {
carbonStoreLocation = CarbonDataProcessorUtil
- .createCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(),
- tableName, loadModel.getPartitionId(), loadModel.getSegmentId());
+ .createCarbonStoreLocation(loadModel.getDatabaseName(), tableName, loadModel.getSegmentId());
}
CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
.getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName,
tempStoreLocation, carbonStoreLocation);
- setDataFileAttributesInModel(loadModel, compactionType, carbonTable,
- carbonFactDataHandlerModel);
+ setDataFileAttributesInModel(loadModel, compactionType, carbonFactDataHandlerModel);
carbonFactDataHandlerModel.setCompactionFlow(true);
dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
index ff6ca93..df2e2a2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -48,8 +48,7 @@ public class RowResultProcessor {
this.segmentProperties = segProp;
String tableName = carbonTable.getTableName();
String carbonStoreLocation = CarbonDataProcessorUtil
- .createCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(),
- tableName, loadModel.getPartitionId(), loadModel.getSegmentId());
+ .createCarbonStoreLocation(loadModel.getDatabaseName(), tableName, loadModel.getSegmentId());
CarbonFactDataHandlerModel carbonFactDataHandlerModel =
CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
segProp, tableName, tempStoreLocation, carbonStoreLocation);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 2c08c18..dc8ffd7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -34,7 +34,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.ColumnType;
import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -371,8 +370,7 @@ public final class CarbonDataProcessorUtil {
*
* @return data directory path
*/
- public static String createCarbonStoreLocation(String factStoreLocation,
- String databaseName, String tableName, String segmentId) {
+ public static String createCarbonStoreLocation(String databaseName, String tableName, String segmentId) {
CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
return CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segmentId);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 1eea61d..a3e889a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -1066,7 +1066,7 @@ public final class CarbonLoaderUtil {
*/
public static Long addDataIndexSizeIntoMetaEntry(LoadMetadataDetails loadMetadataDetails,
String segmentId, CarbonTable carbonTable) throws IOException {
- Map<String, Long> dataIndexSize = CarbonUtil.getDataSizeAndIndexSize(carbonTable.getAbsoluteTableIdentifier(),
+ Map<String, Long> dataIndexSize = CarbonUtil.getDataSizeAndIndexSize(carbonTable.getTablePath(),
new Segment(segmentId, loadMetadataDetails.getSegmentFile()));
Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE);
loadMetadataDetails.setDataSize(String.valueOf(dataSize));