Posted to issues@carbondata.apache.org by manishgupta88 <gi...@git.apache.org> on 2018/07/02 15:25:51 UTC
[GitHub] carbondata pull request #2437: [WIP] segregate block and blocklet cache
GitHub user manishgupta88 opened a pull request:
https://github.com/apache/carbondata/pull/2437
[WIP] segregate block and blocklet cache
segregate block and blocklet cache
- [ ] Any interfaces changed?
- [ ] Any backward compatibility impacted?
- [ ] Document update required?
- [ ] Testing done
Please provide details on
- Whether new unit test cases have been added or why no new tests are required?
- How it is tested? Please attach test report.
- Is it a performance related change? Please attach the performance test report.
- Any additional information to help reviewers in testing this change.
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/manishgupta88/carbondata block_blocklet_cache
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/carbondata/pull/2437.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2437
----
commit 8c8215da55a687f4debadd56fa9d5356729d3331
Author: manishgupta88 <to...@...>
Date: 2018-06-25T06:43:00Z
segregate block and blocklet cache
----
---
[GitHub] carbondata pull request #2437: [CARBONDATA-2645] Segregate block and blockle...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/carbondata/pull/2437
---
[GitHub] carbondata issue #2437: [WIP] segregate block and blocklet cache
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2437
SDV Build Failed. Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5580/
---
[GitHub] carbondata issue #2437: [WIP] segregate block and blocklet cache
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2437
SDV Build Failed. Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5567/
---
[GitHub] carbondata issue #2437: [WIP] segregate block and blocklet cache
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2437
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6728/
---
[GitHub] carbondata issue #2437: [CARBONDATA-2645] Segregate block and blocklet cache
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on the issue:
https://github.com/apache/carbondata/pull/2437
retest this please
---
[GitHub] carbondata pull request #2437: [WIP] segregate block and blocklet cache
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2437#discussion_r200156093
--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java ---
@@ -221,438 +159,71 @@ private DataMapRowImpl loadToUnsafe(DataFileFooter fileFooter,
CarbonRowSchema[] schema = memoryDMStore.getSchema();
// Add one row to maintain task level min max for segment pruning
if (!blockletList.isEmpty() && summaryRow == null) {
- summaryRow = new DataMapRowImpl(summaryDMStore.getSchema());
+ summaryRow = new DataMapRowImpl(taskSummaryDMStore.getSchema().length);
}
for (int index = 0; index < blockletList.size(); index++) {
- DataMapRow row = new DataMapRowImpl(schema);
+ DataMapRow row = new DataMapRowImpl(schema.length);
int ordinal = 0;
int taskMinMaxOrdinal = 0;
BlockletInfo blockletInfo = blockletList.get(index);
-
- // add start key as index key
- row.setByteArray(blockletInfo.getBlockletIndex().getBtreeIndex().getStartKey(), ordinal++);
-
BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex();
- byte[][] minValues = updateMinValues(minMaxIndex.getMinValues(), minMaxLen);
- row.setRow(addMinMax(minMaxLen, schema[ordinal], minValues), ordinal);
+ row.setRow(addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMinValues()), ordinal);
// compute and set task level min values
- addTaskMinMaxValues(summaryRow, minMaxLen,
- summaryDMStore.getSchema()[taskMinMaxOrdinal], minValues,
- TASK_MIN_VALUES_INDEX, true);
+ addTaskMinMaxValues(summaryRow, minMaxLen, taskSummaryDMStore.getSchema(), taskMinMaxOrdinal,
+ minMaxIndex.getMinValues(), BlockletDataMapRowIndexes.TASK_MIN_VALUES_INDEX, true);
ordinal++;
taskMinMaxOrdinal++;
- byte[][] maxValues = updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen);
- row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal);
+ row.setRow(addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMaxValues()), ordinal);
// compute and set task level max values
- addTaskMinMaxValues(summaryRow, minMaxLen,
- summaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues,
- TASK_MAX_VALUES_INDEX, false);
+ addTaskMinMaxValues(summaryRow, minMaxLen, taskSummaryDMStore.getSchema(), taskMinMaxOrdinal,
+ minMaxIndex.getMaxValues(), BlockletDataMapRowIndexes.TASK_MAX_VALUES_INDEX, false);
ordinal++;
-
row.setInt(blockletInfo.getNumberOfRows(), ordinal++);
-
// add file path
byte[] filePathBytes = filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
row.setByteArray(filePathBytes, ordinal++);
-
- // add pages
- row.setShort((short) blockletInfo.getNumberOfPages(), ordinal++);
-
// add version number
row.setShort(fileFooter.getVersionId().number(), ordinal++);
-
// add schema updated time
row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++);
-
- // add blocklet info
byte[] serializedData;
try {
+ // Add block footer offset, it is used if we need to read footer of block
+ row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++);
+ setLocations(blockMetaInfo.getLocationInfo(), row, ordinal++);
+ // Store block size
+ row.setLong(blockMetaInfo.getSize(), ordinal++);
+ // add blocklet info
ByteArrayOutputStream stream = new ByteArrayOutputStream();
DataOutput dataOutput = new DataOutputStream(stream);
blockletInfo.write(dataOutput);
serializedData = stream.toByteArray();
row.setByteArray(serializedData, ordinal++);
- // Add block footer offset, it is used if we need to read footer of block
- row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++);
- setLocations(blockMetaInfo.getLocationInfo(), row, ordinal);
- ordinal++;
- // for relative blockelt id i.e blocklet id that belongs to a particular part file
- row.setShort((short) relativeBlockletId++, ordinal++);
- // Store block size
- row.setLong(blockMetaInfo.getSize(), ordinal);
+ // add pages
+ row.setShort((short) blockletInfo.getNumberOfPages(), ordinal++);
+ // for relative blocklet id i.e blocklet id that belongs to a particular carbondata file
+ row.setShort((short) relativeBlockletId++, ordinal);
memoryDMStore.addIndexRow(row);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
-
return summaryRow;
}
- private void setLocations(String[] locations, DataMapRow row, int ordinal)
- throws UnsupportedEncodingException {
- // Add location info
- String locationStr = StringUtils.join(locations, ',');
- row.setByteArray(locationStr.getBytes(CarbonCommonConstants.DEFAULT_CHARSET), ordinal);
- }
-
- /**
- * Load information for the block.It is the case can happen only for old stores
- * where blocklet information is not available in index file. So load only block information
- * and read blocklet information in executor.
- */
- private DataMapRowImpl loadToUnsafeBlock(DataFileFooter fileFooter,
- SegmentProperties segmentProperties, String filePath, DataMapRowImpl summaryRow,
- BlockMetaInfo blockMetaInfo) {
- int[] minMaxLen = segmentProperties.getColumnsValueSize();
- BlockletIndex blockletIndex = fileFooter.getBlockletIndex();
- CarbonRowSchema[] schema = memoryDMStore.getSchema();
- // Add one row to maintain task level min max for segment pruning
- if (summaryRow == null) {
- summaryRow = new DataMapRowImpl(summaryDMStore.getSchema());
- }
- DataMapRow row = new DataMapRowImpl(schema);
- int ordinal = 0;
- int taskMinMaxOrdinal = 0;
- // add start key as index key
- row.setByteArray(blockletIndex.getBtreeIndex().getStartKey(), ordinal++);
-
- BlockletMinMaxIndex minMaxIndex = blockletIndex.getMinMaxIndex();
- byte[][] minValues = updateMinValues(minMaxIndex.getMinValues(), minMaxLen);
- byte[][] maxValues = updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen);
- // update min max values in case of old store
- byte[][] updatedMinValues =
- CarbonUtil.updateMinMaxValues(fileFooter, maxValues, minValues, true);
- byte[][] updatedMaxValues =
- CarbonUtil.updateMinMaxValues(fileFooter, maxValues, minValues, false);
- row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMinValues), ordinal);
- // compute and set task level min values
- addTaskMinMaxValues(summaryRow, minMaxLen,
- summaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMinValues,
- TASK_MIN_VALUES_INDEX, true);
- ordinal++;
- taskMinMaxOrdinal++;
- row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMaxValues), ordinal);
- // compute and set task level max values
- addTaskMinMaxValues(summaryRow, minMaxLen,
- summaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMaxValues,
- TASK_MAX_VALUES_INDEX, false);
- ordinal++;
-
- row.setInt((int)fileFooter.getNumberOfRows(), ordinal++);
-
- // add file path
- byte[] filePathBytes = filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
- row.setByteArray(filePathBytes, ordinal++);
-
- // add pages
- row.setShort((short) 0, ordinal++);
-
- // add version number
- row.setShort(fileFooter.getVersionId().number(), ordinal++);
-
- // add schema updated time
- row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++);
-
- // add blocklet info
- row.setByteArray(new byte[0], ordinal++);
-
- row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++);
- try {
- setLocations(blockMetaInfo.getLocationInfo(), row, ordinal);
- ordinal++;
- // for relative blocklet id. Value is -1 because in case of old store blocklet info will
- // not be present in the index file and in that case we will not knwo the total number of
- // blocklets
- row.setShort((short) -1, ordinal++);
-
- // store block size
- row.setLong(blockMetaInfo.getSize(), ordinal);
- memoryDMStore.addIndexRow(row);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- return summaryRow;
- }
-
- private void addTaskSummaryRowToUnsafeMemoryStore(DataMapRow summaryRow, byte[] schemaBinary,
- byte[] filePath, byte[] fileName, byte[] segmentId) {
- // write the task summary info to unsafe memory store
- if (null != summaryRow) {
- // Add column schema , it is useful to generate segment properties in executor.
- // So we no need to read footer again there.
- if (schemaBinary != null) {
- summaryRow.setByteArray(schemaBinary, SCHEMA);
- }
- summaryRow.setByteArray(filePath, INDEX_PATH);
- summaryRow.setByteArray(fileName, INDEX_FILE_NAME);
- summaryRow.setByteArray(segmentId, SEGMENTID);
- try {
- summaryDMStore.addIndexRow(summaryRow);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- /**
- * Fill the measures min values with minimum , this is needed for backward version compatability
- * as older versions don't store min values for measures
- */
- private byte[][] updateMinValues(byte[][] minValues, int[] minMaxLen) {
- byte[][] updatedValues = minValues;
- if (minValues.length < minMaxLen.length) {
- updatedValues = new byte[minMaxLen.length][];
- System.arraycopy(minValues, 0, updatedValues, 0, minValues.length);
- List<CarbonMeasure> measures = segmentProperties.getMeasures();
- ByteBuffer buffer = ByteBuffer.allocate(8);
- for (int i = 0; i < measures.size(); i++) {
- buffer.rewind();
- DataType dataType = measures.get(i).getDataType();
- if (dataType == DataTypes.BYTE) {
- buffer.putLong(Byte.MIN_VALUE);
- updatedValues[minValues.length + i] = buffer.array().clone();
- } else if (dataType == DataTypes.SHORT) {
- buffer.putLong(Short.MIN_VALUE);
- updatedValues[minValues.length + i] = buffer.array().clone();
- } else if (dataType == DataTypes.INT) {
- buffer.putLong(Integer.MIN_VALUE);
- updatedValues[minValues.length + i] = buffer.array().clone();
- } else if (dataType == DataTypes.LONG) {
- buffer.putLong(Long.MIN_VALUE);
- updatedValues[minValues.length + i] = buffer.array().clone();
- } else if (DataTypes.isDecimal(dataType)) {
- updatedValues[minValues.length + i] =
- DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MIN_VALUE));
- } else {
- buffer.putDouble(Double.MIN_VALUE);
- updatedValues[minValues.length + i] = buffer.array().clone();
- }
- }
- }
- return updatedValues;
- }
-
- /**
- * Fill the measures max values with maximum , this is needed for backward version compatability
- * as older versions don't store max values for measures
- */
- private byte[][] updateMaxValues(byte[][] maxValues, int[] minMaxLen) {
- byte[][] updatedValues = maxValues;
- if (maxValues.length < minMaxLen.length) {
- updatedValues = new byte[minMaxLen.length][];
- System.arraycopy(maxValues, 0, updatedValues, 0, maxValues.length);
- List<CarbonMeasure> measures = segmentProperties.getMeasures();
- ByteBuffer buffer = ByteBuffer.allocate(8);
- for (int i = 0; i < measures.size(); i++) {
- buffer.rewind();
- DataType dataType = measures.get(i).getDataType();
- if (dataType == DataTypes.BYTE) {
- buffer.putLong(Byte.MAX_VALUE);
- updatedValues[maxValues.length + i] = buffer.array().clone();
- } else if (dataType == DataTypes.SHORT) {
- buffer.putLong(Short.MAX_VALUE);
- updatedValues[maxValues.length + i] = buffer.array().clone();
- } else if (dataType == DataTypes.INT) {
- buffer.putLong(Integer.MAX_VALUE);
- updatedValues[maxValues.length + i] = buffer.array().clone();
- } else if (dataType == DataTypes.LONG) {
- buffer.putLong(Long.MAX_VALUE);
- updatedValues[maxValues.length + i] = buffer.array().clone();
- } else if (DataTypes.isDecimal(dataType)) {
- updatedValues[maxValues.length + i] =
- DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MAX_VALUE));
- } else {
- buffer.putDouble(Double.MAX_VALUE);
- updatedValues[maxValues.length + i] = buffer.array().clone();
- }
- }
- }
- return updatedValues;
- }
-
- private DataMapRow addMinMax(int[] minMaxLen, CarbonRowSchema carbonRowSchema,
- byte[][] minValues) {
- CarbonRowSchema[] minSchemas =
- ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas();
- DataMapRow minRow = new DataMapRowImpl(minSchemas);
- int minOrdinal = 0;
- // min value adding
- for (int i = 0; i < minMaxLen.length; i++) {
- minRow.setByteArray(minValues[i], minOrdinal++);
- }
- return minRow;
- }
-
- /**
- * This method will compute min/max values at task level
- *
- * @param taskMinMaxRow
- * @param minMaxLen
- * @param carbonRowSchema
- * @param minMaxValue
- * @param ordinal
- * @param isMinValueComparison
- */
- private void addTaskMinMaxValues(DataMapRow taskMinMaxRow, int[] minMaxLen,
- CarbonRowSchema carbonRowSchema, byte[][] minMaxValue, int ordinal,
- boolean isMinValueComparison) {
- DataMapRow row = taskMinMaxRow.getRow(ordinal);
- byte[][] updatedMinMaxValues = minMaxValue;
- if (null == row) {
- CarbonRowSchema[] minSchemas =
- ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas();
- row = new DataMapRowImpl(minSchemas);
- } else {
- byte[][] existingMinMaxValues = getMinMaxValue(taskMinMaxRow, ordinal);
- // Compare and update min max values
- for (int i = 0; i < minMaxLen.length; i++) {
- int compare =
- ByteUtil.UnsafeComparer.INSTANCE.compareTo(existingMinMaxValues[i], minMaxValue[i]);
- if (isMinValueComparison) {
- if (compare < 0) {
- updatedMinMaxValues[i] = existingMinMaxValues[i];
- }
- } else if (compare > 0) {
- updatedMinMaxValues[i] = existingMinMaxValues[i];
- }
- }
- }
- int minMaxOrdinal = 0;
- // min/max value adding
- for (int i = 0; i < minMaxLen.length; i++) {
- row.setByteArray(updatedMinMaxValues[i], minMaxOrdinal++);
- }
- taskMinMaxRow.setRow(row, ordinal);
- }
-
- private void createSchema(SegmentProperties segmentProperties, boolean addToUnsafe)
- throws MemoryException {
- List<CarbonRowSchema> indexSchemas = new ArrayList<>();
-
- // Index key
- indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
- getMinMaxSchema(segmentProperties, indexSchemas);
-
- // for number of rows.
- indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.INT));
-
- // for table block path
- indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
-
- // for number of pages.
- indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
-
- // for version number.
- indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
-
- // for schema updated time.
- indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
-
- //for blocklet info
- indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
-
- // for block footer offset.
- indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
-
- // for locations
- indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
-
- // for relative blocklet id i.e. blocklet id that belongs to a particular part file.
- indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
-
- // for storing block length.
- indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
-
- CarbonRowSchema[] schema = indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]);
- memoryDMStore = getMemoryDMStore(schema, addToUnsafe);
- }
-
- /**
- * Creates the schema to store summary information or the information which can be stored only
- * once per datamap. It stores datamap level max/min of each column and partition information of
- * datamap
- * @param segmentProperties
- * @throws MemoryException
- */
- private void createSummarySchema(SegmentProperties segmentProperties, byte[] schemaBinary,
- byte[] filePath, byte[] fileName, byte[] segmentId, boolean addToUnsafe)
- throws MemoryException {
- List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>();
- getMinMaxSchema(segmentProperties, taskMinMaxSchemas);
- // for storing column schema
- taskMinMaxSchemas.add(
- new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, schemaBinary.length));
- // for storing file path
- taskMinMaxSchemas.add(
- new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, filePath.length));
- // for storing file name
- taskMinMaxSchemas.add(
- new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, fileName.length));
- // for storing segmentid
- taskMinMaxSchemas.add(
- new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, segmentId.length));
- CarbonRowSchema[] schema =
- taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]);
- summaryDMStore = getMemoryDMStore(schema, addToUnsafe);
- }
-
- private void getMinMaxSchema(SegmentProperties segmentProperties,
- List<CarbonRowSchema> minMaxSchemas) {
- // Index key
- int[] minMaxLen = segmentProperties.getColumnsValueSize();
- // do it 2 times, one for min and one for max.
- for (int k = 0; k < 2; k++) {
- CarbonRowSchema[] mapSchemas = new CarbonRowSchema[minMaxLen.length];
- for (int i = 0; i < minMaxLen.length; i++) {
- if (minMaxLen[i] <= 0) {
- boolean isVarchar = false;
- if (i < segmentProperties.getDimensions().size()
- && segmentProperties.getDimensions().get(i).getDataType() == DataTypes.VARCHAR) {
- isVarchar = true;
- }
- mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY,
- isVarchar);
- } else {
- mapSchemas[i] =
- new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, minMaxLen[i]);
- }
- }
- CarbonRowSchema mapSchema =
- new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(),
- mapSchemas);
- minMaxSchemas.add(mapSchema);
- }
- }
-
- @Override
- public boolean isScanRequired(FilterResolverIntf filterExp) {
- FilterExecuter filterExecuter =
- FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
- for (int i = 0; i < summaryDMStore.getRowCount(); i++) {
- DataMapRow unsafeRow = summaryDMStore.getDataMapRow(i);
- boolean isScanRequired = FilterExpressionProcessor.isScanRequired(
- filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX),
- getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX));
- if (isScanRequired) {
- return true;
- }
- }
- return false;
- }
-
private List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties) {
if (memoryDMStore.getRowCount() == 0) {
return new ArrayList<>();
}
List<Blocklet> blocklets = new ArrayList<>();
+ CarbonRowSchema[] schema = memoryDMStore.getSchema();
--- End diff --
Use the `prune` method from the super class.
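A minimal sketch of what this suggestion amounts to, assuming `BlockDataMap` already provides an equivalent `prune` implementation; the class shapes below are illustrative, not the actual PR code:

```java
// Hypothetical sketch: rather than duplicating the pruning loop in the
// subclass, rely on the implementation inherited from BlockDataMap and
// override only the hooks that differ at blocklet granularity.
public class BlockletDataMap extends BlockDataMap {
  // prune(FilterResolverIntf, SegmentProperties) intentionally not overridden:
  // the parent version already iterates memoryDMStore rows and applies the
  // FilterExecuter, which works for both cache levels.
}
```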
---
[GitHub] carbondata issue #2437: [CARBONDATA-2645] Segregate block and blocklet cache
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2437
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6796/
---
[GitHub] carbondata issue #2437: [WIP] segregate block and blocklet cache
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2437
SDV Build Success. Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5611/
---
[GitHub] carbondata issue #2437: [WIP] segregate block and blocklet cache
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2437
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5568/
---
[GitHub] carbondata pull request #2437: [WIP] segregate block and blocklet cache
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2437#discussion_r200157458
--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapRowIndexes.java ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+/**
+ * holder for blocklet info indexes in a DataMap row
+ */
+public class BlockletDataMapRowIndexes {
--- End diff --
Change this class to an interface.
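A minimal sketch of the suggested change, assuming the class only holds index constants; the constant names and ordinals below are illustrative, not the full set from the PR:

```java
// In an interface, fields are implicitly public static final, so a
// constants-only holder class can be expressed more compactly this way.
public interface BlockletDataMapRowIndexes {
  int MIN_VALUES_INDEX = 0;   // hypothetical ordinal
  int MAX_VALUES_INDEX = 1;   // hypothetical ordinal
  int ROW_COUNT_INDEX = 2;    // hypothetical ordinal
  // ... remaining row indexes
}
```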
---
[GitHub] carbondata issue #2437: [WIP] segregate block and blocklet cache
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2437
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5558/
---
[GitHub] carbondata issue #2437: [WIP] segregate block and blocklet cache
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2437
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5545/
---
[GitHub] carbondata issue #2437: [CARBONDATA-2645] Segregate block and blocklet cache
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2437
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6846/
---
[GitHub] carbondata pull request #2437: [WIP] segregate block and blocklet cache
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2437#discussion_r200149538
--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java ---
@@ -26,70 +26,66 @@
*/
public abstract class DataMapRow implements Serializable {
- protected CarbonRowSchema[] schemas;
--- End diff --
I don't think it is required to remove the schema from here, as it is a temporary object used to read data. It does not occupy any heap memory. Many of the changes related to this are not required. Please revert the changes
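For reference, a minimal sketch of the state the reviewer asks to restore; the field matches the line removed in the diff, while the constructor shape is assumed for illustration:

```java
// DataMapRow instances are short-lived, read-side objects, so keeping a
// reference to the schema array here adds no lasting heap cost.
public abstract class DataMapRow implements Serializable {
  protected CarbonRowSchema[] schemas;

  public DataMapRow(CarbonRowSchema[] schemas) { // assumed constructor shape
    this.schemas = schemas;
  }
}
```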
---
[GitHub] carbondata issue #2437: [WIP] segregate block and blocklet cache
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2437
SDV Build Success. Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5608/
---
[GitHub] carbondata pull request #2437: [WIP] segregate block and blocklet cache
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2437#discussion_r200155521
--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java ---
@@ -0,0 +1,879 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.indexstore.AbstractMemoryDMStore;
+import org.apache.carbondata.core.indexstore.BlockMetaInfo;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.SafeMemoryDMStore;
+import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
+import org.apache.carbondata.core.indexstore.schema.SchemaGenerator;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.profiler.ExplainCollector;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.BlockletDataMapUtil;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.xerial.snappy.Snappy;
+
+/**
+ * Datamap implementation for blocklet.
+ */
+public class BlockDataMap extends CoarseGrainDataMap implements Serializable {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(BlockDataMap.class.getName());
+
+ protected static final long serialVersionUID = -2170289352240810993L;
+ /**
+ * for CACHE_LEVEL=BLOCK and legacy store default blocklet id will be -1
+ */
+ private static final short BLOCK_DEFAULT_BLOCKLET_ID = -1;
+
+ protected AbstractMemoryDMStore memoryDMStore;
+
+ protected AbstractMemoryDMStore taskSummaryDMStore;
+
+ // As it is a heavy object it is not recommended to serialize this object
+ protected transient SegmentProperties segmentProperties;
+
+ protected int[] columnCardinality;
+
+ protected long blockletSchemaTime;
+ /**
+ * flag to check for store from 1.1 or any prior version
+ */
+ protected boolean isLegacyStore;
+
+ @Override public void init(DataMapModel dataMapModel) throws IOException, MemoryException {
+ long startTime = System.currentTimeMillis();
+ assert (dataMapModel instanceof BlockletDataMapModel);
+ BlockletDataMapModel blockletDataMapInfo = (BlockletDataMapModel) dataMapModel;
+ DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+ List<DataFileFooter> indexInfo = fileFooterConverter
+ .getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData());
+ Path path = new Path(blockletDataMapInfo.getFilePath());
+ byte[] filePath = path.getParent().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
+ byte[] fileName = path.getName().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
+ byte[] segmentId =
+ blockletDataMapInfo.getSegmentId().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
+ byte[] schemaBinary = null;
+ if (!indexInfo.isEmpty()) {
+ DataFileFooter fileFooter = indexInfo.get(0);
+ // store for 1.1 or any prior version will not have any blocklet information in file footer
+ isLegacyStore = fileFooter.getBlockletList() == null;
+ // init segment properties and create schema
+ initSegmentProperties(fileFooter);
+ schemaBinary = convertSchemaToBinary(fileFooter.getColumnInTable());
+ createSchema(segmentProperties, blockletDataMapInfo.isAddToUnsafe(), true);
+ createSummarySchema(segmentProperties, schemaBinary, filePath, fileName, segmentId,
+ blockletDataMapInfo.isAddToUnsafe(), true);
+ }
+ // check for legacy store and load the metadata
+ DataMapRowImpl summaryRow = loadBlockMetadata(blockletDataMapInfo, indexInfo);
--- End diff --
Create protected methods for `createSchema`, `createSummarySchema` and `loadMetadata` and override them in BlockletDataMap
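A minimal sketch of the suggested structure; the signatures are simplified (the real methods take additional parameters such as `schemaBinary`, `filePath` and `segmentId`), so treat the shapes below as assumptions:

```java
// Template-method style: the block-level base class owns the init flow and
// exposes protected hooks that the blocklet-level subclass overrides.
public class BlockDataMap extends CoarseGrainDataMap {

  protected void createSchema(SegmentProperties properties, boolean addToUnsafe)
      throws MemoryException {
    // build the block-level CarbonRowSchema[] and initialize memoryDMStore
  }

  protected void createSummarySchema(SegmentProperties properties, boolean addToUnsafe)
      throws MemoryException {
    // build the task-summary schema and initialize taskSummaryDMStore
  }

  protected DataMapRowImpl loadMetadata(BlockletDataMapModel model,
      List<DataFileFooter> indexInfo) throws IOException, MemoryException {
    // block-level loading, including the legacy-store path
    return null;
  }
}

public class BlockletDataMap extends BlockDataMap {

  @Override
  protected void createSchema(SegmentProperties properties, boolean addToUnsafe)
      throws MemoryException {
    // blocklet-level schema with extra columns such as the relative blocklet id
  }

  // createSummarySchema and loadMetadata are overridden the same way
}
```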
---
[GitHub] carbondata issue #2437: [WIP] segregate block and blocklet cache
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2437
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6776/
---
[GitHub] carbondata issue #2437: [WIP] segregate block and blocklet cache
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2437
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5598/
---
[GitHub] carbondata issue #2437: [CARBONDATA-2645] Segregate block and blocklet cache
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2437
SDV Build Success. Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5625/
---
[GitHub] carbondata issue #2437: [CARBONDATA-2645] Segregate block and blocklet cache
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2437
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5642/
---
[GitHub] carbondata issue #2437: [WIP] segregate block and blocklet cache
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2437
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6718/
---
[GitHub] carbondata issue #2437: [CARBONDATA-2645] Segregate block and blocklet cache
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2437
LGTM
---
[GitHub] carbondata issue #2437: [WIP] segregate block and blocklet cache
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2437
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6738/
---
[GitHub] carbondata issue #2437: [CARBONDATA-2645] Segregate block and blocklet cache
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on the issue:
https://github.com/apache/carbondata/pull/2437
@ravipesala handled the review comments. Kindly review and merge
---
[GitHub] carbondata pull request #2437: [WIP] segregate block and blocklet cache
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2437#discussion_r200149739
--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java ---
@@ -0,0 +1,879 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
--- End diff --
use `import java.io.*`
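The collapsed form the reviewer suggests would look like the snippet below; note, as an aside, that some checkstyle configurations disallow wildcard imports, so whether this is applicable depends on the project's style rules:

```java
// Replaces the nine individual java.io imports listed above.
import java.io.*;
```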
---