You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2016/08/23 15:01:58 UTC
[1/2] incubator-carbondata git commit: CARBONDATA-117 BlockLet
distribution for optimum resource usage
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 2d4609cdf -> 5ebf90a87
CARBONDATA-117 BlockLet distribution for optimum resource usage
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/fe1b0f07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/fe1b0f07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/fe1b0f07
Branch: refs/heads/master
Commit: fe1b0f07deda03fe21b98191be7750bf61d8520c
Parents: 2d4609c
Author: mohammadshahidkhan <mo...@gmail.com>
Authored: Wed Jul 20 16:02:18 2016 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Tue Aug 23 20:19:07 2016 +0530
----------------------------------------------------------------------
conf/carbon.properties.template | 2 +
.../core/carbon/datastore/BlockIndexStore.java | 6 +-
.../carbon/datastore/block/BlockletInfos.java | 112 +++++++++++++++++++
.../carbon/datastore/block/TableBlockInfo.java | 50 ++++++++-
.../core/constants/CarbonCommonConstants.java | 11 ++
.../core/util/DataFileFooterConverter.java | 28 ++++-
.../executor/impl/AbstractQueryExecutor.java | 17 ++-
.../scan/executor/infos/BlockExecutionInfo.java | 46 ++++++++
.../AbstractDetailQueryResultIterator.java | 14 ++-
.../carbondata/hadoop/CarbonInputFormat.java | 23 +++-
.../carbondata/hadoop/CarbonInputSplit.java | 18 +++
.../carbondata/hadoop/CarbonRecordReader.java | 5 +-
.../hadoop/ft/CarbonInputMapperTest.java | 3 +-
.../carbondata/spark/load/CarbonLoaderUtil.java | 74 ++++++++++++
.../carbondata/spark/rdd/CarbonScanRDD.scala | 11 +-
15 files changed, 397 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/conf/carbon.properties.template
----------------------------------------------------------------------
diff --git a/conf/carbon.properties.template b/conf/carbon.properties.template
index b94fbe4..bd26252 100644
--- a/conf/carbon.properties.template
+++ b/conf/carbon.properties.template
@@ -85,6 +85,8 @@ carbon.enable.quick.filter=false
#carbon.load.metadata.lock.retries=3
##Maximum number of blocklets written in a single file :Min=1:Max=1000
#carbon.max.file.size=100
+##Minimum blocklets needed for distribution.
+#carbon.blockletdistribution.min.blocklet.size=10
##Interval between the retries to get the lock
#carbon.load.metadata.lock.retry.timeout.sec=5
##Temporary store location, By default it will take System.getProperty("java.io.tmpdir")
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
index be48ce5..07815c0 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
@@ -20,7 +20,6 @@ package org.apache.carbondata.core.carbon.datastore;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -108,10 +107,7 @@ public class BlockIndexStore {
AbsoluteTableIdentifier absoluteTableIdentifier) throws IndexBuilderException {
AbstractIndex[] loadedBlock = new AbstractIndex[tableBlocksInfos.size()];
addTableLockObject(absoluteTableIdentifier);
- // sort the block info
- // so block will be loaded in sorted order this will be required for
- // query execution
- Collections.sort(tableBlocksInfos);
+
// get the instance
Object lockObject = tableLockMap.get(absoluteTableIdentifier);
Map<TableBlockInfo, AbstractIndex> tableBlockMapTemp = null;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockletInfos.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockletInfos.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockletInfos.java
new file mode 100644
index 0000000..4251888
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockletInfos.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.carbon.datastore.block;
+
+import java.io.Serializable;
+
+/**
+ * The class holds the block's blocklets info
+ */
+public class BlockletInfos implements Serializable {
+ /**
+ * no of blockLets
+ */
+ private int noOfBlockLets = 0;
+
+ /**
+ * start blocklet number
+ */
+ private int startBlockletNumber;
+ /**
+ * end blocklet number
+ */
+ private int numberOfBlockletToScan;
+ /**
+ * default constructor
+ */
+ public BlockletInfos(){
+ }
+ /**
+ * constructor to initialize the blockletinfo
+ * @param noOfBlockLets
+ * @param startBlockletNumber
+ * @param numberOfBlockletToScan
+ */
+ public BlockletInfos(int noOfBlockLets, int startBlockletNumber, int numberOfBlockletToScan) {
+ this.noOfBlockLets = noOfBlockLets;
+ this.startBlockletNumber = startBlockletNumber;
+ this.numberOfBlockletToScan = numberOfBlockletToScan;
+ }
+
+ /**
+ * returns the number of blockLets
+ *
+ * @return
+ */
+ public int getNoOfBlockLets() {
+ return noOfBlockLets;
+ }
+
+ /**
+ * sets the number of blockLets
+ *
+ * @param noOfBlockLets
+ */
+ public void setNoOfBlockLets(int noOfBlockLets) {
+ this.noOfBlockLets = noOfBlockLets;
+ }
+
+ /**
+ * returns start blocklet number
+ *
+ * @return
+ */
+ public int getStartBlockletNumber() {
+ return startBlockletNumber;
+ }
+
+ /**
+ * set start blocklet number
+ *
+ * @param startBlockletNumber
+ */
+ public void setStartBlockletNumber(int startBlockletNumber) {
+ this.startBlockletNumber = startBlockletNumber;
+ }
+
+ /**
+ * returns end blocklet number
+ *
+ * @return
+ */
+ public int getNumberOfBlockletToScan() {
+ return numberOfBlockletToScan;
+ }
+
+ /**
+ * set end blocklet number to be scanned
+ *
+ * @param numberOfBlockletToScan
+ */
+ public void setNumberOfBlockletToScan(int numberOfBlockletToScan) {
+ this.numberOfBlockletToScan = numberOfBlockletToScan;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
index 06166fd..3d393b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
@@ -58,7 +58,10 @@ public class TableBlockInfo extends Distributable
private String segmentId;
private String[] locations;
-
+ /**
+ * The class holds the blockletsinfo
+ */
+ private BlockletInfos blockletInfos = new BlockletInfos();
public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
long blockLength) {
@@ -70,6 +73,25 @@ public class TableBlockInfo extends Distributable
}
/**
+ * constructor to initialize the TableBlockInfo with BlockletInfos
+ * @param filePath
+ * @param blockOffset
+ * @param segmentId
+ * @param locations
+ * @param blockLength
+ * @param blockletInfos
+ */
+ public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
+ long blockLength, BlockletInfos blockletInfos) {
+ this.filePath = FileFactory.getUpdatedFilePath(filePath);
+ this.blockOffset = blockOffset;
+ this.segmentId = segmentId;
+ this.locations = locations;
+ this.blockLength = blockLength;
+ this.blockletInfos = blockletInfos;
+ }
+
+ /**
* @return the filePath
*/
public String getFilePath() {
@@ -185,6 +207,16 @@ public class TableBlockInfo extends Distributable
> ((TableBlockInfo) other).blockOffset + ((TableBlockInfo) other).blockLength) {
return 1;
}
+ //compare the startBlockLetNumber
+ int diffStartBlockLetNumber =
+ blockletInfos.getStartBlockletNumber() - ((TableBlockInfo) other).blockletInfos
+ .getStartBlockletNumber();
+ if (diffStartBlockLetNumber < 0) {
+ return -1;
+ }
+ if (diffStartBlockLetNumber > 0) {
+ return 1;
+ }
return 0;
}
@@ -194,6 +226,7 @@ public class TableBlockInfo extends Distributable
result = 31 * result + (int) (blockLength ^ (blockLength >>> 32));
result = 31 * result + segmentId.hashCode();
result = 31 * result + Arrays.hashCode(locations);
+ result = 31 * result + blockletInfos.getStartBlockletNumber();
return result;
}
@@ -201,4 +234,19 @@ public class TableBlockInfo extends Distributable
return locations;
}
+ /**
+ * returns BlockletInfos
+ * @return
+ */
+ public BlockletInfos getBlockletInfos() {
+ return blockletInfos;
+ }
+
+ /**
+ * set the BlockletInfos
+ * @param blockletInfos
+ */
+ public void setBlockletInfos(BlockletInfos blockletInfos) {
+ this.blockletInfos = blockletInfos;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 4fa77ba..21e5562 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -885,6 +885,17 @@ public final class CarbonCommonConstants {
*/
public static final String ZOOKEEPER_URL = "spark.deploy.zookeeper.url";
+ /**
+ * configure the minimum blocklet size eligible for blocklet distribution
+ */
+ public static final String CARBON_BLOCKLETDISTRIBUTION_MIN_REQUIRED_SIZE =
+ "carbon.blockletdistribution.min.blocklet.size";
+
+ /**
+ * default blocklet size eligible for blocklet distribution
+ */
+ public static final int DEFAULT_CARBON_BLOCKLETDISTRIBUTION_MIN_REQUIRED_SIZE = 2;
+
private CarbonCommonConstants() {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
index 3a1da8c..8d7e893 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.carbon.metadata.blocklet.sort.SortState;
import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastorage.store.FileHolder;
import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
import org.apache.carbondata.core.metadata.ValueEncoderMeta;
@@ -70,7 +71,7 @@ public class DataFileFooterConverter {
* @throws IOException problem while reading the index file
*/
public List<DataFileFooter> getIndexInfo(String filePath, List<TableBlockInfo> tableBlockInfoList)
- throws IOException {
+ throws IOException, CarbonUtilException {
CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>();
try {
@@ -94,10 +95,13 @@ public class DataFileFooterConverter {
BlockIndex readBlockIndexInfo = indexReader.readBlockIndexInfo();
blockletIndex = getBlockletIndex(readBlockIndexInfo.getBlock_index());
dataFileFooter = new DataFileFooter();
+ TableBlockInfo tableBlockInfo = tableBlockInfoList.get(counter++);
+ int blockletSize = getBlockletSize(readBlockIndexInfo);
+ tableBlockInfo.getBlockletInfos().setNoOfBlockLets(blockletSize);
dataFileFooter.setBlockletIndex(blockletIndex);
dataFileFooter.setColumnInTable(columnSchemaList);
dataFileFooter.setNumberOfRows(readBlockIndexInfo.getNum_rows());
- dataFileFooter.setTableBlockInfo(tableBlockInfoList.get(counter++));
+ dataFileFooter.setTableBlockInfo(tableBlockInfo);
dataFileFooter.setSegmentInfo(segmentInfo);
dataFileFooters.add(dataFileFooter);
}
@@ -108,6 +112,26 @@ public class DataFileFooterConverter {
}
/**
+ * the method returns the number of blocklets in a block
+ * @param readBlockIndexInfo
+ * @return
+ */
+ private int getBlockletSize(BlockIndex readBlockIndexInfo) {
+ long num_rows = readBlockIndexInfo.getNum_rows();
+ int blockletSize = Integer.parseInt(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.BLOCKLET_SIZE,
+ CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL));
+ int remainder = (int) (num_rows % blockletSize);
+ int noOfBlockLet = (int) (num_rows / blockletSize);
+ // there could be some blocklets which will not
+ // contain the total records equal to the blockletSize
+ if (remainder > 0) {
+ noOfBlockLet = noOfBlockLet + 1;
+ }
+ return noOfBlockLet;
+ }
+
+ /**
* Below method will be used to convert thrift file meta to wrapper file meta
*/
public DataFileFooter readDataFileFooter(String filePath, long blockOffset, long blockLength)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
index e204572..832b2fa 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.scan.executor.impl;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -94,6 +95,10 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
queryModel.setStatisticsRecorder(queryProperties.queryStatisticsRecorder);
QueryUtil.resolveQueryModel(queryModel);
QueryStatistic queryStatistic = new QueryStatistic();
+ // sort the block info
+ // so block will be loaded in sorted order this will be required for
+ // query execution
+ Collections.sort(queryModel.getTableBlockInfos());
// get the table blocks
try {
queryProperties.dataBlocks = BlockIndexStore.getInstance()
@@ -194,8 +199,11 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
// query
// and query will be executed based on that infos
for (int i = 0; i < queryProperties.dataBlocks.size(); i++) {
- blockExecutionInfoList
- .add(getBlockExecutionInfoForBlock(queryModel, queryProperties.dataBlocks.get(i)));
+ blockExecutionInfoList.add(
+ getBlockExecutionInfoForBlock(queryModel, queryProperties.dataBlocks.get(i),
+ queryModel.getTableBlockInfos().get(i).getBlockletInfos().getStartBlockletNumber(),
+ queryModel.getTableBlockInfos().get(i).getBlockletInfos()
+ .getNumberOfBlockletToScan()));
}
queryProperties.complexDimensionInfoMap =
blockExecutionInfoList.get(blockExecutionInfoList.size() - 1).getComlexDimensionInfoMap();
@@ -212,7 +220,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
* @throws QueryExecutionException any failure during block info creation
*/
protected BlockExecutionInfo getBlockExecutionInfoForBlock(QueryModel queryModel,
- AbstractIndex blockIndex) throws QueryExecutionException {
+ AbstractIndex blockIndex, int startBlockletIndex, int numberOfBlockletToScan)
+ throws QueryExecutionException {
BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo();
SegmentProperties segmentProperties = blockIndex.getSegmentProperties();
List<CarbonDimension> tableBlockDimensions = segmentProperties.getDimensions();
@@ -228,6 +237,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
QueryUtil.getMaskedByteRange(updatedQueryDimension, blockKeyGenerator);
int[] maksedByte =
QueryUtil.getMaskedByte(blockKeyGenerator.getKeySizeInBytes(), maskByteRangesForBlock);
+ blockExecutionInfo.setStartBlockletIndex(startBlockletIndex);
+ blockExecutionInfo.setNumberOfBlockletToScan(numberOfBlockletToScan);
blockExecutionInfo.setQueryDimensions(
updatedQueryDimension.toArray(new QueryDimension[updatedQueryDimension.size()]));
blockExecutionInfo.setQueryMeasures(queryModel.getQueryMeasures()
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java b/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java
index ca5c2e0..d84a183 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java
@@ -200,6 +200,16 @@ public class BlockExecutionInfo {
private boolean isRawRecordDetailQuery;
/**
+ * start index of blocklets
+ */
+ private int startBlockletIndex;
+
+ /**
+ * number of blocklet to be scanned
+ */
+ private int numberOfBlockletToScan;
+
+ /**
* complexParentIndexToQueryMap
*/
private Map<Integer, GenericQueryType> complexParentIndexToQueryMap;
@@ -678,4 +688,40 @@ public class BlockExecutionInfo {
public void setQueryMeasures(QueryMeasure[] queryMeasures) {
this.queryMeasures = queryMeasures;
}
+
+ /**
+ * The method to set the number of blocklets to be scanned
+ *
+ * @param numberOfBlockletToScan
+ */
+ public void setNumberOfBlockletToScan(int numberOfBlockletToScan) {
+ this.numberOfBlockletToScan = numberOfBlockletToScan;
+ }
+
+ /**
+ * get the no of blocklet to be scanned
+ *
+ * @return
+ */
+ public int getNumberOfBlockletToScan() {
+ return numberOfBlockletToScan;
+ }
+
+ /**
+ * returns the blocklet index to be scanned
+ *
+ * @return
+ */
+ public int getStartBlockletIndex() {
+ return startBlockletIndex;
+ }
+
+ /**
+ * set the blocklet index to be scanned
+ *
+ * @param startBlockletIndex
+ */
+ public void setStartBlockletIndex(int startBlockletIndex) {
+ this.startBlockletIndex = startBlockletIndex;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
index 2abe39a..d583560 100644
--- a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -92,9 +92,17 @@ public abstract class AbstractDetailQueryResultIterator extends CarbonIterator {
DataRefNodeFinder finder = new BTreeDataRefNodeFinder(blockInfo.getEachColumnValueSize());
DataRefNode startDataBlock = finder
.findFirstDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getStartKey());
- DataRefNode endDataBlock = finder
- .findLastDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getEndKey());
- long numberOfBlockToScan = endDataBlock.nodeNumber() - startDataBlock.nodeNumber() + 1;
+ while(startDataBlock.nodeNumber()!= blockInfo.getStartBlockletIndex()) {
+ startDataBlock = startDataBlock.getNextDataRefNode();
+ }
+
+ long numberOfBlockToScan = blockInfo.getNumberOfBlockletToScan();
+ //if the number of blocklets to scan is not set (<= 0), derive it from the end block.
+ if (numberOfBlockToScan <= 0) {
+ DataRefNode endDataBlock = finder
+ .findLastDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getEndKey());
+ numberOfBlockToScan = endDataBlock.nodeNumber() - startDataBlock.nodeNumber() + 1;
+ }
blockInfo.setFirstDataBlock(startDataBlock);
blockInfo.setNumberOfBlockToScan(numberOfBlockToScan);
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 04cbd53..6bc692f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.carbon.datastore.DataRefNodeFinder;
import org.apache.carbondata.core.carbon.datastore.IndexKey;
import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore;
import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
+import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos;
import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
@@ -273,7 +274,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
}
if (filterPredicates == null) {
- return getSplitsInternal(job);
+ return getSplitsNonFilter(job);
} else {
if (filterPredicates instanceof Expression) {
//process and resolve the expression.
@@ -290,6 +291,19 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
}
}
+ /**
+ * the method will return the blocks to be scanned with blocklets info
+ *
+ * @param job
+ * @return
+ * @throws IOException
+ * @throws IndexBuilderException
+ */
+ private List<InputSplit> getSplitsNonFilter(JobContext job)
+ throws IOException, IndexBuilderException {
+ return getSplits(job, null);
+ }
+
private List<InputSplit> getSplitsInternal(JobContext job) throws IOException {
List<InputSplit> splits = super.getSplits(job);
List<InputSplit> carbonSplits = new ArrayList<InputSplit>(splits.size());
@@ -333,7 +347,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
result.add(new CarbonInputSplit(segmentNo, new Path(tableBlockInfo.getFilePath()),
tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
- tableBlockInfo.getLocations()));
+ tableBlockInfo.getLocations(), tableBlockInfo.getBlockletInfos().getNoOfBlockLets()));
}
}
return result;
@@ -496,9 +510,12 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
// identify table blocks
for (InputSplit inputSplit : getSplitsInternal(newJob)) {
CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
+ BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0,
+ carbonInputSplit.getNumberOfBlocklets());
tableBlockInfoList.add(
new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
- segmentId, carbonInputSplit.getLocations(), carbonInputSplit.getLength()));
+ segmentId, carbonInputSplit.getLocations(), carbonInputSplit.getLength(),
+ blockletInfos));
}
Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index d215f13..01744f8 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -34,6 +34,10 @@ public class CarbonInputSplit extends FileSplit implements Serializable, Writabl
private static final long serialVersionUID = 3520344046772190207L;
private String segmentId;
+ /**
+ * Number of BlockLets in a block
+ */
+ private int numberOfBlocklets = 0;
public CarbonInputSplit() {
super(null, 0, 0, new String[0]);
@@ -45,6 +49,12 @@ public class CarbonInputSplit extends FileSplit implements Serializable, Writabl
this.segmentId = segmentId;
}
+ public CarbonInputSplit(String segmentId, Path path, long start, long length,
+ String[] locations, int numberOfBlocklets) {
+ this(segmentId, path, start, length, locations);
+ this.numberOfBlocklets = numberOfBlocklets;
+ }
+
public static CarbonInputSplit from(String segmentId, FileSplit split) throws IOException {
return new CarbonInputSplit(segmentId, split.getPath(), split.getStart(), split.getLength(),
split.getLocations());
@@ -66,4 +76,12 @@ public class CarbonInputSplit extends FileSplit implements Serializable, Writabl
out.writeUTF(segmentId);
}
+ /**
+ * returns the number of blocklets
+ * @return
+ */
+ public int getNumberOfBlocklets() {
+ return numberOfBlocklets;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index caddfd1..fd0a438 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos;
import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
@@ -58,10 +59,12 @@ public class CarbonRecordReader<T> extends RecordReader<Void, T> {
throws IOException, InterruptedException {
CarbonInputSplit carbonInputSplit = (CarbonInputSplit) split;
List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
+ BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0,
+ carbonInputSplit.getNumberOfBlocklets());
tableBlockInfoList.add(
new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
carbonInputSplit.getSegmentId(), carbonInputSplit.getLocations(),
- carbonInputSplit.getLength()));
+ carbonInputSplit.getLength(), blockletInfos));
queryModel.setTableBlockInfos(tableBlockInfoList);
readSupport
.intialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
index cdfd325..435b9c1 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
@@ -52,7 +52,8 @@ import org.junit.Test;
public class CarbonInputMapperTest extends TestCase {
- @Before public void setUp() throws Exception {
+ // changed setUp to static init block to avoid unwanted multiple-time store creation
+ static {
StoreCreator.createCarbonStore();
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index b934b1d..918702d 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -49,7 +49,9 @@ import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
import org.apache.carbondata.core.carbon.ColumnIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos;
import org.apache.carbondata.core.carbon.datastore.block.Distributable;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
@@ -93,6 +95,22 @@ public final class CarbonLoaderUtil {
private static final LogService LOGGER =
LogServiceFactory.getLogService(CarbonLoaderUtil.class.getName());
+ /**
+ * minimum no of blocklet required for distribution
+ */
+ private static int minBlockLetsReqForDistribution = 0;
+
+ static {
+ String property = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_BLOCKLETDISTRIBUTION_MIN_REQUIRED_SIZE);
+ try {
+ minBlockLetsReqForDistribution = Integer.parseInt(property);
+ } catch (NumberFormatException ne) {
+ LOGGER.info("Invalid configuration. Consisering the defaul");
+ minBlockLetsReqForDistribution =
+ CarbonCommonConstants.DEFAULT_CARBON_BLOCKLETDISTRIBUTION_MIN_REQUIRED_SIZE;
+ }
+ }
/**
* dfs.bytes-per-checksum
@@ -1429,4 +1447,60 @@ public final class CarbonLoaderUtil {
public static String[] getConfiguredLocalDirs(SparkConf conf) {
return Utils.getConfiguredLocalDirs(conf);
}
+
+  /**
+   * Distributes the blocklets of each block across multiple pseudo blocks so that
+   * a query can exploit more parallelism when there are fewer physical blocks
+   * than the cluster's default parallelism.
+   *
+   * Splitting only happens when (a) the number of blocks is below
+   * {@code defaultParallelism} and (b) a block has at least
+   * {@code minBlockLetsReqForDistribution} blocklets; otherwise the original
+   * blocks are returned unchanged.
+   *
+   * @param blockInfoList      blocks of the segment(s) being queried
+   * @param defaultParallelism default parallelism of the cluster
+   * @return distributable block infos, possibly split per blocklet group
+   */
+  public static List<Distributable> distributeBlockLets(List<TableBlockInfo> blockInfoList,
+      int defaultParallelism) {
+    LOGGER.info("No.Of Blocks before Blocklet distribution: " + blockInfoList.size());
+    List<Distributable> tableBlockInfos = new ArrayList<Distributable>();
+    if (blockInfoList.size() < defaultParallelism) {
+      for (TableBlockInfo tableBlockInfo : blockInfoList) {
+        int noOfBlockLets = tableBlockInfo.getBlockletInfos().getNoOfBlockLets();
+        LOGGER.info(
+            "No.Of blocklet : " + noOfBlockLets + ".Minimum blocklets required for distribution : "
+                + minBlockLetsReqForDistribution);
+        if (noOfBlockLets < minBlockLetsReqForDistribution) {
+          // Too few blocklets to be worth splitting; keep the block as-is.
+          tableBlockInfos.add(tableBlockInfo);
+          continue;
+        }
+        int rem = noOfBlockLets % minBlockLetsReqForDistribution;
+        int count = noOfBlockLets / minBlockLetsReqForDistribution;
+        if (rem > 0) {
+          count = count + 1;
+        }
+        for (int i = 0; i < count; i++) {
+          BlockletInfos blockletInfos = new BlockletInfos();
+          blockletInfos.setStartBlockletNumber(i * minBlockLetsReqForDistribution);
+          // The last split scans only the remainder when the blocklet count is not
+          // an exact multiple of the distribution size (computed here instead of
+          // mutating the last split after the loop).
+          blockletInfos.setNumberOfBlockletToScan(
+              (rem > 0 && i == count - 1) ? rem : minBlockLetsReqForDistribution);
+          // Carry the block's total blocklet count into every split. The previous
+          // code self-assigned the fresh instance's own (uninitialized) value.
+          blockletInfos.setNoOfBlockLets(noOfBlockLets);
+          tableBlockInfos.add(
+              new TableBlockInfo(tableBlockInfo.getFilePath(), tableBlockInfo.getBlockOffset(),
+                  tableBlockInfo.getSegmentId(), tableBlockInfo.getLocations(),
+                  tableBlockInfo.getBlockLength(), blockletInfos));
+        }
+      }
+    }
+    if (tableBlockInfos.isEmpty()) {
+      // Either enough blocks already exist or nothing qualified for splitting:
+      // return the original blocks unchanged.
+      tableBlockInfos.addAll(blockInfoList);
+    }
+    LOGGER.info("No.Of Blocks after Blocklet distribution: " + tableBlockInfos.size());
+    return tableBlockInfos;
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 23e03e4..6c0438f 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.hive.DistributionUtil
import org.apache.carbondata.common.CarbonIterator
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.dictionary.Dictionary
-import org.apache.carbondata.core.carbon.datastore.block.{Distributable, TableBlockInfo}
+import org.apache.carbondata.core.carbon.datastore.block.{BlockletInfos, TableBlockInfo}
import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsRecorder}
import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
import org.apache.carbondata.scan.executor.QueryExecutorFactory
@@ -103,12 +103,15 @@ class CarbonScanRDD[V: ClassTag](
if (!splits.isEmpty) {
val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
- val blockList = carbonInputSplits.map(inputSplit =>
+ val blockListTemp = carbonInputSplits.map(inputSplit =>
new TableBlockInfo(inputSplit.getPath.toString,
inputSplit.getStart, inputSplit.getSegmentId,
- inputSplit.getLocations, inputSplit.getLength
- ).asInstanceOf[Distributable]
+ inputSplit.getLocations, inputSplit.getLength,
+ new BlockletInfos(inputSplit.getNumberOfBlocklets, 0, inputSplit.getNumberOfBlocklets)
+ )
)
+ val blockList = CarbonLoaderUtil.
+ distributeBlockLets(blockListTemp.asJava, defaultParallelism).asScala
if (blockList.nonEmpty) {
// group blocks to nodes, tasks
val startTime = System.currentTimeMillis
[2/2] incubator-carbondata git commit: [CARBONDATA-117] This closes
#56
Posted by gv...@apache.org.
[CARBONDATA-117] This closes #56
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/5ebf90a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/5ebf90a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/5ebf90a8
Branch: refs/heads/master
Commit: 5ebf90a87999b9dd5ec484e54aceb7487ca3096f
Parents: 2d4609c fe1b0f0
Author: Venkata Ramana G <ra...@huawei.com>
Authored: Tue Aug 23 20:30:07 2016 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Tue Aug 23 20:30:07 2016 +0530
----------------------------------------------------------------------
conf/carbon.properties.template | 2 +
.../core/carbon/datastore/BlockIndexStore.java | 6 +-
.../carbon/datastore/block/BlockletInfos.java | 112 +++++++++++++++++++
.../carbon/datastore/block/TableBlockInfo.java | 50 ++++++++-
.../core/constants/CarbonCommonConstants.java | 11 ++
.../core/util/DataFileFooterConverter.java | 28 ++++-
.../executor/impl/AbstractQueryExecutor.java | 17 ++-
.../scan/executor/infos/BlockExecutionInfo.java | 46 ++++++++
.../AbstractDetailQueryResultIterator.java | 14 ++-
.../carbondata/hadoop/CarbonInputFormat.java | 23 +++-
.../carbondata/hadoop/CarbonInputSplit.java | 18 +++
.../carbondata/hadoop/CarbonRecordReader.java | 5 +-
.../hadoop/ft/CarbonInputMapperTest.java | 3 +-
.../carbondata/spark/load/CarbonLoaderUtil.java | 74 ++++++++++++
.../carbondata/spark/rdd/CarbonScanRDD.scala | 11 +-
15 files changed, 397 insertions(+), 23 deletions(-)
----------------------------------------------------------------------