Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/03/08 16:55:41 UTC
[40/54] [abbrv] carbondata git commit: [CARBONDATA-2189] Add DataMapProvider developer interface
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java
new file mode 100644
index 0000000..34e11ac
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java
@@ -0,0 +1,971 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cacheable;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap;
+import org.apache.carbondata.core.datastore.IndexKey;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.indexstore.BlockMetaInfo;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.xerial.snappy.Snappy;
+
+/**
+ * Datamap implementation for blocklet.
+ */
+public class BlockletIndexDataMap extends AbstractCoarseGrainIndexDataMap implements Cacheable {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(BlockletIndexDataMap.class.getName());
+
+ private static final int KEY_INDEX = 0;
+
+ private static final int MIN_VALUES_INDEX = 1;
+
+ private static final int MAX_VALUES_INDEX = 2;
+
+ private static final int ROW_COUNT_INDEX = 3;
+
+ private static final int FILE_PATH_INDEX = 4;
+
+ private static final int PAGE_COUNT_INDEX = 5;
+
+ private static final int VERSION_INDEX = 6;
+
+ private static final int SCHEMA_UPDATED_TIME_INDEX = 7;
+
+ private static final int BLOCK_INFO_INDEX = 8;
+
+ private static final int BLOCK_FOOTER_OFFSET = 9;
+
+ private static final int LOCATIONS = 10;
+
+ private static final int BLOCKLET_ID_INDEX = 11;
+
+ private static final int BLOCK_LENGTH = 12;
+
+ private static final int TASK_MIN_VALUES_INDEX = 0;
+
+ private static final int TASK_MAX_VALUES_INDEX = 1;
+
+ private static final int SCHEMA = 2;
+
+ private static final int INDEX_PATH = 3;
+
+ private static final int INDEX_FILE_NAME = 4;
+
+ private static final int SEGMENTID = 5;
+
+ private UnsafeMemoryDMStore unsafeMemoryDMStore;
+
+ private UnsafeMemoryDMStore unsafeMemorySummaryDMStore;
+
+ private SegmentProperties segmentProperties;
+
+ private int[] columnCardinality;
+
+ @Override
+ public void init(DataMapModel dataMapModel) throws IOException, MemoryException {
+ long startTime = System.currentTimeMillis();
+ assert (dataMapModel instanceof BlockletDataMapModel);
+ BlockletDataMapModel blockletDataMapInfo = (BlockletDataMapModel) dataMapModel;
+ DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+ List<DataFileFooter> indexInfo = fileFooterConverter
+ .getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData());
+ Path path = new Path(blockletDataMapInfo.getFilePath());
+ byte[] filePath = path.getParent().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
+ byte[] fileName = path.getName().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
+ byte[] segmentId =
+ blockletDataMapInfo.getSegmentId().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
+ DataMapRowImpl summaryRow = null;
+ byte[] schemaBinary = null;
+ // the below 2 variables are used for fetching the relative blocklet id. The relative
+ // blocklet id is the id assigned to a blocklet within a part file
+ String tempFilePath = null;
+ int relativeBlockletId = 0;
+ for (DataFileFooter fileFooter : indexInfo) {
+ if (segmentProperties == null) {
+ List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
+ schemaBinary = convertSchemaToBinary(columnInTable);
+ columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
+ segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
+ createSchema(segmentProperties);
+ createSummarySchema(segmentProperties, schemaBinary, filePath, fileName,
+ segmentId);
+ }
+ TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
+ BlockMetaInfo blockMetaInfo =
+ blockletDataMapInfo.getBlockMetaInfoMap().get(blockInfo.getFilePath());
+ // Here it loads info about all blocklets of the index.
+ // Only add if the file exists physically. There are scenarios where the index file exists
+ // inside the merge index but the related carbondata files are deleted. In that case we
+ // first check whether the file exists physically or not.
+ if (blockMetaInfo != null) {
+ if (fileFooter.getBlockletList() == null) {
+ // This is the old store scenario where blocklet information is not available in the
+ // index file, so load only the block info
+ summaryRow =
+ loadToUnsafeBlock(fileFooter, segmentProperties, blockInfo.getFilePath(), summaryRow,
+ blockMetaInfo);
+ } else {
+ // blocklet ID will start from 0 again only when part file path is changed
+ if (null == tempFilePath || !tempFilePath.equals(blockInfo.getFilePath())) {
+ tempFilePath = blockInfo.getFilePath();
+ relativeBlockletId = 0;
+ }
+ summaryRow =
+ loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath(), summaryRow,
+ blockMetaInfo, relativeBlockletId);
+ // this is done because the relative blocklet id needs to be incremented based on the
+ // total number of blocklets
+ relativeBlockletId += fileFooter.getBlockletList().size();
+ }
+ }
+ }
+ if (unsafeMemoryDMStore != null) {
+ unsafeMemoryDMStore.finishWriting();
+ }
+ if (null != unsafeMemorySummaryDMStore) {
+ addTaskSummaryRowToUnsafeMemoryStore(
+ summaryRow,
+ schemaBinary,
+ filePath,
+ fileName,
+ segmentId);
+ unsafeMemorySummaryDMStore.finishWriting();
+ }
+ LOGGER.info(
+ "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + " is " + (
+ System.currentTimeMillis() - startTime));
+ }
+
+ private DataMapRowImpl loadToUnsafe(DataFileFooter fileFooter,
+ SegmentProperties segmentProperties, String filePath, DataMapRowImpl summaryRow,
+ BlockMetaInfo blockMetaInfo, int relativeBlockletId) {
+ int[] minMaxLen = segmentProperties.getColumnsValueSize();
+ List<BlockletInfo> blockletList = fileFooter.getBlockletList();
+ CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
+ // Add one row to maintain task level min max for segment pruning
+ if (!blockletList.isEmpty() && summaryRow == null) {
+ summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema());
+ }
+ for (int index = 0; index < blockletList.size(); index++) {
+ DataMapRow row = new DataMapRowImpl(schema);
+ int ordinal = 0;
+ int taskMinMaxOrdinal = 0;
+ BlockletInfo blockletInfo = blockletList.get(index);
+
+ // add start key as index key
+ row.setByteArray(blockletInfo.getBlockletIndex().getBtreeIndex().getStartKey(), ordinal++);
+
+ BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex();
+ byte[][] minValues = updateMinValues(minMaxIndex.getMinValues(), minMaxLen);
+ row.setRow(addMinMax(minMaxLen, schema[ordinal], minValues), ordinal);
+ // compute and set task level min values
+ addTaskMinMaxValues(summaryRow, minMaxLen,
+ unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], minValues,
+ TASK_MIN_VALUES_INDEX, true);
+ ordinal++;
+ taskMinMaxOrdinal++;
+ byte[][] maxValues = updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen);
+ row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal);
+ // compute and set task level max values
+ addTaskMinMaxValues(summaryRow, minMaxLen,
+ unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues,
+ TASK_MAX_VALUES_INDEX, false);
+ ordinal++;
+
+ row.setInt(blockletInfo.getNumberOfRows(), ordinal++);
+
+ // add file path
+ byte[] filePathBytes = filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+ row.setByteArray(filePathBytes, ordinal++);
+
+ // add pages
+ row.setShort((short) blockletInfo.getNumberOfPages(), ordinal++);
+
+ // add version number
+ row.setShort(fileFooter.getVersionId().number(), ordinal++);
+
+ // add schema updated time
+ row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++);
+
+ // add blocklet info
+ byte[] serializedData;
+ try {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ DataOutput dataOutput = new DataOutputStream(stream);
+ blockletInfo.write(dataOutput);
+ serializedData = stream.toByteArray();
+ row.setByteArray(serializedData, ordinal++);
+ // Add block footer offset, it is used if we need to read footer of block
+ row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++);
+ setLocations(blockMetaInfo.getLocationInfo(), row, ordinal);
+ ordinal++;
+ // for relative blocklet id, i.e. the blocklet id that belongs to a particular part file
+ row.setShort((short) relativeBlockletId++, ordinal++);
+ // Store block size
+ row.setLong(blockMetaInfo.getSize(), ordinal);
+ unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ return summaryRow;
+ }
+
+ private void setLocations(String[] locations, DataMapRow row, int ordinal)
+ throws UnsupportedEncodingException {
+ // Add location info
+ String locationStr = StringUtils.join(locations, ',');
+ row.setByteArray(locationStr.getBytes(CarbonCommonConstants.DEFAULT_CHARSET), ordinal);
+ }
+
+ /**
+ * Load information for the block. This case can happen only for old stores
+ * where blocklet information is not available in the index file, so load only block
+ * information and read blocklet information in the executor.
+ */
+ private DataMapRowImpl loadToUnsafeBlock(DataFileFooter fileFooter,
+ SegmentProperties segmentProperties, String filePath, DataMapRowImpl summaryRow,
+ BlockMetaInfo blockMetaInfo) {
+ int[] minMaxLen = segmentProperties.getColumnsValueSize();
+ BlockletIndex blockletIndex = fileFooter.getBlockletIndex();
+ CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
+ // Add one row to maintain task level min max for segment pruning
+ if (summaryRow == null) {
+ summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema());
+ }
+ DataMapRow row = new DataMapRowImpl(schema);
+ int ordinal = 0;
+ int taskMinMaxOrdinal = 0;
+ // add start key as index key
+ row.setByteArray(blockletIndex.getBtreeIndex().getStartKey(), ordinal++);
+
+ BlockletMinMaxIndex minMaxIndex = blockletIndex.getMinMaxIndex();
+ byte[][] minValues = updateMinValues(minMaxIndex.getMinValues(), minMaxLen);
+ byte[][] maxValues = updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen);
+ // update min max values in case of old store
+ byte[][] updatedMinValues =
+ CarbonUtil.updateMinMaxValues(fileFooter, maxValues, minValues, true);
+ byte[][] updatedMaxValues =
+ CarbonUtil.updateMinMaxValues(fileFooter, maxValues, minValues, false);
+ row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMinValues), ordinal);
+ // compute and set task level min values
+ addTaskMinMaxValues(summaryRow, minMaxLen,
+ unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMinValues,
+ TASK_MIN_VALUES_INDEX, true);
+ ordinal++;
+ taskMinMaxOrdinal++;
+ row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMaxValues), ordinal);
+ // compute and set task level max values
+ addTaskMinMaxValues(summaryRow, minMaxLen,
+ unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMaxValues,
+ TASK_MAX_VALUES_INDEX, false);
+ ordinal++;
+
+ row.setInt((int)fileFooter.getNumberOfRows(), ordinal++);
+
+ // add file path
+ byte[] filePathBytes = filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+ row.setByteArray(filePathBytes, ordinal++);
+
+ // add pages
+ row.setShort((short) 0, ordinal++);
+
+ // add version number
+ row.setShort(fileFooter.getVersionId().number(), ordinal++);
+
+ // add schema updated time
+ row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++);
+
+ // add blocklet info
+ row.setByteArray(new byte[0], ordinal++);
+
+ row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++);
+ try {
+ setLocations(blockMetaInfo.getLocationInfo(), row, ordinal);
+ ordinal++;
+ // for relative blocklet id. Value is -1 because in case of the old store the blocklet
+ // info will not be present in the index file and in that case we will not know the total
+ // number of blocklets
+ row.setShort((short) -1, ordinal++);
+
+ // store block size
+ row.setLong(blockMetaInfo.getSize(), ordinal);
+ unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ return summaryRow;
+ }
+
+ private void addTaskSummaryRowToUnsafeMemoryStore(DataMapRow summaryRow, byte[] schemaBinary,
+ byte[] filePath, byte[] fileName, byte[] segmentId) {
+ // write the task summary info to unsafe memory store
+ if (null != summaryRow) {
+ // Add the column schema; it is useful to generate segment properties in the executor,
+ // so we do not need to read the footer again there.
+ if (schemaBinary != null) {
+ summaryRow.setByteArray(schemaBinary, SCHEMA);
+ }
+ summaryRow.setByteArray(filePath, INDEX_PATH);
+ summaryRow.setByteArray(fileName, INDEX_FILE_NAME);
+ summaryRow.setByteArray(segmentId, SEGMENTID);
+ try {
+ unsafeMemorySummaryDMStore.addIndexRowToUnsafe(summaryRow);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Fill the measure min values with the minimum value; this is needed for backward version
+ * compatibility as older versions don't store min values for measures
+ */
+ private byte[][] updateMinValues(byte[][] minValues, int[] minMaxLen) {
+ byte[][] updatedValues = minValues;
+ if (minValues.length < minMaxLen.length) {
+ updatedValues = new byte[minMaxLen.length][];
+ System.arraycopy(minValues, 0, updatedValues, 0, minValues.length);
+ List<CarbonMeasure> measures = segmentProperties.getMeasures();
+ ByteBuffer buffer = ByteBuffer.allocate(8);
+ for (int i = 0; i < measures.size(); i++) {
+ buffer.rewind();
+ DataType dataType = measures.get(i).getDataType();
+ if (dataType == DataTypes.BYTE) {
+ buffer.putLong(Byte.MIN_VALUE);
+ updatedValues[minValues.length + i] = buffer.array().clone();
+ } else if (dataType == DataTypes.SHORT) {
+ buffer.putLong(Short.MIN_VALUE);
+ updatedValues[minValues.length + i] = buffer.array().clone();
+ } else if (dataType == DataTypes.INT) {
+ buffer.putLong(Integer.MIN_VALUE);
+ updatedValues[minValues.length + i] = buffer.array().clone();
+ } else if (dataType == DataTypes.LONG) {
+ buffer.putLong(Long.MIN_VALUE);
+ updatedValues[minValues.length + i] = buffer.array().clone();
+ } else if (DataTypes.isDecimal(dataType)) {
+ updatedValues[minValues.length + i] =
+ DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MIN_VALUE));
+ } else {
+ // use the most negative double, since Double.MIN_VALUE is the smallest positive double
+ buffer.putDouble(-Double.MAX_VALUE);
+ updatedValues[minValues.length + i] = buffer.array().clone();
+ }
+ }
+ }
+ return updatedValues;
+ }
+
+ /**
+ * Fill the measure max values with the maximum value; this is needed for backward version
+ * compatibility as older versions don't store max values for measures
+ */
+ private byte[][] updateMaxValues(byte[][] maxValues, int[] minMaxLen) {
+ byte[][] updatedValues = maxValues;
+ if (maxValues.length < minMaxLen.length) {
+ updatedValues = new byte[minMaxLen.length][];
+ System.arraycopy(maxValues, 0, updatedValues, 0, maxValues.length);
+ List<CarbonMeasure> measures = segmentProperties.getMeasures();
+ ByteBuffer buffer = ByteBuffer.allocate(8);
+ for (int i = 0; i < measures.size(); i++) {
+ buffer.rewind();
+ DataType dataType = measures.get(i).getDataType();
+ if (dataType == DataTypes.BYTE) {
+ buffer.putLong(Byte.MAX_VALUE);
+ updatedValues[maxValues.length + i] = buffer.array().clone();
+ } else if (dataType == DataTypes.SHORT) {
+ buffer.putLong(Short.MAX_VALUE);
+ updatedValues[maxValues.length + i] = buffer.array().clone();
+ } else if (dataType == DataTypes.INT) {
+ buffer.putLong(Integer.MAX_VALUE);
+ updatedValues[maxValues.length + i] = buffer.array().clone();
+ } else if (dataType == DataTypes.LONG) {
+ buffer.putLong(Long.MAX_VALUE);
+ updatedValues[maxValues.length + i] = buffer.array().clone();
+ } else if (DataTypes.isDecimal(dataType)) {
+ updatedValues[maxValues.length + i] =
+ DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MAX_VALUE));
+ } else {
+ buffer.putDouble(Double.MAX_VALUE);
+ updatedValues[maxValues.length + i] = buffer.array().clone();
+ }
+ }
+ }
+ return updatedValues;
+ }
+
+ private DataMapRow addMinMax(int[] minMaxLen, CarbonRowSchema carbonRowSchema,
+ byte[][] minValues) {
+ CarbonRowSchema[] minSchemas =
+ ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas();
+ DataMapRow minRow = new DataMapRowImpl(minSchemas);
+ int minOrdinal = 0;
+ // min value adding
+ for (int i = 0; i < minMaxLen.length; i++) {
+ minRow.setByteArray(minValues[i], minOrdinal++);
+ }
+ return minRow;
+ }
+
+ /**
+ * This method will compute min/max values at task level
+ *
+ * @param taskMinMaxRow summary row holding the task level min/max values
+ * @param minMaxLen value size of each min/max column
+ * @param carbonRowSchema schema of the min/max struct in the summary row
+ * @param minMaxValue min/max values of the current blocklet
+ * @param ordinal ordinal of the min (or max) struct within the summary row
+ * @param isMinValueComparison true when comparing min values, false when comparing max values
+ */
+ private void addTaskMinMaxValues(DataMapRow taskMinMaxRow, int[] minMaxLen,
+ CarbonRowSchema carbonRowSchema, byte[][] minMaxValue, int ordinal,
+ boolean isMinValueComparison) {
+ DataMapRow row = taskMinMaxRow.getRow(ordinal);
+ byte[][] updatedMinMaxValues = minMaxValue;
+ if (null == row) {
+ CarbonRowSchema[] minSchemas =
+ ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas();
+ row = new DataMapRowImpl(minSchemas);
+ } else {
+ byte[][] existingMinMaxValues = getMinMaxValue(taskMinMaxRow, ordinal);
+ // Compare and update min max values
+ for (int i = 0; i < minMaxLen.length; i++) {
+ int compare =
+ ByteUtil.UnsafeComparer.INSTANCE.compareTo(existingMinMaxValues[i], minMaxValue[i]);
+ if (isMinValueComparison) {
+ if (compare < 0) {
+ updatedMinMaxValues[i] = existingMinMaxValues[i];
+ }
+ } else if (compare > 0) {
+ updatedMinMaxValues[i] = existingMinMaxValues[i];
+ }
+ }
+ }
+ int minMaxOrdinal = 0;
+ // min/max value adding
+ for (int i = 0; i < minMaxLen.length; i++) {
+ row.setByteArray(updatedMinMaxValues[i], minMaxOrdinal++);
+ }
+ taskMinMaxRow.setRow(row, ordinal);
+ }
+
+ private void createSchema(SegmentProperties segmentProperties) throws MemoryException {
+ List<CarbonRowSchema> indexSchemas = new ArrayList<>();
+
+ // Index key
+ indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
+ getMinMaxSchema(segmentProperties, indexSchemas);
+
+ // for number of rows.
+ indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.INT));
+
+ // for table block path
+ indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
+
+ // for number of pages.
+ indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
+
+ // for version number.
+ indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
+
+ // for schema updated time.
+ indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
+
+ //for blocklet info
+ indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
+
+ // for block footer offset.
+ indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
+
+ // for locations
+ indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
+
+ // for relative blocklet id i.e. blocklet id that belongs to a particular part file.
+ indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
+
+ // for storing block length.
+ indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
+
+ unsafeMemoryDMStore =
+ new UnsafeMemoryDMStore(indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]));
+ }
+
+ /**
+ * Creates the schema to store summary information, i.e. the information which can be stored
+ * only once per datamap. It stores the datamap level min/max of each column and the partition
+ * information of the datamap.
+ * @param segmentProperties
+ * @throws MemoryException
+ */
+ private void createSummarySchema(SegmentProperties segmentProperties, byte[] schemaBinary,
+ byte[] filePath, byte[] fileName, byte[] segmentId)
+ throws MemoryException {
+ List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>();
+ getMinMaxSchema(segmentProperties, taskMinMaxSchemas);
+ // for storing column schema
+ taskMinMaxSchemas.add(
+ new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, schemaBinary.length));
+ // for storing file path
+ taskMinMaxSchemas.add(
+ new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, filePath.length));
+ // for storing file name
+ taskMinMaxSchemas.add(
+ new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, fileName.length));
+ // for storing segmentid
+ taskMinMaxSchemas.add(
+ new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, segmentId.length));
+ unsafeMemorySummaryDMStore = new UnsafeMemoryDMStore(
+ taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]));
+ }
+
+ private void getMinMaxSchema(SegmentProperties segmentProperties,
+ List<CarbonRowSchema> minMaxSchemas) {
+ // Index key
+ int[] minMaxLen = segmentProperties.getColumnsValueSize();
+ // do it 2 times, one for min and one for max.
+ for (int k = 0; k < 2; k++) {
+ CarbonRowSchema[] mapSchemas = new CarbonRowSchema[minMaxLen.length];
+ for (int i = 0; i < minMaxLen.length; i++) {
+ if (minMaxLen[i] <= 0) {
+ mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY);
+ } else {
+ mapSchemas[i] =
+ new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, minMaxLen[i]);
+ }
+ }
+ CarbonRowSchema mapSchema =
+ new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(),
+ mapSchemas);
+ minMaxSchemas.add(mapSchema);
+ }
+ }
+
+ @Override
+ public boolean isScanRequired(FilterResolverIntf filterExp) {
+ FilterExecuter filterExecuter =
+ FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
+ for (int i = 0; i < unsafeMemorySummaryDMStore.getRowCount(); i++) {
+ DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(i);
+ boolean isScanRequired = FilterExpressionProcessor.isScanRequired(
+ filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX),
+ getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX));
+ if (isScanRequired) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties) {
+ if (unsafeMemoryDMStore.getRowCount() == 0) {
+ return new ArrayList<>();
+ }
+ List<Blocklet> blocklets = new ArrayList<>();
+ if (filterExp == null) {
+ int rowCount = unsafeMemoryDMStore.getRowCount();
+ for (int i = 0; i < rowCount; i++) {
+ DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(i).convertToSafeRow();
+ blocklets.add(createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX)));
+ }
+ } else {
+ // Remove the B-tree jump logic as the prepared start and end keys are not
+ // correct for old store scenarios
+ int startIndex = 0;
+ int endIndex = unsafeMemoryDMStore.getRowCount();
+ FilterExecuter filterExecuter =
+ FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
+ while (startIndex < endIndex) {
+ DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(startIndex).convertToSafeRow();
+ int blockletId = safeRow.getShort(BLOCKLET_ID_INDEX);
+ String filePath = new String(safeRow.getByteArray(FILE_PATH_INDEX),
+ CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+ boolean isValid =
+ addBlockBasedOnMinMaxValue(filterExecuter, getMinMaxValue(safeRow, MAX_VALUES_INDEX),
+ getMinMaxValue(safeRow, MIN_VALUES_INDEX), filePath, blockletId);
+ if (isValid) {
+ blocklets.add(createBlocklet(safeRow, blockletId));
+ }
+ startIndex++;
+ }
+ }
+ return blocklets;
+ }
+
+ @Override
+ public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
+ List<PartitionSpec> partitions) {
+ if (unsafeMemoryDMStore.getRowCount() == 0) {
+ return new ArrayList<>();
+ }
+ // if it is a partitioned datamap but there is no partition information stored, it means
+ // the partitions are dropped, so return an empty list.
+ if (partitions != null) {
+ // First get the partitions which are stored inside datamap.
+ String[] fileDetails = getFileDetails();
+ // Check the exact match of partition information inside the stored partitions.
+ boolean found = false;
+ Path folderPath = new Path(fileDetails[0]);
+ for (PartitionSpec spec : partitions) {
+ if (folderPath.equals(spec.getLocation()) && isCorrectUUID(fileDetails, spec)) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ return new ArrayList<>();
+ }
+ }
+ // Prune with filters if the partitions exist in this datamap
+ return prune(filterExp, segmentProperties);
+ }
+
+ private boolean isCorrectUUID(String[] fileDetails, PartitionSpec spec) {
+ boolean needToScan = false;
+ if (spec.getUuid() != null) {
+ String[] split = spec.getUuid().split("_");
+ if (split[0].equals(fileDetails[2]) && CarbonTablePath.DataFileUtil
+ .getTimeStampFromFileName(fileDetails[1]).equals(split[1])) {
+ needToScan = true;
+ }
+ } else {
+ needToScan = true;
+ }
+ return needToScan;
+ }
+
+ /**
+ * Select the blocks based on the column min and max values
+ *
+ * @param filterExecuter filter tree to evaluate against the min/max values
+ * @param maxValue max values of the block/blocklet
+ * @param minValue min values of the block/blocklet
+ * @param filePath path of the carbondata file
+ * @param blockletId blocklet id, or -1 for the old store
+ * @return true if the block/blocklet may contain matching rows and has to be scanned
+ */
+ private boolean addBlockBasedOnMinMaxValue(FilterExecuter filterExecuter, byte[][] maxValue,
+ byte[][] minValue, String filePath, int blockletId) {
+ BitSet bitSet = null;
+ if (filterExecuter instanceof ImplicitColumnFilterExecutor) {
+ String uniqueBlockPath = filePath.substring(filePath.lastIndexOf("/Part") + 1);
+ // this case will come in case of old store where index file does not contain the
+ // blocklet information
+ if (blockletId != -1) {
+ uniqueBlockPath = uniqueBlockPath + CarbonCommonConstants.FILE_SEPARATOR + blockletId;
+ }
+ bitSet = ((ImplicitColumnFilterExecutor) filterExecuter)
+ .isFilterValuesPresentInBlockOrBlocklet(maxValue, minValue, uniqueBlockPath);
+ } else {
+ bitSet = filterExecuter.isScanRequired(maxValue, minValue);
+ }
+ return !bitSet.isEmpty();
+ }
+
+ public ExtendedBlocklet getDetailedBlocklet(String blockletId) {
+ int index = Integer.parseInt(blockletId);
+ DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(index).convertToSafeRow();
+ return createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX));
+ }
+
+ private byte[][] getMinMaxValue(DataMapRow row, int index) {
+ DataMapRow minMaxRow = row.getRow(index);
+ byte[][] minMax = new byte[minMaxRow.getColumnCount()][];
+ for (int i = 0; i < minMax.length; i++) {
+ minMax[i] = minMaxRow.getByteArray(i);
+ }
+ return minMax;
+ }
+
+ private ExtendedBlocklet createBlocklet(DataMapRow row, int blockletId) {
+ ExtendedBlocklet blocklet = new ExtendedBlocklet(
+ new String(row.getByteArray(FILE_PATH_INDEX), CarbonCommonConstants.DEFAULT_CHARSET_CLASS),
+ blockletId + "");
+ BlockletDetailInfo detailInfo = new BlockletDetailInfo();
+ detailInfo.setRowCount(row.getInt(ROW_COUNT_INDEX));
+ detailInfo.setPagesCount(row.getShort(PAGE_COUNT_INDEX));
+ detailInfo.setVersionNumber(row.getShort(VERSION_INDEX));
+ detailInfo.setBlockletId((short) blockletId);
+ detailInfo.setDimLens(columnCardinality);
+ detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPDATED_TIME_INDEX));
+ byte[] byteArray = row.getByteArray(BLOCK_INFO_INDEX);
+ BlockletInfo blockletInfo = null;
+ try {
+ if (byteArray.length > 0) {
+ blockletInfo = new BlockletInfo();
+ ByteArrayInputStream stream = new ByteArrayInputStream(byteArray);
+ DataInputStream inputStream = new DataInputStream(stream);
+ blockletInfo.readFields(inputStream);
+ inputStream.close();
+ }
+ blocklet.setLocation(
+ new String(row.getByteArray(LOCATIONS), CarbonCommonConstants.DEFAULT_CHARSET)
+ .split(","));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ detailInfo.setBlockletInfo(blockletInfo);
+ blocklet.setDetailInfo(detailInfo);
+ detailInfo.setBlockFooterOffset(row.getLong(BLOCK_FOOTER_OFFSET));
+ detailInfo.setColumnSchemaBinary(getColumnSchemaBinary());
+ detailInfo.setBlockSize(row.getLong(BLOCK_LENGTH));
+ return blocklet;
+ }
+
+ private String[] getFileDetails() {
+ try {
+ String[] fileDetails = new String[3];
+ DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
+ fileDetails[0] =
+ new String(unsafeRow.getByteArray(INDEX_PATH), CarbonCommonConstants.DEFAULT_CHARSET);
+ fileDetails[1] = new String(unsafeRow.getByteArray(INDEX_FILE_NAME),
+ CarbonCommonConstants.DEFAULT_CHARSET);
+ fileDetails[2] = new String(unsafeRow.getByteArray(SEGMENTID),
+ CarbonCommonConstants.DEFAULT_CHARSET);
+ return fileDetails;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ /**
+ * Binary search used to get the first tentative index row based on
+ * search key
+ *
+ * @param key search key
+ * @return first tentative block
+ */
+ private int findStartIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
+ int childNodeIndex;
+ int low = 0;
+ int high = unsafeMemoryDMStore.getRowCount() - 1;
+ int mid = 0;
+ int compareRes = -1;
+ while (low <= high) {
+ mid = (low + high) >>> 1;
+ // compare the entries
+ compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+ if (compareRes < 0) {
+ high = mid - 1;
+ } else if (compareRes > 0) {
+ low = mid + 1;
+ } else {
+ // if key is matched then get the first entry
+ int currentPos = mid;
+ while (currentPos - 1 >= 0
+ && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos - 1)) == 0) {
+ currentPos--;
+ }
+ mid = currentPos;
+ break;
+ }
+ }
+ // if the compare result is less than zero and mid is more than 0, then we need to go to
+ // the previous block, as duplicate records can be present
+ if (compareRes < 0 && mid > 0) {
+ mid--;
+ }
+ childNodeIndex = mid;
+ // get the leaf child
+ return childNodeIndex;
+ }
+
+ /**
+ * Binary search used to get the last tentative block based on
+ * search key
+ *
+ * @param key search key
+ * @return last tentative block
+ */
+ private int findEndIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
+ int childNodeIndex;
+ int low = 0;
+ int high = unsafeMemoryDMStore.getRowCount() - 1;
+ int mid = 0;
+ int compareRes = -1;
+ while (low <= high) {
+ mid = (low + high) >>> 1;
+ // compare the entries
+ compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+ if (compareRes < 0) {
+ high = mid - 1;
+ } else if (compareRes > 0) {
+ low = mid + 1;
+ } else {
+ int currentPos = mid;
+ // if key is matched then get the last entry
+ while (currentPos + 1 < unsafeMemoryDMStore.getRowCount()
+ && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos + 1)) == 0) {
+ currentPos++;
+ }
+ mid = currentPos;
+ break;
+ }
+ }
+ // if the compare result is less than zero and mid is more than 0, then we need to go to
+ // the previous block, as duplicate records can be present
+ if (compareRes < 0 && mid > 0) {
+ mid--;
+ }
+ childNodeIndex = mid;
+ return childNodeIndex;
+ }
+
+ private DataMapRow convertToRow(IndexKey key) {
+ ByteBuffer buffer =
+ ByteBuffer.allocate(key.getDictionaryKeys().length + key.getNoDictionaryKeys().length + 8);
+ buffer.putInt(key.getDictionaryKeys().length);
+ buffer.putInt(key.getNoDictionaryKeys().length);
+ buffer.put(key.getDictionaryKeys());
+ buffer.put(key.getNoDictionaryKeys());
+ DataMapRowImpl dataMapRow = new DataMapRowImpl(unsafeMemoryDMStore.getSchema());
+ dataMapRow.setByteArray(buffer.array(), 0);
+ return dataMapRow;
+ }
+
+ private byte[] getColumnSchemaBinary() {
+ DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
+ return unsafeRow.getByteArray(SCHEMA);
+ }
+
+ /**
+ * Convert schema to binary
+ */
+ private byte[] convertSchemaToBinary(List<ColumnSchema> columnSchemas) throws IOException {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ DataOutput dataOutput = new DataOutputStream(stream);
+ dataOutput.writeShort(columnSchemas.size());
+ for (ColumnSchema columnSchema : columnSchemas) {
+ if (columnSchema.getColumnReferenceId() == null) {
+ columnSchema.setColumnReferenceId(columnSchema.getColumnUniqueId());
+ }
+ columnSchema.write(dataOutput);
+ }
+ byte[] byteArray = stream.toByteArray();
+ // Compress with snappy to reduce the size of schema
+ return Snappy.rawCompress(byteArray, byteArray.length);
+ }
+
+ @Override
+ public void clear() {
+ if (unsafeMemoryDMStore != null) {
+ unsafeMemoryDMStore.freeMemory();
+ unsafeMemoryDMStore = null;
+ segmentProperties = null;
+ }
+ // clear task min/max unsafe memory
+ if (null != unsafeMemorySummaryDMStore) {
+ unsafeMemorySummaryDMStore.freeMemory();
+ unsafeMemorySummaryDMStore = null;
+ }
+ }
+
+ @Override
+ public long getFileTimeStamp() {
+ return 0;
+ }
+
+ @Override
+ public int getAccessCount() {
+ return 0;
+ }
+
+ @Override
+ public long getMemorySize() {
+ long memoryUsed = 0L;
+ if (unsafeMemoryDMStore != null) {
+ memoryUsed += unsafeMemoryDMStore.getMemoryUsed();
+ }
+ if (null != unsafeMemorySummaryDMStore) {
+ memoryUsed += unsafeMemorySummaryDMStore.getMemoryUsed();
+ }
+ return memoryUsed;
+ }
+
+ public SegmentProperties getSegmentProperties() {
+ return segmentProperties;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMapFactory.java
new file mode 100644
index 0000000..f1b97a4
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMapFactory.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
+import org.apache.carbondata.core.datamap.dev.IndexDataMap;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMapFactory;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.events.Event;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+/**
+ * Datamap factory for the blocklet index
+ */
+public class BlockletIndexDataMapFactory extends AbstractCoarseGrainIndexDataMapFactory
+ implements BlockletDetailsFetcher, SegmentPropertiesFetcher {
+
+ private static final String NAME = "clustered.btree.blocklet";
+
+ public static final DataMapSchema DATA_MAP_SCHEMA =
+ new DataMapSchema(NAME, BlockletIndexDataMapFactory.class.getName());
+
+ private AbsoluteTableIdentifier identifier;
+
+ // segmentId -> list of index file
+ private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
+
+ private Cache<TableBlockIndexUniqueIdentifier, AbstractCoarseGrainIndexDataMap> cache;
+
+ @Override
+ public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) {
+ this.identifier = identifier;
+ cache = CacheProvider.getInstance()
+ .createCache(CacheType.DRIVER_BLOCKLET_DATAMAP);
+ }
+
+ @Override
+ public AbstractDataMapWriter createWriter(Segment segment, String writeDirectoryPath) {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ @Override
+ public List<AbstractCoarseGrainIndexDataMap> getDataMaps(Segment segment) throws IOException {
+ List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+ getTableBlockIndexUniqueIdentifiers(segment);
+ return cache.getAll(tableBlockIndexUniqueIdentifiers);
+ }
+
+ private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(
+ Segment segment) throws IOException {
+ List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+ segmentMap.get(segment.getSegmentNo());
+ if (tableBlockIndexUniqueIdentifiers == null) {
+ tableBlockIndexUniqueIdentifiers = new ArrayList<>();
+ Map<String, String> indexFiles;
+ if (segment.getSegmentFileName() == null) {
+ String path =
+ CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
+ indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path);
+ } else {
+ SegmentFileStore fileStore =
+ new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
+ indexFiles = fileStore.getIndexFiles();
+ }
+ for (Map.Entry<String, String> indexFileEntry: indexFiles.entrySet()) {
+ Path indexFile = new Path(indexFileEntry.getKey());
+ tableBlockIndexUniqueIdentifiers.add(
+ new TableBlockIndexUniqueIdentifier(indexFile.getParent().toString(),
+ indexFile.getName(), indexFileEntry.getValue(), segment.getSegmentNo()));
+ }
+ segmentMap.put(segment.getSegmentNo(), tableBlockIndexUniqueIdentifiers);
+ }
+ return tableBlockIndexUniqueIdentifiers;
+ }
+
+ /**
+ * Get the blocklet detail information based on blockletid, blockid and segmentid. This method is
+ * exclusively for BlockletIndexDataMapFactory as detail information is only available in this
+ * default datamap.
+ */
+ @Override
+ public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment)
+ throws IOException {
+ List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>();
+ // If these are already detailed blocklets then type cast and return the same
+ if (blocklets.size() > 0 && blocklets.get(0) instanceof ExtendedBlocklet) {
+ for (Blocklet blocklet : blocklets) {
+ detailedBlocklets.add((ExtendedBlocklet) blocklet);
+ }
+ return detailedBlocklets;
+ }
+ List<TableBlockIndexUniqueIdentifier> identifiers =
+ getTableBlockIndexUniqueIdentifiers(segment);
+ // Retrieve each blocklet's detail information from the blocklet datamap
+ for (Blocklet blocklet : blocklets) {
+ detailedBlocklets.add(getExtendedBlocklet(identifiers, blocklet));
+ }
+ return detailedBlocklets;
+ }
+
+ @Override
+ public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment)
+ throws IOException {
+ if (blocklet instanceof ExtendedBlocklet) {
+ return (ExtendedBlocklet) blocklet;
+ }
+ List<TableBlockIndexUniqueIdentifier> identifiers =
+ getTableBlockIndexUniqueIdentifiers(segment);
+ return getExtendedBlocklet(identifiers, blocklet);
+ }
+
+ private ExtendedBlocklet getExtendedBlocklet(List<TableBlockIndexUniqueIdentifier> identifiers,
+ Blocklet blocklet) throws IOException {
+ String carbonIndexFileName = CarbonTablePath.getCarbonIndexFileName(blocklet.getBlockId());
+ for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
+ if (identifier.getIndexFilePath().equals(carbonIndexFileName)) {
+ IndexDataMap dataMap = cache.get(identifier);
+ return ((BlockletIndexDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId());
+ }
+ }
+ throw new IOException("Blocklet with blockid " + blocklet.getBlockletId() + " not found ");
+ }
+
+
+ @Override
+ public List<DataMapDistributable> toDistributable(Segment segment) {
+ List<DataMapDistributable> distributables = new ArrayList<>();
+ try {
+ CarbonFile[] carbonIndexFiles;
+ if (segment.getSegmentFileName() == null) {
+ carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(
+ CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo()));
+ } else {
+ SegmentFileStore fileStore =
+ new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
+ Map<String, String> indexFiles = fileStore.getIndexFiles();
+ carbonIndexFiles = new CarbonFile[indexFiles.size()];
+ int i = 0;
+ for (String indexFile : indexFiles.keySet()) {
+ carbonIndexFiles[i++] = FileFactory.getCarbonFile(indexFile);
+ }
+ }
+ for (int i = 0; i < carbonIndexFiles.length; i++) {
+ Path path = new Path(carbonIndexFiles[i].getPath());
+
+ FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
+ RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
+ LocatedFileStatus fileStatus = iter.next();
+ String[] location = fileStatus.getBlockLocations()[0].getHosts();
+ BlockletDataMapDistributable distributable =
+ new BlockletDataMapDistributable(path.toString());
+ distributable.setLocations(location);
+ distributables.add(distributable);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return distributables;
+ }
+
+ @Override
+ public void fireEvent(Event event) {
+
+ }
+
+ @Override
+ public void clear(Segment segment) {
+ List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment.getSegmentNo());
+ if (blockIndexes != null) {
+ for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
+ IndexDataMap indexDataMap = cache.getIfPresent(blockIndex);
+ if (indexDataMap != null) {
+ cache.invalidate(blockIndex);
+ indexDataMap.clear();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void clear() {
+ for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) {
+ clear(new Segment(segmentId, null));
+ }
+ }
+
+ @Override
+ public List<AbstractCoarseGrainIndexDataMap> getDataMaps(DataMapDistributable distributable)
+ throws IOException {
+ BlockletDataMapDistributable mapDistributable = (BlockletDataMapDistributable) distributable;
+ List<TableBlockIndexUniqueIdentifier> identifiers = new ArrayList<>();
+ Path indexPath = new Path(mapDistributable.getFilePath());
+ String segmentNo = mapDistributable.getSegment().getSegmentNo();
+ if (indexPath.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+ String parent = indexPath.getParent().toString();
+ identifiers
+ .add(new TableBlockIndexUniqueIdentifier(parent, indexPath.getName(), null, segmentNo));
+ } else if (indexPath.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+ SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
+ CarbonFile carbonFile = FileFactory.getCarbonFile(indexPath.toString());
+ String parentPath = carbonFile.getParentFile().getAbsolutePath();
+ List<String> indexFiles = fileStore.getIndexFilesFromMergeFile(carbonFile.getAbsolutePath());
+ for (String indexFile : indexFiles) {
+ identifiers.add(
+ new TableBlockIndexUniqueIdentifier(parentPath, indexFile, carbonFile.getName(),
+ segmentNo));
+ }
+ }
+ List<AbstractCoarseGrainIndexDataMap> dataMaps;
+ try {
+ dataMaps = cache.getAll(identifiers);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return dataMaps;
+ }
+
+ @Override
+ public DataMapMeta getMeta() {
+ // TODO: pass SORT_COLUMNS into this class
+ return null;
+ }
+
+ @Override public SegmentProperties getSegmentProperties(Segment segment) throws IOException {
+ List<AbstractCoarseGrainIndexDataMap> dataMaps = getDataMaps(segment);
+ assert (dataMaps.size() > 0);
+ AbstractCoarseGrainIndexDataMap coarseGrainDataMap = dataMaps.get(0);
+ assert (coarseGrainDataMap instanceof BlockletIndexDataMap);
+ BlockletIndexDataMap dataMap = (BlockletIndexDataMap) coarseGrainDataMap;
+ return dataMap.getSegmentProperties();
+ }
+
+ @Override public List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions)
+ throws IOException {
+ List<Blocklet> blocklets = new ArrayList<>();
+ List<AbstractCoarseGrainIndexDataMap> dataMaps = getDataMaps(segment);
+ for (AbstractCoarseGrainIndexDataMap dataMap : dataMaps) {
+ blocklets.addAll(dataMap.prune(null, getSegmentProperties(segment), partitions));
+ }
+ return blocklets;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 4d54ad5..721845d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -768,7 +768,15 @@ public class CarbonTable implements Serializable {
}
/**
- * whether this table has aggregation DataMap or not
+ * Return true if 'autoRefreshDataMap' is enabled; it is enabled by default
+ */
+ public boolean isAutoRefreshDataMap() {
+ String refresh = getTableInfo().getFactTable().getTableProperties().get("autoRefreshDataMap");
+ return refresh == null || refresh.equalsIgnoreCase("true");
+ }
+
+ /**
+ * whether this table has aggregation IndexDataMap or not
*/
public boolean hasAggregationDataMap() {
List<DataMapSchema> dataMapSchemaList = tableInfo.getDataMapSchemaList();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
index 5a9017b..ae49467 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
@@ -32,6 +32,7 @@ public class DataMapSchema implements Serializable, Writable {
protected String dataMapName;
+ // this name can be the class name of the DataMapProvider implementation or its short name
private String className;
protected RelationIdentifier relationIdentifier;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
index d0c7386..1c6ebad 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
@@ -24,15 +24,16 @@ public class DataMapSchemaFactory {
/**
* Below class will be used to get data map schema object
* based on class name
- * @param className
+ * @param providerName
* @return data map schema
*/
- public DataMapSchema getDataMapSchema(String dataMapName, String className) {
- if (DataMapProvider.PREAGGREGATE.getClassName().equals(className) ||
- DataMapProvider.TIMESERIES.getClassName().equals(className)) {
- return new AggregationDataMapSchema(dataMapName, className);
+ public DataMapSchema getDataMapSchema(String dataMapName, String providerName) {
+ if (providerName.equalsIgnoreCase(DataMapProvider.PREAGGREGATE.toString())
+ || providerName.equalsIgnoreCase(DataMapProvider.TIMESERIES.toString())) {
+ return new AggregationDataMapSchema(dataMapName, providerName);
} else {
- return new DataMapSchema(dataMapName, className);
+ return new DataMapSchema(dataMapName, providerName);
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
index fff1a74..5d79abc 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
@@ -285,8 +285,7 @@ public class TableSchema implements Serializable, Writable {
// only = is allowed as special character , so replace with &
CarbonCommonConstants.DEFAULT_CHARSET)).replace("=","&"));
properties.put("QUERYTYPE", queryType);
- DataMapSchema dataMapSchema =
- new DataMapSchema(dataMapName, className);
+ DataMapSchema dataMapSchema = new DataMapSchema(dataMapName, className);
dataMapSchema.setProperties(properties);
dataMapSchema.setChildSchema(this);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java
deleted file mode 100644
index 88ac3ed..0000000
--- a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package org.apache.carbondata.core.indexstore.blockletindex;
-
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.carbondata.core.cache.dictionary.AbstractDictionaryCacheTest;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
-import org.apache.carbondata.core.scan.filter.executer.ImplicitIncludeFilterExecutorImpl;
-import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
-import org.apache.carbondata.core.util.ByteUtil;
-
-import mockit.Mock;
-import mockit.MockUp;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestBlockletDataMap extends AbstractDictionaryCacheTest {
-
- ImplicitIncludeFilterExecutorImpl implicitIncludeFilterExecutor;
- @Before public void setUp() throws Exception {
- CarbonImplicitDimension carbonImplicitDimension =
- new CarbonImplicitDimension(0, CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_POSITIONID);
- DimColumnResolvedFilterInfo dimColumnEvaluatorInfo = new DimColumnResolvedFilterInfo();
- dimColumnEvaluatorInfo.setColumnIndex(0);
- dimColumnEvaluatorInfo.setRowIndex(0);
- dimColumnEvaluatorInfo.setDimension(carbonImplicitDimension);
- dimColumnEvaluatorInfo.setDimensionExistsInCurrentSilce(false);
- implicitIncludeFilterExecutor =
- new ImplicitIncludeFilterExecutorImpl(dimColumnEvaluatorInfo);
- }
-
- @Test public void testaddBlockBasedOnMinMaxValue() throws Exception {
-
- new MockUp<ImplicitIncludeFilterExecutorImpl>() {
- @Mock BitSet isFilterValuesPresentInBlockOrBlocklet(byte[][] maxValue, byte[][] minValue,
- String uniqueBlockPath) {
- BitSet bitSet = new BitSet(1);
- bitSet.set(8);
- return bitSet;
- }
- };
-
- BlockletDataMap blockletDataMap = new BlockletDataMap();
- Method method = BlockletDataMap.class
- .getDeclaredMethod("addBlockBasedOnMinMaxValue", FilterExecuter.class, byte[][].class,
- byte[][].class, String.class, int.class);
- method.setAccessible(true);
-
- byte[][] minValue = { ByteUtil.toBytes("sfds") };
- byte[][] maxValue = { ByteUtil.toBytes("resa") };
- Object result = method
- .invoke(blockletDataMap, implicitIncludeFilterExecutor, minValue, maxValue,
- "/opt/store/default/carbon_table/Fact/Part0/Segment_0/part-0-0_batchno0-0-1514989110586.carbondata",
- 0);
- assert ((boolean) result);
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndexDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndexDataMap.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndexDataMap.java
new file mode 100644
index 0000000..16048db
--- /dev/null
+++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndexDataMap.java
@@ -0,0 +1,59 @@
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.lang.reflect.Method;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.cache.dictionary.AbstractDictionaryCacheTest;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.executer.ImplicitIncludeFilterExecutorImpl;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.util.ByteUtil;
+
+import mockit.Mock;
+import mockit.MockUp;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestBlockletIndexDataMap extends AbstractDictionaryCacheTest {
+
+ ImplicitIncludeFilterExecutorImpl implicitIncludeFilterExecutor;
+ @Before public void setUp() throws Exception {
+ CarbonImplicitDimension carbonImplicitDimension =
+ new CarbonImplicitDimension(0, CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_POSITIONID);
+ DimColumnResolvedFilterInfo dimColumnEvaluatorInfo = new DimColumnResolvedFilterInfo();
+ dimColumnEvaluatorInfo.setColumnIndex(0);
+ dimColumnEvaluatorInfo.setRowIndex(0);
+ dimColumnEvaluatorInfo.setDimension(carbonImplicitDimension);
+ dimColumnEvaluatorInfo.setDimensionExistsInCurrentSilce(false);
+ implicitIncludeFilterExecutor =
+ new ImplicitIncludeFilterExecutorImpl(dimColumnEvaluatorInfo);
+ }
+
+ @Test public void testaddBlockBasedOnMinMaxValue() throws Exception {
+
+ new MockUp<ImplicitIncludeFilterExecutorImpl>() {
+ @Mock BitSet isFilterValuesPresentInBlockOrBlocklet(byte[][] maxValue, byte[][] minValue,
+ String uniqueBlockPath) {
+ BitSet bitSet = new BitSet(1);
+ bitSet.set(8);
+ return bitSet;
+ }
+ };
+
+ BlockletIndexDataMap blockletDataMap = new BlockletIndexDataMap();
+ Method method = BlockletIndexDataMap.class
+ .getDeclaredMethod("addBlockBasedOnMinMaxValue", FilterExecuter.class, byte[][].class,
+ byte[][].class, String.class, int.class);
+ method.setAccessible(true);
+
+ byte[][] minValue = { ByteUtil.toBytes("sfds") };
+ byte[][] maxValue = { ByteUtil.toBytes("resa") };
+ Object result = method
+ .invoke(blockletDataMap, implicitIncludeFilterExecutor, minValue, maxValue,
+ "/opt/store/default/carbon_table/Fact/Part0/Segment_0/part-0-0_batchno0-0-1514989110586.carbondata",
+ 0);
+ assert ((boolean) result);
+ }
+}
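
Two details of the renamed test are worth noting: the JMockit MockUp forces isFilterValuesPresentInBlockOrBlocklet to return a non-empty BitSet (BitSet grows automatically, so set(8) on a BitSet(1) is legal), and addBlockBasedOnMinMaxValue is private, so the test reaches it through reflection. The reflection pattern in isolation looks like the sketch below; the class and method here are hypothetical stand-ins.

import java.lang.reflect.Method;

public class PrivateMethodInvocationSketch {
  // Hypothetical private method standing in for addBlockBasedOnMinMaxValue.
  private boolean isPositive(int value) {
    return value > 0;
  }

  public static void main(String[] args) throws Exception {
    PrivateMethodInvocationSketch target = new PrivateMethodInvocationSketch();
    Method method = PrivateMethodInvocationSketch.class
        .getDeclaredMethod("isPositive", int.class);
    method.setAccessible(true); // lift the private access check
    Object result = method.invoke(target, 42);
    assert (boolean) result;
  }
}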
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
deleted file mode 100644
index 75dac9e..0000000
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.datamap.examples;
-
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.dev.DataMapModel;
-import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
-import org.apache.carbondata.core.indexstore.Blocklet;
-import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.indexstore.row.DataMapRow;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
-import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.util.CarbonUtil;
-
-import com.google.gson.Gson;
-
-/**
- * Datamap implementation for min max blocklet.
- */
-public class MinMaxDataMap extends AbstractCoarseGrainDataMap {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(MinMaxDataMap.class.getName());
-
- public static final String NAME = "clustered.minmax.btree.blocklet";
-
- private String filePath;
-
- private MinMaxIndexBlockDetails[] readMinMaxDataMap;
-
- @Override
- public void init(DataMapModel model) throws MemoryException, IOException {
- this.filePath = model.getFilePath();
- CarbonFile[] listFiles = getCarbonMinMaxIndexFiles(filePath, "0");
- for (int i = 0; i < listFiles.length; i++) {
- readMinMaxDataMap = readJson(listFiles[i].getPath());
- }
- }
-
- private CarbonFile[] getCarbonMinMaxIndexFiles(String filePath, String segmentId) {
- String path = filePath.substring(0, filePath.lastIndexOf("/") + 1);
- CarbonFile carbonFile = FileFactory.getCarbonFile(path);
- return carbonFile.listFiles(new CarbonFileFilter() {
- @Override public boolean accept(CarbonFile file) {
- return file.getName().endsWith(".minmaxindex");
- }
- });
- }
-
- private MinMaxIndexBlockDetails[] readJson(String filePath) {
- Gson gsonObjectToRead = new Gson();
- DataInputStream dataInputStream = null;
- BufferedReader buffReader = null;
- InputStreamReader inStream = null;
- MinMaxIndexBlockDetails[] readMinMax = null;
- AtomicFileOperations fileOperation =
- new AtomicFileOperationsImpl(filePath, FileFactory.getFileType(filePath));
-
- try {
- if (!FileFactory.isFileExist(filePath, FileFactory.getFileType(filePath))) {
- return null;
- }
- dataInputStream = fileOperation.openForRead();
- inStream = new InputStreamReader(dataInputStream, "UTF-8");
- buffReader = new BufferedReader(inStream);
- readMinMax = gsonObjectToRead.fromJson(buffReader, MinMaxIndexBlockDetails[].class);
- } catch (IOException e) {
- return null;
- } finally {
- CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
- }
- return readMinMax;
- }
-
- /**
- * Block Prunning logic for Min Max DataMap.
- *
- * @param filterExp
- * @param segmentProperties
- * @return
- */
- @Override
- public List<Blocklet> prune(FilterResolverIntf filterExp,
- SegmentProperties segmentProperties, List<PartitionSpec> partitions) {
- List<Blocklet> blocklets = new ArrayList<>();
-
- if (filterExp == null) {
- for (int i = 0; i < readMinMaxDataMap.length; i++) {
- blocklets.add(new Blocklet(filePath, String.valueOf(readMinMaxDataMap[i].getBlockletId())));
- }
- } else {
- FilterExecuter filterExecuter =
- FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
- int startIndex = 0;
- while (startIndex < readMinMaxDataMap.length) {
- BitSet bitSet = filterExecuter.isScanRequired(readMinMaxDataMap[startIndex].getMaxValues(),
- readMinMaxDataMap[startIndex].getMinValues());
- if (!bitSet.isEmpty()) {
- blocklets.add(new Blocklet(filePath,
- String.valueOf(readMinMaxDataMap[startIndex].getBlockletId())));
- }
- startIndex++;
- }
- }
- return blocklets;
- }
-
- @Override
- public boolean isScanRequired(FilterResolverIntf filterExp) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void clear() {
- readMinMaxDataMap = null;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
deleted file mode 100644
index 9a67644..0000000
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.datamap.examples;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.carbondata.core.datamap.DataMapDistributable;
-import org.apache.carbondata.core.datamap.DataMapMeta;
-import org.apache.carbondata.core.datamap.Segment;
-import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
-import org.apache.carbondata.core.datamap.dev.DataMapModel;
-import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap;
-import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMapFactory;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
-import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.events.Event;
-
-/**
- * Min Max DataMap Factory
- */
-public class MinMaxDataMapFactory extends AbstractCoarseGrainDataMapFactory {
-
- private AbsoluteTableIdentifier identifier;
-
- @Override public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) {
- this.identifier = identifier;
- }
-
- /**
- * createWriter will return the MinMaxDataWriter.
- *
- * @param segment
- * @return
- */
- @Override public AbstractDataMapWriter createWriter(Segment segment, String writeDirectoryPath) {
- return new MinMaxDataWriter(identifier, segment, writeDirectoryPath);
- }
-
- /**
- * getDataMaps Factory method Initializes the Min Max Data Map and returns.
- *
- * @param segment
- * @return
- * @throws IOException
- */
- @Override public List<AbstractCoarseGrainDataMap> getDataMaps(Segment segment)
- throws IOException {
- List<AbstractCoarseGrainDataMap> dataMapList = new ArrayList<>();
- // Form a dataMap of Type MinMaxDataMap.
- MinMaxDataMap dataMap = new MinMaxDataMap();
- try {
- dataMap.init(new DataMapModel(
- CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo())));
- } catch (MemoryException ex) {
- throw new IOException(ex);
- }
- dataMapList.add(dataMap);
- return dataMapList;
- }
-
- /**
- * @param segment
- * @return
- */
- @Override public List<DataMapDistributable> toDistributable(Segment segment) {
- return null;
- }
-
- /**
- * Clear the DataMap.
- *
- * @param segment
- */
- @Override public void clear(Segment segment) {
- }
-
- /**
- * Clearing the data map.
- */
- @Override public void clear() {
- }
-
- @Override public List<AbstractCoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
- throws IOException {
- return null;
- }
-
- @Override public void fireEvent(Event event) {
-
- }
-
- @Override public DataMapMeta getMeta() {
- return new DataMapMeta(new ArrayList<String>(Arrays.asList("c2")),
- new ArrayList<ExpressionType>());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
new file mode 100644
index 0000000..8569081
--- /dev/null
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.examples;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import com.google.gson.Gson;
+
+/**
+ * DataMap implementation that prunes blocklets using min/max index values.
+ */
+public class MinMaxIndexDataMap extends AbstractCoarseGrainIndexDataMap {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(MinMaxIndexDataMap.class.getName());
+
+ public static final String NAME = "clustered.minmax.btree.blocklet";
+
+ private String filePath;
+
+ private MinMaxIndexBlockDetails[] readMinMaxDataMap;
+
+ @Override
+ public void init(DataMapModel model) throws MemoryException, IOException {
+ this.filePath = model.getFilePath();
+ CarbonFile[] listFiles = getCarbonMinMaxIndexFiles(filePath, "0");
+ for (int i = 0; i < listFiles.length; i++) {
+ readMinMaxDataMap = readJson(listFiles[i].getPath());
+ }
+ }
+
+ private CarbonFile[] getCarbonMinMaxIndexFiles(String filePath, String segmentId) {
+ String path = filePath.substring(0, filePath.lastIndexOf("/") + 1);
+ CarbonFile carbonFile = FileFactory.getCarbonFile(path);
+ return carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().endsWith(".minmaxindex");
+ }
+ });
+ }
+
+ private MinMaxIndexBlockDetails[] readJson(String filePath) {
+ Gson gsonObjectToRead = new Gson();
+ DataInputStream dataInputStream = null;
+ BufferedReader buffReader = null;
+ InputStreamReader inStream = null;
+ MinMaxIndexBlockDetails[] readMinMax = null;
+ AtomicFileOperations fileOperation =
+ new AtomicFileOperationsImpl(filePath, FileFactory.getFileType(filePath));
+
+ try {
+ if (!FileFactory.isFileExist(filePath, FileFactory.getFileType(filePath))) {
+ return null;
+ }
+ dataInputStream = fileOperation.openForRead();
+ inStream = new InputStreamReader(dataInputStream, "UTF-8");
+ buffReader = new BufferedReader(inStream);
+ readMinMax = gsonObjectToRead.fromJson(buffReader, MinMaxIndexBlockDetails[].class);
+ } catch (IOException e) {
+ return null;
+ } finally {
+ CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
+ }
+ return readMinMax;
+ }
+
+  /**
+   * Block pruning logic for the Min Max DataMap.
+   *
+   * @param filterExp resolved filter expression, or null to select all blocklets
+   * @param segmentProperties properties of the segment being pruned
+   * @return blocklets that may contain data matching the filter
+   */
+ @Override
+ public List<Blocklet> prune(FilterResolverIntf filterExp,
+ SegmentProperties segmentProperties, List<PartitionSpec> partitions) {
+ List<Blocklet> blocklets = new ArrayList<>();
+
+ if (filterExp == null) {
+ for (int i = 0; i < readMinMaxDataMap.length; i++) {
+ blocklets.add(new Blocklet(filePath, String.valueOf(readMinMaxDataMap[i].getBlockletId())));
+ }
+ } else {
+ FilterExecuter filterExecuter =
+ FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
+ int startIndex = 0;
+ while (startIndex < readMinMaxDataMap.length) {
+ BitSet bitSet = filterExecuter.isScanRequired(readMinMaxDataMap[startIndex].getMaxValues(),
+ readMinMaxDataMap[startIndex].getMinValues());
+ if (!bitSet.isEmpty()) {
+ blocklets.add(new Blocklet(filePath,
+ String.valueOf(readMinMaxDataMap[startIndex].getBlockletId())));
+ }
+ startIndex++;
+ }
+ }
+ return blocklets;
+ }
+
+ @Override
+ public boolean isScanRequired(FilterResolverIntf filterExp) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clear() {
+ readMinMaxDataMap = null;
+ }
+
+}
\ No newline at end of file
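
In prune() above, each blocklet's stored max/min byte arrays are passed to FilterExecuter.isScanRequired, and a non-empty BitSet keeps the blocklet. (Note that init() overwrites readMinMaxDataMap on every loop iteration, so only the last .minmaxindex file is retained; an implementation covering multiple index files would accumulate them instead.) The sketch below illustrates the underlying min/max containment test; the unsigned lexicographic byte comparison is an assumption for illustration, not CarbonData's exact comparator.

import java.nio.charset.StandardCharsets;

public class MinMaxPruneSketch {
  // A blocklet must be scanned only if the filter literal can fall inside
  // the [min, max] range recorded for the column.
  static boolean scanRequired(byte[] filterValue, byte[] min, byte[] max) {
    return compare(filterValue, min) >= 0 && compare(filterValue, max) <= 0;
  }

  // Unsigned lexicographic comparison of two byte arrays.
  static int compare(byte[] a, byte[] b) {
    int len = Math.min(a.length, b.length);
    for (int i = 0; i < len; i++) {
      int cmp = (a[i] & 0xFF) - (b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return a.length - b.length;
  }

  public static void main(String[] args) {
    byte[] min = "aaa".getBytes(StandardCharsets.UTF_8);
    byte[] max = "mmm".getBytes(StandardCharsets.UTF_8);
    System.out.println(scanRequired("ccc".getBytes(StandardCharsets.UTF_8), min, max)); // true
    System.out.println(scanRequired("zzz".getBytes(StandardCharsets.UTF_8), min, max)); // false
  }
}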
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
new file mode 100644
index 0000000..9624766
--- /dev/null
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.examples;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMapFactory;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.events.Event;
+
+/**
+ * Min Max DataMap Factory
+ */
+public class MinMaxIndexDataMapFactory extends AbstractCoarseGrainIndexDataMapFactory {
+
+ private AbsoluteTableIdentifier identifier;
+
+ @Override public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) {
+ this.identifier = identifier;
+ }
+
+  /**
+   * createWriter returns a MinMaxDataWriter for the given segment.
+   *
+   * @param segment segment for which index files will be written
+   * @return writer that records min/max values during data load
+   */
+ @Override public AbstractDataMapWriter createWriter(Segment segment, String writeDirectoryPath) {
+ return new MinMaxDataWriter(identifier, segment, writeDirectoryPath);
+ }
+
+  /**
+   * Factory method that initializes the Min Max DataMap for a segment and returns it.
+   *
+   * @param segment segment whose index files should be loaded
+   * @return list containing the initialized MinMaxIndexDataMap
+   * @throws IOException if the index files cannot be read
+   */
+ @Override public List<AbstractCoarseGrainIndexDataMap> getDataMaps(Segment segment)
+ throws IOException {
+ List<AbstractCoarseGrainIndexDataMap> dataMapList = new ArrayList<>();
+ // Form a dataMap of Type MinMaxIndexDataMap.
+ MinMaxIndexDataMap dataMap = new MinMaxIndexDataMap();
+ try {
+ dataMap.init(new DataMapModel(
+ CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo())));
+ } catch (MemoryException ex) {
+ throw new IOException(ex);
+ }
+ dataMapList.add(dataMap);
+ return dataMapList;
+ }
+
+  /**
+   * @param segment segment to split into distributable units
+   * @return null; distributable pruning is not supported by this example
+   */
+ @Override public List<DataMapDistributable> toDistributable(Segment segment) {
+ return null;
+ }
+
+  /**
+   * Clear the DataMap for a single segment.
+   *
+   * @param segment segment whose datamap state should be cleared
+   */
+ @Override public void clear(Segment segment) {
+ }
+
+  /**
+   * Clear all datamap state.
+   */
+ @Override public void clear() {
+ }
+
+ @Override public List<AbstractCoarseGrainIndexDataMap> getDataMaps(DataMapDistributable distributable)
+ throws IOException {
+ return null;
+ }
+
+ @Override public void fireEvent(Event event) {
+
+ }
+
+ @Override public DataMapMeta getMeta() {
+ return new DataMapMeta(new ArrayList<String>(Arrays.asList("c2")),
+ new ArrayList<ExpressionType>());
+ }
+}
\ No newline at end of file
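
A hedged usage sketch of the renamed factory follows: init() binds it to a table, and getDataMaps() returns one initialized MinMaxIndexDataMap per segment. The store path, the null DataMapSchema, and the Segment construction are illustrative assumptions, not values from this commit.

import java.io.IOException;
import java.util.List;

import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;

public class MinMaxFactoryUsageSketch {
  public static void main(String[] args) throws IOException {
    MinMaxIndexDataMapFactory factory = new MinMaxIndexDataMapFactory();
    // Bind the factory to a table; the schema argument is unused by this example factory.
    factory.init(AbsoluteTableIdentifier.from("/opt/store", "default", "carbonminmax"), null);
    // One datamap instance is loaded per segment; prune() with a null filter
    // would then return every blocklet recorded in the index.
    List<AbstractCoarseGrainIndexDataMap> dataMaps =
        factory.getDataMaps(new Segment("0", null));
    System.out.println("loaded datamaps: " + dataMaps.size());
  }
}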
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala b/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala
index 0cfe410..59872aa 100644
--- a/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala
+++ b/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala
@@ -52,8 +52,8 @@ object MinMaxDataMapExample {
// register datamap writer
DataMapStoreManager.getInstance().createAndRegisterDataMap(
AbsoluteTableIdentifier.from(storeLocation, "default", "carbonminmax"),
- classOf[MinMaxDataMapFactory].getName,
- MinMaxDataMap.NAME)
+ classOf[MinMaxIndexDataMapFactory].getName,
+ MinMaxIndexDataMap.NAME)
spark.sql("DROP TABLE IF EXISTS carbonminmax")