You are viewing a plain-text version of this content. The canonical (hyperlinked) version is available in the original mailing-list archive.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/02/26 12:19:09 UTC
[9/9] carbondata git commit: [CARBONDATA-2187][PARTITION] Partition restructure for new folder structure and supporting partition location feature
[CARBONDATA-2187][PARTITION] Partition restructure for new folder structure and supporting partition location feature
This closes #1984
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8d3c7740
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8d3c7740
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8d3c7740
Branch: refs/heads/master
Commit: 8d3c77400cd6abc30c6914084268d9ecd3d18751
Parents: dded5d5
Author: ravipesala <ra...@gmail.com>
Authored: Thu Feb 15 00:31:56 2018 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Mon Feb 26 17:43:13 2018 +0530
----------------------------------------------------------------------
.../core/datamap/DataMapDistributable.java | 10 +-
.../apache/carbondata/core/datamap/Segment.java | 97 +++
.../carbondata/core/datamap/TableDataMap.java | 45 +-
.../carbondata/core/datamap/dev/DataMap.java | 3 +-
.../core/datamap/dev/DataMapFactory.java | 9 +-
.../filesystem/AbstractDFSCarbonFile.java | 27 +-
.../indexstore/BlockletDataMapIndexStore.java | 107 +--
.../core/indexstore/BlockletDetailsFetcher.java | 10 +-
.../core/indexstore/PartitionSpec.java | 92 +++
.../TableBlockIndexUniqueIdentifier.java | 87 +--
.../blockletindex/BlockletDataMap.java | 134 ++--
.../BlockletDataMapDistributable.java | 4 +-
.../blockletindex/BlockletDataMapFactory.java | 104 ++-
.../blockletindex/BlockletDataMapModel.java | 27 +-
.../blockletindex/SegmentIndexFileStore.java | 94 ++-
.../carbondata/core/locks/HdfsFileLock.java | 16 -
.../carbondata/core/locks/LocalFileLock.java | 16 +-
.../core/metadata/PartitionMapFileStore.java | 484 ------------
.../core/metadata/SegmentFileStore.java | 744 +++++++++++++++++++
.../core/mutate/CarbonUpdateUtil.java | 50 +-
.../executor/impl/AbstractQueryExecutor.java | 15 +-
.../core/statusmanager/LoadMetadataDetails.java | 18 +
.../statusmanager/SegmentStatusManager.java | 87 ++-
.../SegmentUpdateStatusManager.java | 41 +-
.../apache/carbondata/core/util/CarbonUtil.java | 95 ++-
.../core/util/DataTypeConverterImpl.java | 18 +-
.../carbondata/core/util/DataTypeUtil.java | 34 +-
.../core/util/path/CarbonTablePath.java | 33 +-
.../carbondata/core/util/CarbonUtilTest.java | 9 +-
.../carbondata/hadoop/CarbonInputSplit.java | 9 +-
.../hadoop/api/CarbonOutputCommitter.java | 191 +++--
.../hadoop/api/CarbonTableInputFormat.java | 128 ++--
.../hadoop/api/CarbonTableOutputFormat.java | 2 +
.../hadoop/api/DistributableDataMapFormat.java | 10 +-
.../MajorCompactionIgnoreInMinorTest.scala | 6 +-
.../MajorCompactionStopsAfterCompaction.scala | 2 +-
.../testsuite/datamap/DataMapWriterSuite.scala | 10 +-
.../TestInsertAndOtherCommandConcurrent.scala | 10 +-
.../StandardPartitionBadRecordLoggerTest.scala | 17 -
.../StandardPartitionGlobalSortTestCase.scala | 388 ++++++++--
.../StandardPartitionTableCleanTestCase.scala | 72 +-
...andardPartitionTableCompactionTestCase.scala | 36 +-
.../StandardPartitionTableDropTestCase.scala | 1 +
.../StandardPartitionTableLoadingTestCase.scala | 62 +-
...tandardPartitionTableOverwriteTestCase.scala | 76 +-
.../StandardPartitionTableQueryTestCase.scala | 79 +-
.../spark/util/SparkDataTypeConverterImpl.java | 12 +
.../org/apache/carbondata/api/CarbonStore.scala | 12 +-
.../spark/rdd/CarbonDropPartitionRDD.scala | 118 +--
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 52 +-
.../carbondata/spark/rdd/CarbonScanRDD.scala | 3 +-
.../spark/rdd/CarbonSparkPartition.scala | 3 +-
.../carbondata/spark/rdd/PartitionDropper.scala | 2 +-
.../carbondata/spark/util/CarbonScalaUtil.scala | 207 ++++--
.../carbondata/spark/util/CommonUtil.scala | 27 +-
.../carbondata/spark/util/DataLoadingUtil.scala | 88 ++-
.../command/carbonTableSchemaCommon.scala | 17 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 49 +-
.../spark/rdd/CarbonTableCompactor.scala | 72 +-
.../org/apache/spark/sql/CarbonCountStar.scala | 4 +-
.../sql/CarbonDatasourceHadoopRelation.scala | 3 +-
.../CarbonAlterTableCompactionCommand.scala | 4 +-
.../management/CarbonCleanFilesCommand.scala | 13 +-
.../management/CarbonLoadDataCommand.scala | 432 +++++++----
.../management/RefreshCarbonTableCommand.scala | 27 +-
.../CarbonProjectForUpdateCommand.scala | 3 +-
.../command/mutation/DeleteExecution.scala | 20 +-
.../command/mutation/HorizontalCompaction.scala | 5 +-
...arbonAlterTableAddHivePartitionCommand.scala | 117 +++
...rbonAlterTableDropHivePartitionCommand.scala | 102 +--
.../CarbonAlterTableDropPartitionCommand.scala | 10 +-
.../CarbonAlterTableSplitPartitionCommand.scala | 4 +-
.../CreatePreAggregateTableCommand.scala | 7 +-
.../preaaggregate/PreAggregateListeners.scala | 13 +-
.../table/CarbonCreateTableCommand.scala | 13 +
.../datasources/CarbonFileFormat.scala | 222 +++---
.../strategy/CarbonLateDecodeStrategy.scala | 15 +-
.../sql/execution/strategy/DDLStrategy.scala | 15 +-
.../apache/spark/sql/hive/CarbonRelation.scala | 18 +-
.../spark/sql/optimizer/CarbonFilters.scala | 59 +-
.../spark/sql/hive/CarbonSessionState.scala | 12 +-
.../spark/sql/hive/CarbonSessionState.scala | 12 +-
.../datamap/DataMapWriterListener.java | 3 +-
.../loading/CarbonDataLoadConfiguration.java | 13 +
.../loading/DataLoadProcessBuilder.java | 4 +-
.../impl/MeasureFieldConverterImpl.java | 2 +-
.../loading/model/CarbonLoadModel.java | 13 +
.../processing/merger/CarbonDataMergerUtil.java | 77 +-
.../merger/CompactionResultSortProcessor.java | 35 +-
.../merger/RowResultMergerProcessor.java | 34 +-
.../partition/spliter/RowResultProcessor.java | 5 +-
.../store/CarbonFactDataHandlerModel.java | 10 +-
.../util/CarbonDataProcessorUtil.java | 8 +-
.../processing/util/CarbonLoaderUtil.java | 39 +-
.../processing/util/DeleteLoadFolders.java | 80 +-
.../carbon/datastore/BlockIndexStoreTest.java | 6 +-
.../carbondata/streaming/StreamHandoffRDD.scala | 5 +-
97 files changed, 3741 insertions(+), 1994 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
index 50af789..edd724a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
@@ -31,7 +31,7 @@ public abstract class DataMapDistributable extends InputSplit
private String tablePath;
- private String segmentId;
+ private Segment segment;
private String dataMapName;
@@ -47,12 +47,12 @@ public abstract class DataMapDistributable extends InputSplit
this.tablePath = tablePath;
}
- public String getSegmentId() {
- return segmentId;
+ public Segment getSegment() {
+ return segment;
}
- public void setSegmentId(String segmentId) {
- this.segmentId = segmentId;
+ public void setSegment(Segment segment) {
+ this.segment = segment;
}
public String getDataMapName() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
new file mode 100644
index 0000000..c47f16c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Represents one load (segment) of a carbondata table: a segment number plus an optional segment file name.
+ */
+public class Segment implements Serializable {
+
+ private static final long serialVersionUID = 7044555408162234064L;
+
+ private String segmentNo;
+
+ private String segmentFileName; // may be null (see toSegment)
+
+ public Segment(String segmentNo, String segmentFileName) {
+ this.segmentNo = segmentNo;
+ this.segmentFileName = segmentFileName;
+ }
+
+ public String getSegmentNo() {
+ return segmentNo;
+ }
+
+ public String getSegmentFileName() {
+ return segmentFileName;
+ }
+
+ public static List<Segment> toSegmentList(String[] segmentIds) { // parses each id via toSegment
+ List<Segment> list = new ArrayList<>(segmentIds.length);
+ for (String segmentId : segmentIds) {
+ list.add(toSegment(segmentId));
+ }
+ return list;
+ }
+
+ public static List<Segment> toSegmentList(List<String> segmentIds) { // parses each id via toSegment
+ List<Segment> list = new ArrayList<>(segmentIds.size());
+ for (String segmentId : segmentIds) {
+ list.add(toSegment(segmentId));
+ }
+ return list;
+ }
+
+ /**
+ * Parses a segment id of the form {@code segmentNo} or {@code segmentNo#segmentFileName}; text after a second '#' is ignored.
+ * @param segmentId segment number, optionally followed by '#' and the segment file name
+ * @return the parsed Segment; segmentFileName is null when splitting on '#' yields fewer than two parts
+ */
+ public static Segment toSegment(String segmentId) {
+ String[] split = segmentId.split("#");
+ if (split.length > 1) {
+ return new Segment(split[0], split[1]);
+ } else if (split.length > 0) {
+ return new Segment(split[0], null);
+ }
+ return new Segment(segmentId, null);
+ }
+
+ @Override public boolean equals(Object o) { // equality uses segmentNo only; segmentFileName is ignored
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Segment segment = (Segment) o;
+ return Objects.equals(segmentNo, segment.segmentNo);
+ }
+
+ @Override public int hashCode() { // consistent with equals: hashes segmentNo only
+ return Objects.hash(segmentNo);
+ }
+
+ @Override public String toString() { // segmentNo[#segmentFileName], the form parsed by toSegment
+ if (segmentFileName != null) {
+ return segmentNo + "#" + segmentFileName;
+ } else {
+ return segmentNo;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 9c84891..6555d6c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datamap.dev.DataMapFactory;
import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.events.Event;
@@ -60,21 +61,21 @@ public final class TableDataMap extends OperationEventListener {
/**
* Pass the valid segments and prune the datamap using filter expression
*
- * @param segmentIds
+ * @param segments
* @param filterExp
* @return
*/
- public List<ExtendedBlocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp,
- List<String> partitions) throws IOException {
+ public List<ExtendedBlocklet> prune(List<Segment> segments, FilterResolverIntf filterExp,
+ List<PartitionSpec> partitions) throws IOException {
List<ExtendedBlocklet> blocklets = new ArrayList<>();
- for (String segmentId : segmentIds) {
+ for (Segment segment : segments) {
List<Blocklet> pruneBlocklets = new ArrayList<>();
- List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
+ List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment);
for (DataMap dataMap : dataMaps) {
pruneBlocklets.addAll(dataMap.prune(filterExp, partitions));
}
blocklets.addAll(addSegmentId(blockletDetailsFetcher
- .getExtendedBlocklets(pruneBlocklets, segmentId), segmentId));
+ .getExtendedBlocklets(pruneBlocklets, segment), segment.getSegmentNo()));
}
return blocklets;
}
@@ -94,13 +95,13 @@ public final class TableDataMap extends OperationEventListener {
*
* @return
*/
- public List<DataMapDistributable> toDistributable(List<String> segmentIds) throws IOException {
+ public List<DataMapDistributable> toDistributable(List<Segment> segments) throws IOException {
List<DataMapDistributable> distributables = new ArrayList<>();
- for (String segmentsId : segmentIds) {
- List<DataMapDistributable> list = dataMapFactory.toDistributable(segmentsId);
+ for (Segment segment : segments) {
+ List<DataMapDistributable> list = dataMapFactory.toDistributable(segment);
for (DataMapDistributable distributable: list) {
distributable.setDataMapName(dataMapName);
- distributable.setSegmentId(segmentsId);
+ distributable.setSegment(segment);
distributable.setTablePath(identifier.getTablePath());
distributable.setDataMapFactoryClass(dataMapFactory.getClass().getName());
}
@@ -118,7 +119,7 @@ public final class TableDataMap extends OperationEventListener {
* @return
*/
public List<ExtendedBlocklet> prune(DataMapDistributable distributable,
- FilterResolverIntf filterExp, List<String> partitions) throws IOException {
+ FilterResolverIntf filterExp, List<PartitionSpec> partitions) throws IOException {
List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>();
List<Blocklet> blocklets = new ArrayList<>();
List<DataMap> dataMaps = dataMapFactory.getDataMaps(distributable);
@@ -127,8 +128,8 @@ public final class TableDataMap extends OperationEventListener {
}
for (Blocklet blocklet: blocklets) {
ExtendedBlocklet detailedBlocklet =
- blockletDetailsFetcher.getExtendedBlocklet(blocklet, distributable.getSegmentId());
- detailedBlocklet.setSegmentId(distributable.getSegmentId());
+ blockletDetailsFetcher.getExtendedBlocklet(blocklet, distributable.getSegment());
+ detailedBlocklet.setSegmentId(distributable.getSegment().getSegmentNo());
detailedBlocklets.add(detailedBlocklet);
}
return detailedBlocklets;
@@ -136,11 +137,11 @@ public final class TableDataMap extends OperationEventListener {
/**
* Clear only the datamaps of the segments
- * @param segmentIds
+ * @param segments
*/
- public void clear(List<String> segmentIds) {
- for (String segmentId: segmentIds) {
- dataMapFactory.clear(segmentId);
+ public void clear(List<Segment> segments) {
+ for (Segment segment: segments) {
+ dataMapFactory.clear(segment);
}
}
@@ -170,21 +171,21 @@ public final class TableDataMap extends OperationEventListener {
/**
* Method to prune the segments based on task min/max values
*
- * @param segmentIds
+ * @param segments
* @param filterExp
* @return
* @throws IOException
*/
- public List<String> pruneSegments(List<String> segmentIds, FilterResolverIntf filterExp)
+ public List<String> pruneSegments(List<Segment> segments, FilterResolverIntf filterExp)
throws IOException {
List<String> prunedSegments = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- for (String segmentId : segmentIds) {
- List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
+ for (Segment segment : segments) {
+ List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment);
for (DataMap dataMap : dataMaps) {
if (dataMap.isScanRequired(filterExp)) {
// If any one task in a given segment contains the data that means the segment need to
// be scanned and we need to validate further data maps in the same segment
- prunedSegments.add(segmentId);
+ prunedSegments.add(segment.getSegmentNo());
break;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index 16be1ac..f3642d6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.List;
import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -50,7 +51,7 @@ public interface DataMap {
* @param filterExp
* @return
*/
- List<Blocklet> prune(FilterResolverIntf filterExp, List<String> partitions);
+ List<Blocklet> prune(FilterResolverIntf filterExp, List<PartitionSpec> partitions);
// TODO Move this method to Abstract class
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index f5a7404..40cd436 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -21,6 +21,7 @@ import java.util.List;
import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.events.Event;
@@ -37,12 +38,12 @@ public interface DataMapFactory {
/**
* Return a new write for this datamap
*/
- DataMapWriter createWriter(String segmentId);
+ DataMapWriter createWriter(Segment segment);
/**
* Get the datamap for segmentid
*/
- List<DataMap> getDataMaps(String segmentId) throws IOException;
+ List<DataMap> getDataMaps(Segment segment) throws IOException;
/**
* Get datamaps for distributable object.
@@ -53,7 +54,7 @@ public interface DataMapFactory {
* Get all distributable objects of a segmentid
* @return
*/
- List<DataMapDistributable> toDistributable(String segmentId);
+ List<DataMapDistributable> toDistributable(Segment segment);
/**
*
@@ -64,7 +65,7 @@ public interface DataMapFactory {
/**
* Clears datamap of the segment
*/
- void clear(String segmentId);
+ void clear(Segment segment);
/**
* Clear all datamaps from memory
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index 927cef5..68eaa21 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -412,15 +413,21 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
@Override
public boolean createNewFile(String filePath, FileFactory.FileType fileType, boolean doAs,
- final FsPermission permission) throws IOException {
+ FsPermission permission) throws IOException {
filePath = filePath.replace("\\", "/");
Path path = new Path(filePath);
FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
- boolean result = fs.createNewFile(path);
- if (null != permission) {
- fs.setPermission(path, permission);
+ if (fs.exists(path)) {
+ return false;
+ } else {
+ if (permission == null) {
+ permission = FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(fs.getConf()));
+ }
+ // Pass the permissions duringg file creation itself
+ fs.create(path, permission, false, fs.getConf().getInt("io.file.buffer.size", 4096),
+ fs.getDefaultReplication(path), fs.getDefaultBlockSize(path), null).close();
+ return true;
}
- return result;
}
@Override public boolean deleteFile(String filePath, FileFactory.FileType fileType)
@@ -453,11 +460,15 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
filePath = filePath.replace("\\", "/");
Path path = new Path(filePath);
FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
- if (fs.createNewFile(path)) {
- fs.deleteOnExit(path);
+ if (fs.exists(path)) {
+ return false;
+ } else {
+ // Pass the permissions duringg file creation itself
+ fs.create(path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL), false,
+ fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(path),
+ fs.getDefaultBlockSize(path), null).close();
return true;
}
- return false;
}
@Override
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index 111a7a2..c6073d5 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -28,14 +28,20 @@ import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapModel;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.PartitionMapFileStore;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
/**
* Class to handle loading, unloading,clearing,storing of the table
@@ -75,22 +81,10 @@ public class BlockletDataMapIndexStore
BlockletDataMap dataMap = (BlockletDataMap) lruCache.get(lruCacheKey);
if (dataMap == null) {
try {
- String segmentPath = CarbonTablePath.getSegmentPath(
- identifier.getAbsoluteTableIdentifier().getTablePath(),
- identifier.getSegmentId());
- Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
- CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
- CarbonFile[] carbonFiles = carbonFile.locationAwareListFiles();
SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
- indexFileStore.readAllIIndexOfSegment(carbonFiles);
- PartitionMapFileStore partitionFileStore = new PartitionMapFileStore();
- partitionFileStore.readAllPartitionsOfSegment(carbonFiles, segmentPath);
- for (CarbonFile file : carbonFiles) {
- blockMetaInfoMap
- .put(file.getAbsolutePath(), new BlockMetaInfo(file.getLocations(), file.getSize()));
- }
- dataMap =
- loadAndGetDataMap(identifier, indexFileStore, partitionFileStore, blockMetaInfoMap);
+ Map<String, BlockMetaInfo> blockMetaInfoMap =
+ getBlockMetaInfoMap(identifier, indexFileStore);
+ dataMap = loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap);
} catch (MemoryException e) {
LOGGER.error("memory exception when loading datamap: " + e.getMessage());
throw new RuntimeException(e.getMessage(), e);
@@ -99,6 +93,47 @@ public class BlockletDataMapIndexStore
return dataMap;
}
+ private Map<String, BlockMetaInfo> getBlockMetaInfoMap(TableBlockIndexUniqueIdentifier identifier,
+ SegmentIndexFileStore indexFileStore) throws IOException {
+ if (identifier.getMergeIndexFileName() != null) { // read index data from the merge index file when one is recorded
+ CarbonFile indexMergeFile = FileFactory.getCarbonFile(
+ identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+ .getMergeIndexFileName());
+ if (indexMergeFile.exists()) {
+ indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile });
+ }
+ }
+ if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) { // fall back to the standalone index file
+ indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile(
+ identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+ .getIndexFileName()) });
+ }
+ DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+ Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
+ List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
+ identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+ .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()));
+ for (DataFileFooter footer : indexInfo) { // one entry per data file path referenced by the index
+ String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
+ blockMetaInfoMap.put(blockPath, createBlockMetaInfo(blockPath));
+ }
+ return blockMetaInfoMap;
+ }
+
+ private BlockMetaInfo createBlockMetaInfo(String carbonDataFile) throws IOException { // host locations + length of one data file
+ CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile);
+ if (carbonFile instanceof AbstractDFSCarbonFile) {
+ RemoteIterator<LocatedFileStatus> iter =
+ ((AbstractDFSCarbonFile)carbonFile).fs.listLocatedStatus(new Path(carbonDataFile));
+ LocatedFileStatus fileStatus = iter.next(); // NOTE(review): no hasNext() check; assumes the listing is non-empty
+ String[] location = fileStatus.getBlockLocations()[0].getHosts(); // NOTE(review): first block only; assumes at least one block location
+ long len = fileStatus.getLen();
+ return new BlockMetaInfo(location, len);
+ } else {
+ return new BlockMetaInfo(new String[]{"localhost"}, carbonFile.getSize()); // non-DFS files report a fixed local host
+ }
+ }
+
@Override
public List<BlockletDataMap> getAll(
List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException {
@@ -116,34 +151,13 @@ public class BlockletDataMapIndexStore
}
}
if (missedIdentifiers.size() > 0) {
- Map<String, SegmentIndexFileStore> segmentIndexFileStoreMap = new HashMap<>();
- Map<String, PartitionMapFileStore> partitionFileStoreMap = new HashMap<>();
- Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
+ SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
for (TableBlockIndexUniqueIdentifier identifier: missedIdentifiers) {
- SegmentIndexFileStore indexFileStore =
- segmentIndexFileStoreMap.get(identifier.getSegmentId());
- PartitionMapFileStore partitionFileStore =
- partitionFileStoreMap.get(identifier.getSegmentId());
- String segmentPath = CarbonTablePath.getSegmentPath(
- identifier.getAbsoluteTableIdentifier().getTablePath(),
- identifier.getSegmentId());
- if (indexFileStore == null) {
- CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
- CarbonFile[] carbonFiles = carbonFile.locationAwareListFiles();
- indexFileStore = new SegmentIndexFileStore();
- indexFileStore.readAllIIndexOfSegment(carbonFiles);
- segmentIndexFileStoreMap.put(identifier.getSegmentId(), indexFileStore);
- partitionFileStore = new PartitionMapFileStore();
- partitionFileStore.readAllPartitionsOfSegment(carbonFiles, segmentPath);
- partitionFileStoreMap.put(identifier.getSegmentId(), partitionFileStore);
- for (CarbonFile file : carbonFiles) {
- blockMetaInfoMap.put(FileFactory.getUpdatedFilePath(file.getAbsolutePath()),
- new BlockMetaInfo(file.getLocations(), file.getSize()));
- }
- }
+ Map<String, BlockMetaInfo> blockMetaInfoMap =
+ getBlockMetaInfoMap(identifier, indexFileStore);
blockletDataMaps.add(
- loadAndGetDataMap(identifier, indexFileStore, partitionFileStore, blockMetaInfoMap));
+ loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap));
}
}
} catch (Throwable e) {
@@ -194,7 +208,6 @@ public class BlockletDataMapIndexStore
private BlockletDataMap loadAndGetDataMap(
TableBlockIndexUniqueIdentifier identifier,
SegmentIndexFileStore indexFileStore,
- PartitionMapFileStore partitionFileStore,
Map<String, BlockMetaInfo> blockMetaInfoMap)
throws IOException, MemoryException {
String uniqueTableSegmentIdentifier =
@@ -206,10 +219,10 @@ public class BlockletDataMapIndexStore
BlockletDataMap dataMap;
synchronized (lock) {
dataMap = new BlockletDataMap();
- dataMap.init(new BlockletDataMapModel(identifier.getFilePath(),
- indexFileStore.getFileData(identifier.getCarbonIndexFileName()),
- partitionFileStore.getPartitions(identifier.getCarbonIndexFileName()),
- partitionFileStore.isPartionedSegment(), blockMetaInfoMap));
+ dataMap.init(new BlockletDataMapModel(
+ identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+ .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()),
+ blockMetaInfoMap, identifier.getSegmentId()));
lruCache.put(identifier.getUniqueTableSegmentIdentifier(), dataMap,
dataMap.getMemorySize());
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
index 21ecba1..b4d6db2 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.indexstore;
import java.io.IOException;
import java.util.List;
+import org.apache.carbondata.core.datamap.Segment;
+
/**
* Fetches the detailed blocklet which has more information to execute the query
*/
@@ -28,20 +30,20 @@ public interface BlockletDetailsFetcher {
* Get the blocklet detail information based on blockletid, blockid and segmentid.
*
* @param blocklets
- * @param segmentId
+ * @param segment
* @return
* @throws IOException
*/
- List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, String segmentId)
+ List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment)
throws IOException;
/**
* Get the blocklet detail information based on blockletid, blockid and segmentid.
*
* @param blocklet
- * @param segmentId
+ * @param segment
* @return
* @throws IOException
*/
- ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, String segmentId) throws IOException;
+ ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java b/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java
new file mode 100644
index 0000000..87c875e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Holds partition information: the partition values (columnName=partitionValue), the folder
+ * location of the partition and an optional uuid.
+ * Equality and hashing are based on the location only.
+ */
+public class PartitionSpec implements Serializable {
+
+ private static final long serialVersionUID = 4828007433384867678L;
+
+ /**
+ * It holds the partition information in columnName=partitionValue combination.
+ */
+ private List<String> partitions;
+
+ /**
+ * Parsed form of {@link #location}. It is transient, so after deserialization it is null
+ * until {@link #getLocation()} rebuilds it from the serialized string.
+ */
+ private transient Path locationPath;
+
+ /**
+ * String form of the location; unlike locationPath this field is serialized.
+ */
+ private String location;
+
+ private String uuid;
+
+ /**
+ * @param partitions partition values in columnName=partitionValue form
+ * @param location partition folder, passed through FileFactory.getUpdatedFilePath
+ */
+ public PartitionSpec(List<String> partitions, String location) {
+ this.partitions = partitions;
+ this.locationPath = new Path(FileFactory.getUpdatedFilePath(location));
+ this.location = locationPath.toString();
+ }
+
+ /**
+ * Same as {@link #PartitionSpec(List, String)} but takes the location as a URI.
+ */
+ public PartitionSpec(List<String> partitions, URI location) {
+ this(partitions, new Path(location).toString());
+ }
+
+ public List<String> getPartitions() {
+ return partitions;
+ }
+
+ /**
+ * Returns the partition location, lazily rebuilding the transient Path when it is missing
+ * (for example after deserialization).
+ */
+ public Path getLocation() {
+ if (locationPath == null) {
+ locationPath = new Path(location);
+ }
+ return locationPath;
+ }
+
+ public String getUuid() {
+ return uuid;
+ }
+
+ public void setUuid(String uuid) {
+ this.uuid = uuid;
+ }
+
+ @Override public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PartitionSpec spec = (PartitionSpec) o;
+ return Objects.equals(getLocation(), spec.getLocation());
+ }
+
+ @Override public int hashCode() {
+ // Must go through getLocation(), not the transient field: after deserialization
+ // locationPath is null, which would give specs that are equal() different hash codes.
+ return Objects.hash(getLocation());
+ }
+
+ @Override public String toString() {
+ return "PartitionSpec{" + "partitions=" + partitions + ", locationPath=" + locationPath
+ + ", location='" + location + '\'' + '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
index 18357ac..c907fa8 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
@@ -17,91 +17,66 @@
package org.apache.carbondata.core.indexstore;
+import java.util.Objects;
+
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
/**
- * Class holds the absoluteTableIdentifier and segmentId to uniquely identify a segment
+ * Class holds the indexFile information to uniquely identitify the carbon index
*/
public class TableBlockIndexUniqueIdentifier {
- /**
- * table fully qualified identifier
- */
- private AbsoluteTableIdentifier absoluteTableIdentifier;
- private String segmentId;
+ private String indexFilePath;
- private String carbonIndexFileName;
+ private String indexFileName;
- /**
- * Constructor to initialize the class instance
- *
- * @param absoluteTableIdentifier
- * @param segmentId
- */
- public TableBlockIndexUniqueIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier,
- String segmentId, String carbonIndexFileName) {
- this.absoluteTableIdentifier = absoluteTableIdentifier;
+ private String mergeIndexFileName;
+
+ private String segmentId;
+
+ public TableBlockIndexUniqueIdentifier(String indexFilePath, String indexFileName,
+ String mergeIndexFileName, String segmentId) {
+ this.indexFilePath = indexFilePath;
+ this.indexFileName = indexFileName;
+ this.mergeIndexFileName = mergeIndexFileName;
this.segmentId = segmentId;
- this.carbonIndexFileName = carbonIndexFileName;
}
/**
- * returns AbsoluteTableIdentifier
+ * method returns the id to uniquely identify a key
*
* @return
*/
- public AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
- return absoluteTableIdentifier;
+ public String getUniqueTableSegmentIdentifier() {
+ return indexFilePath + CarbonCommonConstants.FILE_SEPARATOR + indexFileName;
}
- public String getSegmentId() {
- return segmentId;
+ public String getIndexFilePath() {
+ return indexFilePath;
}
- /**
- * method returns the id to uniquely identify a key
- *
- * @return
- */
- public String getUniqueTableSegmentIdentifier() {
- CarbonTableIdentifier carbonTableIdentifier =
- absoluteTableIdentifier.getCarbonTableIdentifier();
- return carbonTableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR
- + carbonTableIdentifier.getTableName() + CarbonCommonConstants.UNDERSCORE
- + carbonTableIdentifier.getTableId() + CarbonCommonConstants.FILE_SEPARATOR + segmentId
- + CarbonCommonConstants.FILE_SEPARATOR + carbonIndexFileName;
+ /**
+ * Returns the index file name without its directory; getUniqueTableSegmentIdentifier()
+ * joins it with indexFilePath to form the full path.
+ */
+ public String getIndexFileName() {
+ return indexFileName;
+ }
+
+ public String getMergeIndexFileName() {
+ return mergeIndexFileName;
}
- public String getFilePath() {
- return absoluteTableIdentifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + "/"
- + carbonIndexFileName;
+ public String getSegmentId() {
+ return segmentId;
}
@Override public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
-
TableBlockIndexUniqueIdentifier that = (TableBlockIndexUniqueIdentifier) o;
-
- if (!absoluteTableIdentifier.equals(that.absoluteTableIdentifier)) {
- return false;
- }
- if (!segmentId.equals(that.segmentId)) {
- return false;
- }
- return carbonIndexFileName.equals(that.carbonIndexFileName);
+ return Objects.equals(indexFilePath, that.indexFilePath) && Objects
+ .equals(indexFileName, that.indexFileName) && Objects
+ .equals(mergeIndexFileName, that.mergeIndexFileName);
}
@Override public int hashCode() {
- int result = absoluteTableIdentifier.hashCode();
- result = 31 * result + segmentId.hashCode();
- result = 31 * result + carbonIndexFileName.hashCode();
- return result;
- }
-
- public String getCarbonIndexFileName() {
- return carbonIndexFileName;
+ return Objects.hash(indexFilePath, indexFileName, mergeIndexFileName);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 699f9e1..9ec7a46 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.indexstore.BlockMetaInfo;
import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
import org.apache.carbondata.core.indexstore.row.DataMapRow;
import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
@@ -65,8 +66,10 @@ import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataFileFooterConverter;
import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
import org.xerial.snappy.Snappy;
/**
@@ -111,7 +114,11 @@ public class BlockletDataMap implements DataMap, Cacheable {
private static int SCHEMA = 2;
- private static int PARTITION_INFO = 3;
+ private static int INDEX_PATH = 3;
+
+ private static int INDEX_FILE_NAME = 4;
+
+ private static int SEGMENTID = 5;
private UnsafeMemoryDMStore unsafeMemoryDMStore;
@@ -121,8 +128,6 @@ public class BlockletDataMap implements DataMap, Cacheable {
private int[] columnCardinality;
- private boolean isPartitionedSegment;
-
@Override
public void init(DataMapModel dataMapModel) throws IOException, MemoryException {
long startTime = System.currentTimeMillis();
@@ -131,7 +136,11 @@ public class BlockletDataMap implements DataMap, Cacheable {
DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
List<DataFileFooter> indexInfo = fileFooterConverter
.getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData());
- isPartitionedSegment = blockletDataMapInfo.isPartitionedSegment();
+ Path path = new Path(blockletDataMapInfo.getFilePath());
+ byte[] filePath = path.getParent().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
+ byte[] fileName = path.getName().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
+ byte[] segmentId =
+ blockletDataMapInfo.getSegmentId().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
DataMapRowImpl summaryRow = null;
byte[] schemaBinary = null;
// below 2 variables will be used for fetching the relative blocklet id. Relative blocklet ID
@@ -145,7 +154,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
createSchema(segmentProperties);
- createSummarySchema(segmentProperties, blockletDataMapInfo.getPartitions(), schemaBinary);
+ createSummarySchema(segmentProperties, schemaBinary, filePath, fileName,
+ segmentId);
}
TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
BlockMetaInfo blockMetaInfo =
@@ -182,8 +192,10 @@ public class BlockletDataMap implements DataMap, Cacheable {
if (null != unsafeMemorySummaryDMStore) {
addTaskSummaryRowToUnsafeMemoryStore(
summaryRow,
- blockletDataMapInfo.getPartitions(),
- schemaBinary);
+ schemaBinary,
+ filePath,
+ fileName,
+ segmentId);
unsafeMemorySummaryDMStore.finishWriting();
}
LOGGER.info(
@@ -354,8 +366,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
return summaryRow;
}
- private void addTaskSummaryRowToUnsafeMemoryStore(DataMapRow summaryRow,
- List<String> partitions, byte[] schemaBinary) throws IOException {
+ private void addTaskSummaryRowToUnsafeMemoryStore(DataMapRow summaryRow, byte[] schemaBinary,
+ byte[] filePath, byte[] fileName, byte[] segmentId) {
// write the task summary info to unsafe memory store
if (null != summaryRow) {
// Add column schema , it is useful to generate segment properties in executor.
@@ -363,18 +375,9 @@ public class BlockletDataMap implements DataMap, Cacheable {
if (schemaBinary != null) {
summaryRow.setByteArray(schemaBinary, SCHEMA);
}
- if (partitions != null && partitions.size() > 0) {
- CarbonRowSchema[] minSchemas =
- ((CarbonRowSchema.StructCarbonRowSchema) unsafeMemorySummaryDMStore
- .getSchema()[PARTITION_INFO]).getChildSchemas();
- DataMapRow partitionRow = new DataMapRowImpl(minSchemas);
- for (int i = 0; i < partitions.size(); i++) {
- partitionRow
- .setByteArray(partitions.get(i).getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS),
- i);
- }
- summaryRow.setRow(partitionRow, PARTITION_INFO);
- }
+ summaryRow.setByteArray(filePath, INDEX_PATH);
+ summaryRow.setByteArray(fileName, INDEX_FILE_NAME);
+ summaryRow.setByteArray(segmentId, SEGMENTID);
try {
unsafeMemorySummaryDMStore.addIndexRowToUnsafe(summaryRow);
} catch (Exception e) {
@@ -560,27 +563,25 @@ public class BlockletDataMap implements DataMap, Cacheable {
* once per datamap. It stores datamap level max/min of each column and partition information of
* datamap
* @param segmentProperties
- * @param partitions
* @throws MemoryException
*/
- private void createSummarySchema(SegmentProperties segmentProperties, List<String> partitions,
- byte[] schemaBinary)
+ private void createSummarySchema(SegmentProperties segmentProperties, byte[] schemaBinary,
+ byte[] filePath, byte[] fileName, byte[] segmentId)
throws MemoryException {
List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>();
getMinMaxSchema(segmentProperties, taskMinMaxSchemas);
// for storing column schema
taskMinMaxSchemas.add(
new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, schemaBinary.length));
- if (partitions != null && partitions.size() > 0) {
- CarbonRowSchema[] mapSchemas = new CarbonRowSchema[partitions.size()];
- for (int i = 0; i < mapSchemas.length; i++) {
- mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY);
- }
- CarbonRowSchema mapSchema =
- new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(),
- mapSchemas);
- taskMinMaxSchemas.add(mapSchema);
- }
+ // for storing file path
+ taskMinMaxSchemas.add(
+ new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, filePath.length));
+ // for storing file name
+ taskMinMaxSchemas.add(
+ new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, fileName.length));
+ // for storing segmentid
+ taskMinMaxSchemas.add(
+ new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, segmentId.length));
unsafeMemorySummaryDMStore = new UnsafeMemoryDMStore(
taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]));
}
@@ -660,22 +661,24 @@ public class BlockletDataMap implements DataMap, Cacheable {
return blocklets;
}
- @Override public List<Blocklet> prune(FilterResolverIntf filterExp, List<String> partitions) {
+ @Override
+ public List<Blocklet> prune(FilterResolverIntf filterExp, List<PartitionSpec> partitions) {
if (unsafeMemoryDMStore.getRowCount() == 0) {
return new ArrayList<>();
}
- // First get the partitions which are stored inside datamap.
- List<String> storedPartitions = getPartitions();
// if it has partitioned datamap but there is no partitioned information stored, it means
// partitions are dropped so return empty list.
- if (isPartitionedSegment && (storedPartitions == null || storedPartitions.size() == 0)) {
- return new ArrayList<>();
- }
- if (storedPartitions != null && storedPartitions.size() > 0) {
+ if (partitions != null) {
+ // First get the partitions which are stored inside datamap.
+ String[] fileDetails = getFileDetails();
// Check the exact match of partition information inside the stored partitions.
boolean found = false;
- if (partitions != null && partitions.size() > 0) {
- found = partitions.containsAll(storedPartitions);
+ Path folderPath = new Path(fileDetails[0]);
+ for (PartitionSpec spec : partitions) {
+ if (folderPath.equals(spec.getLocation()) && isCorrectUUID(fileDetails, spec)) {
+ found = true;
+ break;
+ }
}
if (!found) {
return new ArrayList<>();
@@ -685,6 +688,20 @@ public class BlockletDataMap implements DataMap, Cacheable {
return prune(filterExp);
}
+ /**
+ * Decides whether this datamap's index file is selected by the given partition spec's uuid.
+ * A spec without a uuid selects every index file. Otherwise the uuid is split on '_': the
+ * first part must equal the stored segment id (fileDetails[2]), and the second part must equal
+ * the timestamp extracted from the stored index file name (fileDetails[1]).
+ */
+ private boolean isCorrectUUID(String[] fileDetails, PartitionSpec spec) {
+ String uuid = spec.getUuid();
+ if (uuid == null) {
+ return true;
+ }
+ String[] uuidParts = uuid.split("_");
+ return uuidParts[0].equals(fileDetails[2]) && CarbonTablePath.DataFileUtil
+ .getTimeStampFromFileName(fileDetails[1]).equals(uuidParts[1]);
+ }
+
/**
* select the blocks based on column min and max value
*
@@ -767,6 +784,23 @@ public class BlockletDataMap implements DataMap, Cacheable {
return blocklet;
}
+ /**
+ * Reads the index folder path, index file name and segment id from the first summary row,
+ * returned in that order as a three-element array.
+ * Any failure while decoding is rethrown wrapped in a RuntimeException.
+ */
+ private String[] getFileDetails() {
+ try {
+ DataMapRow summaryRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
+ String indexFolder = new String(summaryRow.getByteArray(INDEX_PATH),
+ CarbonCommonConstants.DEFAULT_CHARSET);
+ String indexFileName = new String(summaryRow.getByteArray(INDEX_FILE_NAME),
+ CarbonCommonConstants.DEFAULT_CHARSET);
+ String segmentNo = new String(summaryRow.getByteArray(SEGMENTID),
+ CarbonCommonConstants.DEFAULT_CHARSET);
+ return new String[] { indexFolder, indexFileName, segmentNo };
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
/**
* Binary search used to get the first tentative index row based on
* search key
@@ -874,20 +908,6 @@ public class BlockletDataMap implements DataMap, Cacheable {
return dataMapRow;
}
- private List<String> getPartitions() {
- DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
- if (unsafeRow.getColumnCount() > PARTITION_INFO) {
- List<String> partitions = new ArrayList<>();
- DataMapRow row = unsafeRow.getRow(PARTITION_INFO);
- for (int i = 0; i < row.getColumnCount(); i++) {
- partitions.add(
- new String(row.getByteArray(i), CarbonCommonConstants.DEFAULT_CHARSET_CLASS));
- }
- return partitions;
- }
- return null;
- }
-
private byte[] getColumnSchemaBinary() {
DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
return unsafeRow.getByteArray(SCHEMA);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
index 63f45b5..99e48a5 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
@@ -31,8 +31,8 @@ public class BlockletDataMapDistributable extends DataMapDistributable {
*/
private String filePath;
- public BlockletDataMapDistributable(String indexFileName) {
- this.filePath = indexFileName;
+ public BlockletDataMapDistributable(String indexFilePath) {
+ this.filePath = indexFilePath;
}
public String getFilePath() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 2e2cab5..5eb077f 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.cache.CacheProvider;
import org.apache.carbondata.core.cache.CacheType;
import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datamap.dev.DataMapFactory;
import org.apache.carbondata.core.datamap.dev.DataMapWriter;
@@ -37,6 +38,7 @@ import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.events.Event;
@@ -65,30 +67,40 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
}
@Override
- public DataMapWriter createWriter(String segmentId) {
+ public DataMapWriter createWriter(Segment segment) {
throw new UnsupportedOperationException("not implemented");
}
@Override
- public List<DataMap> getDataMaps(String segmentId) throws IOException {
+ public List<DataMap> getDataMaps(Segment segment) throws IOException {
List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
- getTableBlockIndexUniqueIdentifiers(segmentId);
+ getTableBlockIndexUniqueIdentifiers(segment);
return cache.getAll(tableBlockIndexUniqueIdentifiers);
}
private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(
- String segmentId) throws IOException {
+ Segment segment) throws IOException {
List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
- segmentMap.get(segmentId);
+ segmentMap.get(segment.getSegmentNo());
if (tableBlockIndexUniqueIdentifiers == null) {
tableBlockIndexUniqueIdentifiers = new ArrayList<>();
- String path = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
- List<String> indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path);
- for (int i = 0; i < indexFiles.size(); i++) {
+ Map<String, String> indexFiles;
+ if (segment.getSegmentFileName() == null) {
+ String path =
+ CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
+ indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path);
+ } else {
+ SegmentFileStore fileStore =
+ new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
+ indexFiles = fileStore.getIndexFiles();
+ }
+ for (Map.Entry<String, String> indexFileEntry: indexFiles.entrySet()) {
+ Path indexFile = new Path(indexFileEntry.getKey());
tableBlockIndexUniqueIdentifiers.add(
- new TableBlockIndexUniqueIdentifier(identifier, segmentId, indexFiles.get(i)));
+ new TableBlockIndexUniqueIdentifier(indexFile.getParent().toString(),
+ indexFile.getName(), indexFileEntry.getValue(), segment.getSegmentNo()));
}
- segmentMap.put(segmentId, tableBlockIndexUniqueIdentifiers);
+ segmentMap.put(segment.getSegmentNo(), tableBlockIndexUniqueIdentifiers);
}
return tableBlockIndexUniqueIdentifiers;
}
@@ -99,7 +111,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
* datamap.
*/
@Override
- public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, String segmentId)
+ public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment)
throws IOException {
List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>();
// If it is already detailed blocklet then type cast and return same
@@ -110,7 +122,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
return detailedBlocklets;
}
List<TableBlockIndexUniqueIdentifier> identifiers =
- getTableBlockIndexUniqueIdentifiers(segmentId);
+ getTableBlockIndexUniqueIdentifiers(segment);
// Retrieve each blocklets detail information from blocklet datamap
for (Blocklet blocklet : blocklets) {
detailedBlocklets.add(getExtendedBlocklet(identifiers, blocklet));
@@ -119,13 +131,13 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
}
@Override
- public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, String segmentId)
+ public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment)
throws IOException {
if (blocklet instanceof ExtendedBlocklet) {
return (ExtendedBlocklet) blocklet;
}
List<TableBlockIndexUniqueIdentifier> identifiers =
- getTableBlockIndexUniqueIdentifiers(segmentId);
+ getTableBlockIndexUniqueIdentifiers(segment);
return getExtendedBlocklet(identifiers, blocklet);
}
@@ -133,7 +145,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
Blocklet blocklet) throws IOException {
String carbonIndexFileName = CarbonTablePath.getCarbonIndexFileName(blocklet.getPath());
for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
- if (identifier.getCarbonIndexFileName().equals(carbonIndexFileName)) {
+ if (identifier.getIndexFilePath().equals(carbonIndexFileName)) {
DataMap dataMap = cache.get(identifier);
return ((BlockletDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId());
}
@@ -141,36 +153,51 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
throw new IOException("Blocklet with blockid " + blocklet.getPath() + " not found ");
}
-
@Override
- public List<DataMapDistributable> toDistributable(String segmentId) {
- CarbonFile[] carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentId);
+ public List<DataMapDistributable> toDistributable(Segment segment) {
List<DataMapDistributable> distributables = new ArrayList<>();
- for (int i = 0; i < carbonIndexFiles.length; i++) {
- Path path = new Path(carbonIndexFiles[i].getPath());
- try {
+ try {
+ CarbonFile[] carbonIndexFiles;
+ if (segment.getSegmentFileName() == null) {
+ carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(
+ CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo()));
+ } else {
+ SegmentFileStore fileStore =
+ new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
+ Map<String, String> indexFiles = fileStore.getIndexFiles();
+ carbonIndexFiles = new CarbonFile[indexFiles.size()];
+ int i = 0;
+ for (String indexFile : indexFiles.keySet()) {
+ carbonIndexFiles[i++] = FileFactory.getCarbonFile(indexFile);
+ }
+ }
+ for (int i = 0; i < carbonIndexFiles.length; i++) {
+ Path path = new Path(carbonIndexFiles[i].getPath());
+
FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
LocatedFileStatus fileStatus = iter.next();
String[] location = fileStatus.getBlockLocations()[0].getHosts();
BlockletDataMapDistributable distributable =
- new BlockletDataMapDistributable(path.getName());
+ new BlockletDataMapDistributable(path.toString());
distributable.setLocations(location);
distributables.add(distributable);
- } catch (IOException e) {
- throw new RuntimeException(e);
+
}
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
return distributables;
}
- @Override public void fireEvent(Event event) {
+ @Override
+ public void fireEvent(Event event) {
}
@Override
- public void clear(String segmentId) {
- List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId);
+ public void clear(Segment segment) {
+ List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment.getSegmentNo());
if (blockIndexes != null) {
for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
DataMap dataMap = cache.getIfPresent(blockIndex);
@@ -185,7 +212,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
@Override
public void clear() {
for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) {
- clear(segmentId);
+ clear(new Segment(segmentId, null));
}
}
@@ -193,18 +220,21 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
public List<DataMap> getDataMaps(DataMapDistributable distributable) throws IOException {
BlockletDataMapDistributable mapDistributable = (BlockletDataMapDistributable) distributable;
List<TableBlockIndexUniqueIdentifier> identifiers = new ArrayList<>();
- if (mapDistributable.getFilePath().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
- identifiers.add(new TableBlockIndexUniqueIdentifier(identifier, distributable.getSegmentId(),
- mapDistributable.getFilePath()));
- } else if (mapDistributable.getFilePath().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+ Path indexPath = new Path(mapDistributable.getFilePath());
+ String segmentNo = mapDistributable.getSegment().getSegmentNo();
+ if (indexPath.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+ String parent = indexPath.getParent().toString();
+ identifiers
+ .add(new TableBlockIndexUniqueIdentifier(parent, indexPath.getName(), null, segmentNo));
+ } else if (indexPath.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
- List<String> indexFiles = fileStore.getIndexFilesFromMergeFile(
- CarbonTablePath.getSegmentPath(identifier.getTablePath(), mapDistributable.getSegmentId())
- + "/" + mapDistributable.getFilePath());
+ CarbonFile carbonFile = FileFactory.getCarbonFile(indexPath.toString());
+ String parentPath = carbonFile.getParentFile().getAbsolutePath();
+ List<String> indexFiles = fileStore.getIndexFilesFromMergeFile(carbonFile.getAbsolutePath());
for (String indexFile : indexFiles) {
identifiers.add(
- new TableBlockIndexUniqueIdentifier(identifier, distributable.getSegmentId(),
- indexFile));
+ new TableBlockIndexUniqueIdentifier(parentPath, indexFile, carbonFile.getName(),
+ segmentNo));
}
}
List<DataMap> dataMaps;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
index b3a7f8c..ebeb278 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
@@ -16,7 +16,6 @@
*/
package org.apache.carbondata.core.indexstore.blockletindex;
-import java.util.List;
import java.util.Map;
import org.apache.carbondata.core.datamap.dev.DataMapModel;
@@ -29,35 +28,27 @@ public class BlockletDataMapModel extends DataMapModel {
private byte[] fileData;
- private List<String> partitions;
+ private Map<String, BlockMetaInfo> blockMetaInfoMap;
- private boolean partitionedSegment;
+ private String segmentId;
- Map<String, BlockMetaInfo> blockMetaInfoMap;
-
- public BlockletDataMapModel(String filePath, byte[] fileData, List<String> partitions,
- boolean partitionedSegment,
- Map<String, BlockMetaInfo> blockMetaInfoMap) {
+ public BlockletDataMapModel(String filePath, byte[] fileData,
+ Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId) {
super(filePath);
this.fileData = fileData;
- this.partitions = partitions;
- this.partitionedSegment = partitionedSegment;
this.blockMetaInfoMap = blockMetaInfoMap;
+ this.segmentId = segmentId;
}
public byte[] getFileData() {
return fileData;
}
- public List<String> getPartitions() {
- return partitions;
- }
-
- public boolean isPartitionedSegment() {
- return partitionedSegment;
- }
-
public Map<String, BlockMetaInfo> getBlockMetaInfoMap() {
return blockMetaInfoMap;
}
+
+ /**
+ * Returns the segment id passed to the constructor.
+ */
+ public String getSegmentId() {
+ return segmentId;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index a30b04c..b88c1f4 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -28,10 +28,12 @@ import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.reader.CarbonIndexFileReader;
import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataFileFooterConverter;
@@ -58,8 +60,14 @@ public class SegmentIndexFileStore {
*/
private Map<String, byte[]> carbonIndexMap;
+ /**
+ * Stores the full path of each index file (parent folder path + file separator + file
+ * name) and its binary file data. Unlike carbonIndexMap, which is keyed by file name
+ * only, index files with the same name in different folders do not overwrite each other.
+ */
+ private Map<String, byte[]> carbonIndexMapWithFullPath;
+
public SegmentIndexFileStore() {
carbonIndexMap = new HashMap<>();
+ carbonIndexMapWithFullPath = new HashMap<>();
}
/**
@@ -80,6 +88,45 @@ public class SegmentIndexFileStore {
}
/**
+ * Reads every index file and merge index file listed in the segment file and caches
+ * their contents in carbonIndexMap and carbonIndexMapWithFullPath.
+ * Listed files that no longer exist are skipped.
+ *
+ * @param segmentFileStore segment file metadata; nothing is read if its location map is null
+ * @param status only locations whose status equals {@code status.getMessage()} are read;
+ *               may be null when {@code ignoreStatus} is true
+ * @param ignoreStatus when true, every location is read regardless of its status
+ * @throws IOException if reading an index or merge index file fails
+ */
+ public void readAllIIndexOfSegment(SegmentFileStore segmentFileStore, SegmentStatus status,
+ boolean ignoreStatus) throws IOException {
+ List<CarbonFile> carbonIndexFiles = new ArrayList<>();
+ if (segmentFileStore.getLocationMap() == null) {
+ return;
+ }
+ for (Map.Entry<String, SegmentFileStore.FolderDetails> entry : segmentFileStore
+ .getLocationMap().entrySet()) {
+ SegmentFileStore.FolderDetails details = entry.getValue();
+ // Check ignoreStatus first so a null status is never dereferenced when the caller
+ // asks for all locations.
+ boolean statusMatches =
+ ignoreStatus || (status != null && details.getStatus().equals(status.getMessage()));
+ if (!statusMatches) {
+ continue;
+ }
+ String location = entry.getKey();
+ if (details.isRelative()) {
+ location =
+ segmentFileStore.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + location;
+ }
+ for (String indexFile : details.getFiles()) {
+ CarbonFile carbonFile = FileFactory
+ .getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile);
+ // Files listed in the segment file may no longer exist; skip them.
+ if (carbonFile.exists()) {
+ carbonIndexFiles.add(carbonFile);
+ }
+ }
+ }
+ for (CarbonFile carbonIndexFile : carbonIndexFiles) {
+ String fileName = carbonIndexFile.getName();
+ if (fileName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+ readMergeFile(carbonIndexFile.getCanonicalPath());
+ } else if (fileName.endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+ readIndexFile(carbonIndexFile);
+ }
+ }
+ }
+
+ /**
* read index file and fill the blocklet information
*
* @param segmentPath
@@ -120,17 +167,22 @@ public class SegmentIndexFileStore {
* @return
* @throws IOException
*/
- public List<String> getIndexFilesFromSegment(String segmentPath) throws IOException {
+ public Map<String, String> getIndexFilesFromSegment(String segmentPath) throws IOException {
+ // Returned map:
+ //   key   - absolute path of an index file (files listed inside a merge index file are
+ //           resolved against the merge file's parent folder);
+ //   value - name of the merge index file that contains it, or null for a standalone
+ //           index file.
+ // NOTE(review): if the same index file is found both standalone and inside a merge file,
+ // the entry processed last wins, which depends on the order getCarbonIndexFiles returns.
CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath);
- Set<String> indexFiles = new HashSet<>();
+ Map<String, String> indexFiles = new HashMap<>();
for (int i = 0; i < carbonIndexFiles.length; i++) {
if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
- indexFiles.addAll(getIndexFilesFromMergeFile(carbonIndexFiles[i].getCanonicalPath()));
+ List<String> indexFilesFromMergeFile =
+ getIndexFilesFromMergeFile(carbonIndexFiles[i].getCanonicalPath());
+ for (String file: indexFilesFromMergeFile) {
+ indexFiles.put(carbonIndexFiles[i].getParentFile().getAbsolutePath()
+ + CarbonCommonConstants.FILE_SEPARATOR + file, carbonIndexFiles[i].getName());
+ }
} else if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
- indexFiles.add(carbonIndexFiles[i].getName());
+ indexFiles.put(carbonIndexFiles[i].getAbsolutePath(), null);
}
}
- return new ArrayList<>(indexFiles);
+ return indexFiles;
}
/**
@@ -156,16 +208,23 @@ public class SegmentIndexFileStore {
*/
private void readMergeFile(String mergeFilePath) throws IOException {
ThriftReader thriftReader = new ThriftReader(mergeFilePath);
- thriftReader.open();
- MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader);
- MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader);
- List<String> file_names = indexHeader.getFile_names();
- List<ByteBuffer> fileData = mergedBlockIndex.getFileData();
- assert (file_names.size() == fileData.size());
- for (int i = 0; i < file_names.size(); i++) {
- carbonIndexMap.put(file_names.get(i), fileData.get(i).array());
+ // The reader is closed in finally so a failure while parsing does not leak the stream.
+ try {
+ thriftReader.open();
+ MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader);
+ MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader);
+ List<String> file_names = indexHeader.getFile_names();
+ List<ByteBuffer> fileData = mergedBlockIndex.getFileData();
+ // Validate explicitly: Java asserts are disabled by default, so a corrupt merge file
+ // would otherwise silently drop entries or fail with IndexOutOfBoundsException.
+ if (file_names.size() != fileData.size()) {
+ throw new IOException("Corrupt merge index file " + mergeFilePath + ": "
+ + file_names.size() + " file names but " + fileData.size() + " file data entries");
+ }
+ // All index files inside a merge file are keyed relative to the merge file's folder.
+ String parentPath = FileFactory.getCarbonFile(mergeFilePath).getParentFile()
+ .getAbsolutePath() + CarbonCommonConstants.FILE_SEPARATOR;
+ for (int i = 0; i < file_names.size(); i++) {
+ byte[] data = fileData.get(i).array();
+ carbonIndexMap.put(file_names.get(i), data);
+ carbonIndexMapWithFullPath.put(parentPath + file_names.get(i), data);
+ }
+ } finally {
+ thriftReader.close();
}
- thriftReader.close();
}
/**
@@ -181,6 +240,9 @@ public class SegmentIndexFileStore {
byte[] bytes = new byte[(int) indexFile.getSize()];
dataInputStream.readFully(bytes);
carbonIndexMap.put(indexFile.getName(), bytes);
+ carbonIndexMapWithFullPath.put(
+ indexFile.getParentFile().getAbsolutePath() + CarbonCommonConstants.FILE_SEPARATOR
+ + indexFile.getName(), bytes);
dataInputStream.close();
}
@@ -253,6 +315,10 @@ public class SegmentIndexFileStore {
return carbonIndexMap;
}
+ /**
+ * @return index file data keyed by full index file path (parent folder path + file
+ *         separator + file name); the internal map is returned directly, not a copy
+ */
+ public Map<String, byte[]> getCarbonIndexMapWithFullPath() {
+ return carbonIndexMapWithFullPath;
+ }
+
/**
* This method will read the index information from carbon index file
*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
index cc98b03..be98f7d 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -102,21 +101,6 @@ public class HdfsFileLock extends AbstractCarbonLock {
status = true;
} catch (IOException e) {
status = false;
- } finally {
- CarbonFile carbonFile =
- FileFactory.getCarbonFile(location, FileFactory.getFileType(location));
- if (carbonFile.exists()) {
- if (carbonFile.delete()) {
- LOGGER.info("Deleted the lock file " + location);
- } else {
- LOGGER.error("Not able to delete the lock file " + location);
- status = false;
- }
- } else {
- LOGGER.error("Not able to delete the lock file because "
- + "it is not existed in location " + location);
- status = false;
- }
}
}
return status;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
index 4fee4c4..75ea074 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
@@ -26,7 +26,6 @@ import java.nio.channels.OverlappingFileLockException;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -121,7 +120,6 @@ public class LocalFileLock extends AbstractCarbonLock {
LOGGER.info(e.getMessage());
return false;
}
-
}
/**
@@ -130,27 +128,18 @@ public class LocalFileLock extends AbstractCarbonLock {
* @return
*/
@Override public boolean unlock() {
- boolean status;
+ // true only when a held lock was actually released; with no lock held
+ // (fileLock == null) the result is false.
+ boolean status = false;
try {
if (null != fileLock) {
fileLock.release();
+ status = true;
}
- status = true;
} catch (IOException e) {
+ // NOTE(review): the release failure is reported only through the return value; it is
+ // not logged.
status = false;
} finally {
if (null != fileOutputStream) {
try {
+ // Only the stream is closed here; the lock file itself is not deleted.
fileOutputStream.close();
- // deleting the lock file after releasing the lock.
- CarbonFile lockFile = FileFactory
- .getCarbonFile(lockFilePath, FileFactory.getFileType(lockFilePath));
- if (!lockFile.exists() || lockFile.delete()) {
- LOGGER.info("Successfully deleted the lock file " + lockFilePath);
- } else {
- LOGGER.error("Not able to delete the lock file " + lockFilePath);
- status = false;
- }
} catch (IOException e) {
LOGGER.error(e.getMessage());
}
}
}
return status;
}
-
}