You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/03/16 09:28:53 UTC
[10/14] incubator-carbondata git commit: Handled review comments
Handled review comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/6b3b16c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/6b3b16c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/6b3b16c5
Branch: refs/heads/master
Commit: 6b3b16c5971bd383f7ed4bb3ae2164b378044020
Parents: 35739e5
Author: manishgupta88 <to...@gmail.com>
Authored: Wed Mar 15 10:50:49 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Thu Mar 16 14:50:43 2017 +0530
----------------------------------------------------------------------
.../core/cache/dictionary/ManageDictionary.java | 113 ++++++++++
.../core/datastore/block/SegmentProperties.java | 20 ++
.../core/metadata/schema/table/CarbonTable.java | 139 +++++++++++-
.../impl/DictionaryBasedResultCollector.java | 215 ++++++++++--------
.../DictionaryBasedVectorResultCollector.java | 71 +++---
.../collector/impl/RawBasedResultCollector.java | 44 +++-
...structureBasedDictionaryResultCollector.java | 119 ++--------
.../RestructureBasedRawResultCollector.java | 80 +++----
.../RestructureBasedVectorResultCollector.java | 221 +++++--------------
.../executor/impl/AbstractQueryExecutor.java | 103 ++++++---
.../core/scan/executor/infos/DimensionInfo.java | 54 +++++
.../core/scan/executor/util/QueryUtil.java | 67 ------
.../scan/executor/util/RestructureUtil.java | 58 +++--
.../carbondata/core/scan/filter/FilterUtil.java | 82 ++-----
.../executer/RestructureEvaluatorImpl.java | 81 +++++++
.../RestructureExcludeFilterExecutorImpl.java | 12 +-
.../RestructureIncludeFilterExecutorImpl.java | 12 +-
.../executer/RowLevelFilterExecuterImpl.java | 27 +--
.../RowLevelRangeGrtThanFiterExecuterImpl.java | 10 +-
...elRangeGrtrThanEquaToFilterExecuterImpl.java | 10 +-
...velRangeLessThanEqualFilterExecuterImpl.java | 9 +-
.../RowLevelRangeLessThanFiterExecuterImpl.java | 9 +-
.../RowLevelRangeFilterResolverImpl.java | 6 +-
.../visitor/NoDictionaryTypeVisitor.java | 17 +-
.../carbondata/core/scan/model/QueryModel.java | 2 +-
.../scan/result/vector/CarbonColumnVector.java | 14 ++
.../apache/carbondata/core/util/CarbonUtil.java | 113 ++--------
.../carbondata/core/util/DataTypeUtil.java | 40 +---
.../scan/executor/util/RestructureUtilTest.java | 5 +-
.../spark/merger/CarbonCompactionExecutor.java | 16 +-
.../spark/merger/CarbonCompactionUtil.java | 18 +-
.../spark/rdd/CarbonGlobalDictionaryRDD.scala | 14 +-
.../spark/tasks/DictionaryWriterTask.scala | 30 ++-
.../spark/tasks/SortIndexWriterTask.scala | 22 +-
.../spark/util/GlobalDictionaryUtil.scala | 49 ++--
.../vectorreader/ColumnarVectorWrapper.java | 34 +++
.../spark/sql/CarbonDictionaryDecoder.scala | 6 +-
.../execution/command/carbonTableSchema.scala | 3 +-
.../apache/spark/sql/hive/CarbonMetastore.scala | 4 +-
.../newflow/DataLoadProcessBuilder.java | 16 +-
.../util/CarbonDataProcessorUtil.java | 8 +-
41 files changed, 1054 insertions(+), 919 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6b3b16c5/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionary.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionary.java
new file mode 100644
index 0000000..706bc20
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionary.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.cache.dictionary;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+/**
+ * This class is aimed at managing dictionary files for any new addition and deletion
+ * and calling of clear cache for the non existing dictionary files
+ */
+public class ManageDictionary {
+
+ /**
+ * Attribute for Carbon LOGGER
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(ManageDictionary.class.getName());
+
+ /**
+ * This method will delete the dictionary files for the given column IDs and
+ * clear the dictionary cache
+ *
+ * @param dictionaryColumns
+ * @param carbonTable
+ */
+ public static void deleteDictionaryFileAndCache(List<CarbonColumn> dictionaryColumns,
+ CarbonTable carbonTable) {
+ if (!dictionaryColumns.isEmpty()) {
+ CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
+ CarbonTablePath carbonTablePath =
+ CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath(), carbonTableIdentifier);
+ String metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath();
+ CarbonFile metadataDir = FileFactory
+ .getCarbonFile(metadataDirectoryPath, FileFactory.getFileType(metadataDirectoryPath));
+ for (final CarbonColumn column : dictionaryColumns) {
+ // sort index file is created with dictionary size appended to it. So all the files
+ // with a given column ID need to be listed
+ CarbonFile[] listFiles = metadataDir.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile path) {
+ if (path.getName().startsWith(column.getColumnId())) {
+ return true;
+ }
+ return false;
+ }
+ });
+ for (CarbonFile file : listFiles) {
+ // try catch is inside for loop because even if one deletion fails, other files
+ // still need to be deleted
+ try {
+ FileFactory.deleteFile(file.getCanonicalPath(),
+ FileFactory.getFileType(file.getCanonicalPath()));
+ } catch (IOException e) {
+ LOGGER.error(
+ "Failed to delete dictionary or sortIndex file for column " + column.getColName()
+ + "with column ID " + column.getColumnId());
+ }
+ }
+ // remove dictionary cache
+ removeDictionaryColumnFromCache(carbonTable, column.getColumnId());
+ }
+ }
+ }
+
+ /**
+ * This method will remove dictionary cache from driver for both reverse and forward dictionary
+ *
+ * @param carbonTable
+ * @param columnId
+ */
+ public static void removeDictionaryColumnFromCache(CarbonTable carbonTable, String columnId) {
+ Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache = CacheProvider.getInstance()
+ .createCache(CacheType.REVERSE_DICTIONARY, carbonTable.getStorePath());
+ DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
+ new DictionaryColumnUniqueIdentifier(carbonTable.getCarbonTableIdentifier(),
+ new ColumnIdentifier(columnId, null, null));
+ dictCache.invalidate(dictionaryColumnUniqueIdentifier);
+ dictCache = CacheProvider.getInstance()
+ .createCache(CacheType.FORWARD_DICTIONARY, carbonTable.getStorePath());
+ dictCache.invalidate(dictionaryColumnUniqueIdentifier);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6b3b16c5/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
index 57cd145..84bf208 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
@@ -761,4 +761,24 @@ public class SegmentProperties {
return blockTodimensionOrdinalMapping;
}
+ /**
+ * This method will search a given dimension and return the dimension from current block
+ *
+ * @param queryDimension
+ * @return
+ */
+ public CarbonDimension getDimensionFromCurrentBlock(CarbonDimension queryDimension) {
+ return CarbonUtil.getDimensionFromCurrentBlock(this.dimensions, queryDimension);
+ }
+
+ /**
+ * This method will search for a given measure in the current block measures list
+ *
+ * @param columnId
+ * @return
+ */
+ public CarbonMeasure getMeasureFromCurrentBlock(String columnId) {
+ return CarbonUtil.getMeasureFromCurrentBlock(this.measures, columnId);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6b3b16c5/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index fd6992c..4527b77 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -55,11 +55,22 @@ public class CarbonTable implements Serializable {
private AbsoluteTableIdentifier absoluteTableIdentifier;
/**
- * TableName, Dimensions list
+ * TableName, Dimensions list. This map will contain dimensions which are visible
*/
- private Map<String, List<CarbonDimension>> tableDimensionsMap;
+ private transient Map<String, List<CarbonDimension>> tableDimensionsMap;
+
+ /**
+ * TableName, Dimensions list. This map will contain both visible and not visible dimensions
+ */
+ private transient Map<String, List<CarbonDimension>> tableDimensionsMapWithDeletedColumns;
+
+ /**
+ * list of all the dimensions
+ */
+ private List<CarbonDimension> dimensions;
private Map<String, List<CarbonColumn>> createOrderColumn;
+
/**
* TableName, Dimensions and children dimensions list
*/
@@ -71,9 +82,19 @@ public class CarbonTable implements Serializable {
private Map<String, List<CarbonDimension>> tableImplicitDimensionsMap;
/**
- * table measures list.
+ * table measures list. This map will contain measures which are visible
*/
- private Map<String, List<CarbonMeasure>> tableMeasuresMap;
+ private transient Map<String, List<CarbonMeasure>> tableMeasuresMap;
+
+ /**
+ * table measures list. This map will contain both visible and not visible measures
+ */
+ private transient Map<String, List<CarbonMeasure>> tableMeasuresMapWithDeletedMeasures;
+
+ /**
+ * list of measures
+ */
+ private List<CarbonMeasure> measures;
/**
* table bucket map.
@@ -106,9 +127,7 @@ public class CarbonTable implements Serializable {
private int blockSize;
public CarbonTable() {
- this.tableDimensionsMap = new HashMap<String, List<CarbonDimension>>();
this.tableImplicitDimensionsMap = new HashMap<String, List<CarbonDimension>>();
- this.tableMeasuresMap = new HashMap<String, List<CarbonMeasure>>();
this.tableBucketMap = new HashMap<>();
this.aggregateTablesName = new ArrayList<String>();
this.createOrderColumn = new HashMap<String, List<CarbonColumn>>();
@@ -148,6 +167,8 @@ public class CarbonTable implements Serializable {
*/
private void fillCreateOrderColumn(String tableName) {
List<CarbonColumn> columns = new ArrayList<CarbonColumn>();
+ fillVisibleDimensions(tableName);
+ fillVisibleMeasures(tableName);
List<CarbonDimension> dimensions = this.tableDimensionsMap.get(tableName);
List<CarbonMeasure> measures = this.tableMeasuresMap.get(tableName);
Iterator<CarbonDimension> dimItr = dimensions.iterator();
@@ -199,13 +220,11 @@ public class CarbonTable implements Serializable {
* @param tableSchema
*/
private void fillDimensionsAndMeasuresForTables(TableSchema tableSchema) {
- List<CarbonDimension> dimensions = new ArrayList<CarbonDimension>();
List<CarbonDimension> primitiveDimensions = new ArrayList<CarbonDimension>();
List<CarbonDimension> implicitDimensions = new ArrayList<CarbonDimension>();
- List<CarbonMeasure> measures = new ArrayList<CarbonMeasure>();
- this.tableDimensionsMap.put(tableSchema.getTableName(), dimensions);
+ dimensions = new ArrayList<>();
+ measures = new ArrayList<>();
this.tablePrimitiveDimensionsMap.put(this.tableUniqueName, primitiveDimensions);
- this.tableMeasuresMap.put(tableSchema.getTableName(), measures);
this.tableImplicitDimensionsMap.put(tableSchema.getTableName(), implicitDimensions);
int dimensionOrdinal = 0;
int measureOrdinal = 0;
@@ -385,6 +404,7 @@ public class CarbonTable implements Serializable {
* @return number of dimension present the table
*/
public int getNumberOfDimensions(String tableName) {
+ fillVisibleDimensions(tableName);
return tableDimensionsMap.get(tableName).size();
}
@@ -395,6 +415,7 @@ public class CarbonTable implements Serializable {
* @return number of measures present the table
*/
public int getNumberOfMeasures(String tableName) {
+ fillVisibleMeasures(tableName);
return tableMeasuresMap.get(tableName).size();
}
@@ -405,6 +426,7 @@ public class CarbonTable implements Serializable {
* @return all dimension of a table
*/
public List<CarbonDimension> getDimensionByTableName(String tableName) {
+ fillVisibleDimensions(tableName);
return tableDimensionsMap.get(tableName);
}
@@ -415,6 +437,7 @@ public class CarbonTable implements Serializable {
* @return all measure of a table
*/
public List<CarbonMeasure> getMeasureByTableName(String tableName) {
+ fillVisibleMeasures(tableName);
return tableMeasuresMap.get(tableName);
}
@@ -445,6 +468,7 @@ public class CarbonTable implements Serializable {
* @return
*/
public CarbonMeasure getMeasureByName(String tableName, String columnName) {
+ fillVisibleMeasures(tableName);
List<CarbonMeasure> measureList = tableMeasuresMap.get(tableName);
for (CarbonMeasure measure : measureList) {
if (!measure.isInvisible() && measure.getColName().equalsIgnoreCase(columnName)) {
@@ -463,6 +487,7 @@ public class CarbonTable implements Serializable {
*/
public CarbonDimension getDimensionByName(String tableName, String columnName) {
CarbonDimension carbonDimension = null;
+ fillVisibleDimensions(tableName);
List<CarbonDimension> dimList = tableDimensionsMap.get(tableName);
for (CarbonDimension dim : dimList) {
if (!dim.isInvisible() && dim.getColName().equalsIgnoreCase(columnName)) {
@@ -503,6 +528,7 @@ public class CarbonTable implements Serializable {
* @return list of child dimensions
*/
public List<CarbonDimension> getChildren(String dimName) {
+ fillVisibleDimensions(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
for (List<CarbonDimension> list : tableDimensionsMap.values()) {
List<CarbonDimension> childDims = getChildren(dimName, list);
if (childDims != null) {
@@ -580,4 +606,97 @@ public class CarbonTable implements Serializable {
}
return null;
}
+
+ /**
+ * fill all dimensions if empty and return the map
+ *
+ * @param tableName
+ * @return
+ */
+ public Map<String, List<CarbonDimension>> getTableDimensionsMapWithDeletedColumns(
+ String tableName) {
+ if (null == tableDimensionsMapWithDeletedColumns || null == tableDimensionsMapWithDeletedColumns
+ .get(tableName)) {
+ synchronized (this) {
+ if (null == tableDimensionsMapWithDeletedColumns
+ || null == tableDimensionsMapWithDeletedColumns.get(tableName)) {
+ if (null == tableDimensionsMapWithDeletedColumns) {
+ tableDimensionsMapWithDeletedColumns = new HashMap<>();
+ }
+ tableDimensionsMapWithDeletedColumns.put(tableName, dimensions);
+ }
+ }
+ }
+ return tableDimensionsMapWithDeletedColumns;
+ }
+
+ /**
+ * This method will fill all the visible dimensions
+ *
+ * @param tableName
+ */
+ private void fillVisibleDimensions(String tableName) {
+ if (null == tableDimensionsMap || null == tableDimensionsMap.get(tableName)) {
+ synchronized (this) {
+ if (null == tableDimensionsMap || null == tableDimensionsMap.get(tableName)) {
+ if (null == tableDimensionsMap) {
+ tableDimensionsMap = new HashMap<>();
+ }
+ List<CarbonDimension> visibleDimensions = new ArrayList<>(dimensions.size());
+ for (CarbonDimension dimension : dimensions) {
+ if (!dimension.isInvisible()) {
+ visibleDimensions.add(dimension);
+ }
+ }
+ tableDimensionsMap.put(tableName, visibleDimensions);
+ }
+ }
+ }
+ }
+
+ /**
+ * fill all measures if empty and return the map
+ *
+ * @param tableName
+ * @return
+ */
+ public Map<String, List<CarbonMeasure>> getTableMeasuresMapWithDeletedMeasures(String tableName) {
+ if (null == tableMeasuresMapWithDeletedMeasures || null == tableMeasuresMapWithDeletedMeasures
+ .get(tableName)) {
+ synchronized (this) {
+ if (null == tableMeasuresMapWithDeletedMeasures
+ || null == tableMeasuresMapWithDeletedMeasures.get(tableName)) {
+ if (null == tableMeasuresMapWithDeletedMeasures) {
+ tableMeasuresMapWithDeletedMeasures = new HashMap<>();
+ }
+ tableMeasuresMapWithDeletedMeasures.put(tableName, measures);
+ }
+ }
+ }
+ return tableMeasuresMapWithDeletedMeasures;
+ }
+
+ /**
+ * This method will fill all the visible measures
+ *
+ * @param tableName
+ */
+ private void fillVisibleMeasures(String tableName) {
+ if (null == tableMeasuresMap || null == tableMeasuresMap.get(tableName)) {
+ synchronized (this) {
+ if (null == tableMeasuresMap || null == tableMeasuresMap.get(tableName)) {
+ if (null == tableMeasuresMap) {
+ tableMeasuresMap = new HashMap<>();
+ }
+ List<CarbonMeasure> visibleMeasures = new ArrayList<>(measures.size());
+ for (CarbonMeasure measure : measures) {
+ if (!measure.isInvisible()) {
+ visibleMeasures.add(measure);
+ }
+ }
+ tableMeasuresMap.put(tableName, visibleMeasures);
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6b3b16c5/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
index 14e1ab9..c5539b4 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
@@ -40,6 +40,31 @@ import org.apache.commons.lang3.ArrayUtils;
*/
public class DictionaryBasedResultCollector extends AbstractScannedResultCollector {
+ protected QueryDimension[] queryDimensions;
+
+ protected QueryMeasure[] queryMeasures;
+
+ protected DirectDictionaryGenerator[] directDictionaryGenerators;
+
+ /**
+ * query order
+ */
+ protected int[] order;
+
+ protected int[] actualIndexInSurrogateKey;
+
+ protected boolean[] dictionaryEncodingArray;
+
+ protected boolean[] directDictionaryEncodingArray;
+
+ protected boolean[] implictColumnArray;
+
+ protected boolean[] complexDataTypeArray;
+
+ protected int dictionaryColumnIndex;
+ protected int noDictionaryColumnIndex;
+ protected int complexTypeColumnIndex;
+
public DictionaryBasedResultCollector(BlockExecutionInfo blockExecutionInfos) {
super(blockExecutionInfos);
}
@@ -49,66 +74,22 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
* it will keep track of how many record is processed, to handle limit scenario
*/
@Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
-
- List<Object[]> listBasedResult = new ArrayList<>(batchSize);
- boolean isMsrsPresent = measureInfo.getMeasureDataTypes().length > 0;
-
- QueryDimension[] queryDimensions = tableBlockExecutionInfos.getQueryDimensions();
- List<Integer> dictionaryIndexes = new ArrayList<Integer>();
- for (int i = 0; i < queryDimensions.length; i++) {
- if (queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) || queryDimensions[i]
- .getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
- dictionaryIndexes.add(queryDimensions[i].getDimension().getOrdinal());
- }
- }
- int[] primitive = ArrayUtils.toPrimitive(dictionaryIndexes.toArray(
- new Integer[dictionaryIndexes.size()]));
- Arrays.sort(primitive);
- int[] actualIndexInSurrogateKey = new int[dictionaryIndexes.size()];
- int index = 0;
- for (int i = 0; i < queryDimensions.length; i++) {
- if (queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) || queryDimensions[i]
- .getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
- actualIndexInSurrogateKey[index++] =
- Arrays.binarySearch(primitive, queryDimensions[i].getDimension().getOrdinal());
- }
- }
-
- QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getQueryMeasures();
- BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache =
- scannedResult.getDeleteDeltaDataCache();
- Map<Integer, GenericQueryType> comlexDimensionInfoMap =
- tableBlockExecutionInfos.getComlexDimensionInfoMap();
- boolean[] dictionaryEncodingArray = CarbonUtil.getDictionaryEncodingArray(queryDimensions);
- boolean[] directDictionaryEncodingArray =
- CarbonUtil.getDirectDictionaryEncodingArray(queryDimensions);
- boolean[] implictColumnArray = CarbonUtil.getImplicitColumnArray(queryDimensions);
- boolean[] complexDataTypeArray = CarbonUtil.getComplexDataTypeArray(queryDimensions);
- int dimSize = queryDimensions.length;
- boolean isDimensionsExist = dimSize > 0;
- int[] order = new int[dimSize + queryMeasures.length];
- for (int i = 0; i < dimSize; i++) {
- order[i] = queryDimensions[i].getQueryOrder();
- }
- for (int i = 0; i < queryMeasures.length; i++) {
- order[i + dimSize] = queryMeasures[i].getQueryOrder();
- }
+ queryDimensions = tableBlockExecutionInfos.getQueryDimensions();
+ queryMeasures = tableBlockExecutionInfos.getQueryMeasures();
+ initDimensionAndMeasureIndexesForFillingData();
// scan the record and add to list
+ List<Object[]> listBasedResult = new ArrayList<>(batchSize);
int rowCounter = 0;
- int dictionaryColumnIndex = 0;
- int noDictionaryColumnIndex = 0;
- int complexTypeColumnIndex = 0;
int[] surrogateResult;
String[] noDictionaryKeys;
byte[][] complexTypeKeyArray;
-
- DirectDictionaryGenerator[] directDictionaryGenerators = new DirectDictionaryGenerator[dimSize];
- for (int i = 0; i < dimSize; i++) {
- directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory
- .getDirectDictionaryGenerator(queryDimensions[i].getDimension().getDataType());
- }
+ boolean isDimensionsExist = queryDimensions.length > 0;
+ BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache =
+ scannedResult.getDeleteDeltaDataCache();
+ Map<Integer, GenericQueryType> comlexDimensionInfoMap =
+ tableBlockExecutionInfos.getComlexDimensionInfoMap();
while (scannedResult.hasNext() && rowCounter < batchSize) {
- Object[] row = new Object[dimSize + queryMeasures.length];
+ Object[] row = new Object[queryDimensions.length + queryMeasures.length];
if (isDimensionsExist) {
surrogateResult = scannedResult.getDictionaryKeyIntegerArray();
noDictionaryKeys = scannedResult.getNoDictionaryKeyStringArray();
@@ -116,38 +97,10 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
dictionaryColumnIndex = 0;
noDictionaryColumnIndex = 0;
complexTypeColumnIndex = 0;
- for (int i = 0; i < dimSize; i++) {
- if (!dictionaryEncodingArray[i]) {
- if (implictColumnArray[i]) {
- if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID
- .equals(queryDimensions[i].getDimension().getColName())) {
- row[order[i]] = DataTypeUtil.getDataBasedOnDataType(
- scannedResult.getBlockletId() + CarbonCommonConstants.FILE_SEPARATOR
- + scannedResult.getCurrenrRowId(), DataType.STRING);
- } else {
- row[order[i]] = DataTypeUtil
- .getDataBasedOnDataType(scannedResult.getBlockletId(), DataType.STRING);
- }
- } else {
- row[order[i]] = DataTypeUtil
- .getDataBasedOnDataType(noDictionaryKeys[noDictionaryColumnIndex++],
- queryDimensions[i].getDimension().getDataType());
- }
- } else if (directDictionaryEncodingArray[i]) {
- if (directDictionaryGenerators[i] != null) {
- row[order[i]] = directDictionaryGenerators[i].getValueFromSurrogate(
- surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]]);
- }
- } else if (complexDataTypeArray[i]) {
- row[order[i]] = comlexDimensionInfoMap
- .get(queryDimensions[i].getDimension().getOrdinal())
- .getDataBasedOnDataTypeFromSurrogates(
- ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
- } else {
- row[order[i]] = surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]];
- }
+ for (int i = 0; i < queryDimensions.length; i++) {
+ fillDimensionData(scannedResult, surrogateResult, noDictionaryKeys, complexTypeKeyArray,
+ comlexDimensionInfoMap, row, i);
}
-
} else {
scannedResult.incrementCounter();
}
@@ -155,17 +108,95 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
.contains(scannedResult.getCurrenrRowId())) {
continue;
}
- if (isMsrsPresent) {
- Object[] msrValues = new Object[measureInfo.getMeasureDataTypes().length];
- fillMeasureData(msrValues, 0, scannedResult);
- for (int i = 0; i < msrValues.length; i++) {
- row[order[i + dimSize]] = msrValues[i];
- }
- }
+ fillMeasureData(scannedResult, row);
listBasedResult.add(row);
rowCounter++;
}
return listBasedResult;
}
+ protected void fillDimensionData(AbstractScannedResult scannedResult, int[] surrogateResult,
+ String[] noDictionaryKeys, byte[][] complexTypeKeyArray,
+ Map<Integer, GenericQueryType> comlexDimensionInfoMap, Object[] row, int i) {
+ if (!dictionaryEncodingArray[i]) {
+ if (implictColumnArray[i]) {
+ if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID
+ .equals(queryDimensions[i].getDimension().getColName())) {
+ row[order[i]] = DataTypeUtil.getDataBasedOnDataType(
+ scannedResult.getBlockletId() + CarbonCommonConstants.FILE_SEPARATOR
+ + scannedResult.getCurrenrRowId(), DataType.STRING);
+ } else {
+ row[order[i]] = DataTypeUtil
+ .getDataBasedOnDataType(scannedResult.getBlockletId(), DataType.STRING);
+ }
+ } else {
+ row[order[i]] = DataTypeUtil
+ .getDataBasedOnDataType(noDictionaryKeys[noDictionaryColumnIndex++],
+ queryDimensions[i].getDimension().getDataType());
+ }
+ } else if (directDictionaryEncodingArray[i]) {
+ if (directDictionaryGenerators[i] != null) {
+ row[order[i]] = directDictionaryGenerators[i].getValueFromSurrogate(
+ surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]]);
+ }
+ } else if (complexDataTypeArray[i]) {
+ row[order[i]] =
+ comlexDimensionInfoMap.get(queryDimensions[i].getDimension().getOrdinal())
+ .getDataBasedOnDataTypeFromSurrogates(
+ ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
+ } else {
+ row[order[i]] = surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]];
+ }
+ }
+
+ protected void fillMeasureData(AbstractScannedResult scannedResult, Object[] row) {
+ if (measureInfo.getMeasureDataTypes().length > 0) {
+ Object[] msrValues = new Object[measureInfo.getMeasureDataTypes().length];
+ fillMeasureData(msrValues, 0, scannedResult);
+ for (int i = 0; i < msrValues.length; i++) {
+ row[order[i + queryDimensions.length]] = msrValues[i];
+ }
+ }
+ }
+
+ protected void initDimensionAndMeasureIndexesForFillingData() {
+ List<Integer> dictionaryIndexes = new ArrayList<Integer>();
+ for (int i = 0; i < queryDimensions.length; i++) {
+ if (queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) || queryDimensions[i]
+ .getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ dictionaryIndexes.add(queryDimensions[i].getDimension().getOrdinal());
+ }
+ }
+ int[] primitive =
+ ArrayUtils.toPrimitive(dictionaryIndexes.toArray(new Integer[dictionaryIndexes.size()]));
+ Arrays.sort(primitive);
+ actualIndexInSurrogateKey = new int[dictionaryIndexes.size()];
+ int index = 0;
+ for (int i = 0; i < queryDimensions.length; i++) {
+ if (queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) || queryDimensions[i]
+ .getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ actualIndexInSurrogateKey[index++] =
+ Arrays.binarySearch(primitive, queryDimensions[i].getDimension().getOrdinal());
+ }
+ }
+
+ dictionaryEncodingArray = CarbonUtil.getDictionaryEncodingArray(queryDimensions);
+ directDictionaryEncodingArray = CarbonUtil.getDirectDictionaryEncodingArray(queryDimensions);
+ implictColumnArray = CarbonUtil.getImplicitColumnArray(queryDimensions);
+ complexDataTypeArray = CarbonUtil.getComplexDataTypeArray(queryDimensions);
+ order = new int[queryDimensions.length + queryMeasures.length];
+ for (int i = 0; i < queryDimensions.length; i++) {
+ order[i] = queryDimensions[i].getQueryOrder();
+ }
+ for (int i = 0; i < queryMeasures.length; i++) {
+ order[i + queryDimensions.length] = queryMeasures[i].getQueryOrder();
+ }
+ directDictionaryGenerators =
+ new DirectDictionaryGenerator[queryDimensions.length];
+ for (int i = 0; i < queryDimensions.length; i++) {
+ directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory
+ .getDirectDictionaryGenerator(queryDimensions[i].getDimension().getDataType());
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6b3b16c5/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
index 670df1a..82eaac7 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
@@ -35,22 +35,30 @@ import org.apache.carbondata.core.scan.result.vector.MeasureDataVectorProcessor;
*/
public class DictionaryBasedVectorResultCollector extends AbstractScannedResultCollector {
- private ColumnVectorInfo[] dictionaryInfo;
+ protected QueryDimension[] queryDimensions;
- private ColumnVectorInfo[] noDictionaryInfo;
+ protected QueryMeasure[] queryMeasures;
- private ColumnVectorInfo[] complexInfo;
+ protected ColumnVectorInfo[] dictionaryInfo;
- private ColumnVectorInfo[] measureColumnInfo;
+ protected ColumnVectorInfo[] noDictionaryInfo;
- private ColumnVectorInfo[] allColumnInfo;
+ protected ColumnVectorInfo[] complexInfo;
+
+ protected ColumnVectorInfo[] measureColumnInfo;
+
+ protected ColumnVectorInfo[] allColumnInfo;
public DictionaryBasedVectorResultCollector(BlockExecutionInfo blockExecutionInfos) {
super(blockExecutionInfos);
- QueryDimension[] queryDimensions = tableBlockExecutionInfos.getQueryDimensions();
- QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getQueryMeasures();
- measureColumnInfo = new ColumnVectorInfo[queryMeasures.length];
+ queryDimensions = tableBlockExecutionInfos.getQueryDimensions();
+ queryMeasures = tableBlockExecutionInfos.getQueryMeasures();
allColumnInfo = new ColumnVectorInfo[queryDimensions.length + queryMeasures.length];
+ prepareDimensionAndMeasureColumnVectors();
+ }
+
+ protected void prepareDimensionAndMeasureColumnVectors() {
+ measureColumnInfo = new ColumnVectorInfo[queryMeasures.length];
List<ColumnVectorInfo> dictInfoList = new ArrayList<>();
List<ColumnVectorInfo> noDictInfoList = new ArrayList<>();
List<ColumnVectorInfo> complexList = new ArrayList<>();
@@ -122,26 +130,35 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
if (requiredRows < 1) {
return;
}
- for (int i = 0; i < allColumnInfo.length; i++) {
- allColumnInfo[i].size = requiredRows;
- allColumnInfo[i].offset = rowCounter;
- allColumnInfo[i].vectorOffset = columnarBatch.getRowCounter();
- allColumnInfo[i].vector = columnarBatch.columnVectors[i];
- }
+ fillColumnVectorDetails(columnarBatch, rowCounter, requiredRows);
+ scanAndFillResult(scannedResult, columnarBatch, rowCounter, availableRows, requiredRows);
+ }
+ }
- scannedResult.fillColumnarDictionaryBatch(dictionaryInfo);
- scannedResult.fillColumnarNoDictionaryBatch(noDictionaryInfo);
- scannedResult.fillColumnarMeasureBatch(measureColumnInfo, measureInfo.getMeasureOrdinals());
- scannedResult.fillColumnarComplexBatch(complexInfo);
- // it means fetched all data out of page so increment the page counter
- if (availableRows == requiredRows) {
- scannedResult.incrementPageCounter();
- } else {
- // Or set the row counter.
- scannedResult.setRowCounter(rowCounter + requiredRows);
- }
- columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows);
- columnarBatch.setRowCounter(columnarBatch.getRowCounter() + requiredRows);
+ protected void scanAndFillResult(AbstractScannedResult scannedResult,
+ CarbonColumnarBatch columnarBatch, int rowCounter, int availableRows, int requiredRows) {
+ scannedResult.fillColumnarDictionaryBatch(dictionaryInfo);
+ scannedResult.fillColumnarNoDictionaryBatch(noDictionaryInfo);
+ scannedResult.fillColumnarMeasureBatch(measureColumnInfo, measureInfo.getMeasureOrdinals());
+ scannedResult.fillColumnarComplexBatch(complexInfo);
+ // it means fetched all data out of page so increment the page counter
+ if (availableRows == requiredRows) {
+ scannedResult.incrementPageCounter();
+ } else {
+ // Or set the row counter.
+ scannedResult.setRowCounter(rowCounter + requiredRows);
+ }
+ columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows);
+ columnarBatch.setRowCounter(columnarBatch.getRowCounter() + requiredRows);
+ }
+
+ protected void fillColumnVectorDetails(CarbonColumnarBatch columnarBatch, int rowCounter,
+ int requiredRows) {
+ for (int i = 0; i < allColumnInfo.length; i++) {
+ allColumnInfo[i].size = requiredRows;
+ allColumnInfo[i].offset = rowCounter;
+ allColumnInfo[i].vectorOffset = columnarBatch.getRowCounter();
+ allColumnInfo[i].vector = columnarBatch.columnVectors[i];
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6b3b16c5/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
index 1acd830..bdaffc8 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
@@ -33,6 +33,16 @@ import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
*/
public class RawBasedResultCollector extends AbstractScannedResultCollector {
+ protected ByteArrayWrapper wrapper;
+
+ protected byte[] dictionaryKeyArray;
+
+ protected byte[][] noDictionaryKeyArray;
+
+ protected byte[][] complexTypeKeyArray;
+
+ protected byte[] implicitColumnByteArray;
+
public RawBasedResultCollector(BlockExecutionInfo blockExecutionInfos) {
super(blockExecutionInfos);
}
@@ -44,28 +54,40 @@ public class RawBasedResultCollector extends AbstractScannedResultCollector {
@Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
List<Object[]> listBasedResult = new ArrayList<>(batchSize);
QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getQueryMeasures();
- ByteArrayWrapper wrapper = null;
BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache =
scannedResult.getDeleteDeltaDataCache();
// scan the record and add to list
int rowCounter = 0;
while (scannedResult.hasNext() && rowCounter < batchSize) {
- Object[] row = new Object[1 + queryMeasures.length];
- wrapper = new ByteArrayWrapper();
- wrapper.setDictionaryKey(scannedResult.getDictionaryKeyArray());
- wrapper.setNoDictionaryKeys(scannedResult.getNoDictionaryKeyArray());
- wrapper.setComplexTypesKeys(scannedResult.getComplexTypeKeyArray());
- wrapper.setImplicitColumnByteArray(scannedResult.getBlockletId()
- .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+ scanResultAndGetData(scannedResult);
if (null != deleteDeltaDataCache && deleteDeltaDataCache
.contains(scannedResult.getCurrenrRowId())) {
continue;
}
- row[0] = wrapper;
- fillMeasureData(row, 1, scannedResult);
- listBasedResult.add(row);
+ prepareRow(scannedResult, listBasedResult, queryMeasures);
rowCounter++;
}
return listBasedResult;
}
+
+ protected void prepareRow(AbstractScannedResult scannedResult, List<Object[]> listBasedResult,
+ QueryMeasure[] queryMeasures) {
+ Object[] row = new Object[1 + queryMeasures.length];
+ wrapper = new ByteArrayWrapper();
+ wrapper.setDictionaryKey(dictionaryKeyArray);
+ wrapper.setNoDictionaryKeys(noDictionaryKeyArray);
+ wrapper.setComplexTypesKeys(complexTypeKeyArray);
+ wrapper.setImplicitColumnByteArray(implicitColumnByteArray);
+ row[0] = wrapper;
+ fillMeasureData(row, 1, scannedResult);
+ listBasedResult.add(row);
+ }
+
+ protected void scanResultAndGetData(AbstractScannedResult scannedResult) {
+ dictionaryKeyArray = scannedResult.getDictionaryKeyArray();
+ noDictionaryKeyArray = scannedResult.getNoDictionaryKeyArray();
+ complexTypeKeyArray = scannedResult.getComplexTypeKeyArray();
+ implicitColumnByteArray = scannedResult.getBlockletId()
+ .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6b3b16c5/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
index 99bdae4..5ab400a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
@@ -16,31 +16,19 @@
*/
package org.apache.carbondata.core.scan.collector.impl;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.filter.GenericQueryType;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
import org.apache.carbondata.core.scan.result.AbstractScannedResult;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.commons.lang3.ArrayUtils;
/**
- * It is not a collector it is just a scanned result holder.
+ * class for handling restructure scenarios for filling result
*/
-public class RestructureBasedDictionaryResultCollector extends AbstractScannedResultCollector {
+public class RestructureBasedDictionaryResultCollector extends DictionaryBasedResultCollector {
public RestructureBasedDictionaryResultCollector(BlockExecutionInfo blockExecutionInfos) {
super(blockExecutionInfos);
@@ -51,60 +39,22 @@ public class RestructureBasedDictionaryResultCollector extends AbstractScannedRe
* it will keep track of how many record is processed, to handle limit scenario
*/
@Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
-
- List<Object[]> listBasedResult = new ArrayList<>(batchSize);
- boolean isMsrsPresent = measureInfo.getMeasureDataTypes().length > 0;
-
- QueryDimension[] queryDimensions = tableBlockExecutionInfos.getActualQueryDimensions();
- List<Integer> dictionaryIndexes = new ArrayList<Integer>();
- for (int i = 0; i < queryDimensions.length; i++) {
- if (queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) || queryDimensions[i]
- .getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
- dictionaryIndexes.add(queryDimensions[i].getDimension().getOrdinal());
- }
- }
- int[] primitive = ArrayUtils.toPrimitive(dictionaryIndexes.toArray(
- new Integer[dictionaryIndexes.size()]));
- Arrays.sort(primitive);
- int[] actualIndexInSurrogateKey = new int[dictionaryIndexes.size()];
- int index = 0;
- for (int i = 0; i < queryDimensions.length; i++) {
- if (queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) || queryDimensions[i]
- .getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
- actualIndexInSurrogateKey[index++] =
- Arrays.binarySearch(primitive, queryDimensions[i].getDimension().getOrdinal());
- }
- }
-
- QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getActualQueryMeasures();
- BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache =
- scannedResult.getDeleteDeltaDataCache();
- Map<Integer, GenericQueryType> comlexDimensionInfoMap =
- tableBlockExecutionInfos.getComlexDimensionInfoMap();
- boolean[] dictionaryEncodingArray = CarbonUtil.getDictionaryEncodingArray(queryDimensions);
- boolean[] directDictionaryEncodingArray =
- CarbonUtil.getDirectDictionaryEncodingArray(queryDimensions);
- boolean[] implictColumnArray = CarbonUtil.getImplicitColumnArray(queryDimensions);
- boolean[] complexDataTypeArray = CarbonUtil.getComplexDataTypeArray(queryDimensions);
- int dimSize = queryDimensions.length;
- boolean isDimensionsExist = dimSize > 0;
- int[] order = new int[dimSize + queryMeasures.length];
- for (int i = 0; i < dimSize; i++) {
- order[i] = queryDimensions[i].getQueryOrder();
- }
- for (int i = 0; i < queryMeasures.length; i++) {
- order[i + dimSize] = queryMeasures[i].getQueryOrder();
- }
+ queryDimensions = tableBlockExecutionInfos.getActualQueryDimensions();
+ queryMeasures = tableBlockExecutionInfos.getActualQueryMeasures();
+ initDimensionAndMeasureIndexesForFillingData();
// scan the record and add to list
+ List<Object[]> listBasedResult = new ArrayList<>(batchSize);
int rowCounter = 0;
- int dictionaryColumnIndex = 0;
- int noDictionaryColumnIndex = 0;
- int complexTypeColumnIndex = 0;
int[] surrogateResult;
String[] noDictionaryKeys;
byte[][] complexTypeKeyArray;
+ boolean isDimensionsExist = queryDimensions.length > 0;
+ BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache =
+ scannedResult.getDeleteDeltaDataCache();
+ Map<Integer, GenericQueryType> comlexDimensionInfoMap =
+ tableBlockExecutionInfos.getComlexDimensionInfoMap();
while (scannedResult.hasNext() && rowCounter < batchSize) {
- Object[] row = new Object[dimSize + queryMeasures.length];
+ Object[] row = new Object[queryDimensions.length + queryMeasures.length];
if (isDimensionsExist) {
surrogateResult = scannedResult.getDictionaryKeyIntegerArray();
noDictionaryKeys = scannedResult.getNoDictionaryKeyStringArray();
@@ -112,7 +62,7 @@ public class RestructureBasedDictionaryResultCollector extends AbstractScannedRe
dictionaryColumnIndex = 0;
noDictionaryColumnIndex = 0;
complexTypeColumnIndex = 0;
- for (int i = 0; i < dimSize; i++) {
+ for (int i = 0; i < queryDimensions.length; i++) {
// fill default value in case the dimension does not exist in the current block
if (!dimensionInfo.getDimensionExists()[i]) {
if (dictionaryEncodingArray[i] || directDictionaryEncodingArray[i]) {
@@ -123,40 +73,9 @@ public class RestructureBasedDictionaryResultCollector extends AbstractScannedRe
}
continue;
}
- if (!dictionaryEncodingArray[i]) {
- if (implictColumnArray[i]) {
- if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID
- .equals(queryDimensions[i].getDimension().getColName())) {
- row[order[i]] = DataTypeUtil.getDataBasedOnDataType(
- scannedResult.getBlockletId() + CarbonCommonConstants.FILE_SEPARATOR
- + scannedResult.getCurrenrRowId(), DataType.STRING);
- } else {
- row[order[i]] = DataTypeUtil
- .getDataBasedOnDataType(scannedResult.getBlockletId(), DataType.STRING);
- }
- } else {
- row[order[i]] = DataTypeUtil
- .getDataBasedOnDataType(noDictionaryKeys[noDictionaryColumnIndex++],
- queryDimensions[i].getDimension().getDataType());
- }
- } else if (directDictionaryEncodingArray[i]) {
- DirectDictionaryGenerator directDictionaryGenerator =
- DirectDictionaryKeyGeneratorFactory
- .getDirectDictionaryGenerator(queryDimensions[i].getDimension().getDataType());
- if (directDictionaryGenerator != null) {
- row[order[i]] = directDictionaryGenerator.getValueFromSurrogate(
- surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]]);
- }
- } else if (complexDataTypeArray[i]) {
- row[order[i]] = comlexDimensionInfoMap
- .get(queryDimensions[i].getDimension().getOrdinal())
- .getDataBasedOnDataTypeFromSurrogates(
- ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
- } else {
- row[order[i]] = surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]];
- }
+ fillDimensionData(scannedResult, surrogateResult, noDictionaryKeys, complexTypeKeyArray,
+ comlexDimensionInfoMap, row, i);
}
-
} else {
scannedResult.incrementCounter();
}
@@ -164,13 +83,7 @@ public class RestructureBasedDictionaryResultCollector extends AbstractScannedRe
.contains(scannedResult.getCurrenrRowId())) {
continue;
}
- if (isMsrsPresent) {
- Object[] msrValues = new Object[measureInfo.getMeasureDataTypes().length];
- fillMeasureData(msrValues, 0, scannedResult);
- for (int i = 0; i < msrValues.length; i++) {
- row[order[i + dimSize]] = msrValues[i];
- }
- }
+ fillMeasureData(scannedResult, row);
listBasedResult.add(row);
rowCounter++;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6b3b16c5/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
index 14867d6..77dc3db 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
@@ -16,7 +16,6 @@
*/
package org.apache.carbondata.core.scan.collector.impl;
-import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
@@ -43,7 +42,7 @@ import org.apache.spark.unsafe.types.UTF8String;
/**
* It is not a collector it is just a scanned result holder.
*/
-public class RestructureBasedRawResultCollector extends AbstractScannedResultCollector {
+public class RestructureBasedRawResultCollector extends RawBasedResultCollector {
/**
* logger
@@ -85,10 +84,10 @@ public class RestructureBasedRawResultCollector extends AbstractScannedResultCol
// will only be for dictionary encoded columns
CarbonDimension currentBlockDimension = segmentProperties.getDimensions()
.get(dictionaryColumnBlockIndex[dimCounterInCurrentBlock]);
- updatedColumnCardinality.add(segmentProperties
- .getDimColumnsCardinality()[currentBlockDimension.getKeyOrdinal()]);
- updatedDimensionPartitioner.add(segmentProperties
- .getDimensionPartitions()[currentBlockDimension.getKeyOrdinal()]);
+ updatedColumnCardinality.add(
+ segmentProperties.getDimColumnsCardinality()[currentBlockDimension.getKeyOrdinal()]);
+ updatedDimensionPartitioner.add(
+ segmentProperties.getDimensionPartitions()[currentBlockDimension.getKeyOrdinal()]);
dimCounterInCurrentBlock++;
} else {
// partitioner index will be 1 every column will be in columnar format
@@ -133,8 +132,8 @@ public class RestructureBasedRawResultCollector extends AbstractScannedResultCol
for (int i = 0; i < dictionaryColumnBlockIndex.length; i++) {
// get the dictionary key ordinal as column cardinality in segment properties
// will only be for dictionary encoded columns
- CarbonDimension currentBlockDimension = segmentProperties.getDimensions()
- .get(dictionaryColumnBlockIndex[i]);
+ CarbonDimension currentBlockDimension =
+ segmentProperties.getDimensions().get(dictionaryColumnBlockIndex[i]);
updatedColumnCardinality[i] =
segmentProperties.getDimColumnsCardinality()[currentBlockDimension.getKeyOrdinal()];
updatedDimensionPartitioner[i] =
@@ -160,24 +159,19 @@ public class RestructureBasedRawResultCollector extends AbstractScannedResultCol
// scan the record and add to list
int rowCounter = 0;
while (scannedResult.hasNext() && rowCounter < batchSize) {
- byte[] dictionaryKeyArray = scannedResult.getDictionaryKeyArray();
- byte[][] noDictionaryKeyArray = scannedResult.getNoDictionaryKeyArray();
- byte[][] complexTypeKeyArray = scannedResult.getComplexTypeKeyArray();
- byte[] implicitColumnByteArray = scannedResult.getBlockletId()
- .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+ scanResultAndGetData(scannedResult);
if (null != deleteDeltaDataCache && deleteDeltaDataCache
.contains(scannedResult.getCurrenrRowId())) {
continue;
}
- Object[] row = new Object[1 + queryMeasures.length];
- wrapper = new ByteArrayWrapper();
- wrapper.setDictionaryKey(fillDictionaryKeyArrayWithLatestSchema(dictionaryKeyArray));
- wrapper.setNoDictionaryKeys(fillNoDictionaryKeyArrayWithLatestSchema(noDictionaryKeyArray));
- wrapper.setComplexTypesKeys(complexTypeKeyArray);
- wrapper.setImplicitColumnByteArray(implicitColumnByteArray);
- row[0] = wrapper;
- fillMeasureData(row, 1, scannedResult);
- listBasedResult.add(row);
+ // re-fill dictionary and no dictionary key arrays for the newly added columns
+ if (dimensionInfo.isDictionaryColumnAdded()) {
+ dictionaryKeyArray = fillDictionaryKeyArrayWithLatestSchema(dictionaryKeyArray);
+ }
+ if (dimensionInfo.isNoDictionaryColumnAdded()) {
+ noDictionaryKeyArray = fillNoDictionaryKeyArrayWithLatestSchema(noDictionaryKeyArray);
+ }
+ prepareRow(scannedResult, listBasedResult, queryMeasures);
rowCounter++;
}
return listBasedResult;
@@ -191,17 +185,19 @@ public class RestructureBasedRawResultCollector extends AbstractScannedResultCol
*/
private byte[] fillDictionaryKeyArrayWithLatestSchema(byte[] dictionaryKeyArray) {
QueryDimension[] actualQueryDimensions = tableBlockExecutionInfos.getActualQueryDimensions();
- List<Long> keyArrayWithNewColumnValues = new ArrayList<>(actualQueryDimensions.length);
long[] keyArray = updatedCurrentBlockKeyGenerator.getKeyArray(dictionaryKeyArray);
+ long[] keyArrayWithNewAddedColumns =
+ new long[keyArray.length + dimensionInfo.getNewDictionaryColumnCount()];
int existingColumnKeyArrayIndex = 0;
+ int newKeyArrayIndex = 0;
for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) {
if (CarbonUtil
.hasEncoding(actualQueryDimensions[i].getDimension().getEncoder(), Encoding.DICTIONARY)) {
// if dimension exists then add the key array value else add the default value
if (dimensionInfo.getDimensionExists()[i]) {
- keyArrayWithNewColumnValues.add(keyArray[existingColumnKeyArrayIndex++]);
+ keyArrayWithNewAddedColumns[newKeyArrayIndex++] = keyArray[existingColumnKeyArrayIndex++];
} else {
- Long defaultValueAsLong = null;
+ long defaultValueAsLong;
Object defaultValue = dimensionInfo.getDefaultValues()[i];
if (null != defaultValue) {
defaultValueAsLong = ((Integer) defaultValue).longValue();
@@ -209,18 +205,14 @@ public class RestructureBasedRawResultCollector extends AbstractScannedResultCol
defaultValueAsLong =
new Integer(CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY).longValue();
}
- keyArrayWithNewColumnValues.add(defaultValueAsLong);
+ keyArrayWithNewAddedColumns[newKeyArrayIndex++] = defaultValueAsLong;
}
}
}
- if (!keyArrayWithNewColumnValues.isEmpty()) {
- long[] keyArrayWithLatestSchema = ArrayUtils.toPrimitive(
- keyArrayWithNewColumnValues.toArray(new Long[keyArrayWithNewColumnValues.size()]));
- try {
- dictionaryKeyArray = restructuredKeyGenerator.generateKey(keyArrayWithLatestSchema);
- } catch (KeyGenException e) {
- LOGGER.error(e, e.getMessage());
- }
+ try {
+ dictionaryKeyArray = restructuredKeyGenerator.generateKey(keyArrayWithNewAddedColumns);
+ } catch (KeyGenException e) {
+ LOGGER.error(e, e.getMessage());
}
return dictionaryKeyArray;
}
@@ -233,33 +225,29 @@ public class RestructureBasedRawResultCollector extends AbstractScannedResultCol
*/
private byte[][] fillNoDictionaryKeyArrayWithLatestSchema(byte[][] noDictionaryKeyArray) {
QueryDimension[] actualQueryDimensions = tableBlockExecutionInfos.getActualQueryDimensions();
- List<byte[]> noDictionaryValueList = new ArrayList<>(actualQueryDimensions.length);
+ byte[][] noDictionaryKeyArrayWithNewlyAddedColumns =
+ new byte[noDictionaryKeyArray.length + dimensionInfo.getNewNoDictionaryColumnCount()][];
int existingColumnValueIndex = 0;
+ int newKeyArrayIndex = 0;
for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) {
if (!actualQueryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY)) {
// if dimension exists then add the byte array value else add the default value
if (dimensionInfo.getDimensionExists()[i]) {
- noDictionaryValueList.add(noDictionaryKeyArray[existingColumnValueIndex++]);
+ noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
+ noDictionaryKeyArray[existingColumnValueIndex++];
} else {
byte[] newColumnDefaultValue = null;
Object defaultValue = dimensionInfo.getDefaultValues()[i];
if (null != defaultValue) {
- newColumnDefaultValue = ((UTF8String)defaultValue).getBytes();
+ newColumnDefaultValue = ((UTF8String) defaultValue).getBytes();
} else {
newColumnDefaultValue =
UTF8String.fromString(CarbonCommonConstants.MEMBER_DEFAULT_VAL).getBytes();
}
- noDictionaryValueList.add(newColumnDefaultValue);
+ noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] = newColumnDefaultValue;
}
}
}
- // fill the 2-D byte array with all the values of columns in latest schema
- if (!noDictionaryValueList.isEmpty()) {
- noDictionaryKeyArray = new byte[noDictionaryValueList.size()][];
- for (int i = 0; i < noDictionaryKeyArray.length; i++) {
- noDictionaryKeyArray[i] = noDictionaryValueList.get(i);
- }
- }
- return noDictionaryKeyArray;
+ return noDictionaryKeyArrayWithNewlyAddedColumns;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6b3b16c5/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
index b1ce040..dd84e6f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
@@ -17,114 +17,59 @@
package org.apache.carbondata.core.scan.collector.impl;
import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
+import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
import org.apache.carbondata.core.scan.result.AbstractScannedResult;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
-import org.apache.carbondata.core.scan.result.vector.MeasureDataVectorProcessor;
import org.apache.spark.sql.types.Decimal;
/**
* It is not a collector it is just a scanned result holder.
*/
-public class RestructureBasedVectorResultCollector extends AbstractScannedResultCollector {
-
- private ColumnVectorInfo[] dictionaryInfo;
-
- private ColumnVectorInfo[] noDictionaryInfo;
-
- private ColumnVectorInfo[] complexInfo;
-
- private ColumnVectorInfo[] measureColumnInfo;
-
- private ColumnVectorInfo[] allColumnInfo;
+public class RestructureBasedVectorResultCollector extends DictionaryBasedVectorResultCollector {
public RestructureBasedVectorResultCollector(BlockExecutionInfo blockExecutionInfos) {
super(blockExecutionInfos);
- QueryDimension[] queryDimensions = tableBlockExecutionInfos.getActualQueryDimensions();
- QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getActualQueryMeasures();
- measureColumnInfo = new ColumnVectorInfo[queryMeasures.length];
+ queryDimensions = tableBlockExecutionInfos.getActualQueryDimensions();
+ queryMeasures = tableBlockExecutionInfos.getActualQueryMeasures();
allColumnInfo = new ColumnVectorInfo[queryDimensions.length + queryMeasures.length];
- List<ColumnVectorInfo> dictInfoList = new ArrayList<>();
- List<ColumnVectorInfo> noDictInfoList = new ArrayList<>();
- List<ColumnVectorInfo> complexList = new ArrayList<>();
- int dimensionExistIndex = 0;
+ createVectorForNewlyAddedDimensions();
+ createVectorForNewlyAddedMeasures();
+ prepareDimensionAndMeasureColumnVectors();
+ }
+
+ /**
+ * create column vector for newly added dimension columns
+ */
+ private void createVectorForNewlyAddedDimensions() {
for (int i = 0; i < queryDimensions.length; i++) {
if (!dimensionInfo.getDimensionExists()[i]) {
// add a dummy column vector result collector object
ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
allColumnInfo[queryDimensions[i].getQueryOrder()] = columnVectorInfo;
- continue;
- }
- // get the current block dimension and fetch the required information from it
- QueryDimension currentBlockDimension =
- tableBlockExecutionInfos.getQueryDimensions()[dimensionExistIndex++];
- if (!queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY)) {
- ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
- noDictInfoList.add(columnVectorInfo);
- columnVectorInfo.dimension = currentBlockDimension;
- columnVectorInfo.ordinal = currentBlockDimension.getDimension().getOrdinal();
- allColumnInfo[queryDimensions[i].getQueryOrder()] = columnVectorInfo;
- } else if (queryDimensions[i].getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
- ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
- dictInfoList.add(columnVectorInfo);
- columnVectorInfo.dimension = currentBlockDimension;
- columnVectorInfo.directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
- .getDirectDictionaryGenerator(currentBlockDimension.getDimension().getDataType());
- columnVectorInfo.ordinal = currentBlockDimension.getDimension().getOrdinal();
- allColumnInfo[queryDimensions[i].getQueryOrder()] = columnVectorInfo;
- } else if (queryDimensions[i].getDimension().isComplex()) {
- ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
- complexList.add(columnVectorInfo);
- columnVectorInfo.dimension = currentBlockDimension;
- columnVectorInfo.ordinal = currentBlockDimension.getDimension().getOrdinal();
- columnVectorInfo.genericQueryType =
- tableBlockExecutionInfos.getComlexDimensionInfoMap().get(columnVectorInfo.ordinal);
- allColumnInfo[queryDimensions[i].getQueryOrder()] = columnVectorInfo;
- } else {
- ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
- dictInfoList.add(columnVectorInfo);
- columnVectorInfo.dimension = currentBlockDimension;
- columnVectorInfo.ordinal = currentBlockDimension.getDimension().getOrdinal();
- allColumnInfo[queryDimensions[i].getQueryOrder()] = columnVectorInfo;
}
}
- int measureExistIndex = 0;
+ }
+
+ /**
+ * create column vector for newly added measure columns
+ */
+ private void createVectorForNewlyAddedMeasures() {
for (int i = 0; i < queryMeasures.length; i++) {
if (!measureInfo.getMeasureExists()[i]) {
// add a dummy column vector result collector object
ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
allColumnInfo[queryMeasures[i].getQueryOrder()] = columnVectorInfo;
- continue;
}
- QueryMeasure currentBlockMeasure =
- tableBlockExecutionInfos.getQueryMeasures()[measureExistIndex++];
- ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
- columnVectorInfo.measureVectorFiller = MeasureDataVectorProcessor.MeasureVectorFillerFactory
- .getMeasureVectorFiller(currentBlockMeasure.getMeasure().getDataType());
- columnVectorInfo.ordinal = currentBlockMeasure.getMeasure().getOrdinal();
- columnVectorInfo.measure = currentBlockMeasure;
- this.measureColumnInfo[i] = columnVectorInfo;
- allColumnInfo[queryMeasures[i].getQueryOrder()] = columnVectorInfo;
}
- dictionaryInfo = dictInfoList.toArray(new ColumnVectorInfo[dictInfoList.size()]);
- noDictionaryInfo = noDictInfoList.toArray(new ColumnVectorInfo[noDictInfoList.size()]);
- complexInfo = complexList.toArray(new ColumnVectorInfo[complexList.size()]);
- Arrays.sort(dictionaryInfo);
- Arrays.sort(noDictionaryInfo);
- Arrays.sort(complexInfo);
}
@Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
@@ -147,29 +92,12 @@ public class RestructureBasedVectorResultCollector extends AbstractScannedResult
if (requiredRows < 1) {
return;
}
- for (int i = 0; i < allColumnInfo.length; i++) {
- allColumnInfo[i].size = requiredRows;
- allColumnInfo[i].offset = rowCounter;
- allColumnInfo[i].vectorOffset = columnarBatch.getRowCounter();
- allColumnInfo[i].vector = columnarBatch.columnVectors[i];
- }
-
- scannedResult.fillColumnarDictionaryBatch(dictionaryInfo);
- scannedResult.fillColumnarNoDictionaryBatch(noDictionaryInfo);
- scannedResult.fillColumnarMeasureBatch(measureColumnInfo, measureInfo.getMeasureOrdinals());
- scannedResult.fillColumnarComplexBatch(complexInfo);
+ fillColumnVectorDetails(columnarBatch, rowCounter, requiredRows);
// fill default values for non existing dimensions and measures
fillDataForNonExistingDimensions();
fillDataForNonExistingMeasures();
- // it means fetched all data out of page so increment the page counter
- if (availableRows == requiredRows) {
- scannedResult.incrementPageCounter();
- } else {
- // Or set the row counter.
- scannedResult.setRowCounter(rowCounter + requiredRows);
- }
- columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows);
- columnarBatch.setRowCounter(columnarBatch.getRowCounter() + requiredRows);
+ // fill existing dimensions and measures data
+ scanAndFillResult(scannedResult, columnarBatch, rowCounter, availableRows, requiredRows);
}
}
@@ -203,17 +131,11 @@ public class RestructureBasedVectorResultCollector extends AbstractScannedResult
*
* @param vector
* @param columnVectorInfo
- *
* @param defaultValue
*/
private void fillDictionaryData(CarbonColumnVector vector, ColumnVectorInfo columnVectorInfo,
Object defaultValue) {
- int offset = columnVectorInfo.offset;
- int vectorOffset = columnVectorInfo.vectorOffset;
- int len = columnVectorInfo.size + offset;
- for (int j = offset; j < len; j++) {
- vector.putInt(vectorOffset++, (int) defaultValue);
- }
+ vector.putInts(columnVectorInfo.vectorOffset, columnVectorInfo.size, (int) defaultValue);
}
/**
@@ -225,15 +147,10 @@ public class RestructureBasedVectorResultCollector extends AbstractScannedResult
*/
private void fillDirectDictionaryData(CarbonColumnVector vector,
ColumnVectorInfo columnVectorInfo, Object defaultValue) {
- int offset = columnVectorInfo.offset;
- int vectorOffset = columnVectorInfo.vectorOffset;
- int len = columnVectorInfo.size + offset;
- for (int j = offset; j < len; j++) {
- if (null != defaultValue) {
- vector.putLong(vectorOffset++, (long) defaultValue);
- } else {
- vector.putNull(vectorOffset++);
- }
+ if (null != defaultValue) {
+ vector.putLongs(columnVectorInfo.vectorOffset, columnVectorInfo.size, (long) defaultValue);
+ } else {
+ vector.putNulls(columnVectorInfo.vectorOffset, columnVectorInfo.size);
}
}
@@ -246,15 +163,10 @@ public class RestructureBasedVectorResultCollector extends AbstractScannedResult
*/
private void fillNoDictionaryData(CarbonColumnVector vector, ColumnVectorInfo columnVectorInfo,
byte[] defaultValue) {
- int offset = columnVectorInfo.offset;
- int vectorOffset = columnVectorInfo.vectorOffset;
- int len = columnVectorInfo.size + offset;
- for (int j = offset; j < len; j++) {
- if (null != defaultValue) {
- vector.putBytes(vectorOffset++, defaultValue);
- } else {
- vector.putNull(vectorOffset++);
- }
+ if (null != defaultValue) {
+ vector.putBytes(columnVectorInfo.vectorOffset, columnVectorInfo.size, defaultValue);
+ } else {
+ vector.putNulls(columnVectorInfo.vectorOffset, columnVectorInfo.size);
}
}
@@ -267,55 +179,36 @@ public class RestructureBasedVectorResultCollector extends AbstractScannedResult
CarbonMeasure measure = tableBlockExecutionInfos.getActualQueryMeasures()[i].getMeasure();
ColumnVectorInfo columnVectorInfo = allColumnInfo[i];
CarbonColumnVector vector = allColumnInfo[i].vector;
- int offset = columnVectorInfo.offset;
- int len = offset + columnVectorInfo.size;
- int vectorOffset = columnVectorInfo.vectorOffset;
- // convert decimal default value to spark decimal type so that new object is not getting
- // created for every row added
- Object defaultValue = convertDecimalValue(measure, measureInfo.getDefaultValues()[i]);
- for (int j = offset; j < len; j++) {
- if (null == defaultValue) {
- vector.putNull(vectorOffset++);
- } else {
- switch (measureInfo.getMeasureDataTypes()[i]) {
- case SHORT:
- vector.putShort(vectorOffset++, (short) defaultValue);
- break;
- case INT:
- vector.putInt(vectorOffset++, (int) defaultValue);
- break;
- case LONG:
- vector.putLong(vectorOffset++, (long) defaultValue);
- break;
- case DECIMAL:
- vector.putDecimal(vectorOffset, (Decimal) defaultValue, measure.getPrecision());
- break;
- default:
- vector.putDouble(vectorOffset++, (double) defaultValue);
- }
+ Object defaultValue = RestructureUtil
+ .getMeasureDefaultValue(measure.getColumnSchema(), measure.getDefaultValue());
+ if (null == defaultValue) {
+ vector.putNulls(columnVectorInfo.vectorOffset, columnVectorInfo.size);
+ } else {
+ switch (measureInfo.getMeasureDataTypes()[i]) {
+ case SHORT:
+ vector.putShorts(columnVectorInfo.vectorOffset, columnVectorInfo.size,
+ (short) defaultValue);
+ break;
+ case INT:
+ vector.putInts(columnVectorInfo.vectorOffset, columnVectorInfo.size,
+ (int) defaultValue);
+ break;
+ case LONG:
+ vector.putLongs(columnVectorInfo.vectorOffset, columnVectorInfo.size,
+ (long) defaultValue);
+ break;
+ case DECIMAL:
+ Decimal convertToSparkType = Decimal.apply((BigDecimal) defaultValue);
+ vector.putDecimals(columnVectorInfo.vectorOffset, columnVectorInfo.size,
+ convertToSparkType, measure.getPrecision());
+ break;
+ default:
+ vector.putDoubles(columnVectorInfo.vectorOffset, columnVectorInfo.size,
+ (double) defaultValue);
}
}
}
}
}
- /**
- * This method will parse the measure default value based on data type
- *
- * @param measure
- * @return
- */
- private Object convertDecimalValue(CarbonMeasure measure, Object defaultValue) {
- if (null != measure.getDefaultValue()) {
- switch (measure.getDataType()) {
- case DECIMAL:
- defaultValue = org.apache.spark.sql.types.Decimal.apply((BigDecimal) defaultValue);
- return defaultValue;
- default:
- return defaultValue;
- }
- }
- return defaultValue;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6b3b16c5/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 0bd0bf6..2a5c342 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -223,8 +223,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
// below is to get only those dimension in query which is present in the
// table block
- List<QueryDimension> updatedQueryDimension = RestructureUtil
- .createDimensionInfoAndGetUpdatedQueryDimension(blockExecutionInfo,
+ List<QueryDimension> currentBlockQueryDimensions = RestructureUtil
+ .createDimensionInfoAndGetCurrentBlockQueryDimension(blockExecutionInfo,
queryModel.getQueryDimension(), tableBlockDimensions,
segmentProperties.getComplexDimensions());
int tableFactPathLength = CarbonStorePath
@@ -234,13 +234,13 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
blockExecutionInfo.setBlockId(filePath.substring(tableFactPathLength));
blockExecutionInfo.setStartBlockletIndex(startBlockletIndex);
blockExecutionInfo.setNumberOfBlockletToScan(numberOfBlockletToScan);
- blockExecutionInfo.setQueryDimensions(
- updatedQueryDimension.toArray(new QueryDimension[updatedQueryDimension.size()]));
+ blockExecutionInfo.setQueryDimensions(currentBlockQueryDimensions
+ .toArray(new QueryDimension[currentBlockQueryDimensions.size()]));
// get measures present in the current block
- List<QueryMeasure> updatedQueryMeasures =
- getUpdatedQueryMeasures(blockExecutionInfo, queryModel, blockIndex);
+ List<QueryMeasure> currentBlockQueryMeasures =
+ getCurrentBlockQueryMeasures(blockExecutionInfo, queryModel, blockIndex);
blockExecutionInfo.setQueryMeasures(
- updatedQueryMeasures.toArray(new QueryMeasure[updatedQueryMeasures.size()]));
+ currentBlockQueryMeasures.toArray(new QueryMeasure[currentBlockQueryMeasures.size()]));
blockExecutionInfo.setDataBlock(blockIndex);
blockExecutionInfo.setBlockKeyGenerator(blockKeyGenerator);
// setting whether raw record query or not
@@ -252,7 +252,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
.setTotalNumberOfMeasureBlock(segmentProperties.getMeasuresOrdinalToBlockMapping().size());
blockExecutionInfo.setAbsoluteTableIdentifier(queryModel.getAbsoluteTableIdentifier());
blockExecutionInfo.setComplexDimensionInfoMap(QueryUtil
- .getComplexDimensionsMap(updatedQueryDimension,
+ .getComplexDimensionsMap(currentBlockQueryDimensions,
segmentProperties.getDimensionOrdinalToBlockMapping(),
segmentProperties.getEachComplexDimColumnValueSize(),
queryProperties.columnToDictionayMapping, queryProperties.complexFilterDimension));
@@ -291,12 +291,11 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
// list of dimensions to be projected
Set<Integer> allProjectionListDimensionIdexes = new LinkedHashSet<>();
// create a list of filter dimensions present in the current block
- Set<CarbonDimension> updatedFilterDimensions = QueryUtil
- .getUpdatedFilterDimensions(queryProperties.complexFilterDimension,
- segmentProperties.getDimensions());
- int[] dimensionsBlockIndexes = QueryUtil.getDimensionsBlockIndexes(updatedQueryDimension,
+ Set<CarbonDimension> currentBlockFilterDimensions =
+ getCurrentBlockFilterDimensions(queryProperties.complexFilterDimension, segmentProperties);
+ int[] dimensionsBlockIndexes = QueryUtil.getDimensionsBlockIndexes(currentBlockQueryDimensions,
segmentProperties.getDimensionOrdinalToBlockMapping(), expressionDimensions,
- updatedFilterDimensions, allProjectionListDimensionIdexes);
+ currentBlockFilterDimensions, allProjectionListDimensionIdexes);
int numberOfColumnToBeReadInOneIO = Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO,
CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE));
@@ -313,13 +312,13 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(new int[0][0]);
}
// get the list of updated filter measures present in the current block
- Set<CarbonMeasure> updatedFilterMeasures = QueryUtil
- .getUpdatedFilterMeasures(queryProperties.filterMeasures, segmentProperties.getMeasures());
+ Set<CarbonMeasure> currentBlockFilterMeasures =
+ getCurrentBlockFilterMeasures(queryProperties.filterMeasures, segmentProperties);
// list of measures to be projected
List<Integer> allProjectionListMeasureIndexes = new ArrayList<>();
int[] measureBlockIndexes = QueryUtil
- .getMeasureBlockIndexes(updatedQueryMeasures, expressionMeasures,
- segmentProperties.getMeasuresOrdinalToBlockMapping(), updatedFilterMeasures,
+ .getMeasureBlockIndexes(currentBlockQueryMeasures, expressionMeasures,
+ segmentProperties.getMeasuresOrdinalToBlockMapping(), currentBlockFilterMeasures,
allProjectionListMeasureIndexes);
if (measureBlockIndexes.length > 0) {
@@ -342,16 +341,14 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
blockExecutionInfo.setProjectionListMeasureIndexes(ArrayUtils.toPrimitive(
allProjectionListMeasureIndexes
.toArray(new Integer[allProjectionListMeasureIndexes.size()])));
- // setting the key structure info which will be required
- // to update the older block key with new key generator
- // blockExecutionInfo.setKeyStructureInfo(queryProperties.keyStructureInfo);
// setting the size of fixed key column (dictionary column)
- blockExecutionInfo.setFixedLengthKeySize(getKeySize(updatedQueryDimension, segmentProperties));
+ blockExecutionInfo
+ .setFixedLengthKeySize(getKeySize(currentBlockQueryDimensions, segmentProperties));
Set<Integer> dictionaryColumnBlockIndex = new HashSet<Integer>();
List<Integer> noDictionaryColumnBlockIndex = new ArrayList<Integer>();
// get the block index to be read from file for query dimension
// for both dictionary columns and no dictionary columns
- QueryUtil.fillQueryDimensionsBlockIndexes(updatedQueryDimension,
+ QueryUtil.fillQueryDimensionsBlockIndexes(currentBlockQueryDimensions,
segmentProperties.getDimensionOrdinalToBlockMapping(), dictionaryColumnBlockIndex,
noDictionaryColumnBlockIndex);
int[] queryDictionaryColumnBlockIndexes = ArrayUtils.toPrimitive(
@@ -368,7 +365,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
// setting each column value size
blockExecutionInfo.setEachColumnValueSize(segmentProperties.getEachDimColumnValueSize());
blockExecutionInfo.setComplexColumnParentBlockIndexes(
- getComplexDimensionParentBlockIndexes(updatedQueryDimension));
+ getComplexDimensionParentBlockIndexes(currentBlockQueryDimensions));
blockExecutionInfo.setVectorBatchCollector(queryModel.isVectorReader());
try {
// to set column group and its key structure info which will be used
@@ -376,7 +373,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
// for getting the column group column data in case of final row
// and in case of dimension aggregation
blockExecutionInfo.setColumnGroupToKeyStructureInfo(
- QueryUtil.getColumnGroupKeyStructureInfo(updatedQueryDimension, segmentProperties));
+ QueryUtil.getColumnGroupKeyStructureInfo(currentBlockQueryDimensions, segmentProperties));
} catch (KeyGenException e) {
throw new QueryExecutionException(e);
}
@@ -398,6 +395,9 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
*/
private int getKeySize(List<QueryDimension> queryDimension,
SegmentProperties blockMetadataInfo) {
+ // add the dimension block ordinal for each dictionary column
+ // existing in the current block dimensions. Set is used because in case of column groups
+ // ordinal of columns in a column group will be same
Set<Integer> fixedLengthDimensionOrdinal =
new HashSet<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
int counter = 0;
@@ -416,6 +416,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
}
int[] dictionaryColumnOrdinal = ArrayUtils.toPrimitive(
fixedLengthDimensionOrdinal.toArray(new Integer[fixedLengthDimensionOrdinal.size()]));
+ // calculate the size of existing query dictionary columns in this block
if (dictionaryColumnOrdinal.length > 0) {
int[] eachColumnValueSize = blockMetadataInfo.getEachDimColumnValueSize();
int keySize = 0;
@@ -435,11 +436,11 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
* @param tableBlock table block
* @return
*/
- private List<QueryMeasure> getUpdatedQueryMeasures(BlockExecutionInfo blockExecutionInfo,
+ private List<QueryMeasure> getCurrentBlockQueryMeasures(BlockExecutionInfo blockExecutionInfo,
QueryModel queryModel, AbstractIndex tableBlock) throws QueryExecutionException {
- // getting the aggregate infos which will be used during aggregation
+ // getting the measure info which will be used while filling up measure data
List<QueryMeasure> updatedQueryMeasures = RestructureUtil
- .createMeasureInfoAndGetUpdatedQueryMeasures(blockExecutionInfo,
+ .createMeasureInfoAndGetCurrentBlockQueryMeasures(blockExecutionInfo,
queryModel.getQueryMeasures(), tableBlock.getSegmentProperties().getMeasures());
// setting the measure aggregator for all aggregation function selected
// in query
@@ -460,6 +461,54 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
}
/**
+ * This method will create the updated list of filter measures present in the current block
+ *
+ * @param queryFilterMeasures
+ * @param segmentProperties
+ * @return
+ */
+ private Set<CarbonMeasure> getCurrentBlockFilterMeasures(Set<CarbonMeasure> queryFilterMeasures,
+ SegmentProperties segmentProperties) {
+ if (!queryFilterMeasures.isEmpty()) {
+ Set<CarbonMeasure> updatedFilterMeasures = new HashSet<>(queryFilterMeasures.size());
+ for (CarbonMeasure queryMeasure : queryFilterMeasures) {
+ CarbonMeasure measureFromCurrentBlock =
+ segmentProperties.getMeasureFromCurrentBlock(queryMeasure.getColumnId());
+ if (null != measureFromCurrentBlock) {
+ updatedFilterMeasures.add(measureFromCurrentBlock);
+ }
+ }
+ return updatedFilterMeasures;
+ } else {
+ return queryFilterMeasures;
+ }
+ }
+
+ /**
+ * This method will create the updated list of filter dimensions present in the current block
+ *
+ * @param queryFilterDimensions
+ * @param segmentProperties
+ * @return
+ */
+ private Set<CarbonDimension> getCurrentBlockFilterDimensions(
+ Set<CarbonDimension> queryFilterDimensions, SegmentProperties segmentProperties) {
+ if (!queryFilterDimensions.isEmpty()) {
+ Set<CarbonDimension> updatedFilterDimensions = new HashSet<>(queryFilterDimensions.size());
+ for (CarbonDimension queryDimension : queryFilterDimensions) {
+ CarbonDimension dimensionFromCurrentBlock =
+ segmentProperties.getDimensionFromCurrentBlock(queryDimension);
+ if (null != dimensionFromCurrentBlock) {
+ updatedFilterDimensions.add(dimensionFromCurrentBlock);
+ }
+ }
+ return updatedFilterDimensions;
+ } else {
+ return queryFilterDimensions;
+ }
+ }
+
+ /**
* Below method will be used to finish the execution
*
* @throws QueryExecutionException