You are viewing a plain-text version of this content. The canonical link for it (the "here" hyperlink on the original page) is not preserved in this plain-text copy.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/06/20 07:29:52 UTC
[46/56] [abbrv] carbondata git commit: add unsafe column page
add unsafe column page
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7359601b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7359601b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7359601b
Branch: refs/heads/streaming_ingest
Commit: 7359601b4a7808311bda33437333d627a6f8400d
Parents: 94c4910
Author: jackylk <ja...@huawei.com>
Authored: Mon Jun 19 11:59:37 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Mon Jun 19 11:59:37 2017 +0800
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 10 +
.../chunk/impl/MeasureRawColumnChunk.java | 3 +-
.../chunk/reader/MeasureColumnChunkReader.java | 3 +-
.../AbstractMeasureChunkReaderV2V3Format.java | 3 +-
...CompressedMeasureChunkFileBasedReaderV1.java | 3 +-
...CompressedMeasureChunkFileBasedReaderV2.java | 3 +-
...CompressedMeasureChunkFileBasedReaderV3.java | 7 +-
.../core/datastore/page/ColumnPage.java | 496 ++++++-------
.../core/datastore/page/LazyColumnPage.java | 7 +-
.../datastore/page/SafeFixLengthColumnPage.java | 310 ++++++++
.../datastore/page/SafeVarLengthColumnPage.java | 71 ++
.../page/UnsafeFixLengthColumnPage.java | 334 +++++++++
.../page/UnsafeVarLengthColumnPage.java | 128 ++++
.../datastore/page/VarLengthColumnPageBase.java | 247 +++++++
.../page/encoding/AdaptiveCompressionCodec.java | 5 +-
.../page/encoding/AdaptiveIntegerCodec.java | 9 +-
.../page/encoding/ColumnPageCodec.java | 5 +-
.../page/encoding/CompressionCodec.java | 5 +-
.../page/encoding/DeltaIntegerCodec.java | 9 +-
.../encoding/UpscaleDeltaFloatingCodec.java | 9 +-
.../page/encoding/UpscaleFloatingCodec.java | 9 +-
.../page/statistics/ColumnPageStatsVO.java | 15 +-
.../core/memory/IntPointerBuffer.java | 16 +-
.../core/memory/UnsafeMemoryManager.java | 27 +-
.../core/metadata/datatype/DataType.java | 3 +
.../executer/RowLevelFilterExecuterImpl.java | 31 +-
.../core/scan/result/AbstractScannedResult.java | 30 +-
.../result/impl/FilterQueryScannedResult.java | 4 +-
.../core/scan/scanner/impl/FilterScanner.java | 3 +-
.../apache/carbondata/core/util/ByteUtil.java | 12 +
.../examples/CarbonSessionExample.scala | 2 +-
.../carbondata/examples/CompareTest.scala | 4 +-
.../TestNullAndEmptyFieldsUnsafe.scala | 119 ++++
.../TestLoadDataWithHiveSyntaxUnsafe.scala | 709 +++++++++++++++++++
.../carbondata/CarbonDataSourceSuite.scala | 2 +-
.../store/CarbonFactDataHandlerColumnar.java | 10 +-
.../carbondata/processing/store/TablePage.java | 39 +-
.../processing/store/TablePageEncoder.java | 6 +-
.../writer/v3/CarbonFactDataWriterImplV3.java | 2 +
39 files changed, 2333 insertions(+), 377 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index ec13bd6..96c26b4 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1067,6 +1067,16 @@ public final class CarbonCommonConstants {
public static final int CARBON_EXECUTOR_STARTUP_THREAD_SLEEP_TIME = 250;
/**
+ * to enable unsafe column page in write step
+ */
+ public static final String ENABLE_UNSAFE_COLUMN_PAGE_LOADING = "enable.unsafe.columnpage";
+
+ /**
+ * default value of ENABLE_UNSAFE_COLUMN_PAGE_LOADING
+ */
+ public static final String ENABLE_UNSAFE_COLUMN_PAGE_LOADING_DEFAULT = "false";
+
+ /**
* to enable offheap sort
*/
public static final String ENABLE_UNSAFE_SORT = "enable.unsafe.sort";
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
index 4702abd..143dd4d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
@@ -23,6 +23,7 @@ import org.apache.carbondata.core.datastore.FileHolder;
import org.apache.carbondata.core.datastore.chunk.AbstractRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
+import org.apache.carbondata.core.memory.MemoryException;
/**
* Contains raw measure data
@@ -80,7 +81,7 @@ public class MeasureRawColumnChunk extends AbstractRawColumnChunk {
if (dataChunks[index] == null) {
dataChunks[index] = chunkReader.convertToMeasureChunk(this, index);
}
- } catch (IOException e) {
+ } catch (IOException | MemoryException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
index 39789b1..dba6823 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.carbondata.core.datastore.FileHolder;
import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.memory.MemoryException;
/**
* Reader interface for reading the measure blocks from file
@@ -55,6 +56,6 @@ public interface MeasureColumnChunkReader {
* @throws IOException
*/
MeasureColumnDataChunk convertToMeasureChunk(MeasureRawColumnChunk measureRawColumnChunk,
- int pageNumber) throws IOException;
+ int pageNumber) throws IOException, MemoryException;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
index eba1777..2f5af87 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
+import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.ValueEncoderMeta;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.blocklet.datachunk.PresenceMeta;
@@ -129,7 +130,7 @@ public abstract class AbstractMeasureChunkReaderV2V3Format extends AbstractMeasu
protected ColumnPage decodeMeasure(MeasureRawColumnChunk measureRawColumnChunk,
- DataChunk2 measureColumnChunk, int copyPoint) {
+ DataChunk2 measureColumnChunk, int copyPoint) throws MemoryException {
// for measure, it should have only one ValueEncoderMeta
assert (measureColumnChunk.getEncoder_meta().size() == 1);
byte[] encodedMeta = measureColumnChunk.getEncoder_meta().get(0).array();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
index cdaaf81..6e59b9f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReader;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
+import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.ValueEncoderMeta;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.blocklet.datachunk.DataChunk;
@@ -92,7 +93,7 @@ public class CompressedMeasureChunkFileBasedReaderV1 extends AbstractMeasureChun
@Override
public MeasureColumnDataChunk convertToMeasureChunk(MeasureRawColumnChunk measureRawColumnChunk,
- int pageNumber) throws IOException {
+ int pageNumber) throws IOException, MemoryException {
int blockIndex = measureRawColumnChunk.getBlockletId();
DataChunk dataChunk = measureColumnChunks.get(blockIndex);
ValueEncoderMeta meta = dataChunk.getValueEncoderMeta().get(0);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
index a94193a..d90c7fe 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format;
import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.format.DataChunk2;
@@ -105,7 +106,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
}
public MeasureColumnDataChunk convertToMeasureChunk(MeasureRawColumnChunk measureRawColumnChunk,
- int pageNumber) throws IOException {
+ int pageNumber) throws IOException, MemoryException {
MeasureColumnDataChunk datChunk = new MeasureColumnDataChunk();
int copyPoint = measureRawColumnChunk.getOffSet();
int blockIndex = measureRawColumnChunk.getBlockletId();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
index 325387c..2ca7193 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format;
import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.format.DataChunk2;
@@ -198,8 +199,10 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
* @param pageNumber number
* @return DimensionColumnDataChunk
*/
- @Override public MeasureColumnDataChunk convertToMeasureChunk(
- MeasureRawColumnChunk measureRawColumnChunk, int pageNumber) throws IOException {
+ @Override
+ public MeasureColumnDataChunk convertToMeasureChunk(
+ MeasureRawColumnChunk measureRawColumnChunk, int pageNumber)
+ throws IOException, MemoryException {
MeasureColumnDataChunk datChunk = new MeasureColumnDataChunk();
// data chunk of blocklet column
DataChunk3 dataChunk3 = measureRawColumnChunk.getDataChunkV3();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 863d1b0..3c47a8c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -18,15 +18,13 @@
package org.apache.carbondata.core.datastore.page;
import java.math.BigDecimal;
-import java.nio.ByteBuffer;
import java.util.BitSet;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.DataTypeUtil;
@@ -39,130 +37,225 @@ import static org.apache.carbondata.core.metadata.datatype.DataType.INT;
import static org.apache.carbondata.core.metadata.datatype.DataType.LONG;
import static org.apache.carbondata.core.metadata.datatype.DataType.SHORT;
-/**
- * Represent a columnar data in one page for one column.
- */
-public class ColumnPage {
-
- private final int pageSize;
- private DataType dataType;
- private ColumnPageStatsVO stats;
+public abstract class ColumnPage {
- // Only one of following fields will be used
- private byte[] byteData;
- private short[] shortData;
- private int[] intData;
- private long[] longData;
- private float[] floatData;
- private double[] doubleData;
+ protected final int pageSize;
+ protected final DataType dataType;
- // for string and decimal data
- private byte[][] byteArrayData;
+ // statistics of this column page
+ private ColumnPageStatsVO stats;
// The index of the rowId whose value is null, will be set to 1
private BitSet nullBitSet;
+ protected static final boolean unsafe = Boolean.parseBoolean(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING,
+ CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING_DEFAULT));
+
protected ColumnPage(DataType dataType, int pageSize) {
- this.pageSize = pageSize;
this.dataType = dataType;
+ this.pageSize = pageSize;
+ this.stats = new ColumnPageStatsVO(dataType);
+ this.nullBitSet = new BitSet(pageSize);
+ }
+
+ public DataType getDataType() {
+ return dataType;
+ }
+
+ public ColumnPageStatsVO getStatistics() {
+ return stats;
+ }
+
+ public int getPageSize() {
+ return pageSize;
}
- // create a new page
- public static ColumnPage newPage(DataType dataType, int pageSize) {
+ private static ColumnPage createVarLengthPage(DataType dataType, int pageSize) {
+ if (unsafe) {
+ try {
+ return new UnsafeVarLengthColumnPage(dataType, pageSize);
+ } catch (MemoryException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ return new SafeVarLengthColumnPage(dataType, pageSize);
+ }
+ }
+
+ private static ColumnPage createFixLengthPage(DataType dataType, int pageSize) {
+ if (unsafe) {
+ try {
+ return new UnsafeFixLengthColumnPage(dataType, pageSize);
+ } catch (MemoryException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ return new SafeFixLengthColumnPage(dataType, pageSize);
+ }
+ }
+
+ private static ColumnPage createPage(DataType dataType, int pageSize) {
+ if (dataType.equals(BYTE_ARRAY) | dataType.equals(DECIMAL)) {
+ return createVarLengthPage(dataType, pageSize);
+ } else {
+ return createFixLengthPage(dataType, pageSize);
+ }
+ }
+
+ public static ColumnPage newVarLengthPath(DataType dataType, int pageSize) {
+ if (unsafe) {
+ try {
+ return new UnsafeVarLengthColumnPage(dataType, pageSize);
+ } catch (MemoryException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ return new SafeVarLengthColumnPage(dataType, pageSize);
+ }
+ }
+
+ /**
+ * Create a new page of dataType and number of row = pageSize
+ */
+ public static ColumnPage newPage(DataType dataType, int pageSize) throws MemoryException {
ColumnPage instance;
- switch (dataType) {
- case BYTE:
- instance = newBytePage(new byte[pageSize]);
- break;
- case SHORT:
- instance = newShortPage(new short[pageSize]);
- break;
- case INT:
- instance = newIntPage(new int[pageSize]);
- break;
- case LONG:
- instance = newLongPage(new long[pageSize]);
- break;
- case FLOAT:
- instance = newFloatPage(new float[pageSize]);
- break;
- case DOUBLE:
- instance = newDoublePage(new double[pageSize]);
- break;
- case DECIMAL:
- instance = newDecimalPage(new byte[pageSize][]);
- break;
- case BYTE_ARRAY:
- instance = newVarLengthPage(new byte[pageSize][]);
- break;
- default:
- throw new RuntimeException("Unsupported data dataType: " + dataType);
+ if (unsafe) {
+ switch (dataType) {
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ instance = new UnsafeFixLengthColumnPage(dataType, pageSize);
+ break;
+ case DECIMAL:
+ case BYTE_ARRAY:
+ instance = new UnsafeVarLengthColumnPage(dataType, pageSize);
+ break;
+ default:
+ throw new RuntimeException("Unsupported data dataType: " + dataType);
+ }
+ } else {
+ switch (dataType) {
+ case BYTE:
+ instance = newBytePage(new byte[pageSize]);
+ break;
+ case SHORT:
+ instance = newShortPage(new short[pageSize]);
+ break;
+ case INT:
+ instance = newIntPage(new int[pageSize]);
+ break;
+ case LONG:
+ instance = newLongPage(new long[pageSize]);
+ break;
+ case FLOAT:
+ instance = newFloatPage(new float[pageSize]);
+ break;
+ case DOUBLE:
+ instance = newDoublePage(new double[pageSize]);
+ break;
+ case DECIMAL:
+ instance = newDecimalPage(new byte[pageSize][]);
+ break;
+ default:
+ throw new RuntimeException("Unsupported data dataType: " + dataType);
+ }
}
- instance.stats = new ColumnPageStatsVO(dataType);
- instance.nullBitSet = new BitSet(pageSize);
return instance;
}
- protected static ColumnPage newBytePage(byte[] byteData) {
- ColumnPage columnPage = new ColumnPage(BYTE, byteData.length);
- columnPage.byteData = byteData;
+ private static ColumnPage newBytePage(byte[] byteData) {
+ ColumnPage columnPage = createPage(BYTE, byteData.length);
+ columnPage.setBytePage(byteData);
return columnPage;
}
- protected static ColumnPage newShortPage(short[] shortData) {
- ColumnPage columnPage = new ColumnPage(SHORT, shortData.length);
- columnPage.shortData = shortData;
+ private static ColumnPage newShortPage(short[] shortData) {
+ ColumnPage columnPage = createPage(SHORT, shortData.length);
+ columnPage.setShortPage(shortData);
return columnPage;
}
- protected static ColumnPage newIntPage(int[] intData) {
- ColumnPage columnPage = new ColumnPage(INT, intData.length);
- columnPage.intData = intData;
+ private static ColumnPage newIntPage(int[] intData) {
+ ColumnPage columnPage = createPage(INT, intData.length);
+ columnPage.setIntPage(intData);
return columnPage;
}
- protected static ColumnPage newLongPage(long[] longData) {
- ColumnPage columnPage = new ColumnPage(LONG, longData.length);
- columnPage.longData = longData;
+ private static ColumnPage newLongPage(long[] longData) {
+ ColumnPage columnPage = createPage(LONG, longData.length);
+ columnPage.setLongPage(longData);
return columnPage;
}
- protected static ColumnPage newFloatPage(float[] floatData) {
- ColumnPage columnPage = new ColumnPage(FLOAT, floatData.length);
- columnPage.floatData = floatData;
+ private static ColumnPage newFloatPage(float[] floatData) {
+ ColumnPage columnPage = createPage(FLOAT, floatData.length);
+ columnPage.setFloatPage(floatData);
return columnPage;
}
- protected static ColumnPage newDoublePage(double[] doubleData) {
- ColumnPage columnPage = new ColumnPage(DOUBLE, doubleData.length);
- columnPage.doubleData = doubleData;
+ private static ColumnPage newDoublePage(double[] doubleData) {
+ ColumnPage columnPage = createPage(DOUBLE, doubleData.length);
+ columnPage.setDoublePage(doubleData);
return columnPage;
}
- protected static ColumnPage newDecimalPage(byte[][] decimalData) {
- ColumnPage columnPage = new ColumnPage(DECIMAL, decimalData.length);
- columnPage.byteArrayData = decimalData;
+ private static ColumnPage newDecimalPage(byte[][] byteArray) {
+ ColumnPage columnPage = createPage(DECIMAL, byteArray.length);
+ columnPage.setByteArrayPage(byteArray);
return columnPage;
}
- protected static ColumnPage newVarLengthPage(byte[][] stringData) {
- ColumnPage columnPage = new ColumnPage(BYTE_ARRAY, stringData.length);
- columnPage.byteArrayData = stringData;
- return columnPage;
+ private static ColumnPage newDecimalPage(byte[] lvEncodedByteArray) throws MemoryException {
+ return VarLengthColumnPageBase.newDecimalColumnPage(lvEncodedByteArray);
}
- public DataType getDataType() {
- return dataType;
- }
+ /**
+ * Set byte values to page
+ */
+ public abstract void setBytePage(byte[] byteData);
- public ColumnPageStatsVO getStatistics() {
- return stats;
- }
+ /**
+ * Set short values to page
+ */
+ public abstract void setShortPage(short[] shortData);
- public int getPageSize() {
- return pageSize;
- }
+ /**
+ * Set int values to page
+ */
+ public abstract void setIntPage(int[] intData);
+ /**
+ * Set long values to page
+ */
+ public abstract void setLongPage(long[] longData);
+
+ /**
+ * Set float values to page
+ */
+ public abstract void setFloatPage(float[] floatData);
+
+ /**
+ * Set double value to page
+ */
+ public abstract void setDoublePage(double[] doubleData);
+
+ /**
+ * Set byte array to page
+ */
+ public abstract void setByteArrayPage(byte[][] byteArray);
+
+ /**
+ * free memory as needed
+ */
+ public abstract void freeMemory();
+
+ /**
+ * Set value at rowId
+ */
public void putData(int rowId, Object value) {
if (value == null) {
putNull(rowId);
@@ -187,8 +280,6 @@ public class ColumnPage {
putDouble(rowId, (double) value);
break;
case DECIMAL:
- putDecimalBytes(rowId, (byte[]) value);
- break;
case BYTE_ARRAY:
putBytes(rowId, (byte[]) value);
break;
@@ -201,62 +292,44 @@ public class ColumnPage {
/**
* Set byte value at rowId
*/
- public void putByte(int rowId, byte value) {
- byteData[rowId] = value;
- }
+ public abstract void putByte(int rowId, byte value);
/**
* Set short value at rowId
*/
- public void putShort(int rowId, short value) {
- shortData[rowId] = value;
- }
+ public abstract void putShort(int rowId, short value);
/**
* Set integer value at rowId
*/
- public void putInt(int rowId, int value) {
- intData[rowId] = value;
- }
+ public abstract void putInt(int rowId, int value);
/**
* Set long value at rowId
*/
- public void putLong(int rowId, long value) {
- longData[rowId] = value;
- }
+ public abstract void putLong(int rowId, long value);
/**
* Set double value at rowId
*/
- public void putDouble(int rowId, double value) {
- doubleData[rowId] = value;
- }
+ public abstract void putDouble(int rowId, double value);
/**
- * Set decimal value at rowId
+ * Set byte array value at rowId
*/
- public void putDecimalBytes(int rowId, byte[] decimalInBytes) {
- // do LV (length value) coded of input bytes
- ByteBuffer byteBuffer = ByteBuffer.allocate(decimalInBytes.length +
- CarbonCommonConstants.INT_SIZE_IN_BYTE);
- byteBuffer.putInt(decimalInBytes.length);
- byteBuffer.put(decimalInBytes);
- byteBuffer.flip();
- byteArrayData[rowId] = byteBuffer.array();
- }
+ public abstract void putBytes(int rowId, byte[] bytes);
/**
- * Set string value at rowId
+ * Set byte array from offset to length at rowId
*/
- public void putBytes(int rowId, byte[] bytes) {
- byteArrayData[rowId] = bytes;
- }
+ public abstract void putBytes(int rowId, byte[] bytes, int offset, int length);
+
+ private static final byte[] ZERO = DataTypeUtil.bigDecimalToByte(BigDecimal.ZERO);
/**
* Set null at rowId
*/
- public void putNull(int rowId) {
+ private void putNull(int rowId) {
nullBitSet.set(rowId);
switch (dataType) {
case BYTE:
@@ -275,175 +348,100 @@ public class ColumnPage {
putDouble(rowId, 0.0);
break;
case DECIMAL:
- byte[] decimalInBytes = DataTypeUtil.bigDecimalToByte(BigDecimal.ZERO);
- putDecimalBytes(rowId, decimalInBytes);
+ putBytes(rowId, ZERO);
break;
}
}
/**
- * Get byte value at rowId
+ * Get null bitset page
*/
- public byte getByte(int rowId) {
- return byteData[rowId];
+ public BitSet getNullBitSet() {
+ return nullBitSet;
}
/**
+ * Get byte value at rowId
+ */
+ public abstract byte getByte(int rowId);
+
+ /**
* Get short value at rowId
*/
- public short getShort(int rowId) {
- return shortData[rowId];
- }
+ public abstract short getShort(int rowId);
/**
* Get int value at rowId
*/
- public int getInt(int rowId) {
- return intData[rowId];
- }
+ public abstract int getInt(int rowId);
/**
* Get long value at rowId
*/
- public long getLong(int rowId) {
- return longData[rowId];
- }
+ public abstract long getLong(int rowId);
/**
* Get float value at rowId
*/
- public float getFloat(int rowId) {
- return floatData[rowId];
- }
+ public abstract float getFloat(int rowId);
/**
* Get double value at rowId
*/
- public double getDouble(int rowId) {
- return doubleData[rowId];
- }
+ public abstract double getDouble(int rowId);
/**
* Get decimal value at rowId
*/
- public byte[] getDecimalBytes(int rowId) {
- return byteArrayData[rowId];
- }
-
- public BigDecimal getDecimal(int rowId) {
- byte[] bytes = getDecimalBytes(rowId);
- return DataTypeUtil.byteToBigDecimal(bytes);
- }
+ public abstract BigDecimal getDecimal(int rowId);
/**
* Get byte value page
*/
- public byte[] getBytePage() {
- return byteData;
- }
+ public abstract byte[] getBytePage();
/**
* Get short value page
*/
- public short[] getShortPage() {
- return shortData;
- }
+ public abstract short[] getShortPage();
/**
* Get int value page
*/
- public int[] getIntPage() {
- return intData;
- }
+ public abstract int[] getIntPage();
/**
* Get long value page
*/
- public long[] getLongPage() {
- return longData;
- }
+ public abstract long[] getLongPage();
/**
* Get float value page
*/
- public float[] getFloatPage() {
- return floatData;
- }
+ public abstract float[] getFloatPage();
/**
* Get double value page
*/
- public double[] getDoublePage() {
- return doubleData;
- }
+ public abstract double[] getDoublePage();
/**
- * Get decimal value page
+ * Get variable length page data
*/
- public byte[][] getDecimalPage() {
- return byteArrayData;
- }
+ public abstract byte[][] getByteArrayPage();
/**
- * Get string page
+ * For variable length page, get the flattened data
*/
- public byte[][] getByteArrayPage() {
- return byteArrayData;
- }
+ public abstract byte[] getFlattenedBytePage();
/**
- * Get null bitset page
+ * Encode the page data by codec (Visitor)
*/
- public BitSet getNullBitSet() {
- return nullBitSet;
- }
-
- public void freeMemory() {
- }
+ public abstract void encode(PrimitiveCodec codec);
/**
- * apply encoding to page data
- * @param codec type of transformation
- */
- public void encode(PrimitiveCodec codec) {
- switch (dataType) {
- case BYTE:
- for (int i = 0; i < pageSize; i++) {
- codec.encode(i, byteData[i]);
- }
- break;
- case SHORT:
- for (int i = 0; i < pageSize; i++) {
- codec.encode(i, shortData[i]);
- }
- break;
- case INT:
- for (int i = 0; i < pageSize; i++) {
- codec.encode(i, intData[i]);
- }
- break;
- case LONG:
- for (int i = 0; i < pageSize; i++) {
- codec.encode(i, longData[i]);
- }
- break;
- case FLOAT:
- for (int i = 0; i < pageSize; i++) {
- codec.encode(i, floatData[i]);
- }
- break;
- case DOUBLE:
- for (int i = 0; i < pageSize; i++) {
- codec.encode(i, doubleData[i]);
- }
- break;
- default:
- throw new UnsupportedOperationException("not support encode on " + dataType + " page");
- }
- }
-
- /**
- * compress page data using specified compressor
+ * Compress page data using specified compressor
*/
public byte[] compress(Compressor compressor) {
switch (dataType) {
@@ -460,21 +458,19 @@ public class ColumnPage {
case DOUBLE:
return compressor.compressDouble(getDoublePage());
case DECIMAL:
- byte[] flattenedDecimal = ByteUtil.flatten(getDecimalPage());
- return compressor.compressByte(flattenedDecimal);
+ return compressor.compressByte(getFlattenedBytePage());
case BYTE_ARRAY:
- byte[] flattenedString = ByteUtil.flatten(getByteArrayPage());
- return compressor.compressByte(flattenedString);
+ return compressor.compressByte(getFlattenedBytePage());
default:
throw new UnsupportedOperationException("unsupport compress column page: " + dataType);
}
}
/**
- * decompress data and create a column page using the decompressed data
+ * Decompress data and create a column page using the decompressed data
*/
public static ColumnPage decompress(Compressor compressor, DataType dataType,
- byte[] compressedData, int offset, int length) {
+ byte[] compressedData, int offset, int length) throws MemoryException {
switch (dataType) {
case BYTE:
byte[] byteData = compressor.unCompressByte(compressedData, offset, length);
@@ -495,54 +491,12 @@ public class ColumnPage {
double[] doubleData = compressor.unCompressDouble(compressedData, offset, length);
return newDoublePage(doubleData);
case DECIMAL:
- byte[] decompressed = compressor.unCompressByte(compressedData, offset, length);
- byte[][] decimal = deflatten(decompressed);
- return newDecimalPage(decimal);
case BYTE_ARRAY:
- decompressed = compressor.unCompressByte(compressedData, offset, length);
- byte[][] string = deflatten(decompressed);
- return newVarLengthPage(string);
+ byte[] lvEncodedBytes = compressor.unCompressByte(compressedData, offset, length);
+ return newDecimalPage(lvEncodedBytes);
default:
throw new UnsupportedOperationException("unsupport uncompress column page: " + dataType);
}
}
- // input byte[] is LV encoded, this function can expand it into byte[][]
- private static byte[][] deflatten(byte[] input) {
- int pageSize = Integer.valueOf(
- CarbonProperties.getInstance().getProperty(
- CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE,
- CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT));
- int numRows = 0;
- // offset of value of each row in input data
- int[] offsetOfRow = new int[pageSize];
- ByteBuffer buffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE);
- for (int currentLength = 0; currentLength < input.length;) {
- buffer.put(input, currentLength, CarbonCommonConstants.INT_SIZE_IN_BYTE);
- buffer.flip();
- int valueLength = buffer.getInt();
- offsetOfRow[numRows] = currentLength + CarbonCommonConstants.INT_SIZE_IN_BYTE;
- currentLength += CarbonCommonConstants.INT_SIZE_IN_BYTE + valueLength;
- buffer.clear();
- numRows++;
- }
- byte[][] byteArrayData = new byte[numRows][];
- for (int rowId = 0; rowId < numRows; rowId++) {
- int valueOffset = offsetOfRow[rowId];
- int valueLength;
- if (rowId != numRows - 1) {
- valueLength = offsetOfRow[rowId + 1] - valueOffset - CarbonCommonConstants.INT_SIZE_IN_BYTE;
- } else {
- // last row
- buffer.put(input, offsetOfRow[rowId] - CarbonCommonConstants.INT_SIZE_IN_BYTE,
- CarbonCommonConstants.INT_SIZE_IN_BYTE);
- buffer.flip();
- valueLength = buffer.getInt();
- }
- byte[] value = new byte[valueLength];
- System.arraycopy(input, valueOffset, value, 0, valueLength);
- byteArrayData[rowId] = value;
- }
- return byteArrayData;
- }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
index 165e027..1784136 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
@@ -21,7 +21,7 @@ package org.apache.carbondata.core.datastore.page;
* This is a decorator of column page, it performs transformation lazily (when caller calls getXXX
* method to get the value from the page)
*/
-public class LazyColumnPage extends ColumnPage {
+public class LazyColumnPage extends SafeFixLengthColumnPage {
// decorated column page
private ColumnPage columnPage;
@@ -79,4 +79,9 @@ public class LazyColumnPage extends ColumnPage {
throw new RuntimeException("internal error: " + this.toString());
}
}
+
+  /**
+   * Delegates memory release to the decorated column page.
+   */
+  @Override
+  public void freeMemory() {
+    this.columnPage.freeMemory();
+  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
new file mode 100644
index 0000000..34ab01c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Column page for fixed-length primitive data (byte, short, int, long, float, double) that keeps
+ * its values in on-heap Java arrays. Only the array matching the page's data type is used.
+ * The constructor does not allocate any array; the backing array is assigned through the
+ * corresponding setXXXPage method. Variable-length operations (bytes, byte arrays, decimals)
+ * are not supported and throw {@link UnsupportedOperationException}.
+ */
+public class SafeFixLengthColumnPage extends ColumnPage {
+
+  // Only one of following fields will be used
+  private byte[] byteData;
+  private short[] shortData;
+  private int[] intData;
+  private long[] longData;
+  private float[] floatData;
+  private double[] doubleData;
+
+  public SafeFixLengthColumnPage(DataType dataType, int pageSize) {
+    super(dataType, pageSize);
+  }
+
+  /**
+   * Set byte value at rowId
+   */
+  @Override
+  public void putByte(int rowId, byte value) {
+    byteData[rowId] = value;
+  }
+
+  /**
+   * Set short value at rowId
+   */
+  @Override
+  public void putShort(int rowId, short value) {
+    shortData[rowId] = value;
+  }
+
+  /**
+   * Set integer value at rowId
+   */
+  @Override
+  public void putInt(int rowId, int value) {
+    intData[rowId] = value;
+  }
+
+  /**
+   * Set long value at rowId
+   */
+  @Override
+  public void putLong(int rowId, long value) {
+    longData[rowId] = value;
+  }
+
+  /**
+   * Set double value at rowId
+   */
+  @Override
+  public void putDouble(int rowId, double value) {
+    doubleData[rowId] = value;
+  }
+
+  /**
+   * Not supported: this page only holds fixed-length values.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void putBytes(int rowId, byte[] bytes) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  /**
+   * Not supported: this page only holds fixed-length values.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void putBytes(int rowId, byte[] bytes, int offset, int length) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  /**
+   * Get byte value at rowId
+   */
+  @Override
+  public byte getByte(int rowId) {
+    return byteData[rowId];
+  }
+
+  /**
+   * Get short value at rowId
+   */
+  @Override
+  public short getShort(int rowId) {
+    return shortData[rowId];
+  }
+
+  /**
+   * Get int value at rowId
+   */
+  @Override
+  public int getInt(int rowId) {
+    return intData[rowId];
+  }
+
+  /**
+   * Get long value at rowId
+   */
+  @Override
+  public long getLong(int rowId) {
+    return longData[rowId];
+  }
+
+  /**
+   * Get float value at rowId
+   */
+  @Override
+  public float getFloat(int rowId) {
+    return floatData[rowId];
+  }
+
+  /**
+   * Get double value at rowId
+   */
+  @Override
+  public double getDouble(int rowId) {
+    return doubleData[rowId];
+  }
+
+  /**
+   * Not supported: this page does not store decimal values.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public BigDecimal getDecimal(int rowId) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  /**
+   * Get byte value page. Returns the backing array itself, not a copy.
+   */
+  @Override
+  public byte[] getBytePage() {
+    return byteData;
+  }
+
+  /**
+   * Get short value page. Returns the backing array itself, not a copy.
+   */
+  @Override
+  public short[] getShortPage() {
+    return shortData;
+  }
+
+  /**
+   * Get int value page. Returns the backing array itself, not a copy.
+   */
+  @Override
+  public int[] getIntPage() {
+    return intData;
+  }
+
+  /**
+   * Get long value page. Returns the backing array itself, not a copy.
+   */
+  @Override
+  public long[] getLongPage() {
+    return longData;
+  }
+
+  /**
+   * Get float value page. Returns the backing array itself, not a copy.
+   */
+  @Override
+  public float[] getFloatPage() {
+    return floatData;
+  }
+
+  /**
+   * Get double value page. Returns the backing array itself, not a copy.
+   */
+  @Override
+  public double[] getDoublePage() {
+    return doubleData;
+  }
+
+  /**
+   * Not supported: this page does not store byte arrays.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public byte[][] getByteArrayPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  /**
+   * Not supported: this page does not store variable-length data.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public byte[] getFlattenedBytePage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  /**
+   * Set byte values to page. The given array is used directly as backing storage (no copy).
+   */
+  @Override
+  public void setBytePage(byte[] byteData) {
+    this.byteData = byteData;
+  }
+
+  /**
+   * Set short values to page. The given array is used directly as backing storage (no copy).
+   */
+  @Override
+  public void setShortPage(short[] shortData) {
+    this.shortData = shortData;
+  }
+
+  /**
+   * Set int values to page. The given array is used directly as backing storage (no copy).
+   */
+  @Override
+  public void setIntPage(int[] intData) {
+    this.intData = intData;
+  }
+
+  /**
+   * Set long values to page. The given array is used directly as backing storage (no copy).
+   */
+  @Override
+  public void setLongPage(long[] longData) {
+    this.longData = longData;
+  }
+
+  /**
+   * Set float values to page. The given array is used directly as backing storage (no copy).
+   */
+  @Override
+  public void setFloatPage(float[] floatData) {
+    this.floatData = floatData;
+  }
+
+  /**
+   * Set double values to page. The given array is used directly as backing storage (no copy).
+   */
+  @Override
+  public void setDoublePage(double[] doubleData) {
+    this.doubleData = doubleData;
+  }
+
+  /**
+   * Not supported: this page does not store byte arrays.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void setByteArrayPage(byte[][] byteArray) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  /**
+   * Nothing to release: the data lives in on-heap Java arrays.
+   */
+  @Override
+  public void freeMemory() {
+  }
+
+  /**
+   * Apply encoding to page data: feeds every value of the page, in row order, to the codec.
+   *
+   * @param codec type of transformation
+   * @throws UnsupportedOperationException if the page data type is not a fixed-length primitive
+   */
+  @Override
+  public void encode(PrimitiveCodec codec) {
+    switch (dataType) {
+      case BYTE:
+        for (int i = 0; i < pageSize; i++) {
+          codec.encode(i, byteData[i]);
+        }
+        break;
+      case SHORT:
+        for (int i = 0; i < pageSize; i++) {
+          codec.encode(i, shortData[i]);
+        }
+        break;
+      case INT:
+        for (int i = 0; i < pageSize; i++) {
+          codec.encode(i, intData[i]);
+        }
+        break;
+      case LONG:
+        for (int i = 0; i < pageSize; i++) {
+          codec.encode(i, longData[i]);
+        }
+        break;
+      case FLOAT:
+        for (int i = 0; i < pageSize; i++) {
+          codec.encode(i, floatData[i]);
+        }
+        break;
+      case DOUBLE:
+        for (int i = 0; i < pageSize; i++) {
+          codec.encode(i, doubleData[i]);
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException("not support encode on " + dataType + " page");
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
new file mode 100644
index 0000000..3a76f55
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+/**
+ * Variable-length column page (string and decimal data) that keeps every row as its own
+ * on-heap byte array.
+ */
+public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
+
+  // one byte[] per row; holds string and decimal data
+  private byte[][] byteArrayData;
+
+  SafeVarLengthColumnPage(DataType dataType, int pageSize) {
+    super(dataType, pageSize);
+    this.byteArrayData = new byte[pageSize][];
+  }
+
+  /**
+   * Nothing to release: the rows live in on-heap byte arrays.
+   */
+  @Override
+  public void freeMemory() {
+  }
+
+  /**
+   * Stores {@code bytes} as row {@code rowId}. The array is referenced, not copied.
+   */
+  @Override
+  public void putBytesAtRow(int rowId, byte[] bytes) {
+    this.byteArrayData[rowId] = bytes;
+  }
+
+  /**
+   * Copies {@code length} bytes of {@code bytes}, starting at {@code offset}, into a fresh array
+   * stored as row {@code rowId}.
+   */
+  @Override
+  public void putBytes(int rowId, byte[] bytes, int offset, int length) {
+    byte[] rowValue = new byte[length];
+    this.byteArrayData[rowId] = rowValue;
+    System.arraycopy(bytes, offset, rowValue, 0, length);
+  }
+
+  /**
+   * Decodes row {@code rowId} into a {@link BigDecimal}.
+   */
+  @Override
+  public BigDecimal getDecimal(int rowId) {
+    return DataTypeUtil.byteToBigDecimal(this.byteArrayData[rowId]);
+  }
+
+  /**
+   * Replaces the row storage with {@code byteArray} (referenced, not copied).
+   */
+  @Override
+  public void setByteArrayPage(byte[][] byteArray) {
+    this.byteArrayData = byteArray;
+  }
+
+  /**
+   * Returns the row storage itself, not a copy.
+   */
+  @Override
+  public byte[][] getByteArrayPage() {
+    return this.byteArrayData;
+  }
+
+  /**
+   * Copies the first {@code length} bytes of row {@code rowId} into {@code dest} at
+   * {@code destOffset}.
+   */
+  @Override
+  void copyBytes(int rowId, byte[] dest, int destOffset, int length) {
+    byte[] source = this.byteArrayData[rowId];
+    System.arraycopy(source, 0, dest, destOffset, length);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
new file mode 100644
index 0000000..ac1961a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+import static org.apache.carbondata.core.metadata.datatype.DataType.BYTE;
+
+// This extension uses unsafe memory to store page data, for fixed-length data types only (byte,
+// short, integer, long, float, double)
+public class UnsafeFixLengthColumnPage extends ColumnPage {
+  // memory allocated by Unsafe
+  private MemoryBlock memoryBlock;
+
+  // base address of memoryBlock
+  private Object baseAddress;
+
+  // base offset of memoryBlock
+  private long baseOffset;
+
+  // log2 of the element size of each supported type, used to turn a row id into a byte offset
+  private static final int byteBits = BYTE.getSizeBits();
+  private static final int shortBits = DataType.SHORT.getSizeBits();
+  private static final int intBits = DataType.INT.getSizeBits();
+  private static final int longBits = DataType.LONG.getSizeBits();
+  private static final int floatBits = DataType.FLOAT.getSizeBits();
+  private static final int doubleBits = DataType.DOUBLE.getSizeBits();
+
+  /**
+   * Allocates {@code pageSize} elements of {@code dataType} through {@link UnsafeMemoryManager}.
+   *
+   * @throws UnsupportedOperationException if {@code dataType} is not a fixed-length type
+   * @throws MemoryException if the memory cannot be allocated
+   */
+  UnsafeFixLengthColumnPage(DataType dataType, int pageSize) throws MemoryException {
+    super(dataType, pageSize);
+    switch (dataType) {
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+        // widen before shifting so the byte size cannot overflow int
+        long size = ((long) pageSize) << dataType.getSizeBits();
+        memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(size);
+        baseAddress = memoryBlock.getBaseObject();
+        baseOffset = memoryBlock.getBaseOffset();
+        break;
+      default:
+        // DECIMAL, STRING and any other non fixed-length type. Without this the page would be
+        // created with no memory block and a null base address, and every later Unsafe access
+        // would use the row offset as an absolute address.
+        throw new UnsupportedOperationException("invalid data type: " + dataType);
+    }
+  }
+
+  @Override
+  public void putByte(int rowId, byte value) {
+    long offset = ((long) rowId) << byteBits;
+    CarbonUnsafe.unsafe.putByte(baseAddress, baseOffset + offset, value);
+  }
+
+  @Override
+  public void putShort(int rowId, short value) {
+    long offset = ((long) rowId) << shortBits;
+    CarbonUnsafe.unsafe.putShort(baseAddress, baseOffset + offset, value);
+  }
+
+  @Override
+  public void putInt(int rowId, int value) {
+    long offset = ((long) rowId) << intBits;
+    CarbonUnsafe.unsafe.putInt(baseAddress, baseOffset + offset, value);
+  }
+
+  @Override
+  public void putLong(int rowId, long value) {
+    long offset = ((long) rowId) << longBits;
+    CarbonUnsafe.unsafe.putLong(baseAddress, baseOffset + offset, value);
+  }
+
+  @Override
+  public void putDouble(int rowId, double value) {
+    long offset = ((long) rowId) << doubleBits;
+    CarbonUnsafe.unsafe.putDouble(baseAddress, baseOffset + offset, value);
+  }
+
+  @Override
+  public void putBytes(int rowId, byte[] bytes) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void putBytes(int rowId, byte[] bytes, int offset, int length) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public byte getByte(int rowId) {
+    long offset = ((long) rowId) << byteBits;
+    return CarbonUnsafe.unsafe.getByte(baseAddress, baseOffset + offset);
+  }
+
+  @Override
+  public short getShort(int rowId) {
+    long offset = ((long) rowId) << shortBits;
+    return CarbonUnsafe.unsafe.getShort(baseAddress, baseOffset + offset);
+  }
+
+  @Override
+  public int getInt(int rowId) {
+    long offset = ((long) rowId) << intBits;
+    return CarbonUnsafe.unsafe.getInt(baseAddress, baseOffset + offset);
+  }
+
+  @Override
+  public long getLong(int rowId) {
+    long offset = ((long) rowId) << longBits;
+    return CarbonUnsafe.unsafe.getLong(baseAddress, baseOffset + offset);
+  }
+
+  @Override
+  public float getFloat(int rowId) {
+    long offset = ((long) rowId) << floatBits;
+    return CarbonUnsafe.unsafe.getFloat(baseAddress, baseOffset + offset);
+  }
+
+  @Override
+  public double getDouble(int rowId) {
+    long offset = ((long) rowId) << doubleBits;
+    return CarbonUnsafe.unsafe.getDouble(baseAddress, baseOffset + offset);
+  }
+
+  @Override
+  public BigDecimal getDecimal(int rowId) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  /**
+   * Returns a newly allocated on-heap copy of the page data.
+   */
+  @Override
+  public byte[] getBytePage() {
+    byte[] data = new byte[getPageSize()];
+    for (int i = 0; i < data.length; i++) {
+      long offset = ((long) i) << byteBits;
+      data[i] = CarbonUnsafe.unsafe.getByte(baseAddress, baseOffset + offset);
+    }
+    return data;
+  }
+
+  /**
+   * Returns a newly allocated on-heap copy of the page data.
+   */
+  @Override
+  public short[] getShortPage() {
+    short[] data = new short[getPageSize()];
+    for (int i = 0; i < data.length; i++) {
+      long offset = ((long) i) << shortBits;
+      data[i] = CarbonUnsafe.unsafe.getShort(baseAddress, baseOffset + offset);
+    }
+    return data;
+  }
+
+  /**
+   * Returns a newly allocated on-heap copy of the page data.
+   */
+  @Override
+  public int[] getIntPage() {
+    int[] data = new int[getPageSize()];
+    for (int i = 0; i < data.length; i++) {
+      long offset = ((long) i) << intBits;
+      data[i] = CarbonUnsafe.unsafe.getInt(baseAddress, baseOffset + offset);
+    }
+    return data;
+  }
+
+  /**
+   * Returns a newly allocated on-heap copy of the page data.
+   */
+  @Override
+  public long[] getLongPage() {
+    long[] data = new long[getPageSize()];
+    for (int i = 0; i < data.length; i++) {
+      long offset = ((long) i) << longBits;
+      data[i] = CarbonUnsafe.unsafe.getLong(baseAddress, baseOffset + offset);
+    }
+    return data;
+  }
+
+  /**
+   * Returns a newly allocated on-heap copy of the page data.
+   */
+  @Override
+  public float[] getFloatPage() {
+    float[] data = new float[getPageSize()];
+    for (int i = 0; i < data.length; i++) {
+      long offset = ((long) i) << floatBits;
+      data[i] = CarbonUnsafe.unsafe.getFloat(baseAddress, baseOffset + offset);
+    }
+    return data;
+  }
+
+  /**
+   * Returns a newly allocated on-heap copy of the page data.
+   */
+  @Override
+  public double[] getDoublePage() {
+    double[] data = new double[getPageSize()];
+    for (int i = 0; i < data.length; i++) {
+      long offset = ((long) i) << doubleBits;
+      data[i] = CarbonUnsafe.unsafe.getDouble(baseAddress, baseOffset + offset);
+    }
+    return data;
+  }
+
+  @Override
+  public byte[][] getByteArrayPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public byte[] getFlattenedBytePage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  /**
+   * Copies every element of {@code byteData} into the page, starting at row 0.
+   */
+  @Override
+  public void setBytePage(byte[] byteData) {
+    for (int i = 0; i < byteData.length; i++) {
+      long offset = ((long) i) << byteBits;
+      CarbonUnsafe.unsafe.putByte(baseAddress, baseOffset + offset, byteData[i]);
+    }
+  }
+
+  /**
+   * Copies every element of {@code shortData} into the page, starting at row 0.
+   */
+  @Override
+  public void setShortPage(short[] shortData) {
+    for (int i = 0; i < shortData.length; i++) {
+      long offset = ((long) i) << shortBits;
+      CarbonUnsafe.unsafe.putShort(baseAddress, baseOffset + offset, shortData[i]);
+    }
+  }
+
+  /**
+   * Copies every element of {@code intData} into the page, starting at row 0.
+   */
+  @Override
+  public void setIntPage(int[] intData) {
+    for (int i = 0; i < intData.length; i++) {
+      long offset = ((long) i) << intBits;
+      CarbonUnsafe.unsafe.putInt(baseAddress, baseOffset + offset, intData[i]);
+    }
+  }
+
+  /**
+   * Copies every element of {@code longData} into the page, starting at row 0.
+   */
+  @Override
+  public void setLongPage(long[] longData) {
+    for (int i = 0; i < longData.length; i++) {
+      long offset = ((long) i) << longBits;
+      CarbonUnsafe.unsafe.putLong(baseAddress, baseOffset + offset, longData[i]);
+    }
+  }
+
+  /**
+   * Copies every element of {@code floatData} into the page, starting at row 0.
+   */
+  @Override
+  public void setFloatPage(float[] floatData) {
+    for (int i = 0; i < floatData.length; i++) {
+      long offset = ((long) i) << floatBits;
+      CarbonUnsafe.unsafe.putFloat(baseAddress, baseOffset + offset, floatData[i]);
+    }
+  }
+
+  /**
+   * Copies every element of {@code doubleData} into the page, starting at row 0.
+   */
+  @Override
+  public void setDoublePage(double[] doubleData) {
+    for (int i = 0; i < doubleData.length; i++) {
+      long offset = ((long) i) << doubleBits;
+      CarbonUnsafe.unsafe.putDouble(baseAddress, baseOffset + offset, doubleData[i]);
+    }
+  }
+
+  @Override
+  public void setByteArrayPage(byte[][] byteArray) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  /**
+   * Returns the memory block to {@link UnsafeMemoryManager}. Calling it more than once is a
+   * no-op. The page must not be read or written afterwards.
+   */
+  @Override
+  public void freeMemory() {
+    if (memoryBlock != null) {
+      UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock);
+      memoryBlock = null;
+      baseAddress = null;
+      baseOffset = 0;
+    }
+  }
+
+  /**
+   * Feeds every value of the page, in row order, to {@code codec}.
+   */
+  @Override
+  public void encode(PrimitiveCodec codec) {
+    int pageSize = getPageSize();
+    switch (dataType) {
+      case BYTE:
+        for (int i = 0; i < pageSize; i++) {
+          long offset = ((long) i) << byteBits;
+          codec.encode(i, CarbonUnsafe.unsafe.getByte(baseAddress, baseOffset + offset));
+        }
+        break;
+      case SHORT:
+        for (int i = 0; i < pageSize; i++) {
+          long offset = ((long) i) << shortBits;
+          codec.encode(i, CarbonUnsafe.unsafe.getShort(baseAddress, baseOffset + offset));
+        }
+        break;
+      case INT:
+        for (int i = 0; i < pageSize; i++) {
+          long offset = ((long) i) << intBits;
+          codec.encode(i, CarbonUnsafe.unsafe.getInt(baseAddress, baseOffset + offset));
+        }
+        break;
+      case LONG:
+        for (int i = 0; i < pageSize; i++) {
+          long offset = ((long) i) << longBits;
+          codec.encode(i, CarbonUnsafe.unsafe.getLong(baseAddress, baseOffset + offset));
+        }
+        break;
+      case FLOAT:
+        for (int i = 0; i < pageSize; i++) {
+          long offset = ((long) i) << floatBits;
+          codec.encode(i, CarbonUnsafe.unsafe.getFloat(baseAddress, baseOffset + offset));
+        }
+        break;
+      case DOUBLE:
+        for (int i = 0; i < pageSize; i++) {
+          long offset = ((long) i) << doubleBits;
+          codec.encode(i, CarbonUnsafe.unsafe.getDouble(baseAddress, baseOffset + offset));
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException("invalid data type: " + dataType);
+    }
+  }
+
+  @Override
+  public byte[] compress(Compressor compressor) {
+    // TODO: use zero-copy raw compression
+    return super.compress(compressor);
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
new file mode 100644
index 0000000..75b5312
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+// This extension uses unsafe memory to store page data, for variable-length data types (string,
+// decimal). Row i occupies bytes [rowOffset[i], rowOffset[i + 1]) of the memory block, which
+// grows on demand.
+public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
+
+  // memory allocated by Unsafe
+  private MemoryBlock memoryBlock;
+
+  // base address of memoryBlock
+  private Object baseAddress;
+
+  // base offset of memoryBlock
+  private long baseOffset;
+
+  // size of the allocated memory, in bytes
+  private int capacity;
+
+  // default size for each row, grows as needed
+  private static final int DEFAULT_ROW_SIZE = 8;
+
+  private static final double FACTOR = 1.25;
+
+  UnsafeVarLengthColumnPage(DataType dataType, int pageSize) throws MemoryException {
+    super(dataType, pageSize);
+    capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR);
+    memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry((long)(capacity));
+    baseAddress = memoryBlock.getBaseObject();
+    baseOffset = memoryBlock.getBaseOffset();
+  }
+
+  /**
+   * Returns the memory block to {@link UnsafeMemoryManager}. Calling it more than once is a
+   * no-op. The page must not be read or written afterwards.
+   */
+  @Override
+  public void freeMemory() {
+    if (memoryBlock != null) {
+      UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock);
+      memoryBlock = null;
+      baseAddress = null;
+      baseOffset = 0;
+    }
+  }
+
+  /**
+   * Make sure the allocated memory holds at least {@code requiredSize} bytes. If it does not, a
+   * new block of max(2 * capacity, requiredSize) bytes is allocated, the old content is copied
+   * over and the old block is freed. Growing to at least {@code requiredSize}, rather than just
+   * doubling once, matters when a single row is larger than the current capacity or when the
+   * initial capacity is 0.
+   *
+   * @param requiredSize end offset (exclusive) of the next write, in bytes
+   * @throws MemoryException if the new block cannot be allocated
+   */
+  private void ensureCapacity(long requiredSize) throws MemoryException {
+    if (requiredSize <= capacity) {
+      return;
+    }
+    if (requiredSize > Integer.MAX_VALUE) {
+      throw new IllegalStateException(
+          "page data size " + requiredSize + " exceeds " + Integer.MAX_VALUE + " bytes");
+    }
+    int newSize = (int) Math.min(Integer.MAX_VALUE, Math.max(2L * capacity, requiredSize));
+    MemoryBlock newBlock = UnsafeMemoryManager.allocateMemoryWithRetry(newSize);
+    CarbonUnsafe.unsafe.copyMemory(baseAddress, baseOffset,
+        newBlock.getBaseObject(), newBlock.getBaseOffset(), capacity);
+    UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock);
+    memoryBlock = newBlock;
+    baseAddress = newBlock.getBaseObject();
+    baseOffset = newBlock.getBaseOffset();
+    capacity = newSize;
+  }
+
+  /**
+   * Copies all of {@code bytes} into the page at the offset of row {@code rowId}.
+   */
+  @Override
+  public void putBytesAtRow(int rowId, byte[] bytes) {
+    putBytes(rowId, bytes, 0, bytes.length);
+  }
+
+  /**
+   * Copies {@code length} bytes of {@code bytes}, starting at {@code offset}, into the page at
+   * the offset of row {@code rowId}, growing the memory first if needed.
+   * rowOffset[rowId] must already be set by the caller.
+   */
+  @Override
+  public void putBytes(int rowId, byte[] bytes, int offset, int length) {
+    try {
+      // the write covers [rowOffset[rowId], rowOffset[rowId] + length), so that end offset is
+      // exactly what must fit in the allocated block
+      ensureCapacity((long) rowOffset[rowId] + length);
+    } catch (MemoryException e) {
+      throw new RuntimeException(e);
+    }
+    CarbonUnsafe.unsafe.copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET + offset,
+        baseAddress, baseOffset + rowOffset[rowId], length);
+  }
+
+  /**
+   * Reads row {@code rowId} and decodes it into a {@link BigDecimal}.
+   */
+  @Override
+  public BigDecimal getDecimal(int rowId) {
+    int length = rowOffset[rowId + 1] - rowOffset[rowId];
+    byte[] bytes = new byte[length];
+    CarbonUnsafe.unsafe.copyMemory(baseAddress, baseOffset + rowOffset[rowId],
+        bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+
+    return DataTypeUtil.byteToBigDecimal(bytes);
+  }
+
+  /**
+   * Returns a newly allocated on-heap copy of every row.
+   */
+  @Override
+  public byte[][] getByteArrayPage() {
+    byte[][] bytes = new byte[pageSize][];
+    for (int rowId = 0; rowId < pageSize; rowId++) {
+      int length = rowOffset[rowId + 1] - rowOffset[rowId];
+      byte[] rowData = new byte[length];
+      CarbonUnsafe.unsafe.copyMemory(baseAddress, baseOffset + rowOffset[rowId],
+          rowData, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+      bytes[rowId] = rowData;
+    }
+    return bytes;
+  }
+
+  /**
+   * Copies {@code length} bytes of row {@code rowId} into {@code dest} at {@code destOffset}.
+   */
+  @Override
+  void copyBytes(int rowId, byte[] dest, int destOffset, int length) {
+    CarbonUnsafe.unsafe.copyMemory(baseAddress, baseOffset + rowOffset[rowId],
+        dest, CarbonUnsafe.BYTE_ARRAY_OFFSET + destOffset, length);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
new file mode 100644
index 0000000..fa5f478
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil;
+
+import static org.apache.carbondata.core.metadata.datatype.DataType.DECIMAL;
+
+/**
+ * Base class for column pages whose rows are variable-length byte arrays (e.g. DECIMAL values).
+ * Row {@code i} occupies bytes {@code [rowOffset[i], rowOffset[i + 1])} of the page data; where
+ * that data lives is decided by subclasses through {@link #putBytesAtRow} and
+ * {@link #copyBytes}. Fixed-width primitive accessors are unsupported and throw
+ * {@link UnsupportedOperationException}.
+ */
+public abstract class VarLengthColumnPageBase extends ColumnPage {
+
+  // size in bytes of the Length field that precedes every value in LV encoded data
+  private static final int LENGTH_FIELD_SIZE = 4;
+
+  // start offset of every row in the page data; its size is pageSize + 1 so that the
+  // length of row i is rowOffset[i + 1] - rowOffset[i]
+  int[] rowOffset;
+
+  // number of value bytes added to the page (Length fields are not counted)
+  int totalLength;
+
+  VarLengthColumnPageBase(DataType dataType, int pageSize) {
+    super(dataType, pageSize);
+    rowOffset = new int[pageSize + 1];
+    totalLength = 0;
+  }
+
+  @Override
+  public void setBytePage(byte[] byteData) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void setShortPage(short[] shortData) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void setIntPage(int[] intData) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void setLongPage(long[] longData) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void setFloatPage(float[] floatData) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void setDoublePage(double[] doubleData) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void setByteArrayPage(byte[][] byteArray) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  /**
+   * Create a new DECIMAL column page from LV (Length Value) encoded bytes: a sequence of
+   * entries, each a 4-byte length (read with {@link ByteUtil#toInt}) followed by that many
+   * value bytes. The page is unsafe or safe depending on {@code ColumnPage.unsafe}.
+   *
+   * @param lvEncodedBytes LV encoded rows; an empty array yields an empty page
+   * @return a page holding one row per LV entry
+   * @throws MemoryException propagated from creating the page
+   * @throws IllegalArgumentException if the input is truncated or holds a negative length
+   */
+  static ColumnPage newDecimalColumnPage(byte[] lvEncodedBytes) throws MemoryException {
+    // first pass: validate every Length field before allocating anything, count rows/bytes
+    int numRows = 0;
+    int dataLength = 0;
+    int lvOffset = 0;
+    while (lvOffset < lvEncodedBytes.length) {
+      int length = readLength(lvEncodedBytes, lvOffset);
+      dataLength += length;
+      lvOffset += LENGTH_FIELD_SIZE + length;
+      numRows++;
+    }
+
+    // second pass: offsets[i] is the start of row i inside the page data
+    int[] offsets = new int[numRows + 1];
+    lvOffset = 0;
+    for (int i = 0; i < numRows; i++) {
+      int length = ByteUtil.toInt(lvEncodedBytes, lvOffset);
+      offsets[i + 1] = offsets[i] + length;
+      lvOffset += LENGTH_FIELD_SIZE + length;
+    }
+
+    VarLengthColumnPageBase page;
+    if (unsafe) {
+      page = new UnsafeVarLengthColumnPage(DECIMAL, numRows);
+    } else {
+      page = new SafeVarLengthColumnPage(DECIMAL, numRows);
+    }
+
+    // set total length and rowOffset in page before copying any data
+    page.totalLength = dataLength;
+    page.rowOffset = offsets;
+
+    // third pass: row i's value follows i + 1 Length fields and the values of rows 0..i-1
+    for (int i = 0; i < numRows; i++) {
+      int valueStart = offsets[i] + (i + 1) * LENGTH_FIELD_SIZE;
+      page.putBytes(i, lvEncodedBytes, valueStart, offsets[i + 1] - offsets[i]);
+    }
+    return page;
+  }
+
+  /**
+   * Read the Length field at {@code offset} of LV encoded data, checking that both the field
+   * and the value it describes lie inside {@code lvEncodedBytes}. Without this check a
+   * negative or oversized length makes the scan loop forever or run past the end of the
+   * input, and the unsafe page copies with Unsafe.copyMemory, which does no bounds checking.
+   */
+  private static int readLength(byte[] lvEncodedBytes, int offset) {
+    int remaining = lvEncodedBytes.length - offset;
+    if (remaining < LENGTH_FIELD_SIZE) {
+      throw new IllegalArgumentException("truncated LV encoded data: " + remaining
+          + " byte(s) left at offset " + offset + ", expected a " + LENGTH_FIELD_SIZE
+          + "-byte length field");
+    }
+    int length = ByteUtil.toInt(lvEncodedBytes, offset);
+    if (length < 0 || length > remaining - LENGTH_FIELD_SIZE) {
+      throw new IllegalArgumentException("invalid LV encoded data: length " + length
+          + " at offset " + offset + " does not fit in input of " + lvEncodedBytes.length
+          + " bytes");
+    }
+    return length;
+  }
+
+  @Override
+  public void putByte(int rowId, byte value) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void putShort(int rowId, short value) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void putInt(int rowId, int value) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void putLong(int rowId, long value) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void putDouble(int rowId, double value) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  /**
+   * Store {@code bytes} as the data of row {@code rowId}; called by
+   * {@link #putBytes(int, byte[])} after rowOffset[rowId + 1] has been set for the row.
+   */
+  abstract void putBytesAtRow(int rowId, byte[] bytes);
+
+  /**
+   * Append {@code bytes} as row {@code rowId}. Rows are expected to be added in order starting
+   * from 0, because each row's end offset is derived from the previous row's end offset.
+   */
+  @Override
+  public void putBytes(int rowId, byte[] bytes) {
+    if (rowId == 0) {
+      rowOffset[0] = 0;
+    }
+    rowOffset[rowId + 1] = rowOffset[rowId] + bytes.length;
+    putBytesAtRow(rowId, bytes);
+    totalLength += bytes.length;
+  }
+
+  @Override
+  public byte getByte(int rowId) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public short getShort(int rowId) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public int getInt(int rowId) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public long getLong(int rowId) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public float getFloat(int rowId) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public double getDouble(int rowId) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public byte[] getBytePage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public short[] getShortPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public int[] getIntPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public long[] getLongPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public float[] getFloatPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public double[] getDoublePage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  /**
+   * Copy `length` bytes from data at rowId to dest start from destOffset
+   */
+  abstract void copyBytes(int rowId, byte[] dest, int destOffset, int length);
+
+  /**
+   * Serialize the page in LV (Length Value) format: for every row a 4-byte length written with
+   * {@link ByteUtil#setInt} followed by the row's bytes, matching the layout parsed by
+   * {@link #newDecimalColumnPage(byte[])}.
+   */
+  @Override
+  public byte[] getFlattenedBytePage() {
+    // output LV encoded byte array
+    int offset = 0;
+    byte[] data = new byte[totalLength + pageSize * LENGTH_FIELD_SIZE];
+    for (int rowId = 0; rowId < pageSize; rowId++) {
+      int length = rowOffset[rowId + 1] - rowOffset[rowId];
+      ByteUtil.setInt(data, offset, length);
+      copyBytes(rowId, data, offset + LENGTH_FIELD_SIZE, length);
+      offset += LENGTH_FIELD_SIZE + length;
+    }
+    return data;
+  }
+
+  @Override
+  public void encode(PrimitiveCodec codec) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
index c843b55..6127583 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.datastore.page.encoding;
import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.datatype.DataType;
/**
@@ -52,9 +53,9 @@ public abstract class AdaptiveCompressionCodec implements ColumnPageCodec {
+ /** Returns the name of this codec. */
public abstract String getName();
+ /**
+ * Encode {@code input} into a byte array.
+ *
+ * @throws MemoryException propagated from page memory handling in implementations
+ */
- public abstract byte[] encode(ColumnPage input);
+ public abstract byte[] encode(ColumnPage input) throws MemoryException;
+ /**
+ * Decode {@code length} bytes of {@code input}, starting at {@code offset}, into a column page.
+ *
+ * @throws MemoryException propagated from page memory handling in implementations
+ */
- public abstract ColumnPage decode(byte[] input, int offset, int length);
+ public abstract ColumnPage decode(byte[] input, int offset, int length) throws MemoryException;
@Override
public String toString() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
index f768a14..47bc390 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.LazyColumnPage;
import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.datatype.DataType;
/**
@@ -48,18 +49,20 @@ class AdaptiveIntegerCodec extends AdaptiveCompressionCodec {
}
+ /**
+ * Encode the page. If the source and target data types are equal the input page is
+ * compressed as is; otherwise a temporary page of {@code targetDataType} is allocated,
+ * {@code input.encode(codec)} is run (presumably filling that page through the
+ * {@code encodedPage} field — confirm in the codec), and the temporary page is compressed.
+ * The temporary page is freed in a finally block so its memory is not leaked when encoding
+ * or compression throws.
+ *
+ * @throws MemoryException propagated from creating or compressing the temporary page
+ */
@Override
- public byte[] encode(ColumnPage input) {
+ public byte[] encode(ColumnPage input) throws MemoryException {
if (srcDataType.equals(targetDataType)) {
return input.compress(compressor);
} else {
encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
- input.encode(codec);
- return encodedPage.compress(compressor);
+ try {
+ input.encode(codec);
+ return encodedPage.compress(compressor);
+ } finally {
+ encodedPage.freeMemory();
+ }
}
}
@Override
- public ColumnPage decode(byte[] input, int offset, int length) {
+ public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
if (srcDataType.equals(targetDataType)) {
return ColumnPage.decompress(compressor, targetDataType, input, offset, length);
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
index 21913be..afba173 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
@@ -18,6 +18,7 @@
package org.apache.carbondata.core.datastore.page.encoding;
import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.memory.MemoryException;
/**
* Codec for a column page data, implementation should not keep state across pages,
@@ -35,7 +36,7 @@ public interface ColumnPageCodec {
* @param input column page to apply
* @return encoded data
*/
- byte[] encode(ColumnPage input);
+ byte[] encode(ColumnPage input) throws MemoryException;
/**
* decode byte array from offset to a column page
@@ -44,5 +45,5 @@ public interface ColumnPageCodec {
* @param length length of data to decode
* @return decoded data
*/
- ColumnPage decode(byte[] input, int offset, int length);
+ ColumnPage decode(byte[] input, int offset, int length) throws MemoryException;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java
index 4568503..722ba21 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page.encoding;
import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.datatype.DataType;
/**
@@ -30,7 +31,7 @@ public class CompressionCodec implements ColumnPageCodec {
+ // compressor applied to page data (passed to ColumnPage.decompress in decode)
private Compressor compressor;
+ // data type of the pages this codec produces when decoding
private DataType dataType;
+ // NOTE(review): now private, so instances presumably come from a static factory in this
+ // class that is not visible here — confirm no subclass relied on the protected constructor
- protected CompressionCodec(DataType dataType, Compressor compressor) {
+ private CompressionCodec(DataType dataType, Compressor compressor) {
this.compressor = compressor;
this.dataType = dataType;
}
@@ -50,7 +51,7 @@ public class CompressionCodec implements ColumnPageCodec {
}
+ /**
+ * Decompress {@code length} bytes of {@code input}, starting at {@code offset}, with
+ * {@code compressor} into a column page of this codec's {@code dataType}.
+ *
+ * @throws MemoryException propagated from {@code ColumnPage.decompress}
+ */
@Override
- public ColumnPage decode(byte[] input, int offset, int length) {
+ public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
return ColumnPage.decompress(compressor, dataType, input, offset, length);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
index e8e7779..e3ed032 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.LazyColumnPage;
import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.datatype.DataType;
/**
@@ -63,14 +64,16 @@ public class DeltaIntegerCodec extends AdaptiveCompressionCodec {
}
+ /**
+ * Encode the page: allocate a temporary page of {@code targetDataType}, run
+ * {@code input.encode(codec)} (presumably filling that page through the {@code encodedPage}
+ * field — confirm in the codec), and compress the temporary page. The temporary page is
+ * freed in a finally block so its memory is not leaked when encoding or compression throws.
+ *
+ * @throws MemoryException propagated from creating or compressing the temporary page
+ */
@Override
- public byte[] encode(ColumnPage input) {
+ public byte[] encode(ColumnPage input) throws MemoryException {
encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
- input.encode(codec);
- return encodedPage.compress(compressor);
+ try {
+ input.encode(codec);
+ return encodedPage.compress(compressor);
+ } finally {
+ encodedPage.freeMemory();
+ }
}
+ /**
+ * Decompress {@code length} bytes of {@code input}, starting at {@code offset}, into a page of
+ * {@code targetDataType}, and wrap it with {@code codec} in a {@link LazyColumnPage}.
+ * NOTE(review): presumably values are converted back through the codec when read — confirm
+ * in LazyColumnPage.
+ *
+ * @throws MemoryException propagated from {@code ColumnPage.decompress}
+ */
@Override
- public ColumnPage decode(byte[] input, int offset, int length) {
+ public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
return LazyColumnPage.newPage(page, codec);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleDeltaFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleDeltaFloatingCodec.java
index c58a96f..70c761f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleDeltaFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleDeltaFloatingCodec.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.LazyColumnPage;
import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.datatype.DataType;
/**
@@ -56,14 +57,16 @@ public class UpscaleDeltaFloatingCodec extends AdaptiveCompressionCodec {
}
+ /**
+ * Encode the page: allocate a temporary page of {@code targetDataType}, run
+ * {@code input.encode(codec)} (presumably filling that page through the {@code encodedPage}
+ * field — confirm in the codec), and compress the temporary page. The temporary page is
+ * freed in a finally block so its memory is not leaked when encoding or compression throws.
+ *
+ * @throws MemoryException propagated from creating or compressing the temporary page
+ */
@Override
- public byte[] encode(ColumnPage input) {
+ public byte[] encode(ColumnPage input) throws MemoryException {
encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
- input.encode(codec);
- return encodedPage.compress(compressor);
+ try {
+ input.encode(codec);
+ return encodedPage.compress(compressor);
+ } finally {
+ encodedPage.freeMemory();
+ }
}
+ /**
+ * Decompress {@code length} bytes of {@code input}, starting at {@code offset}, into a page of
+ * {@code targetDataType}, and wrap it with {@code codec} in a {@link LazyColumnPage}.
+ * NOTE(review): presumably values are converted back through the codec when read — confirm
+ * in LazyColumnPage.
+ *
+ * @throws MemoryException propagated from {@code ColumnPage.decompress}
+ */
@Override
- public ColumnPage decode(byte[] input, int offset, int length) {
+ public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
return LazyColumnPage.newPage(page, codec);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java
index 4f5ee13..ae80684 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.LazyColumnPage;
import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.datatype.DataType;
/**
@@ -50,15 +51,17 @@ public class UpscaleFloatingCodec extends AdaptiveCompressionCodec {
}
+ /**
+ * Encode the page: allocate a temporary page of {@code targetDataType}, run
+ * {@code input.encode(codec)} (presumably filling that page through the {@code encodedPage}
+ * field — confirm in the codec), and compress the temporary page. The temporary page is
+ * freed in a finally block so its memory is not leaked when encoding or compression throws.
+ *
+ * @throws MemoryException propagated from creating or compressing the temporary page
+ */
@Override
- public byte[] encode(ColumnPage input) {
+ public byte[] encode(ColumnPage input) throws MemoryException {
encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
- input.encode(codec);
- return encodedPage.compress(compressor);
+ try {
+ input.encode(codec);
+ return encodedPage.compress(compressor);
+ } finally {
+ encodedPage.freeMemory();
+ }
}
+ /**
+ * Decompress {@code length} bytes of {@code input}, starting at {@code offset}, into a page of
+ * {@code targetDataType}, and wrap it with {@code codec} in a {@link LazyColumnPage}.
+ * NOTE(review): presumably values are converted back through the codec when read — confirm
+ * in LazyColumnPage.
+ *
+ * @throws MemoryException propagated from {@code ColumnPage.decompress}
+ */
@Override
- public ColumnPage decode(byte[] input, int offset, int length) {
+ public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
return LazyColumnPage.newPage(page, codec);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java
index a5b3148..642e6b2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java
@@ -175,22 +175,23 @@ public class ColumnPageStatsVO {
+ /**
+ * Serialize a statistic value according to the page's {@code dataType}: BYTE, SHORT, INT and
+ * LONG as an 8-byte long, DOUBLE as an 8-byte double (both in ByteBuffer's default big-endian
+ * order), and DECIMAL via {@code DataTypeUtil.bigDecimalToByte}.
+ * NOTE(review): every integral type is cast to {@code Long}, so the value must be boxed as a
+ * Long even for BYTE/SHORT/INT columns, otherwise a ClassCastException is thrown — confirm
+ * against the code that records the statistics.
+ *
+ * @throws IllegalArgumentException for any other data type
+ */
private byte[] getValueAsBytes(Object value) {
ByteBuffer b;
switch (dataType) {
- case DOUBLE:
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
b = ByteBuffer.allocate(8);
- b.putDouble((Double) value);
+ b.putLong((Long) value);
b.flip();
return b.array();
- case LONG:
- case INT:
- case SHORT:
+ case DOUBLE:
b = ByteBuffer.allocate(8);
- b.putLong((Long) value);
+ b.putDouble((Double) value);
b.flip();
return b.array();
case DECIMAL:
return DataTypeUtil.bigDecimalToByte((BigDecimal) value);
default:
- throw new IllegalArgumentException("Invalid data type");
+ throw new IllegalArgumentException("Invalid data type: " + dataType);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java b/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
index 5519f2d..0d604fd 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
@@ -44,8 +44,8 @@ public class IntPointerBuffer {
pointerBlock = new int[length];
}
+ /**
+ * Store {@code value} at {@code rowId} in the on-heap {@code pointerBlock} array.
+ * Unlike get, this has no fallback to {@code pointerMemoryBlock}, so it fails with a
+ * NullPointerException once {@code pointerBlock} is null.
+ */
- public void set(int index, int value) {
- pointerBlock[index] = value;
+ public void set(int rowId, int value) {
+ pointerBlock[rowId] = value;
}
public void set(int value) {
@@ -55,16 +55,16 @@ public class IntPointerBuffer {
}
/**
- * Returns the value at position {@code index}.
+ * Returns the value at position {@code rowId}.
+ * Reads from the on-heap {@code pointerBlock} when it is present, otherwise from
+ * {@code pointerMemoryBlock} through Unsafe. Bounds are checked only by the assertions
+ * (enabled with -ea); the Unsafe read itself is unchecked.
+ *
+ * @param rowId position to read, expected in [0, length)
+ * @return the int stored at {@code rowId}
*/
- public int get(int index) {
- assert index >= 0 : "index (" + index + ") should >= 0";
- assert index < length : "index (" + index + ") should < length (" + length + ")";
+ public int get(int rowId) {
+ assert rowId >= 0 : "rowId (" + rowId + ") should >= 0";
+ assert rowId < length : "rowId (" + rowId + ") should < length (" + length + ")";
if (pointerBlock == null) {
+ // each entry is a 4-byte int, so its byte offset is rowId << 2 (== rowId * 4)
return CarbonUnsafe.unsafe.getInt(pointerMemoryBlock.getBaseObject(),
- pointerMemoryBlock.getBaseOffset() + (index * 4));
+ pointerMemoryBlock.getBaseOffset() + (rowId << 2));
}
- return pointerBlock[index];
+ return pointerBlock[rowId];
}
public void loadToUnsafe() throws MemoryException {