Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/02/26 14:54:31 UTC
[3/4] incubator-carbondata git commit: Added V3 Format Writer and Reader Code
Added V3 Format Writer and Reader Code
Added code to support V3 Writer + Reader
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/2cf1104d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/2cf1104d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/2cf1104d
Branch: refs/heads/master
Commit: 2cf1104db43a5591fe2bcabb97ba02202428132a
Parents: 922683e
Author: kumarvishal <ku...@gmail.com>
Authored: Thu Feb 23 16:44:41 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Sun Feb 26 22:42:32 2017 +0800
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 38 +-
.../constants/CarbonV3DataFormatConstants.java | 84 ++++
.../datastore/chunk/AbstractRawColumnChunk.java | 11 +
.../chunk/reader/CarbonDataReaderFactory.java | 14 +-
.../AbstractChunkReaderV2V3Format.java | 126 +++++
...mpressedDimensionChunkFileBasedReaderV2.java | 127 ++---
...mpressedDimensionChunkFileBasedReaderV3.java | 268 ++++++++++
.../AbstractMeasureChunkReaderV2V3Format.java | 124 +++++
...CompressedMeasureChunkFileBasedReaderV2.java | 106 +---
...CompressedMeasureChunkFileBasedReaderV3.java | 239 +++++++++
.../SafeFixedLengthDimensionDataChunkStore.java | 9 +-
.../columnar/BlockIndexerStorageForShort.java | 228 +++++++++
.../columnar/ColumnWithShortIndex.java | 76 +++
.../ColumnWithShortIndexForNoDictionay.java | 46 ++
.../core/metadata/ColumnarFormatVersion.java | 9 +-
.../executor/impl/AbstractQueryExecutor.java | 10 +-
.../IncludeColGroupFilterExecuterImpl.java | 24 +
...velRangeLessThanEqualFilterExecuterImpl.java | 14 +-
.../RowLevelRangeLessThanFiterExecuterImpl.java | 14 +-
.../carbondata/core/util/BitSetGroup.java | 6 +-
.../core/util/CarbonMetadataUtil.java | 347 ++++++++++++-
.../carbondata/core/util/CarbonProperties.java | 110 +++-
.../apache/carbondata/core/util/CarbonUtil.java | 110 ++++
.../util/DataFileFooterConverterFactory.java | 5 +-
.../core/util/DataFileFooterConverterV3.java | 141 ++++++
.../apache/carbondata/core/util/NodeHolder.java | 430 ++++++++++++++++
.../core/util/CarbonMetadataUtilTest.java | 2 +-
format/src/main/thrift/carbondata.thrift | 27 +-
.../store/CarbonDataWriterFactory.java | 7 +-
.../store/CarbonFactDataHandlerColumnar.java | 56 ++-
.../store/writer/AbstractFactDataWriter.java | 54 +-
.../store/writer/CarbonFactDataWriter.java | 1 +
.../processing/store/writer/NodeHolder.java | 410 ---------------
.../writer/v1/CarbonFactDataWriterImplV1.java | 9 +-
.../writer/v2/CarbonFactDataWriterImplV2.java | 8 +-
.../writer/v3/CarbonFactDataWriterImplV3.java | 499 +++++++++++++++++++
.../store/writer/v3/DataWriterHolder.java | 68 +++
37 files changed, 3114 insertions(+), 743 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 1142c4e..146b78e 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -83,7 +83,7 @@ public final class CarbonCommonConstants {
/**
* min blocklet size
*/
- public static final int BLOCKLET_SIZE_MIN_VAL = 50;
+ public static final int BLOCKLET_SIZE_MIN_VAL = 2000;
/**
* max blocklet size
*/
@@ -791,7 +791,7 @@ public final class CarbonCommonConstants {
public static final String CARBON_MERGE_SORT_PREFETCH_DEFAULT = "true";
/**
- * default name of data base
+ * default name of data base
*/
public static final String DATABASE_DEFAULT_NAME = "default";
@@ -808,8 +808,7 @@ public final class CarbonCommonConstants {
/**
* this variable is to enable/disable identify high cardinality during first data loading
*/
- public static final String HIGH_CARDINALITY_IDENTIFY_ENABLE =
- "high.cardinality.identify.enable";
+ public static final String HIGH_CARDINALITY_IDENTIFY_ENABLE = "high.cardinality.identify.enable";
public static final String HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT = "true";
/**
@@ -843,26 +842,23 @@ public final class CarbonCommonConstants {
/**
* ZOOKEEPERLOCK TYPE
*/
- public static final String CARBON_LOCK_TYPE_ZOOKEEPER =
- "ZOOKEEPERLOCK";
+ public static final String CARBON_LOCK_TYPE_ZOOKEEPER = "ZOOKEEPERLOCK";
/**
* LOCALLOCK TYPE
*/
- public static final String CARBON_LOCK_TYPE_LOCAL =
- "LOCALLOCK";
+ public static final String CARBON_LOCK_TYPE_LOCAL = "LOCALLOCK";
/**
* HDFSLOCK TYPE
*/
- public static final String CARBON_LOCK_TYPE_HDFS =
- "HDFSLOCK";
+ public static final String CARBON_LOCK_TYPE_HDFS = "HDFSLOCK";
/**
* Invalid filter member log string
*/
- public static final String FILTER_INVALID_MEMBER = " Invalid Record(s) are present "
- + "while filter evaluation. ";
+ public static final String FILTER_INVALID_MEMBER =
+ " Invalid Record(s) are present while filter evaluation. ";
/**
* Number of unmerged segments to be merged.
@@ -880,25 +876,23 @@ public final class CarbonCommonConstants {
* Only accepted Range is 0 - 10000. Outside this range system will pick default value.
*/
public static final String UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION =
- "carbon.horizontal.update.compaction.threshold";
+ "carbon.horizontal.update.compaction.threshold";
/**
* Default count of segments which act as a threshold for IUD compaction merge.
*/
public static final String DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION = "1";
-
/**
* Number of Delete Delta files which is the Threshold for IUD compaction.
* Only accepted Range is 0 - 10000. Outside this range system will pick default value.
*/
- public static final String DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION =
+ public static final String DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION =
"carbon.horizontal.delete.compaction.threshold";
/**
* Default count of segments which act as a threshold for IUD compaction merge.
*/
public static final String DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION = "1";
-
/**
* default location of the carbon metastore db
*/
@@ -943,8 +937,7 @@ public final class CarbonCommonConstants {
* @Deprecated : This property has been deprecated.
* Property for enabling system level compaction lock.1 compaction can run at once.
*/
- public static String ENABLE_CONCURRENT_COMPACTION =
- "carbon.concurrent.compaction";
+ public static String ENABLE_CONCURRENT_COMPACTION = "carbon.concurrent.compaction";
/**
* Default value of Property for enabling system level compaction lock.1 compaction can run
@@ -1024,12 +1017,8 @@ public final class CarbonCommonConstants {
/**
* current data file version
*/
- public static final String CARBON_DATA_FILE_DEFAULT_VERSION = "V2";
- /**
- * number of column data will read in IO operation
- * during query execution
- */
- public static final short NUMBER_OF_COLUMN_READ_IN_IO = 10;
+ public static final String CARBON_DATA_FILE_DEFAULT_VERSION = "V3";
+
/**
* data file version header
*/
@@ -1105,7 +1094,6 @@ public final class CarbonCommonConstants {
/**
* Default carbon dictionary server port
-
*/
public static final String DICTIONARY_SERVER_PORT_DEFAULT = "2030";
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java
new file mode 100644
index 0000000..060b55c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.constants;
+
+/**
+ * Constants for V3 data format
+ */
+public interface CarbonV3DataFormatConstants {
+
+ /**
+ * number of page per blocklet column
+ */
+ String NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN = "carbon.number.of.page.in.blocklet.column";
+
+ /**
+ * number of page per blocklet column default value
+ */
+ String NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE = "10";
+
+ /**
+ * number of page per blocklet column max value
+ */
+ short NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_MAX = 20;
+
+ /**
+ * number of page per blocklet column min value
+ */
+ short NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_MIN = 1;
+
+ /**
+ * number of column to be read in one IO in query
+ */
+ String NUMBER_OF_COLUMN_TO_READ_IN_IO = "number.of.column.to.read.in.io";
+
+ /**
+ * number of column to be read in one IO in query default value
+ */
+ String NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE = "10";
+
+ /**
+ * number of column to be read in one IO in query max value
+ */
+ short NUMBER_OF_COLUMN_TO_READ_IN_IO_MAX = 20;
+
+ /**
+ * number of column to be read in one IO in query min value
+ */
+ short NUMBER_OF_COLUMN_TO_READ_IN_IO_MIN = 1;
+
+ /**
+ * number of rows per blocklet column page
+ */
+ String NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE = "number.of.rows.per.blocklet.column.page";
+
+ /**
+ * number of rows per blocklet column page default value
+ */
+ String NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT = "32000";
+
+ /**
+ * number of rows per blocklet column page max value
+ */
+ short NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_MAX = 32000;
+
+ /**
+ * number of rows per blocklet column page min value
+ */
+ short NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_MIN = 8000;
+
+}
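For a sense of how these knobs are meant to be used: a minimal sketch of overriding the V3 defaults before a data load, assuming the usual CarbonProperties.getInstance().addProperty(key, value) API; the key names come from the constants above, and the fallback-to-default behaviour for out-of-range values is an assumption based on the MIN/MAX constants.

    import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
    import org.apache.carbondata.core.util.CarbonProperties;

    public class V3TuningSketch {
      public static void main(String[] args) {
        CarbonProperties props = CarbonProperties.getInstance();
        // fewer pages per blocklet column than the default of 10
        props.addProperty(
            CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN, "5");
        // read 16 columns per IO instead of the default 10 (accepted range 1-20)
        props.addProperty(
            CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO, "16");
        // rows per page; values outside 8000-32000 should fall back to the default
        props.addProperty(
            CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE, "16000");
      }
    }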
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
index d04077c..eebb382 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.datastore.chunk;
import java.nio.ByteBuffer;
+import org.apache.carbondata.format.DataChunk3;
/**
* It contains group of uncompressed blocklets on one column.
@@ -44,6 +45,8 @@ public abstract class AbstractRawColumnChunk {
protected int length;
+ protected DataChunk3 dataChunkV3;
+
public AbstractRawColumnChunk(int blockletId, ByteBuffer rawData, int offSet, int length) {
this.blockletId = blockletId;
this.rawData = rawData;
@@ -121,4 +124,12 @@ public abstract class AbstractRawColumnChunk {
return length;
}
+ public DataChunk3 getDataChunkV3() {
+ return dataChunkV3;
+ }
+
+ public void setDataChunkV3(DataChunk3 dataChunkV3) {
+ this.dataChunkV3 = dataChunkV3;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java
index e20fcbe..8fee760 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java
@@ -18,8 +18,10 @@ package org.apache.carbondata.core.datastore.chunk.reader;
import org.apache.carbondata.core.datastore.chunk.reader.dimension.v1.CompressedDimensionChunkFileBasedReaderV1;
import org.apache.carbondata.core.datastore.chunk.reader.dimension.v2.CompressedDimensionChunkFileBasedReaderV2;
+import org.apache.carbondata.core.datastore.chunk.reader.dimension.v3.CompressedDimensionChunkFileBasedReaderV3;
import org.apache.carbondata.core.datastore.chunk.reader.measure.v1.CompressedMeasureChunkFileBasedReaderV1;
import org.apache.carbondata.core.datastore.chunk.reader.measure.v2.CompressedMeasureChunkFileBasedReaderV2;
+import org.apache.carbondata.core.datastore.chunk.reader.measure.v3.CompressedMeasureChunkFileBasedReaderV3;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
@@ -65,9 +67,13 @@ public class CarbonDataReaderFactory {
case V1:
return new CompressedDimensionChunkFileBasedReaderV1(blockletInfo, eachColumnValueSize,
filePath);
- default:
+ case V2:
return new CompressedDimensionChunkFileBasedReaderV2(blockletInfo, eachColumnValueSize,
filePath);
+ case V3:
+ default:
+ return new CompressedDimensionChunkFileBasedReaderV3(blockletInfo, eachColumnValueSize,
+ filePath);
}
}
@@ -84,8 +90,12 @@ public class CarbonDataReaderFactory {
switch (version) {
case V1:
return new CompressedMeasureChunkFileBasedReaderV1(blockletInfo, filePath);
- default:
+ case V2:
return new CompressedMeasureChunkFileBasedReaderV2(blockletInfo, filePath);
+ case V3:
+ default:
+ return new CompressedMeasureChunkFileBasedReaderV3(blockletInfo, filePath);
+
}
}
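A sketch of the call side of this dispatch, assuming the factory's existing getDimensionColumnChunkReader/getMeasureColumnChunkReader entry points and the DimensionColumnChunkReader/MeasureColumnChunkReader interfaces (only the switch bodies appear in this diff; the metadata arguments come from the already-loaded block footer):

    import org.apache.carbondata.core.datastore.chunk.reader.CarbonDataReaderFactory;
    import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
    import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
    import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
    import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;

    final class ReaderSelectionSketch {
      // V3 files resolve to the new V3 readers; V1/V2 keep their dedicated
      // readers, and the default case makes V3 the fallback for unknown versions
      static DimensionColumnChunkReader dimensionReaderFor(ColumnarFormatVersion version,
          BlockletInfo blockletInfo, int[] eachColumnValueSize, String filePath) {
        return CarbonDataReaderFactory.getInstance()
            .getDimensionColumnChunkReader(version, blockletInfo, eachColumnValueSize, filePath);
      }

      static MeasureColumnChunkReader measureReaderFor(ColumnarFormatVersion version,
          BlockletInfo blockletInfo, String filePath) {
        return CarbonDataReaderFactory.getInstance()
            .getMeasureColumnChunkReader(version, blockletInfo, filePath);
      }
    }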
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReaderV2V3Format.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReaderV2V3Format.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReaderV2V3Format.java
new file mode 100644
index 0000000..f083612
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReaderV2V3Format.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader.dimension;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.format.Encoding;
+
+/**
+ * Abstract class for V2, V3 format dimension column reader
+ */
+public abstract class AbstractChunkReaderV2V3Format extends AbstractChunkReader {
+
+ /**
+ * dimension chunks offset
+ */
+ protected List<Long> dimensionChunksOffset;
+
+ /**
+ * dimension chunks length
+ */
+ protected List<Integer> dimensionChunksLength;
+
+ public AbstractChunkReaderV2V3Format(final BlockletInfo blockletInfo,
+ final int[] eachColumnValueSize, final String filePath) {
+ super(eachColumnValueSize, filePath, blockletInfo.getNumberOfRows());
+ dimensionChunksOffset = blockletInfo.getDimensionChunkOffsets();
+ dimensionChunksLength = blockletInfo.getDimensionChunksLength();
+ }
+
+ /**
+ * Below method will be used to read the chunks based on block indexes.
+ * Reading logic: except for the last column, all the column chunks in a
+ * block index range are read together in one group and then processed;
+ * the last column is read and processed separately.
+ *
+ * @param fileReader file reader to read the blocks from file
+ * @param blockletIndexes blocks range to be read
+ * @return dimension column chunks
+ */
+ @Override public DimensionRawColumnChunk[] readRawDimensionChunks(final FileHolder fileReader,
+ final int[][] blockletIndexes) throws IOException {
+ // read the column chunk based on block index and add
+ DimensionRawColumnChunk[] dataChunks =
+ new DimensionRawColumnChunk[dimensionChunksOffset.size()];
+ // if blocklet index is empty then return empty data chunk
+ if (blockletIndexes.length == 0) {
+ return dataChunks;
+ }
+ DimensionRawColumnChunk[] groupChunk = null;
+ int index = 0;
+ // iterate till blockletIndexes.length - 1; block indexes are in sorted order, so
+ // this avoids reading the last column in a group
+ for (int i = 0; i < blockletIndexes.length - 1; i++) {
+ index = 0;
+ groupChunk =
+ readRawDimensionChunksInGroup(fileReader, blockletIndexes[i][0], blockletIndexes[i][1]);
+ for (int j = blockletIndexes[i][0]; j <= blockletIndexes[i][1]; j++) {
+ dataChunks[j] = groupChunk[index++];
+ }
+ }
+ // check last index is present in block index, if it is present then read separately
+ if (blockletIndexes[blockletIndexes.length - 1][0] == dimensionChunksOffset.size() - 1) {
+ dataChunks[blockletIndexes[blockletIndexes.length - 1][0]] =
+ readRawDimensionChunk(fileReader, blockletIndexes[blockletIndexes.length - 1][0]);
+ }
+ // otherwise read the data in group
+ else {
+ groupChunk =
+ readRawDimensionChunksInGroup(fileReader, blockletIndexes[blockletIndexes.length - 1][0],
+ blockletIndexes[blockletIndexes.length - 1][1]);
+ index = 0;
+ for (int j = blockletIndexes[blockletIndexes.length - 1][0];
+ j <= blockletIndexes[blockletIndexes.length - 1][1]; j++) {
+ dataChunks[j] = groupChunk[index++];
+ }
+ }
+ return dataChunks;
+ }
+
+ /**
+ * Below method will be used to read dimension chunk data in group.
+ * This method will be useful to avoid multiple IO while reading the
+ * data from the file
+ *
+ * @param fileReader file reader to read the data
+ * @param startColumnBlockletIndex first column blocklet index to be read
+ * @param endColumnBlockletIndex end column blocklet index to be read
+ * @return dimension raw chunk array
+ * @throws IOException
+ */
+ protected abstract DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileHolder fileReader,
+ int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException;
+
+ /**
+ * Below method will be used to check whether particular encoding is present
+ * in the dimension or not
+ *
+ * @param encoding encoding to search
+ * @return true if the encoding is present in the dimension
+ */
+ protected boolean hasEncoding(List<Encoding> encodings, Encoding encoding) {
+ return encodings.contains(encoding);
+ }
+
+}
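The grouped-versus-single decision above is easiest to see with concrete numbers. A self-contained sketch in plain Java (no CarbonData types) that walks blockletIndexes = {{0,2},{4,4}} over 5 column chunks the same way readRawDimensionChunks does:

    public class GroupReadSketch {
      public static void main(String[] args) {
        int totalColumnChunks = 5;                    // dimensionChunksOffset.size()
        int[][] blockletIndexes = { { 0, 2 }, { 4, 4 } };
        // every range except the last is always one grouped IO
        for (int i = 0; i < blockletIndexes.length - 1; i++) {
          System.out.println("grouped read: columns " + blockletIndexes[i][0]
              + ".." + blockletIndexes[i][1]);
        }
        int[] last = blockletIndexes[blockletIndexes.length - 1];
        if (last[0] == totalColumnChunks - 1) {
          // the last column chunk's length needs the blocklet end offset,
          // so it is read on its own
          System.out.println("single read: column " + last[0]);
        } else {
          System.out.println("grouped read: columns " + last[0] + ".." + last[1]);
        }
        // prints: grouped read: columns 0..2
        //         single read: column 4
      }
    }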
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
index 9d5849f..b2201cd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.core.datastore.chunk.reader.dimension.v2;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.List;
import org.apache.carbondata.core.datastore.FileHolder;
import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
@@ -26,7 +25,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.ColumnGroupDimensionDataC
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk;
-import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReader;
+import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReaderV2V3Format;
import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.util.CarbonUtil;
@@ -36,17 +35,7 @@ import org.apache.carbondata.format.Encoding;
/**
* Compressed dimension chunk reader class for version 2
*/
-public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkReader {
-
- /**
- * dimension chunks offset
- */
- private List<Long> dimensionChunksOffset;
-
- /**
- * dimension chunks length
- */
- private List<Integer> dimensionChunksLength;
+public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkReaderV2V3Format {
/**
* Constructor to get minimum parameter to create instance of this class
@@ -57,73 +46,18 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
*/
public CompressedDimensionChunkFileBasedReaderV2(final BlockletInfo blockletInfo,
final int[] eachColumnValueSize, final String filePath) {
- super(eachColumnValueSize, filePath, blockletInfo.getNumberOfRows());
- this.dimensionChunksOffset = blockletInfo.getDimensionChunkOffsets();
- this.dimensionChunksLength = blockletInfo.getDimensionChunksLength();
-
- }
-
- /**
- * Below method will be used to read the chunk based on block indexes
- * Reading logic of below method is:
- * Except last column all the column chunk can be read in group
- * if not last column then read data of all the column present in block index
- * together then process it.
- * For last column read is separately and process
- *
- * @param fileReader file reader to read the blocks from file
- * @param blockletIndexes blocks range to be read
- * @return dimension column chunks
- */
- @Override public DimensionRawColumnChunk[] readRawDimensionChunks(final FileHolder fileReader,
- final int[][] blockletIndexes) throws IOException {
- // read the column chunk based on block index and add
- DimensionRawColumnChunk[] dataChunks =
- new DimensionRawColumnChunk[dimensionChunksOffset.size()];
- // if blocklet index is empty then return empry data chunk
- if (blockletIndexes.length == 0) {
- return dataChunks;
- }
- DimensionRawColumnChunk[] groupChunk = null;
- int index = 0;
- // iterate till block indexes -1 as block index will be in sorted order, so to avoid
- // the last column reading in group
- for (int i = 0; i < blockletIndexes.length - 1; i++) {
- index = 0;
- groupChunk =
- readRawDimensionChunksInGroup(fileReader, blockletIndexes[i][0], blockletIndexes[i][1]);
- for (int j = blockletIndexes[i][0]; j <= blockletIndexes[i][1]; j++) {
- dataChunks[j] = groupChunk[index++];
- }
- }
- // check last index is present in block index, if it is present then read separately
- if (blockletIndexes[blockletIndexes.length - 1][0] == dimensionChunksOffset.size() - 1) {
- dataChunks[blockletIndexes[blockletIndexes.length - 1][0]] =
- readRawDimensionChunk(fileReader, blockletIndexes[blockletIndexes.length - 1][0]);
- }
- // otherwise read the data in group
- else {
- groupChunk =
- readRawDimensionChunksInGroup(fileReader, blockletIndexes[blockletIndexes.length - 1][0],
- blockletIndexes[blockletIndexes.length - 1][1]);
- index = 0;
- for (int j = blockletIndexes[blockletIndexes.length - 1][0];
- j <= blockletIndexes[blockletIndexes.length - 1][1]; j++) {
- dataChunks[j] = groupChunk[index++];
- }
- }
- return dataChunks;
+ super(blockletInfo, eachColumnValueSize, filePath);
}
/**
* Below method will be used to read the chunk based on block index
*
- * @param fileReader file reader to read the blocks from file
+ * @param fileReader file reader to read the blocks from file
* @param blockletIndex block to be read
* @return dimension column chunk
*/
- public DimensionRawColumnChunk readRawDimensionChunk(FileHolder fileReader,
- int blockletIndex) throws IOException {
+ public DimensionRawColumnChunk readRawDimensionChunk(FileHolder fileReader, int blockletIndex)
+ throws IOException {
int length = 0;
if (dimensionChunksOffset.size() - 1 == blockletIndex) {
// Incase of last block read only for datachunk and read remaining while converting it.
@@ -140,24 +74,35 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
new DimensionRawColumnChunk(blockletIndex, buffer, 0, length, this);
rawColumnChunk.setFileHolder(fileReader);
rawColumnChunk.setPagesCount(1);
- rawColumnChunk.setRowCount(new int[]{numberOfRows});
+ rawColumnChunk.setRowCount(new int[] { numberOfRows });
return rawColumnChunk;
}
- private DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileHolder fileReader,
- int startBlockIndex, int endBlockIndex) throws IOException {
- long currentDimensionOffset = dimensionChunksOffset.get(startBlockIndex);
+ /**
+ * Below method will be used to read dimension chunk data in group.
+ * This method will be useful to avoid multiple IO while reading the
+ * data from the file
+ *
+ * @param fileReader file reader to read the data
+ * @param startColumnBlockletIndex first column blocklet index to be read
+ * @param endColumnBlockletIndex end column blocklet index to be read
+ * @return dimension raw chunk array
+ * @throws IOException
+ */
+ protected DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileHolder fileReader,
+ int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException {
+ long currentDimensionOffset = dimensionChunksOffset.get(startColumnBlockletIndex);
ByteBuffer buffer = ByteBuffer.allocateDirect(
- (int) (dimensionChunksOffset.get(endBlockIndex + 1) - currentDimensionOffset));
+ (int) (dimensionChunksOffset.get(endColumnBlockletIndex + 1) - currentDimensionOffset));
synchronized (fileReader) {
fileReader.readByteBuffer(filePath, buffer, currentDimensionOffset,
- (int) (dimensionChunksOffset.get(endBlockIndex + 1) - currentDimensionOffset));
+ (int) (dimensionChunksOffset.get(endColumnBlockletIndex + 1) - currentDimensionOffset));
}
DimensionRawColumnChunk[] dataChunks =
- new DimensionRawColumnChunk[endBlockIndex - startBlockIndex + 1];
+ new DimensionRawColumnChunk[endColumnBlockletIndex - startColumnBlockletIndex + 1];
int index = 0;
int runningLength = 0;
- for (int i = startBlockIndex; i <= endBlockIndex; i++) {
+ for (int i = startColumnBlockletIndex; i <= endColumnBlockletIndex; i++) {
int currentLength = (int) (dimensionChunksOffset.get(i + 1) - dimensionChunksOffset.get(i));
dataChunks[index] =
new DimensionRawColumnChunk(i, buffer, runningLength, currentLength, this);
@@ -181,8 +126,8 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
int blockIndex = dimensionRawColumnChunk.getBlockletId();
ByteBuffer rawData = dimensionRawColumnChunk.getRawData();
if (dimensionChunksOffset.size() - 1 == blockIndex) {
- dimensionColumnChunk = CarbonUtil
- .readDataChunk(rawData, copySourcePoint, dimensionRawColumnChunk.getLength());
+ dimensionColumnChunk =
+ CarbonUtil.readDataChunk(rawData, copySourcePoint, dimensionRawColumnChunk.getLength());
int totalDimensionDataLength =
dimensionColumnChunk.data_page_length + dimensionColumnChunk.rle_page_length
+ dimensionColumnChunk.rowid_page_length;
@@ -202,8 +147,7 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
rawData.position(copySourcePoint);
rawData.get(data);
// first read the data and uncompressed it
- dataPage =
- COMPRESSOR.unCompressByte(data, 0, dimensionColumnChunk.data_page_length);
+ dataPage = COMPRESSOR.unCompressByte(data, 0, dimensionColumnChunk.data_page_length);
copySourcePoint += dimensionColumnChunk.data_page_length;
// if row id block is present then read the row id chunk and uncompress it
if (hasEncoding(dimensionColumnChunk.encoders, Encoding.INVERTED_INDEX)) {
@@ -223,8 +167,7 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
byte[] dataRle = new byte[dimensionColumnChunk.rle_page_length];
rawData.position(copySourcePoint);
rawData.get(dataRle);
- rlePage =
- numberComressor.unCompress(dataRle, 0, dimensionColumnChunk.rle_page_length);
+ rlePage = numberComressor.unCompress(dataRle, 0, dimensionColumnChunk.rle_page_length);
// uncompress the data with rle indexes
dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage, eachColumnValueSize[blockIndex]);
}
@@ -250,16 +193,4 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
}
return columnDataChunk;
}
-
- /**
- * Below method will be used to check whether particular encoding is present
- * in the dimension or not
- *
- * @param encoding encoding to search
- * @return if encoding is present in dimension
- */
- private boolean hasEncoding(List<Encoding> encodings, Encoding encoding) {
- return encodings.contains(encoding);
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
new file mode 100644
index 0000000..acaa2fa
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader.dimension.v3;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.ColumnGroupDimensionDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk;
+import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReaderV2V3Format;
+import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.DataChunk3;
+import org.apache.carbondata.format.Encoding;
+
+import org.apache.commons.lang.ArrayUtils;
+
+/**
+ * Dimension column V3 Reader class which will be used to read and uncompress
+ * V3 format data.
+ * Data format:
+ * <Column1 Data ChunkV3><Column1<Page1><Page2><Page3><Page4>>
+ * <Column2 Data ChunkV3><Column2<Page1><Page2><Page3><Page4>>
+ * <Column3 Data ChunkV3><Column3<Page1><Page2><Page3><Page4>>
+ * <Column4 Data ChunkV3><Column4<Page1><Page2><Page3><Page4>>
+ */
+public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkReaderV2V3Format {
+
+ /**
+ * end position of last dimension in carbon data file
+ */
+ private long lastDimensionOffsets;
+
+ public CompressedDimensionChunkFileBasedReaderV3(BlockletInfo blockletInfo,
+ int[] eachColumnValueSize, String filePath) {
+ super(blockletInfo, eachColumnValueSize, filePath);
+ lastDimensionOffsets = blockletInfo.getDimensionOffset();
+ }
+
+ /**
+ * Below method will be used to read the dimension column data from the carbon data file
+ * Steps for reading
+ * 1. Get the length of the data to be read
+ * 2. Allocate the direct buffer
+ * 3. read the data from file
+ * 4. Get the data chunk object from data read
+ * 5. Create the raw chunk object and fill the details
+ *
+ * @param fileReader reader for reading the column from carbon data file
+ * @param blockletColumnIndex blocklet index of the column in carbon data file
+ * @return dimension raw chunk
+ */
+ public DimensionRawColumnChunk readRawDimensionChunk(FileHolder fileReader,
+ int blockletColumnIndex) throws IOException {
+ // get the current dimension offset
+ long currentDimensionOffset = dimensionChunksOffset.get(blockletColumnIndex);
+ int length = 0;
+ // to calculate the length of the data to be read:
+ // for every column except the last, subtract the current column offset from the
+ // next column offset to get the total length.
+ // for the last column use lastDimensionOffsets, the end position of the last
+ // dimension, and subtract the current dimension offset from it
+ if (dimensionChunksOffset.size() - 1 == blockletColumnIndex) {
+ length = (int) (lastDimensionOffsets - currentDimensionOffset);
+ } else {
+ length = (int) (dimensionChunksOffset.get(blockletColumnIndex + 1) - currentDimensionOffset);
+ }
+ // allocate the buffer
+ ByteBuffer buffer = ByteBuffer.allocateDirect(length);
+ // read the data from carbon data file
+ synchronized (fileReader) {
+ fileReader.readByteBuffer(filePath, buffer, currentDimensionOffset, length);
+ }
+ // get the data chunk which will have all the details about the data pages
+ DataChunk3 dataChunk = CarbonUtil.readDataChunk3(buffer, 0, length);
+ // creating a raw chunks instance and filling all the details
+ DimensionRawColumnChunk rawColumnChunk =
+ new DimensionRawColumnChunk(blockletColumnIndex, buffer, 0, length, this);
+ int numberOfPages = dataChunk.getPage_length().size();
+ byte[][] maxValueOfEachPage = new byte[numberOfPages][];
+ byte[][] minValueOfEachPage = new byte[numberOfPages][];
+ int[] eachPageLength = new int[numberOfPages];
+ for (int i = 0; i < minValueOfEachPage.length; i++) {
+ maxValueOfEachPage[i] =
+ dataChunk.getData_chunk_list().get(i).getMin_max().getMax_values().get(0).array();
+ minValueOfEachPage[i] =
+ dataChunk.getData_chunk_list().get(i).getMin_max().getMin_values().get(0).array();
+ eachPageLength[i] = dataChunk.getData_chunk_list().get(i).getNumberOfRowsInpage();
+ }
+ rawColumnChunk.setDataChunkV3(dataChunk);
+ rawColumnChunk.setFileHolder(fileReader);
+ rawColumnChunk.setPagesCount(dataChunk.getPage_length().size());
+ rawColumnChunk.setMaxValues(maxValueOfEachPage);
+ rawColumnChunk.setMinValues(minValueOfEachPage);
+ rawColumnChunk.setRowCount(eachPageLength);
+ rawColumnChunk.setLengths(ArrayUtils
+ .toPrimitive(dataChunk.page_length.toArray(new Integer[dataChunk.page_length.size()])));
+ rawColumnChunk.setOffsets(ArrayUtils
+ .toPrimitive(dataChunk.page_offset.toArray(new Integer[dataChunk.page_offset.size()])));
+ return rawColumnChunk;
+ }
+
+ /**
+ * Below method will be used to read multiple dimension columns' data in one group
+ * and divide it into dimension raw chunk objects
+ * Steps for reading
+ * 1. Get the length of the data to be read
+ * 2. Allocate the direct buffer
+ * 3. read the data from file
+ * 4. Get the data chunk object from file for each column
+ * 5. Create the raw chunk object and fill the details for each column
+ * 6. increment the offset of the data
+ *
+ * @param fileReader
+ * reader which will be used to read the dimension columns data from file
+ * @param startBlockletColumnIndex
+ * blocklet index of the first dimension column
+ * @param endBlockletColumnIndex
+ * blocklet index of the last dimension column
+ * @return DimensionRawColumnChunk array
+ */
+ protected DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileHolder fileReader,
+ int startBlockletColumnIndex, int endBlockletColumnIndex) throws IOException {
+ // to calculate the length of the data to be read, subtract the offset of the
+ // start column from the offset of the (end column + 1) column to get the
+ // total length.
+ long currentDimensionOffset = dimensionChunksOffset.get(startBlockletColumnIndex);
+ ByteBuffer buffer = ByteBuffer.allocateDirect(
+ (int) (dimensionChunksOffset.get(endBlockletColumnIndex + 1) - currentDimensionOffset));
+ // read the data from carbon data file
+ synchronized (fileReader) {
+ fileReader.readByteBuffer(filePath, buffer, currentDimensionOffset,
+ (int) (dimensionChunksOffset.get(endBlockletColumnIndex + 1) - currentDimensionOffset));
+ }
+ // create raw chunk for each dimension column
+ DimensionRawColumnChunk[] dimensionDataChunks =
+ new DimensionRawColumnChunk[endBlockletColumnIndex - startBlockletColumnIndex + 1];
+ int index = 0;
+ int runningLength = 0;
+ for (int i = startBlockletColumnIndex; i <= endBlockletColumnIndex; i++) {
+ int currentLength = (int) (dimensionChunksOffset.get(i + 1) - dimensionChunksOffset.get(i));
+ dimensionDataChunks[index] =
+ new DimensionRawColumnChunk(i, buffer, runningLength, currentLength, this);
+ DataChunk3 dataChunk =
+ CarbonUtil.readDataChunk3(buffer, runningLength, dimensionChunksLength.get(i));
+ int numberOfPages = dataChunk.getPage_length().size();
+ byte[][] maxValueOfEachPage = new byte[numberOfPages][];
+ byte[][] minValueOfEachPage = new byte[numberOfPages][];
+ int[] eachPageLength = new int[numberOfPages];
+ for (int j = 0; j < minValueOfEachPage.length; j++) {
+ maxValueOfEachPage[j] =
+ dataChunk.getData_chunk_list().get(j).getMin_max().getMax_values().get(0).array();
+ minValueOfEachPage[j] =
+ dataChunk.getData_chunk_list().get(j).getMin_max().getMin_values().get(0).array();
+ eachPageLength[j] = dataChunk.getData_chunk_list().get(j).getNumberOfRowsInpage();
+ }
+ dimensionDataChunks[index].setDataChunkV3(dataChunk);
+ dimensionDataChunks[index].setFileHolder(fileReader);
+ dimensionDataChunks[index].setPagesCount(dataChunk.getPage_length().size());
+ dimensionDataChunks[index].setMaxValues(maxValueOfEachPage);
+ dimensionDataChunks[index].setMinValues(minValueOfEachPage);
+ dimensionDataChunks[index].setRowCount(eachPageLength);
+ dimensionDataChunks[index].setLengths(ArrayUtils
+ .toPrimitive(dataChunk.page_length.toArray(new Integer[dataChunk.page_length.size()])));
+ dimensionDataChunks[index].setOffsets(ArrayUtils
+ .toPrimitive(dataChunk.page_offset.toArray(new Integer[dataChunk.page_offset.size()])));
+ runningLength += currentLength;
+ index++;
+ }
+ return dimensionDataChunks;
+ }
+
+ /**
+ * Below method will be used to convert the compressed dimension chunk raw data to actual data
+ *
+ * @param dimensionRawColumnChunk dimension raw chunk
+ * @param pageNumber page number of the page to be decoded
+ * @return DimensionColumnDataChunk
+ */
+ @Override public DimensionColumnDataChunk convertToDimensionChunk(
+ DimensionRawColumnChunk dimensionRawColumnChunk, int pageNumber) throws IOException {
+ byte[] dataPage = null;
+ int[] invertedIndexes = null;
+ int[] invertedIndexesReverse = null;
+ int[] rlePage = null;
+ // data chunk of page
+ DataChunk2 dimensionColumnChunk = null;
+ // data chunk of blocklet column
+ DataChunk3 dataChunk3 = dimensionRawColumnChunk.getDataChunkV3();
+ // get the data buffer
+ ByteBuffer rawData = dimensionRawColumnChunk.getRawData();
+ dimensionColumnChunk = dataChunk3.getData_chunk_list().get(pageNumber);
+ // calculating the start point of data
+ // as the buffer can contain multiple columns' data, the start point will be
+ // data chunk offset + data chunk length + page offset
+ int copySourcePoint = dimensionRawColumnChunk.getOffSet() + dimensionChunksLength
+ .get(dimensionRawColumnChunk.getBlockletId()) + dataChunk3.getPage_offset().get(pageNumber);
+ byte[] data = new byte[dimensionColumnChunk.data_page_length];
+ rawData.position(copySourcePoint);
+ rawData.get(data);
+ // first read the data and uncompress it
+ dataPage = COMPRESSOR.unCompressByte(data, 0, dimensionColumnChunk.data_page_length);
+ copySourcePoint += dimensionColumnChunk.data_page_length;
+ // if row id block is present then read the row id chunk and uncompress it
+ if (hasEncoding(dimensionColumnChunk.encoders, Encoding.INVERTED_INDEX)) {
+ invertedIndexes = CarbonUtil
+ .getUnCompressColumnIndex(dimensionColumnChunk.rowid_page_length, rawData,
+ copySourcePoint);
+ copySourcePoint += dimensionColumnChunk.rowid_page_length;
+ // get the reverse index
+ invertedIndexesReverse = getInvertedReverseIndex(invertedIndexes);
+ }
+ // if rle is applied then read the rle block chunk and then uncompress
+ // the actual data based on the rle block
+ if (hasEncoding(dimensionColumnChunk.encoders, Encoding.RLE)) {
+ rlePage =
+ CarbonUtil.getIntArray(rawData, copySourcePoint, dimensionColumnChunk.rle_page_length);
+ // uncompress the data with rle indexes
+ dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage,
+ eachColumnValueSize[dimensionRawColumnChunk.getBlockletId()]);
+ rlePage = null;
+ }
+ // fill chunk attributes
+ DimensionColumnDataChunk columnDataChunk = null;
+
+ if (dimensionColumnChunk.isRowMajor()) {
+ // to store fixed length column chunk values
+ columnDataChunk = new ColumnGroupDimensionDataChunk(dataPage,
+ eachColumnValueSize[dimensionRawColumnChunk.getBlockletId()],
+ dimensionRawColumnChunk.getRowCount()[pageNumber]);
+ }
+ // if no dictionary column then first create a no dictionary column chunk
+ // and set to data chunk instance
+ else if (!hasEncoding(dimensionColumnChunk.encoders, Encoding.DICTIONARY)) {
+ columnDataChunk =
+ new VariableLengthDimensionDataChunk(dataPage, invertedIndexes, invertedIndexesReverse,
+ dimensionRawColumnChunk.getRowCount()[pageNumber]);
+ } else {
+ // to store fixed length column chunk values
+ columnDataChunk =
+ new FixedLengthDimensionDataChunk(dataPage, invertedIndexes, invertedIndexesReverse,
+ dimensionRawColumnChunk.getRowCount()[pageNumber],
+ eachColumnValueSize[dimensionRawColumnChunk.getBlockletId()]);
+ }
+ return columnDataChunk;
+ }
+}
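The length and page-position arithmetic used above (readRawDimensionChunk and convertToDimensionChunk) is plain offset subtraction. A self-contained sketch with illustrative offsets, not real file data:

    import java.util.Arrays;
    import java.util.List;

    public class OffsetArithmeticSketch {
      public static void main(String[] args) {
        // start offsets of three dimension column chunks in the carbon data file
        List<Long> dimensionChunksOffset = Arrays.asList(1000L, 5000L, 9000L);
        long lastDimensionOffsets = 12000L;  // end position of the last dimension
        for (int i = 0; i < dimensionChunksOffset.size(); i++) {
          long current = dimensionChunksOffset.get(i);
          // all but the last column: next offset minus current offset;
          // last column: end position minus current offset
          long length = (i == dimensionChunksOffset.size() - 1)
              ? lastDimensionOffsets - current
              : dimensionChunksOffset.get(i + 1) - current;
          System.out.println("column " + i + ": read " + length + " bytes at " + current);
        }
        // within the raw buffer a page starts at:
        // chunk offset in buffer + DataChunk3 header length + page offset
        int chunkOffsetInBuffer = 0;      // dimensionRawColumnChunk.getOffSet()
        int headerLength = 120;           // dimensionChunksLength.get(i), illustrative
        int pageOffset = 2048;            // dataChunk3.getPage_offset().get(pageNumber)
        System.out.println("page starts at buffer position "
            + (chunkOffsetInBuffer + headerLength + pageOffset));
      }
    }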
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
new file mode 100644
index 0000000..a94d08b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader.measure;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.blocklet.datachunk.PresenceMeta;
+
+/**
+ * Abstract class for V2, V3 format measure column reader
+ */
+public abstract class AbstractMeasureChunkReaderV2V3Format extends AbstractMeasureChunkReader {
+
+ /**
+ * measure column chunks offset
+ */
+ protected List<Long> measureColumnChunkOffsets;
+
+ /**
+ * measure column chunks length
+ */
+ protected List<Integer> measureColumnChunkLength;
+
+ public AbstractMeasureChunkReaderV2V3Format(final BlockletInfo blockletInfo,
+ final String filePath) {
+ super(filePath, blockletInfo.getNumberOfRows());
+ this.measureColumnChunkOffsets = blockletInfo.getMeasureChunkOffsets();
+ this.measureColumnChunkLength = blockletInfo.getMeasureChunksLength();
+ }
+
+ /**
+ * Below method will be used to read the chunks based on block indexes.
+ * Reading logic: except for the last column, all the column chunks in a
+ * block index range are read together in one group and then processed;
+ * the last column is read and processed separately.
+ *
+ * @param fileReader file reader to read the blocks from file
+ * @param blockIndexes blocks range to be read
+ * @return measure column chunks
+ * @throws IOException
+ */
+ public MeasureRawColumnChunk[] readRawMeasureChunks(FileHolder fileReader, int[][] blockIndexes)
+ throws IOException {
+ // read the column chunk based on block index and add
+ MeasureRawColumnChunk[] dataChunks =
+ new MeasureRawColumnChunk[measureColumnChunkOffsets.size()];
+ if (blockIndexes.length == 0) {
+ return dataChunks;
+ }
+ MeasureRawColumnChunk[] groupChunk = null;
+ int index = 0;
+ for (int i = 0; i < blockIndexes.length - 1; i++) {
+ index = 0;
+ groupChunk = readRawMeasureChunksInGroup(fileReader, blockIndexes[i][0], blockIndexes[i][1]);
+ for (int j = blockIndexes[i][0]; j <= blockIndexes[i][1]; j++) {
+ dataChunks[j] = groupChunk[index++];
+ }
+ }
+ if (blockIndexes[blockIndexes.length - 1][0] == measureColumnChunkOffsets.size() - 1) {
+ dataChunks[blockIndexes[blockIndexes.length - 1][0]] =
+ readRawMeasureChunk(fileReader, blockIndexes[blockIndexes.length - 1][0]);
+ } else {
+ groupChunk = readRawMeasureChunksInGroup(fileReader, blockIndexes[blockIndexes.length - 1][0],
+ blockIndexes[blockIndexes.length - 1][1]);
+ index = 0;
+ for (int j = blockIndexes[blockIndexes.length - 1][0];
+ j <= blockIndexes[blockIndexes.length - 1][1]; j++) {
+ dataChunks[j] = groupChunk[index++];
+ }
+ }
+ return dataChunks;
+ }
+
+ /**
+ * Below method will be used to convert the thrift presence meta to wrapper
+ * presence meta
+ *
+ * @param presentMetadataThrift
+ * @return wrapper presence meta
+ */
+ protected PresenceMeta getPresenceMeta(
+ org.apache.carbondata.format.PresenceMeta presentMetadataThrift) {
+ PresenceMeta presenceMeta = new PresenceMeta();
+ presenceMeta.setRepresentNullValues(presentMetadataThrift.isRepresents_presence());
+ presenceMeta.setBitSet(BitSet.valueOf(CompressorFactory.getInstance().getCompressor()
+ .unCompressByte(presentMetadataThrift.getPresent_bit_stream())));
+ return presenceMeta;
+ }
+
+ /**
+ * Below method will be used to read measure chunk data in group.
+ * This method will be useful to avoid multiple IO while reading the
+ * data from the file
+ *
+ * @param fileReader file reader to read the data
+ * @param startColumnBlockletIndex first column blocklet index to be read
+ * @param endColumnBlockletIndex end column blocklet index to be read
+ * @return measure raw chunk array
+ * @throws IOException
+ */
+ protected abstract MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileHolder fileReader,
+ int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException;
+}
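getPresenceMeta above wraps the decompressed present-bit-stream in a java.util.BitSet. A self-contained sketch of the BitSet half, skipping the round trip through CompressorFactory.getInstance().getCompressor(); the byte value is illustrative:

    import java.util.BitSet;

    public class PresenceBitsSketch {
      public static void main(String[] args) {
        // stand-in for unCompressByte(present_bit_stream): bits 0 and 3 set,
        // i.e. rows 0 and 3 carry the presence/null marker
        byte[] decompressed = { 0b0000_1001 };
        BitSet bitSet = BitSet.valueOf(decompressed);
        for (int row = bitSet.nextSetBit(0); row >= 0; row = bitSet.nextSetBit(row + 1)) {
          System.out.println("marked row: " + row);
        }
        // prints: marked row: 0
        //         marked row: 3
      }
    }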
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
index 7ac1578..7b6acee 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
@@ -19,37 +19,24 @@ package org.apache.carbondata.core.datastore.chunk.reader.measure.v2;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.BitSet;
import java.util.List;
import org.apache.carbondata.core.datastore.FileHolder;
import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
-import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReader;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format;
import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
import org.apache.carbondata.core.datastore.dataholder.CarbonReadDataHolder;
import org.apache.carbondata.core.metadata.ValueEncoderMeta;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
-import org.apache.carbondata.core.metadata.blocklet.datachunk.PresenceMeta;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.format.DataChunk2;
/**
* Class to read the measure column data for version 2
*/
-public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChunkReader {
-
- /**
- * measure column chunks offset
- */
- private List<Long> measureColumnChunkOffsets;
-
- /**
- * measure column chunks length
- */
- private List<Integer> measureColumnChunkLength;
+public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChunkReaderV2V3Format {
/**
* Constructor to get minimum parameter to create instance of this class
@@ -59,69 +46,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
*/
public CompressedMeasureChunkFileBasedReaderV2(final BlockletInfo blockletInfo,
final String filePath) {
- super(filePath, blockletInfo.getNumberOfRows());
- this.measureColumnChunkOffsets = blockletInfo.getMeasureChunkOffsets();
- this.measureColumnChunkLength = blockletInfo.getMeasureChunksLength();
- }
-
- /**
- * Below method will be used to convert the thrift presence meta to wrapper
- * presence meta
- *
- * @param presentMetadataThrift
- * @return wrapper presence meta
- */
- private static PresenceMeta getPresenceMeta(
- org.apache.carbondata.format.PresenceMeta presentMetadataThrift) {
- PresenceMeta presenceMeta = new PresenceMeta();
- presenceMeta.setRepresentNullValues(presentMetadataThrift.isRepresents_presence());
- presenceMeta.setBitSet(BitSet.valueOf(CompressorFactory.getInstance().getCompressor()
- .unCompressByte(presentMetadataThrift.getPresent_bit_stream())));
- return presenceMeta;
- }
-
- /**
- * Below method will be used to read the chunk based on block indexes
- * Reading logic of below method is: Except last column all the column chunk
- * can be read in group if not last column then read data of all the column
- * present in block index together then process it. For last column read is
- * separately and process
- *
- * @param fileReader file reader to read the blocks from file
- * @param blockIndexes blocks range to be read
- * @return measure column chunks
- * @throws IOException
- */
- public MeasureRawColumnChunk[] readRawMeasureChunks(FileHolder fileReader, int[][] blockIndexes)
- throws IOException {
- // read the column chunk based on block index and add
- MeasureRawColumnChunk[] dataChunks =
- new MeasureRawColumnChunk[measureColumnChunkOffsets.size()];
- if (blockIndexes.length == 0) {
- return dataChunks;
- }
- MeasureRawColumnChunk[] groupChunk = null;
- int index = 0;
- for (int i = 0; i < blockIndexes.length - 1; i++) {
- index = 0;
- groupChunk = readRawMeasureChunksInGroup(fileReader, blockIndexes[i][0], blockIndexes[i][1]);
- for (int j = blockIndexes[i][0]; j <= blockIndexes[i][1]; j++) {
- dataChunks[j] = groupChunk[index++];
- }
- }
- if (blockIndexes[blockIndexes.length - 1][0] == measureColumnChunkOffsets.size() - 1) {
- dataChunks[blockIndexes[blockIndexes.length - 1][0]] =
- readRawMeasureChunk(fileReader, blockIndexes[blockIndexes.length - 1][0]);
- } else {
- groupChunk = readRawMeasureChunksInGroup(fileReader, blockIndexes[blockIndexes.length - 1][0],
- blockIndexes[blockIndexes.length - 1][1]);
- index = 0;
- for (int j = blockIndexes[blockIndexes.length - 1][0];
- j <= blockIndexes[blockIndexes.length - 1][1]; j++) {
- dataChunks[j] = groupChunk[index++];
- }
- }
- return dataChunks;
+ super(blockletInfo, filePath);
}
@Override public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader, int blockIndex)
@@ -146,20 +71,31 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
return rawColumnChunk;
}
- private MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileHolder fileReader,
- int startBlockIndex, int endBlockIndex) throws IOException {
- long currentMeasureOffset = measureColumnChunkOffsets.get(startBlockIndex);
+ /**
+ * Below method will be used to read measure chunk data in group.
+ * This method will be useful to avoid multiple IO while reading the
+ * data from the file
+ *
+ * @param fileReader file reader to read the data
+ * @param startColumnBlockletIndex first column blocklet index to be read
+ * @param endColumnBlockletIndex end column blocklet index to be read
+ * @return measure raw chunk array
+ * @throws IOException
+ */
+ protected MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileHolder fileReader,
+ int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException {
+ long currentMeasureOffset = measureColumnChunkOffsets.get(startColumnBlockletIndex);
ByteBuffer buffer = ByteBuffer.allocateDirect(
- (int) (measureColumnChunkOffsets.get(endBlockIndex + 1) - currentMeasureOffset));
+ (int) (measureColumnChunkOffsets.get(endColumnBlockletIndex + 1) - currentMeasureOffset));
synchronized (fileReader) {
fileReader.readByteBuffer(filePath, buffer, currentMeasureOffset,
- (int) (measureColumnChunkOffsets.get(endBlockIndex + 1) - currentMeasureOffset));
+ (int) (measureColumnChunkOffsets.get(endColumnBlockletIndex + 1) - currentMeasureOffset));
}
MeasureRawColumnChunk[] dataChunks =
- new MeasureRawColumnChunk[endBlockIndex - startBlockIndex + 1];
+ new MeasureRawColumnChunk[endColumnBlockletIndex - startColumnBlockletIndex + 1];
int runningLength = 0;
int index = 0;
- for (int i = startBlockIndex; i <= endBlockIndex; i++) {
+ for (int i = startColumnBlockletIndex; i <= endColumnBlockletIndex; i++) {
int currentLength =
(int) (measureColumnChunkOffsets.get(i + 1) - measureColumnChunkOffsets.get(i));
MeasureRawColumnChunk measureRawColumnChunk =
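The grouped read above is one contiguous file read followed by per-column slicing, so N adjacent column chunks cost a single IO instead of N. Below is a minimal standalone sketch of the slicing step, with hypothetical names: offsets holds each column's start offset plus one trailing entry marking the end, independent of CarbonData's FileHolder API.

import java.nio.ByteBuffer;
import java.util.List;

final class GroupedReadSketch {
  // Carve one ByteBuffer, already filled by a single file read, into
  // per-column views using the recorded chunk offsets.
  static ByteBuffer[] slicePerColumn(ByteBuffer buffer, List<Long> offsets,
      int start, int end) {
    ByteBuffer[] chunks = new ByteBuffer[end - start + 1];
    int runningLength = 0;
    for (int i = start; i <= end; i++) {
      int length = (int) (offsets.get(i + 1) - offsets.get(i));
      ByteBuffer view = buffer.duplicate();  // independent position/limit
      view.position(runningLength);
      view.limit(runningLength + length);
      chunks[i - start] = view.slice();
      runningLength += length;
    }
    return chunks;
  }
}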
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
new file mode 100644
index 0000000..307af41
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader.measure.v3;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
+import org.apache.carbondata.core.datastore.dataholder.CarbonReadDataHolder;
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.DataChunk3;
+
+import org.apache.commons.lang.ArrayUtils;
+
+/**
+ * Measure column V3 reader class which will be used to read and uncompress
+ * V3 format data.
+ * Data format:
+ * <Column1 Data ChunkV3><Column1<Page1><Page2><Page3><Page4>>
+ * <Column2 Data ChunkV3><Column2<Page1><Page2><Page3><Page4>>
+ * <Column3 Data ChunkV3><Column3<Page1><Page2><Page3><Page4>>
+ * <Column4 Data ChunkV3><Column4<Page1><Page2><Page3><Page4>>
+ */
+public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChunkReaderV2V3Format {
+
+ /**
+ * end position of last measure in carbon data file
+ */
+ private long measureOffsets;
+
+ public CompressedMeasureChunkFileBasedReaderV3(BlockletInfo blockletInfo, String filePath) {
+ super(blockletInfo, filePath);
+ measureOffsets = blockletInfo.getMeasureOffsets();
+ }
+
+  /**
+   * Below method will be used to read the measure column data from the carbon data file
+   * 1. Get the length of the data to be read
+   * 2. Allocate the direct buffer
+   * 3. Read the data from file
+   * 4. Get the data chunk object from the data read
+   * 5. Create the raw chunk object and fill the details
+   *
+   * @param fileReader reader for reading the column from carbon data file
+   * @param blockIndex blocklet index of the column in carbon data file
+   * @return measure raw chunk
+   */
+ @Override public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader, int blockIndex)
+ throws IOException {
+ int dataLength = 0;
+    // to calculate the length of the data to be read:
+    // for any column other than the last, subtract the current column's offset
+    // from the next column's offset to get the total length.
+    // for the last column use measureOffsets, the end position of the last
+    // measure, and subtract the current column's offset from it
+ if (measureColumnChunkOffsets.size() - 1 == blockIndex) {
+ dataLength = (int) (measureOffsets - measureColumnChunkOffsets.get(blockIndex));
+ } else {
+ dataLength = (int) (measureColumnChunkOffsets.get(blockIndex + 1) - measureColumnChunkOffsets
+ .get(blockIndex));
+ }
+ // allocate the buffer
+ ByteBuffer buffer = ByteBuffer.allocateDirect(dataLength);
+ // read the data from carbon data file
+ synchronized (fileReader) {
+ fileReader
+ .readByteBuffer(filePath, buffer, measureColumnChunkOffsets.get(blockIndex), dataLength);
+ }
+ // get the data chunk which will have all the details about the data pages
+ DataChunk3 dataChunk =
+ CarbonUtil.readDataChunk3(buffer, 0, measureColumnChunkLength.get(blockIndex));
+ // creating a raw chunks instance and filling all the details
+ MeasureRawColumnChunk rawColumnChunk =
+ new MeasureRawColumnChunk(blockIndex, buffer, 0, dataLength, this);
+ int numberOfPages = dataChunk.getPage_length().size();
+ byte[][] maxValueOfEachPage = new byte[numberOfPages][];
+ byte[][] minValueOfEachPage = new byte[numberOfPages][];
+ int[] eachPageLength = new int[numberOfPages];
+ for (int i = 0; i < minValueOfEachPage.length; i++) {
+ maxValueOfEachPage[i] =
+ dataChunk.getData_chunk_list().get(i).getMin_max().getMax_values().get(0).array();
+ minValueOfEachPage[i] =
+ dataChunk.getData_chunk_list().get(i).getMin_max().getMin_values().get(0).array();
+ eachPageLength[i] = dataChunk.getData_chunk_list().get(i).getNumberOfRowsInpage();
+ }
+ rawColumnChunk.setDataChunkV3(dataChunk);
+ rawColumnChunk.setFileReader(fileReader);
+ rawColumnChunk.setPagesCount(dataChunk.getPage_length().size());
+ rawColumnChunk.setMaxValues(maxValueOfEachPage);
+ rawColumnChunk.setMinValues(minValueOfEachPage);
+ rawColumnChunk.setRowCount(eachPageLength);
+ rawColumnChunk.setLengths(ArrayUtils
+ .toPrimitive(dataChunk.page_length.toArray(new Integer[dataChunk.page_length.size()])));
+ rawColumnChunk.setOffsets(ArrayUtils
+ .toPrimitive(dataChunk.page_offset.toArray(new Integer[dataChunk.page_offset.size()])));
+ return rawColumnChunk;
+ }
+
+  /**
+   * Below method will be used to read multiple measure columns' data in a group
+   * and divide it into measure raw chunk objects
+   * Steps for reading
+   * 1. Get the length of the data to be read
+   * 2. Allocate the direct buffer
+   * 3. Read the data from file
+   * 4. Get the data chunk object from file for each column
+   * 5. Create the raw chunk object and fill the details for each column
+   * 6. Increment the offset of the data
+   *
+   * @param fileReader
+   *          reader which will be used to read the measure columns' data from file
+   * @param startColumnBlockletIndex
+   *          blocklet index of the first measure column to be read
+   * @param endColumnBlockletIndex
+   *          blocklet index of the last measure column to be read
+   * @return MeasureRawColumnChunk array
+   */
+ protected MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileHolder fileReader,
+ int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException {
+    // to calculate the length of the data to be read, subtract the
+    // start column's offset from the offset of (end column + 1)
+ long currentMeasureOffset = measureColumnChunkOffsets.get(startColumnBlockletIndex);
+ ByteBuffer buffer = ByteBuffer.allocateDirect(
+ (int) (measureColumnChunkOffsets.get(endColumnBlockletIndex + 1) - currentMeasureOffset));
+ // read the data from carbon data file
+ synchronized (fileReader) {
+ fileReader.readByteBuffer(filePath, buffer, currentMeasureOffset,
+ (int) (measureColumnChunkOffsets.get(endColumnBlockletIndex + 1) - currentMeasureOffset));
+ }
+ // create raw chunk for each measure column
+ MeasureRawColumnChunk[] measureDataChunk =
+ new MeasureRawColumnChunk[endColumnBlockletIndex - startColumnBlockletIndex + 1];
+ int runningLength = 0;
+ int index = 0;
+ for (int i = startColumnBlockletIndex; i <= endColumnBlockletIndex; i++) {
+ int currentLength =
+ (int) (measureColumnChunkOffsets.get(i + 1) - measureColumnChunkOffsets.get(i));
+ MeasureRawColumnChunk measureRawColumnChunk =
+ new MeasureRawColumnChunk(i, buffer, runningLength, currentLength, this);
+ DataChunk3 dataChunk =
+ CarbonUtil.readDataChunk3(buffer, runningLength, measureColumnChunkLength.get(i));
+
+ int numberOfPages = dataChunk.getPage_length().size();
+ byte[][] maxValueOfEachPage = new byte[numberOfPages][];
+ byte[][] minValueOfEachPage = new byte[numberOfPages][];
+ int[] eachPageLength = new int[numberOfPages];
+ for (int j = 0; j < minValueOfEachPage.length; j++) {
+ maxValueOfEachPage[j] =
+ dataChunk.getData_chunk_list().get(j).getMin_max().getMax_values().get(0).array();
+ minValueOfEachPage[j] =
+ dataChunk.getData_chunk_list().get(j).getMin_max().getMin_values().get(0).array();
+ eachPageLength[j] = dataChunk.getData_chunk_list().get(j).getNumberOfRowsInpage();
+ }
+      measureRawColumnChunk.setDataChunkV3(dataChunk);
+ measureRawColumnChunk.setFileReader(fileReader);
+ measureRawColumnChunk.setPagesCount(dataChunk.getPage_length().size());
+ measureRawColumnChunk.setMaxValues(maxValueOfEachPage);
+ measureRawColumnChunk.setMinValues(minValueOfEachPage);
+ measureRawColumnChunk.setRowCount(eachPageLength);
+ measureRawColumnChunk.setLengths(ArrayUtils
+ .toPrimitive(dataChunk.page_length.toArray(new Integer[dataChunk.page_length.size()])));
+ measureRawColumnChunk.setOffsets(ArrayUtils
+ .toPrimitive(dataChunk.page_offset.toArray(new Integer[dataChunk.page_offset.size()])));
+ measureDataChunk[index] = measureRawColumnChunk;
+ runningLength += currentLength;
+ index++;
+ }
+ return measureDataChunk;
+ }
+
+  /**
+   * Below method will be used to convert the compressed measure chunk raw data to actual data
+   *
+   * @param measureRawColumnChunk measure raw chunk
+   * @param pageNumber number of the page to be converted
+   * @return MeasureColumnDataChunk
+   */
+ @Override public MeasureColumnDataChunk convertToMeasureChunk(
+ MeasureRawColumnChunk measureRawColumnChunk, int pageNumber) throws IOException {
+ MeasureColumnDataChunk datChunk = new MeasureColumnDataChunk();
+ // data chunk of blocklet column
+ DataChunk3 dataChunk3 = measureRawColumnChunk.getDataChunkV3();
+ // data chunk of page
+ DataChunk2 measureColumnChunk = dataChunk3.getData_chunk_list().get(pageNumber);
+ // calculating the start point of data
+ // as buffer can contain multiple column data, start point will be datachunkoffset +
+ // data chunk length + page offset
+ int copyPoint = measureRawColumnChunk.getOffSet() + measureColumnChunkLength
+ .get(measureRawColumnChunk.getBlockletId()) + dataChunk3.getPage_offset().get(pageNumber);
+ List<ValueEncoderMeta> valueEncodeMeta = new ArrayList<>();
+ for (int i = 0; i < measureColumnChunk.getEncoder_meta().size(); i++) {
+ valueEncodeMeta.add(CarbonUtil
+ .deserializeEncoderMetaNew(measureColumnChunk.getEncoder_meta().get(i).array()));
+ }
+ WriterCompressModel compressionModel = CarbonUtil.getValueCompressionModel(valueEncodeMeta);
+ ValueCompressionHolder values = compressionModel.getValueCompressionHolder()[0];
+ // uncompress
+ byte[] data = new byte[measureColumnChunk.data_page_length];
+ ByteBuffer rawData = measureRawColumnChunk.getRawData();
+ rawData.position(copyPoint);
+ rawData.get(data);
+ values.uncompress(compressionModel.getConvertedDataType()[0], data, 0,
+ measureColumnChunk.data_page_length, compressionModel.getMantissa()[0],
+ compressionModel.getMaxValue()[0], measureRawColumnChunk.getRowCount()[pageNumber]);
+ CarbonReadDataHolder measureDataHolder = new CarbonReadDataHolder(values);
+ // set the data chunk
+ datChunk.setMeasureDataHolder(measureDataHolder);
+ // set the null value indexes
+ datChunk.setNullValueIndexHolder(getPresenceMeta(measureColumnChunk.presence));
+ return datChunk;
+ }
+}
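The page addressing in convertToMeasureChunk is plain offset arithmetic: a column's bytes in the shared buffer start at the raw chunk's offset, the serialized DataChunk3 header follows, and each page sits at its recorded offset after that header. A minimal sketch of that computation, with hypothetical parameter names:

import java.nio.ByteBuffer;
import java.util.List;

final class PageSliceSketch {
  // copyPoint = columnOffset + headerLength + pageOffsets.get(p),
  // mirroring the copyPoint computation in convertToMeasureChunk above.
  static byte[] readPage(ByteBuffer rawData, int columnOffset, int headerLength,
      List<Integer> pageOffsets, List<Integer> pageLengths, int p) {
    int copyPoint = columnOffset + headerLength + pageOffsets.get(p);
    byte[] page = new byte[pageLengths.get(p)];
    ByteBuffer view = rawData.duplicate();  // leave the shared buffer's position alone
    view.position(copyPoint);
    view.get(page);
    return page;
  }
}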
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
index 14e7938..23af707 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
@@ -18,6 +18,7 @@
package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
/**
* Below class will be used to store fixed length dimension data
@@ -66,13 +67,7 @@ public class SafeFixedLengthDimensionDataChunkStore extends SafeAbsractDimension
}
// below part is to convert the byte array to surrogate value
int startOffsetOfData = index * columnValueSize;
- int surrogate = 0;
- for (int i = 0; i < columnValueSize; i++) {
- surrogate <<= 8;
- surrogate ^= data[startOffsetOfData] & 0xFF;
- startOffsetOfData++;
- }
- return surrogate;
+ return CarbonUtil.getSurrogateInternal(data, startOffsetOfData, columnValueSize);
}
/**
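For reference, the inlined loop this change removes decodes a fixed-width big-endian byte sequence into an int surrogate key; CarbonUtil.getSurrogateInternal presumably centralizes the same logic. A standalone sketch of that decoding (the method name toSurrogate is illustrative):

final class SurrogateSketch {
  // Big-endian decode: each byte shifts the accumulated value up 8 bits.
  static int toSurrogate(byte[] data, int offset, int length) {
    int surrogate = 0;
    for (int i = 0; i < length; i++) {
      surrogate <<= 8;
      surrogate ^= data[offset + i] & 0xFF;
    }
    return surrogate;
  }
}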
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
new file mode 100644
index 0000000..346d8d8
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.columnar;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.ByteUtil;
+
+public class BlockIndexerStorageForShort implements IndexStorage<short[]> {
+
+ private boolean alreadySorted;
+
+ private short[] dataAfterComp;
+
+ private short[] indexMap;
+
+ private byte[][] keyBlock;
+
+ private short[] dataIndexMap;
+
+ private int totalSize;
+
+ public BlockIndexerStorageForShort(byte[][] keyBlock, boolean compressData,
+ boolean isNoDictionary, boolean isSortRequired) {
+ ColumnWithShortIndex[] columnWithIndexs = createColumnWithIndexArray(keyBlock, isNoDictionary);
+ if (isSortRequired) {
+ Arrays.sort(columnWithIndexs);
+ }
+ compressMyOwnWay(extractDataAndReturnIndexes(columnWithIndexs, keyBlock));
+ if (compressData) {
+ compressDataMyOwnWay(columnWithIndexs);
+ }
+ }
+
+  /**
+   * Creates an object pairing each column's byte array with its original row index
+   *
+   * @return array of columns with their respective indexes
+   */
+ private ColumnWithShortIndex[] createColumnWithIndexArray(byte[][] keyBlock,
+ boolean isNoDictionary) {
+ ColumnWithShortIndex[] columnWithIndexs;
+ if (isNoDictionary) {
+ columnWithIndexs = new ColumnWithShortIndex[keyBlock.length];
+ for (short i = 0; i < columnWithIndexs.length; i++) {
+ columnWithIndexs[i] = new ColumnWithShortIndexForNoDictionay(keyBlock[i], i);
+ }
+ } else {
+ columnWithIndexs = new ColumnWithShortIndex[keyBlock.length];
+ for (short i = 0; i < columnWithIndexs.length; i++) {
+ columnWithIndexs[i] = new ColumnWithShortIndex(keyBlock[i], i);
+ }
+ }
+ return columnWithIndexs;
+ }
+
+ private short[] extractDataAndReturnIndexes(ColumnWithShortIndex[] columnWithIndexs,
+ byte[][] keyBlock) {
+ short[] indexes = new short[columnWithIndexs.length];
+ for (int i = 0; i < indexes.length; i++) {
+ indexes[i] = columnWithIndexs[i].getIndex();
+ keyBlock[i] = columnWithIndexs[i].getColumn();
+ }
+ this.keyBlock = keyBlock;
+ return indexes;
+ }
+
+  /**
+   * Compresses the indexes based on runs of sequential numbers.
+   * [1,2,3,4,6,8,10,11,12,13] is translated to [1,4,6,8,10,13] and [0,4]. The
+   * first array stores each run's start and end values plus any standalone
+   * values; the second array keeps the positions in the first array where a
+   * run starts. If there are no sequential numbers, the original array is
+   * returned with an empty second array. (A runnable trace of this example
+   * follows this file's diff.)
+   *
+   * @param indexes indexes to compress
+   */
+ public void compressMyOwnWay(short[] indexes) {
+ List<Short> list = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ List<Short> map = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ int k = 0;
+ int i = 1;
+ for (; i < indexes.length; i++) {
+ if (indexes[i] - indexes[i - 1] == 1) {
+ k++;
+ } else {
+ if (k > 0) {
+ map.add(((short) list.size()));
+ list.add(indexes[i - k - 1]);
+ list.add(indexes[i - 1]);
+ } else {
+ list.add(indexes[i - 1]);
+ }
+ k = 0;
+ }
+ }
+ if (k > 0) {
+ map.add(((short) list.size()));
+ list.add(indexes[i - k - 1]);
+ list.add(indexes[i - 1]);
+ } else {
+ list.add(indexes[i - 1]);
+ }
+ double compressionPercentage = (((list.size() + map.size()) * 100) / indexes.length);
+ if (compressionPercentage > 70) {
+ dataAfterComp = indexes;
+ } else {
+ dataAfterComp = convertToArray(list);
+ }
+ if (indexes.length == dataAfterComp.length) {
+ indexMap = new short[0];
+ } else {
+ indexMap = convertToArray(map);
+ }
+ if (dataAfterComp.length == 2 && indexMap.length == 1) {
+ alreadySorted = true;
+ }
+ }
+
+ private short[] convertToArray(List<Short> list) {
+ short[] shortArray = new short[list.size()];
+ for (int i = 0; i < shortArray.length; i++) {
+ shortArray[i] = list.get(i);
+ }
+ return shortArray;
+ }
+
+ /**
+ * @return the alreadySorted
+ */
+ public boolean isAlreadySorted() {
+ return alreadySorted;
+ }
+
+ /**
+ * @return the dataAfterComp
+ */
+ public short[] getDataAfterComp() {
+ return dataAfterComp;
+ }
+
+ /**
+ * @return the indexMap
+ */
+ public short[] getIndexMap() {
+ return indexMap;
+ }
+
+ /**
+ * @return the keyBlock
+ */
+ public byte[][] getKeyBlock() {
+ return keyBlock;
+ }
+
+ private void compressDataMyOwnWay(ColumnWithShortIndex[] indexes) {
+ byte[] prvKey = indexes[0].getColumn();
+ List<ColumnWithShortIndex> list =
+ new ArrayList<ColumnWithShortIndex>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ list.add(indexes[0]);
+ short counter = 1;
+ short start = 0;
+ List<Short> map = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ for (int i = 1; i < indexes.length; i++) {
+ if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(prvKey, indexes[i].getColumn()) != 0) {
+ prvKey = indexes[i].getColumn();
+ list.add(indexes[i]);
+ map.add(start);
+ map.add(counter);
+ start += counter;
+ counter = 1;
+ continue;
+ }
+ counter++;
+ }
+ map.add(start);
+ map.add(counter);
+ this.keyBlock = convertToKeyArray(list);
+ if (indexes.length == keyBlock.length) {
+ dataIndexMap = new short[0];
+ } else {
+ dataIndexMap = convertToArray(map);
+ }
+ }
+
+ private byte[][] convertToKeyArray(List<ColumnWithShortIndex> list) {
+ byte[][] shortArray = new byte[list.size()][];
+ for (int i = 0; i < shortArray.length; i++) {
+ shortArray[i] = list.get(i).getColumn();
+ totalSize += shortArray[i].length;
+ }
+ return shortArray;
+ }
+
+ @Override public short[] getDataIndexMap() {
+ return dataIndexMap;
+ }
+
+ @Override public int getTotalSize() {
+ return totalSize;
+ }
+
+ @Override public byte[] getMin() {
+ return keyBlock[0];
+ }
+
+ @Override public byte[] getMax() {
+ return keyBlock[keyBlock.length - 1];
+ }
+
+}
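As promised in the compressMyOwnWay javadoc, here is a runnable trace of the run compression on the documented input. The class name is illustrative; the loop body is a condensed restatement of the method above.

import java.util.ArrayList;
import java.util.List;

public final class RunCompressSketch {
  public static void main(String[] args) {
    short[] indexes = {1, 2, 3, 4, 6, 8, 10, 11, 12, 13};
    List<Short> list = new ArrayList<>();  // run endpoints plus lone values
    List<Short> map = new ArrayList<>();   // positions in list where runs start
    int k = 0, i = 1;
    for (; i < indexes.length; i++) {
      if (indexes[i] - indexes[i - 1] == 1) {
        k++;                               // extend the current run
      } else {
        if (k > 0) {                       // close a run: store start and end
          map.add((short) list.size());
          list.add(indexes[i - k - 1]);
          list.add(indexes[i - 1]);
        } else {
          list.add(indexes[i - 1]);        // lone value
        }
        k = 0;
      }
    }
    if (k > 0) {                           // flush a run ending at the last index
      map.add((short) list.size());
      list.add(indexes[i - k - 1]);
      list.add(indexes[i - 1]);
    } else {
      list.add(indexes[i - 1]);
    }
    System.out.println(list + " " + map);  // [1, 4, 6, 8, 10, 13] [0, 4]
  }
}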
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndex.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndex.java
new file mode 100644
index 0000000..57447a3
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndex.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.columnar;
+
+import java.util.Arrays;
+
+import org.apache.carbondata.core.util.ByteUtil;
+
+public class ColumnWithShortIndex implements Comparable<ColumnWithShortIndex> {
+ protected byte[] column;
+
+ private short index;
+
+ public ColumnWithShortIndex(byte[] column, short index) {
+ this.column = column;
+ this.index = index;
+ }
+
+ /**
+ * @return the column
+ */
+ public byte[] getColumn() {
+ return column;
+ }
+
+ /**
+ * @param column the column to set
+ */
+ public void setColumn(byte[] column) {
+ this.column = column;
+ }
+
+ /**
+ * @return the index
+ */
+ public short getIndex() {
+ return index;
+ }
+
+ /**
+ * @param index the index to set
+ */
+ public void setIndex(short index) {
+ this.index = index;
+ }
+
+ @Override public int compareTo(ColumnWithShortIndex o) {
+ return ByteUtil.UnsafeComparer.INSTANCE.compareTo(column, o.column);
+ }
+
+ @Override public boolean equals(Object obj) {
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ ColumnWithShortIndex o = (ColumnWithShortIndex)obj;
+ return Arrays.equals(column, o.column) && index == o.index;
+ }
+
+ @Override public int hashCode() {
+ return Arrays.hashCode(column) + index;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndexForNoDictionay.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndexForNoDictionay.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndexForNoDictionay.java
new file mode 100644
index 0000000..34cce63
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndexForNoDictionay.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.columnar;
+
+import java.util.Arrays;
+
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+
+public class ColumnWithShortIndexForNoDictionay extends ColumnWithShortIndex
+ implements Comparable<ColumnWithShortIndex> {
+
+ public ColumnWithShortIndexForNoDictionay(byte[] column, short index) {
+ super(column, index);
+ }
+
+ @Override public int compareTo(ColumnWithShortIndex o) {
+ return UnsafeComparer.INSTANCE
+ .compareTo(column, 2, column.length - 2, o.column, 2, o.column.length - 2);
+ }
+
+ @Override public boolean equals(Object obj) {
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+    ColumnWithShortIndexForNoDictionay o = (ColumnWithShortIndexForNoDictionay) obj;
+    return Arrays.equals(column, o.column) && getIndex() == o.getIndex();
+ }
+
+ @Override public int hashCode() {
+ return Arrays.hashCode(column) + getIndex();
+ }
+}
\ No newline at end of file
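The overridden compareTo skips the first 2 bytes of each value, consistent with no-dictionary values carrying a 2-byte length prefix, so ordering is decided by the value bytes rather than the prefix. A hypothetical sketch of that assumed layout (nothing here is confirmed by this commit):

import java.nio.ByteBuffer;

final class NoDictionaryPrefixSketch {
  // Hypothetical layout of a no-dictionary value: [2-byte length][value bytes].
  static byte[] encode(byte[] raw) {
    return ByteBuffer.allocate(2 + raw.length)
        .putShort((short) raw.length)
        .put(raw)
        .array();
  }
}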
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java b/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java
index 7629895..240f891 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java
@@ -19,7 +19,8 @@ package org.apache.carbondata.core.metadata;
public enum ColumnarFormatVersion {
V1((short)1),
- V2((short)2);
+ V2((short)2),
+ V3((short)3);
private short version;
ColumnarFormatVersion(short version) {
@@ -43,8 +44,12 @@ public enum ColumnarFormatVersion {
case 1:
// after multiple reader support, user can write new file with version 1
return V1;
- default:
+ case 2:
return V2;
+ case 3:
+ return V3;
+ default:
+ return V3;
}
}
}
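Note the behavior change in the default case: unknown version numbers now resolve to V3, where the old code fell through to V2. Assuming the switch above lives in the enum's valueOf(short) factory method, a sketch of the resulting mapping:

// Illustrative only; assumes a valueOf(short) factory as shown above.
ColumnarFormatVersion.valueOf((short) 1);  // V1
ColumnarFormatVersion.valueOf((short) 2);  // V2
ColumnarFormatVersion.valueOf((short) 3);  // V3
ColumnarFormatVersion.valueOf((short) 9);  // V3 (default for unknown versions)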