You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/08/15 07:09:18 UTC
[33/52] [partial] incubator-carbondata git commit: Renamed packages
to org.apache.carbondata and fixed errors
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortInfo.java b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortInfo.java
new file mode 100644
index 0000000..77bcef5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortInfo.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.writer.sortindex;
+
+import java.util.List;
+
+/**
+ * Model to hold the sortIndex and sortIndexInverted data
+ */
+public class CarbonDictionarySortInfo {
+
+  /**
+   * Surrogate keys listed in the order of their sorted member values.
+   */
+  private final List<Integer> sortIndex;
+
+  /**
+   * Reverse lookup of {@link #sortIndex}: position of each surrogate in the sorted order.
+   */
+  private final List<Integer> sortIndexInverted;
+
+  /**
+   * Wraps already-computed sort index data; the lists are stored as given (no copy).
+   *
+   * @param sortIndex         surrogate keys in sorted member order
+   * @param sortIndexInverted sorted position for every surrogate key
+   */
+  public CarbonDictionarySortInfo(List<Integer> sortIndex, List<Integer> sortIndexInverted) {
+    this.sortIndexInverted = sortIndexInverted;
+    this.sortIndex = sortIndex;
+  }
+
+  /**
+   * @return the sort index list passed to the constructor
+   */
+  public List<Integer> getSortIndex() {
+    return this.sortIndex;
+  }
+
+  /**
+   * @return the inverted sort index list passed to the constructor
+   */
+  public List<Integer> getSortIndexInverted() {
+    return this.sortIndexInverted;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortInfoPreparator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortInfoPreparator.java b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortInfoPreparator.java
new file mode 100644
index 0000000..3a7f0f1
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortInfoPreparator.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.writer.sortindex;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryChunksWrapper;
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonUtilException;
+
+import org.apache.commons.lang.ArrayUtils;
+
+/**
+ * The class prepares the column sort info ie sortIndex
+ * and inverted sort index info
+ */
+public class CarbonDictionarySortInfoPreparator {
+
+ /**
+ * The method returns the column Sort Info
+ *
+ * @param newDistinctValues new distinct value to be added
+ * @param dictionary old distinct values
+ * @param dataType DataType of columns
+ * @return CarbonDictionarySortInfo returns the column Sort Info
+ * @throws CarbonUtilException
+ */
+ public CarbonDictionarySortInfo getDictionarySortInfo(List<String> newDistinctValues,
+ Dictionary dictionary, DataType dataType) throws CarbonUtilException {
+ CarbonDictionarySortModel[] dictionarySortModels =
+ prepareDictionarySortModels(newDistinctValues, dictionary, dataType);
+ return createColumnSortInfo(dictionarySortModels);
+ }
+
+ /**
+ * The method prepares the sort_index and sort_index_inverted data
+ *
+ * @param dictionarySortModels
+ */
+ private CarbonDictionarySortInfo createColumnSortInfo(
+ CarbonDictionarySortModel[] dictionarySortModels) {
+
+ //Sort index after members are sorted
+ int[] sortIndex;
+ //inverted sort index to get the member
+ int[] sortIndexInverted;
+
+ Arrays.sort(dictionarySortModels);
+ sortIndex = new int[dictionarySortModels.length];
+ sortIndexInverted = new int[dictionarySortModels.length];
+
+ for (int i = 0; i < dictionarySortModels.length; i++) {
+ CarbonDictionarySortModel dictionarySortModel = dictionarySortModels[i];
+ sortIndex[i] = dictionarySortModel.getKey();
+ // the array index starts from 0 therefore -1 is done to avoid wastage
+ // of 0th index in array and surrogate key starts from 1 there 1 is added to i
+ // which is a counter starting from 0
+ sortIndexInverted[dictionarySortModel.getKey() - 1] = i + 1;
+ }
+ dictionarySortModels = null;
+ List<Integer> sortIndexList = convertToList(sortIndex);
+ List<Integer> sortIndexInvertedList = convertToList(sortIndexInverted);
+ return new CarbonDictionarySortInfo(sortIndexList, sortIndexInvertedList);
+ }
+
+ /**
+ * The method converts the int[] to List<Integer>
+ *
+ * @param data
+ * @return
+ */
+ private List<Integer> convertToList(int[] data) {
+ Integer[] wrapperType = ArrayUtils.toObject(data);
+ return Arrays.asList(wrapperType);
+ }
+
+ /**
+ * The method returns the array of CarbonDictionarySortModel
+ *
+ * @param distinctValues new distinct values
+ * @param dictionary The wrapper wraps the list<list<bye[]>> and provide the
+ * iterator to retrieve the chunks members.
+ * @param dataType DataType of columns
+ * @return CarbonDictionarySortModel[] CarbonDictionarySortModel[] the model
+ * CarbonDictionarySortModel contains the member's surrogate and
+ * its byte value
+ */
+ private CarbonDictionarySortModel[] prepareDictionarySortModels(List<String> distinctValues,
+ Dictionary dictionary, DataType dataType) {
+ CarbonDictionarySortModel[] dictionarySortModels = null;
+ //The wrapper wraps the list<list<bye[]>> and provide the iterator to
+ // retrieve the chunks members.
+ int surrogate = 1;
+ if (null != dictionary) {
+ DictionaryChunksWrapper dictionaryChunksWrapper = dictionary.getDictionaryChunks();
+ dictionarySortModels =
+ new CarbonDictionarySortModel[dictionaryChunksWrapper.getSize() + distinctValues.size()];
+ while (dictionaryChunksWrapper.hasNext()) {
+ dictionarySortModels[surrogate - 1] =
+ createDictionarySortModel(surrogate, dataType, dictionaryChunksWrapper.next());
+ surrogate++;
+ }
+ } else {
+ dictionarySortModels = new CarbonDictionarySortModel[distinctValues.size()];
+ }
+ // for new distinct values
+ Iterator<String> distinctValue = distinctValues.iterator();
+ while (distinctValue.hasNext()) {
+ dictionarySortModels[surrogate - 1] =
+ createDictionarySortModel(surrogate, dataType, distinctValue.next().getBytes());
+ surrogate++;
+ }
+ return dictionarySortModels;
+ }
+
+ /**
+ *
+ * @param surrogate
+ * @param dataType
+ * @param value member value
+ * @return CarbonDictionarySortModel
+ */
+ private CarbonDictionarySortModel createDictionarySortModel(int surrogate, DataType dataType,
+ byte[] value) {
+ String memberValue = new String(value, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+ return new CarbonDictionarySortModel(surrogate, dataType, memberValue);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortModel.java b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortModel.java
new file mode 100644
index 0000000..0d3040a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortModel.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.writer.sortindex;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+/**
+ * Dictionary sort model class holds the member byte value and corresponding key value.
+ */
+public class CarbonDictionarySortModel implements Comparable<CarbonDictionarySortModel> {
+
+  /**
+   * Surrogate key
+   */
+  private final int key;
+
+  /**
+   * member value as a string
+   */
+  private final String memberValue;
+
+  /**
+   * member dataType; selects how member values are compared
+   */
+  private final DataType dataType;
+
+  /**
+   * Constructor to init the dictionary sort model
+   *
+   * @param key         surrogate key of the member
+   * @param dataType    data type of the column
+   * @param memberValue member value
+   */
+  public CarbonDictionarySortModel(int key, DataType dataType, String memberValue) {
+    this.key = key;
+    this.dataType = dataType;
+    this.memberValue = memberValue;
+  }
+
+  /**
+   * Compares members according to this model's data type.
+   *
+   * <p>SHORT/INT/LONG/DOUBLE, DECIMAL and TIMESTAMP members are compared by their parsed value.
+   * Members that cannot be parsed sort after all parsable ones. Among themselves,
+   * {@link CarbonCommonConstants#MEMBER_DEFAULT_VAL} sorts last and the rest are ordered
+   * lexicographically. This keeps sgn(a.compareTo(b)) == -sgn(b.compareTo(a)), which
+   * {@code Arrays.sort} relies on. All other types use plain String ordering.
+   */
+  @Override public int compareTo(CarbonDictionarySortModel o) {
+    switch (dataType) {
+      case SHORT:
+      case INT:
+      case LONG:
+      case DOUBLE:
+        return compareParsed(parseDouble(memberValue), parseDouble(o.memberValue), o.memberValue);
+      case DECIMAL:
+        return compareParsed(parseDecimal(memberValue), parseDecimal(o.memberValue),
+            o.memberValue);
+      case TIMESTAMP:
+        SimpleDateFormat parser = new SimpleDateFormat(CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+                CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
+        return compareParsed(parseDate(parser, memberValue), parseDate(parser, o.memberValue),
+            o.memberValue);
+      case STRING:
+      default:
+        return this.memberValue.compareTo(o.memberValue);
+    }
+  }
+
+  /**
+   * Orders two parsed values; {@code null} means "could not be parsed".
+   */
+  private <T extends Comparable<T>> int compareParsed(T thisValue, T otherValue,
+      String otherMember) {
+    if (thisValue != null && otherValue != null) {
+      return thisValue.compareTo(otherValue);
+    }
+    if (thisValue != null) {
+      // parsable members sort before unparsable ones
+      return -1;
+    }
+    if (otherValue != null) {
+      return 1;
+    }
+    return compareUnparsable(memberValue, otherMember);
+  }
+
+  /**
+   * Total order for two unparsable members: equal → 0, default member last, else lexicographic.
+   */
+  private static int compareUnparsable(String thisMember, String otherMember) {
+    if (thisMember.equals(otherMember)) {
+      return 0;
+    }
+    if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(otherMember)) {
+      return -1;
+    }
+    if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(thisMember)) {
+      return 1;
+    }
+    return thisMember.compareTo(otherMember);
+  }
+
+  /**
+   * @return the parsed double, or {@code null} if {@code value} is not a number
+   */
+  private static Double parseDouble(String value) {
+    try {
+      return Double.valueOf(value);
+    } catch (NumberFormatException e) {
+      return null;
+    }
+  }
+
+  /**
+   * @return the parsed decimal, or {@code null} if {@code value} is not a decimal
+   */
+  private static java.math.BigDecimal parseDecimal(String value) {
+    try {
+      return new java.math.BigDecimal(value);
+    } catch (NumberFormatException e) {
+      return null;
+    }
+  }
+
+  /**
+   * @return the parsed date, or {@code null} if {@code value} does not match the parser format
+   */
+  private static Date parseDate(SimpleDateFormat parser, String value) {
+    try {
+      return parser.parse(value);
+    } catch (ParseException e) {
+      return null;
+    }
+  }
+
+  /**
+   * Hash of the member value only (surrogate key and data type are ignored).
+   *
+   * @see Object#hashCode()
+   */
+  @Override public int hashCode() {
+    return (memberValue == null) ? 0 : memberValue.hashCode();
+  }
+
+  /**
+   * Two models are equal when their member values are equal (surrogate key and data type
+   * are ignored).
+   *
+   * @see Object#equals(Object)
+   */
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof CarbonDictionarySortModel)) {
+      return false;
+    }
+    CarbonDictionarySortModel other = (CarbonDictionarySortModel) obj;
+    return (memberValue == null) ? other.memberValue == null
+        : memberValue.equals(other.memberValue);
+  }
+
+  /**
+   * return the surrogate of the member
+   *
+   * @return surrogate key
+   */
+  public int getKey() {
+    return key;
+  }
+
+  /**
+   * Returns the member value
+   *
+   * @return member value
+   */
+  public String getMemberValue() {
+    return memberValue;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/collector/ScannedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/collector/ScannedResultCollector.java b/core/src/main/java/org/apache/carbondata/scan/collector/ScannedResultCollector.java
new file mode 100644
index 0000000..dce6ae5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/collector/ScannedResultCollector.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.collector;
+
+import java.util.List;
+
+import org.apache.carbondata.scan.result.AbstractScannedResult;
+
+/**
+ * Interface which will be used to aggregate the scan result
+ */
+public interface ScannedResultCollector {
+
+  /**
+   * Below method will be used to collect rows from the scanned result
+   *
+   * @param scannedResult scanned result to read rows from
+   * @param batchSize     maximum number of rows to collect in this call
+   * @return the collected rows, one {@code Object[]} per row
+   */
+  List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/collector/impl/AbstractScannedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/collector/impl/AbstractScannedResultCollector.java b/core/src/main/java/org/apache/carbondata/scan/collector/impl/AbstractScannedResultCollector.java
new file mode 100644
index 0000000..387b54f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/collector/impl/AbstractScannedResultCollector.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.collector.impl;
+
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.scan.collector.ScannedResultCollector;
+import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.scan.executor.infos.KeyStructureInfo;
+import org.apache.carbondata.scan.executor.util.QueryUtil;
+import org.apache.carbondata.scan.result.AbstractScannedResult;
+import org.apache.carbondata.scan.wrappers.ByteArrayWrapper;
+
+/**
+ * It is not a collector it is just a scanned result holder.
+ */
+public abstract class AbstractScannedResultCollector implements ScannedResultCollector {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(AbstractScannedResultCollector.class.getName());
+
+ /**
+ * restructuring info
+ */
+ private KeyStructureInfo restructureInfos;
+
+ /**
+ * table block execution infos
+ */
+ protected BlockExecutionInfo tableBlockExecutionInfos;
+
+ /**
+ * Measure ordinals
+ */
+ protected int[] measuresOrdinal;
+
+ /**
+ * to check whether measure exists in current table block or not this to
+ * handle restructuring scenario
+ */
+ protected boolean[] isMeasureExistsInCurrentBlock;
+
+ /**
+ * default value of the measures in case of restructuring some measure wont
+ * be present in the table so in that default value will be used to
+ * aggregate the data for that measure columns
+ */
+ private Object[] measureDefaultValue;
+
+ /**
+ * measure datatypes.
+ */
+ protected DataType[] measureDatatypes;
+
+ public AbstractScannedResultCollector(BlockExecutionInfo blockExecutionInfos) {
+ this.tableBlockExecutionInfos = blockExecutionInfos;
+ restructureInfos = blockExecutionInfos.getKeyStructureInfo();
+ measuresOrdinal = tableBlockExecutionInfos.getAggregatorInfo().getMeasureOrdinals();
+ isMeasureExistsInCurrentBlock = tableBlockExecutionInfos.getAggregatorInfo().getMeasureExists();
+ measureDefaultValue = tableBlockExecutionInfos.getAggregatorInfo().getDefaultValues();
+ this.measureDatatypes = tableBlockExecutionInfos.getAggregatorInfo().getMeasureDataTypes();
+ }
+
+ protected void fillMeasureData(Object[] msrValues, int offset,
+ AbstractScannedResult scannedResult) {
+ for (short i = 0; i < measuresOrdinal.length; i++) {
+ // if measure exists is block then pass measure column
+ // data chunk to the collector
+ if (isMeasureExistsInCurrentBlock[i]) {
+ msrValues[i + offset] = getMeasureData(scannedResult.getMeasureChunk(measuresOrdinal[i]),
+ scannedResult.getCurrenrRowId(), measureDatatypes[i]);
+ } else {
+ // if not then get the default value and use that value in aggregation
+ msrValues[i + offset] = measureDefaultValue[i];
+ }
+ }
+ }
+
+ private Object getMeasureData(MeasureColumnDataChunk dataChunk, int index, DataType dataType) {
+ if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+ Object msrVal;
+ switch (dataType) {
+ case INT:
+ case LONG:
+ msrVal = dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index);
+ break;
+ case DECIMAL:
+ msrVal = dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index);
+ break;
+ default:
+ msrVal = dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index);
+ }
+ return DataTypeUtil.getMeasureDataBasedOnDataType(msrVal, dataType);
+ }
+ return null;
+ }
+
+ /**
+ * Below method will used to get the result
+ */
+ protected void updateData(List<Object[]> listBasedResult) {
+ if (tableBlockExecutionInfos.isFixedKeyUpdateRequired()) {
+ updateKeyWithLatestBlockKeygenerator(listBasedResult);
+ }
+ }
+
+ /**
+ * Below method will be used to update the fixed length key with the
+ * latest block key generator
+ *
+ * @return updated block
+ */
+ private void updateKeyWithLatestBlockKeygenerator(List<Object[]> listBasedResult) {
+ try {
+ long[] data = null;
+ ByteArrayWrapper key = null;
+ for (int i = 0; i < listBasedResult.size(); i++) {
+ // get the key
+ key = (ByteArrayWrapper)listBasedResult.get(i)[0];
+ // unpack the key with table block key generator
+ data = tableBlockExecutionInfos.getBlockKeyGenerator()
+ .getKeyArray(key.getDictionaryKey(), tableBlockExecutionInfos.getMaskedByteForBlock());
+ // packed the key with latest block key generator
+ // and generate the masked key for that key
+ key.setDictionaryKey(QueryUtil
+ .getMaskedKey(restructureInfos.getKeyGenerator().generateKey(data),
+ restructureInfos.getMaxKey(), restructureInfos.getMaskByteRanges(),
+ restructureInfos.getMaskByteRanges().length));
+ }
+ } catch (KeyGenException e) {
+ LOGGER.error(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java
new file mode 100644
index 0000000..108677f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.collector.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.scan.filter.GenericQueryType;
+import org.apache.carbondata.scan.model.QueryDimension;
+import org.apache.carbondata.scan.model.QueryMeasure;
+import org.apache.carbondata.scan.result.AbstractScannedResult;
+
+/**
+ * It is not a collector it is just a scanned result holder.
+ */
+public class DictionaryBasedResultCollector extends AbstractScannedResultCollector {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DictionaryBasedResultCollector.class.getName());
+
+  public DictionaryBasedResultCollector(BlockExecutionInfo blockExecutionInfos) {
+    super(blockExecutionInfos);
+  }
+
+  /**
+   * Collects up to {@code batchSize} rows. Each row has one cell per query dimension and query
+   * measure, placed at that column's query order.
+   *
+   * <p>Dimension cells are decoded as follows:
+   * <ul>
+   * <li>no-dictionary columns are converted from their string key;</li>
+   * <li>direct-dictionary columns are decoded through their generator;</li>
+   * <li>complex columns are decoded through the complex type info;</li>
+   * <li>other dictionary columns are returned as the surrogate key.</li>
+   * </ul>
+   *
+   * @param scannedResult scanned result to read rows from
+   * @param batchSize     maximum number of rows to collect
+   * @return collected rows
+   */
+  @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
+    List<Object[]> listBasedResult = new ArrayList<>(batchSize);
+    boolean isMsrsPresent = measureDatatypes.length > 0;
+    QueryDimension[] queryDimensions = tableBlockExecutionInfos.getQueryDimensions();
+    QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getQueryMeasures();
+    Map<Integer, GenericQueryType> complexDimensionInfoMap =
+        tableBlockExecutionInfos.getComlexDimensionInfoMap();
+    boolean[] dictionaryEncodingArray = CarbonUtil.getDictionaryEncodingArray(queryDimensions);
+    boolean[] directDictionaryEncodingArray =
+        CarbonUtil.getDirectDictionaryEncodingArray(queryDimensions);
+    boolean[] complexDataTypeArray = CarbonUtil.getComplexDataTypeArray(queryDimensions);
+    int dimSize = queryDimensions.length;
+    boolean isDimensionsExist = dimSize > 0;
+    int[] order = getQueryOrder(queryDimensions, queryMeasures);
+    // look the generators up once instead of once per row and column
+    DirectDictionaryGenerator[] directDictionaryGenerators = getDirectDictionaryGenerators(
+        queryDimensions, dictionaryEncodingArray, directDictionaryEncodingArray);
+    // scratch array reused per row: fillMeasureData overwrites it before it is copied out
+    Object[] msrValues = isMsrsPresent ? new Object[measureDatatypes.length] : null;
+    // scan the record and add to list
+    int rowCounter = 0;
+    while (scannedResult.hasNext() && rowCounter < batchSize) {
+      Object[] row = new Object[dimSize + queryMeasures.length];
+      if (isDimensionsExist) {
+        int[] surrogateResult = scannedResult.getDictionaryKeyIntegerArray();
+        String[] noDictionaryKeys = scannedResult.getNoDictionaryKeyStringArray();
+        byte[][] complexTypeKeyArray = scannedResult.getComplexTypeKeyArray();
+        int dictionaryColumnIndex = 0;
+        int noDictionaryColumnIndex = 0;
+        int complexTypeColumnIndex = 0;
+        for (int i = 0; i < dimSize; i++) {
+          if (!dictionaryEncodingArray[i]) {
+            row[order[i]] = DataTypeUtil
+                .getDataBasedOnDataType(noDictionaryKeys[noDictionaryColumnIndex++],
+                    queryDimensions[i].getDimension().getDataType());
+          } else if (directDictionaryEncodingArray[i]) {
+            // always consume the surrogate, even without a generator, so that the following
+            // dictionary columns stay aligned with surrogateResult
+            int surrogate = surrogateResult[dictionaryColumnIndex++];
+            if (directDictionaryGenerators[i] != null) {
+              row[order[i]] = directDictionaryGenerators[i].getValueFromSurrogate(surrogate);
+            }
+          } else if (complexDataTypeArray[i]) {
+            row[order[i]] = complexDimensionInfoMap
+                .get(queryDimensions[i].getDimension().getOrdinal())
+                .getDataBasedOnDataTypeFromSurrogates(
+                    ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
+          } else {
+            row[order[i]] = surrogateResult[dictionaryColumnIndex++];
+          }
+        }
+      } else {
+        // no dimension key was read for this row, so advance the result explicitly
+        scannedResult.incrementCounter();
+      }
+      if (isMsrsPresent) {
+        fillMeasureData(msrValues, 0, scannedResult);
+        for (int i = 0; i < msrValues.length; i++) {
+          row[order[i + dimSize]] = msrValues[i];
+        }
+      }
+      listBasedResult.add(row);
+      rowCounter++;
+    }
+    return listBasedResult;
+  }
+
+  /**
+   * Builds the output position of every column: dimensions first, then measures.
+   */
+  private static int[] getQueryOrder(QueryDimension[] queryDimensions,
+      QueryMeasure[] queryMeasures) {
+    int dimSize = queryDimensions.length;
+    int[] order = new int[dimSize + queryMeasures.length];
+    for (int i = 0; i < dimSize; i++) {
+      order[i] = queryDimensions[i].getQueryOrder();
+    }
+    for (int i = 0; i < queryMeasures.length; i++) {
+      order[i + dimSize] = queryMeasures[i].getQueryOrder();
+    }
+    return order;
+  }
+
+  /**
+   * Resolves the direct dictionary generator of every dictionary-encoded, direct-dictionary
+   * dimension; other slots stay {@code null}, as may slots whose factory lookup returns null.
+   */
+  private static DirectDictionaryGenerator[] getDirectDictionaryGenerators(
+      QueryDimension[] queryDimensions, boolean[] dictionaryEncodingArray,
+      boolean[] directDictionaryEncodingArray) {
+    DirectDictionaryGenerator[] generators = new DirectDictionaryGenerator[queryDimensions.length];
+    for (int i = 0; i < queryDimensions.length; i++) {
+      // same precedence as collectData: no-dictionary columns never use a generator
+      if (dictionaryEncodingArray[i] && directDictionaryEncodingArray[i]) {
+        generators[i] = DirectDictionaryKeyGeneratorFactory
+            .getDirectDictionaryGenerator(queryDimensions[i].getDimension().getDataType());
+      }
+    }
+    return generators;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/collector/impl/RawBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/collector/impl/RawBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/scan/collector/impl/RawBasedResultCollector.java
new file mode 100644
index 0000000..74d4170
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/collector/impl/RawBasedResultCollector.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.collector.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.scan.model.QueryMeasure;
+import org.apache.carbondata.scan.result.AbstractScannedResult;
+import org.apache.carbondata.scan.wrappers.ByteArrayWrapper;
+
+/**
+ * It is not a collector it is just a scanned result holder.
+ */
+public class RawBasedResultCollector extends AbstractScannedResultCollector {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(RawBasedResultCollector.class.getName());
+
+  public RawBasedResultCollector(BlockExecutionInfo blockExecutionInfos) {
+    super(blockExecutionInfos);
+  }
+
+  /**
+   * Collects up to {@code batchSize} raw rows. Column 0 of each row is a
+   * {@link ByteArrayWrapper} holding the dictionary, no-dictionary and complex type keys;
+   * the query measures follow from column 1. Keys are re-packed via {@code updateData}
+   * before the rows are returned.
+   *
+   * @param scannedResult scanned result to read rows from
+   * @param batchSize     maximum number of rows to collect
+   * @return collected rows
+   */
+  @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
+    List<Object[]> rows = new ArrayList<>(batchSize);
+    QueryMeasure[] measures = tableBlockExecutionInfos.getQueryMeasures();
+    int rowWidth = 1 + measures.length;
+    for (int collected = 0; scannedResult.hasNext() && collected < batchSize; collected++) {
+      Object[] row = new Object[rowWidth];
+      ByteArrayWrapper keyWrapper = new ByteArrayWrapper();
+      keyWrapper.setDictionaryKey(scannedResult.getDictionaryKeyArray());
+      keyWrapper.setNoDictionaryKeys(scannedResult.getNoDictionaryKeyArray());
+      keyWrapper.setComplexTypesKeys(scannedResult.getComplexTypeKeyArray());
+      row[0] = keyWrapper;
+      // measures are placed right after the key wrapper
+      fillMeasureData(row, 1, scannedResult);
+      rows.add(row);
+    }
+    updateData(rows);
+    return rows;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/complextypes/ArrayQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/complextypes/ArrayQueryType.java b/core/src/main/java/org/apache/carbondata/scan/complextypes/ArrayQueryType.java
new file mode 100644
index 0000000..8dd6749
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/complextypes/ArrayQueryType.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.scan.complextypes;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.scan.filter.GenericQueryType;
+import org.apache.carbondata.scan.processor.BlocksChunkHolder;
+
+import org.apache.spark.sql.catalyst.util.*;
+import org.apache.spark.sql.types.*;
+
+public class ArrayQueryType extends ComplexQueryType implements GenericQueryType {
+
+ private GenericQueryType children;
+ private int keyOrdinalForQuery;
+
+ public ArrayQueryType(String name, String parentname, int blockIndex) {
+ super(name, parentname, blockIndex);
+ }
+
+ @Override public void addChildren(GenericQueryType children) {
+ if (this.getName().equals(children.getParentname())) {
+ this.children = children;
+ } else {
+ this.children.addChildren(children);
+ }
+ }
+
+ @Override public String getName() {
+ return name;
+ }
+
+ @Override public void setName(String name) {
+ this.name = name;
+ }
+
+ @Override public String getParentname() {
+ return parentname;
+ }
+
+ @Override public void setParentname(String parentname) {
+ this.parentname = parentname;
+
+ }
+
+ @Override public void getAllPrimitiveChildren(List<GenericQueryType> primitiveChild) {
+ if (children instanceof PrimitiveQueryType) {
+ primitiveChild.add(children);
+ } else {
+ children.getAllPrimitiveChildren(primitiveChild);
+ }
+ }
+
+ public void parseBlocksAndReturnComplexColumnByteArray(
+ DimensionColumnDataChunk[] dimensionColumnDataChunks, int rowNumber,
+ DataOutputStream dataOutputStream) throws IOException {
+ byte[] input = new byte[8];
+ copyBlockDataChunk(dimensionColumnDataChunks, rowNumber, input);
+ ByteBuffer byteArray = ByteBuffer.wrap(input);
+ int dataLength = byteArray.getInt();
+ dataOutputStream.writeInt(dataLength);
+ if (dataLength == 0) {
+ // b.putInt(0);
+ } else {
+ int columnIndex = byteArray.getInt();
+ for (int i = 0; i < dataLength; i++) {
+ children
+ .parseBlocksAndReturnComplexColumnByteArray(dimensionColumnDataChunks, columnIndex++,
+ dataOutputStream);
+ }
+ }
+ }
+
+ @Override public int getSurrogateIndex() {
+ return 0;
+ }
+
+ @Override public void setSurrogateIndex(int surrIndex) {
+
+ }
+
+ @Override public int getBlockIndex() {
+ return blockIndex;
+ }
+
+ @Override public void setBlockIndex(int blockIndex) {
+ this.blockIndex = blockIndex;
+ }
+
+ @Override public int getColsCount() {
+ return children.getColsCount() + 1;
+ }
+
+ @Override public void parseAndGetResultBytes(ByteBuffer complexData, DataOutputStream dataOutput)
+ throws IOException {
+ int dataLength = complexData.getInt();
+ dataOutput.writeInt(dataLength);
+ for (int i = 0; i < dataLength; i++) {
+ children.parseAndGetResultBytes(complexData, dataOutput);
+ }
+ }
+
+ @Override public void setKeySize(int[] keyBlockSize) {
+ children.setKeySize(keyBlockSize);
+ }
+
+ @Override public DataType getSchemaType() {
+ return new ArrayType(null, true);
+ }
+
+ @Override public int getKeyOrdinalForQuery() {
+ return keyOrdinalForQuery;
+ }
+
+ @Override public void setKeyOrdinalForQuery(int keyOrdinalForQuery) {
+ this.keyOrdinalForQuery = keyOrdinalForQuery;
+ }
+
+ @Override public void fillRequiredBlockData(BlocksChunkHolder blockChunkHolder) {
+ readBlockDataChunk(blockChunkHolder);
+ children.fillRequiredBlockData(blockChunkHolder);
+ }
+
+ @Override public Object getDataBasedOnDataTypeFromSurrogates(ByteBuffer surrogateData) {
+ int dataLength = surrogateData.getInt();
+ if (dataLength == -1) {
+ return null;
+ }
+ Object[] data = new Object[dataLength];
+ for (int i = 0; i < dataLength; i++) {
+ data[i] = children.getDataBasedOnDataTypeFromSurrogates(surrogateData);
+ }
+ return new GenericArrayData(data);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/complextypes/ComplexQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/complextypes/ComplexQueryType.java b/core/src/main/java/org/apache/carbondata/scan/complextypes/ComplexQueryType.java
new file mode 100644
index 0000000..0a4c999
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/complextypes/ComplexQueryType.java
@@ -0,0 +1,80 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.complextypes;
+
+import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.scan.filter.GenericQueryType;
+import org.apache.carbondata.scan.processor.BlocksChunkHolder;
+
+public class ComplexQueryType {
+ protected GenericQueryType children;
+
+ protected String name;
+
+ protected String parentname;
+
+ protected int blockIndex;
+
+ public ComplexQueryType(String name, String parentname, int blockIndex) {
+ this.name = name;
+ this.parentname = parentname;
+ this.blockIndex = blockIndex;
+ }
+
+ public void fillRequiredBlockData(BlocksChunkHolder blockChunkHolder) {
+ if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
+ blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+ .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+ }
+ children.fillRequiredBlockData(blockChunkHolder);
+ }
+
+ /**
+ * Method will copy the block chunk holder data to the passed
+ * byte[], this method is also used by child
+ *
+ * @param rowNumber
+ * @param input
+ */
+ protected void copyBlockDataChunk(DimensionColumnDataChunk[] dimensionColumnDataChunks,
+ int rowNumber, byte[] input) {
+ byte[] data = (byte[]) dimensionColumnDataChunks[blockIndex].getCompleteDataChunk();
+ if (null != dimensionColumnDataChunks[blockIndex].getAttributes().getInvertedIndexes()) {
+ System.arraycopy(data, dimensionColumnDataChunks[blockIndex].getAttributes()
+ .getInvertedIndexesReverse()[rowNumber] * dimensionColumnDataChunks[blockIndex]
+ .getAttributes().getColumnValueSize(), input, 0,
+ dimensionColumnDataChunks[blockIndex].getAttributes().getColumnValueSize());
+ } else {
+ System.arraycopy(data,
+ rowNumber * dimensionColumnDataChunks[blockIndex].getAttributes().getColumnValueSize(),
+ input, 0, dimensionColumnDataChunks[blockIndex].getAttributes().getColumnValueSize());
+ }
+ }
+
+ /*
+ * This method will read the block data chunk from the respective block
+ */
+ protected void readBlockDataChunk(BlocksChunkHolder blockChunkHolder) {
+ if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
+ blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+ .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/complextypes/PrimitiveQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/scan/complextypes/PrimitiveQueryType.java
new file mode 100644
index 0000000..11f4651
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/complextypes/PrimitiveQueryType.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.scan.complextypes;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.keygenerator.mdkey.Bits;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.scan.filter.GenericQueryType;
+import org.apache.carbondata.scan.processor.BlocksChunkHolder;
+
+import org.apache.spark.sql.types.*;
+
+public class PrimitiveQueryType extends ComplexQueryType implements GenericQueryType {
+
+ private int index;
+
+ private String name;
+ private String parentname;
+
+ private int keySize;
+
+ private int blockIndex;
+
+ private Dictionary dictionary;
+
+ private org.apache.carbondata.core.carbon.metadata.datatype.DataType dataType;
+
+ private boolean isDirectDictionary;
+
+ public PrimitiveQueryType(String name, String parentname, int blockIndex,
+ org.apache.carbondata.core.carbon.metadata.datatype.DataType dataType, int keySize,
+ Dictionary dictionary, boolean isDirectDictionary) {
+ super(name, parentname, blockIndex);
+ this.dataType = dataType;
+ this.keySize = keySize;
+ this.dictionary = dictionary;
+ this.name = name;
+ this.parentname = parentname;
+ this.blockIndex = blockIndex;
+ this.isDirectDictionary = isDirectDictionary;
+ }
+
+ @Override public void addChildren(GenericQueryType children) {
+
+ }
+
+ @Override public String getName() {
+ return name;
+ }
+
+ @Override public void setName(String name) {
+ this.name = name;
+ }
+
+ @Override public String getParentname() {
+ return parentname;
+ }
+
+ @Override public void setParentname(String parentname) {
+ this.parentname = parentname;
+
+ }
+
+ @Override public void getAllPrimitiveChildren(List<GenericQueryType> primitiveChild) {
+
+ }
+
+ @Override public int getSurrogateIndex() {
+ return index;
+ }
+
+ @Override public void setSurrogateIndex(int surrIndex) {
+ index = surrIndex;
+ }
+
+ @Override public int getBlockIndex() {
+ return blockIndex;
+ }
+
+ @Override public void setBlockIndex(int blockIndex) {
+ this.blockIndex = blockIndex;
+ }
+
+ @Override public int getColsCount() {
+ return 1;
+ }
+
+ @Override public void parseBlocksAndReturnComplexColumnByteArray(
+ DimensionColumnDataChunk[] dimensionDataChunks, int rowNumber,
+ DataOutputStream dataOutputStream) throws IOException {
+ byte[] currentVal =
+ new byte[dimensionDataChunks[blockIndex].getAttributes().getColumnValueSize()];
+ copyBlockDataChunk(dimensionDataChunks, rowNumber, currentVal);
+ dataOutputStream.write(currentVal);
+ }
+
+ @Override public void setKeySize(int[] keyBlockSize) {
+ this.keySize = keyBlockSize[this.blockIndex];
+ }
+
+ @Override public void parseAndGetResultBytes(ByteBuffer complexData, DataOutputStream dataOutput)
+ throws IOException {
+ }
+
+ @Override public DataType getSchemaType() {
+ switch (dataType) {
+ case INT:
+ return IntegerType$.MODULE$;
+ case DOUBLE:
+ return DoubleType$.MODULE$;
+ case LONG:
+ return LongType$.MODULE$;
+ case BOOLEAN:
+ return BooleanType$.MODULE$;
+ case TIMESTAMP:
+ return TimestampType$.MODULE$;
+ default:
+ return IntegerType$.MODULE$;
+ }
+ }
+
+ @Override public int getKeyOrdinalForQuery() {
+ return 0;
+ }
+
+ @Override public void setKeyOrdinalForQuery(int keyOrdinalForQuery) {
+ }
+
+ @Override public void fillRequiredBlockData(BlocksChunkHolder blockChunkHolder) {
+ readBlockDataChunk(blockChunkHolder);
+ }
+
+ @Override public Object getDataBasedOnDataTypeFromSurrogates(ByteBuffer surrogateData) {
+
+ byte[] data = new byte[keySize];
+ surrogateData.get(data);
+ Bits bit = new Bits(new int[]{keySize * 8});
+ int surrgateValue = (int)bit.getKeyArray(data, 0)[0];
+ Object actualData = null;
+ if (isDirectDictionary) {
+ DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+ .getDirectDictionaryGenerator(dataType);
+ actualData = directDictionaryGenerator.getValueFromSurrogate(surrgateValue);
+ } else {
+ String dictionaryValueForKey = dictionary.getDictionaryValueForKey(surrgateValue);
+ actualData = DataTypeUtil.getDataBasedOnDataType(dictionaryValueForKey, this.dataType);
+ }
+ return actualData;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/complextypes/StructQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/complextypes/StructQueryType.java b/core/src/main/java/org/apache/carbondata/scan/complextypes/StructQueryType.java
new file mode 100644
index 0000000..a8b188b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/complextypes/StructQueryType.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.scan.complextypes;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.scan.filter.GenericQueryType;
+import org.apache.carbondata.scan.processor.BlocksChunkHolder;
+
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRowWithSchema;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+public class StructQueryType extends ComplexQueryType implements GenericQueryType {
+
+ private List<GenericQueryType> children = new ArrayList<GenericQueryType>();
+ private String name;
+ private String parentname;
+ private int blockIndex;
+ private int keyOrdinalForQuery;
+
+ public StructQueryType(String name, String parentname, int blockIndex) {
+ super(name, parentname, blockIndex);
+ this.name = name;
+ this.parentname = parentname;
+ this.blockIndex = blockIndex;
+ }
+
+ @Override public void addChildren(GenericQueryType newChild) {
+ if (this.getName().equals(newChild.getParentname())) {
+ this.children.add(newChild);
+ } else {
+ for (GenericQueryType child : this.children) {
+ child.addChildren(newChild);
+ }
+ }
+
+ }
+
+ @Override public String getName() {
+ return name;
+ }
+
+ @Override public void setName(String name) {
+ this.name = name;
+ }
+
+ @Override public String getParentname() {
+ return parentname;
+ }
+
+ @Override public void setParentname(String parentname) {
+ this.parentname = parentname;
+
+ }
+
+ @Override public void getAllPrimitiveChildren(List<GenericQueryType> primitiveChild) {
+ for (int i = 0; i < children.size(); i++) {
+ GenericQueryType child = children.get(i);
+ if (child instanceof PrimitiveQueryType) {
+ primitiveChild.add(child);
+ } else {
+ child.getAllPrimitiveChildren(primitiveChild);
+ }
+ }
+ }
+
+ @Override public int getSurrogateIndex() {
+ return 0;
+ }
+
+ @Override public void setSurrogateIndex(int surrIndex) {
+
+ }
+
+ @Override public int getBlockIndex() {
+ return blockIndex;
+ }
+
+ @Override public void setBlockIndex(int blockIndex) {
+ this.blockIndex = blockIndex;
+ }
+
+ @Override public int getColsCount() {
+ int colsCount = 1;
+ for (int i = 0; i < children.size(); i++) {
+ colsCount += children.get(i).getColsCount();
+ }
+ return colsCount;
+ }
+
+ @Override public void parseBlocksAndReturnComplexColumnByteArray(
+ DimensionColumnDataChunk[] dimensionColumnDataChunks, int rowNumber,
+ DataOutputStream dataOutputStream) throws IOException {
+ byte[] input = new byte[8];
+ copyBlockDataChunk(dimensionColumnDataChunks, rowNumber, input);
+ ByteBuffer byteArray = ByteBuffer.wrap(input);
+ int childElement = byteArray.getInt();
+ dataOutputStream.writeInt(childElement);
+ if (childElement == 0) {
+ // b.putInt(0);
+ } else {
+ for (int i = 0; i < childElement; i++) {
+ children.get(i)
+ .parseBlocksAndReturnComplexColumnByteArray(dimensionColumnDataChunks, rowNumber,
+ dataOutputStream);
+ }
+ }
+ }
+
+ @Override public void parseAndGetResultBytes(ByteBuffer complexData, DataOutputStream dataOutput)
+ throws IOException {
+ int childElement = complexData.getInt();
+ dataOutput.writeInt(childElement);
+ for (int i = 0; i < childElement; i++) {
+ children.get(i).parseAndGetResultBytes(complexData, dataOutput);
+ }
+ }
+
+ @Override public void setKeySize(int[] keyBlockSize) {
+ for (int i = 0; i < children.size(); i++) {
+ children.get(i).setKeySize(keyBlockSize);
+ }
+ }
+
+ @Override public DataType getSchemaType() {
+ StructField[] fields = new StructField[children.size()];
+ for (int i = 0; i < children.size(); i++) {
+ fields[i] = new StructField(children.get(i).getName(), null, true,
+ Metadata.empty());
+ }
+ return new StructType(fields);
+ }
+
+ @Override public int getKeyOrdinalForQuery() {
+ return keyOrdinalForQuery;
+ }
+
+ @Override public void setKeyOrdinalForQuery(int keyOrdinalForQuery) {
+ this.keyOrdinalForQuery = keyOrdinalForQuery;
+ }
+
+ @Override public void fillRequiredBlockData(BlocksChunkHolder blockChunkHolder) {
+ readBlockDataChunk(blockChunkHolder);
+
+ for (int i = 0; i < children.size(); i++) {
+ children.get(i).fillRequiredBlockData(blockChunkHolder);
+ }
+ }
+
+ @Override public Object getDataBasedOnDataTypeFromSurrogates(ByteBuffer surrogateData) {
+ int childLength = surrogateData.getInt();
+ Object[] fields = new Object[childLength];
+ for (int i = 0; i < childLength; i++) {
+ fields[i] = children.get(i).getDataBasedOnDataTypeFromSurrogates(surrogateData);
+ }
+
+ return new GenericInternalRowWithSchema(fields, (StructType) getSchemaType());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutor.java
new file mode 100644
index 0000000..53bf8ca
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.executor;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.scan.model.QueryModel;
+
+/**
+ * Interface for carbon query executor.
+ * Will be used to execute the query based on the query model
+ * and will return the iterator over query result
+ */
+public interface QueryExecutor<E> {
+
+ /**
+ * Below method will be used to execute the query based on query model passed from driver
+ *
+ * @param queryModel query details
+ * @return query result iterator
+ * @throws QueryExecutionException if any failure while executing the query
+ */
+ CarbonIterator<E> execute(QueryModel queryModel) throws QueryExecutionException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java b/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java
new file mode 100644
index 0000000..ab75231
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.executor;
+
+import org.apache.carbondata.scan.executor.impl.DetailQueryExecutor;
+import org.apache.carbondata.scan.model.QueryModel;
+
+/**
+ * Factory class to get the query executor from RDD
+ * This will return the executor based on query type
+ */
+public class QueryExecutorFactory {
+
+ public static QueryExecutor getQueryExecutor(QueryModel queryModel) {
+ return new DetailQueryExecutor();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/executor/exception/QueryExecutionException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/exception/QueryExecutionException.java b/core/src/main/java/org/apache/carbondata/scan/executor/exception/QueryExecutionException.java
new file mode 100644
index 0000000..7003184
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/exception/QueryExecutionException.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.executor.exception;
+
+import java.util.Locale;
+
/**
 * Checked exception raised when a query cannot be executed.
 */
public class QueryExecutionException extends Exception {

  /**
   * default serial version ID.
   */
  private static final long serialVersionUID = 1L;

  /**
   * Message returned by {@link #getMessage()}.
   */
  private final String msg;

  /**
   * Creates an exception with the given message.
   *
   * @param msg the error message for this exception
   */
  public QueryExecutionException(String msg) {
    super(msg);
    this.msg = msg;
  }

  /**
   * Creates an exception with the given message and cause.
   *
   * @param msg the error message for this exception
   * @param t   the underlying cause
   */
  public QueryExecutionException(String msg, Throwable t) {
    super(msg, t);
    this.msg = msg;
  }

  /**
   * Wraps a cause. The message becomes the cause's {@code toString()}, or an empty string
   * when {@code t} is null, so the cause's description is not lost.
   *
   * @param t the underlying cause; may be null
   */
  public QueryExecutionException(Throwable t) {
    super(t);
    this.msg = (null == t) ? "" : t.toString();
  }

  /**
   * Returns the message for the given locale. No translations exist, so this is the same as
   * {@link #getLocalizedMessage()} regardless of {@code locale}.
   *
   * @param locale requested locale (currently ignored)
   * @return the error message
   */
  public String getLocalizedMessage(Locale locale) {
    return getLocalizedMessage();
  }

  /**
   * getLocalizedMessage
   */
  @Override public String getLocalizedMessage() {
    return super.getLocalizedMessage();
  }

  /**
   * Returns the message captured at construction time.
   */
  @Override public String getMessage() {
    return this.msg;
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
new file mode 100644
index 0000000..e204572
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.executor.impl;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.common.logging.impl.StandardLogService;
+import org.apache.carbondata.core.carbon.datastore.BlockIndexStore;
+import org.apache.carbondata.core.carbon.datastore.IndexKey;
+import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
+import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.carbon.querystatistics.QueryStatistic;
+import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsRecorder;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.scan.executor.QueryExecutor;
+import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.scan.executor.infos.AggregatorInfo;
+import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.scan.executor.infos.KeyStructureInfo;
+import org.apache.carbondata.scan.executor.util.QueryUtil;
+import org.apache.carbondata.scan.executor.util.RestructureUtil;
+import org.apache.carbondata.scan.filter.FilterUtil;
+import org.apache.carbondata.scan.model.QueryDimension;
+import org.apache.carbondata.scan.model.QueryMeasure;
+import org.apache.carbondata.scan.model.QueryModel;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+/**
+ * This class provides a skeletal implementation of the {@link QueryExecutor}
+ * interface to minimize the effort required to implement this interface. This
+ * will be used to prepare all the properties required for query execution
+ */
+public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(AbstractQueryExecutor.class.getName());
+ /**
+ * holder for query properties which will be used to execute the query
+ */
+ protected QueryExecutorProperties queryProperties;
+
+ public AbstractQueryExecutor() {
+ queryProperties = new QueryExecutorProperties();
+ }
+
+ /**
+ * Below method will be used to fill the executor properties based on query
+ * model it will parse the query model and get the detail and fill it in
+ * query properties
+ *
+ * @param queryModel
+ */
+ protected void initQuery(QueryModel queryModel) throws QueryExecutionException {
+ StandardLogService.setThreadName(StandardLogService.getPartitionID(
+ queryModel.getAbsoluteTableIdentifier().getCarbonTableIdentifier().getTableName()),
+ queryModel.getQueryId());
+ LOGGER.info("Query will be executed on table: " + queryModel.getAbsoluteTableIdentifier()
+ .getCarbonTableIdentifier().getTableName());
+ // Initializing statistics list to record the query statistics
+ // creating copy on write to handle concurrent scenario
+ queryProperties.queryStatisticsRecorder = new QueryStatisticsRecorder(queryModel.getQueryId());
+ queryModel.setStatisticsRecorder(queryProperties.queryStatisticsRecorder);
+ QueryUtil.resolveQueryModel(queryModel);
+ QueryStatistic queryStatistic = new QueryStatistic();
+ // get the table blocks
+ try {
+ queryProperties.dataBlocks = BlockIndexStore.getInstance()
+ .loadAndGetBlocks(queryModel.getTableBlockInfos(),
+ queryModel.getAbsoluteTableIdentifier());
+ } catch (IndexBuilderException e) {
+ throw new QueryExecutionException(e);
+ }
+ queryStatistic
+ .addStatistics("Time taken to load the Block(s) In Executor", System.currentTimeMillis());
+ queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
+ //
+ // // updating the restructuring infos for the query
+ queryProperties.keyStructureInfo = getKeyStructureInfo(queryModel,
+ queryProperties.dataBlocks.get(queryProperties.dataBlocks.size() - 1).getSegmentProperties()
+ .getDimensionKeyGenerator());
+
+ // calculating the total number of aggeragted columns
+ int aggTypeCount = queryModel.getQueryMeasures().size();
+
+ int currentIndex = 0;
+ String[] aggTypes = new String[aggTypeCount];
+ DataType[] dataTypes = new DataType[aggTypeCount];
+
+ for (QueryMeasure carbonMeasure : queryModel.getQueryMeasures()) {
+ // adding the data type and aggregation type of all the measure this
+ // can be used
+ // to select the aggregator
+ aggTypes[currentIndex] = carbonMeasure.getAggregateFunction();
+ dataTypes[currentIndex] = carbonMeasure.getMeasure().getDataType();
+ currentIndex++;
+ }
+ queryProperties.measureDataTypes = dataTypes;
+ // as aggregation will be executed in following order
+ // 1.aggregate dimension expression
+ // 2. expression
+ // 3. query measure
+ // so calculating the index of the expression start index
+ // and measure column start index
+ queryProperties.aggExpressionStartIndex = queryModel.getQueryMeasures().size();
+ queryProperties.measureStartIndex = aggTypes.length - queryModel.getQueryMeasures().size();
+
+ queryProperties.complexFilterDimension =
+ QueryUtil.getAllFilterDimensions(queryModel.getFilterExpressionResolverTree());
+ queryStatistic = new QueryStatistic();
+ // dictionary column unique column id to dictionary mapping
+ // which will be used to get column actual data
+ queryProperties.columnToDictionayMapping = QueryUtil
+ .getDimensionDictionaryDetail(queryModel.getQueryDimension(),
+ queryProperties.complexFilterDimension, queryModel.getAbsoluteTableIdentifier());
+ queryStatistic
+ .addStatistics("Time taken to load the Dictionary In Executor", System.currentTimeMillis());
+ queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
+ queryModel.setColumnToDictionaryMapping(queryProperties.columnToDictionayMapping);
+ // setting the sort dimension index. as it will be updated while getting the sort info
+ // so currently setting it to default 0 means sort is not present in any dimension
+ queryProperties.sortDimIndexes = new byte[queryModel.getQueryDimension().size()];
+ }
+
+ /**
+ * Below method will be used to get the key structure info for the uqery
+ *
+ * @param queryModel query model
+ * @param keyGenerator
+ * @return key structure info
+ */
+ private KeyStructureInfo getKeyStructureInfo(QueryModel queryModel, KeyGenerator keyGenerator) {
+ // getting the masked byte range for dictionary column
+ int[] maskByteRanges =
+ QueryUtil.getMaskedByteRange(queryModel.getQueryDimension(), keyGenerator);
+
+ // getting the masked bytes for query dimension dictionary column
+ int[] maskedBytes = QueryUtil.getMaskedByte(keyGenerator.getKeySizeInBytes(), maskByteRanges);
+
+ // max key for the dictionary dimension present in the query
+ byte[] maxKey = null;
+ try {
+ // getting the max key which will be used to masked and get the
+ // masked key
+ maxKey = QueryUtil.getMaxKeyBasedOnDimensions(queryModel.getQueryDimension(), keyGenerator);
+ } catch (KeyGenException e) {
+ LOGGER.error(e, "problem while getting the max key");
+ }
+
+ KeyStructureInfo restructureInfos = new KeyStructureInfo();
+ restructureInfos.setKeyGenerator(keyGenerator);
+ restructureInfos.setMaskByteRanges(maskByteRanges);
+ restructureInfos.setMaskedBytes(maskedBytes);
+ restructureInfos.setMaxKey(maxKey);
+ return restructureInfos;
+ }
+
+ protected List<BlockExecutionInfo> getBlockExecutionInfos(QueryModel queryModel)
+ throws QueryExecutionException {
+ initQuery(queryModel);
+ List<BlockExecutionInfo> blockExecutionInfoList = new ArrayList<BlockExecutionInfo>();
+ // fill all the block execution infos for all the blocks selected in
+ // query
+ // and query will be executed based on that infos
+ for (int i = 0; i < queryProperties.dataBlocks.size(); i++) {
+ blockExecutionInfoList
+ .add(getBlockExecutionInfoForBlock(queryModel, queryProperties.dataBlocks.get(i)));
+ }
+ queryProperties.complexDimensionInfoMap =
+ blockExecutionInfoList.get(blockExecutionInfoList.size() - 1).getComlexDimensionInfoMap();
+ return blockExecutionInfoList;
+ }
+
+ /**
+ * Below method will be used to get the block execution info which is
+ * required to execute any block based on query model
+ *
+ * @param queryModel query model from user query
+ * @param blockIndex block index
+ * @return block execution info
+ * @throws QueryExecutionException any failure during block info creation
+ */
+ protected BlockExecutionInfo getBlockExecutionInfoForBlock(QueryModel queryModel,
+ AbstractIndex blockIndex) throws QueryExecutionException {
+ BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo();
+ SegmentProperties segmentProperties = blockIndex.getSegmentProperties();
+ List<CarbonDimension> tableBlockDimensions = segmentProperties.getDimensions();
+ KeyGenerator blockKeyGenerator = segmentProperties.getDimensionKeyGenerator();
+
+ // below is to get only those dimension in query which is present in the
+ // table block
+ List<QueryDimension> updatedQueryDimension = RestructureUtil
+ .getUpdatedQueryDimension(queryModel.getQueryDimension(), tableBlockDimensions,
+ segmentProperties.getComplexDimensions());
+ // TODO add complex dimension children
+ int[] maskByteRangesForBlock =
+ QueryUtil.getMaskedByteRange(updatedQueryDimension, blockKeyGenerator);
+ int[] maksedByte =
+ QueryUtil.getMaskedByte(blockKeyGenerator.getKeySizeInBytes(), maskByteRangesForBlock);
+ blockExecutionInfo.setQueryDimensions(
+ updatedQueryDimension.toArray(new QueryDimension[updatedQueryDimension.size()]));
+ blockExecutionInfo.setQueryMeasures(queryModel.getQueryMeasures()
+ .toArray(new QueryMeasure[queryModel.getQueryMeasures().size()]));
+ blockExecutionInfo.setDataBlock(blockIndex);
+ blockExecutionInfo.setBlockKeyGenerator(blockKeyGenerator);
+ // adding aggregation info for query
+ blockExecutionInfo.setAggregatorInfo(getAggregatorInfoForBlock(queryModel, blockIndex));
+ // adding query statistics list to record the statistics
+ blockExecutionInfo.setStatisticsRecorder(queryProperties.queryStatisticsRecorder);
+ // setting the limit
+ blockExecutionInfo.setLimit(queryModel.getLimit());
+ // setting whether detail query or not
+ blockExecutionInfo.setDetailQuery(queryModel.isDetailQuery());
+ // setting whether raw record query or not
+ blockExecutionInfo.setRawRecordDetailQuery(queryModel.isForcedDetailRawQuery());
+ // setting the masked byte of the block which will be
+ // used to update the unpack the older block keys
+ blockExecutionInfo.setMaskedByteForBlock(maksedByte);
+ // total number dimension
+ blockExecutionInfo
+ .setTotalNumberDimensionBlock(segmentProperties.getDimensionOrdinalToBlockMapping().size());
+ blockExecutionInfo
+ .setTotalNumberOfMeasureBlock(segmentProperties.getMeasuresOrdinalToBlockMapping().size());
+ blockExecutionInfo.setComplexDimensionInfoMap(QueryUtil
+ .getComplexDimensionsMap(updatedQueryDimension,
+ segmentProperties.getDimensionOrdinalToBlockMapping(),
+ segmentProperties.getEachComplexDimColumnValueSize(),
+ queryProperties.columnToDictionayMapping, queryProperties.complexFilterDimension));
+ // to check whether older block key update is required or not
+ blockExecutionInfo.setFixedKeyUpdateRequired(
+ !blockKeyGenerator.equals(queryProperties.keyStructureInfo.getKeyGenerator()));
+ IndexKey startIndexKey = null;
+ IndexKey endIndexKey = null;
+ if (null != queryModel.getFilterExpressionResolverTree()) {
+ // loading the filter executer tree for filter evaluation
+ blockExecutionInfo.setFilterExecuterTree(FilterUtil
+ .getFilterExecuterTree(queryModel.getFilterExpressionResolverTree(), segmentProperties,
+ blockExecutionInfo.getComlexDimensionInfoMap()));
+ List<IndexKey> listOfStartEndKeys = new ArrayList<IndexKey>(2);
+ FilterUtil.traverseResolverTreeAndGetStartAndEndKey(segmentProperties,
+ queryModel.getAbsoluteTableIdentifier(), queryModel.getFilterExpressionResolverTree(),
+ listOfStartEndKeys);
+ startIndexKey = listOfStartEndKeys.get(0);
+ endIndexKey = listOfStartEndKeys.get(1);
+ } else {
+ try {
+ startIndexKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
+ endIndexKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties);
+ } catch (KeyGenException e) {
+ throw new QueryExecutionException(e);
+ }
+ }
+ blockExecutionInfo.setFileType(
+ FileFactory.getFileType(queryModel.getAbsoluteTableIdentifier().getStorePath()));
+ //setting the start index key of the block node
+ blockExecutionInfo.setStartKey(startIndexKey);
+ //setting the end index key of the block node
+ blockExecutionInfo.setEndKey(endIndexKey);
+ // expression dimensions
+ List<CarbonDimension> expressionDimensions =
+ new ArrayList<CarbonDimension>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ // expression measure
+ List<CarbonMeasure> expressionMeasures =
+ new ArrayList<CarbonMeasure>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ // setting all the dimension chunk indexes to be read from file
+ blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(QueryUtil
+ .getDimensionsBlockIndexes(updatedQueryDimension,
+ segmentProperties.getDimensionOrdinalToBlockMapping(), expressionDimensions));
+ // setting all the measure chunk indexes to be read from file
+ blockExecutionInfo.setAllSelectedMeasureBlocksIndexes(QueryUtil
+ .getMeasureBlockIndexes(queryModel.getQueryMeasures(), expressionMeasures,
+ segmentProperties.getMeasuresOrdinalToBlockMapping()));
+ // setting the key structure info which will be required
+ // to update the older block key with new key generator
+ blockExecutionInfo.setKeyStructureInfo(queryProperties.keyStructureInfo);
+ // setting the size of fixed key column (dictionary column)
+ blockExecutionInfo.setFixedLengthKeySize(getKeySize(updatedQueryDimension, segmentProperties));
+ Set<Integer> dictionaryColumnBlockIndex = new HashSet<Integer>();
+ List<Integer> noDictionaryColumnBlockIndex = new ArrayList<Integer>();
+ // get the block index to be read from file for query dimension
+ // for both dictionary columns and no dictionary columns
+ QueryUtil.fillQueryDimensionsBlockIndexes(updatedQueryDimension,
+ segmentProperties.getDimensionOrdinalToBlockMapping(), dictionaryColumnBlockIndex,
+ noDictionaryColumnBlockIndex);
+ int[] queryDictionaryColumnBlockIndexes = ArrayUtils.toPrimitive(
+ dictionaryColumnBlockIndex.toArray(new Integer[dictionaryColumnBlockIndex.size()]));
+ // need to sort the dictionary column as for all dimension
+ // column key will be filled based on key order
+ Arrays.sort(queryDictionaryColumnBlockIndexes);
+ blockExecutionInfo.setDictionaryColumnBlockIndex(queryDictionaryColumnBlockIndexes);
+ // setting the no dictionary column block indexes
+ blockExecutionInfo.setNoDictionaryBlockIndexes(ArrayUtils.toPrimitive(
+ noDictionaryColumnBlockIndex.toArray(new Integer[noDictionaryColumnBlockIndex.size()])));
+ // setting column id to dictionary mapping
+ blockExecutionInfo.setColumnIdToDcitionaryMapping(queryProperties.columnToDictionayMapping);
+ // setting each column value size
+ blockExecutionInfo.setEachColumnValueSize(segmentProperties.getEachDimColumnValueSize());
+ blockExecutionInfo.setComplexColumnParentBlockIndexes(
+ getComplexDimensionParentBlockIndexes(updatedQueryDimension));
+ try {
+ // to set column group and its key structure info which will be used
+ // to
+ // for getting the column group column data in case of final row
+ // and in case of dimension aggregation
+ blockExecutionInfo.setColumnGroupToKeyStructureInfo(
+ QueryUtil.getColumnGroupKeyStructureInfo(updatedQueryDimension, segmentProperties));
+ } catch (KeyGenException e) {
+ throw new QueryExecutionException(e);
+ }
+ return blockExecutionInfo;
+ }
+
+ /**
+ * This method will be used to get fixed key length size this will be used
+ * to create a row from column chunk
+ *
+ * @param queryDimension query dimension
+ * @param blockMetadataInfo block metadata info
+ * @return key size
+ */
+ private int getKeySize(List<QueryDimension> queryDimension, SegmentProperties blockMetadataInfo) {
+ List<Integer> fixedLengthDimensionOrdinal =
+ new ArrayList<Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ int counter = 0;
+ while (counter < queryDimension.size()) {
+ if (queryDimension.get(counter).getDimension().numberOfChild() > 0) {
+ counter += queryDimension.get(counter).getDimension().numberOfChild();
+ continue;
+ } else if (!CarbonUtil.hasEncoding(queryDimension.get(counter).getDimension().getEncoder(),
+ Encoding.DICTIONARY)) {
+ counter++;
+ } else {
+ fixedLengthDimensionOrdinal.add(queryDimension.get(counter).getDimension().getKeyOrdinal());
+ counter++;
+ }
+ }
+ int[] dictioanryColumnOrdinal = ArrayUtils.toPrimitive(
+ fixedLengthDimensionOrdinal.toArray(new Integer[fixedLengthDimensionOrdinal.size()]));
+ if (dictioanryColumnOrdinal.length > 0) {
+ return blockMetadataInfo.getFixedLengthKeySplitter()
+ .getKeySizeByBlock(dictioanryColumnOrdinal);
+ }
+ return 0;
+ }
+
+ /**
+ * Below method will be used to get the aggrgator info for the query
+ *
+ * @param queryModel query model
+ * @param tableBlock table block
+ * @return aggregator info
+ */
+ private AggregatorInfo getAggregatorInfoForBlock(QueryModel queryModel,
+ AbstractIndex tableBlock) {
+ // getting the aggregate infos which will be used during aggregation
+ AggregatorInfo aggregatorInfos = RestructureUtil
+ .getAggregatorInfos(queryModel.getQueryMeasures(),
+ tableBlock.getSegmentProperties().getMeasures());
+ // setting the index of expression in measure aggregators
+ aggregatorInfos.setExpressionAggregatorStartIndex(queryProperties.aggExpressionStartIndex);
+ // setting the index of measure columns in measure aggregators
+ aggregatorInfos.setMeasureAggregatorStartIndex(queryProperties.measureStartIndex);
+ // setting the measure aggregator for all aggregation function selected
+ // in query
+ aggregatorInfos.setMeasureDataTypes(queryProperties.measureDataTypes);
+ return aggregatorInfos;
+ }
+
+ private int[] getComplexDimensionParentBlockIndexes(List<QueryDimension> queryDimensions) {
+ List<Integer> parentBlockIndexList = new ArrayList<Integer>();
+ for (QueryDimension queryDimension : queryDimensions) {
+ if (CarbonUtil.hasDataType(queryDimension.getDimension().getDataType(),
+ new DataType[] { DataType.ARRAY, DataType.STRUCT, DataType.MAP })) {
+ parentBlockIndexList.add(queryDimension.getDimension().getOrdinal());
+ }
+ }
+ return ArrayUtils
+ .toPrimitive(parentBlockIndexList.toArray(new Integer[parentBlockIndexList.size()]));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java
new file mode 100644
index 0000000..716cdc7
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.executor.impl;
+
+import java.util.List;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.scan.model.QueryModel;
+import org.apache.carbondata.scan.result.iterator.DetailQueryResultIterator;
+
+/**
+ * Below class will be used to execute the detail query
+ * For executing the detail query it will pass all the block execution
+ * info to detail query result iterator and iterator will be returned
+ */
+public class DetailQueryExecutor extends AbstractQueryExecutor {
+
+ @Override public CarbonIterator<Object[]> execute(QueryModel queryModel)
+ throws QueryExecutionException {
+ List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel);
+ return new DetailQueryResultIterator(blockExecutionInfoList, queryModel);
+ }
+
+}