You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/12/01 16:35:44 UTC
[1/3] incubator-carbondata git commit: add file format version enum
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 90bc36699 -> 5406cee1b
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java
deleted file mode 100644
index 8c2608b..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.processing.store.writer;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.columnar.IndexStorage;
-import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
-import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
-import org.apache.carbondata.core.util.CarbonMetadataUtil;
-import org.apache.carbondata.core.writer.CarbonFooterWriter;
-import org.apache.carbondata.format.FileFooter;
-import org.apache.carbondata.processing.store.colgroup.ColGroupBlockStorage;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
-
-public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFactDataWriter<int[]> {
-
- private static final LogService LOGGER = LogServiceFactory
- .getLogService(CarbonFactDataWriterImplForIntIndexAndAggBlock.class.getName());
-
- public CarbonFactDataWriterImplForIntIndexAndAggBlock(CarbonDataWriterVo dataWriterVo) {
- super(dataWriterVo);
- }
-
- @Override
- public NodeHolder buildDataNodeHolder(IndexStorage<int[]>[] keyStorageArray, byte[][] dataArray,
- int entryCount, byte[] startKey, byte[] endKey, ValueCompressionModel compressionModel,
- byte[] noDictionaryStartKey, byte[] noDictionaryEndKey) throws CarbonDataWriterException {
- // if there are no NO-Dictionary column present in the table then
- // set the empty byte array
- if (null == noDictionaryEndKey) {
- noDictionaryEndKey = new byte[0];
- }
- if (null == noDictionaryStartKey) {
- noDictionaryStartKey = new byte[0];
- }
- // total measure length;
- int totalMsrArrySize = 0;
- // current measure length;
- int currentMsrLenght = 0;
- int totalKeySize = 0;
- int keyBlockSize = 0;
-
- boolean[] isSortedData = new boolean[keyStorageArray.length];
- int[] keyLengths = new int[keyStorageArray.length];
-
- //below will calculate min and max value for each column
- //for below 2d array, first index will be for column and second will be min max
- // value for same column
- // byte[][] columnMinMaxData = new byte[keyStorageArray.length][];
-
- byte[][] allMinValue = new byte[keyStorageArray.length][];
- byte[][] allMaxValue = new byte[keyStorageArray.length][];
- byte[][] keyBlockData = fillAndCompressedKeyBlockData(keyStorageArray, entryCount);
- boolean[] colGrpBlock = new boolean[keyStorageArray.length];
-
- for (int i = 0; i < keyLengths.length; i++) {
- keyLengths[i] = keyBlockData[i].length;
- isSortedData[i] = keyStorageArray[i].isAlreadySorted();
- if (!isSortedData[i]) {
- keyBlockSize++;
-
- }
- totalKeySize += keyLengths[i];
- if (dataWriterVo.getIsComplexType()[i] || dataWriterVo.getIsDictionaryColumn()[i]) {
- allMinValue[i] = keyStorageArray[i].getMin();
- allMaxValue[i] = keyStorageArray[i].getMax();
- } else {
- allMinValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMin());
- allMaxValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMax());
- }
- //if keyStorageArray is instance of ColGroupBlockStorage than it's colGroup chunk
- if (keyStorageArray[i] instanceof ColGroupBlockStorage) {
- colGrpBlock[i] = true;
- }
- }
- int[] keyBlockIdxLengths = new int[keyBlockSize];
- byte[][] dataAfterCompression = new byte[keyBlockSize][];
- byte[][] indexMap = new byte[keyBlockSize][];
- int idx = 0;
- for (int i = 0; i < isSortedData.length; i++) {
- if (!isSortedData[i]) {
- dataAfterCompression[idx] =
- numberCompressor.compress(keyStorageArray[i].getDataAfterComp());
- if (null != keyStorageArray[i].getIndexMap()
- && keyStorageArray[i].getIndexMap().length > 0) {
- indexMap[idx] = numberCompressor.compress(keyStorageArray[i].getIndexMap());
- } else {
- indexMap[idx] = new byte[0];
- }
- keyBlockIdxLengths[idx] = (dataAfterCompression[idx].length + indexMap[idx].length)
- + CarbonCommonConstants.INT_SIZE_IN_BYTE;
- idx++;
- }
- }
- int compressDataBlockSize = 0;
- for (int i = 0; i < dataWriterVo.getAggBlocks().length; i++) {
- if (dataWriterVo.getAggBlocks()[i]) {
- compressDataBlockSize++;
- }
- }
- byte[][] compressedDataIndex = new byte[compressDataBlockSize][];
- int[] dataIndexMapLength = new int[compressDataBlockSize];
- idx = 0;
- for (int i = 0; i < dataWriterVo.getAggBlocks().length; i++) {
- if (dataWriterVo.getAggBlocks()[i]) {
- try {
- compressedDataIndex[idx] =
- numberCompressor.compress(keyStorageArray[i].getDataIndexMap());
- dataIndexMapLength[idx] = compressedDataIndex[idx].length;
- idx++;
- } catch (Exception e) {
- throw new CarbonDataWriterException(e.getMessage());
- }
- }
- }
-
- int[] msrLength = new int[dataWriterVo.getMeasureCount()];
- // calculate the total size required for all the measure and get the
- // each measure size
- for (int i = 0; i < dataArray.length; i++) {
- currentMsrLenght = dataArray[i].length;
- totalMsrArrySize += currentMsrLenght;
- msrLength[i] = currentMsrLenght;
- }
- NodeHolder holder = new NodeHolder();
- holder.setDataArray(dataArray);
- holder.setKeyArray(keyBlockData);
- // end key format will be <length of dictionary key><length of no
- // dictionary key><DictionaryKey><No Dictionary key>
- byte[] updatedNoDictionaryEndKey = updateNoDictionaryStartAndEndKey(noDictionaryEndKey);
- ByteBuffer buffer = ByteBuffer.allocate(
- CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE
- + endKey.length + updatedNoDictionaryEndKey.length);
- buffer.putInt(endKey.length);
- buffer.putInt(updatedNoDictionaryEndKey.length);
- buffer.put(endKey);
- buffer.put(updatedNoDictionaryEndKey);
- buffer.rewind();
- holder.setEndKey(buffer.array());
- holder.setMeasureLenght(msrLength);
- byte[] updatedNoDictionaryStartKey = updateNoDictionaryStartAndEndKey(noDictionaryStartKey);
- // start key format will be <length of dictionary key><length of no
- // dictionary key><DictionaryKey><No Dictionary key>
- buffer = ByteBuffer.allocate(
- CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE
- + startKey.length + updatedNoDictionaryStartKey.length);
- buffer.putInt(startKey.length);
- buffer.putInt(updatedNoDictionaryStartKey.length);
- buffer.put(startKey);
- buffer.put(updatedNoDictionaryStartKey);
- buffer.rewind();
- holder.setStartKey(buffer.array());
- holder.setEntryCount(entryCount);
- holder.setKeyLengths(keyLengths);
- holder.setKeyBlockIndexLength(keyBlockIdxLengths);
- holder.setIsSortedKeyBlock(isSortedData);
- holder.setCompressedIndex(dataAfterCompression);
- holder.setCompressedIndexMap(indexMap);
- holder.setDataIndexMapLength(dataIndexMapLength);
- holder.setCompressedDataIndex(compressedDataIndex);
- holder.setCompressionModel(compressionModel);
- holder.setTotalDimensionArrayLength(totalKeySize);
- holder.setTotalMeasureArrayLength(totalMsrArrySize);
- //setting column min max value
- holder.setColumnMaxData(allMaxValue);
- holder.setColumnMinData(allMinValue);
- holder.setAggBlocks(dataWriterVo.getAggBlocks());
- holder.setColGrpBlocks(colGrpBlock);
- return holder;
- }
-
- @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
- int indexBlockSize = 0;
- for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) {
- indexBlockSize += holder.getKeyBlockIndexLength()[i] + CarbonCommonConstants.INT_SIZE_IN_BYTE;
- }
-
- for (int i = 0; i < holder.getDataIndexMapLength().length; i++) {
- indexBlockSize += holder.getDataIndexMapLength()[i];
- }
-
- long blockletDataSize =
- holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength()
- + indexBlockSize;
- updateBlockletFileChannel(blockletDataSize);
- // write data to file and get its offset
- long offset = writeDataToFile(holder, fileChannel);
- // get the blocklet info for currently added blocklet
- BlockletInfoColumnar blockletInfo = getBlockletInfo(holder, offset);
- // add blocklet info to list
- blockletInfoList.add(blockletInfo);
- LOGGER.info("A new blocklet is added, its data size is: " + blockletDataSize + " Byte");
- }
-
- /**
- * This method is responsible for writing blocklet to the data file
- *
- * @return file offset offset is the current position of the file
- * @throws CarbonDataWriterException if will throw CarbonDataWriterException when any thing
- * goes wrong while while writing the leaf file
- */
- private long writeDataToFile(NodeHolder nodeHolder, FileChannel channel)
- throws CarbonDataWriterException {
- // create byte buffer
- byte[][] compressedIndex = nodeHolder.getCompressedIndex();
- byte[][] compressedIndexMap = nodeHolder.getCompressedIndexMap();
- byte[][] compressedDataIndex = nodeHolder.getCompressedDataIndex();
- int indexBlockSize = 0;
- int index = 0;
- for (int i = 0; i < nodeHolder.getKeyBlockIndexLength().length; i++) {
- indexBlockSize +=
- nodeHolder.getKeyBlockIndexLength()[index++] + CarbonCommonConstants.INT_SIZE_IN_BYTE;
- }
-
- for (int i = 0; i < nodeHolder.getDataIndexMapLength().length; i++) {
- indexBlockSize += nodeHolder.getDataIndexMapLength()[i];
- }
- ByteBuffer byteBuffer = ByteBuffer.allocate(
- nodeHolder.getTotalDimensionArrayLength() + nodeHolder.getTotalMeasureArrayLength()
- + indexBlockSize);
- long offset = 0;
- try {
- // get the current offset
- offset = channel.size();
- // add key array to byte buffer
- for (int i = 0; i < nodeHolder.getKeyArray().length; i++) {
- byteBuffer.put(nodeHolder.getKeyArray()[i]);
- }
- for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
- byteBuffer.put(nodeHolder.getDataArray()[i]);
- }
- // add measure data array to byte buffer
-
- ByteBuffer buffer1 = null;
- for (int i = 0; i < compressedIndex.length; i++) {
- buffer1 = ByteBuffer.allocate(nodeHolder.getKeyBlockIndexLength()[i]);
- buffer1.putInt(compressedIndex[i].length);
- buffer1.put(compressedIndex[i]);
- if (compressedIndexMap[i].length > 0) {
- buffer1.put(compressedIndexMap[i]);
- }
- buffer1.rewind();
- byteBuffer.put(buffer1.array());
-
- }
- for (int i = 0; i < compressedDataIndex.length; i++) {
- byteBuffer.put(compressedDataIndex[i]);
- }
- byteBuffer.flip();
- // write data to file
- channel.write(byteBuffer);
- } catch (IOException exception) {
- throw new CarbonDataWriterException("Problem in writing carbon file: ", exception);
- }
- // return the offset, this offset will be used while reading the file in
- // engine side to get from which position to start reading the file
- return offset;
- }
-
- /**
- * This method will be used to get the blocklet metadata
- *
- * @return BlockletInfo - blocklet metadata
- */
- protected BlockletInfoColumnar getBlockletInfo(NodeHolder nodeHolder, long offset) {
- // create the info object for leaf entry
- BlockletInfoColumnar info = new BlockletInfoColumnar();
- //add aggBlocks array
- info.setAggKeyBlock(nodeHolder.getAggBlocks());
- // add total entry count
- info.setNumberOfKeys(nodeHolder.getEntryCount());
-
- // add the key array length
- info.setKeyLengths(nodeHolder.getKeyLengths());
- // adding null measure index bit set
- info.setMeasureNullValueIndex(nodeHolder.getMeasureNullValueIndex());
- //add column min max length
- info.setColumnMaxData(nodeHolder.getColumnMaxData());
- info.setColumnMinData(nodeHolder.getColumnMinData());
- long[] keyOffSets = new long[nodeHolder.getKeyLengths().length];
-
- for (int i = 0; i < keyOffSets.length; i++) {
- keyOffSets[i] = offset;
- offset += nodeHolder.getKeyLengths()[i];
- }
- // key offset will be 8 bytes from current offset because first 4 bytes
- // will be for number of entry in leaf, then next 4 bytes will be for
- // key lenght;
- // offset += CarbonCommonConstants.INT_SIZE_IN_BYTE * 2;
-
- // add key offset
- info.setKeyOffSets(keyOffSets);
-
- // add measure length
- info.setMeasureLength(nodeHolder.getMeasureLenght());
-
- long[] msrOffset = new long[dataWriterVo.getMeasureCount()];
-
- for (int i = 0; i < msrOffset.length; i++) {
- // increment the current offset by 4 bytes because 4 bytes will be
- // used for measure byte length
- // offset += CarbonCommonConstants.INT_SIZE_IN_BYTE;
- msrOffset[i] = offset;
- // now increment the offset by adding measure length to get the next
- // measure offset;
- offset += nodeHolder.getMeasureLenght()[i];
- }
- // add measure offset
- info.setMeasureOffset(msrOffset);
- info.setIsSortedKeyColumn(nodeHolder.getIsSortedKeyBlock());
- info.setKeyBlockIndexLength(nodeHolder.getKeyBlockIndexLength());
- long[] keyBlockIndexOffsets = new long[nodeHolder.getKeyBlockIndexLength().length];
- for (int i = 0; i < keyBlockIndexOffsets.length; i++) {
- keyBlockIndexOffsets[i] = offset;
- offset += nodeHolder.getKeyBlockIndexLength()[i];
- }
- info.setDataIndexMapLength(nodeHolder.getDataIndexMapLength());
- long[] dataIndexMapOffsets = new long[nodeHolder.getDataIndexMapLength().length];
- for (int i = 0; i < dataIndexMapOffsets.length; i++) {
- dataIndexMapOffsets[i] = offset;
- offset += nodeHolder.getDataIndexMapLength()[i];
- }
- info.setDataIndexMapOffsets(dataIndexMapOffsets);
- info.setKeyBlockIndexOffSets(keyBlockIndexOffsets);
- // set startkey
- info.setStartKey(nodeHolder.getStartKey());
- // set end key
- info.setEndKey(nodeHolder.getEndKey());
- info.setCompressionModel(nodeHolder.getCompressionModel());
- // return leaf metadata
-
- //colGroup Blocks
- info.setColGrpBlocks(nodeHolder.getColGrpBlocks());
-
- return info;
- }
-
- /**
- * This method will write metadata at the end of file file format in thrift format
- */
- protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel,
- String filePath) throws CarbonDataWriterException {
- try {
- long currentPosition = channel.size();
- CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
- FileFooter convertFileMeta = CarbonMetadataUtil
- .convertFileFooter(infoList, localCardinality.length, localCardinality,
- thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
- fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
- writer.writeFooter(convertFileMeta, currentPosition);
- } catch (IOException e) {
- throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
new file mode 100644
index 0000000..19e781d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.store.writer.v1;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.columnar.IndexStorage;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
+import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.writer.CarbonFooterWriter;
+import org.apache.carbondata.format.FileFooter;
+import org.apache.carbondata.processing.store.colgroup.ColGroupBlockStorage;
+import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
+import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
+import org.apache.carbondata.processing.store.writer.NodeHolder;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+
+public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
+
+ private static final LogService LOGGER = LogServiceFactory
+ .getLogService(CarbonFactDataWriterImplV1.class.getName());
+
+ public CarbonFactDataWriterImplV1(CarbonDataWriterVo dataWriterVo) {
+ super(dataWriterVo);
+ }
+
+ @Override
+ public NodeHolder buildDataNodeHolder(IndexStorage<int[]>[] keyStorageArray, byte[][] dataArray,
+ int entryCount, byte[] startKey, byte[] endKey, ValueCompressionModel compressionModel,
+ byte[] noDictionaryStartKey, byte[] noDictionaryEndKey) throws CarbonDataWriterException {
+ // if there are no NO-Dictionary column present in the table then
+ // set the empty byte array
+ if (null == noDictionaryEndKey) {
+ noDictionaryEndKey = new byte[0];
+ }
+ if (null == noDictionaryStartKey) {
+ noDictionaryStartKey = new byte[0];
+ }
+ // total measure length;
+ int totalMsrArrySize = 0;
+ // current measure length;
+ int currentMsrLenght = 0;
+ int totalKeySize = 0;
+ int keyBlockSize = 0;
+
+ boolean[] isSortedData = new boolean[keyStorageArray.length];
+ int[] keyLengths = new int[keyStorageArray.length];
+
+ //below will calculate min and max value for each column
+ //for below 2d array, first index will be for column and second will be min max
+ // value for same column
+ // byte[][] columnMinMaxData = new byte[keyStorageArray.length][];
+
+ byte[][] allMinValue = new byte[keyStorageArray.length][];
+ byte[][] allMaxValue = new byte[keyStorageArray.length][];
+ byte[][] keyBlockData = fillAndCompressedKeyBlockData(keyStorageArray, entryCount);
+ boolean[] colGrpBlock = new boolean[keyStorageArray.length];
+
+ for (int i = 0; i < keyLengths.length; i++) {
+ keyLengths[i] = keyBlockData[i].length;
+ isSortedData[i] = keyStorageArray[i].isAlreadySorted();
+ if (!isSortedData[i]) {
+ keyBlockSize++;
+
+ }
+ totalKeySize += keyLengths[i];
+ if (dataWriterVo.getIsComplexType()[i] || dataWriterVo.getIsDictionaryColumn()[i]) {
+ allMinValue[i] = keyStorageArray[i].getMin();
+ allMaxValue[i] = keyStorageArray[i].getMax();
+ } else {
+ allMinValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMin());
+ allMaxValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMax());
+ }
+ //if keyStorageArray is instance of ColGroupBlockStorage than it's colGroup chunk
+ if (keyStorageArray[i] instanceof ColGroupBlockStorage) {
+ colGrpBlock[i] = true;
+ }
+ }
+ int[] keyBlockIdxLengths = new int[keyBlockSize];
+ byte[][] dataAfterCompression = new byte[keyBlockSize][];
+ byte[][] indexMap = new byte[keyBlockSize][];
+ int idx = 0;
+ for (int i = 0; i < isSortedData.length; i++) {
+ if (!isSortedData[i]) {
+ dataAfterCompression[idx] =
+ numberCompressor.compress(keyStorageArray[i].getDataAfterComp());
+ if (null != keyStorageArray[i].getIndexMap()
+ && keyStorageArray[i].getIndexMap().length > 0) {
+ indexMap[idx] = numberCompressor.compress(keyStorageArray[i].getIndexMap());
+ } else {
+ indexMap[idx] = new byte[0];
+ }
+ keyBlockIdxLengths[idx] = (dataAfterCompression[idx].length + indexMap[idx].length)
+ + CarbonCommonConstants.INT_SIZE_IN_BYTE;
+ idx++;
+ }
+ }
+ int compressDataBlockSize = 0;
+ for (int i = 0; i < dataWriterVo.getAggBlocks().length; i++) {
+ if (dataWriterVo.getAggBlocks()[i]) {
+ compressDataBlockSize++;
+ }
+ }
+ byte[][] compressedDataIndex = new byte[compressDataBlockSize][];
+ int[] dataIndexMapLength = new int[compressDataBlockSize];
+ idx = 0;
+ for (int i = 0; i < dataWriterVo.getAggBlocks().length; i++) {
+ if (dataWriterVo.getAggBlocks()[i]) {
+ try {
+ compressedDataIndex[idx] =
+ numberCompressor.compress(keyStorageArray[i].getDataIndexMap());
+ dataIndexMapLength[idx] = compressedDataIndex[idx].length;
+ idx++;
+ } catch (Exception e) {
+ throw new CarbonDataWriterException(e.getMessage());
+ }
+ }
+ }
+
+ int[] msrLength = new int[dataWriterVo.getMeasureCount()];
+ // calculate the total size required for all the measure and get the
+ // each measure size
+ for (int i = 0; i < dataArray.length; i++) {
+ currentMsrLenght = dataArray[i].length;
+ totalMsrArrySize += currentMsrLenght;
+ msrLength[i] = currentMsrLenght;
+ }
+ NodeHolder holder = new NodeHolder();
+ holder.setDataArray(dataArray);
+ holder.setKeyArray(keyBlockData);
+ // end key format will be <length of dictionary key><length of no
+ // dictionary key><DictionaryKey><No Dictionary key>
+ byte[] updatedNoDictionaryEndKey = updateNoDictionaryStartAndEndKey(noDictionaryEndKey);
+ ByteBuffer buffer = ByteBuffer.allocate(
+ CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE
+ + endKey.length + updatedNoDictionaryEndKey.length);
+ buffer.putInt(endKey.length);
+ buffer.putInt(updatedNoDictionaryEndKey.length);
+ buffer.put(endKey);
+ buffer.put(updatedNoDictionaryEndKey);
+ buffer.rewind();
+ holder.setEndKey(buffer.array());
+ holder.setMeasureLenght(msrLength);
+ byte[] updatedNoDictionaryStartKey = updateNoDictionaryStartAndEndKey(noDictionaryStartKey);
+ // start key format will be <length of dictionary key><length of no
+ // dictionary key><DictionaryKey><No Dictionary key>
+ buffer = ByteBuffer.allocate(
+ CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE
+ + startKey.length + updatedNoDictionaryStartKey.length);
+ buffer.putInt(startKey.length);
+ buffer.putInt(updatedNoDictionaryStartKey.length);
+ buffer.put(startKey);
+ buffer.put(updatedNoDictionaryStartKey);
+ buffer.rewind();
+ holder.setStartKey(buffer.array());
+ holder.setEntryCount(entryCount);
+ holder.setKeyLengths(keyLengths);
+ holder.setKeyBlockIndexLength(keyBlockIdxLengths);
+ holder.setIsSortedKeyBlock(isSortedData);
+ holder.setCompressedIndex(dataAfterCompression);
+ holder.setCompressedIndexMap(indexMap);
+ holder.setDataIndexMapLength(dataIndexMapLength);
+ holder.setCompressedDataIndex(compressedDataIndex);
+ holder.setCompressionModel(compressionModel);
+ holder.setTotalDimensionArrayLength(totalKeySize);
+ holder.setTotalMeasureArrayLength(totalMsrArrySize);
+ //setting column min max value
+ holder.setColumnMaxData(allMaxValue);
+ holder.setColumnMinData(allMinValue);
+ holder.setAggBlocks(dataWriterVo.getAggBlocks());
+ holder.setColGrpBlocks(colGrpBlock);
+ return holder;
+ }
+
+ @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
+ int indexBlockSize = 0;
+ for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) {
+ indexBlockSize += holder.getKeyBlockIndexLength()[i] + CarbonCommonConstants.INT_SIZE_IN_BYTE;
+ }
+
+ for (int i = 0; i < holder.getDataIndexMapLength().length; i++) {
+ indexBlockSize += holder.getDataIndexMapLength()[i];
+ }
+
+ long blockletDataSize =
+ holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength()
+ + indexBlockSize;
+ updateBlockletFileChannel(blockletDataSize);
+ // write data to file and get its offset
+ long offset = writeDataToFile(holder, fileChannel);
+ // get the blocklet info for currently added blocklet
+ BlockletInfoColumnar blockletInfo = getBlockletInfo(holder, offset);
+ // add blocklet info to list
+ blockletInfoList.add(blockletInfo);
+ LOGGER.info("A new blocklet is added, its data size is: " + blockletDataSize + " Byte");
+ }
+
+ /**
+ * This method is responsible for writing blocklet to the data file
+ *
+ * @return file offset offset is the current position of the file
+ * @throws CarbonDataWriterException if will throw CarbonDataWriterException when any thing
+ * goes wrong while while writing the leaf file
+ */
+ private long writeDataToFile(NodeHolder nodeHolder, FileChannel channel)
+ throws CarbonDataWriterException {
+ // create byte buffer
+ byte[][] compressedIndex = nodeHolder.getCompressedIndex();
+ byte[][] compressedIndexMap = nodeHolder.getCompressedIndexMap();
+ byte[][] compressedDataIndex = nodeHolder.getCompressedDataIndex();
+ int indexBlockSize = 0;
+ int index = 0;
+ for (int i = 0; i < nodeHolder.getKeyBlockIndexLength().length; i++) {
+ indexBlockSize +=
+ nodeHolder.getKeyBlockIndexLength()[index++] + CarbonCommonConstants.INT_SIZE_IN_BYTE;
+ }
+
+ for (int i = 0; i < nodeHolder.getDataIndexMapLength().length; i++) {
+ indexBlockSize += nodeHolder.getDataIndexMapLength()[i];
+ }
+ ByteBuffer byteBuffer = ByteBuffer.allocate(
+ nodeHolder.getTotalDimensionArrayLength() + nodeHolder.getTotalMeasureArrayLength()
+ + indexBlockSize);
+ long offset = 0;
+ try {
+ // get the current offset
+ offset = channel.size();
+ // add key array to byte buffer
+ for (int i = 0; i < nodeHolder.getKeyArray().length; i++) {
+ byteBuffer.put(nodeHolder.getKeyArray()[i]);
+ }
+ for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
+ byteBuffer.put(nodeHolder.getDataArray()[i]);
+ }
+ // add measure data array to byte buffer
+
+ ByteBuffer buffer1 = null;
+ for (int i = 0; i < compressedIndex.length; i++) {
+ buffer1 = ByteBuffer.allocate(nodeHolder.getKeyBlockIndexLength()[i]);
+ buffer1.putInt(compressedIndex[i].length);
+ buffer1.put(compressedIndex[i]);
+ if (compressedIndexMap[i].length > 0) {
+ buffer1.put(compressedIndexMap[i]);
+ }
+ buffer1.rewind();
+ byteBuffer.put(buffer1.array());
+
+ }
+ for (int i = 0; i < compressedDataIndex.length; i++) {
+ byteBuffer.put(compressedDataIndex[i]);
+ }
+ byteBuffer.flip();
+ // write data to file
+ channel.write(byteBuffer);
+ } catch (IOException exception) {
+ throw new CarbonDataWriterException("Problem in writing carbon file: ", exception);
+ }
+ // return the offset, this offset will be used while reading the file in
+ // engine side to get from which position to start reading the file
+ return offset;
+ }
+
+ /**
+ * This method will be used to get the blocklet metadata
+ *
+ * @return BlockletInfo - blocklet metadata
+ */
+ protected BlockletInfoColumnar getBlockletInfo(NodeHolder nodeHolder, long offset) {
+ // create the info object for leaf entry
+ BlockletInfoColumnar info = new BlockletInfoColumnar();
+ //add aggBlocks array
+ info.setAggKeyBlock(nodeHolder.getAggBlocks());
+ // add total entry count
+ info.setNumberOfKeys(nodeHolder.getEntryCount());
+
+ // add the key array length
+ info.setKeyLengths(nodeHolder.getKeyLengths());
+ // adding null measure index bit set
+ info.setMeasureNullValueIndex(nodeHolder.getMeasureNullValueIndex());
+ //add column min max length
+ info.setColumnMaxData(nodeHolder.getColumnMaxData());
+ info.setColumnMinData(nodeHolder.getColumnMinData());
+ long[] keyOffSets = new long[nodeHolder.getKeyLengths().length];
+
+ for (int i = 0; i < keyOffSets.length; i++) {
+ keyOffSets[i] = offset;
+ offset += nodeHolder.getKeyLengths()[i];
+ }
+ // key offset will be 8 bytes from current offset because first 4 bytes
+ // will be for number of entry in leaf, then next 4 bytes will be for
+ // key lenght;
+ // offset += CarbonCommonConstants.INT_SIZE_IN_BYTE * 2;
+
+ // add key offset
+ info.setKeyOffSets(keyOffSets);
+
+ // add measure length
+ info.setMeasureLength(nodeHolder.getMeasureLenght());
+
+ long[] msrOffset = new long[dataWriterVo.getMeasureCount()];
+
+ for (int i = 0; i < msrOffset.length; i++) {
+ // increment the current offset by 4 bytes because 4 bytes will be
+ // used for measure byte length
+ // offset += CarbonCommonConstants.INT_SIZE_IN_BYTE;
+ msrOffset[i] = offset;
+ // now increment the offset by adding measure length to get the next
+ // measure offset;
+ offset += nodeHolder.getMeasureLenght()[i];
+ }
+ // add measure offset
+ info.setMeasureOffset(msrOffset);
+ info.setIsSortedKeyColumn(nodeHolder.getIsSortedKeyBlock());
+ info.setKeyBlockIndexLength(nodeHolder.getKeyBlockIndexLength());
+ long[] keyBlockIndexOffsets = new long[nodeHolder.getKeyBlockIndexLength().length];
+ for (int i = 0; i < keyBlockIndexOffsets.length; i++) {
+ keyBlockIndexOffsets[i] = offset;
+ offset += nodeHolder.getKeyBlockIndexLength()[i];
+ }
+ info.setDataIndexMapLength(nodeHolder.getDataIndexMapLength());
+ long[] dataIndexMapOffsets = new long[nodeHolder.getDataIndexMapLength().length];
+ for (int i = 0; i < dataIndexMapOffsets.length; i++) {
+ dataIndexMapOffsets[i] = offset;
+ offset += nodeHolder.getDataIndexMapLength()[i];
+ }
+ info.setDataIndexMapOffsets(dataIndexMapOffsets);
+ info.setKeyBlockIndexOffSets(keyBlockIndexOffsets);
+ // set startkey
+ info.setStartKey(nodeHolder.getStartKey());
+ // set end key
+ info.setEndKey(nodeHolder.getEndKey());
+ info.setCompressionModel(nodeHolder.getCompressionModel());
+ // return leaf metadata
+
+ //colGroup Blocks
+ info.setColGrpBlocks(nodeHolder.getColGrpBlocks());
+
+ return info;
+ }
+
+ /**
+ * This method will write metadata at the end of file file format in thrift format
+ */
+ protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel,
+ String filePath) throws CarbonDataWriterException {
+ try {
+ long currentPosition = channel.size();
+ CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
+ FileFooter convertFileMeta = CarbonMetadataUtil
+ .convertFileFooter(infoList, localCardinality.length, localCardinality,
+ thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
+ fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
+ writer.writeFooter(convertFileMeta, currentPosition);
+ } catch (IOException e) {
+ throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
new file mode 100644
index 0000000..f9f45cc
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.store.writer.v2;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.writer.CarbonFooterWriter;
+import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.FileFooter;
+import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
+import org.apache.carbondata.processing.store.writer.NodeHolder;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+import org.apache.carbondata.processing.store.writer.v1.CarbonFactDataWriterImplV1;
+
+/**
+ * Below method will be used to write the data in version 2 format
+ */
+public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
+
+ /**
+ * logger
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(CarbonFactDataWriterImplV2.class.getName());
+
+ /**
+ * Constructor create instance of this class
+ *
+ * @param dataWriterVo
+ */
+ public CarbonFactDataWriterImplV2(CarbonDataWriterVo dataWriterVo) {
+ super(dataWriterVo);
+ }
+
+ /**
+ * Below method will be used to write the data to carbon data file
+ *
+ * @param holder
+ * @throws CarbonDataWriterException any problem in writing operation
+ */
+ @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
+ // size to calculate the size of the blocklet
+ int size = 0;
+ // get the blocklet info object
+ BlockletInfoColumnar blockletInfo = getBlockletInfo(holder, 0);
+
+ List<DataChunk2> datachunks = null;
+ try {
+ // get all the data chunks
+ datachunks = CarbonMetadataUtil
+ .getDatachunk2(blockletInfo, thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
+ } catch (IOException e) {
+ throw new CarbonDataWriterException("Problem while getting the data chunks", e);
+ }
+ // data chunk byte array
+ byte[][] dataChunkByteArray = new byte[datachunks.size()][];
+ for (int i = 0; i < dataChunkByteArray.length; i++) {
+ dataChunkByteArray[i] = CarbonUtil.getByteArray(datachunks.get(i));
+ // add the data chunk size
+ size += dataChunkByteArray[i].length;
+ }
+ // add row id index length
+ for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) {
+ size += holder.getKeyBlockIndexLength()[i];
+ }
+ // add rle index length
+ for (int i = 0; i < holder.getDataIndexMapLength().length; i++) {
+ size += holder.getDataIndexMapLength()[i];
+ }
+ // add dimension column data page and measure column data page size
+ long blockletDataSize =
+ holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength() + size;
+ // if size of the file already reached threshold size then create a new file and get the file
+ // channel object
+ updateBlockletFileChannel(blockletDataSize);
+ // writer the version header in the file if current file size is zero
+ // this is done so carbondata file can be read separately
+ try {
+ if (fileChannel.size() == 0) {
+ ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
+ byte[] header = (CarbonCommonConstants.CARBON_DATA_VERSION_HEADER + version).getBytes();
+ ByteBuffer buffer = ByteBuffer.allocate(header.length);
+ buffer.put(header);
+ buffer.rewind();
+ fileChannel.write(buffer);
+ }
+ } catch (IOException e) {
+ throw new CarbonDataWriterException("Problem while getting the file channel size", e);
+ }
+ // write data to file and get its offset
+ writeDataToFile(holder, dataChunkByteArray, fileChannel);
+ // add blocklet info to list
+ blockletInfoList.add(blockletInfo);
+ LOGGER.info("A new blocklet is added, its data size is: " + blockletDataSize + " Byte");
+ }
+
+ /**
+ * Below method will be used to write the data to file
+ * Data Format
+ * <DColumn1DataChunk><DColumnDataPage><DColumnRle>
+ * <DColumn2DataChunk><DColumn2DataPage><DColumn2RowIds><DColumn2Rle>
+ * <DColumn3DataChunk><DColumn3DataPage><column3RowIds>
+ * <MColumn1DataChunk><MColumn1DataPage>
+ * <MColumn2DataChunk><MColumn2DataPage>
+ * <MColumn2DataChunk><MColumn2DataPage>
+ *
+ * @param nodeHolder
+ * @param dataChunksBytes
+ * @param channel
+ * @throws CarbonDataWriterException
+ */
+ private void writeDataToFile(NodeHolder nodeHolder, byte[][] dataChunksBytes, FileChannel channel)
+ throws CarbonDataWriterException {
+ long offset = 0;
+ try {
+ offset = channel.size();
+ } catch (IOException e) {
+ throw new CarbonDataWriterException("Problem while getting the file channel size");
+ }
+ List<Long> currentDataChunksOffset = new ArrayList<>();
+ List<Short> currentDataChunksLength = new ArrayList<>();
+ dataChunksLength.add(currentDataChunksLength);
+ dataChunksOffsets.add(currentDataChunksOffset);
+ int bufferSize = 0;
+ int rowIdIndex = 0;
+ int rleIndex = 0;
+ for (int i = 0; i < nodeHolder.getIsSortedKeyBlock().length; i++) {
+ currentDataChunksOffset.add(offset);
+ currentDataChunksLength.add((short) dataChunksBytes[i].length);
+ bufferSize += dataChunksBytes[i].length + nodeHolder.getKeyLengths()[i] + (!nodeHolder
+ .getIsSortedKeyBlock()[i] ? nodeHolder.getKeyBlockIndexLength()[rowIdIndex] : 0) + (
+ dataWriterVo.getAggBlocks()[i] ?
+ nodeHolder.getCompressedDataIndex()[rleIndex].length :
+ 0);
+ offset += dataChunksBytes[i].length;
+ offset += nodeHolder.getKeyLengths()[i];
+ if (!nodeHolder.getIsSortedKeyBlock()[i]) {
+ offset += nodeHolder.getKeyBlockIndexLength()[rowIdIndex];
+ rowIdIndex++;
+ }
+ if (dataWriterVo.getAggBlocks()[i]) {
+ offset += nodeHolder.getDataIndexMapLength()[rleIndex];
+ rleIndex++;
+ }
+ }
+ ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+ rleIndex = 0;
+ rowIdIndex = 0;
+ for (int i = 0; i < nodeHolder.getIsSortedKeyBlock().length; i++) {
+ buffer.put(dataChunksBytes[i]);
+ buffer.put(nodeHolder.getKeyArray()[i]);
+ if (!nodeHolder.getIsSortedKeyBlock()[i]) {
+ buffer.putInt(nodeHolder.getCompressedIndex()[rowIdIndex].length);
+ buffer.put(nodeHolder.getCompressedIndex()[rowIdIndex]);
+ if (nodeHolder.getCompressedIndexMap()[rowIdIndex].length > 0) {
+ buffer.put(nodeHolder.getCompressedIndexMap()[rowIdIndex]);
+ }
+ rowIdIndex++;
+ }
+ if (dataWriterVo.getAggBlocks()[i]) {
+ buffer.put(nodeHolder.getCompressedDataIndex()[rleIndex]);
+ rleIndex++;
+ }
+ }
+ try {
+ buffer.flip();
+ channel.write(buffer);
+ } catch (IOException e) {
+ throw new CarbonDataWriterException(
+ "Problem while writing the dimension data in carbon data file", e);
+ }
+
+ int dataChunkIndex = nodeHolder.getKeyArray().length;
+ int totalLength = 0;
+ for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
+ currentDataChunksOffset.add(offset);
+ currentDataChunksLength.add((short) dataChunksBytes[dataChunkIndex].length);
+ offset += dataChunksBytes[dataChunkIndex].length;
+ offset += nodeHolder.getDataArray()[i].length;
+ totalLength += dataChunksBytes[dataChunkIndex].length;
+ totalLength += nodeHolder.getDataArray()[i].length;
+ dataChunkIndex++;
+ }
+ buffer = ByteBuffer.allocate(totalLength);
+ dataChunkIndex = nodeHolder.getKeyArray().length;
+ for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
+ buffer.put(dataChunksBytes[dataChunkIndex++]);
+ buffer.put(nodeHolder.getDataArray()[i]);
+ }
+ try {
+ buffer.flip();
+ channel.write(buffer);
+ } catch (IOException e) {
+ throw new CarbonDataWriterException(
+ "Problem while writing the measure data in carbon data file", e);
+ }
+ }
+
+ /**
+ * This method will be used to get the blocklet metadata
+ *
+ * @return BlockletInfo - blocklet metadata
+ */
+ protected BlockletInfoColumnar getBlockletInfo(NodeHolder nodeHolder, long offset) {
+ // create the info object for leaf entry
+ BlockletInfoColumnar info = new BlockletInfoColumnar();
+ //add aggBlocks array
+ info.setAggKeyBlock(nodeHolder.getAggBlocks());
+ // add total entry count
+ info.setNumberOfKeys(nodeHolder.getEntryCount());
+
+ // add the key array length
+ info.setKeyLengths(nodeHolder.getKeyLengths());
+ // adding null measure index bit set
+ info.setMeasureNullValueIndex(nodeHolder.getMeasureNullValueIndex());
+ //add column min max length
+ info.setColumnMaxData(nodeHolder.getColumnMaxData());
+ info.setColumnMinData(nodeHolder.getColumnMinData());
+
+ // add measure length
+ info.setMeasureLength(nodeHolder.getMeasureLenght());
+
+ info.setIsSortedKeyColumn(nodeHolder.getIsSortedKeyBlock());
+ info.setKeyBlockIndexLength(nodeHolder.getKeyBlockIndexLength());
+ info.setDataIndexMapLength(nodeHolder.getDataIndexMapLength());
+ // set startkey
+ info.setStartKey(nodeHolder.getStartKey());
+ // set end key
+ info.setEndKey(nodeHolder.getEndKey());
+ info.setCompressionModel(nodeHolder.getCompressionModel());
+ // return leaf metadata
+
+ //colGroup Blocks
+ info.setColGrpBlocks(nodeHolder.getColGrpBlocks());
+
+ return info;
+ }
+
+ /**
+ * This method will write metadata at the end of file file format in thrift format
+ */
+ protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel,
+ String filePath) throws CarbonDataWriterException {
+ try {
+ // get the current file position
+ long currentPosition = channel.size();
+ CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
+ // get thrift file footer instance
+ FileFooter convertFileMeta = CarbonMetadataUtil
+ .convertFilterFooter2(infoList, localCardinality, thriftColumnSchemaList,
+ dataChunksOffsets, dataChunksLength);
+ // fill the carbon index details
+ fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
+ // write the footer
+ writer.writeFooter(convertFileMeta, currentPosition);
+ } catch (IOException e) {
+ throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
index 84192b8..328e33b 100644
--- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
+++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
import org.apache.carbondata.core.carbon.datastore.BlockIndexStore;
import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
@@ -52,7 +53,7 @@ public class BlockIndexStoreTest extends TestCase {
@BeforeClass public void setUp() {
property = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION);
- CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "1");
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "V1");
StoreCreator.createCarbonStore();
indexStore = BlockIndexStore.getInstance();
}
@@ -61,7 +62,7 @@ public class BlockIndexStoreTest extends TestCase {
if(null!=property) {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, property);
}else {
- CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION+"");
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
}
}
@@ -71,7 +72,7 @@ public class BlockIndexStoreTest extends TestCase {
File file = getPartFile();
TableBlockInfo info =
new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
- file.length(),(short)1);
+ file.length(), ColumnarFormatVersion.V1);
CarbonTableIdentifier carbonTableIdentifier =
new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
AbsoluteTableIdentifier absoluteTableIdentifier =
@@ -93,20 +94,20 @@ public class BlockIndexStoreTest extends TestCase {
File file = getPartFile();
TableBlockInfo info =
new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
- file.length(), (short)1);
+ file.length(), ColumnarFormatVersion.V1);
TableBlockInfo info1 =
new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
- file.length(), (short)1);
+ file.length(), ColumnarFormatVersion.V1);
TableBlockInfo info2 =
new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
- file.length(), (short)1);
+ file.length(), ColumnarFormatVersion.V1);
TableBlockInfo info3 =
new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
- file.length(), (short)1);
+ file.length(), ColumnarFormatVersion.V1);
TableBlockInfo info4 =
new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
- file.length(), (short)1);
+ file.length(), ColumnarFormatVersion.V1);
CarbonTableIdentifier carbonTableIdentifier =
new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
@@ -148,31 +149,31 @@ public class BlockIndexStoreTest extends TestCase {
File file = getPartFile();
TableBlockInfo info =
new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
- file.length(), (short)1);
+ file.length(), ColumnarFormatVersion.V1);
TableBlockInfo info1 =
new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
- file.length(), (short)1);
+ file.length(), ColumnarFormatVersion.V1);
TableBlockInfo info2 =
new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
- file.length(), (short)1);
+ file.length(), ColumnarFormatVersion.V1);
TableBlockInfo info3 =
new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
- file.length(), (short)1);
+ file.length(), ColumnarFormatVersion.V1);
TableBlockInfo info4 =
new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
- file.length(), (short)1);
+ file.length(), ColumnarFormatVersion.V1);
TableBlockInfo info5 =
new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" },
- file.length(),(short)1);
+ file.length(),ColumnarFormatVersion.V1);
TableBlockInfo info6 =
new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" },
- file.length(), (short)1);
+ file.length(), ColumnarFormatVersion.V1);
TableBlockInfo info7 =
new TableBlockInfo(file.getAbsolutePath(), 0, "3", new String[] { "loclhost" },
- file.length(), (short)1);
+ file.length(), ColumnarFormatVersion.V1);
CarbonTableIdentifier carbonTableIdentifier =
new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
[3/3] incubator-carbondata git commit: [CARBONDATA-480] Add file
format version enum. This closes #378
Posted by ra...@apache.org.
[CARBONDATA-480] Add file format version enum. This closes #378
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/5406cee1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/5406cee1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/5406cee1
Branch: refs/heads/master
Commit: 5406cee1bb8c46dd903865ca785b8a039f753256
Parents: 90bc366 0ef3fb8
Author: ravipesala <ra...@gmail.com>
Authored: Thu Dec 1 22:05:11 2016 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Dec 1 22:05:11 2016 +0530
----------------------------------------------------------------------
conf/dataload.properties.template | 4 +-
.../core/carbon/ColumnarFormatVersion.java | 50 +++
.../carbon/datastore/block/TableBlockInfo.java | 11 +-
.../chunk/reader/CarbonDataReaderFactory.java | 17 +-
.../metadata/blocklet/DataFileFooter.java | 7 +-
.../core/constants/CarbonCommonConstants.java | 2 +-
.../util/AbstractDataFileFooterConverter.java | 4 +-
.../core/util/CarbonMetadataUtil.java | 11 +-
.../carbondata/core/util/CarbonProperties.java | 49 ++-
.../apache/carbondata/core/util/CarbonUtil.java | 5 -
.../core/util/DataFileFooterConverter.java | 3 +-
.../core/util/DataFileFooterConverter2.java | 3 +-
.../util/DataFileFooterConverterFactory.java | 14 +-
.../carbon/datastore/block/BlockInfoTest.java | 13 +-
.../datastore/block/TableBlockInfoTest.java | 33 +-
.../datastore/block/TableTaskInfoTest.java | 9 +-
.../core/util/CarbonMetadataUtilTest.java | 2 +-
.../carbondata/core/util/CarbonUtilTest.java | 9 +-
.../core/util/DataFileFooterConverterTest.java | 5 +-
.../carbondata/examples/CarbonExample.scala | 2 +-
.../carbondata/hadoop/CarbonInputFormat.java | 6 +-
.../carbondata/hadoop/CarbonInputSplit.java | 21 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 7 +-
.../TestQueryWithOldCarbonDataFile.scala | 8 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 7 +-
.../store/CarbonDataWriterFactory.java | 16 +-
.../store/CarbonFactDataHandlerColumnar.java | 4 +-
.../store/writer/CarbonFactDataWriterImpl2.java | 285 --------------
...actDataWriterImplForIntIndexAndAggBlock.java | 379 ------------------
.../writer/v1/CarbonFactDataWriterImplV1.java | 382 +++++++++++++++++++
.../writer/v2/CarbonFactDataWriterImplV2.java | 288 ++++++++++++++
.../carbon/datastore/BlockIndexStoreTest.java | 33 +-
32 files changed, 891 insertions(+), 798 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-carbondata git commit: add file format version enum
Posted by ra...@apache.org.
add file format version enum
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/0ef3fb81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/0ef3fb81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/0ef3fb81
Branch: refs/heads/master
Commit: 0ef3fb81883e782fffef0beae01a429684893960
Parents: 90bc366
Author: jackylk <ja...@huawei.com>
Authored: Thu Dec 1 23:02:16 2016 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Dec 1 22:04:26 2016 +0530
----------------------------------------------------------------------
conf/dataload.properties.template | 4 +-
.../core/carbon/ColumnarFormatVersion.java | 50 +++
.../carbon/datastore/block/TableBlockInfo.java | 11 +-
.../chunk/reader/CarbonDataReaderFactory.java | 17 +-
.../metadata/blocklet/DataFileFooter.java | 7 +-
.../core/constants/CarbonCommonConstants.java | 2 +-
.../util/AbstractDataFileFooterConverter.java | 4 +-
.../core/util/CarbonMetadataUtil.java | 11 +-
.../carbondata/core/util/CarbonProperties.java | 49 ++-
.../apache/carbondata/core/util/CarbonUtil.java | 5 -
.../core/util/DataFileFooterConverter.java | 3 +-
.../core/util/DataFileFooterConverter2.java | 3 +-
.../util/DataFileFooterConverterFactory.java | 14 +-
.../carbon/datastore/block/BlockInfoTest.java | 13 +-
.../datastore/block/TableBlockInfoTest.java | 33 +-
.../datastore/block/TableTaskInfoTest.java | 9 +-
.../core/util/CarbonMetadataUtilTest.java | 2 +-
.../carbondata/core/util/CarbonUtilTest.java | 9 +-
.../core/util/DataFileFooterConverterTest.java | 5 +-
.../carbondata/examples/CarbonExample.scala | 2 +-
.../carbondata/hadoop/CarbonInputFormat.java | 6 +-
.../carbondata/hadoop/CarbonInputSplit.java | 21 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 7 +-
.../TestQueryWithOldCarbonDataFile.scala | 8 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 7 +-
.../store/CarbonDataWriterFactory.java | 16 +-
.../store/CarbonFactDataHandlerColumnar.java | 4 +-
.../store/writer/CarbonFactDataWriterImpl2.java | 285 --------------
...actDataWriterImplForIntIndexAndAggBlock.java | 379 ------------------
.../writer/v1/CarbonFactDataWriterImplV1.java | 382 +++++++++++++++++++
.../writer/v2/CarbonFactDataWriterImplV2.java | 288 ++++++++++++++
.../carbon/datastore/BlockIndexStoreTest.java | 33 +-
32 files changed, 891 insertions(+), 798 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/conf/dataload.properties.template
----------------------------------------------------------------------
diff --git a/conf/dataload.properties.template b/conf/dataload.properties.template
index 59cad4a..d5e9d6a 100644
--- a/conf/dataload.properties.template
+++ b/conf/dataload.properties.template
@@ -18,14 +18,14 @@
#carbon store path
# you should change to the code path of your local machine
-carbon.storelocation=/Users/wangfei/code/incubator-carbondata/examples/spark2/target/store
+carbon.storelocation=/Users/jackylk/code/incubator-carbondata/examples/spark2/target/store
#true: use kettle to load data
#false: use new flow to load data
use_kettle=true
# you should change to the code path of your local machine
-carbon.kettle.home=/Users/wangfei/code/incubator-carbondata/processing/carbonplugins
+carbon.kettle.home=/Users/jackylk/code/incubator-carbondata/processing/carbonplugins
#csv delimiter character
delimiter=,
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/carbon/ColumnarFormatVersion.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/ColumnarFormatVersion.java b/core/src/main/java/org/apache/carbondata/core/carbon/ColumnarFormatVersion.java
new file mode 100644
index 0000000..bef345c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/ColumnarFormatVersion.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.carbon;
+
/**
 * Version of the columnar carbon data file format. Each constant maps to the
 * numeric version id that is persisted in the file and index headers.
 */
public enum ColumnarFormatVersion {
  V1((short)1),
  V2((short)2);

  // numeric id written to / read from file metadata; final: an enum constant's
  // version never changes after construction
  private final short version;

  ColumnarFormatVersion(short version) {
    this.version = version;
  }

  @Override
  public String toString() {
    return "ColumnarFormatV" + version;
  }

  /**
   * @return the numeric version id of this format version
   */
  public short number() {
    return version;
  }

  /**
   * Resolves a numeric version id (as stored in file metadata) to its
   * enum constant.
   *
   * @param version numeric version id, must be 1 or 2
   * @return the matching format version
   * @throws IllegalArgumentException if the id is not a known version
   */
  public static ColumnarFormatVersion valueOf(short version) {
    switch (version) {
      case 1:
        return V1;
      case 2:
        return V2;
      default:
        throw new IllegalArgumentException("invalid format version: " + version);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
index 0d60567..802a116 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.carbon.datastore.block;
import java.io.Serializable;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
import org.apache.carbondata.core.carbon.path.CarbonTablePath;
import org.apache.carbondata.core.carbon.path.CarbonTablePath.DataFileUtil;
import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
@@ -57,14 +58,14 @@ public class TableBlockInfo implements Distributable, Serializable {
private String[] locations;
- private short version;
+ private ColumnarFormatVersion version;
/**
* The class holds the blockletsinfo
*/
private BlockletInfos blockletInfos = new BlockletInfos();
public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
- long blockLength, short version) {
+ long blockLength, ColumnarFormatVersion version) {
this.filePath = FileFactory.getUpdatedFilePath(filePath);
this.blockOffset = blockOffset;
this.segmentId = segmentId;
@@ -84,7 +85,7 @@ public class TableBlockInfo implements Distributable, Serializable {
* @param blockletInfos
*/
public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
- long blockLength, BlockletInfos blockletInfos, short version) {
+ long blockLength, BlockletInfos blockletInfos, ColumnarFormatVersion version) {
this.filePath = FileFactory.getUpdatedFilePath(filePath);
this.blockOffset = blockOffset;
this.segmentId = segmentId;
@@ -259,11 +260,11 @@ public class TableBlockInfo implements Distributable, Serializable {
this.blockletInfos = blockletInfos;
}
- public short getVersion() {
+ public ColumnarFormatVersion getVersion() {
return version;
}
- public void setVersion(short version) {
+ public void setVersion(ColumnarFormatVersion version) {
this.version = version;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/CarbonDataReaderFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/CarbonDataReaderFactory.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/CarbonDataReaderFactory.java
index 08a1869..9bf7e62 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/CarbonDataReaderFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/CarbonDataReaderFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.carbondata.core.carbon.datastore.chunk.reader;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
import org.apache.carbondata.core.carbon.datastore.chunk.reader.dimension.v1.CompressedDimensionChunkFileBasedReaderV1;
import org.apache.carbondata.core.carbon.datastore.chunk.reader.dimension.v2.CompressedDimensionChunkFileBasedReaderV2;
import org.apache.carbondata.core.carbon.datastore.chunk.reader.measure.v1.CompressedMeasureChunkFileBasedReaderV1;
@@ -60,15 +61,17 @@ public class CarbonDataReaderFactory {
* @param filePath carbon data file path
* @return dimension column data reader based on version number
*/
- public DimensionColumnChunkReader getDimensionColumnChunkReader(short version,
+ public DimensionColumnChunkReader getDimensionColumnChunkReader(ColumnarFormatVersion version,
BlockletInfo blockletInfo, int[] eachColumnValueSize, String filePath) {
switch (version) {
- case 2:
+ case V2:
return new CompressedDimensionChunkFileBasedReaderV2(blockletInfo, eachColumnValueSize,
filePath);
- default:
+ case V1:
return new CompressedDimensionChunkFileBasedReaderV1(blockletInfo, eachColumnValueSize,
filePath);
+ default:
+ throw new IllegalArgumentException("invalid format version: " + version);
}
}
@@ -80,13 +83,15 @@ public class CarbonDataReaderFactory {
* @param filePath carbon data file path
* @return measure column data reader based on version number
*/
- public MeasureColumnChunkReader getMeasureColumnChunkReader(short version,
+ public MeasureColumnChunkReader getMeasureColumnChunkReader(ColumnarFormatVersion version,
BlockletInfo blockletInfo, String filePath) {
switch (version) {
- case 2:
+ case V2:
return new CompressedMeasureChunkFileBasedReaderV2(blockletInfo, filePath);
- default:
+ case V1:
return new CompressedMeasureChunkFileBasedReaderV1(blockletInfo, filePath);
+ default:
+ throw new IllegalArgumentException("invalid format version: " + version);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/DataFileFooter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/DataFileFooter.java b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/DataFileFooter.java
index be235ba..a82bac9 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/DataFileFooter.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/DataFileFooter.java
@@ -21,6 +21,7 @@ package org.apache.carbondata.core.carbon.metadata.blocklet;
import java.io.Serializable;
import java.util.List;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
import org.apache.carbondata.core.carbon.datastore.block.BlockInfo;
import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletIndex;
import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
@@ -38,7 +39,7 @@ public class DataFileFooter implements Serializable {
/**
* version used for data compatibility
*/
- private short versionId;
+ private ColumnarFormatVersion versionId;
/**
* total number of rows in this file
@@ -73,14 +74,14 @@ public class DataFileFooter implements Serializable {
/**
* @return the versionId
*/
- public short getVersionId() {
+ public ColumnarFormatVersion getVersionId() {
return versionId;
}
/**
* @param versionId the versionId to set
*/
- public void setVersionId(short versionId) {
+ public void setVersionId(ColumnarFormatVersion versionId) {
this.versionId = versionId;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 443c8c4..1ac2ba1 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -900,7 +900,7 @@ public final class CarbonCommonConstants {
/**
* current data file version
*/
- public static final short CARBON_DATA_FILE_DEFAULT_VERSION = 2;
+ public static final String CARBON_DATA_FILE_DEFAULT_VERSION = "V2";
/**
* number of column data will read in IO operation
* during query execution
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index db9c9be..7f50c34 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
import org.apache.carbondata.core.carbon.datastore.block.BlockInfo;
import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
@@ -99,7 +100,8 @@ public abstract class AbstractDataFileFooterConverter {
dataFileFooter = new DataFileFooter();
TableBlockInfo tableBlockInfo = tableBlockInfoList.get(counter++);
tableBlockInfo.setBlockOffset(readBlockIndexInfo.getOffset());
- tableBlockInfo.setVersion((short) readIndexHeader.getVersion());
+ tableBlockInfo.setVersion(
+ ColumnarFormatVersion.valueOf((short) readIndexHeader.getVersion()));
int blockletSize = getBlockletSize(readBlockIndexInfo);
tableBlockInfo.getBlockletInfos().setNoOfBlockLets(blockletSize);
dataFileFooter.setBlockletIndex(blockletIndex);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index 4f8a435..bdd6fae 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -30,6 +30,7 @@ import java.util.Set;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
import org.apache.carbondata.core.carbon.metadata.index.BlockIndexInfo;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -97,10 +98,9 @@ public class CarbonMetadataUtil {
SegmentInfo segmentInfo = new SegmentInfo();
segmentInfo.setNum_cols(columnSchemaList.size());
segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(cardinalities));
- short version = Short.parseShort(
- CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION));
+ ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
FileFooter footer = new FileFooter();
- footer.setVersion(version);
+ footer.setVersion(version.number());
footer.setNum_rows(getTotalNumberOfRows(infoList));
footer.setSegment_info(segmentInfo);
footer.setTable_columns(columnSchemaList);
@@ -476,9 +476,8 @@ public class CarbonMetadataUtil {
segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(columnCardinality));
// create index header object
IndexHeader indexHeader = new IndexHeader();
- short version = Short.parseShort(
- CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION));
- indexHeader.setVersion(version);
+ ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
+ indexHeader.setVersion(version.number());
// set the segment info
indexHeader.setSegment_info(segmentInfo);
// set the column names
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index adb0e6a..f4ec63d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -27,6 +27,7 @@ import java.util.Properties;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
public final class CarbonProperties {
@@ -263,27 +264,25 @@ public final class CarbonProperties {
* if parameter is invalid current version will be set
*/
private void validateCarbonDataFileVersion() {
- short carbondataFileVersion = CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION;
String carbondataFileVersionString =
carbonProperties.getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION);
- try {
- carbondataFileVersion = Short.parseShort(carbondataFileVersionString);
- } catch (NumberFormatException e) {
- carbondataFileVersion = CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION;
- LOGGER.info("Current Data file version property is invalid \"" + carbondataFileVersionString
- + "\" is invalid. Using the Current data file version value \"" + carbondataFileVersion);
- carbonProperties
- .setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, carbondataFileVersion + "");
- }
- if (carbondataFileVersion > CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION
- || carbondataFileVersion < 0) {
- LOGGER.info("Current Data file version property is invalid \"" + carbondataFileVersionString
- + "\" is invalid. Using the Current data file version value \"" + carbondataFileVersion);
- carbondataFileVersion = CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION;
+ if (carbondataFileVersionString == null) {
+ // use default property if user does not specify version property
carbonProperties
- .setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, carbondataFileVersion + "");
+ .setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
+ CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
+ } else {
+ try {
+ ColumnarFormatVersion.valueOf(carbondataFileVersionString);
+ } catch (IllegalArgumentException e) {
+ // use default property if user specifies an invalid version property
+ LOGGER.warn("Specified file version property is invalid: " +
+ carbondataFileVersionString + ". Using " +
+ CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION + " as default file version");
+ carbonProperties.setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
+ CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
+ }
}
-
}
/**
@@ -362,7 +361,23 @@ public final class CarbonProperties {
*/
public void addProperty(String key, String value) {
carbonProperties.setProperty(key, value);
+ }
+
+ private ColumnarFormatVersion getDefaultFormatVersion() {
+ return ColumnarFormatVersion.valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
+ }
+ public ColumnarFormatVersion getFormatVersion() {
+ String versionStr = getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION);
+ if (versionStr == null) {
+ return getDefaultFormatVersion();
+ } else {
+ try {
+ return ColumnarFormatVersion.valueOf(versionStr);
+ } catch (IllegalArgumentException e) {
+ return getDefaultFormatVersion();
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 162e9b9..41594ad 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1056,11 +1056,6 @@ public final class CarbonUtil {
/**
* Below method will be used to read the data file matadata
- *
- * @param filePath file path
- * @param blockOffset offset in the file
- * @return Data file metadata instance
- * @throws CarbonUtilException
*/
public static DataFileFooter readMetadatFile(TableBlockInfo tableBlockInfo)
throws CarbonUtilException {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
index ea1324e..e766e85 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
@@ -55,7 +56,7 @@ public class DataFileFooterConverter extends AbstractDataFileFooterConverter {
CarbonFooterReader reader =
new CarbonFooterReader(tableBlockInfo.getFilePath(), actualFooterOffset);
FileFooter footer = reader.readFooter();
- dataFileFooter.setVersionId((short) footer.getVersion());
+ dataFileFooter.setVersionId(ColumnarFormatVersion.valueOf((short) footer.getVersion()));
dataFileFooter.setNumberOfRows(footer.getNum_rows());
dataFileFooter.setSegmentInfo(getSegmentInfo(footer.getSegment_info()));
List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
index d971756..02de383 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
@@ -46,7 +47,7 @@ public class DataFileFooterConverter2 extends AbstractDataFileFooterConverter {
CarbonFooterReader reader =
new CarbonFooterReader(tableBlockInfo.getFilePath(), tableBlockInfo.getBlockOffset());
FileFooter footer = reader.readFooter();
- dataFileFooter.setVersionId((short) footer.getVersion());
+ dataFileFooter.setVersionId(ColumnarFormatVersion.valueOf((short) footer.getVersion()));
dataFileFooter.setNumberOfRows(footer.getNum_rows());
dataFileFooter.setSegmentInfo(getSegmentInfo(footer.getSegment_info()));
List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
index a079ad7..175a22b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.carbondata.core.util;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
/**
* Factory class to get the thrift reader object based on version
@@ -49,15 +50,18 @@ public class DataFileFooterConverterFactory {
/**
* Method will be used to get the file footer converter instance based on version
*
- * @param versionNumber
+ * @param version
* @return footer reader instance
*/
- public AbstractDataFileFooterConverter getDataFileFooterConverter(final short versionNumber) {
- switch (versionNumber) {
- case 2:
+ public AbstractDataFileFooterConverter getDataFileFooterConverter(
+ final ColumnarFormatVersion version) {
+ switch (version) {
+ case V2:
return new DataFileFooterConverter2();
- default:
+ case V1:
return new DataFileFooterConverter();
+ default:
+ throw new IllegalArgumentException("invalid format version: " + version);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java
index eabf688..6d90a36 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.carbondata.core.carbon.datastore.block;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -28,7 +29,7 @@ public class BlockInfoTest {
static BlockInfo blockInfo;
@BeforeClass public static void setup() {
- blockInfo = new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 6,(short)1));
+ blockInfo = new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 6, ColumnarFormatVersion.V1));
}
@Test public void hashCodeTest() {
@@ -44,7 +45,7 @@ public class BlockInfoTest {
@Test public void equalsTestWithSimilarObject() {
BlockInfo blockInfoTest =
- new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 6, (short)1));
+ new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 6, ColumnarFormatVersion.V1));
Boolean res = blockInfo.equals(blockInfoTest);
assert (res);
}
@@ -61,28 +62,28 @@ public class BlockInfoTest {
@Test public void equalsTestWithDifferentSegmentId() {
BlockInfo blockInfoTest =
- new BlockInfo(new TableBlockInfo("filePath", 6, "diffSegmentId", null, 6,(short)1));
+ new BlockInfo(new TableBlockInfo("filePath", 6, "diffSegmentId", null, 6, ColumnarFormatVersion.V1));
Boolean res = blockInfo.equals(blockInfoTest);
assert (!res);
}
@Test public void equalsTestWithDifferentOffset() {
BlockInfo blockInfoTest =
- new BlockInfo(new TableBlockInfo("filePath", 62, "segmentId", null, 6, (short)1));
+ new BlockInfo(new TableBlockInfo("filePath", 62, "segmentId", null, 6, ColumnarFormatVersion.V1));
Boolean res = blockInfo.equals(blockInfoTest);
assert (!res);
}
@Test public void equalsTestWithDifferentBlockLength() {
BlockInfo blockInfoTest =
- new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 62, (short)1));
+ new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 62, ColumnarFormatVersion.V1));
Boolean res = blockInfo.equals(blockInfoTest);
assert (!res);
}
@Test public void equalsTestWithDiffFilePath() {
BlockInfo blockInfoTest =
- new BlockInfo(new TableBlockInfo("diffFilePath", 6, "segmentId", null, 62, (short)1));
+ new BlockInfo(new TableBlockInfo("diffFilePath", 6, "segmentId", null, 62, ColumnarFormatVersion.V1));
Boolean res = blockInfoTest.equals(blockInfo);
assert (!res);
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfoTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfoTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfoTest.java
index 1b49f83..b6669ed 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfoTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfoTest.java
@@ -21,6 +21,7 @@ package org.apache.carbondata.core.carbon.datastore.block;
import mockit.Mock;
import mockit.MockUp;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
import org.apache.carbondata.core.carbon.path.CarbonTablePath;
import org.junit.BeforeClass;
@@ -34,8 +35,8 @@ public class TableBlockInfoTest {
static TableBlockInfo tableBlockInfos;
@BeforeClass public static void setup() {
- tableBlockInfo = new TableBlockInfo("filePath", 4, "segmentId", null, 6, (short) 1);
- tableBlockInfos = new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 2, 2), (short) 1);
+ tableBlockInfo = new TableBlockInfo("filePath", 4, "segmentId", null, 6, ColumnarFormatVersion.V1);
+ tableBlockInfos = new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 2, 2), ColumnarFormatVersion.V1);
}
@Test public void equalTestWithSameObject() {
@@ -44,7 +45,7 @@ public class TableBlockInfoTest {
}
@Test public void equalTestWithSimilarObject() {
- TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 6, (short) 1);
+ TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 6, ColumnarFormatVersion.V1);
Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
assert (res);
}
@@ -60,52 +61,52 @@ public class TableBlockInfoTest {
}
@Test public void equlsTestWithDiffSegmentId() {
- TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "diffsegmentId", null, 6, (short) 1);
+ TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "diffsegmentId", null, 6, ColumnarFormatVersion.V1);
Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
assert (!res);
}
@Test public void equlsTestWithDiffBlockOffset() {
- TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 6, "segmentId", null, 6, (short) 1);
+ TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 6, "segmentId", null, 6, ColumnarFormatVersion.V1);
Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
assert (!res);
}
@Test public void equalsTestWithDiffBlockLength() {
- TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 4, (short) 1);
+ TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 4, ColumnarFormatVersion.V1);
Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
assert (!res);
}
@Test public void equalsTestWithDiffBlockletNumber() {
TableBlockInfo tableBlockInfoTest =
- new TableBlockInfo("filepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2), (short) 1);
+ new TableBlockInfo("filepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
Boolean res = tableBlockInfos.equals(tableBlockInfoTest);
assert (!res);
}
@Test public void equalsTestWithDiffFilePath() {
TableBlockInfo tableBlockInfoTest =
- new TableBlockInfo("difffilepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2), (short) 1);
+ new TableBlockInfo("difffilepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
Boolean res = tableBlockInfos.equals(tableBlockInfoTest);
assert (!res);
}
@Test public void compareToTestForSegmentId() {
TableBlockInfo tableBlockInfo =
- new TableBlockInfo("difffilepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2), (short) 1);
+ new TableBlockInfo("difffilepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
int res = tableBlockInfos.compareTo(tableBlockInfo);
int expectedResult = 2;
assertEquals(res, expectedResult);
TableBlockInfo tableBlockInfo1 =
- new TableBlockInfo("difffilepath", 6, "6", null, 6, new BlockletInfos(6, 3, 2), (short) 1);
+ new TableBlockInfo("difffilepath", 6, "6", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
int res1 = tableBlockInfos.compareTo(tableBlockInfo1);
int expectedResult1 = -1;
assertEquals(res1, expectedResult1);
TableBlockInfo tableBlockInfo2 =
- new TableBlockInfo("difffilepath", 6, "4", null, 6, new BlockletInfos(6, 3, 2), (short) 1);
+ new TableBlockInfo("difffilepath", 6, "4", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
int res2 = tableBlockInfos.compareTo(tableBlockInfo2);
int expectedresult2 = 1;
assertEquals(res2, expectedresult2);
@@ -130,18 +131,18 @@ public class TableBlockInfoTest {
};
- TableBlockInfo tableBlockInfo = new TableBlockInfo("difffilepaths", 6, "5", null, 3, (short) 1);
+ TableBlockInfo tableBlockInfo = new TableBlockInfo("difffilepaths", 6, "5", null, 3, ColumnarFormatVersion.V1);
int res = tableBlockInfos.compareTo(tableBlockInfo);
int expectedResult = -5;
assertEquals(res, expectedResult);
- TableBlockInfo tableBlockInfo1 = new TableBlockInfo("filepath", 6, "5", null, 3, (short) 1);
+ TableBlockInfo tableBlockInfo1 = new TableBlockInfo("filepath", 6, "5", null, 3, ColumnarFormatVersion.V1);
int res1 = tableBlockInfos.compareTo(tableBlockInfo1);
int expectedResult1 = 1;
assertEquals(res1, expectedResult1);
TableBlockInfo tableBlockInfoTest =
- new TableBlockInfo("filePath", 6, "5", null, 7, new BlockletInfos(6, 2, 2), (short) 1);
+ new TableBlockInfo("filePath", 6, "5", null, 7, new BlockletInfos(6, 2, 2), ColumnarFormatVersion.V1);
int res2 = tableBlockInfos.compareTo(tableBlockInfoTest);
int expectedResult2 = -1;
assertEquals(res2, expectedResult2);
@@ -149,13 +150,13 @@ public class TableBlockInfoTest {
@Test public void compareToTestWithStartBlockletNo() {
TableBlockInfo tableBlockInfo =
- new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2), (short) 1);
+ new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
int res = tableBlockInfos.compareTo(tableBlockInfo);
int expectedresult =-1;
assertEquals(res, expectedresult);
TableBlockInfo tableBlockInfo1 =
- new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 1, 2), (short) 1);
+ new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 1, 2), ColumnarFormatVersion.V1);
int res1 = tableBlockInfos.compareTo(tableBlockInfo1);
int expectedresult1 = 1;
assertEquals(res1, expectedresult1);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfoTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfoTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfoTest.java
index 83b62a5..e9d09b8 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfoTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfoTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.carbondata.core.carbon.datastore.block;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -34,10 +35,10 @@ public class TableTaskInfoTest {
tableBlockInfoList = new ArrayList<>(5);
String[] locations = { "loc1", "loc2", "loc3" };
- tableBlockInfoList.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6, (short) 1));
+ tableBlockInfoList.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6, ColumnarFormatVersion.V1));
String[] locs = { "loc4", "loc5" };
- tableBlockInfoList.add(1, new TableBlockInfo("filepath", 2, "segmentId", locs, 6, (short) 1));
+ tableBlockInfoList.add(1, new TableBlockInfo("filepath", 2, "segmentId", locs, 6, ColumnarFormatVersion.V1));
tableTaskInfo = new TableTaskInfo("taskId", tableBlockInfoList);
}
@@ -68,10 +69,10 @@ public class TableTaskInfoTest {
List<TableBlockInfo> tableBlockInfoListTest = new ArrayList<>();
String[] locations = { "loc1", "loc2", "loc3" };
- tableBlockInfoListTest.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6, (short) 1));
+ tableBlockInfoListTest.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6, ColumnarFormatVersion.V1));
String[] locations1 = { "loc1", "loc2", "loc3" };
- tableBlockInfoListTest.add(1, new TableBlockInfo("filePath", 2, "segmentID", locations1, 6, (short) 1));
+ tableBlockInfoListTest.add(1, new TableBlockInfo("filePath", 2, "segmentID", locations1, 6, ColumnarFormatVersion.V1));
List<String> res = TableTaskInfo.maxNoNodes(tableBlockInfoListTest);
assert (res.equals(locs));
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
index d959a5c..be270e4 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
@@ -166,7 +166,7 @@ public class CarbonMetadataUtilTest {
segmentInfo.setNum_cols(0);
segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(columnCardinality));
IndexHeader indexHeader = new IndexHeader();
- indexHeader.setVersion(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
+ indexHeader.setVersion(2);
indexHeader.setSegment_info(segmentInfo);
indexHeader.setTable_columns(columnSchemaList);
IndexHeader indexheaderResult = getIndexHeader(columnCardinality, columnSchemaList);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
index c0d890c..869c8cc 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
@@ -21,6 +21,7 @@ package org.apache.carbondata.core.util;
import mockit.Mock;
import mockit.MockUp;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.carbon.datastore.chunk.DimensionChunkAttributes;
import org.apache.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
@@ -555,18 +556,18 @@ public class CarbonUtilTest {
@SuppressWarnings("unused") @Mock
public DataFileFooter readDataFileFooter(TableBlockInfo info) {
DataFileFooter fileFooter = new DataFileFooter();
- fileFooter.setVersionId((short)1);
+ fileFooter.setVersionId(ColumnarFormatVersion.V1);
return fileFooter;
}
};
- TableBlockInfo info = new TableBlockInfo("file:/", 1, "0", new String[0], 1, (short)1);
+ TableBlockInfo info = new TableBlockInfo("file:/", 1, "0", new String[0], 1, ColumnarFormatVersion.V1);
- assertEquals(CarbonUtil.readMetadatFile(info).getVersionId(), 1);
+ assertEquals(CarbonUtil.readMetadatFile(info).getVersionId().number(), 1);
}
@Test(expected = CarbonUtilException.class) public void testToReadMetadatFileWithException()
throws Exception {
- TableBlockInfo info = new TableBlockInfo("file:/", 1, "0", new String[0], 1, (short)1);
+ TableBlockInfo info = new TableBlockInfo("file:/", 1, "0", new String[0], 1, ColumnarFormatVersion.V1);
CarbonUtil.readMetadatFile(info);
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
index 62d1ac7..b7c48d7 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
@@ -22,6 +22,7 @@ package org.apache.carbondata.core.util;
import mockit.Mock;
import mockit.MockUp;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
import org.apache.carbondata.core.carbon.datastore.block.BlockInfo;
import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
@@ -152,7 +153,7 @@ public class DataFileFooterConverterTest {
}
};
String[] arr = { "a", "b", "c" };
- TableBlockInfo tableBlockInfo = new TableBlockInfo("file", 3, "id", arr, 3, (short) 1);
+ TableBlockInfo tableBlockInfo = new TableBlockInfo("file", 3, "id", arr, 3, ColumnarFormatVersion.V1);
tableBlockInfo.getBlockletInfos().setNoOfBlockLets(3);
List<TableBlockInfo> tableBlockInfoList = new ArrayList<>();
tableBlockInfoList.add(tableBlockInfo);
@@ -254,7 +255,7 @@ public class DataFileFooterConverterTest {
segmentInfo.setNumberOfColumns(segmentInfo1.getNum_cols());
dataFileFooter.setNumberOfRows(3);
dataFileFooter.setSegmentInfo(segmentInfo);
- TableBlockInfo info = new TableBlockInfo("file", 1, "0", new String[0], 1, (short)1);
+ TableBlockInfo info = new TableBlockInfo("file", 1, "0", new String[0], 1, ColumnarFormatVersion.V1);
DataFileFooter result = dataFileFooterConverter.readDataFileFooter(info);
assertEquals(result.getNumberOfRows(), 3);
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
index 9102c78..75fdd1c 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
@@ -26,7 +26,7 @@ object CarbonExample {
def main(args: Array[String]): Unit = {
// to run the example, plz change this path to your local machine path
- val rootPath = "/Users/wangfei/code/incubator-carbondata"
+ val rootPath = "/Users/jackylk/code/incubator-carbondata"
val spark = SparkSession
.builder()
.master("local")
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 8b453c7..e707c4e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
import org.apache.carbondata.core.carbon.datastore.DataRefNode;
import org.apache.carbondata.core.carbon.datastore.DataRefNodeFinder;
import org.apache.carbondata.core.carbon.datastore.IndexKey;
@@ -254,8 +255,9 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
if (segmentId.equals(CarbonCommonConstants.INVALID_SEGMENT_ID)) {
continue;
}
- carbonSplits.add(CarbonInputSplit
- .from(segmentId, fileSplit, CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION));
+ carbonSplits.add(CarbonInputSplit.from(segmentId, fileSplit,
+ ColumnarFormatVersion.valueOf(
+ CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION)));
}
return carbonSplits;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index efc4f77..8b87cad 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -25,11 +25,12 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos;
import org.apache.carbondata.core.carbon.datastore.block.Distributable;
import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.carbon.path.CarbonTablePath;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.hadoop.internal.index.Block;
import org.apache.hadoop.fs.Path;
@@ -55,17 +56,18 @@ public class CarbonInputSplit extends FileSplit
*/
private int numberOfBlocklets;
- private short version = CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION;
+ private ColumnarFormatVersion version;
public CarbonInputSplit() {
segmentId = null;
taskId = "0";
numberOfBlocklets = 0;
invalidSegments = new ArrayList<>();
+ version = CarbonProperties.getInstance().getFormatVersion();
}
private CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
- short version) {
+ ColumnarFormatVersion version) {
super(path, start, length, locations);
this.segmentId = segmentId;
this.taskId = CarbonTablePath.DataFileUtil.getTaskNo(path.getName());
@@ -74,12 +76,13 @@ public class CarbonInputSplit extends FileSplit
}
public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
- int numberOfBlocklets, short version) {
+ int numberOfBlocklets, ColumnarFormatVersion version) {
this(segmentId, path, start, length, locations, version);
this.numberOfBlocklets = numberOfBlocklets;
}
- public static CarbonInputSplit from(String segmentId, FileSplit split, short version)
+ public static CarbonInputSplit from(String segmentId, FileSplit split,
+ ColumnarFormatVersion version)
throws IOException {
return new CarbonInputSplit(segmentId, split.getPath(), split.getStart(), split.getLength(),
split.getLocations(), version);
@@ -120,7 +123,7 @@ public class CarbonInputSplit extends FileSplit
@Override public void readFields(DataInput in) throws IOException {
super.readFields(in);
this.segmentId = in.readUTF();
- this.version = in.readShort();
+ this.version = ColumnarFormatVersion.valueOf(in.readShort());
int numInvalidSegment = in.readInt();
invalidSegments = new ArrayList<>(numInvalidSegment);
for (int i = 0; i < numInvalidSegment; i++) {
@@ -131,7 +134,7 @@ public class CarbonInputSplit extends FileSplit
@Override public void write(DataOutput out) throws IOException {
super.write(out);
out.writeUTF(segmentId);
- out.writeShort(version);
+ out.writeShort(version.number());
out.writeInt(invalidSegments.size());
for (String invalidSegment : invalidSegments) {
out.writeUTF(invalidSegment);
@@ -155,11 +158,11 @@ public class CarbonInputSplit extends FileSplit
return numberOfBlocklets;
}
- public short getVersion() {
+ public ColumnarFormatVersion getVersion() {
return version;
}
- public void setVersion(short version) {
+ public void setVersion(ColumnarFormatVersion version) {
this.version = version;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 1801408..e5eb78a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.util.SparkUtil
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
+import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier, ColumnarFormatVersion}
import org.apache.carbondata.core.carbon.datastore.block.{Distributable, TableBlockInfo}
import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -801,14 +801,13 @@ object CarbonDataRDDFactory {
val jobContext = new Job(hadoopConfiguration)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
- val blockList = rawSplits.map(inputSplit => {
+ val blockList = rawSplits.map { inputSplit =>
val fileSplit = inputSplit.asInstanceOf[FileSplit]
new TableBlockInfo(fileSplit.getPath.toString,
fileSplit.getStart, "1",
- fileSplit.getLocations, fileSplit.getLength, 0
+ fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1
).asInstanceOf[Distributable]
}
- )
// group blocks to nodes, tasks
val startTime = System.currentTimeMillis
val activeNodes = DistributionUtil
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithOldCarbonDataFile.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithOldCarbonDataFile.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithOldCarbonDataFile.scala
index 431180c..749a6e8 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithOldCarbonDataFile.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithOldCarbonDataFile.scala
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
*/
class TestQueryWithOldCarbonDataFile extends QueryTest with BeforeAndAfterAll {
override def beforeAll {
- CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "1");
+ CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "V1");
sql("drop table if exists OldFormatTable")
sql("drop table if exists OldFormatTableHIVE")
sql("""
@@ -47,14 +47,14 @@ class TestQueryWithOldCarbonDataFile extends QueryTest with BeforeAndAfterAll {
name String, phonetype String, serialname String, salary Int)
row format delimited fields terminated by ','
""")
- sql("LOAD DATA local inpath './src/test/resources/OLDFORMATTABLE.csv' INTO table OldFormatTable");
+ sql("LOAD DATA local inpath './src/test/resources/OLDFORMATTABLE.csv' INTO table OldFormatTable")
sql(s"""
LOAD DATA LOCAL INPATH './src/test/resources/OLDFORMATTABLEHIVE.csv' into table OldFormatTableHIVE
""")
}
- CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "2");
+ CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "V2")
test("Test select * query") {
checkAnswer(
sql("select * from OldFormatTable"), sql("select * from OldFormatTableHIVE")
@@ -62,7 +62,7 @@ class TestQueryWithOldCarbonDataFile extends QueryTest with BeforeAndAfterAll {
}
override def afterAll {
- CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "1");
+ CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "V1")
sql("drop table if exists OldFormatTable")
sql("drop table if exists OldFormatTableHIVE")
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index cfae186..21c0fa7 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.util.SparkUtil
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
+import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier, ColumnarFormatVersion}
import org.apache.carbondata.core.carbon.datastore.block.{Distributable, TableBlockInfo}
import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
@@ -810,14 +810,13 @@ object CarbonDataRDDFactory {
}
val jobContext = new Job(hadoopConfiguration)
val rawSplits = inputFormat.getSplits(jobContext).toArray
- val blockList = rawSplits.map(inputSplit => {
+ val blockList = rawSplits.map { inputSplit =>
val fileSplit = inputSplit.asInstanceOf[FileSplit]
new TableBlockInfo(fileSplit.getPath.toString,
fileSplit.getStart, "1",
- fileSplit.getLocations, fileSplit.getLength
+ fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1
).asInstanceOf[Distributable]
}
- )
// group blocks to nodes, tasks
val startTime = System.currentTimeMillis
val activeNodes = DistributionUtil
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
index 2fbb00e..047ac0d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
@@ -16,12 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.carbondata.processing.store;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
-import org.apache.carbondata.processing.store.writer.CarbonFactDataWriterImpl2;
-import org.apache.carbondata.processing.store.writer.CarbonFactDataWriterImplForIntIndexAndAggBlock;
+import org.apache.carbondata.processing.store.writer.v1.CarbonFactDataWriterImplV1;
+import org.apache.carbondata.processing.store.writer.v2.CarbonFactDataWriterImplV2;
/**
* Factory class to get the writer instance
@@ -57,13 +59,15 @@ public class CarbonDataWriterFactory {
* @param carbonDataWriterVo writer vo object
* @return writer instance
*/
- public CarbonFactDataWriter<?> getFactDataWriter(final short version,
+ public CarbonFactDataWriter<?> getFactDataWriter(final ColumnarFormatVersion version,
final CarbonDataWriterVo carbonDataWriterVo) {
switch (version) {
- case 2:
- return new CarbonFactDataWriterImpl2(carbonDataWriterVo);
+ case V2:
+ return new CarbonFactDataWriterImplV2(carbonDataWriterVo);
+ case V1:
+ return new CarbonFactDataWriterImplV1(carbonDataWriterVo);
default:
- return new CarbonFactDataWriterImplForIntIndexAndAggBlock(carbonDataWriterVo);
+ throw new IllegalArgumentException("invalid format version: " + version);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index e560784..c961700 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
@@ -1400,8 +1401,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
* @return data writer instance
*/
private CarbonFactDataWriter<?> getFactDataWriter(int[] keyBlockSize) {
- short version = Short.parseShort(
- CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION));
+ ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
return CarbonDataWriterFactory.getInstance()
.getFactDataWriter(version, getDataWriterVo(keyBlockSize));
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java
deleted file mode 100644
index d399280..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.processing.store.writer;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
-import org.apache.carbondata.core.util.CarbonMetadataUtil;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.writer.CarbonFooterWriter;
-import org.apache.carbondata.format.DataChunk2;
-import org.apache.carbondata.format.FileFooter;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
-
-/**
- * Below method will be used to write the data in version 2 format
- */
-public class CarbonFactDataWriterImpl2 extends CarbonFactDataWriterImplForIntIndexAndAggBlock {
-
- /**
- * logger
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(CarbonFactDataWriterImpl2.class.getName());
-
- /**
- * Constructor create instance of this class
- *
- * @param dataWriterVo
- */
- public CarbonFactDataWriterImpl2(CarbonDataWriterVo dataWriterVo) {
- super(dataWriterVo);
- }
-
- /**
- * Below method will be used to write the data to carbon data file
- *
- * @param holder
- * @throws CarbonDataWriterException any problem in writing operation
- */
- @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
- // size to calculate the size of the blocklet
- int size = 0;
- // get the blocklet info object
- BlockletInfoColumnar blockletInfo = getBlockletInfo(holder, 0);
-
- List<DataChunk2> datachunks = null;
- try {
- // get all the data chunks
- datachunks = CarbonMetadataUtil
- .getDatachunk2(blockletInfo, thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
- } catch (IOException e) {
- throw new CarbonDataWriterException("Problem while getting the data chunks", e);
- }
- // data chunk byte array
- byte[][] dataChunkByteArray = new byte[datachunks.size()][];
- for (int i = 0; i < dataChunkByteArray.length; i++) {
- dataChunkByteArray[i] = CarbonUtil.getByteArray(datachunks.get(i));
- // add the data chunk size
- size += dataChunkByteArray[i].length;
- }
- // add row id index length
- for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) {
- size += holder.getKeyBlockIndexLength()[i];
- }
- // add rle index length
- for (int i = 0; i < holder.getDataIndexMapLength().length; i++) {
- size += holder.getDataIndexMapLength()[i];
- }
- // add dimension column data page and measure column data page size
- long blockletDataSize =
- holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength() + size;
- // if size of the file already reached threshold size then create a new file and get the file
- // channel object
- updateBlockletFileChannel(blockletDataSize);
- // writer the version header in the file if current file size is zero
- // this is done so carbondata file can be read separately
- try {
- if (fileChannel.size() == 0) {
- short version = Short.parseShort(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION));
- byte[] header = (CarbonCommonConstants.CARBON_DATA_VERSION_HEADER + version).getBytes();
- ByteBuffer buffer = ByteBuffer.allocate(header.length);
- buffer.put(header);
- buffer.rewind();
- fileChannel.write(buffer);
- }
- } catch (IOException e) {
- throw new CarbonDataWriterException("Problem while getting the file channel size", e);
- }
- // write data to file and get its offset
- writeDataToFile(holder, dataChunkByteArray, fileChannel);
- // add blocklet info to list
- blockletInfoList.add(blockletInfo);
- LOGGER.info("A new blocklet is added, its data size is: " + blockletDataSize + " Byte");
- }
-
- /**
- * Below method will be used to write the data to file
- * Data Format
- * <DColumn1DataChunk><DColumnDataPage><DColumnRle>
- * <DColumn2DataChunk><DColumn2DataPage><DColumn2RowIds><DColumn2Rle>
- * <DColumn3DataChunk><DColumn3DataPage><column3RowIds>
- * <MColumn1DataChunk><MColumn1DataPage>
- * <MColumn2DataChunk><MColumn2DataPage>
- * <MColumn2DataChunk><MColumn2DataPage>
- *
- * @param nodeHolder
- * @param dataChunksBytes
- * @param channel
- * @throws CarbonDataWriterException
- */
- private void writeDataToFile(NodeHolder nodeHolder, byte[][] dataChunksBytes, FileChannel channel)
- throws CarbonDataWriterException {
- long offset = 0;
- try {
- offset = channel.size();
- } catch (IOException e) {
- throw new CarbonDataWriterException("Problem while getting the file channel size");
- }
- List<Long> currentDataChunksOffset = new ArrayList<>();
- List<Short> currentDataChunksLength = new ArrayList<>();
- dataChunksLength.add(currentDataChunksLength);
- dataChunksOffsets.add(currentDataChunksOffset);
- int bufferSize = 0;
- int rowIdIndex = 0;
- int rleIndex = 0;
- for (int i = 0; i < nodeHolder.getIsSortedKeyBlock().length; i++) {
- currentDataChunksOffset.add(offset);
- currentDataChunksLength.add((short) dataChunksBytes[i].length);
- bufferSize += dataChunksBytes[i].length + nodeHolder.getKeyLengths()[i] + (!nodeHolder
- .getIsSortedKeyBlock()[i] ? nodeHolder.getKeyBlockIndexLength()[rowIdIndex] : 0) + (
- dataWriterVo.getAggBlocks()[i] ?
- nodeHolder.getCompressedDataIndex()[rleIndex].length :
- 0);
- offset += dataChunksBytes[i].length;
- offset += nodeHolder.getKeyLengths()[i];
- if (!nodeHolder.getIsSortedKeyBlock()[i]) {
- offset += nodeHolder.getKeyBlockIndexLength()[rowIdIndex];
- rowIdIndex++;
- }
- if (dataWriterVo.getAggBlocks()[i]) {
- offset += nodeHolder.getDataIndexMapLength()[rleIndex];
- rleIndex++;
- }
- }
- ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
- rleIndex = 0;
- rowIdIndex = 0;
- for (int i = 0; i < nodeHolder.getIsSortedKeyBlock().length; i++) {
- buffer.put(dataChunksBytes[i]);
- buffer.put(nodeHolder.getKeyArray()[i]);
- if (!nodeHolder.getIsSortedKeyBlock()[i]) {
- buffer.putInt(nodeHolder.getCompressedIndex()[rowIdIndex].length);
- buffer.put(nodeHolder.getCompressedIndex()[rowIdIndex]);
- if (nodeHolder.getCompressedIndexMap()[rowIdIndex].length > 0) {
- buffer.put(nodeHolder.getCompressedIndexMap()[rowIdIndex]);
- }
- rowIdIndex++;
- }
- if (dataWriterVo.getAggBlocks()[i]) {
- buffer.put(nodeHolder.getCompressedDataIndex()[rleIndex]);
- rleIndex++;
- }
- }
- try {
- buffer.flip();
- channel.write(buffer);
- } catch (IOException e) {
- throw new CarbonDataWriterException(
- "Problem while writing the dimension data in carbon data file", e);
- }
-
- int dataChunkIndex = nodeHolder.getKeyArray().length;
- int totalLength = 0;
- for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
- currentDataChunksOffset.add(offset);
- currentDataChunksLength.add((short) dataChunksBytes[dataChunkIndex].length);
- offset += dataChunksBytes[dataChunkIndex].length;
- offset += nodeHolder.getDataArray()[i].length;
- totalLength += dataChunksBytes[dataChunkIndex].length;
- totalLength += nodeHolder.getDataArray()[i].length;
- dataChunkIndex++;
- }
- buffer = ByteBuffer.allocate(totalLength);
- dataChunkIndex = nodeHolder.getKeyArray().length;
- for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
- buffer.put(dataChunksBytes[dataChunkIndex++]);
- buffer.put(nodeHolder.getDataArray()[i]);
- }
- try {
- buffer.flip();
- channel.write(buffer);
- } catch (IOException e) {
- throw new CarbonDataWriterException(
- "Problem while writing the measure data in carbon data file", e);
- }
- }
-
- /**
- * This method will be used to get the blocklet metadata
- *
- * @return BlockletInfo - blocklet metadata
- */
- protected BlockletInfoColumnar getBlockletInfo(NodeHolder nodeHolder, long offset) {
- // create the info object for leaf entry
- BlockletInfoColumnar info = new BlockletInfoColumnar();
- //add aggBlocks array
- info.setAggKeyBlock(nodeHolder.getAggBlocks());
- // add total entry count
- info.setNumberOfKeys(nodeHolder.getEntryCount());
-
- // add the key array length
- info.setKeyLengths(nodeHolder.getKeyLengths());
- // adding null measure index bit set
- info.setMeasureNullValueIndex(nodeHolder.getMeasureNullValueIndex());
- //add column min max length
- info.setColumnMaxData(nodeHolder.getColumnMaxData());
- info.setColumnMinData(nodeHolder.getColumnMinData());
-
- // add measure length
- info.setMeasureLength(nodeHolder.getMeasureLenght());
-
- info.setIsSortedKeyColumn(nodeHolder.getIsSortedKeyBlock());
- info.setKeyBlockIndexLength(nodeHolder.getKeyBlockIndexLength());
- info.setDataIndexMapLength(nodeHolder.getDataIndexMapLength());
- // set startkey
- info.setStartKey(nodeHolder.getStartKey());
- // set end key
- info.setEndKey(nodeHolder.getEndKey());
- info.setCompressionModel(nodeHolder.getCompressionModel());
- // return leaf metadata
-
- //colGroup Blocks
- info.setColGrpBlocks(nodeHolder.getColGrpBlocks());
-
- return info;
- }
-
- /**
- * This method will write metadata at the end of file file format in thrift format
- */
- protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel,
- String filePath) throws CarbonDataWriterException {
- try {
- // get the current file position
- long currentPosition = channel.size();
- CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
- // get thrift file footer instance
- FileFooter convertFileMeta = CarbonMetadataUtil
- .convertFilterFooter2(infoList, localCardinality, thriftColumnSchemaList,
- dataChunksOffsets, dataChunksLength);
- // fill the carbon index details
- fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
- // write the footer
- writer.writeFooter(convertFileMeta, currentPosition);
- } catch (IOException e) {
- throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
- }
- }
-}