You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/09/12 09:55:24 UTC
[3/4] carbondata git commit: [CARBONDATA-2851][CARBONDATA-2852] Support zstd as column compressor in final store
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java
index d85d6cd..9bed89f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java
@@ -27,7 +27,6 @@ import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
-import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.format.Encoding;
@@ -65,7 +64,8 @@ public class DecoderBasedFallbackEncoder implements Callable<FallbackEncodedColu
int[] rlePage;
// uncompress the encoded column page
- byte[] bytes = CompressorFactory.getInstance().getCompressor()
+ byte[] bytes = CompressorFactory.getInstance().getCompressor(
+ encodedColumnPage.getActualPage().getColumnPageEncoderMeta().getCompressorName())
.unCompressByte(encodedColumnPage.getEncodedData().array(), offset,
encodedColumnPage.getPageMetadata().data_page_length);
@@ -94,15 +94,10 @@ public class DecoderBasedFallbackEncoder implements Callable<FallbackEncodedColu
// disable encoding using local dictionary
encodedColumnPage.getActualPage().disableLocalDictEncoding();
- // get column spec for existing column page
- TableSpec.ColumnSpec columnSpec = encodedColumnPage.getActualPage().getColumnSpec();
-
- // get the dataType of column
- DataType dataType = encodedColumnPage.getActualPage().getDataType();
-
// create a new column page which will have actual data instead of encoded data
ColumnPage actualDataColumnPage =
- ColumnPage.newPage(columnSpec, dataType, encodedColumnPage.getActualPage().getPageSize());
+ ColumnPage.newPage(encodedColumnPage.getActualPage().getColumnPageEncoderMeta(),
+ encodedColumnPage.getActualPage().getPageSize());
// uncompressed data from encoded column page is dictionary data, get the dictionary data using
// keygenerator
@@ -120,6 +115,8 @@ public class DecoderBasedFallbackEncoder implements Callable<FallbackEncodedColu
.putBytes(rowId++, localDictionaryGenerator.getDictionaryKeyBasedOnValue(keyArray));
}
+ // get column spec for existing column page
+ TableSpec.ColumnSpec columnSpec = encodedColumnPage.getActualPage().getColumnSpec();
FallbackEncodedColumnPage fallBackEncodedColumnPage =
CarbonUtil.getFallBackEncodedColumnPage(actualDataColumnPage, pageIndex, columnSpec);
// here freeing the memory of new column page created as fallback is done and
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
index 255e078..605fe4e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
@@ -37,7 +37,7 @@ public class LazyColumnPage extends ColumnPage {
private ColumnPageValueConverter converter;
private LazyColumnPage(ColumnPage columnPage, ColumnPageValueConverter converter) {
- super(columnPage.getColumnSpec(), columnPage.getDataType(), columnPage.getPageSize());
+ super(columnPage.getColumnPageEncoderMeta(), columnPage.getPageSize());
this.columnPage = columnPage;
this.converter = converter;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
index 904d7ef..fced016 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
@@ -76,14 +76,13 @@ public class LocalDictColumnPage extends ColumnPage {
protected LocalDictColumnPage(ColumnPage actualDataColumnPage, ColumnPage encodedColumnpage,
LocalDictionaryGenerator localDictionaryGenerator, boolean isComplexTypePrimitive,
boolean isDecoderBasedFallBackEnabled) {
- super(actualDataColumnPage.getColumnSpec(), actualDataColumnPage.getDataType(),
- actualDataColumnPage.getPageSize());
+ super(actualDataColumnPage.getColumnPageEncoderMeta(), actualDataColumnPage.getPageSize());
// if threshold is not reached then create page level dictionary
// for encoding with local dictionary
if (!localDictionaryGenerator.isThresholdReached()) {
pageLevelDictionary = new PageLevelDictionary(localDictionaryGenerator,
actualDataColumnPage.getColumnSpec().getFieldName(), actualDataColumnPage.getDataType(),
- isComplexTypePrimitive);
+ isComplexTypePrimitive, actualDataColumnPage.getColumnCompressorName());
this.encodedDataColumnPage = encodedColumnpage;
this.keyGenerator = KeyGeneratorFactory
.getKeyGenerator(new int[] { CarbonCommonConstants.LOCAL_DICTIONARY_MAX + 1 });
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
index 89ac4a4..d3e945d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
@@ -19,8 +19,7 @@ package org.apache.carbondata.core.datastore.page;
import java.math.BigDecimal;
-import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
import org.apache.carbondata.core.util.ByteUtil;
/**
@@ -36,8 +35,8 @@ public class SafeDecimalColumnPage extends DecimalColumnPage {
private byte[] shortIntData;
private byte[][] byteArrayData;
- SafeDecimalColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
- super(columnSpec, dataType, pageSize);
+ SafeDecimalColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
+ super(columnPageEncoderMeta, pageSize);
byteArrayData = new byte[pageSize][];
}
@@ -189,8 +188,8 @@ public class SafeDecimalColumnPage extends DecimalColumnPage {
}
break;
default:
- throw new UnsupportedOperationException(
- "not support value conversion on " + dataType + " page");
+ throw new UnsupportedOperationException("not support value conversion on "
+ + columnPageEncoderMeta.getStoreDataType() + " page");
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
index 82f1510..b355220 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
@@ -22,7 +22,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
-import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.ByteUtil;
@@ -45,8 +45,8 @@ public class SafeFixLengthColumnPage extends ColumnPage {
// total number of entries in array
private int arrayElementCount = 0;
- SafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
- super(columnSpec, dataType, pageSize);
+ SafeFixLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
+ super(columnPageEncoderMeta, pageSize);
this.fixedLengthdata = new byte[pageSize][];
}
@@ -120,17 +120,20 @@ public class SafeFixLengthColumnPage extends ColumnPage {
@Override
public void putBytes(int rowId, byte[] bytes, int offset, int length) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void putDecimal(int rowId, BigDecimal decimal) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public byte[] getDecimalPage() {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
/**
@@ -190,7 +193,8 @@ public class SafeFixLengthColumnPage extends ColumnPage {
}
@Override public BigDecimal getDecimal(int rowId) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
@@ -267,7 +271,8 @@ public class SafeFixLengthColumnPage extends ColumnPage {
@Override
public byte[] getLVFlattenedBytePage() throws IOException {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
@@ -345,7 +350,8 @@ public class SafeFixLengthColumnPage extends ColumnPage {
*/
@Override
public void setByteArrayPage(byte[][] byteArray) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
@@ -366,33 +372,33 @@ public class SafeFixLengthColumnPage extends ColumnPage {
*/
@Override
public void convertValue(ColumnPageValueConverter codec) {
- if (dataType == DataTypes.BYTE) {
+ if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE) {
for (int i = 0; i < arrayElementCount; i++) {
codec.encode(i, byteData[i]);
}
- } else if (dataType == DataTypes.SHORT) {
+ } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT) {
for (int i = 0; i < arrayElementCount; i++) {
codec.encode(i, shortData[i]);
}
- } else if (dataType == DataTypes.INT) {
+ } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.INT) {
for (int i = 0; i < arrayElementCount; i++) {
codec.encode(i, intData[i]);
}
- } else if (dataType == DataTypes.LONG) {
+ } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.LONG) {
for (int i = 0; i < arrayElementCount; i++) {
codec.encode(i, longData[i]);
}
- } else if (dataType == DataTypes.FLOAT) {
+ } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.FLOAT) {
for (int i = 0; i < arrayElementCount; i++) {
codec.encode(i, floatData[i]);
}
- } else if (dataType == DataTypes.DOUBLE) {
+ } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.DOUBLE) {
for (int i = 0; i < arrayElementCount; i++) {
codec.encode(i, doubleData[i]);
}
} else {
- throw new UnsupportedOperationException("not support value conversion on " +
- dataType + " page");
+ throw new UnsupportedOperationException("not support value conversion on "
+ + columnPageEncoderMeta.getStoreDataType() + " page");
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
index 274b8a7..9b47e86 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
@@ -24,16 +24,15 @@ import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
-import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
// for string and decimal data
private List<byte[]> byteArrayData;
- SafeVarLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
- super(columnSpec, dataType, pageSize);
+ SafeVarLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
+ super(columnPageEncoderMeta, pageSize);
byteArrayData = new ArrayList<>();
}
@@ -54,12 +53,14 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
}
@Override public void putDecimal(int rowId, BigDecimal decimal) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public BigDecimal getDecimal(int rowId) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
index 96aeac2..829fad4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
@@ -19,11 +19,10 @@ package org.apache.carbondata.core.datastore.page;
import java.math.BigDecimal;
-import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
-import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.ByteUtil;
@@ -32,36 +31,35 @@ import org.apache.carbondata.core.util.ByteUtil;
*/
public class UnsafeDecimalColumnPage extends DecimalColumnPage {
- UnsafeDecimalColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize)
+ UnsafeDecimalColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize)
throws MemoryException {
- super(columnSpec, dataType, pageSize);
- capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR);
- initMemory();
+ this(columnPageEncoderMeta, pageSize, (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR));
}
- UnsafeDecimalColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize,
- int capacity) throws MemoryException {
- super(columnSpec, dataType, pageSize);
+ UnsafeDecimalColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize, int capacity)
+ throws MemoryException {
+ super(columnPageEncoderMeta, pageSize);
this.capacity = capacity;
initMemory();
}
private void initMemory() throws MemoryException {
- if (dataType == DataTypes.BYTE ||
- dataType == DataTypes.SHORT ||
- dataType == DataTypes.INT ||
- dataType == DataTypes.LONG) {
- int size = pageSize << dataType.getSizeBits();
+ if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE ||
+ columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT ||
+ columnPageEncoderMeta.getStoreDataType() == DataTypes.INT ||
+ columnPageEncoderMeta.getStoreDataType() == DataTypes.LONG) {
+ int size = pageSize << columnPageEncoderMeta.getStoreDataType().getSizeBits();
memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
- } else if (dataType == DataTypes.SHORT_INT) {
+ } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT_INT) {
int size = pageSize * 3;
memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
- } else if (DataTypes.isDecimal(dataType)) {
+ } else if (DataTypes.isDecimal(columnPageEncoderMeta.getStoreDataType())) {
memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) (capacity));
- } else if (dataType == DataTypes.BYTE_ARRAY) {
+ } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE_ARRAY) {
memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) (capacity));
} else {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
baseAddress = memoryBlock.getBaseObject();
baseOffset = memoryBlock.getBaseOffset();
@@ -255,8 +253,8 @@ public class UnsafeDecimalColumnPage extends DecimalColumnPage {
}
break;
default:
- throw new UnsupportedOperationException(
- "not support value conversion on " + dataType + " page");
+ throw new UnsupportedOperationException("not support value conversion on "
+ + columnPageEncoderMeta.getStoreDataType() + " page");
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
index f75deb6..8a53840 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
@@ -20,13 +20,12 @@ package org.apache.carbondata.core.datastore.page;
import java.io.IOException;
import java.math.BigDecimal;
-import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.memory.MemoryBlock;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
-import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
@@ -61,40 +60,41 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
private static final int floatBits = DataTypes.FLOAT.getSizeBits();
private static final int doubleBits = DataTypes.DOUBLE.getSizeBits();
- UnsafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize)
+ UnsafeFixLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize)
throws MemoryException {
- super(columnSpec, dataType, pageSize);
- if (dataType == DataTypes.BOOLEAN ||
- dataType == DataTypes.BYTE ||
- dataType == DataTypes.SHORT ||
- dataType == DataTypes.INT ||
- dataType == DataTypes.LONG ||
- dataType == DataTypes.FLOAT ||
- dataType == DataTypes.DOUBLE) {
- int size = pageSize << dataType.getSizeBits();
+ super(columnPageEncoderMeta, pageSize);
+ if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BOOLEAN ||
+ columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE ||
+ columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT ||
+ columnPageEncoderMeta.getStoreDataType() == DataTypes.INT ||
+ columnPageEncoderMeta.getStoreDataType() == DataTypes.LONG ||
+ columnPageEncoderMeta.getStoreDataType() == DataTypes.FLOAT ||
+ columnPageEncoderMeta.getStoreDataType() == DataTypes.DOUBLE) {
+ int size = pageSize << columnPageEncoderMeta.getStoreDataType().getSizeBits();
memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
baseAddress = memoryBlock.getBaseObject();
baseOffset = memoryBlock.getBaseOffset();
capacity = size;
- } else if (dataType == DataTypes.SHORT_INT) {
+ } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT_INT) {
int size = pageSize * 3;
memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
baseAddress = memoryBlock.getBaseObject();
baseOffset = memoryBlock.getBaseOffset();
capacity = size;
- } else if (DataTypes.isDecimal(dataType) || dataType == DataTypes.STRING) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ } else if (DataTypes.isDecimal(columnPageEncoderMeta.getStoreDataType()) ||
+ columnPageEncoderMeta.getStoreDataType() == DataTypes.STRING) {
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
totalLength = 0;
}
- UnsafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize,
- int eachRowSize)
- throws MemoryException {
- this(columnSpec, dataType, pageSize);
+ UnsafeFixLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize,
+ int eachRowSize) throws MemoryException {
+ this(columnPageEncoderMeta, pageSize);
this.eachRowSize = eachRowSize;
totalLength = 0;
- if (dataType == DataTypes.BYTE_ARRAY) {
+ if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE_ARRAY) {
memoryBlock =
UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) pageSize * eachRowSize);
baseAddress = memoryBlock.getBaseObject();
@@ -217,11 +217,13 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
@Override
public void putBytes(int rowId, byte[] bytes, int offset, int length) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override public void putDecimal(int rowId, BigDecimal decimal) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
@@ -272,7 +274,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
@Override
public BigDecimal getDecimal(int rowId) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
@@ -288,7 +291,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
}
@Override public byte[] getDecimalPage() {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
@@ -375,7 +379,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
@Override
public byte[] getLVFlattenedBytePage() {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override public byte[] getComplexChildrenLVFlattenedBytePage() {
@@ -441,7 +446,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
@Override
public void setByteArrayPage(byte[][] byteArray) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
public void freeMemory() {
@@ -455,68 +461,70 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
@Override public void convertValue(ColumnPageValueConverter codec) {
int endLoop = getEndLoop();
- if (dataType == DataTypes.BYTE) {
+ if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE) {
for (long i = 0; i < endLoop; i++) {
long offset = i << byteBits;
codec.encode((int) i, CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset));
}
- } else if (dataType == DataTypes.SHORT) {
+ } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT) {
for (long i = 0; i < endLoop; i++) {
long offset = i << shortBits;
codec.encode((int) i, CarbonUnsafe.getUnsafe().getShort(baseAddress, baseOffset + offset));
}
- } else if (dataType == DataTypes.INT) {
+ } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.INT) {
for (long i = 0; i < endLoop; i++) {
long offset = i << intBits;
codec.encode((int) i, CarbonUnsafe.getUnsafe().getInt(baseAddress, baseOffset + offset));
}
- } else if (dataType == DataTypes.LONG) {
+ } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.LONG) {
for (long i = 0; i < endLoop; i++) {
long offset = i << longBits;
codec.encode((int) i, CarbonUnsafe.getUnsafe().getLong(baseAddress, baseOffset + offset));
}
- } else if (dataType == DataTypes.FLOAT) {
+ } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.FLOAT) {
for (long i = 0; i < endLoop; i++) {
long offset = i << floatBits;
codec.encode((int) i, CarbonUnsafe.getUnsafe().getFloat(baseAddress, baseOffset + offset));
}
- } else if (dataType == DataTypes.DOUBLE) {
+ } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.DOUBLE) {
for (long i = 0; i < endLoop; i++) {
long offset = i << doubleBits;
codec.encode((int) i, CarbonUnsafe.getUnsafe().getDouble(baseAddress, baseOffset + offset));
}
} else {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
}
private int getEndLoop() {
- if (dataType == DataTypes.BYTE) {
+ if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE) {
return totalLength / ByteUtil.SIZEOF_BYTE;
- } else if (dataType == DataTypes.SHORT) {
+ } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT) {
return totalLength / ByteUtil.SIZEOF_SHORT;
- } else if (dataType == DataTypes.SHORT_INT) {
+ } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT_INT) {
return totalLength / ByteUtil.SIZEOF_SHORT_INT;
- } else if (dataType == DataTypes.INT) {
+ } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.INT) {
return totalLength / ByteUtil.SIZEOF_INT;
- } else if (dataType == DataTypes.LONG) {
+ } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.LONG) {
return totalLength / ByteUtil.SIZEOF_LONG;
- } else if (dataType == DataTypes.FLOAT) {
+ } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.FLOAT) {
return totalLength / DataTypes.FLOAT.getSizeInBytes();
- } else if (dataType == DataTypes.DOUBLE) {
+ } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.DOUBLE) {
return totalLength / DataTypes.DOUBLE.getSizeInBytes();
- } else if (dataType == DataTypes.BYTE_ARRAY) {
+ } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE_ARRAY) {
return totalLength / eachRowSize;
} else {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
}
@Override public byte[] compress(Compressor compressor) throws MemoryException, IOException {
- if (UnsafeMemoryManager.isOffHeap()) {
+ if (UnsafeMemoryManager.isOffHeap() && compressor.supportUnsafe()) {
// use raw compression and copy to byte[]
int inputSize = totalLength;
- int compressedMaxSize = compressor.maxCompressedLength(inputSize);
+ long compressedMaxSize = compressor.maxCompressedLength(inputSize);
MemoryBlock compressed =
UnsafeMemoryManager.allocateMemoryWithRetry(taskId, compressedMaxSize);
long outSize = compressor.rawCompress(baseOffset, inputSize, compressed.getBaseOffset());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
index ae57dcd..4693dba 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
@@ -19,11 +19,10 @@ package org.apache.carbondata.core.datastore.page;
import java.math.BigDecimal;
-import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
-import org.apache.carbondata.core.metadata.datatype.DataType;
/**
* This extension uses unsafe memory to store page data, for variable length data type (string)
@@ -33,9 +32,9 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
/**
* create a page
*/
- UnsafeVarLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize)
+ UnsafeVarLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize)
throws MemoryException {
- super(columnSpec, dataType, pageSize);
+ super(columnPageEncoderMeta, pageSize);
capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR);
memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) (capacity));
baseAddress = memoryBlock.getBaseObject();
@@ -85,7 +84,8 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
@Override
public BigDecimal getDecimal(int rowId) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index 4edd201..7f0b2a6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -23,6 +23,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
import org.apache.carbondata.core.datastore.ColumnType;
import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.memory.MemoryBlock;
import org.apache.carbondata.core.memory.MemoryException;
@@ -64,13 +65,14 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
// size of the allocated memory, in bytes
int capacity;
- VarLengthColumnPageBase(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
- super(columnSpec, dataType, pageSize);
- TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
- .newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE);
+ VarLengthColumnPageBase(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
+ super(columnPageEncoderMeta, pageSize);
+ TableSpec.ColumnSpec spec = TableSpec.ColumnSpec.newInstance(
+ columnPageEncoderMeta.getColumnSpec().getFieldName(), DataTypes.INT, ColumnType.MEASURE);
try {
- rowOffset =
- ColumnPage.newPage(spec, DataTypes.INT, pageSize);
+ rowOffset = ColumnPage.newPage(
+ new ColumnPageEncoderMeta(spec, DataTypes.INT, columnPageEncoderMeta.getCompressorName()),
+ pageSize);
} catch (MemoryException e) {
throw new RuntimeException(e);
}
@@ -79,44 +81,51 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
@Override
public void setBytePage(byte[] byteData) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void setShortPage(short[] shortData) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void setShortIntPage(byte[] shortIntData) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void setIntPage(int[] intData) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void setLongPage(long[] longData) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void setFloatPage(float[] floatData) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void setDoublePage(double[] doubleData) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
/**
* Create a new column page for decimal page
*/
- static ColumnPage newDecimalColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes)
- throws MemoryException {
+ static ColumnPage newDecimalColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes,
+ String compressorName) throws MemoryException {
DecimalConverterFactory.DecimalConverter decimalConverter =
DecimalConverterFactory.INSTANCE.getDecimalConverter(columnSpec.getPrecision(),
columnSpec.getScale());
@@ -124,10 +133,10 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
if (size < 0) {
return getLVBytesColumnPage(columnSpec, lvEncodedBytes,
DataTypes.createDecimalType(columnSpec.getPrecision(), columnSpec.getScale()),
- CarbonCommonConstants.INT_SIZE_IN_BYTE);
+ CarbonCommonConstants.INT_SIZE_IN_BYTE, compressorName);
} else {
// Here the size is always fixed.
- return getDecimalColumnPage(columnSpec, lvEncodedBytes, size);
+ return getDecimalColumnPage(columnSpec, lvEncodedBytes, size, compressorName);
}
}
@@ -135,23 +144,26 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
* Create a new column page based on the LV (Length Value) encoded bytes
*/
static ColumnPage newLVBytesColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes,
- int lvLength) throws MemoryException {
- return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY, lvLength);
+ int lvLength, String compressorName) throws MemoryException {
+ return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY,
+ lvLength, compressorName);
}
/**
* Create a new column page based on the LV (Length Value) encoded bytes
*/
static ColumnPage newComplexLVBytesColumnPage(TableSpec.ColumnSpec columnSpec,
- byte[] lvEncodedBytes, int lvLength) throws MemoryException {
- return getComplexLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY, lvLength);
+ byte[] lvEncodedBytes, int lvLength, String compressorName) throws MemoryException {
+ return getComplexLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY,
+ lvLength, compressorName);
}
private static ColumnPage getDecimalColumnPage(TableSpec.ColumnSpec columnSpec,
- byte[] lvEncodedBytes, int size) throws MemoryException {
+ byte[] lvEncodedBytes, int size, String compressorName) throws MemoryException {
TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
.newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE);
- ColumnPage rowOffset = ColumnPage.newPage(spec, DataTypes.INT,
+ ColumnPage rowOffset = ColumnPage.newPage(
+ new ColumnPageEncoderMeta(spec, DataTypes.INT, compressorName),
CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
int offset;
int rowId = 0;
@@ -165,9 +177,13 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
VarLengthColumnPageBase page;
if (unsafe) {
- page = new UnsafeDecimalColumnPage(columnSpec, columnSpec.getSchemaDataType(), rowId);
+ page = new UnsafeDecimalColumnPage(
+ new ColumnPageEncoderMeta(columnSpec, columnSpec.getSchemaDataType(), compressorName),
+ rowId);
} else {
- page = new SafeDecimalColumnPage(columnSpec, columnSpec.getSchemaDataType(), rowId);
+ page = new SafeDecimalColumnPage(
+ new ColumnPageEncoderMeta(columnSpec, columnSpec.getSchemaDataType(), compressorName),
+ rowId);
}
// set total length and rowOffset in page
@@ -181,13 +197,14 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
}
private static ColumnPage getLVBytesColumnPage(TableSpec.ColumnSpec columnSpec,
- byte[] lvEncodedBytes, DataType dataType, int lvLength)
+ byte[] lvEncodedBytes, DataType dataType, int lvLength, String compressorName)
throws MemoryException {
// extract length and data, set them to rowOffset and unsafe memory correspondingly
int rowId = 0;
TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
.newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE);
- ColumnPage rowOffset = ColumnPage.newPage(spec, DataTypes.INT,
+ ColumnPage rowOffset = ColumnPage.newPage(
+ new ColumnPageEncoderMeta(spec, DataTypes.INT, compressorName),
CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
int length;
int offset;
@@ -202,20 +219,19 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
counter++;
}
rowOffset.putInt(counter, offset);
- VarLengthColumnPageBase page =
- getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType, lvLength, rowId, rowOffset,
- offset);
- return page;
+ return getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType,
+ lvLength, rowId, rowOffset, offset, compressorName);
}
private static ColumnPage getComplexLVBytesColumnPage(TableSpec.ColumnSpec columnSpec,
- byte[] lvEncodedBytes, DataType dataType, int lvLength)
+ byte[] lvEncodedBytes, DataType dataType, int lvLength, String compressorName)
throws MemoryException {
// extract length and data, set them to rowOffset and unsafe memory correspondingly
int rowId = 0;
TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
.newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE);
- ColumnPage rowOffset = ColumnPage.newPage(spec, DataTypes.INT,
+ ColumnPage rowOffset = ColumnPage.newPage(
+ new ColumnPageEncoderMeta(spec, DataTypes.INT, compressorName),
CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
int length;
int offset;
@@ -231,15 +247,13 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
}
rowOffset.putInt(counter, offset);
- VarLengthColumnPageBase page =
- getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType, lvLength, rowId, rowOffset,
- offset);
- return page;
+ return getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType,
+ lvLength, rowId, rowOffset, offset, compressorName);
}
private static VarLengthColumnPageBase getVarLengthColumnPage(TableSpec.ColumnSpec columnSpec,
byte[] lvEncodedBytes, DataType dataType, int lvLength, int rowId, ColumnPage rowOffset,
- int offset) throws MemoryException {
+ int offset, String compressorName) throws MemoryException {
int lvEncodedOffset;
int length;
int numRows = rowId;
@@ -247,9 +261,12 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
VarLengthColumnPageBase page;
int inputDataLength = offset;
if (unsafe) {
- page = new UnsafeDecimalColumnPage(columnSpec, dataType, numRows, inputDataLength);
+ page = new UnsafeDecimalColumnPage(
+ new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), numRows,
+ inputDataLength);
} else {
- page = new SafeDecimalColumnPage(columnSpec, dataType, numRows);
+ page = new SafeDecimalColumnPage(
+ new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), numRows);
}
// set total length and rowOffset in page
@@ -269,32 +286,38 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
@Override
public void putByte(int rowId, byte value) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void putShort(int rowId, short value) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void putShortInt(int rowId, int value) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void putInt(int rowId, int value) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void putLong(int rowId, long value) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public void putDouble(int rowId, double value) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
abstract void putBytesAtRow(int rowId, byte[] bytes);
@@ -317,72 +340,86 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
@Override
public byte getByte(int rowId) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public short getShort(int rowId) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public int getShortInt(int rowId) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public int getInt(int rowId) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public long getLong(int rowId) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public float getFloat(int rowId) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public double getDouble(int rowId) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public byte[] getBytePage() {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public short[] getShortPage() {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public byte[] getShortIntPage() {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public int[] getIntPage() {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public long[] getLongPage() {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public float[] getFloatPage() {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
public double[] getDoublePage() {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
@Override
@@ -445,7 +482,8 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
@Override
public void convertValue(ColumnPageValueConverter codec) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ throw new UnsupportedOperationException(
+ "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
index b5a63f8..2ed12a0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
@@ -84,7 +84,8 @@ public abstract class ColumnPageEncoder {
}
private void fillBasicFields(ColumnPage inputPage, DataChunk2 dataChunk) {
- dataChunk.setChunk_meta(CarbonMetadataUtil.getSnappyChunkCompressionMeta());
+ dataChunk.setChunk_meta(
+ CarbonMetadataUtil.getChunkCompressorMeta(inputPage.getColumnCompressorName()));
dataChunk.setNumberOfRowsInpage(inputPage.getPageSize());
dataChunk.setRowMajor(false);
}
@@ -92,7 +93,8 @@ public abstract class ColumnPageEncoder {
private void fillNullBitSet(ColumnPage inputPage, DataChunk2 dataChunk) {
PresenceMeta presenceMeta = new PresenceMeta();
presenceMeta.setPresent_bit_streamIsSet(true);
- Compressor compressor = CompressorFactory.getInstance().getCompressor();
+ Compressor compressor = CompressorFactory.getInstance().getCompressor(
+ inputPage.getColumnCompressorName());
presenceMeta.setPresent_bit_stream(
compressor.compressByte(inputPage.getNullBits().toByteArray()));
dataChunk.setPresence(presenceMeta);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
index 4e04186..971cf24 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
@@ -45,14 +45,15 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
// storage data type of this column, it could be different from data type in the column spec
private DataType storeDataType;
- // compressor name for compressing and decompressing this column
- private String compressorName;
+ // compressor name for compressing and decompressing this column.
+ // Make it protected for RLEEncoderMeta
+ protected String compressorName;
public ColumnPageEncoderMeta() {
}
public ColumnPageEncoderMeta(TableSpec.ColumnSpec columnSpec, DataType storeDataType,
- SimpleStatsResult stats, String compressorName) {
+ String compressorName) {
if (columnSpec == null) {
throw new IllegalArgumentException("columm spec must not be null");
}
@@ -66,6 +67,11 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
this.storeDataType = storeDataType;
this.compressorName = compressorName;
setType(DataType.convertType(storeDataType));
+ }
+
+ public ColumnPageEncoderMeta(TableSpec.ColumnSpec columnSpec, DataType storeDataType,
+ SimpleStatsResult stats, String compressorName) {
+ this(columnSpec, storeDataType, compressorName);
if (stats != null) {
setDecimal(stats.getDecimalCount());
setMaxValue(stats.getMax());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
index 1cc2ba8..29772d1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
@@ -20,8 +20,6 @@ package org.apache.carbondata.core.datastore.page.encoding;
import java.math.BigDecimal;
import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaFloatingCodec;
@@ -73,39 +71,36 @@ public class DefaultEncodingFactory extends EncodingFactory {
private ColumnPageEncoder createEncoderForDimension(TableSpec.DimensionSpec columnSpec,
ColumnPage inputPage) {
- Compressor compressor = CompressorFactory.getInstance().getCompressor();
switch (columnSpec.getColumnType()) {
case GLOBAL_DICTIONARY:
case DIRECT_DICTIONARY:
case PLAIN_VALUE:
return new DirectCompressCodec(inputPage.getDataType()).createEncoder(null);
case COMPLEX:
- return new ComplexDimensionIndexCodec(false, false, compressor).createEncoder(null);
+ return new ComplexDimensionIndexCodec(false, false).createEncoder(null);
default:
- throw new RuntimeException("unsupported dimension type: " +
- columnSpec.getColumnType());
+ throw new RuntimeException("unsupported dimension type: " + columnSpec.getColumnType());
}
}
private ColumnPageEncoder createEncoderForDimensionLegacy(TableSpec.DimensionSpec dimensionSpec) {
- Compressor compressor = CompressorFactory.getInstance().getCompressor();
switch (dimensionSpec.getColumnType()) {
case GLOBAL_DICTIONARY:
return new DictDimensionIndexCodec(
dimensionSpec.isInSortColumns(),
- dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
- compressor).createEncoder(null);
+ dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex())
+ .createEncoder(null);
case DIRECT_DICTIONARY:
return new DirectDictDimensionIndexCodec(
dimensionSpec.isInSortColumns(),
- dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
- compressor).createEncoder(null);
+ dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex())
+ .createEncoder(null);
case PLAIN_VALUE:
return new HighCardDictDimensionIndexCodec(
dimensionSpec.isInSortColumns(),
dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
- dimensionSpec.getSchemaDataType() == DataTypes.VARCHAR,
- compressor).createEncoder(null);
+ dimensionSpec.getSchemaDataType() == DataTypes.VARCHAR)
+ .createEncoder(null);
default:
throw new RuntimeException("unsupported dimension type: " +
dimensionSpec.getColumnType());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
index 8bc67c0..d119c8f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
@@ -64,8 +64,8 @@ public abstract class EncodingFactory {
/**
* Return new decoder based on encoder metadata read from file
*/
- public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas)
- throws IOException {
+ public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas,
+ String compressor) throws IOException {
assert (encodings.size() == 1);
assert (encoderMetas.size() == 1);
Encoding encoding = encodings.get(0);
@@ -111,21 +111,20 @@ public abstract class EncodingFactory {
} else {
// for backward compatibility
ValueEncoderMeta metadata = CarbonUtil.deserializeEncoderMetaV3(encoderMeta);
- return createDecoderLegacy(metadata);
+ return createDecoderLegacy(metadata, compressor);
}
}
/**
* Old way of creating decoder, based on algorithm
*/
- public ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata) {
+ public ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata, String compressor) {
if (null == metadata) {
throw new RuntimeException("internal error");
}
SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
TableSpec.ColumnSpec spec =
TableSpec.ColumnSpec.newInstanceLegacy("legacy", stats.getDataType(), ColumnType.MEASURE);
- String compressor = "snappy";
DataType dataType = DataType.getDataType(metadata.getType());
if (dataType == DataTypes.BYTE ||
dataType == DataTypes.SHORT ||
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
index 0e8d1c0..bb928c2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
@@ -67,16 +67,19 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
@Override
public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
- final Compressor compressor = CompressorFactory.getInstance().getCompressor();
return new ColumnPageEncoder() {
@Override
protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
if (encodedPage != null) {
throw new IllegalStateException("already encoded");
}
- encodedPage = ColumnPage.newPage(input.getColumnSpec(), targetDataType,
+ encodedPage = ColumnPage.newPage(
+ new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(),
+ targetDataType, input.getColumnPageEncoderMeta().getCompressorName()),
input.getPageSize());
input.convertValue(converter);
+ Compressor compressor = CompressorFactory.getInstance().getCompressor(
+ input.getColumnCompressorName());
byte[] result = encodedPage.compress(compressor);
encodedPage.freeMemory();
return result;
@@ -92,7 +95,7 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
@Override
protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), targetDataType, stats,
- compressor.getName());
+ inputPage.getColumnCompressorName());
}
};
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index f20422c..ac9693d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -78,16 +78,19 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
@Override
public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
return new ColumnPageEncoder() {
- final Compressor compressor = CompressorFactory.getInstance().getCompressor();
@Override
protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
if (encodedPage != null) {
throw new IllegalStateException("already encoded");
}
- encodedPage = ColumnPage.newPage(input.getColumnSpec(), targetDataType,
+ encodedPage = ColumnPage.newPage(
+ new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(),
+ targetDataType, input.getColumnPageEncoderMeta().getCompressorName()),
input.getPageSize());
input.convertValue(converter);
+ Compressor compressor = CompressorFactory.getInstance().getCompressor(
+ input.getColumnCompressorName());
byte[] result = encodedPage.compress(compressor);
encodedPage.freeMemory();
return result;
@@ -96,7 +99,7 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
@Override
protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), targetDataType,
- inputPage.getStatistics(), compressor.getName());
+ inputPage.getStatistics(), inputPage.getColumnCompressorName());
}
@Override
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
index 6d7697b..028fa71 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
@@ -59,15 +59,18 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
@Override
public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
- final Compressor compressor = CompressorFactory.getInstance().getCompressor();
return new ColumnPageEncoder() {
@Override
protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
if (encodedPage != null) {
throw new IllegalStateException("already encoded");
}
- encodedPage = ColumnPage.newPage(input.getColumnSpec(), targetDataType,
+ encodedPage = ColumnPage.newPage(
+ new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(),
+ targetDataType, input.getColumnPageEncoderMeta().getCompressorName()),
input.getPageSize());
+ Compressor compressor = CompressorFactory.getInstance().getCompressor(
+ input.getColumnCompressorName());
input.convertValue(converter);
byte[] result = encodedPage.compress(compressor);
encodedPage.freeMemory();
@@ -84,7 +87,7 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
@Override
protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), targetDataType, stats,
- compressor.getName());
+ inputPage.getColumnCompressorName());
}
};
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index cfc26c7..a9cf742 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -56,15 +56,18 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
@Override
public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
- final Compressor compressor = CompressorFactory.getInstance().getCompressor();
return new ColumnPageEncoder() {
@Override
protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
if (encodedPage != null) {
throw new IllegalStateException("already encoded");
}
- encodedPage = ColumnPage.newPage(input.getColumnSpec(), targetDataType,
+ encodedPage = ColumnPage.newPage(
+ new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(),
+ targetDataType, input.getColumnPageEncoderMeta().getCompressorName()),
input.getPageSize());
+ Compressor compressor = CompressorFactory.getInstance().getCompressor(
+ input.getColumnCompressorName());
input.convertValue(converter);
byte[] result = encodedPage.compress(compressor);
encodedPage.freeMemory();
@@ -81,7 +84,7 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
@Override
protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), targetDataType, stats,
- compressor.getName());
+ inputPage.getColumnCompressorName());
}
};
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
index 7e1e9dd..aa03ec1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
@@ -55,69 +54,53 @@ public class DirectCompressCodec implements ColumnPageCodec {
@Override
public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
- // TODO: make compressor configurable in create table
- return new DirectCompressor(CarbonCommonConstants.DEFAULT_COMPRESSOR);
- }
-
- @Override
- public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
- return new DirectDecompressor(meta);
- }
-
- private class DirectCompressor extends ColumnPageEncoder {
-
- private Compressor compressor;
-
- DirectCompressor(String compressorName) {
- this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
- }
-
- @Override
- protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
- return input.compress(compressor);
- }
+ return new ColumnPageEncoder() {
- @Override
- protected List<Encoding> getEncodingList() {
- List<Encoding> encodings = new ArrayList<>();
- encodings.add(dataType == DataTypes.VARCHAR ?
- Encoding.DIRECT_COMPRESS_VARCHAR :
- Encoding.DIRECT_COMPRESS);
- return encodings;
- }
+ @Override
+ protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
+ Compressor compressor = CompressorFactory.getInstance().getCompressor(
+ input.getColumnCompressorName());
+ return input.compress(compressor);
+ }
- @Override
- protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
- return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), inputPage.getDataType(),
- inputPage.getStatistics(), compressor.getName());
- }
+ @Override
+ protected List<Encoding> getEncodingList() {
+ List<Encoding> encodings = new ArrayList<>();
+ encodings.add(dataType == DataTypes.VARCHAR ?
+ Encoding.DIRECT_COMPRESS_VARCHAR :
+ Encoding.DIRECT_COMPRESS);
+ return encodings;
+ }
+ @Override
+ protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
+ return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), inputPage.getDataType(),
+ inputPage.getStatistics(), inputPage.getColumnCompressorName());
+ }
+ };
}
- private class DirectDecompressor implements ColumnPageDecoder {
-
- private ColumnPageEncoderMeta meta;
-
- DirectDecompressor(ColumnPageEncoderMeta meta) {
- this.meta = meta;
- }
-
- @Override
- public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
- ColumnPage decodedPage;
- if (DataTypes.isDecimal(dataType)) {
- decodedPage = ColumnPage.decompressDecimalPage(meta, input, offset, length);
- } else {
- decodedPage = ColumnPage.decompress(meta, input, offset, length, false);
+ @Override
+ public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta meta) {
+ return new ColumnPageDecoder() {
+
+ @Override
+ public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
+ ColumnPage decodedPage;
+ if (DataTypes.isDecimal(dataType)) {
+ decodedPage = ColumnPage.decompressDecimalPage(meta, input, offset, length);
+ } else {
+ decodedPage = ColumnPage.decompress(meta, input, offset, length, false);
+ }
+ return LazyColumnPage.newPage(decodedPage, converter);
}
- return LazyColumnPage.newPage(decodedPage, converter);
- }
- @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
+ @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
throws MemoryException, IOException {
- return LazyColumnPage
- .newPage(ColumnPage.decompress(meta, input, offset, length, isLVEncoded), converter);
- }
+ return LazyColumnPage.newPage(
+ ColumnPage.decompress(meta, input, offset, length, isLVEncoded), converter);
+ }
+ };
}
private ColumnPageValueConverter converter = new ColumnPageValueConverter() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
index e37b8f6..cc044cc 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
import org.apache.carbondata.core.datastore.columnar.IndexStorage;
import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
import org.apache.carbondata.core.util.ByteUtil;
@@ -31,9 +32,8 @@ import org.apache.carbondata.format.Encoding;
public class ComplexDimensionIndexCodec extends IndexStorageCodec {
- public ComplexDimensionIndexCodec(boolean isSort, boolean isInvertedIndex,
- Compressor compressor) {
- super(isSort, isInvertedIndex, compressor);
+ public ComplexDimensionIndexCodec(boolean isSort, boolean isInvertedIndex) {
+ super(isSort, isInvertedIndex);
}
@Override
@@ -49,6 +49,8 @@ public class ComplexDimensionIndexCodec extends IndexStorageCodec {
IndexStorage indexStorage =
new BlockIndexerStorageForShort(inputPage.getByteArrayPage(), false, false, false);
byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
+ Compressor compressor = CompressorFactory.getInstance().getCompressor(
+ inputPage.getColumnCompressorName());
byte[] compressed = compressor.compressByte(flattened);
super.indexStorage = indexStorage;
super.compressedDataPage = compressed;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
index d157654..66f5f1d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInv
import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
import org.apache.carbondata.core.datastore.columnar.IndexStorage;
import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
import org.apache.carbondata.core.util.ByteUtil;
@@ -32,8 +33,8 @@ import org.apache.carbondata.format.Encoding;
public class DictDimensionIndexCodec extends IndexStorageCodec {
- public DictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex, Compressor compressor) {
- super(isSort, isInvertedIndex, compressor);
+ public DictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex) {
+ super(isSort, isInvertedIndex);
}
@Override
@@ -54,6 +55,8 @@ public class DictDimensionIndexCodec extends IndexStorageCodec {
indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
}
byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
+ Compressor compressor = CompressorFactory.getInstance().getCompressor(
+ inputPage.getColumnCompressorName());
super.compressedDataPage = compressor.compressByte(flattened);
super.indexStorage = indexStorage;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
index 1e5015b..a130cbd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInv
import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
import org.apache.carbondata.core.datastore.columnar.IndexStorage;
import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
import org.apache.carbondata.core.util.ByteUtil;
@@ -32,9 +33,8 @@ import org.apache.carbondata.format.Encoding;
public class DirectDictDimensionIndexCodec extends IndexStorageCodec {
- public DirectDictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex,
- Compressor compressor) {
- super(isSort, isInvertedIndex, compressor);
+ public DirectDictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex) {
+ super(isSort, isInvertedIndex);
}
@Override
@@ -55,6 +55,8 @@ public class DirectDictDimensionIndexCodec extends IndexStorageCodec {
indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
}
byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
+ Compressor compressor = CompressorFactory.getInstance().getCompressor(
+ inputPage.getColumnCompressorName());
super.compressedDataPage = compressor.compressByte(flattened);
super.indexStorage = indexStorage;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
index f9c124f..bce8523 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInv
import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
import org.apache.carbondata.core.datastore.columnar.IndexStorage;
import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
import org.apache.carbondata.core.util.ByteUtil;
@@ -37,8 +38,8 @@ public class HighCardDictDimensionIndexCodec extends IndexStorageCodec {
private boolean isVarcharType;
public HighCardDictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex,
- boolean isVarcharType, Compressor compressor) {
- super(isSort, isInvertedIndex, compressor);
+ boolean isVarcharType) {
+ super(isSort, isInvertedIndex);
this.isVarcharType = isVarcharType;
}
@@ -63,6 +64,8 @@ public class HighCardDictDimensionIndexCodec extends IndexStorageCodec {
new BlockIndexerStorageForNoInvertedIndexForShort(data, isDictionary);
}
byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
+ Compressor compressor = CompressorFactory.getInstance().getCompressor(
+ input.getColumnCompressorName());
super.compressedDataPage = compressor.compressByte(flattened);
super.indexStorage = indexStorage;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java
index cb6b387..13a9215 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java
@@ -17,20 +17,17 @@
package org.apache.carbondata.core.datastore.page.encoding.dimension.legacy;
-import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
public abstract class IndexStorageCodec implements ColumnPageCodec {
- protected Compressor compressor;
protected boolean isSort;
protected boolean isInvertedIndex;
- IndexStorageCodec(boolean isSort, boolean isInvertedIndex, Compressor compressor) {
+ IndexStorageCodec(boolean isSort, boolean isInvertedIndex) {
this.isSort = isSort;
this.isInvertedIndex = isInvertedIndex;
- this.compressor = compressor;
}
@Override
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
index fa03809..e7d4118 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
@@ -66,7 +66,7 @@ public class RLECodec implements ColumnPageCodec {
public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
assert meta instanceof RLEEncoderMeta;
RLEEncoderMeta codecMeta = (RLEEncoderMeta) meta;
- return new RLEDecoder(meta.getColumnSpec(), codecMeta.getPageSize());
+ return new RLEDecoder(meta.getColumnSpec(), codecMeta.getPageSize(), meta.getCompressorName());
}
// This codec supports integral type only
@@ -151,7 +151,10 @@ public class RLECodec implements ColumnPageCodec {
@Override
protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
return new RLEEncoderMeta(inputPage.getColumnSpec(),
- inputPage.getDataType(), inputPage.getPageSize(), inputPage.getStatistics());
+ inputPage.getDataType(),
+ inputPage.getPageSize(),
+ inputPage.getStatistics(),
+ inputPage.getColumnCompressorName());
}
private void putValue(Object value) throws IOException {
@@ -281,11 +284,13 @@ public class RLECodec implements ColumnPageCodec {
private TableSpec.ColumnSpec columnSpec;
private int pageSize;
+ private String compressorName;
- private RLEDecoder(TableSpec.ColumnSpec columnSpec, int pageSize) {
+ private RLEDecoder(TableSpec.ColumnSpec columnSpec, int pageSize, String compressorName) {
validateDataType(columnSpec.getSchemaDataType());
this.columnSpec = columnSpec;
this.pageSize = pageSize;
+ this.compressorName = compressorName;
}
@Override
@@ -293,7 +298,8 @@ public class RLECodec implements ColumnPageCodec {
throws MemoryException, IOException {
DataType dataType = columnSpec.getSchemaDataType();
DataInputStream in = new DataInputStream(new ByteArrayInputStream(input, offset, length));
- ColumnPage resultPage = ColumnPage.newPage(columnSpec, dataType, pageSize);
+ ColumnPage resultPage = ColumnPage.newPage(
+ new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), pageSize);
if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
decodeBytePage(in, resultPage);
} else if (dataType == DataTypes.SHORT) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java
index 8871671..25533f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java
@@ -39,8 +39,8 @@ public class RLEEncoderMeta extends ColumnPageEncoderMeta implements Writable {
}
public RLEEncoderMeta(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize,
- SimpleStatsResult stats) {
- super(columnSpec, dataType, stats, "");
+ SimpleStatsResult stats, String compressorName) {
+ super(columnSpec, dataType, stats, compressorName);
this.pageSize = pageSize;
}