You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/06/30 17:41:55 UTC
[08/50] [abbrv] incubator-carbondata git commit: Fixed driver btree
performance issue (#729)
Fixed driver btree performance issue (#729)
LGTM
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/82332b0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/82332b0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/82332b0e
Branch: refs/heads/master
Commit: 82332b0e4381ede8a39789ccf1d917af0c89ab39
Parents: be46423
Author: Kumar Vishal <ku...@gmail.com>
Authored: Sat Jun 25 16:05:07 2016 +0800
Committer: sujith71955 <su...@gmail.com>
Committed: Sat Jun 25 16:05:07 2016 +0800
----------------------------------------------------------------------
.../carbon/datastore/SegmentTaskIndexStore.java | 21 ++-
.../carbon/metadata/index/BlockIndexInfo.java | 92 ++++++++++++
.../core/carbon/path/CarbonTablePath.java | 74 +++++++---
.../core/reader/CarbonIndexFileReader.java | 95 ++++++++++++
.../carbondata/core/reader/ThriftReader.java | 47 ++++--
.../core/util/CarbonMetadataUtil.java | 78 ++++++++--
.../org/carbondata/core/util/CarbonUtil.java | 35 +++++
.../core/writer/CarbonIndexFileWriter.java | 64 +++++++++
.../query/util/DataFileFooterConverter.java | 44 ++++++
.../datastore/SegmentTaskIndexStoreTest.java | 143 -------------------
format/src/main/thrift/carbondataindex.thrift | 45 ++++++
.../org/carbondata/hadoop/CarbonPathFilter.java | 4 +-
.../store/writer/AbstractFactDataWriter.java | 135 ++++++++++++++---
13 files changed, 653 insertions(+), 224 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/core/src/main/java/org/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
index fde062c..28a892e 100644
--- a/core/src/main/java/org/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
@@ -116,6 +116,7 @@ public class SegmentTaskIndexStore {
addTableSegmentMap(absoluteTableIdentifier);
Map<String, AbstractIndex> taskIdToSegmentIndexMap = null;
String segmentId = null;
+ String taskId = null;
try {
while (iteratorOverSegmentBlocksInfos.hasNext()) {
// segment id to table block mapping
@@ -147,8 +148,9 @@ public class SegmentTaskIndexStore {
taskIdToTableBlockInfoMap.entrySet().iterator();
while (iterator.hasNext()) {
Entry<String, List<TableBlockInfo>> taskToBlockInfoList = iterator.next();
- taskIdToSegmentIndexMap
- .put(taskToBlockInfoList.getKey(), loadBlocks(taskToBlockInfoList.getValue()));
+ taskId = taskToBlockInfoList.getKey();
+ taskIdToSegmentIndexMap.put(taskId,
+ loadBlocks(taskId, taskToBlockInfoList.getValue(), absoluteTableIdentifier));
}
// removing from segment lock map as once segment is loaded
//if concurrent query is coming for same segment
@@ -231,19 +233,12 @@ public class SegmentTaskIndexStore {
* @return loaded segment
* @throws CarbonUtilException
*/
- private AbstractIndex loadBlocks(List<TableBlockInfo> tableBlockInfoList)
- throws CarbonUtilException {
- DataFileFooter footer = null;
+ private AbstractIndex loadBlocks(String taskId, List<TableBlockInfo> tableBlockInfoList,
+ AbsoluteTableIdentifier tableIdentifier) throws CarbonUtilException {
// all the block of one task id will be loaded together
// so creating a list which will have all the data file meta data to of one task
- List<DataFileFooter> footerList = new ArrayList<DataFileFooter>();
- for (TableBlockInfo tableBlockInfo : tableBlockInfoList) {
- footer = CarbonUtil
- .readMetadatFile(tableBlockInfo.getFilePath(), tableBlockInfo.getBlockOffset(),
- tableBlockInfo.getBlockLength());
- footer.setTableBlockInfo(tableBlockInfo);
- footerList.add(footer);
- }
+ List<DataFileFooter> footerList =
+ CarbonUtil.readCarbonIndexFile(taskId, tableBlockInfoList, tableIdentifier);
AbstractIndex segment = new SegmentTaskIndex();
// file path of only first block is passed as it all table block info path of
// same task id will be same
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/core/src/main/java/org/carbondata/core/carbon/metadata/index/BlockIndexInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/carbon/metadata/index/BlockIndexInfo.java b/core/src/main/java/org/carbondata/core/carbon/metadata/index/BlockIndexInfo.java
new file mode 100644
index 0000000..bfed3dd
--- /dev/null
+++ b/core/src/main/java/org/carbondata/core/carbon/metadata/index/BlockIndexInfo.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.core.carbon.metadata.index;
+
+import org.carbondata.core.carbon.metadata.blocklet.index.BlockletIndex;
+
+/**
+ * Below class will be used to hold the information about one block which is
+ * stored in the carbon index file: number of rows, data file name, offset of
+ * the metadata in the data file and the blocklet index (min/max values and
+ * start/end keys). Instances are immutable.
+ */
+public class BlockIndexInfo {
+
+ /**
+ * total number of rows present in the file
+ */
+ private final long numberOfRows;
+
+ /**
+ * file name
+ */
+ private final String fileName;
+
+ /**
+ * offset of metadata in data file
+ */
+ private final long offset;
+
+ /**
+ * to store min max and start and end key
+ */
+ private final BlockletIndex blockletIndex;
+
+ /**
+ * Constructor
+ *
+ * @param numberOfRows number of rows
+ * @param fileName full qualified name
+ * @param offset offset of the metadata in data file
+ * @param blockletIndex blocklet index (min/max values and start/end keys)
+ */
+ public BlockIndexInfo(long numberOfRows, String fileName, long offset,
+ BlockletIndex blockletIndex) {
+ this.numberOfRows = numberOfRows;
+ this.fileName = fileName;
+ this.offset = offset;
+ this.blockletIndex = blockletIndex;
+ }
+
+ /**
+ * @return the numberOfRows
+ */
+ public long getNumberOfRows() {
+ return numberOfRows;
+ }
+
+ /**
+ * @return the fileName
+ */
+ public String getFileName() {
+ return fileName;
+ }
+
+ /**
+ * @return the offset of the metadata in the data file
+ */
+ public long getOffset() {
+ return offset;
+ }
+
+ /**
+ * @return the blockletIndex
+ */
+ public BlockletIndex getBlockletIndex() {
+ return blockletIndex;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/core/src/main/java/org/carbondata/core/carbon/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/carbon/path/CarbonTablePath.java b/core/src/main/java/org/carbondata/core/carbon/path/CarbonTablePath.java
index 31bc464..8dcd207 100644
--- a/core/src/main/java/org/carbondata/core/carbon/path/CarbonTablePath.java
+++ b/core/src/main/java/org/carbondata/core/carbon/path/CarbonTablePath.java
@@ -22,11 +22,14 @@ import java.io.File;
import org.carbondata.core.constants.CarbonCommonConstants;
import org.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
+import org.carbondata.core.datastorage.store.impl.FileFactory;
import static org.carbondata.core.constants.CarbonCommonConstants.INVALID_SEGMENT_ID;
import org.apache.hadoop.fs.Path;
+
/**
* Helps to get Table content paths.
*/
@@ -44,6 +47,7 @@ public class CarbonTablePath extends Path {
private static final String PARTITION_PREFIX = "Part";
private static final String CARBON_DATA_EXT = ".carbondata";
private static final String DATA_PART_PREFIX = "part";
+ private static final String INDEX_FILE_EXT = ".carbonindex";
private String tablePath;
@@ -62,13 +66,6 @@ public class CarbonTablePath extends Path {
}
/**
- * gets table path
- */
- public String getPath() {
- return tablePath;
- }
-
- /**
* @param columnId unique column identifier
* @return name of dictionary file
*/
@@ -78,6 +75,7 @@ public class CarbonTablePath extends Path {
/**
* whether carbonFile is dictionary file or not
+ *
* @param carbonFile
* @return
*/
@@ -86,6 +84,27 @@ public class CarbonTablePath extends Path {
}
/**
+ * check if it is carbon data file matching extension
+ *
+ * @param fileNameWithPath
+ * @return boolean
+ */
+ public static boolean isCarbonDataFile(String fileNameWithPath) {
+ // only the text from the last '.' onwards is compared with the data file extension
+ int extensionStart = fileNameWithPath.lastIndexOf('.');
+ return extensionStart != -1
+ && fileNameWithPath.substring(extensionStart).startsWith(CARBON_DATA_EXT);
+ }
+
+ /**
+ * Returns the table path held by this object.
+ *
+ * @return table path
+ */
+ public String getPath() {
+ return this.tablePath;
+ }
+
+ /**
* @param columnId unique column identifier
* @return absolute path of dictionary file
*/
@@ -148,6 +167,29 @@ public class CarbonTablePath extends Path {
}
/**
+ * Below method will be used to get the index file present in the segment folder
+ * based on task id. Index file names have the form
+ * {@code <taskNo>-<factUpdatedTimeStamp>.carbonindex}, so files are matched on
+ * {@code taskId + "-"}; matching on the bare task id would let task "1" pick up
+ * the index file of task "10".
+ *
+ * @param taskId task id of the file
+ * @param partitionId partition number
+ * @param segmentId segment number
+ * @return full qualified carbon index path
+ * @throws IllegalStateException if no index file exists for the task in the segment
+ */
+ public String getCarbonIndexFilePath(final String taskId, final String partitionId,
+ final String segmentId) {
+ String segmentDir = getSegmentDir(partitionId, segmentId);
+ CarbonFile carbonFile =
+ FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir));
+ // the '-' separator makes the task id match exactly instead of as a prefix
+ final String taskPrefix = taskId + "-";
+ CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().startsWith(taskPrefix) && file.getName().endsWith(INDEX_FILE_EXT);
+ }
+ });
+ if (null == files || files.length == 0) {
+ throw new IllegalStateException(
+ "Carbon index file not found for task " + taskId + " in segment directory " + segmentDir);
+ }
+ return files[0].getAbsolutePath();
+ }
+
+ /**
* Gets absolute path of data file
*
* @param partitionId unique partition identifier
@@ -189,16 +231,14 @@ public class CarbonTablePath extends Path {
}
/**
- * check if it is carbon data file matching extension
- * @param fileNameWithPath
- * @return boolean
+ * Below method will be used to get the carbon index filename
+ *
+ * @param taskNo task number
+ * @param factUpdatedTimeStamp time stamp
+ * @return filename
*/
- public static boolean isCarbonDataFile(String fileNameWithPath) {
- int pos = fileNameWithPath.lastIndexOf('.');
- if( pos != -1 ) {
- return fileNameWithPath.substring(pos).startsWith(CARBON_DATA_EXT);
- }
- return false;
+ public String getCarbonIndexFileName(int taskNo, String factUpdatedTimeStamp) {
+ // file name layout: <taskNo>-<factUpdatedTimeStamp>.carbonindex
+ return new StringBuilder().append(taskNo).append('-').append(factUpdatedTimeStamp)
+ .append(INDEX_FILE_EXT).toString();
}
private String getSegmentDir(String partitionId, String segmentId) {
@@ -234,7 +274,7 @@ public class CarbonTablePath extends Path {
if (!(o instanceof CarbonTablePath)) {
return false;
}
- CarbonTablePath path = (CarbonTablePath)o;
+ CarbonTablePath path = (CarbonTablePath) o;
return tablePath.equals(path.tablePath) && super.equals(o);
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/core/src/main/java/org/carbondata/core/reader/CarbonIndexFileReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/reader/CarbonIndexFileReader.java b/core/src/main/java/org/carbondata/core/reader/CarbonIndexFileReader.java
new file mode 100644
index 0000000..bb18e9f
--- /dev/null
+++ b/core/src/main/java/org/carbondata/core/reader/CarbonIndexFileReader.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.core.reader;
+
+import java.io.IOException;
+
+import org.carbondata.format.BlockIndex;
+import org.carbondata.format.IndexHeader;
+
+import org.apache.thrift.TBase;
+
+/**
+ * Reader class which will be used to read the carbon index file.
+ * Typical usage: open the reader, read the {@link IndexHeader}, read
+ * {@link BlockIndex} entries while {@link #hasNext()} returns true, then close it.
+ */
+public class CarbonIndexFileReader {
+
+ /**
+ * reader
+ */
+ private ThriftReader thriftReader;
+
+ /**
+ * Below method will be used to read the index header
+ *
+ * @return index header
+ * @throws IOException if any problem while reading the header
+ */
+ public IndexHeader readIndexHeader() throws IOException {
+ IndexHeader indexHeader = (IndexHeader) thriftReader.read(new ThriftReader.TBaseCreator() {
+ @Override public TBase create() {
+ return new IndexHeader();
+ }
+ });
+ return indexHeader;
+ }
+
+ /**
+ * Below method will be used to close the reader. It is safe to call even if
+ * the reader was never opened.
+ */
+ public void closeThriftReader() {
+ // openThriftReader may not have been called (or may not have been reached)
+ if (null != thriftReader) {
+ thriftReader.close();
+ }
+ }
+
+ /**
+ * Below method will be used to read the block index from file
+ *
+ * @return block index info
+ * @throws IOException if problem while reading the block index
+ */
+ public BlockIndex readBlockIndexInfo() throws IOException {
+ BlockIndex blockInfo = (BlockIndex) thriftReader.read(new ThriftReader.TBaseCreator() {
+ @Override public TBase create() {
+ return new BlockIndex();
+ }
+ });
+ return blockInfo;
+ }
+
+ /**
+ * Open the thrift reader
+ *
+ * @param filePath path of the carbon index file
+ * @throws IOException if the file cannot be opened
+ */
+ public void openThriftReader(String filePath) throws IOException {
+ thriftReader = new ThriftReader(filePath);
+ thriftReader.open();
+ }
+
+ /**
+ * check if any more object is present
+ *
+ * @return true if any more object can be read
+ * @throws IOException if the underlying stream cannot be queried
+ */
+ public boolean hasNext() throws IOException {
+ return thriftReader.hasNext();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/core/src/main/java/org/carbondata/core/reader/ThriftReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/reader/ThriftReader.java b/core/src/main/java/org/carbondata/core/reader/ThriftReader.java
index 3683a2b..e659919 100644
--- a/core/src/main/java/org/carbondata/core/reader/ThriftReader.java
+++ b/core/src/main/java/org/carbondata/core/reader/ThriftReader.java
@@ -35,34 +35,21 @@ import org.apache.thrift.transport.TIOStreamTransport;
*/
public class ThriftReader {
/**
- * Thrift deserializes by taking an existing object and populating it. ThriftReader
- * needs a way of obtaining instances of the class to be populated and this interface
- * defines the mechanism by which a client provides these instances.
- */
- public static interface TBaseCreator {
- TBase create();
- }
-
- /**
* buffer size
*/
private static final int bufferSize = 2048;
-
/**
* File containing the objects.
*/
private String fileName;
-
/**
* Used to create empty objects that will be initialized with values from the fileName.
*/
- private final TBaseCreator creator;
-
+ private TBaseCreator creator;
/**
* For reading the fileName.
*/
private DataInputStream dataInputStream;
-
/**
* For reading the binary thrift objects.
*/
@@ -77,6 +64,13 @@ public class ThriftReader {
}
/**
+ * Constructor which only records the file name; the file is not opened until
+ * {@link #open()} is called. No {@link TBaseCreator} is stored, so objects are
+ * expected to be read with {@link #read(TBaseCreator)}.
+ * NOTE(review): the creator field stays null with this constructor - confirm no
+ * caller of this instance relies on a creator-based read.
+ */
+ public ThriftReader(String fileName) {
+ this.fileName = fileName;
+ }
+
+ /**
* Opens the fileName for reading.
*/
public void open() throws IOException {
@@ -118,9 +112,34 @@ public class ThriftReader {
}
/**
+ * Deserializes the next thrift object from the file.
+ *
+ * @param creator supplies the empty thrift object which is populated and returned
+ * @return the populated thrift object
+ * @throws IOException wrapping any thrift exception raised while deserializing
+ */
+ public TBase read(TBaseCreator creator) throws IOException {
+ TBase target = creator.create();
+ try {
+ target.read(binaryIn);
+ return target;
+ } catch (TException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
* Close the fileName.
*/
public void close() {
CarbonUtil.closeStreams(dataInputStream);
}
+
+ /**
+ * Factory for empty thrift objects. Thrift deserializes by populating an existing
+ * instance, so the caller provides one through this interface for every object read.
+ */
+ public interface TBaseCreator {
+ /**
+ * @return a new, empty thrift object which the reader will populate
+ */
+ TBase create();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/core/src/main/java/org/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/carbondata/core/util/CarbonMetadataUtil.java
index 5b9c291..5e8e8a2 100644
--- a/core/src/main/java/org/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/carbondata/core/util/CarbonMetadataUtil.java
@@ -11,10 +11,12 @@ import java.util.List;
import org.carbondata.common.logging.LogService;
import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.carbon.metadata.index.BlockIndexInfo;
import org.carbondata.core.constants.CarbonCommonConstants;
import org.carbondata.core.datastorage.store.compression.ValueCompressionModel;
import org.carbondata.core.metadata.BlockletInfoColumnar;
import org.carbondata.core.metadata.ValueEncoderMeta;
+import org.carbondata.format.BlockIndex;
import org.carbondata.format.BlockletBTreeIndex;
import org.carbondata.format.BlockletIndex;
import org.carbondata.format.BlockletInfo;
@@ -25,6 +27,7 @@ import org.carbondata.format.CompressionCodec;
import org.carbondata.format.DataChunk;
import org.carbondata.format.Encoding;
import org.carbondata.format.FileFooter;
+import org.carbondata.format.IndexHeader;
import org.carbondata.format.PresenceMeta;
import org.carbondata.format.SegmentInfo;
import org.carbondata.format.SortState;
@@ -68,6 +71,23 @@ public class CarbonMetadataUtil {
return footer;
}
+ /**
+ * Converts the wrapper blocklet index (per column min/max values and start/end
+ * keys) into the thrift {@link BlockletIndex}.
+ *
+ * @param info wrapper blocklet index
+ * @return thrift blocklet index
+ */
+ private static BlockletIndex getBlockletIndex(
+ org.carbondata.core.carbon.metadata.blocklet.index.BlockletIndex info) {
+ byte[][] maxValues = info.getMinMaxIndex().getMaxValues();
+ byte[][] minValues = info.getMinMaxIndex().getMinValues();
+ BlockletMinMaxIndex minMaxIndex = new BlockletMinMaxIndex();
+ for (int column = 0; column < maxValues.length; column++) {
+ minMaxIndex.addToMax_values(ByteBuffer.wrap(maxValues[column]));
+ minMaxIndex.addToMin_values(ByteBuffer.wrap(minValues[column]));
+ }
+ BlockletBTreeIndex bTreeIndex = new BlockletBTreeIndex();
+ bTreeIndex.setStart_key(info.getBtreeIndex().getStartKey());
+ bTreeIndex.setEnd_key(info.getBtreeIndex().getEndKey());
+ BlockletIndex thriftIndex = new BlockletIndex();
+ thriftIndex.setMin_max_index(minMaxIndex);
+ thriftIndex.setB_tree_index(bTreeIndex);
+ return thriftIndex;
+ }
+
/**
* Get total number of rows for the file.
*
@@ -334,21 +354,53 @@ public class CarbonMetadataUtil {
min[j] = minMaxIndexList.getMin_values().get(j).array();
max[j] = minMaxIndexList.getMax_values().get(j).array();
}
-
- // byte[][] min = new byte[minMaxIndexList.getMin_values().size()][];
- // List<ByteBuffer> minValues = minMaxIndexList.getMin_values();
- // for (int j = 0; j < minValues.size(); j++) {
- // min[j] = minValues.get(j).array();
- // }
- // listOfNodeInfo.get(i).setColumnMinData(min);
- //
- // byte[][] max = new byte[minMaxIndexList.getMax_values().size()][];
- // List<ByteBuffer> maxValues = minMaxIndexList.getMax_values();
- // for (int j = 0; j < maxValues.size(); j++) {
- // max[j] = maxValues.get(j).array();
- // }
listOfNodeInfo.get(i).setColumnMaxData(max);
}
}
+ /**
+ * Builds the thrift index header, which carries the segment info (number of
+ * columns and per column cardinality) and the table column schemas.
+ *
+ * @param columnCardinality cardinality of each column
+ * @param columnSchemaList list of column present in the table
+ * @return Index header object
+ */
+ public static IndexHeader getIndexHeader(int[] columnCardinality,
+ List<ColumnSchema> columnSchemaList) {
+ // segment level details: column count and cardinality of every column
+ SegmentInfo segmentInfo = new SegmentInfo();
+ segmentInfo.setNum_cols(columnSchemaList.size());
+ segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(columnCardinality));
+ IndexHeader header = new IndexHeader();
+ header.setTable_columns(columnSchemaList);
+ header.setSegment_info(segmentInfo);
+ return header;
+ }
+
+ /**
+ * Below method will be used to get the block index info thrift object for each block
+ * present in the segment
+ *
+ * @param blockIndexInfoList block index info list
+ * @return list of block index, in the same order as the input list
+ */
+ public static List<BlockIndex> getBlockIndexInfo(List<BlockIndexInfo> blockIndexInfoList) {
+ List<BlockIndex> thriftBlockIndexList = new ArrayList<BlockIndex>(blockIndexInfoList.size());
+ // below code to create block index info object for each block
+ for (BlockIndexInfo blockIndexInfo : blockIndexInfoList) {
+ BlockIndex blockIndex = new BlockIndex();
+ blockIndex.setNum_rows(blockIndexInfo.getNumberOfRows());
+ // offset of the metadata inside the data file (must not be the row count)
+ blockIndex.setOffset(blockIndexInfo.getOffset());
+ blockIndex.setFile_name(blockIndexInfo.getFileName());
+ blockIndex.setBlock_index(getBlockletIndex(blockIndexInfo.getBlockletIndex()));
+ thriftBlockIndexList.add(blockIndex);
+ }
+ return thriftBlockIndexList;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/core/src/main/java/org/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/carbondata/core/util/CarbonUtil.java
index 58e6f42..427fb97 100644
--- a/core/src/main/java/org/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/carbondata/core/util/CarbonUtil.java
@@ -48,12 +48,16 @@ import java.util.concurrent.Executors;
import org.carbondata.common.logging.LogService;
import org.carbondata.common.logging.LogServiceFactory;
import org.carbondata.core.cache.dictionary.Dictionary;
+import org.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.carbondata.core.carbon.datastore.block.TableBlockInfo;
import org.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
import org.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
import org.carbondata.core.carbon.metadata.blocklet.datachunk.DataChunk;
import org.carbondata.core.carbon.metadata.datatype.DataType;
import org.carbondata.core.carbon.metadata.encoder.Encoding;
import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.carbondata.core.carbon.path.CarbonStorePath;
+import org.carbondata.core.carbon.path.CarbonTablePath;
import org.carbondata.core.constants.CarbonCommonConstants;
import org.carbondata.core.datastorage.store.FileHolder;
import org.carbondata.core.datastorage.store.columnar.ColumnarKeyStoreDataHolder;
@@ -1683,5 +1687,36 @@ public final class CarbonUtil {
}
}
+ /**
+ * Below method will be used to get all the block index info from index file
+ *
+ * @param taskId task id of the file
+ * @param tableBlockInfoList list of table block of the task; it is sorted in place
+ * @param absoluteTableIdentifier absolute table identifier
+ * @return list of block info
+ * @throws CarbonUtilException if any problem while reading
+ * @throws IllegalArgumentException if tableBlockInfoList is empty
+ */
+ public static List<DataFileFooter> readCarbonIndexFile(String taskId,
+ List<TableBlockInfo> tableBlockInfoList, AbsoluteTableIdentifier absoluteTableIdentifier)
+ throws CarbonUtilException {
+ // the segment id is taken from the first block, so at least one block is required
+ if (tableBlockInfoList.isEmpty()) {
+ throw new IllegalArgumentException("No table block info passed for task " + taskId);
+ }
+ // need to sort the block info list for the task in ascending order so that
+ // it is in sync with the block index entries read from the index file
+ // NOTE(review): relies on TableBlockInfo's natural ordering matching the order in
+ // which blocks were written to the index file - confirm against the writer
+ Collections.sort(tableBlockInfoList);
+ CarbonTablePath carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+ absoluteTableIdentifier.getCarbonTableIdentifier());
+ // getting the index file path
+ //TODO need to pass proper partition number when partition will be supported
+ String carbonIndexFilePath = carbonTablePath
+ .getCarbonIndexFilePath(taskId, "0", tableBlockInfoList.get(0).getSegmentId());
+ DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+ try {
+ // read the index info and return
+ return fileFooterConverter.getIndexInfo(carbonIndexFilePath, tableBlockInfoList);
+ } catch (IOException e) {
+ throw new CarbonUtilException("Problem while reading the file metadata", e);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/core/src/main/java/org/carbondata/core/writer/CarbonIndexFileWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/writer/CarbonIndexFileWriter.java b/core/src/main/java/org/carbondata/core/writer/CarbonIndexFileWriter.java
new file mode 100644
index 0000000..5ae7b33
--- /dev/null
+++ b/core/src/main/java/org/carbondata/core/writer/CarbonIndexFileWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.core.writer;
+
+import java.io.IOException;
+
+import org.apache.thrift.TBase;
+
+/**
+ * Writer class which will be used to write thrift objects to the carbon index file
+ */
+public class CarbonIndexFileWriter {
+
+ /**
+ * thrift writer object
+ */
+ private ThriftWriter thriftWriter;
+
+ /**
+ * It writes thrift object to file
+ *
+ * @param indexObject thrift object to be written
+ * @throws IOException if writing fails
+ */
+ public void writeThrift(TBase indexObject) throws IOException {
+ thriftWriter.write(indexObject);
+ }
+
+ /**
+ * Below method will be used to open the thrift writer
+ *
+ * @param filePath file path where data need to be written
+ * @throws IOException throws io exception in case of any failure
+ */
+ public void openThriftWriter(String filePath) throws IOException {
+ // create thrift writer instance
+ thriftWriter = new ThriftWriter(filePath, true);
+ // open the file stream
+ thriftWriter.open();
+ }
+
+ /**
+ * Below method will be used to close the thrift writer. It is safe to call even
+ * if the writer was never opened.
+ */
+ public void close() {
+ if (null != thriftWriter) {
+ thriftWriter.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/core/src/main/java/org/carbondata/query/util/DataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/util/DataFileFooterConverter.java b/core/src/main/java/org/carbondata/query/util/DataFileFooterConverter.java
index 7ca9a3f..fc23a0e 100644
--- a/core/src/main/java/org/carbondata/query/util/DataFileFooterConverter.java
+++ b/core/src/main/java/org/carbondata/query/util/DataFileFooterConverter.java
@@ -29,6 +29,7 @@ import java.util.List;
import org.carbondata.common.logging.LogService;
import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.carbon.datastore.block.TableBlockInfo;
import org.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
import org.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
import org.carbondata.core.carbon.metadata.blocklet.SegmentInfo;
@@ -47,8 +48,10 @@ import org.carbondata.core.datastorage.store.FileHolder;
import org.carbondata.core.datastorage.store.impl.FileFactory;
import org.carbondata.core.metadata.ValueEncoderMeta;
import org.carbondata.core.reader.CarbonFooterReader;
+import org.carbondata.core.reader.CarbonIndexFileReader;
import org.carbondata.core.util.ByteUtil;
import org.carbondata.core.util.CarbonUtil;
+import org.carbondata.format.BlockIndex;
import org.carbondata.format.FileFooter;
/**
@@ -61,6 +64,47 @@ public class DataFileFooterConverter {
LogServiceFactory.getLogService(DataFileFooterConverter.class.getName());
/**
+ * Below method will be used to get the index info from index file
+ *
+ * @param filePath file path of the index file
+ * @param tableBlockInfoList table block infos of the task, in the same order as the
+ * block index entries in the index file
+ * @return list of index info, one footer per block index entry in the file
+ * @throws IOException problem while reading the index file, or if the index file
+ * contains more block index entries than tableBlockInfoList
+ */
+ public List<DataFileFooter> getIndexInfo(String filePath, List<TableBlockInfo> tableBlockInfoList)
+ throws IOException {
+ CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+ try {
+ // open the reader
+ indexReader.openThriftReader(filePath);
+ // get the index header
+ org.carbondata.format.IndexHeader readIndexHeader = indexReader.readIndexHeader();
+ List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+ for (org.carbondata.format.ColumnSchema tableColumn : readIndexHeader.getTable_columns()) {
+ columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(tableColumn));
+ }
+ // get the segment info
+ SegmentInfo segmentInfo = getSegmentInfo(readIndexHeader.getSegment_info());
+ List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>();
+ int counter = 0;
+ // read the block info from file; entries are matched to table blocks by position
+ while (indexReader.hasNext()) {
+ if (counter >= tableBlockInfoList.size()) {
+ throw new IOException("Index file " + filePath + " has more block entries than the "
+ + tableBlockInfoList.size() + " table blocks passed");
+ }
+ BlockIndex readBlockIndexInfo = indexReader.readBlockIndexInfo();
+ DataFileFooter dataFileFooter = new DataFileFooter();
+ dataFileFooter.setBlockletIndex(getBlockletIndex(readBlockIndexInfo.getBlock_index()));
+ dataFileFooter.setColumnInTable(columnSchemaList);
+ dataFileFooter.setNumberOfRows(readBlockIndexInfo.getNum_rows());
+ dataFileFooter.setTableBlockInfo(tableBlockInfoList.get(counter++));
+ dataFileFooter.setSegmentInfo(segmentInfo);
+ dataFileFooters.add(dataFileFooter);
+ }
+ return dataFileFooters;
+ } finally {
+ // always release the index file stream, also when reading fails
+ indexReader.closeThriftReader();
+ }
+ }
+
+ /**
* Below method will be used to convert thrift file meta to wrapper file meta
*/
public DataFileFooter readDataFileFooter(String filePath, long blockOffset, long blockLength)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/core/src/test/java/org/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java b/core/src/test/java/org/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java
deleted file mode 100644
index 328917d..0000000
--- a/core/src/test/java/org/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-package org.carbondata.core.carbon.datastore;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.carbondata.core.carbon.AbsoluteTableIdentifier;
-import org.carbondata.core.carbon.CarbonTableIdentifier;
-import org.carbondata.core.carbon.datastore.block.AbstractIndex;
-import org.carbondata.core.carbon.datastore.block.TableBlockInfo;
-import org.carbondata.core.carbon.datastore.exception.IndexBuilderException;
-
-import junit.framework.TestCase;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class SegmentTaskIndexStoreTest extends TestCase {
-
- private SegmentTaskIndexStore indexStore;
-
- @BeforeClass public void setUp() {
- indexStore = SegmentTaskIndexStore.getInstance();
- }
-
- @Test public void testloadAndGetTaskIdToSegmentsMapForSingleSegment() throws IOException {
- String canonicalPath =
- new File(this.getClass().getResource("/").getPath() + "/../../").getCanonicalPath();
- File file = new File(canonicalPath + "/src/test/resources/part-0-0-1466029397000.carbondata");
- TableBlockInfo info =
- new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
- file.length());
- CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier("default", "t3", "1");
- AbsoluteTableIdentifier absoluteTableIdentifier =
- new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
- Map<String, List<TableBlockInfo>> mapOfSegmentToTableBlockInfoList = new HashMap<>();
- mapOfSegmentToTableBlockInfoList.put("0", Arrays.asList(new TableBlockInfo[] { info }));
- Map<String, AbstractIndex> loadAndGetTaskIdToSegmentsMap = null;
- try {
- loadAndGetTaskIdToSegmentsMap = indexStore
- .loadAndGetTaskIdToSegmentsMap(mapOfSegmentToTableBlockInfoList, absoluteTableIdentifier);
- } catch (IndexBuilderException e) {
- assertTrue(false);
- }
- assertTrue(loadAndGetTaskIdToSegmentsMap.size() == 1);
- indexStore.removeTableBlocks(Arrays.asList(new String[] { "0" }), absoluteTableIdentifier);
- }
-
- //@Test
- public void testloadAndGetTaskIdToSegmentsMapForSameSegmentLoadedConcurrently()
- throws IOException {
- String canonicalPath =
- new File(this.getClass().getResource("/").getPath() + "/../../").getCanonicalPath();
- File file = new File(canonicalPath + "/src/test/resources/part-0-0-1466029397000.carbondata");
- TableBlockInfo info =
- new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
- file.length());
- CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier("default", "t3", "1");
- AbsoluteTableIdentifier absoluteTableIdentifier =
- new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
- Map<String, List<TableBlockInfo>> mapOfSegmentToTableBlockInfoList = new HashMap<>();
- mapOfSegmentToTableBlockInfoList.put("0", Arrays.asList(new TableBlockInfo[] { info }));
- ExecutorService executor = Executors.newFixedThreadPool(2);
-
- executor
- .submit(new SegmentLoaderThread(mapOfSegmentToTableBlockInfoList, absoluteTableIdentifier));
- executor
- .submit(new SegmentLoaderThread(mapOfSegmentToTableBlockInfoList, absoluteTableIdentifier));
- executor.shutdown();
- try {
- executor.awaitTermination(1, TimeUnit.DAYS);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- assertTrue(indexStore.getSegmentBTreeIfExists(absoluteTableIdentifier, "0").size() == 1);
- indexStore.removeTableBlocks(Arrays.asList(new String[] { "0" }), absoluteTableIdentifier);
- }
-
- @Test public void testloadAndGetTaskIdToSegmentsMapForDifferentSegmentLoadedConcurrently()
- throws IOException {
- String canonicalPath =
- new File(this.getClass().getResource("/").getPath() + "/../../").getCanonicalPath();
- File file = new File(canonicalPath + "/src/test/resources/part-0-0-1466029397000.carbondata");
- TableBlockInfo info =
- new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
- file.length());
- TableBlockInfo info1 =
- new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
- file.length());
- CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier("default", "t3", "1");
- AbsoluteTableIdentifier absoluteTableIdentifier =
- new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
- Map<String, List<TableBlockInfo>> mapOfSegmentToTableBlockInfoList = new HashMap<>();
- Map<String, List<TableBlockInfo>> mapOfSegmentToTableBlockInfoList1 = new HashMap<>();
- mapOfSegmentToTableBlockInfoList.put("0", Arrays.asList(new TableBlockInfo[] { info }));
- mapOfSegmentToTableBlockInfoList1.put("1", Arrays.asList(new TableBlockInfo[] { info1 }));
- ExecutorService executor = Executors.newFixedThreadPool(2);
-
- executor
- .submit(new SegmentLoaderThread(mapOfSegmentToTableBlockInfoList, absoluteTableIdentifier));
- executor.submit(
- new SegmentLoaderThread(mapOfSegmentToTableBlockInfoList1, absoluteTableIdentifier));
-
- executor.shutdown();
- try {
- executor.awaitTermination(1, TimeUnit.DAYS);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- assertTrue(indexStore.getSegmentBTreeIfExists(absoluteTableIdentifier, "0").size() == 1);
- assertTrue(indexStore.getSegmentBTreeIfExists(absoluteTableIdentifier, "1").size() == 1);
- indexStore.removeTableBlocks(Arrays.asList(new String[] { "0" }), absoluteTableIdentifier);
- indexStore.removeTableBlocks(Arrays.asList(new String[] { "1" }), absoluteTableIdentifier);
- }
-
- private class SegmentLoaderThread implements Callable<Void> {
- private Map<String, List<TableBlockInfo>> mapOfSegmentToTableBlockInfoList;
-
- private AbsoluteTableIdentifier absoluteTableIdentifier;
-
- public SegmentLoaderThread(Map<String, List<TableBlockInfo>> mapOfSegmentToTableBlockInfoList,
- AbsoluteTableIdentifier absoluteTableIdentifier) {
- // TODO Auto-generated constructor stub
- this.mapOfSegmentToTableBlockInfoList = mapOfSegmentToTableBlockInfoList;
- this.absoluteTableIdentifier = absoluteTableIdentifier;
- }
-
- @Override public Void call() throws Exception {
- indexStore
- .loadAndGetTaskIdToSegmentsMap(mapOfSegmentToTableBlockInfoList, absoluteTableIdentifier);
- return null;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/format/src/main/thrift/carbondataindex.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondataindex.thrift b/format/src/main/thrift/carbondataindex.thrift
new file mode 100644
index 0000000..14159f1
--- /dev/null
+++ b/format/src/main/thrift/carbondataindex.thrift
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * File format description for the carbon index file
+ */
+namespace java org.carbondata.format
+
+include "schema.thrift"
+include "carbondata.thrift"
+
+/**
+ * Header stored once at the start of an index file; it is followed by one BlockIndex per block file
+ */
+struct IndexHeader{
+ 1: required i32 version; // version used for data compatibility
+ 2: required list<schema.ColumnSchema> table_columns; // Schema of the table columns, shared by all blocks in this index file
+ 3: required carbondata.SegmentInfo segment_info; // Segment info (will be same/repeated for all files in this segment)
+}
+
+/**
+ * Index entry stored in the index file for every block (data) file
+ */
+struct BlockIndex{
+ 1: required i64 num_rows; // Total number of rows in this block file
+ 2: required string file_name; // Block file name
+ 3: required i64 offset; // Offset in the block file (the writer passes the footer write position)
+ 4: required carbondata.BlockletIndex block_index; // Block-level B-tree (start/end key) and min/max index
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/hadoop/src/main/java/org/carbondata/hadoop/CarbonPathFilter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/carbondata/hadoop/CarbonPathFilter.java b/hadoop/src/main/java/org/carbondata/hadoop/CarbonPathFilter.java
index 0e310f3..1bdd3d1 100644
--- a/hadoop/src/main/java/org/carbondata/hadoop/CarbonPathFilter.java
+++ b/hadoop/src/main/java/org/carbondata/hadoop/CarbonPathFilter.java
@@ -18,6 +18,8 @@
*/
package org.carbondata.hadoop;
+import org.carbondata.core.carbon.path.CarbonTablePath;
+
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
@@ -37,6 +39,6 @@ public class CarbonPathFilter implements PathFilter {
}
@Override public boolean accept(Path path) {
- return true;
+ // accept only carbon data files; every other file name is filtered out
+ String fileName = path.getName();
+ return CarbonTablePath.isCarbonDataFile(fileName);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/processing/src/main/java/org/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/carbondata/processing/store/writer/AbstractFactDataWriter.java
index a0fa842..4e49806 100644
--- a/processing/src/main/java/org/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -40,8 +40,12 @@ import java.util.concurrent.TimeUnit;
import org.carbondata.common.logging.LogService;
import org.carbondata.common.logging.LogServiceFactory;
import org.carbondata.core.carbon.metadata.CarbonMetadata;
+import org.carbondata.core.carbon.metadata.blocklet.index.BlockletBTreeIndex;
+import org.carbondata.core.carbon.metadata.blocklet.index.BlockletIndex;
+import org.carbondata.core.carbon.metadata.blocklet.index.BlockletMinMaxIndex;
import org.carbondata.core.carbon.metadata.converter.SchemaConverter;
import org.carbondata.core.carbon.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.carbondata.core.carbon.metadata.index.BlockIndexInfo;
import org.carbondata.core.carbon.metadata.schema.table.CarbonTable;
import org.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
import org.carbondata.core.carbon.path.CarbonStorePath;
@@ -52,17 +56,20 @@ import org.carbondata.core.datastorage.store.impl.FileFactory;
import org.carbondata.core.file.manager.composite.FileData;
import org.carbondata.core.file.manager.composite.IFileManagerComposite;
import org.carbondata.core.metadata.BlockletInfoColumnar;
+import org.carbondata.core.util.ByteUtil;
import org.carbondata.core.util.CarbonMergerUtil;
import org.carbondata.core.util.CarbonMetadataUtil;
import org.carbondata.core.util.CarbonProperties;
import org.carbondata.core.util.CarbonUtil;
import org.carbondata.core.writer.CarbonFooterWriter;
+import org.carbondata.core.writer.CarbonIndexFileWriter;
+import org.carbondata.format.BlockIndex;
import org.carbondata.format.FileFooter;
+import org.carbondata.format.IndexHeader;
import org.carbondata.processing.store.CarbonDataFileAttributes;
import org.carbondata.processing.store.writer.exception.CarbonDataWriterException;
import org.apache.commons.lang3.ArrayUtils;
-
import org.apache.hadoop.io.IOUtils;
public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<T>
@@ -164,6 +171,8 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
private int spaceReservedForBlockMetaSize;
private FileOutputStream fileOutputStream;
+ /**
+ * block index info of every data file written by this writer, used to write the index file
+ */
+ private List<BlockIndexInfo> blockIndexInfoList;
+
public AbstractFactDataWriter(String storeLocation, int measureCount, int mdKeyLength,
String databaseName, String tableName, IFileManagerComposite fileManager, int[] keyBlockSize,
CarbonDataFileAttributes carbonDataFileAttributes, List<ColumnSchema> columnSchema,
@@ -180,6 +189,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
this.storeLocation = storeLocation;
this.blockletInfoList =
new ArrayList<BlockletInfoColumnar>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ blockIndexInfoList = new ArrayList<>();
// get max file size;
CarbonProperties propInstance = CarbonProperties.getInstance();
this.fileSizeInBytes = Long.parseLong(propInstance
@@ -223,6 +233,27 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
}
/**
+ * Returns the larger of the block size and the file size, rounded up to a multiple of
+ * HDFS_CHECKSUM_LENGTH (HDFS bytes per checksum, dfs.bytes-per-checksum=512, must divide
+ * the block size).
+ *
+ * @param blockSize block size
+ * @param fileSize file size
+ * @return max(blockSize, fileSize), rounded up to the next multiple of HDFS_CHECKSUM_LENGTH
+ */
+ private static long getMaxOfBlockAndFileSize(long blockSize, long fileSize) {
+ long largerSize = Math.max(blockSize, fileSize);
+ long remainder = largerSize % HDFS_CHECKSUM_LENGTH;
+ // already aligned (or not positive): nothing to pad
+ if (remainder <= 0) {
+ return largerSize;
+ }
+ // pad up to the next checksum boundary
+ return largerSize + (HDFS_CHECKSUM_LENGTH - remainder);
+ }
+
+
+ /**
* @param isNoDictionary the isNoDictionary to set
*/
public void setIsNoDictionary(boolean[] isNoDictionary) {
@@ -237,7 +268,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
* current file size to 0 close the existing file channel get the new file
* name and get the channel for new file
*
- * @param blockletDataSize data size of one block
+ * @param blockletDataSize data size of one block
* @throws CarbonDataWriterException if any problem
*/
protected void updateBlockletFileChannel(long blockletDataSize) throws CarbonDataWriterException {
@@ -326,11 +357,56 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
FileFooter convertFileMeta = CarbonMetadataUtil
.convertFileFooter(infoList, localCardinality.length, localCardinality,
thriftColumnSchemaList);
+ fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
writer.writeFooter(convertFileMeta, currentPosition);
} catch (IOException e) {
throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
}
+ }
+ /**
+ * Below method will be used to fill the block index info details of one data file and
+ * add them to blockIndexInfoList, from which the index file is written later.
+ *
+ * @param infoList blocklet info list of the file, in write order; must not be empty
+ * @param numberOfRows number of rows in file
+ * @param filePath file path; its extension (text after the last '.') is dropped, if any
+ * @param currentPosition offset recorded for the block (the position passed to the footer writer)
+ * @throws IllegalArgumentException if infoList is empty
+ */
+ private void fillBlockIndexInfoDetails(List<BlockletInfoColumnar> infoList, long numberOfRows,
+ String filePath, long currentPosition) {
+ if (infoList.isEmpty()) {
+ throw new IllegalArgumentException("No blocklet info available for file: " + filePath);
+ }
+ // as min-max will change for each blocklet and second blocklet min-max can be lesser than
+ // the first blocklet so we need to calculate the complete block level min-max by taking
+ // the min value of each column and max value of each column
+ byte[][] currentMinValue = infoList.get(0).getColumnMinData().clone();
+ byte[][] currentMaxValue = infoList.get(0).getColumnMaxData().clone();
+ for (int i = 1; i < infoList.size(); i++) {
+ byte[][] minValue = infoList.get(i).getColumnMinData();
+ byte[][] maxValue = infoList.get(i).getColumnMaxData();
+ for (int j = 0; j < maxValue.length; j++) {
+ if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMinValue[j], minValue[j]) > 0) {
+ currentMinValue[j] = minValue[j].clone();
+ }
+ if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMaxValue[j], maxValue[j]) < 0) {
+ currentMaxValue[j] = maxValue[j].clone();
+ }
+ }
+ }
+ // block start key is the start key of the first blocklet (the least key) and block end key
+ // is the end key of the last blocklet (the max key)
+ BlockletBTreeIndex btree = new BlockletBTreeIndex(infoList.get(0).getStartKey(),
+ infoList.get(infoList.size() - 1).getEndKey());
+ BlockletMinMaxIndex minmax = new BlockletMinMaxIndex();
+ minmax.setMinValues(currentMinValue);
+ minmax.setMaxValues(currentMaxValue);
+ BlockletIndex blockletIndex = new BlockletIndex(btree, minmax);
+ // strip the extension; a path without '.' is kept as-is instead of failing in substring
+ int extensionIndex = filePath.lastIndexOf('.');
+ String blockFilePath = extensionIndex < 0 ? filePath : filePath.substring(0, extensionIndex);
+ BlockIndexInfo blockIndexInfo =
+ new BlockIndexInfo(numberOfRows, blockFilePath, currentPosition, blockletIndex);
+ blockIndexInfoList.add(blockIndexInfo);
}
protected List<org.carbondata.format.ColumnSchema> getColumnSchemaListAndCardinality(
@@ -422,10 +498,44 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
renameCarbonDataFile();
copyCarbonDataFileToCarbonStorePath(this.fileName.substring(0, this.fileName.lastIndexOf('.')));
+ try {
+ writeIndexFile();
+ } catch (IOException e) {
+ throw new CarbonDataWriterException("Problem while writing the index file", e);
+ }
closeExecutorService();
}
/**
+ * Below method will be used to write the index file: the index header first, then one
+ * BlockIndex entry per data file recorded in blockIndexInfoList. The file is written under
+ * storeLocation and then copied to the carbon store path.
+ *
+ * @throws IOException throws io exception if any problem while writing
+ * @throws CarbonDataWriterException declared for data writing failures
+ */
+ private void writeIndexFile() throws IOException, CarbonDataWriterException {
+ // get the header
+ IndexHeader indexHeader =
+ CarbonMetadataUtil.getIndexHeader(localCardinality, thriftColumnSchemaList);
+ // get the block index info thrift
+ List<BlockIndex> blockIndexThrift = CarbonMetadataUtil.getBlockIndexInfo(blockIndexInfoList);
+ String fileName = storeLocation + File.separator + carbonTablePath
+ .getCarbonIndexFileName(carbonDataFileAttributes.getTaskId(),
+ carbonDataFileAttributes.getFactTimeStamp());
+ CarbonIndexFileWriter writer = new CarbonIndexFileWriter();
+ // open file
+ writer.openThriftWriter(fileName);
+ try {
+ // write the header first
+ writer.writeThrift(indexHeader);
+ // write the indexes
+ for (BlockIndex blockIndex : blockIndexThrift) {
+ writer.writeThrift(blockIndex);
+ }
+ } finally {
+ // release the underlying stream even when a write fails, so it is not leaked
+ writer.close();
+ }
+ // copy from temp to actual store location
+ copyCarbonDataFileToCarbonStorePath(fileName);
+ }
+
+ /**
* This method will close the executor service which is used for copying carbon
* data files to carbon store path
*
@@ -517,27 +627,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
}
/**
- * This method will return max of block size and file size
- *
- * @param blockSize
- * @param fileSize
- * @return
- */
- private static long getMaxOfBlockAndFileSize(long blockSize, long fileSize) {
- long maxSize = blockSize;
- if (fileSize > blockSize) {
- maxSize = fileSize;
- }
- // block size should be exactly divisible by 512 which is maintained by HDFS as bytes
- // per checksum, dfs.bytes-per-checksum=512 must divide block size
- long remainder = maxSize % HDFS_CHECKSUM_LENGTH;
- if (remainder > 0) {
- maxSize = maxSize + HDFS_CHECKSUM_LENGTH - remainder;
- }
- return maxSize;
- }
-
- /**
* Write leaf meta data to File.
*
* @throws CarbonDataWriterException