Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/02/27 01:32:16 UTC
[3/5] carbondata git commit: [HOTFIX] Add java doc for datamap interface
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java
deleted file mode 100644
index ef169af..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java
+++ /dev/null
@@ -1,981 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore.blockletindex;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Comparator;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.Cacheable;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.dev.DataMapModel;
-import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap;
-import org.apache.carbondata.core.datastore.IndexKey;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.indexstore.BlockMetaInfo;
-import org.apache.carbondata.core.indexstore.Blocklet;
-import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
-import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
-import org.apache.carbondata.core.indexstore.row.DataMapRow;
-import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
-import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
-import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
-import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
-import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
-import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.util.ByteUtil;
-import org.apache.carbondata.core.util.DataFileFooterConverter;
-import org.apache.carbondata.core.util.DataTypeUtil;
-
-import org.apache.commons.lang3.StringUtils;
-import org.xerial.snappy.Snappy;
-
-/**
- * Datamap implementation for blocklet.
- */
-public class BlockletIndexDataMap extends AbstractCoarseGrainIndexDataMap implements Cacheable {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(BlockletIndexDataMap.class.getName());
-
- private static int KEY_INDEX = 0;
-
- private static int MIN_VALUES_INDEX = 1;
-
- private static int MAX_VALUES_INDEX = 2;
-
- private static int ROW_COUNT_INDEX = 3;
-
- private static int FILE_PATH_INDEX = 4;
-
- private static int PAGE_COUNT_INDEX = 5;
-
- private static int VERSION_INDEX = 6;
-
- private static int SCHEMA_UPADATED_TIME_INDEX = 7;
-
- private static int BLOCK_INFO_INDEX = 8;
-
- private static int BLOCK_FOOTER_OFFSET = 9;
-
- private static int LOCATIONS = 10;
-
- private static int BLOCKLET_ID_INDEX = 11;
-
- private static int BLOCK_LENGTH = 12;
-
- private static int TASK_MIN_VALUES_INDEX = 0;
-
- private static int TASK_MAX_VALUES_INDEX = 1;
-
- private static int SCHEMA = 2;
-
- private static int PARTITION_INFO = 3;
-
- private UnsafeMemoryDMStore unsafeMemoryDMStore;
-
- private UnsafeMemoryDMStore unsafeMemorySummaryDMStore;
-
- private SegmentProperties segmentProperties;
-
- private int[] columnCardinality;
-
- private boolean isPartitionedSegment;
-
- @Override
- public void init(DataMapModel dataMapModel) throws IOException, MemoryException {
- long startTime = System.currentTimeMillis();
- assert (dataMapModel instanceof BlockletDataMapModel);
- BlockletDataMapModel blockletDataMapInfo = (BlockletDataMapModel) dataMapModel;
- DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
- List<DataFileFooter> indexInfo = fileFooterConverter
- .getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData());
- isPartitionedSegment = blockletDataMapInfo.isPartitionedSegment();
- DataMapRowImpl summaryRow = null;
- byte[] schemaBinary = null;
- // the below 2 variables will be used for fetching the relative blocklet id. The relative blocklet ID
- // is the id assigned to a blocklet within a part file
- String tempFilePath = null;
- int relativeBlockletId = 0;
- for (DataFileFooter fileFooter : indexInfo) {
- if (segmentProperties == null) {
- List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
- schemaBinary = convertSchemaToBinary(columnInTable);
- columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
- segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
- createSchema(segmentProperties);
- createSummarySchema(segmentProperties, blockletDataMapInfo.getPartitions(), schemaBinary);
- }
- TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
- BlockMetaInfo blockMetaInfo =
- blockletDataMapInfo.getBlockMetaInfoMap().get(blockInfo.getFilePath());
- // Here it loads info about all blocklets of the index.
- // Only add if the file exists physically. There are scenarios in which the index file exists inside
- // a merge index but the related carbondata files are deleted. In that case we first check whether
- // the file exists physically or not
- if (blockMetaInfo != null) {
- if (fileFooter.getBlockletList() == null) {
- // This is the old store scenario, where blocklet information is not available in the index file, so
- // load only block info
- summaryRow =
- loadToUnsafeBlock(fileFooter, segmentProperties, blockInfo.getFilePath(), summaryRow,
- blockMetaInfo);
- } else {
- // blocklet ID will start from 0 again only when part file path is changed
- if (null == tempFilePath || !tempFilePath.equals(blockInfo.getFilePath())) {
- tempFilePath = blockInfo.getFilePath();
- relativeBlockletId = 0;
- }
- summaryRow =
- loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath(), summaryRow,
- blockMetaInfo, relativeBlockletId);
- // this is done because relative blocklet id need to be incremented based on the
- // total number of blocklets
- relativeBlockletId += fileFooter.getBlockletList().size();
- }
- }
- }
- if (unsafeMemoryDMStore != null) {
- unsafeMemoryDMStore.finishWriting();
- }
- if (null != unsafeMemorySummaryDMStore) {
- addTaskSummaryRowToUnsafeMemoryStore(
- summaryRow,
- blockletDataMapInfo.getPartitions(),
- schemaBinary);
- unsafeMemorySummaryDMStore.finishWriting();
- }
- LOGGER.info(
- "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + " is " + (
- System.currentTimeMillis() - startTime));
- }
-
- private DataMapRowImpl loadToUnsafe(DataFileFooter fileFooter,
- SegmentProperties segmentProperties, String filePath, DataMapRowImpl summaryRow,
- BlockMetaInfo blockMetaInfo, int relativeBlockletId) {
- int[] minMaxLen = segmentProperties.getColumnsValueSize();
- List<BlockletInfo> blockletList = fileFooter.getBlockletList();
- CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
- // Add one row to maintain task level min max for segment pruning
- if (!blockletList.isEmpty() && summaryRow == null) {
- summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema());
- }
- for (int index = 0; index < blockletList.size(); index++) {
- DataMapRow row = new DataMapRowImpl(schema);
- int ordinal = 0;
- int taskMinMaxOrdinal = 0;
- BlockletInfo blockletInfo = blockletList.get(index);
-
- // add start key as index key
- row.setByteArray(blockletInfo.getBlockletIndex().getBtreeIndex().getStartKey(), ordinal++);
-
- BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex();
- byte[][] minValues = updateMinValues(minMaxIndex.getMinValues(), minMaxLen);
- row.setRow(addMinMax(minMaxLen, schema[ordinal], minValues), ordinal);
- // compute and set task level min values
- addTaskMinMaxValues(summaryRow, minMaxLen,
- unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], minValues,
- TASK_MIN_VALUES_INDEX, true);
- ordinal++;
- taskMinMaxOrdinal++;
- byte[][] maxValues = updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen);
- row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal);
- // compute and set task level max values
- addTaskMinMaxValues(summaryRow, minMaxLen,
- unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues,
- TASK_MAX_VALUES_INDEX, false);
- ordinal++;
-
- row.setInt(blockletInfo.getNumberOfRows(), ordinal++);
-
- // add file path
- byte[] filePathBytes = filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
- row.setByteArray(filePathBytes, ordinal++);
-
- // add pages
- row.setShort((short) blockletInfo.getNumberOfPages(), ordinal++);
-
- // add version number
- row.setShort(fileFooter.getVersionId().number(), ordinal++);
-
- // add schema updated time
- row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++);
-
- // add blocklet info
- byte[] serializedData;
- try {
- ByteArrayOutputStream stream = new ByteArrayOutputStream();
- DataOutput dataOutput = new DataOutputStream(stream);
- blockletInfo.write(dataOutput);
- serializedData = stream.toByteArray();
- row.setByteArray(serializedData, ordinal++);
- // Add block footer offset, it is used if we need to read footer of block
- row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++);
- setLocations(blockMetaInfo.getLocationInfo(), row, ordinal);
- ordinal++;
- // for relative blocklet id i.e. blocklet id that belongs to a particular part file
- row.setShort((short) relativeBlockletId++, ordinal++);
- // Store block size
- row.setLong(blockMetaInfo.getSize(), ordinal);
- unsafeMemoryDMStore.addIndexRowToUnsafe(row);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- return summaryRow;
- }
-
- private void setLocations(String[] locations, DataMapRow row, int ordinal)
- throws UnsupportedEncodingException {
- // Add location info
- String locationStr = StringUtils.join(locations, ',');
- row.setByteArray(locationStr.getBytes(CarbonCommonConstants.DEFAULT_CHARSET), ordinal);
- }
-
- /**
- * Load information for the block. This case can happen only for old stores
- * where blocklet information is not available in the index file. So load only block information
- * and read blocklet information in the executor.
- */
- private DataMapRowImpl loadToUnsafeBlock(DataFileFooter fileFooter,
- SegmentProperties segmentProperties, String filePath, DataMapRowImpl summaryRow,
- BlockMetaInfo blockMetaInfo) {
- int[] minMaxLen = segmentProperties.getColumnsValueSize();
- BlockletIndex blockletIndex = fileFooter.getBlockletIndex();
- CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
- // Add one row to maintain task level min max for segment pruning
- if (summaryRow == null) {
- summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema());
- }
- DataMapRow row = new DataMapRowImpl(schema);
- int ordinal = 0;
- int taskMinMaxOrdinal = 0;
- // add start key as index key
- row.setByteArray(blockletIndex.getBtreeIndex().getStartKey(), ordinal++);
-
- BlockletMinMaxIndex minMaxIndex = blockletIndex.getMinMaxIndex();
- byte[][] minValues = updateMinValues(minMaxIndex.getMinValues(), minMaxLen);
- row.setRow(addMinMax(minMaxLen, schema[ordinal], minValues), ordinal);
- // compute and set task level min values
- addTaskMinMaxValues(summaryRow, minMaxLen,
- unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], minValues,
- TASK_MIN_VALUES_INDEX, true);
- ordinal++;
- taskMinMaxOrdinal++;
- byte[][] maxValues = updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen);
- row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal);
- // compute and set task level max values
- addTaskMinMaxValues(summaryRow, minMaxLen,
- unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues,
- TASK_MAX_VALUES_INDEX, false);
- ordinal++;
-
- row.setInt((int)fileFooter.getNumberOfRows(), ordinal++);
-
- // add file path
- byte[] filePathBytes = filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
- row.setByteArray(filePathBytes, ordinal++);
-
- // add pages
- row.setShort((short) 0, ordinal++);
-
- // add version number
- row.setShort(fileFooter.getVersionId().number(), ordinal++);
-
- // add schema updated time
- row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++);
-
- // add blocklet info
- row.setByteArray(new byte[0], ordinal++);
-
- row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++);
- try {
- setLocations(blockMetaInfo.getLocationInfo(), row, ordinal);
- ordinal++;
- // for relative blocklet id. Value is -1 because in case of old store blocklet info will
- // not be present in the index file and in that case we will not know the total number of
- // blocklets
- row.setShort((short) -1, ordinal++);
-
- // store block size
- row.setLong(blockMetaInfo.getSize(), ordinal);
- unsafeMemoryDMStore.addIndexRowToUnsafe(row);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- return summaryRow;
- }
-
- private void addTaskSummaryRowToUnsafeMemoryStore(DataMapRow summaryRow,
- List<String> partitions, byte[] schemaBinary) throws IOException {
- // write the task summary info to unsafe memory store
- if (null != summaryRow) {
- // Add column schema, it is useful to generate segment properties in the executor.
- // So we do not need to read the footer again there.
- if (schemaBinary != null) {
- summaryRow.setByteArray(schemaBinary, SCHEMA);
- }
- if (partitions != null && partitions.size() > 0) {
- CarbonRowSchema[] minSchemas =
- ((CarbonRowSchema.StructCarbonRowSchema) unsafeMemorySummaryDMStore
- .getSchema()[PARTITION_INFO]).getChildSchemas();
- DataMapRow partitionRow = new DataMapRowImpl(minSchemas);
- for (int i = 0; i < partitions.size(); i++) {
- partitionRow
- .setByteArray(partitions.get(i).getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS),
- i);
- }
- summaryRow.setRow(partitionRow, PARTITION_INFO);
- }
- try {
- unsafeMemorySummaryDMStore.addIndexRowToUnsafe(summaryRow);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- /**
- * Fill the measures' min values with the minimum; this is needed for backward version compatibility
- * as older versions don't store min values for measures
- */
- private byte[][] updateMinValues(byte[][] minValues, int[] minMaxLen) {
- byte[][] updatedValues = minValues;
- if (minValues.length < minMaxLen.length) {
- updatedValues = new byte[minMaxLen.length][];
- System.arraycopy(minValues, 0, updatedValues, 0, minValues.length);
- List<CarbonMeasure> measures = segmentProperties.getMeasures();
- ByteBuffer buffer = ByteBuffer.allocate(8);
- for (int i = 0; i < measures.size(); i++) {
- buffer.rewind();
- DataType dataType = measures.get(i).getDataType();
- if (dataType == DataTypes.BYTE) {
- buffer.putLong(Byte.MIN_VALUE);
- updatedValues[minValues.length + i] = buffer.array().clone();
- } else if (dataType == DataTypes.SHORT) {
- buffer.putLong(Short.MIN_VALUE);
- updatedValues[minValues.length + i] = buffer.array().clone();
- } else if (dataType == DataTypes.INT) {
- buffer.putLong(Integer.MIN_VALUE);
- updatedValues[minValues.length + i] = buffer.array().clone();
- } else if (dataType == DataTypes.LONG) {
- buffer.putLong(Long.MIN_VALUE);
- updatedValues[minValues.length + i] = buffer.array().clone();
- } else if (DataTypes.isDecimal(dataType)) {
- updatedValues[minValues.length + i] =
- DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MIN_VALUE));
- } else {
- buffer.putDouble(Double.MIN_VALUE);
- updatedValues[minValues.length + i] = buffer.array().clone();
- }
- }
- }
- return updatedValues;
- }
-
- /**
- * Fill the measures' max values with the maximum; this is needed for backward version compatibility
- * as older versions don't store max values for measures
- */
- private byte[][] updateMaxValues(byte[][] maxValues, int[] minMaxLen) {
- byte[][] updatedValues = maxValues;
- if (maxValues.length < minMaxLen.length) {
- updatedValues = new byte[minMaxLen.length][];
- System.arraycopy(maxValues, 0, updatedValues, 0, maxValues.length);
- List<CarbonMeasure> measures = segmentProperties.getMeasures();
- ByteBuffer buffer = ByteBuffer.allocate(8);
- for (int i = 0; i < measures.size(); i++) {
- buffer.rewind();
- DataType dataType = measures.get(i).getDataType();
- if (dataType == DataTypes.BYTE) {
- buffer.putLong(Byte.MAX_VALUE);
- updatedValues[maxValues.length + i] = buffer.array().clone();
- } else if (dataType == DataTypes.SHORT) {
- buffer.putLong(Short.MAX_VALUE);
- updatedValues[maxValues.length + i] = buffer.array().clone();
- } else if (dataType == DataTypes.INT) {
- buffer.putLong(Integer.MAX_VALUE);
- updatedValues[maxValues.length + i] = buffer.array().clone();
- } else if (dataType == DataTypes.LONG) {
- buffer.putLong(Long.MAX_VALUE);
- updatedValues[maxValues.length + i] = buffer.array().clone();
- } else if (DataTypes.isDecimal(dataType)) {
- updatedValues[maxValues.length + i] =
- DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MAX_VALUE));
- } else {
- buffer.putDouble(Double.MAX_VALUE);
- updatedValues[maxValues.length + i] = buffer.array().clone();
- }
- }
- }
- return updatedValues;
- }
-
- private DataMapRow addMinMax(int[] minMaxLen, CarbonRowSchema carbonRowSchema,
- byte[][] minValues) {
- CarbonRowSchema[] minSchemas =
- ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas();
- DataMapRow minRow = new DataMapRowImpl(minSchemas);
- int minOrdinal = 0;
- // min value adding
- for (int i = 0; i < minMaxLen.length; i++) {
- minRow.setByteArray(minValues[i], minOrdinal++);
- }
- return minRow;
- }
-
- /**
- * This method will compute min/max values at task level
- *
- * @param taskMinMaxRow
- * @param minMaxLen
- * @param carbonRowSchema
- * @param minMaxValue
- * @param ordinal
- * @param isMinValueComparison
- */
- private void addTaskMinMaxValues(DataMapRow taskMinMaxRow, int[] minMaxLen,
- CarbonRowSchema carbonRowSchema, byte[][] minMaxValue, int ordinal,
- boolean isMinValueComparison) {
- DataMapRow row = taskMinMaxRow.getRow(ordinal);
- byte[][] updatedMinMaxValues = minMaxValue;
- if (null == row) {
- CarbonRowSchema[] minSchemas =
- ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas();
- row = new DataMapRowImpl(minSchemas);
- } else {
- byte[][] existingMinMaxValues = getMinMaxValue(taskMinMaxRow, ordinal);
- // Compare and update min max values
- for (int i = 0; i < minMaxLen.length; i++) {
- int compare =
- ByteUtil.UnsafeComparer.INSTANCE.compareTo(existingMinMaxValues[i], minMaxValue[i]);
- if (isMinValueComparison) {
- if (compare < 0) {
- updatedMinMaxValues[i] = existingMinMaxValues[i];
- }
- } else if (compare > 0) {
- updatedMinMaxValues[i] = existingMinMaxValues[i];
- }
- }
- }
- int minMaxOrdinal = 0;
- // min/max value adding
- for (int i = 0; i < minMaxLen.length; i++) {
- row.setByteArray(updatedMinMaxValues[i], minMaxOrdinal++);
- }
- taskMinMaxRow.setRow(row, ordinal);
- }
-
- private void createSchema(SegmentProperties segmentProperties) throws MemoryException {
- List<CarbonRowSchema> indexSchemas = new ArrayList<>();
-
- // Index key
- indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
- getMinMaxSchema(segmentProperties, indexSchemas);
-
- // for number of rows.
- indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.INT));
-
- // for table block path
- indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
-
- // for number of pages.
- indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
-
- // for version number.
- indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
-
- // for schema updated time.
- indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
-
- //for blocklet info
- indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
-
- // for block footer offset.
- indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
-
- // for locations
- indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
-
- // for relative blocklet id i.e. blocklet id that belongs to a particular part file.
- indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
-
- // for storing block length.
- indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
-
- unsafeMemoryDMStore =
- new UnsafeMemoryDMStore(indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]));
- }
-
- /**
- * Creates the schema to store summary information or the information which can be stored only
- * once per datamap. It stores datamap level max/min of each column and partition information of
- * datamap
- * @param segmentProperties
- * @param partitions
- * @throws MemoryException
- */
- private void createSummarySchema(SegmentProperties segmentProperties, List<String> partitions,
- byte[] schemaBinary)
- throws MemoryException {
- List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>();
- getMinMaxSchema(segmentProperties, taskMinMaxSchemas);
- // for storing column schema
- taskMinMaxSchemas.add(
- new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, schemaBinary.length));
- if (partitions != null && partitions.size() > 0) {
- CarbonRowSchema[] mapSchemas = new CarbonRowSchema[partitions.size()];
- for (int i = 0; i < mapSchemas.length; i++) {
- mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY);
- }
- CarbonRowSchema mapSchema =
- new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(),
- mapSchemas);
- taskMinMaxSchemas.add(mapSchema);
- }
- unsafeMemorySummaryDMStore = new UnsafeMemoryDMStore(
- taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]));
- }
-
- private void getMinMaxSchema(SegmentProperties segmentProperties,
- List<CarbonRowSchema> minMaxSchemas) {
- // Index key
- int[] minMaxLen = segmentProperties.getColumnsValueSize();
- // do it 2 times, one for min and one for max.
- for (int k = 0; k < 2; k++) {
- CarbonRowSchema[] mapSchemas = new CarbonRowSchema[minMaxLen.length];
- for (int i = 0; i < minMaxLen.length; i++) {
- if (minMaxLen[i] <= 0) {
- mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY);
- } else {
- mapSchemas[i] =
- new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, minMaxLen[i]);
- }
- }
- CarbonRowSchema mapSchema =
- new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(),
- mapSchemas);
- minMaxSchemas.add(mapSchema);
- }
- }
-
- @Override
- public boolean isScanRequired(FilterResolverIntf filterExp) {
- FilterExecuter filterExecuter =
- FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
- for (int i = 0; i < unsafeMemorySummaryDMStore.getRowCount(); i++) {
- DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(i);
- boolean isScanRequired = FilterExpressionProcessor.isScanRequired(
- filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX),
- getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX));
- if (isScanRequired) {
- return true;
- }
- }
- return false;
- }
-
- private List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties) {
- if (unsafeMemoryDMStore.getRowCount() == 0) {
- return new ArrayList<>();
- }
- // getting the start and end index key based on filter for hitting the
- // selected block reference nodes based on filter resolver tree.
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("preparing the start and end key for finding"
- + "start and end block as per filter resolver");
- }
- List<Blocklet> blocklets = new ArrayList<>();
- Comparator<DataMapRow> comparator =
- new BlockletDMComparator(segmentProperties.getColumnsValueSize(),
- segmentProperties.getNumberOfSortColumns(),
- segmentProperties.getNumberOfNoDictSortColumns());
- List<IndexKey> listOfStartEndKeys = new ArrayList<IndexKey>(2);
- FilterUtil
- .traverseResolverTreeAndGetStartAndEndKey(segmentProperties, filterExp, listOfStartEndKeys);
- // reading the first value from list which has start key
- IndexKey searchStartKey = listOfStartEndKeys.get(0);
- // reading the last value from list which has end key
- IndexKey searchEndKey = listOfStartEndKeys.get(1);
- if (null == searchStartKey && null == searchEndKey) {
- try {
- // TODO need to handle for no dictionary dimensions
- searchStartKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
- // TODO need to handle for no dictionary dimensions
- searchEndKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties);
- } catch (KeyGenException e) {
- return null;
- }
- }
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "Successfully retrieved the start and end key" + "Dictionary Start Key: " + Arrays
- .toString(searchStartKey.getDictionaryKeys()) + "No Dictionary Start Key " + Arrays
- .toString(searchStartKey.getNoDictionaryKeys()) + "Dictionary End Key: " + Arrays
- .toString(searchEndKey.getDictionaryKeys()) + "No Dictionary End Key " + Arrays
- .toString(searchEndKey.getNoDictionaryKeys()));
- }
- if (filterExp == null) {
- int rowCount = unsafeMemoryDMStore.getRowCount();
- for (int i = 0; i < rowCount; i++) {
- DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(i).convertToSafeRow();
- blocklets.add(createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX)));
- }
- } else {
- int startIndex = findStartIndex(convertToRow(searchStartKey), comparator);
- int endIndex = findEndIndex(convertToRow(searchEndKey), comparator);
- FilterExecuter filterExecuter =
- FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
- while (startIndex <= endIndex) {
- DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(startIndex).convertToSafeRow();
- int blockletId = safeRow.getShort(BLOCKLET_ID_INDEX);
- String filePath = new String(safeRow.getByteArray(FILE_PATH_INDEX),
- CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
- boolean isValid =
- addBlockBasedOnMinMaxValue(filterExecuter, getMinMaxValue(safeRow, MAX_VALUES_INDEX),
- getMinMaxValue(safeRow, MIN_VALUES_INDEX), filePath, blockletId);
- if (isValid) {
- blocklets.add(createBlocklet(safeRow, blockletId));
- }
- startIndex++;
- }
- }
- return blocklets;
- }
-
- @Override
- public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
- List<String> partitions) {
- if (unsafeMemoryDMStore.getRowCount() == 0) {
- return new ArrayList<>();
- }
- // First get the partitions which are stored inside datamap.
- List<String> storedPartitions = getPartitions();
- // if it has a partitioned datamap but there is no partition information stored, it means
- // partitions are dropped so return an empty list.
- if (isPartitionedSegment && (storedPartitions == null || storedPartitions.size() == 0)) {
- return new ArrayList<>();
- }
- if (storedPartitions != null && storedPartitions.size() > 0) {
- // Check the exact match of partition information inside the stored partitions.
- boolean found = false;
- if (partitions != null && partitions.size() > 0) {
- found = partitions.containsAll(storedPartitions);
- }
- if (!found) {
- return new ArrayList<>();
- }
- }
- // Prune with filters if the partitions exist in this datamap
- return prune(filterExp, segmentProperties);
- }
-
- /**
- * select the blocks based on column min and max value
- *
- * @param filterExecuter
- * @param maxValue
- * @param minValue
- * @param filePath
- * @param blockletId
- * @return
- */
- private boolean addBlockBasedOnMinMaxValue(FilterExecuter filterExecuter, byte[][] maxValue,
- byte[][] minValue, String filePath, int blockletId) {
- BitSet bitSet = null;
- if (filterExecuter instanceof ImplicitColumnFilterExecutor) {
- String uniqueBlockPath = filePath.substring(filePath.lastIndexOf("/Part") + 1);
- // this case will come in case of old store where index file does not contain the
- // blocklet information
- if (blockletId != -1) {
- uniqueBlockPath = uniqueBlockPath + CarbonCommonConstants.FILE_SEPARATOR + blockletId;
- }
- bitSet = ((ImplicitColumnFilterExecutor) filterExecuter)
- .isFilterValuesPresentInBlockOrBlocklet(maxValue, minValue, uniqueBlockPath);
- } else {
- bitSet = filterExecuter.isScanRequired(maxValue, minValue);
- }
- if (!bitSet.isEmpty()) {
- return true;
- } else {
- return false;
- }
- }
-
- public ExtendedBlocklet getDetailedBlocklet(String blockletId) {
- int index = Integer.parseInt(blockletId);
- DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(index).convertToSafeRow();
- return createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX));
- }
-
- private byte[][] getMinMaxValue(DataMapRow row, int index) {
- DataMapRow minMaxRow = row.getRow(index);
- byte[][] minMax = new byte[minMaxRow.getColumnCount()][];
- for (int i = 0; i < minMax.length; i++) {
- minMax[i] = minMaxRow.getByteArray(i);
- }
- return minMax;
- }
-
- private ExtendedBlocklet createBlocklet(DataMapRow row, int blockletId) {
- ExtendedBlocklet blocklet = new ExtendedBlocklet(
- new String(row.getByteArray(FILE_PATH_INDEX), CarbonCommonConstants.DEFAULT_CHARSET_CLASS),
- blockletId + "");
- BlockletDetailInfo detailInfo = new BlockletDetailInfo();
- detailInfo.setRowCount(row.getInt(ROW_COUNT_INDEX));
- detailInfo.setPagesCount(row.getShort(PAGE_COUNT_INDEX));
- detailInfo.setVersionNumber(row.getShort(VERSION_INDEX));
- detailInfo.setBlockletId((short) blockletId);
- detailInfo.setDimLens(columnCardinality);
- detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPADATED_TIME_INDEX));
- byte[] byteArray = row.getByteArray(BLOCK_INFO_INDEX);
- BlockletInfo blockletInfo = null;
- try {
- if (byteArray.length > 0) {
- blockletInfo = new BlockletInfo();
- ByteArrayInputStream stream = new ByteArrayInputStream(byteArray);
- DataInputStream inputStream = new DataInputStream(stream);
- blockletInfo.readFields(inputStream);
- inputStream.close();
- }
- blocklet.setLocation(
- new String(row.getByteArray(LOCATIONS), CarbonCommonConstants.DEFAULT_CHARSET)
- .split(","));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- detailInfo.setBlockletInfo(blockletInfo);
- blocklet.setDetailInfo(detailInfo);
- detailInfo.setBlockFooterOffset(row.getLong(BLOCK_FOOTER_OFFSET));
- detailInfo.setColumnSchemaBinary(getColumnSchemaBinary());
- detailInfo.setBlockSize(row.getLong(BLOCK_LENGTH));
- return blocklet;
- }
-
- /**
- * Binary search used to get the first tentative index row based on
- * search key
- *
- * @param key search key
- * @return first tentative block
- */
- private int findStartIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
- int childNodeIndex;
- int low = 0;
- int high = unsafeMemoryDMStore.getRowCount() - 1;
- int mid = 0;
- int compareRes = -1;
- //
- while (low <= high) {
- mid = (low + high) >>> 1;
- // compare the entries
- compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
- if (compareRes < 0) {
- high = mid - 1;
- } else if (compareRes > 0) {
- low = mid + 1;
- } else {
- // if key is matched then get the first entry
- int currentPos = mid;
- while (currentPos - 1 >= 0
- && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos - 1)) == 0) {
- currentPos--;
- }
- mid = currentPos;
- break;
- }
- }
- // if the compare result is less than zero and mid is more than 0 then we need to move
- // to the previous block because duplicate
- // records can be present
- if (compareRes < 0) {
- if (mid > 0) {
- mid--;
- }
- childNodeIndex = mid;
- } else {
- childNodeIndex = mid;
- }
- // get the leaf child
- return childNodeIndex;
- }
-
- /**
- * Binary search used to get the last tentative block based on
- * search key
- *
- * @param key search key
- * @return last tentative block
- */
- private int findEndIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
- int childNodeIndex;
- int low = 0;
- int high = unsafeMemoryDMStore.getRowCount() - 1;
- int mid = 0;
- int compareRes = -1;
- //
- while (low <= high) {
- mid = (low + high) >>> 1;
- // compare the entries
- compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
- if (compareRes < 0) {
- high = mid - 1;
- } else if (compareRes > 0) {
- low = mid + 1;
- } else {
- int currentPos = mid;
- // if key is matched then get the last entry
- while (currentPos + 1 < unsafeMemoryDMStore.getRowCount()
- && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos + 1)) == 0) {
- currentPos++;
- }
- mid = currentPos;
- break;
- }
- }
- // if the compare result is less than zero and mid is more than 0 then we need to move
- // to the previous block because duplicate
- // records can be present
- if (compareRes < 0) {
- if (mid > 0) {
- mid--;
- }
- childNodeIndex = mid;
- } else {
- childNodeIndex = mid;
- }
- return childNodeIndex;
- }
-
- private DataMapRow convertToRow(IndexKey key) {
- ByteBuffer buffer =
- ByteBuffer.allocate(key.getDictionaryKeys().length + key.getNoDictionaryKeys().length + 8);
- buffer.putInt(key.getDictionaryKeys().length);
- buffer.putInt(key.getNoDictionaryKeys().length);
- buffer.put(key.getDictionaryKeys());
- buffer.put(key.getNoDictionaryKeys());
- DataMapRowImpl dataMapRow = new DataMapRowImpl(unsafeMemoryDMStore.getSchema());
- dataMapRow.setByteArray(buffer.array(), 0);
- return dataMapRow;
- }
-
- private List<String> getPartitions() {
- DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
- if (unsafeRow.getColumnCount() > PARTITION_INFO) {
- List<String> partitions = new ArrayList<>();
- DataMapRow row = unsafeRow.getRow(PARTITION_INFO);
- for (int i = 0; i < row.getColumnCount(); i++) {
- partitions.add(
- new String(row.getByteArray(i), CarbonCommonConstants.DEFAULT_CHARSET_CLASS));
- }
- return partitions;
- }
- return null;
- }
-
- private byte[] getColumnSchemaBinary() {
- DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
- return unsafeRow.getByteArray(SCHEMA);
- }
-
- /**
- * Convert schema to binary
- */
- private byte[] convertSchemaToBinary(List<ColumnSchema> columnSchemas) throws IOException {
- ByteArrayOutputStream stream = new ByteArrayOutputStream();
- DataOutput dataOutput = new DataOutputStream(stream);
- dataOutput.writeShort(columnSchemas.size());
- for (ColumnSchema columnSchema : columnSchemas) {
- if (columnSchema.getColumnReferenceId() == null) {
- columnSchema.setColumnReferenceId(columnSchema.getColumnUniqueId());
- }
- columnSchema.write(dataOutput);
- }
- byte[] byteArray = stream.toByteArray();
- // Compress with snappy to reduce the size of schema
- return Snappy.rawCompress(byteArray, byteArray.length);
- }
-
- @Override
- public void clear() {
- if (unsafeMemoryDMStore != null) {
- unsafeMemoryDMStore.freeMemory();
- unsafeMemoryDMStore = null;
- segmentProperties = null;
- }
- // clear task min/max unsafe memory
- if (null != unsafeMemorySummaryDMStore) {
- unsafeMemorySummaryDMStore.freeMemory();
- unsafeMemorySummaryDMStore = null;
- }
- }
-
- @Override
- public long getFileTimeStamp() {
- return 0;
- }
-
- @Override
- public int getAccessCount() {
- return 0;
- }
-
- @Override
- public long getMemorySize() {
- long memoryUsed = 0L;
- if (unsafeMemoryDMStore != null) {
- memoryUsed += unsafeMemoryDMStore.getMemoryUsed();
- }
- if (null != unsafeMemorySummaryDMStore) {
- memoryUsed += unsafeMemorySummaryDMStore.getMemoryUsed();
- }
- return memoryUsed;
- }
-
- public SegmentProperties getSegmentProperties() {
- return segmentProperties;
- }
-
-}
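
For context, the pruning implemented above keeps a blocklet only when the filter's value range can overlap the blocklet's stored min/max values; otherwise the blocklet is skipped without reading data. A minimal, self-contained sketch of that idea in plain Java (the class and method names here are illustrative, not the CarbonData FilterExecuter API):

// Illustrative min/max pruning (hypothetical class, not CarbonData API): a blocklet is
// scanned only when the filter range [filterMin, filterMax] can overlap the blocklet's
// stored range [blockMin, blockMax].
public final class MinMaxPruneSketch {

  // Unsigned lexicographic byte comparison, similar in spirit to ByteUtil.UnsafeComparer.
  static int compare(byte[] a, byte[] b) {
    int len = Math.min(a.length, b.length);
    for (int i = 0; i < len; i++) {
      int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
      if (diff != 0) {
        return diff;
      }
    }
    return a.length - b.length;
  }

  static boolean isScanRequired(byte[] blockMin, byte[] blockMax,
      byte[] filterMin, byte[] filterMax) {
    // Overlap exists unless the filter range ends before the blocklet starts
    // or begins after the blocklet ends.
    return compare(filterMax, blockMin) >= 0 && compare(filterMin, blockMax) <= 0;
  }

  public static void main(String[] args) {
    byte[] blockMin = "aaa".getBytes();
    byte[] blockMax = "mmm".getBytes();
    // Overlapping range: the blocklet must be scanned.
    System.out.println(isScanRequired(blockMin, blockMax, "k".getBytes(), "z".getBytes()));
    // Disjoint range: the blocklet can be pruned.
    System.out.println(isScanRequired(blockMin, blockMax, "x".getBytes(), "z".getBytes()));
  }
}

The real implementation delegates this overlap check to FilterExecuter.isScanRequired and, for implicit columns, to ImplicitColumnFilterExecutor, as done in addBlockBasedOnMinMaxValue above.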
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMapFactory.java
deleted file mode 100644
index a2c65ba..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMapFactory.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore.blockletindex;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.CacheProvider;
-import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.datamap.DataMapDistributable;
-import org.apache.carbondata.core.datamap.DataMapMeta;
-import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
-import org.apache.carbondata.core.datamap.dev.IndexDataMap;
-import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap;
-import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMapFactory;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.indexstore.Blocklet;
-import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
-import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
-import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.events.Event;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-
-/**
- * Table map for blocklet
- */
-public class BlockletIndexDataMapFactory extends AbstractCoarseGrainIndexDataMapFactory
- implements BlockletDetailsFetcher,
- SegmentPropertiesFetcher {
-
- private static final String NAME = "clustered.btree.blocklet";
-
- public static final DataMapSchema DATA_MAP_SCHEMA =
- new DataMapSchema(NAME, BlockletIndexDataMapFactory.class.getName());
-
- private AbsoluteTableIdentifier identifier;
-
- // segmentId -> list of index file
- private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
-
- private Cache<TableBlockIndexUniqueIdentifier, AbstractCoarseGrainIndexDataMap> cache;
-
- @Override
- public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) {
- this.identifier = identifier;
- cache = CacheProvider.getInstance()
- .createCache(CacheType.DRIVER_BLOCKLET_DATAMAP);
- }
-
- @Override
- public AbstractDataMapWriter createWriter(String segmentId, String dataWriterPath) {
- throw new UnsupportedOperationException("not implemented");
- }
-
- @Override
- public List<AbstractCoarseGrainIndexDataMap> getDataMaps(String segmentId) throws IOException {
- List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
- getTableBlockIndexUniqueIdentifiers(segmentId);
- return cache.getAll(tableBlockIndexUniqueIdentifiers);
- }
-
- private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(
- String segmentId) throws IOException {
- List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
- segmentMap.get(segmentId);
- if (tableBlockIndexUniqueIdentifiers == null) {
- tableBlockIndexUniqueIdentifiers = new ArrayList<>();
- String path = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
- List<String> indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path);
- for (int i = 0; i < indexFiles.size(); i++) {
- tableBlockIndexUniqueIdentifiers.add(
- new TableBlockIndexUniqueIdentifier(identifier, segmentId, indexFiles.get(i)));
- }
- segmentMap.put(segmentId, tableBlockIndexUniqueIdentifiers);
- }
- return tableBlockIndexUniqueIdentifiers;
- }
-
- /**
- * Get the blocklet detail information based on blockletid, blockid and segmentid. This method is
- * exclusively for BlockletIndexDataMapFactory as detail information is only available in this
- * default datamap.
- */
- @Override
- public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, String segmentId)
- throws IOException {
- List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>();
- // If it is already a detailed blocklet then type cast and return the same
- if (blocklets.size() > 0 && blocklets.get(0) instanceof ExtendedBlocklet) {
- for (Blocklet blocklet : blocklets) {
- detailedBlocklets.add((ExtendedBlocklet) blocklet);
- }
- return detailedBlocklets;
- }
- List<TableBlockIndexUniqueIdentifier> identifiers =
- getTableBlockIndexUniqueIdentifiers(segmentId);
- // Retrieve each blocklet's detail information from the blocklet datamap
- for (Blocklet blocklet : blocklets) {
- detailedBlocklets.add(getExtendedBlocklet(identifiers, blocklet));
- }
- return detailedBlocklets;
- }
-
- @Override
- public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, String segmentId)
- throws IOException {
- if (blocklet instanceof ExtendedBlocklet) {
- return (ExtendedBlocklet) blocklet;
- }
- List<TableBlockIndexUniqueIdentifier> identifiers =
- getTableBlockIndexUniqueIdentifiers(segmentId);
- return getExtendedBlocklet(identifiers, blocklet);
- }
-
- private ExtendedBlocklet getExtendedBlocklet(List<TableBlockIndexUniqueIdentifier> identifiers,
- Blocklet blocklet) throws IOException {
- String carbonIndexFileName = CarbonTablePath.getCarbonIndexFileName(blocklet.getBlockId());
- for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
- if (identifier.getCarbonIndexFileName().equals(carbonIndexFileName)) {
- IndexDataMap indexDataMap = cache.get(identifier);
- return ((BlockletIndexDataMap) indexDataMap).getDetailedBlocklet(blocklet.getBlockletId());
- }
- }
- throw new IOException("Blocklet with blockid " + blocklet.getBlockletId() + " not found ");
- }
-
-
-
- @Override
- public List<DataMapDistributable> toDistributable(String segmentId) {
- CarbonFile[] carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentId);
- List<DataMapDistributable> distributables = new ArrayList<>();
- for (int i = 0; i < carbonIndexFiles.length; i++) {
- Path path = new Path(carbonIndexFiles[i].getPath());
- try {
- FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
- RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
- LocatedFileStatus fileStatus = iter.next();
- String[] location = fileStatus.getBlockLocations()[0].getHosts();
- BlockletDataMapDistributable distributable =
- new BlockletDataMapDistributable(path.getName());
- distributable.setLocations(location);
- distributables.add(distributable);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- return distributables;
- }
-
- @Override public void fireEvent(Event event) {
-
- }
-
- @Override
- public void clear(String segmentId) {
- List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId);
- if (blockIndexes != null) {
- for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
- IndexDataMap indexDataMap = cache.getIfPresent(blockIndex);
- if (indexDataMap != null) {
- cache.invalidate(blockIndex);
- indexDataMap.clear();
- }
- }
- }
- }
-
- @Override
- public void clear() {
- for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) {
- clear(segmentId);
- }
- }
-
- @Override
- public List<AbstractCoarseGrainIndexDataMap> getDataMaps(DataMapDistributable distributable)
- throws IOException {
- BlockletDataMapDistributable mapDistributable = (BlockletDataMapDistributable) distributable;
- List<TableBlockIndexUniqueIdentifier> identifiers = new ArrayList<>();
- if (mapDistributable.getFilePath().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
- identifiers.add(new TableBlockIndexUniqueIdentifier(identifier, distributable.getSegmentId(),
- mapDistributable.getFilePath()));
- } else if (mapDistributable.getFilePath().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
- SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
- List<String> indexFiles = fileStore.getIndexFilesFromMergeFile(
- CarbonTablePath.getSegmentPath(identifier.getTablePath(), mapDistributable.getSegmentId())
- + "/" + mapDistributable.getFilePath());
- for (String indexFile : indexFiles) {
- identifiers.add(
- new TableBlockIndexUniqueIdentifier(identifier, distributable.getSegmentId(),
- indexFile));
- }
- }
- List<AbstractCoarseGrainIndexDataMap> dataMaps;
- try {
- dataMaps = cache.getAll(identifiers);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return dataMaps;
- }
-
- @Override
- public DataMapMeta getMeta() {
- // TODO: pass SORT_COLUMNS into this class
- return null;
- }
-
- @Override public SegmentProperties getSegmentProperties(String segmentId) throws IOException {
- List<AbstractCoarseGrainIndexDataMap> dataMaps = getDataMaps(segmentId);
- assert (dataMaps.size() > 0);
- AbstractCoarseGrainIndexDataMap coarseGrainDataMap = dataMaps.get(0);
- assert (coarseGrainDataMap instanceof BlockletIndexDataMap);
- BlockletIndexDataMap dataMap = (BlockletIndexDataMap) coarseGrainDataMap;
- return dataMap.getSegmentProperties();
- }
-
- @Override public List<Blocklet> getAllBlocklets(String segmentId, List<String> partitions)
- throws IOException {
- List<Blocklet> blocklets = new ArrayList<>();
- List<AbstractCoarseGrainIndexDataMap> dataMaps = getDataMaps(segmentId);
- for (AbstractCoarseGrainIndexDataMap dataMap : dataMaps) {
- blocklets.addAll(dataMap.prune(null, getSegmentProperties(segmentId), partitions));
- }
- return blocklets;
- }
-}
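
The factory above memoizes the index-file listing per segment in segmentMap so repeated getDataMaps calls do not re-list the segment directory, and clear(segmentId) drops the cached entry. A generic sketch of that memoization pattern, using hypothetical names and plain Java collections rather than the CarbonData types:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative per-segment memoization (hypothetical class and paths, not CarbonData types).
public final class SegmentIndexCacheSketch {

  private final Map<String, List<String>> segmentMap = new HashMap<>();

  // Stand-in for listing the .carbonindex files under a segment directory.
  private List<String> listIndexFiles(String segmentId) {
    List<String> files = new ArrayList<>();
    files.add("Segment_" + segmentId + "/0_batchno0-0-1.carbonindex");
    return files;
  }

  // List the segment directory once, then serve later calls from the in-memory map.
  public List<String> getIndexFiles(String segmentId) {
    return segmentMap.computeIfAbsent(segmentId, this::listIndexFiles);
  }

  // Mirrors clear(segmentId): drop the cached listing when a segment is invalidated.
  public void clear(String segmentId) {
    segmentMap.remove(segmentId);
  }
}

In the real factory the cached values are TableBlockIndexUniqueIdentifier objects, and the datamaps themselves are held in a separate driver-side cache created with CacheType.DRIVER_BLOCKLET_DATAMAP.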
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 2393c54..ff93fd7 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -776,7 +776,7 @@ public class CarbonTable implements Serializable {
}
/**
- * whether this table has aggregation IndexDataMap or not
+ * whether this table has aggregation DataMap or not
*/
public boolean hasAggregationDataMap() {
List<DataMapSchema> dataMapSchemaList = tableInfo.getDataMapSchemaList();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java
new file mode 100644
index 0000000..fa142aa
--- /dev/null
+++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java
@@ -0,0 +1,59 @@
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.lang.reflect.Method;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.cache.dictionary.AbstractDictionaryCacheTest;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.executer.ImplicitIncludeFilterExecutorImpl;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.util.ByteUtil;
+
+import mockit.Mock;
+import mockit.MockUp;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestBlockletDataMap extends AbstractDictionaryCacheTest {
+
+ ImplicitIncludeFilterExecutorImpl implicitIncludeFilterExecutor;
+ @Before public void setUp() throws Exception {
+ CarbonImplicitDimension carbonImplicitDimension =
+ new CarbonImplicitDimension(0, CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_POSITIONID);
+ DimColumnResolvedFilterInfo dimColumnEvaluatorInfo = new DimColumnResolvedFilterInfo();
+ dimColumnEvaluatorInfo.setColumnIndex(0);
+ dimColumnEvaluatorInfo.setRowIndex(0);
+ dimColumnEvaluatorInfo.setDimension(carbonImplicitDimension);
+ dimColumnEvaluatorInfo.setDimensionExistsInCurrentSilce(false);
+ implicitIncludeFilterExecutor =
+ new ImplicitIncludeFilterExecutorImpl(dimColumnEvaluatorInfo);
+ }
+
+ @Test public void testaddBlockBasedOnMinMaxValue() throws Exception {
+
+ new MockUp<ImplicitIncludeFilterExecutorImpl>() {
+ @Mock BitSet isFilterValuesPresentInBlockOrBlocklet(byte[][] maxValue, byte[][] minValue,
+ String uniqueBlockPath) {
+ BitSet bitSet = new BitSet(1);
+ bitSet.set(8);
+ return bitSet;
+ }
+ };
+
+ BlockletDataMap blockletDataMap = new BlockletDataMap();
+ Method method = BlockletDataMap.class
+ .getDeclaredMethod("addBlockBasedOnMinMaxValue", FilterExecuter.class, byte[][].class,
+ byte[][].class, String.class, int.class);
+ method.setAccessible(true);
+
+ byte[][] minValue = { ByteUtil.toBytes("sfds") };
+ byte[][] maxValue = { ByteUtil.toBytes("resa") };
+ Object result = method
+ .invoke(blockletDataMap, implicitIncludeFilterExecutor, minValue, maxValue,
+ "/opt/store/default/carbon_table/Fact/Part0/Segment_0/part-0-0_batchno0-0-1514989110586.carbondata",
+ 0);
+ assert ((boolean) result);
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndexDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndexDataMap.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndexDataMap.java
deleted file mode 100644
index 16048db..0000000
--- a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndexDataMap.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package org.apache.carbondata.core.indexstore.blockletindex;
-
-import java.lang.reflect.Method;
-import java.util.BitSet;
-
-import org.apache.carbondata.core.cache.dictionary.AbstractDictionaryCacheTest;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension;
-import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
-import org.apache.carbondata.core.scan.filter.executer.ImplicitIncludeFilterExecutorImpl;
-import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
-import org.apache.carbondata.core.util.ByteUtil;
-
-import mockit.Mock;
-import mockit.MockUp;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestBlockletIndexDataMap extends AbstractDictionaryCacheTest {
-
- ImplicitIncludeFilterExecutorImpl implicitIncludeFilterExecutor;
- @Before public void setUp() throws Exception {
- CarbonImplicitDimension carbonImplicitDimension =
- new CarbonImplicitDimension(0, CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_POSITIONID);
- DimColumnResolvedFilterInfo dimColumnEvaluatorInfo = new DimColumnResolvedFilterInfo();
- dimColumnEvaluatorInfo.setColumnIndex(0);
- dimColumnEvaluatorInfo.setRowIndex(0);
- dimColumnEvaluatorInfo.setDimension(carbonImplicitDimension);
- dimColumnEvaluatorInfo.setDimensionExistsInCurrentSilce(false);
- implicitIncludeFilterExecutor =
- new ImplicitIncludeFilterExecutorImpl(dimColumnEvaluatorInfo);
- }
-
- @Test public void testaddBlockBasedOnMinMaxValue() throws Exception {
-
- new MockUp<ImplicitIncludeFilterExecutorImpl>() {
- @Mock BitSet isFilterValuesPresentInBlockOrBlocklet(byte[][] maxValue, byte[][] minValue,
- String uniqueBlockPath) {
- BitSet bitSet = new BitSet(1);
- bitSet.set(8);
- return bitSet;
- }
- };
-
- BlockletIndexDataMap blockletDataMap = new BlockletIndexDataMap();
- Method method = BlockletIndexDataMap.class
- .getDeclaredMethod("addBlockBasedOnMinMaxValue", FilterExecuter.class, byte[][].class,
- byte[][].class, String.class, int.class);
- method.setAccessible(true);
-
- byte[][] minValue = { ByteUtil.toBytes("sfds") };
- byte[][] maxValue = { ByteUtil.toBytes("resa") };
- Object result = method
- .invoke(blockletDataMap, implicitIncludeFilterExecutor, minValue, maxValue,
- "/opt/store/default/carbon_table/Fact/Part0/Segment_0/part-0-0_batchno0-0-1514989110586.carbondata",
- 0);
- assert ((boolean) result);
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
index fe0bbcf..b18a8fe 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
@@ -28,18 +28,17 @@ import java.util.Map;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
import com.google.gson.Gson;
-public class MinMaxDataWriter extends AbstractDataMapWriter {
+public class MinMaxDataWriter extends DataMapWriter {
private static final LogService LOGGER =
LogServiceFactory.getLogService(TableInfo.class.getName());
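For reference, MinMaxDataWriter now extends DataMapWriter rather than AbstractDataMapWriter. A minimal no-op sketch of such a writer is shown here; the callback signatures are inferred from the CGDataMapWriter test added later in this commit, and the class name NoopDataMapWriter is illustrative only, not part of this change.

import org.apache.carbondata.core.datamap.dev.DataMapWriter;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;

public class NoopDataMapWriter extends DataMapWriter {

  public NoopDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId,
      String dataWritePath) {
    super(identifier, segmentId, dataWritePath);
  }

  @Override public void onBlockStart(String blockId) {
    // A new carbondata file is being written; remember the block id if needed.
  }

  @Override public void onBlockEnd(String blockId) {
    // The current block is complete.
  }

  @Override public void onBlockletStart(int blockletId) {
  }

  @Override public void onBlockletEnd(int blockletId) {
    // A blocklet is complete; summarize whatever was collected in onPageAdded.
  }

  @Override public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages) {
    // Copy whatever is needed out of `pages`; its memory may be freed after this call.
  }

  @Override public void finish() {
    // Flush and commit the index file(s) written for this segment.
  }
}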
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
index 216000b..5f933b3 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
@@ -28,7 +28,7 @@ import java.util.List;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datamap.dev.DataMapModel;
-import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -47,7 +47,7 @@ import com.google.gson.Gson;
/**
* Datamap implementation for min max blocklet.
*/
-public class MinMaxIndexDataMap extends AbstractCoarseGrainIndexDataMap {
+public class MinMaxIndexDataMap extends CoarseGrainDataMap {
private static final LogService LOGGER =
LogServiceFactory.getLogService(MinMaxIndexDataMap.class.getName());
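MinMaxIndexDataMap now extends CoarseGrainDataMap. A bare-bones sketch of a coarse-grain datamap, assuming the override signatures used by the CGDataMap test class later in this commit (NoopCoarseGrainDataMap is an illustrative name), is roughly:

import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.core.datamap.dev.DataMapModel;
import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;

public class NoopCoarseGrainDataMap extends CoarseGrainDataMap {

  @Override public void init(DataMapModel dataMapModel) {
    // Load or build the in-memory index from dataMapModel.getFilePath().
  }

  @Override public List<Blocklet> prune(FilterResolverIntf filterExp,
      SegmentProperties segmentProperties, List<String> partitions) {
    // Return every blocklet that may satisfy the filter; an empty list prunes everything.
    return new ArrayList<>();
  }

  @Override public boolean isScanRequired(FilterResolverIntf filterExp) {
    return true;
  }

  @Override public void clear() {
    // Release any memory held by this instance.
  }
}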
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
index 5f714a1..84a7c45 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
@@ -25,10 +25,10 @@ import java.util.List;
import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapMeta;
-import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
import org.apache.carbondata.core.datamap.dev.DataMapModel;
-import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap;
-import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMapFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
@@ -38,7 +38,7 @@ import org.apache.carbondata.events.Event;
/**
* Min Max DataMap Factory
*/
-public class MinMaxIndexDataMapFactory extends AbstractCoarseGrainIndexDataMapFactory {
+public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
private AbsoluteTableIdentifier identifier;
@@ -52,7 +52,7 @@ public class MinMaxIndexDataMapFactory extends AbstractCoarseGrainIndexDataMapFa
* @param segmentId
* @return
*/
- @Override public AbstractDataMapWriter createWriter(String segmentId, String dataWritePath) {
+ @Override public DataMapWriter createWriter(String segmentId, String dataWritePath) {
return new MinMaxDataWriter(identifier, segmentId, dataWritePath);
}
@@ -63,9 +63,9 @@ public class MinMaxIndexDataMapFactory extends AbstractCoarseGrainIndexDataMapFa
* @return
* @throws IOException
*/
- @Override public List<AbstractCoarseGrainIndexDataMap> getDataMaps(String segmentId)
+ @Override public List<CoarseGrainDataMap> getDataMaps(String segmentId)
throws IOException {
- List<AbstractCoarseGrainIndexDataMap> dataMapList = new ArrayList<>();
+ List<CoarseGrainDataMap> dataMapList = new ArrayList<>();
// Form a dataMap of Type MinMaxIndexDataMap.
MinMaxIndexDataMap dataMap = new MinMaxIndexDataMap();
try {
@@ -100,7 +100,7 @@ public class MinMaxIndexDataMapFactory extends AbstractCoarseGrainIndexDataMapFa
@Override public void clear() {
}
- @Override public List<AbstractCoarseGrainIndexDataMap> getDataMaps(DataMapDistributable distributable)
+ @Override public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
throws IOException {
return null;
}
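With the renames above, a factory wires the writer and the datamap together. A skeletal sketch, mirroring the methods exercised by CGDataMapFactory in the test below and reusing the illustrative NoopDataMapWriter and NoopCoarseGrainDataMap sketches from earlier (all such names are assumptions, not part of this commit), could look like:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapMeta;
import org.apache.carbondata.core.datamap.dev.DataMapWriter;
import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
import org.apache.carbondata.events.Event;

public class NoopCGDataMapFactory extends CoarseGrainDataMapFactory {

  private AbsoluteTableIdentifier identifier;
  private DataMapSchema dataMapSchema;

  @Override public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) {
    this.identifier = identifier;
    this.dataMapSchema = dataMapSchema;
  }

  @Override public DataMapWriter createWriter(String segmentId, String dataWritePath) {
    // Return a writer that receives callbacks while the segment is loaded.
    return new NoopDataMapWriter(identifier, segmentId, dataWritePath);
  }

  @Override public List<CoarseGrainDataMap> getDataMaps(String segmentId) throws IOException {
    // Load the datamaps previously written for this segment.
    return new ArrayList<>();
  }

  @Override public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
      throws IOException {
    return new ArrayList<>();
  }

  @Override public List<DataMapDistributable> toDistributable(String segmentId) {
    return new ArrayList<>();
  }

  @Override public void fireEvent(Event event) {
  }

  @Override public void clear(String segmentId) {
  }

  @Override public void clear() {
  }

  @Override public DataMapMeta getMeta() {
    // Declare the indexed columns and the filter expression types this datamap supports.
    return new DataMapMeta(
        Arrays.asList(dataMapSchema.getProperties().get("indexcolumns").split(",")),
        Arrays.asList(ExpressionType.EQUALS));
  }
}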
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 98c3398..039bae2 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -34,15 +34,15 @@ import java.util.Set;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.DataMapChooser;
+import org.apache.carbondata.core.datamap.DataMapLevel;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
-import org.apache.carbondata.core.datamap.DataMapType;
import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.exception.InvalidConfigurationException;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexDataMapFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.schema.PartitionInfo;
@@ -728,11 +728,11 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
List<String> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
List<ExtendedBlocklet> prunedBlocklets;
- if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapType.FG) {
+ if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) {
DistributableDataMapFormat datamapDstr =
new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper,
segmentIds, partitionsToPrune,
- BlockletIndexDataMapFactory.class.getName());
+ BlockletDataMapFactory.class.getName());
prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
// Apply expression on the blocklets.
prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
new file mode 100644
index 0000000..b81cce4
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta}
+import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory}
+import org.apache.carbondata.core.datastore.FileReader
+import org.apache.carbondata.core.datastore.block.SegmentProperties
+import org.apache.carbondata.core.datastore.compression.SnappyCompressor
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.indexstore.Blocklet
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
+import org.apache.carbondata.core.util.ByteUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.Event
+import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
+
+class CGDataMapFactory extends CoarseGrainDataMapFactory {
+ var identifier: AbsoluteTableIdentifier = _
+ var dataMapSchema: DataMapSchema = _
+
+ /**
+ * Initialize the datamap factory with the table identifier and the datamap schema
+ */
+ override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
+ this.identifier = identifier
+ this.dataMapSchema = dataMapSchema
+ }
+
+ /**
+ * Return a new writer for this datamap
+ */
+ override def createWriter(segmentId: String, dataWritePath: String): DataMapWriter = {
+ new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema)
+ }
+
+ /**
+ * Get the datamaps for the given segment id
+ */
+ override def getDataMaps(segmentId: String) = {
+ val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+
+ val files = file.listFiles(new CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
+ })
+ files.map {f =>
+ val dataMap: CoarseGrainDataMap = new CGDataMap()
+ dataMap.init(new DataMapModel(f.getCanonicalPath))
+ dataMap
+ }.toList.asJava
+ }
+
+
+ /**
+ * Get datamaps for distributable object.
+ */
+ override def getDataMaps(
+ distributable: DataMapDistributable): java.util.List[CoarseGrainDataMap] = {
+ val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
+ val dataMap: CoarseGrainDataMap = new CGDataMap()
+ dataMap.init(new DataMapModel(mapDistributable.getFilePath))
+ Seq(dataMap).asJava
+ }
+
+ /**
+ * Fire an event to this datamap factory.
+ * @param event
+ */
+ override def fireEvent(event: Event): Unit = {
+ ???
+ }
+
+ /**
+ * Get all distributable objects of a segmentid
+ *
+ * @return
+ */
+ override def toDistributable(segmentId: String) = {
+ val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+
+ val files = file.listFiles(new CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
+ })
+ files.map { f =>
+ val d:DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath)
+ d
+ }.toList.asJava
+ }
+
+
+ /**
+ * Clear the datamap of the given segment
+ */
+ override def clear(segmentId: String): Unit = {
+
+ }
+
+ /**
+ * Clear all datamaps from memory
+ */
+ override def clear(): Unit = {
+
+ }
+
+ /**
+ * Return metadata of this datamap
+ */
+ override def getMeta: DataMapMeta = {
+ new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
+ List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
+ }
+}
+
+class CGDataMap extends CoarseGrainDataMap {
+
+ var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))] = _
+ var FileReader: FileReader = _
+ var filePath: String = _
+ val compressor = new SnappyCompressor
+
+ /**
+ * It is called to load the datamap into memory or to initialize it.
+ */
+ override def init(dataMapModel: DataMapModel): Unit = {
+ this.filePath = dataMapModel.getFilePath
+ val size = FileFactory.getCarbonFile(filePath).getSize
+ FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
+ val footerLen = FileReader.readInt(filePath, size-4)
+ val bytes = FileReader.readByteArray(filePath, size-footerLen-4, footerLen)
+ val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
+ val obj = new ObjectInputStream(in)
+ maxMin = obj.readObject().asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]]
+ }
+
+ /**
+ * Prune the datamap with the filter expression. It returns the list of
+ * blocklets where the filter may match.
+ *
+ * @param filterExp
+ * @return
+ */
+ override def prune(
+ filterExp: FilterResolverIntf,
+ segmentProperties: SegmentProperties,
+ partitions: java.util.List[String]): java.util.List[Blocklet] = {
+ val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
+ val expression = filterExp.getFilterExpression
+ getEqualToExpression(expression, buffer)
+ val value = buffer.map { f =>
+ f.getChildren.get(1).evaluate(null).getString
+ }
+ val meta = findMeta(value(0).getBytes)
+ meta.map { f=>
+ new Blocklet(f._1, f._2+"")
+ }.asJava
+ }
+
+
+ private def findMeta(value: Array[Byte]) = {
+ val tuples = maxMin.filter { f =>
+ ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) <= 0 &&
+ ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) >= 0
+ }
+ tuples
+ }
+
+ private def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
+ if (expression.isInstanceOf[EqualToExpression]) {
+ buffer += expression
+ } else {
+ if (expression.getChildren != null) {
+ expression.getChildren.asScala.map { f =>
+ if (f.isInstanceOf[EqualToExpression]) {
+ buffer += f
+ }
+ getEqualToExpression(f, buffer)
+ }
+ }
+ }
+ }
+
+ /**
+ * Clear complete index table and release memory.
+ */
+ override def clear() = {
+ ???
+ }
+
+ override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
+}
+
+class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
+ segmentId: String,
+ dataWritePath: String,
+ dataMapSchema: DataMapSchema)
+ extends DataMapWriter(identifier, segmentId, dataWritePath) {
+
+ var currentBlockId: String = null
+ val cgwritepath = dataWritePath + "/" +
+ dataMapSchema.getDataMapName + System.nanoTime() + ".datamap"
+ lazy val stream: DataOutputStream = FileFactory
+ .getDataOutputStream(cgwritepath, FileFactory.getFileType(cgwritepath))
+ val blockletList = new ArrayBuffer[Array[Byte]]()
+ val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]()
+ val compressor = new SnappyCompressor
+
+ /**
+ * Start of new block notification.
+ *
+ * @param blockId file name of the carbondata file
+ */
+ override def onBlockStart(blockId: String): Unit = {
+ currentBlockId = blockId
+ }
+
+ /**
+ * End of block notification
+ */
+ override def onBlockEnd(blockId: String): Unit = {
+
+ }
+
+ /**
+ * Start of new blocklet notification.
+ *
+ * @param blockletId sequence number of blocklet in the block
+ */
+ override def onBlockletStart(blockletId: Int): Unit = {
+
+ }
+
+ /**
+ * End of blocklet notification
+ *
+ * @param blockletId sequence number of blocklet in the block
+ */
+ override def onBlockletEnd(blockletId: Int): Unit = {
+ val sorted = blockletList
+ .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0)
+ maxMin +=
+ ((currentBlockId+"", blockletId, (sorted.last, sorted.head)))
+ blockletList.clear()
+ }
+
+ /**
+ * Add the rows of the column pages to the datamap; the order of pages is the same as
+ * `indexColumns` in the DataMapMeta returned by the DataMapFactory.
+ *
+ * Implementation should copy the content of `pages` as needed, because `pages` memory
+ * may be freed after this method returns, if using unsafe column page.
+ */
+ override def onPageAdded(blockletId: Int,
+ pageId: Int,
+ pages: Array[ColumnPage]): Unit = {
+ val size = pages(0).getPageSize
+ val list = new ArrayBuffer[Array[Byte]]()
+ var i = 0
+ while (i < size) {
+ val bytes = pages(0).getBytes(i)
+ val newBytes = new Array[Byte](bytes.length - 2)
+ System.arraycopy(bytes, 2, newBytes, 0, newBytes.length)
+ list += newBytes
+ i = i + 1
+ }
+ // Sort based on the column data in order to create index.
+ val sorted = list
+ .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0)
+ blockletList += sorted.head
+ blockletList += sorted.last
+ }
+
+
+ /**
+ * This is called when the writer is being closed, so no more data will be sent to this
+ * class after this call.
+ */
+ override def finish(): Unit = {
+ val out = new ByteOutputStream()
+ val outStream = new ObjectOutputStream(out)
+ outStream.writeObject(maxMin)
+ outStream.close()
+ val bytes = compressor.compressByte(out.getBytes)
+ stream.write(bytes)
+ stream.writeInt(bytes.length)
+ stream.close()
+ commitFile(cgwritepath)
+ }
+
+
+}
+
+class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
+
+ val file2 = resourcesPath + "/compaction/fil2.csv"
+ override protected def beforeAll(): Unit = {
+ // n should be about 5000000, or the size should be reset, if the default size of 1024 is used
+ val n = 150000
+ CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
+ sql("DROP TABLE IF EXISTS normal_test")
+ sql(
+ """
+ | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
+ }
+
+ test("test cg datamap") {
+ sql("DROP TABLE IF EXISTS datamap_test_cg")
+ sql(
+ """
+ | CREATE TABLE datamap_test_cg(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+ """.stripMargin)
+ val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test_cg")
+ // register datamap writer
+ sql(s"create datamap cgdatamap on table datamap_test_cg using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_cg OPTIONS('header'='false')")
+ checkAnswer(sql("select * from datamap_test_cg where name='n502670'"),
+ sql("select * from normal_test where name='n502670'"))
+ }
+
+ test("test cg datamap with 2 datamaps ") {
+ sql("DROP TABLE IF EXISTS datamap_test")
+ sql(
+ """
+ | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+ """.stripMargin)
+ val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+ // register datamap writer
+ sql(s"create datamap ggdatamap1 on table datamap_test using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
+ sql(s"create datamap ggdatamap2 on table datamap_test using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='city')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+ checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
+ sql("select * from normal_test where name='n502670' and city='c2670'"))
+ }
+
+ override protected def afterAll(): Unit = {
+ CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
+ sql("DROP TABLE IF EXISTS normal_test")
+ sql("DROP TABLE IF EXISTS datamap_test_cg")
+ }
+}