You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/10/25 16:55:38 UTC
[2/3] carbondata git commit: [CARBONDATA-3012] Added support for full
scan queries for vector direct fill.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index 39b8282..a760b64 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -124,8 +124,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
/**
* Create a new column page for decimal page
*/
- static ColumnPage newDecimalColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes,
- String compressorName) throws MemoryException {
+ static ColumnPage newDecimalColumnPage(ColumnPageEncoderMeta meta,
+ byte[] lvEncodedBytes) throws MemoryException {
+ TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
DecimalConverterFactory.DecimalConverter decimalConverter =
DecimalConverterFactory.INSTANCE.getDecimalConverter(columnSpec.getPrecision(),
columnSpec.getScale());
@@ -133,10 +134,10 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
if (size < 0) {
return getLVBytesColumnPage(columnSpec, lvEncodedBytes,
DataTypes.createDecimalType(columnSpec.getPrecision(), columnSpec.getScale()),
- CarbonCommonConstants.INT_SIZE_IN_BYTE, compressorName);
+ CarbonCommonConstants.INT_SIZE_IN_BYTE, meta.getCompressorName());
} else {
// Here the size is always fixed.
- return getDecimalColumnPage(columnSpec, lvEncodedBytes, size, compressorName);
+ return getDecimalColumnPage(meta, lvEncodedBytes, size);
}
}
@@ -158,8 +159,10 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
lvLength, compressorName);
}
- private static ColumnPage getDecimalColumnPage(TableSpec.ColumnSpec columnSpec,
- byte[] lvEncodedBytes, int size, String compressorName) throws MemoryException {
+ private static ColumnPage getDecimalColumnPage(ColumnPageEncoderMeta meta,
+ byte[] lvEncodedBytes, int size) throws MemoryException {
+ TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
+ String compressorName = meta.getCompressorName();
TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
.newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE);
ColumnPage rowOffset = ColumnPage.newPage(
@@ -176,7 +179,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
rowOffset.putInt(counter, offset);
VarLengthColumnPageBase page;
- if (unsafe) {
+ if (isUnsafeEnabled(meta)) {
page = new UnsafeDecimalColumnPage(
new ColumnPageEncoderMeta(columnSpec, columnSpec.getSchemaDataType(), compressorName),
rowId);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
index 4e491c5..d82a873 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
@@ -18,9 +18,11 @@
package org.apache.carbondata.core.datastore.page.encoding;
import java.io.IOException;
+import java.util.BitSet;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
public interface ColumnPageDecoder {
@@ -29,6 +31,12 @@ public interface ColumnPageDecoder {
*/
ColumnPage decode(byte[] input, int offset, int length) throws MemoryException, IOException;
+ /**
+ * Apply decoding algorithm on input byte array and fill the vector here.
+ */
+ void decodeAndFillVector(byte[] input, int offset, int length, ColumnVectorInfo vectorInfo,
+ BitSet nullBits, boolean isLVEncoded) throws MemoryException, IOException;
+
ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
throws MemoryException, IOException;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
index e6aafa0..03a43f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
@@ -49,6 +49,9 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
// Make it protected for RLEEncoderMeta
protected String compressorName;
+ // Whether the flow should fill the complete vector while decoding the page.
+ private transient boolean fillCompleteVector;
+
public ColumnPageEncoderMeta() {
}
@@ -284,4 +287,12 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
public DataType getSchemaDataType() {
return columnSpec.getSchemaDataType();
}
+
+ public boolean isFillCompleteVector() {
+ return fillCompleteVector;
+ }
+
+ public void setFillCompleteVector(boolean fillCompleteVector) {
+ this.fillCompleteVector = fillCompleteVector;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
index 920a516..d3070b4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
@@ -66,6 +66,21 @@ public abstract class EncodingFactory {
*/
public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas,
String compressor) throws IOException {
+ return createDecoder(encodings, encoderMetas, compressor, false);
+ }
+
+ /**
+ * Return new decoder based on encoder metadata read from file
+ * @param encodings encodings used to decode the page
+ * @param encoderMetas metadata of encodings to decode the data
+ * @param compressor Compressor name which will be used to decode data.
+ * @param fullVectorFill whether the flow should go to fill the given vector completely while
+ * decoding the data itself.
+ * @return decoder to decode page.
+ * @throws IOException
+ */
+ public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas,
+ String compressor, boolean fullVectorFill) throws IOException {
assert (encodings.size() >= 1);
assert (encoderMetas.size() == 1);
Encoding encoding = encodings.get(0);
@@ -74,16 +89,19 @@ public abstract class EncodingFactory {
DataInputStream in = new DataInputStream(stream);
if (encoding == DIRECT_COMPRESS || encoding == DIRECT_COMPRESS_VARCHAR) {
ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+ metadata.setFillCompleteVector(fullVectorFill);
metadata.readFields(in);
return new DirectCompressCodec(metadata.getStoreDataType()).createDecoder(metadata);
} else if (encoding == ADAPTIVE_INTEGRAL) {
ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+ metadata.setFillCompleteVector(fullVectorFill);
metadata.readFields(in);
SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
return new AdaptiveIntegralCodec(metadata.getSchemaDataType(), metadata.getStoreDataType(),
stats, encodings.contains(Encoding.INVERTED_INDEX)).createDecoder(metadata);
} else if (encoding == ADAPTIVE_DELTA_INTEGRAL) {
ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+ metadata.setFillCompleteVector(fullVectorFill);
metadata.readFields(in);
SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
return new AdaptiveDeltaIntegralCodec(metadata.getSchemaDataType(),
@@ -91,12 +109,14 @@ public abstract class EncodingFactory {
.createDecoder(metadata);
} else if (encoding == ADAPTIVE_FLOATING) {
ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+ metadata.setFillCompleteVector(fullVectorFill);
metadata.readFields(in);
SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
return new AdaptiveFloatingCodec(metadata.getSchemaDataType(), metadata.getStoreDataType(),
stats, encodings.contains(Encoding.INVERTED_INDEX)).createDecoder(metadata);
} else if (encoding == ADAPTIVE_DELTA_FLOATING) {
ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+ metadata.setFillCompleteVector(fullVectorFill);
metadata.readFields(in);
SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
return new AdaptiveDeltaFloatingCodec(metadata.getSchemaDataType(),
@@ -108,12 +128,13 @@ public abstract class EncodingFactory {
return new RLECodec().createDecoder(metadata);
} else if (encoding == BOOL_BYTE) {
ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+ metadata.setFillCompleteVector(fullVectorFill);
metadata.readFields(in);
return new DirectCompressCodec(metadata.getStoreDataType()).createDecoder(metadata);
} else {
// for backward compatibility
ValueEncoderMeta metadata = CarbonUtil.deserializeEncoderMetaV3(encoderMeta);
- return createDecoderLegacy(metadata, compressor);
+ return createDecoderLegacy(metadata, compressor, fullVectorFill);
}
}
@@ -121,6 +142,14 @@ public abstract class EncodingFactory {
* Old way of creating decoder, based on algorithm
*/
public ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata, String compressor) {
+ return createDecoderLegacy(metadata, compressor, false);
+ }
+
+ /**
+ * Old way of creating decoder, based on algorithm
+ */
+ private ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata, String compressor,
+ boolean fullVectorFill) {
if (null == metadata) {
throw new RuntimeException("internal error");
}
@@ -139,16 +168,19 @@ public abstract class EncodingFactory {
AdaptiveIntegralCodec adaptiveCodec = (AdaptiveIntegralCodec) codec;
ColumnPageEncoderMeta meta =
new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+ meta.setFillCompleteVector(fullVectorFill);
return codec.createDecoder(meta);
} else if (codec instanceof AdaptiveDeltaIntegralCodec) {
AdaptiveDeltaIntegralCodec adaptiveCodec = (AdaptiveDeltaIntegralCodec) codec;
ColumnPageEncoderMeta meta =
new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+ meta.setFillCompleteVector(fullVectorFill);
return codec.createDecoder(meta);
} else if (codec instanceof DirectCompressCodec) {
ColumnPageEncoderMeta meta =
new ColumnPageEncoderMeta(spec, DataType.getDataType(metadata.getType()), stats,
compressor);
+ meta.setFillCompleteVector(fullVectorFill);
return codec.createDecoder(meta);
} else {
throw new RuntimeException("internal error");
@@ -161,30 +193,36 @@ public abstract class EncodingFactory {
AdaptiveFloatingCodec adaptiveCodec = (AdaptiveFloatingCodec) codec;
ColumnPageEncoderMeta meta =
new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+ meta.setFillCompleteVector(fullVectorFill);
return codec.createDecoder(meta);
} else if (codec instanceof DirectCompressCodec) {
ColumnPageEncoderMeta meta =
new ColumnPageEncoderMeta(spec, DataType.getDataType(metadata.getType()), stats,
compressor);
+ meta.setFillCompleteVector(fullVectorFill);
return codec.createDecoder(meta);
} else if (codec instanceof AdaptiveDeltaFloatingCodec) {
AdaptiveDeltaFloatingCodec adaptiveCodec = (AdaptiveDeltaFloatingCodec) codec;
ColumnPageEncoderMeta meta =
new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+ meta.setFillCompleteVector(fullVectorFill);
return codec.createDecoder(meta);
} else {
throw new RuntimeException("internal error");
}
} else if (DataTypes.isDecimal(dataType) || dataType == DataTypes.BYTE_ARRAY) {
// no dictionary dimension
- return new DirectCompressCodec(stats.getDataType())
- .createDecoder(new ColumnPageEncoderMeta(spec, stats.getDataType(), stats, compressor));
+ ColumnPageEncoderMeta meta =
+ new ColumnPageEncoderMeta(spec, stats.getDataType(), stats, compressor);
+ meta.setFillCompleteVector(fullVectorFill);
+ return new DirectCompressCodec(stats.getDataType()).createDecoder(meta);
} else if (dataType == DataTypes.LEGACY_LONG) {
// In case of older versions like in V1 format it has special datatype to handle
AdaptiveIntegralCodec adaptiveCodec =
new AdaptiveIntegralCodec(DataTypes.LONG, DataTypes.LONG, stats, false);
ColumnPageEncoderMeta meta =
new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+ meta.setFillCompleteVector(fullVectorFill);
return adaptiveCodec.createDecoder(meta);
} else {
throw new RuntimeException("unsupported data type: " + stats.getDataType());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
index 9b0b574..1826798 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page.encoding.adaptive;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.List;
import java.util.Map;
@@ -26,6 +27,7 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
import org.apache.carbondata.core.datastore.page.LazyColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
@@ -35,6 +37,9 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.Encoding;
@@ -125,6 +130,18 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
return LazyColumnPage.newPage(page, converter);
}
+ @Override
+ public void decodeAndFillVector(byte[] input, int offset, int length,
+ ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+ throws MemoryException, IOException {
+ ColumnPage page = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+ page.setNullBits(nullBits);
+ if (page instanceof DecimalColumnPage) {
+ vectorInfo.decimalConverter = ((DecimalColumnPage) page).getDecimalConverter();
+ }
+ converter.decodeAndFillVector(page, vectorInfo);
+ }
+
@Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
throws MemoryException, IOException {
return decode(input, offset, length);
@@ -226,6 +243,71 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
}
@Override
+ public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+ CarbonColumnVector vector = vectorInfo.vector;
+ BitSet nullBits = columnPage.getNullBits();
+ DataType pageDataType = columnPage.getDataType();
+ int pageSize = columnPage.getPageSize();
+ BitSet deletedRows = vectorInfo.deletedRows;
+ DataType vectorDataType = vector.getType();
+ if (vectorDataType == DataTypes.FLOAT) {
+ float floatFactor = factor.floatValue();
+ if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+ byte[] byteData = columnPage.getBytePage();
+ for (int i = 0; i < pageSize; i++) {
+ vector.putFloat(i, (max - byteData[i]) / floatFactor);
+ }
+ } else if (pageDataType == DataTypes.SHORT) {
+ short[] shortData = columnPage.getShortPage();
+ for (int i = 0; i < pageSize; i++) {
+ vector.putFloat(i, (max - shortData[i]) / floatFactor);
+ }
+
+ } else if (pageDataType == DataTypes.SHORT_INT) {
+ byte[] shortIntPage = columnPage.getShortIntPage();
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ vector.putFloat(i, (max - shortInt) / floatFactor);
+ }
+ } else {
+ throw new RuntimeException("internal error: " + this.toString());
+ }
+ } else {
+ if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+ byte[] byteData = columnPage.getBytePage();
+ for (int i = 0; i < pageSize; i++) {
+ vector.putDouble(i, (max - byteData[i]) / factor);
+ }
+ } else if (pageDataType == DataTypes.SHORT) {
+ short[] shortData = columnPage.getShortPage();
+ for (int i = 0; i < pageSize; i++) {
+ vector.putDouble(i, (max - shortData[i]) / factor);
+ }
+
+ } else if (pageDataType == DataTypes.SHORT_INT) {
+ byte[] shortIntPage = columnPage.getShortIntPage();
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ vector.putDouble(i, (max - shortInt) / factor);
+ }
+ } else if (pageDataType == DataTypes.INT) {
+ int[] intData = columnPage.getIntPage();
+ for (int i = 0; i < pageSize; i++) {
+ vector.putDouble(i, (max - intData[i]) / factor);
+ }
+ } else {
+ throw new RuntimeException("Unsupported datatype : " + pageDataType);
+ }
+ }
+
+ if (deletedRows == null || deletedRows.isEmpty()) {
+ for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
+ vector.putNull(i);
+ }
+ }
+ }
+
+ @Override
public double decodeDouble(float value) {
throw new RuntimeException("internal error: " + debugInfo());
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index 0e61b33..0d7ad8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.datastore.page.encoding.adaptive;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.List;
import java.util.Map;
@@ -27,6 +28,7 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
import org.apache.carbondata.core.datastore.page.LazyColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
@@ -35,6 +37,10 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.Encoding;
@@ -119,9 +125,11 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
};
}
- @Override public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta meta) {
+ @Override
+ public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta meta) {
return new ColumnPageDecoder() {
- @Override public ColumnPage decode(byte[] input, int offset, int length)
+ @Override
+ public ColumnPage decode(byte[] input, int offset, int length)
throws MemoryException, IOException {
ColumnPage page = null;
if (DataTypes.isDecimal(meta.getSchemaDataType())) {
@@ -132,7 +140,23 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
return LazyColumnPage.newPage(page, converter);
}
- @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
+ @Override
+ public void decodeAndFillVector(byte[] input, int offset, int length,
+ ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+ throws MemoryException, IOException {
+ ColumnPage page = null;
+ if (DataTypes.isDecimal(meta.getSchemaDataType())) {
+ page = ColumnPage.decompressDecimalPage(meta, input, offset, length);
+ vectorInfo.decimalConverter = ((DecimalColumnPage) page).getDecimalConverter();
+ } else {
+ page = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+ }
+ page.setNullBits(nullBits);
+ converter.decodeAndFillVector(page, vectorInfo);
+ }
+
+ @Override
+ public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
throws MemoryException, IOException {
return decode(input, offset, length);
}
@@ -272,5 +296,169 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
// this codec is for integer type only
throw new RuntimeException("internal error");
}
+
+ @Override
+ public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+ CarbonColumnVector vector = vectorInfo.vector;
+ BitSet nullBits = columnPage.getNullBits();
+ DataType vectorDataType = vector.getType();
+ DataType pageDataType = columnPage.getDataType();
+ int pageSize = columnPage.getPageSize();
+ BitSet deletedRows = vectorInfo.deletedRows;
+ fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
+ if (deletedRows == null || deletedRows.isEmpty()) {
+ for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
+ vector.putNull(i);
+ }
+ }
+ }
+
+ private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
+ DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo) {
+ if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+ byte[] byteData = columnPage.getBytePage();
+ if (vectorDataType == DataTypes.SHORT) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putShort(i, (short) (max - byteData[i]));
+ }
+ } else if (vectorDataType == DataTypes.INT) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putInt(i, (int) (max - byteData[i]));
+ }
+ } else if (vectorDataType == DataTypes.LONG) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, (max - byteData[i]));
+ }
+ } else if (vectorDataType == DataTypes.TIMESTAMP) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, (max - byteData[i]) * 1000);
+ }
+ } else if (vectorDataType == DataTypes.BOOLEAN) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putByte(i, (byte) (max - byteData[i]));
+ }
+ } else if (DataTypes.isDecimal(vectorDataType)) {
+ DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+ int precision = vectorInfo.measure.getMeasure().getPrecision();
+ for (int i = 0; i < pageSize; i++) {
+ BigDecimal decimal = decimalConverter.getDecimal(max - byteData[i]);
+ vector.putDecimal(i, decimal, precision);
+ }
+ } else {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putDouble(i, (max - byteData[i]));
+ }
+ }
+ } else if (pageDataType == DataTypes.SHORT) {
+ short[] shortData = columnPage.getShortPage();
+ if (vectorDataType == DataTypes.SHORT) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putShort(i, (short) (max - shortData[i]));
+ }
+ } else if (vectorDataType == DataTypes.INT) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putInt(i, (int) (max - shortData[i]));
+ }
+ } else if (vectorDataType == DataTypes.LONG) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, (max - shortData[i]));
+ }
+ } else if (vectorDataType == DataTypes.TIMESTAMP) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, (max - shortData[i]) * 1000);
+ }
+ } else if (DataTypes.isDecimal(vectorDataType)) {
+ DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+ int precision = vectorInfo.measure.getMeasure().getPrecision();
+ for (int i = 0; i < pageSize; i++) {
+ BigDecimal decimal = decimalConverter.getDecimal(max - shortData[i]);
+ vector.putDecimal(i, decimal, precision);
+ }
+ } else {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putDouble(i, (max - shortData[i]));
+ }
+ }
+
+ } else if (pageDataType == DataTypes.SHORT_INT) {
+ byte[] shortIntPage = columnPage.getShortIntPage();
+ if (vectorDataType == DataTypes.INT) {
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ vector.putInt(i, (int) (max - shortInt));
+ }
+ } else if (vectorDataType == DataTypes.LONG) {
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ vector.putLong(i, (max - shortInt));
+ }
+ } else if (vectorDataType == DataTypes.TIMESTAMP) {
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ vector.putLong(i, (max - shortInt) * 1000);
+ }
+ } else if (DataTypes.isDecimal(vectorDataType)) {
+ DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+ int precision = vectorInfo.measure.getMeasure().getPrecision();
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ BigDecimal decimal = decimalConverter.getDecimal(max - shortInt);
+ vector.putDecimal(i, decimal, precision);
+ }
+ } else {
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ vector.putDouble(i, (max - shortInt));
+ }
+ }
+ } else if (pageDataType == DataTypes.INT) {
+ int[] intData = columnPage.getIntPage();
+ if (vectorDataType == DataTypes.INT) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putInt(i, (int) (max - intData[i]));
+ }
+ } else if (vectorDataType == DataTypes.LONG) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, (max - intData[i]));
+ }
+ } else if (vectorDataType == DataTypes.TIMESTAMP) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, (max - intData[i]) * 1000);
+ }
+ } else if (DataTypes.isDecimal(vectorDataType)) {
+ DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+ int precision = vectorInfo.measure.getMeasure().getPrecision();
+ for (int i = 0; i < pageSize; i++) {
+ BigDecimal decimal = decimalConverter.getDecimal(max - intData[i]);
+ vector.putDecimal(i, decimal, precision);
+ }
+ } else {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putDouble(i, (max - intData[i]));
+ }
+ }
+ } else if (pageDataType == DataTypes.LONG) {
+ long[] longData = columnPage.getLongPage();
+ if (vectorDataType == DataTypes.LONG) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, (max - longData[i]));
+ }
+ } else if (vectorDataType == DataTypes.TIMESTAMP) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, (max - longData[i]) * 1000);
+ }
+ } else if (DataTypes.isDecimal(vectorDataType)) {
+ DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+ int precision = vectorInfo.measure.getMeasure().getPrecision();
+ for (int i = 0; i < pageSize; i++) {
+ BigDecimal decimal = decimalConverter.getDecimal(max - longData[i]);
+ vector.putDecimal(i, decimal, precision);
+ }
+ }
+ } else {
+ throw new RuntimeException("Unsupported datatype : " + pageDataType);
+ }
+ }
+
};
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
index 836af26..38bf9b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page.encoding.adaptive;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.List;
import java.util.Map;
@@ -26,6 +27,7 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
import org.apache.carbondata.core.datastore.page.LazyColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
@@ -34,6 +36,9 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.Encoding;
@@ -113,7 +118,20 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
return LazyColumnPage.newPage(page, converter);
}
-      @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
+      @Override // decompress, attach null bits, then fill the vector directly
+      public void decodeAndFillVector(byte[] input, int offset, int length,
+          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+          throws MemoryException, IOException {
+        ColumnPage page = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+        page.setNullBits(nullBits);
+        if (page instanceof DecimalColumnPage) { // decimal pages supply their converter
+          vectorInfo.decimalConverter = ((DecimalColumnPage) page).getDecimalConverter();
+        }
+        converter.decodeAndFillVector(page, vectorInfo);
+      }
+
+      @Override // delegates to 3-arg decode; isLVEncoded is not forwarded
+      public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
throws MemoryException, IOException {
return decode(input, offset, length);
}
@@ -226,5 +244,69 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
public double decodeDouble(double value) {
throw new RuntimeException("internal error: " + debugInfo());
}
+
+    @Override // copy page values into vectorInfo.vector, dividing by the factor
+    public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+      CarbonColumnVector vector = vectorInfo.vector;
+      BitSet nullBits = columnPage.getNullBits();
+      DataType pageDataType = columnPage.getDataType();
+      int pageSize = columnPage.getPageSize();
+      BitSet deletedRows = vectorInfo.deletedRows;
+      DataType vectorDataType = vector.getType();
+      if (vectorDataType == DataTypes.FLOAT) { // BYTE/SHORT/SHORT_INT pages only
+        if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+          byte[] byteData = columnPage.getBytePage();
+          for (int i = 0; i < pageSize; i++) {
+            vector.putFloat(i, (byteData[i] / floatFactor));
+          }
+        } else if (pageDataType == DataTypes.SHORT) {
+          short[] shortData = columnPage.getShortPage();
+          for (int i = 0; i < pageSize; i++) {
+            vector.putFloat(i, (shortData[i] / floatFactor));
+          }
+
+        } else if (pageDataType == DataTypes.SHORT_INT) {
+          byte[] shortIntPage = columnPage.getShortIntPage();
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putFloat(i, (shortInt / floatFactor));
+          }
+        } else {
+          throw new RuntimeException("internal error: " + this.toString());
+        }
+      } else { // every non-FLOAT vector is filled via putDouble
+        if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+          byte[] byteData = columnPage.getBytePage();
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, (byteData[i] / factor));
+          }
+        } else if (pageDataType == DataTypes.SHORT) {
+          short[] shortData = columnPage.getShortPage();
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, (shortData[i] / factor));
+          }
+
+        } else if (pageDataType == DataTypes.SHORT_INT) {
+          byte[] shortIntPage = columnPage.getShortIntPage();
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putDouble(i, (shortInt / factor));
+          }
+        } else if (pageDataType == DataTypes.INT) {
+          int[] intData = columnPage.getIntPage();
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, (intData[i] / factor));
+          }
+        } else {
+          throw new RuntimeException("Unsupported datatype : " + pageDataType);
+        }
+      }
+
+      if (deletedRows == null || deletedRows.isEmpty()) { // nulls set only if none deleted
+        for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
+          vector.putNull(i);
+        }
+      }
+    }
};
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index f1c0ea0..bdf5373 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page.encoding.adaptive;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.List;
import java.util.Map;
@@ -26,6 +27,7 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
import org.apache.carbondata.core.datastore.page.LazyColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
@@ -34,6 +36,10 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.Encoding;
@@ -111,6 +117,21 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
return LazyColumnPage.newPage(page, converter);
}
+      @Override // decompress (decimal-aware), attach null bits, fill the vector
+      public void decodeAndFillVector(byte[] input, int offset, int length,
+          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+          throws MemoryException, IOException {
+        ColumnPage page = null;
+        if (DataTypes.isDecimal(meta.getSchemaDataType())) { // converter comes from the page
+          page = ColumnPage.decompressDecimalPage(meta, input, offset, length);
+          vectorInfo.decimalConverter = ((DecimalColumnPage) page).getDecimalConverter();
+        } else {
+          page = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+        }
+        page.setNullBits(nullBits);
+        converter.decodeAndFillVector(page, vectorInfo);
+      }
+
@Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
throws MemoryException, IOException {
return decode(input, offset, length);
@@ -248,6 +269,142 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
public double decodeDouble(double value) {
throw new RuntimeException("internal error: " + debugInfo());
}
+
+    @Override // fill the vector from the page, then mark null rows
+    public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+      CarbonColumnVector vector = vectorInfo.vector;
+      BitSet nullBits = columnPage.getNullBits();
+      DataType vectorDataType = vector.getType();
+      DataType pageDataType = columnPage.getDataType();
+      int pageSize = columnPage.getPageSize();
+      BitSet deletedRows = vectorInfo.deletedRows;
+      fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
+      if (deletedRows == null || deletedRows.isEmpty()) { // nulls set only if none deleted
+        for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
+          vector.putNull(i);
+        }
+      }
+    }
+
+    private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
+        DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo) {
+      if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+        byte[] byteData = columnPage.getBytePage();
+        if (vectorDataType == DataTypes.SHORT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putShort(i, (short) byteData[i]);
+          }
+        } else if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putInt(i, (int) byteData[i]);
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, byteData[i]);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) { // long math avoids int overflow
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, byteData[i] * 1000L);
+          }
+        } else if (vectorDataType == DataTypes.BOOLEAN) {
+          vector.putBytes(0, pageSize, byteData, 0);
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(byteData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, byteData[i]);
+          }
+        }
+      } else if (pageDataType == DataTypes.SHORT) {
+        short[] shortData = columnPage.getShortPage();
+        if (vectorDataType == DataTypes.SHORT) {
+          vector.putShorts(0, pageSize, shortData, 0);
+        } else if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putInt(i, (int) shortData[i]);
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, shortData[i]);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) { // long math avoids int overflow
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, shortData[i] * 1000L);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(shortData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, shortData[i]);
+          }
+        }
+
+      } else if (pageDataType == DataTypes.SHORT_INT) {
+        byte[] shortIntPage = columnPage.getShortIntPage();
+        if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putInt(i, shortInt);
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putLong(i, shortInt);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) { // long math avoids int overflow
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putLong(i, shortInt * 1000L);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          int[] shortIntData = ByteUtil.toIntArray(shortIntPage, pageSize);
+          decimalConverter.fillVector(shortIntData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putDouble(i, shortInt);
+          }
+        }
+      } else if (pageDataType == DataTypes.INT) {
+        int[] intData = columnPage.getIntPage();
+        if (vectorDataType == DataTypes.INT) {
+          vector.putInts(0, pageSize, intData, 0);
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, intData[i]);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) { // long math avoids int overflow
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, intData[i] * 1000L);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(intData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, intData[i]);
+          }
+        }
+      } else if (pageDataType == DataTypes.LONG) { // no fallback for other vector types
+        long[] longData = columnPage.getLongPage();
+        if (vectorDataType == DataTypes.LONG) {
+          vector.putLongs(0, pageSize, longData, 0);
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, longData[i] * 1000);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(longData, pageSize, vectorInfo, columnPage.getNullBits());
+        }
+      } else { // any other page type is read as doubles
+        double[] doubleData = columnPage.getDoublePage();
+        vector.putDoubles(0, pageSize, doubleData, 0);
+      }
+    }
};
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
index aa03ec1..4d1e6e7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page.encoding.compress;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.List;
import java.util.Map;
@@ -26,6 +27,7 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
import org.apache.carbondata.core.datastore.page.LazyColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
@@ -34,6 +36,10 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.format.Encoding;
/**
@@ -95,10 +101,25 @@ public class DirectCompressCodec implements ColumnPageCodec {
return LazyColumnPage.newPage(decodedPage, converter);
}
+      @Override // decompress (decimal-aware), attach null bits, fill the vector
+      public void decodeAndFillVector(byte[] input, int offset, int length,
+          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+          throws MemoryException, IOException {
+        ColumnPage decodedPage;
+        if (DataTypes.isDecimal(dataType)) { // converter comes from the decoded page
+          decodedPage = ColumnPage.decompressDecimalPage(meta, input, offset, length);
+          vectorInfo.decimalConverter = ((DecimalColumnPage) decodedPage).getDecimalConverter();
+        } else {
+          decodedPage = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+        }
+        decodedPage.setNullBits(nullBits);
+        converter.decodeAndFillVector(decodedPage, vectorInfo);
+      }
+
@Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
-          throws MemoryException, IOException {
-        return LazyColumnPage.newPage(
-            ColumnPage.decompress(meta, input, offset, length, isLVEncoded), converter);
+        throws MemoryException, IOException {
+        return LazyColumnPage
+            .newPage(ColumnPage.decompress(meta, input, offset, length, isLVEncoded), converter);
}
};
}
@@ -178,6 +199,146 @@ public class DirectCompressCodec implements ColumnPageCodec {
public double decodeDouble(double value) {
return value;
}
+
+    @Override // fill the vector from the page, then mark null rows
+    public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+      CarbonColumnVector vector = vectorInfo.vector;
+      BitSet nullBits = columnPage.getNullBits();
+      DataType vectorDataType = vector.getType();
+      DataType pageDataType = columnPage.getDataType();
+      int pageSize = columnPage.getPageSize();
+      BitSet deletedRows = vectorInfo.deletedRows;
+      fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
+      if (deletedRows == null || deletedRows.isEmpty()) { // nulls set only if none deleted
+        for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
+          vector.putNull(i);
+        }
+      }
+    }
+
+    private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
+        DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo) {
+      if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+        byte[] byteData = columnPage.getBytePage();
+        if (vectorDataType == DataTypes.SHORT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putShort(i, (short) byteData[i]);
+          }
+        } else if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putInt(i, (int) byteData[i]);
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, byteData[i]);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) { // long math avoids int overflow
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, byteData[i] * 1000L);
+          }
+        } else if (vectorDataType == DataTypes.BOOLEAN || vectorDataType == DataTypes.BYTE) {
+          vector.putBytes(0, pageSize, byteData, 0);
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(byteData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, byteData[i]);
+          }
+        }
+      } else if (pageDataType == DataTypes.SHORT) {
+        short[] shortData = columnPage.getShortPage();
+        if (vectorDataType == DataTypes.SHORT) {
+          vector.putShorts(0, pageSize, shortData, 0);
+        } else if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putInt(i, (int) shortData[i]);
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, shortData[i]);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) { // long math avoids int overflow
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, shortData[i] * 1000L);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(shortData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, shortData[i]);
+          }
+        }
+
+      } else if (pageDataType == DataTypes.SHORT_INT) {
+        byte[] shortIntPage = columnPage.getShortIntPage();
+        if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putInt(i, shortInt);
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putLong(i, shortInt);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) { // long math avoids int overflow
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putLong(i, shortInt * 1000L);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          int[] shortIntData = ByteUtil.toIntArray(shortIntPage, pageSize);
+          decimalConverter.fillVector(shortIntData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putDouble(i, shortInt);
+          }
+        }
+      } else if (pageDataType == DataTypes.INT) {
+        int[] intData = columnPage.getIntPage();
+        if (vectorDataType == DataTypes.INT) {
+          vector.putInts(0, pageSize, intData, 0);
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, intData[i]);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) { // long math avoids int overflow
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, intData[i] * 1000L);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(intData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, intData[i]);
+          }
+        }
+      } else if (pageDataType == DataTypes.LONG) { // no fallback for other vector types
+        long[] longData = columnPage.getLongPage();
+        if (vectorDataType == DataTypes.LONG) { // one bulk copy, no per-row pass
+          vector.putLongs(0, pageSize, longData, 0);
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, longData[i] * 1000);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(longData, pageSize, vectorInfo, columnPage.getNullBits());
+        }
+      } else if (DataTypes.isDecimal(pageDataType)) {
+        DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+        decimalConverter.fillVector(columnPage.getByteArrayPage(), pageSize, vectorInfo,
+            columnPage.getNullBits());
+      } else { // any other page type is read as doubles
+        double[] doubleData = columnPage.getDoublePage();
+        vector.putDoubles(0, pageSize, doubleData, 0);
+      }
+    }
};
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
index e7d4118..c9b47db 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
@@ -23,6 +23,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.List;
import java.util.Map;
@@ -35,6 +36,7 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.format.Encoding;
/**
@@ -314,6 +316,13 @@ public class RLECodec implements ColumnPageCodec {
return resultPage;
}
+    @Override // direct vector fill is not implemented for RLE; always throws
+    public void decodeAndFillVector(byte[] input, int offset, int length,
+        ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+        throws MemoryException, IOException {
+      throw new UnsupportedOperationException("Not supposed to be called here");
+    }
+
@Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
throws MemoryException, IOException {
return decode(input, offset, length);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
index a49eced..67d70e3 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
@@ -35,7 +35,7 @@ import org.apache.log4j.Logger;
*/
public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator {
-  private static final int cutOffDate = Integer.MAX_VALUE >> 1;
+  public static final int cutOffDate = Integer.MAX_VALUE >> 1; // 1073741823; widened from private
private static final long SECONDS_PER_DAY = 60 * 60 * 24L;
public static final long MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
index a8da6d4..89a3168 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
@@ -19,7 +19,10 @@ package org.apache.carbondata.core.metadata.datatype;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;
+import java.util.BitSet;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.util.DataTypeUtil;
/**
@@ -72,6 +75,8 @@ public final class DecimalConverterFactory {
BigDecimal getDecimal(Object valueToBeConverted);
+    void fillVector(Object valuesToBeConverted, int size, ColumnVectorInfo info, BitSet nullBitset);
+
int getSize();
DecimalConverterType getDecimalConverterType();
@@ -80,7 +85,7 @@ public final class DecimalConverterFactory {
public static class DecimalIntConverter implements DecimalConverter {
-    private int scale;
+    protected int scale; // protected: visible to subclasses such as DecimalLongConverter
DecimalIntConverter(int scale) {
this.scale = scale;
@@ -95,6 +100,51 @@ public final class DecimalConverterFactory {
return BigDecimal.valueOf((Long) valueToBeConverted, scale);
}
+    @Override public void fillVector(Object valuesToBeConverted, int size, ColumnVectorInfo info,
+        BitSet nullBitset) {
+      // TODO: set values on the vector directly instead of building one BigDecimal per row;
+      // the per-row BigDecimal conversion below is inefficient.
+      CarbonColumnVector vector = info.vector;
+      int precision = info.measure.getMeasure().getPrecision();
+      if (valuesToBeConverted instanceof byte[]) { // unscaled values; scale applied per row
+        byte[] data = (byte[]) valuesToBeConverted;
+        for (int i = 0; i < size; i++) {
+          if (nullBitset.get(i)) {
+            vector.putNull(i);
+          } else {
+            vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision);
+          }
+        }
+      } else if (valuesToBeConverted instanceof short[]) {
+        short[] data = (short[]) valuesToBeConverted;
+        for (int i = 0; i < size; i++) {
+          if (nullBitset.get(i)) {
+            vector.putNull(i);
+          } else {
+            vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision);
+          }
+        }
+      } else if (valuesToBeConverted instanceof int[]) {
+        int[] data = (int[]) valuesToBeConverted;
+        for (int i = 0; i < size; i++) {
+          if (nullBitset.get(i)) {
+            vector.putNull(i);
+          } else {
+            vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision);
+          }
+        }
+      } else if (valuesToBeConverted instanceof long[]) {
+        long[] data = (long[]) valuesToBeConverted;
+        for (int i = 0; i < size; i++) {
+          if (nullBitset.get(i)) {
+            vector.putNull(i);
+          } else {
+            vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision);
+          }
+        }
+      } // any other array type is silently ignored
+    }
+
@Override public int getSize() {
return 4;
}
@@ -104,12 +154,10 @@ public final class DecimalConverterFactory {
}
}
-  public static class DecimalLongConverter implements DecimalConverter {
-
-    private int scale;
+  public static class DecimalLongConverter extends DecimalIntConverter { // inherits fillVector
DecimalLongConverter(int scale) {
-      this.scale = scale;
+      super(scale);
}
@Override public Object convert(BigDecimal decimal) {
@@ -173,6 +221,23 @@ public final class DecimalConverterFactory {
return new BigDecimal(bigInteger, scale);
}
+    @Override public void fillVector(Object valuesToBeConverted, int size, ColumnVectorInfo info,
+        BitSet nullBitset) {
+      CarbonColumnVector vector = info.vector;
+      int precision = info.measure.getMeasure().getPrecision();
+      if (valuesToBeConverted instanceof byte[][]) { // two's-complement unscaled values
+        byte[][] data = (byte[][]) valuesToBeConverted;
+        for (int i = 0; i < size; i++) {
+          if (nullBitset.get(i)) {
+            vector.putNull(i);
+          } else {
+            BigInteger bigInteger = new BigInteger(data[i]);
+            vector.putDecimal(i, new BigDecimal(bigInteger, scale), precision);
+          }
+        }
+      }
+    }
+
@Override public int getSize() {
return numBytes;
}
@@ -194,6 +259,22 @@ public final class DecimalConverterFactory {
return DataTypeUtil.byteToBigDecimal((byte[]) valueToBeConverted);
}
+    @Override public void fillVector(Object valuesToBeConverted, int size, ColumnVectorInfo info,
+        BitSet nullBitset) {
+      CarbonColumnVector vector = info.vector;
+      int precision = info.measure.getMeasure().getPrecision();
+      if (valuesToBeConverted instanceof byte[][]) { // each entry via byteToBigDecimal
+        byte[][] data = (byte[][]) valuesToBeConverted;
+        for (int i = 0; i < size; i++) {
+          if (nullBitset.get(i)) {
+            vector.putNull(i);
+          } else {
+            vector.putDecimal(i, DataTypeUtil.byteToBigDecimal(data[i]), precision);
+          }
+        }
+      }
+    }
+
@Override public int getSize() {
return -1;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java
index d68e4e9..ac50d7c 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java
@@ -57,4 +57,8 @@ public class DeleteDeltaVo {
public boolean containsRow(int counter) {
return bitSet.get(counter);
}
+
+  public BitSet getBitSet() { // returns the live internal BitSet, not a copy
+    return bitSet;
+  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
index 8695d90..430a555 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
@@ -18,10 +18,13 @@ package org.apache.carbondata.core.scan.collector.impl;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.BitSet;
import java.util.List;
+import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.mutate.DeleteDeltaVo;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.model.ProjectionDimension;
import org.apache.carbondata.core.scan.model.ProjectionMeasure;
@@ -30,11 +33,19 @@ import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.scan.result.vector.MeasureDataVectorProcessor;
+import org.apache.log4j.Logger;
+
/**
* It is not a collector it is just a scanned result holder.
*/
public class DictionaryBasedVectorResultCollector extends AbstractScannedResultCollector {
+  /**
+   * Logger for this vector result collector.
+   */
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(DictionaryBasedVectorResultCollector.class.getName());
+
protected ProjectionDimension[] queryDimensions;
protected ProjectionMeasure[] queryMeasures;
@@ -51,8 +62,14 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
private ColumnVectorInfo[] implictColumnInfo;
+  private boolean isDirectVectorFill; // copied from BlockExecutionInfo in the constructor
+
public DictionaryBasedVectorResultCollector(BlockExecutionInfo blockExecutionInfos) {
super(blockExecutionInfos);
+    this.isDirectVectorFill = blockExecutionInfos.isDirectVectorFill();
+    if (this.isDirectVectorFill) {
+      LOGGER.info("Direct pagewise vector fill collector is used to scan and collect the data");
+    }
// initialize only if the current block is not a restructured block else the initialization
// will be taken care by RestructureBasedVectorResultCollector
if (!blockExecutionInfos.isRestructuredBlock()) {
@@ -141,29 +158,33 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
@Override
public void collectResultInColumnarBatch(BlockletScannedResult scannedResult,
CarbonColumnarBatch columnarBatch) {
- int numberOfPages = scannedResult.numberOfpages();
- int filteredRows = 0;
- while (scannedResult.getCurrentPageCounter() < numberOfPages) {
- int currentPageRowCount = scannedResult.getCurrentPageRowCount();
- if (currentPageRowCount == 0) {
- scannedResult.incrementPageCounter();
- continue;
- }
- int rowCounter = scannedResult.getRowCounter();
- int availableRows = currentPageRowCount - rowCounter;
- // getRowCounter holds total number or rows being placed in Vector. Calculate the
- // Left over space through getRowCounter only.
- int requiredRows = columnarBatch.getBatchSize() - columnarBatch.getRowCounter();
- requiredRows = Math.min(requiredRows, availableRows);
- if (requiredRows < 1) {
- return;
+ if (isDirectVectorFill) {
+ collectResultInColumnarBatchDirect(scannedResult, columnarBatch);
+ } else {
+ int numberOfPages = scannedResult.numberOfpages();
+ int filteredRows = 0;
+ while (scannedResult.getCurrentPageCounter() < numberOfPages) {
+ int currentPageRowCount = scannedResult.getCurrentPageRowCount();
+ if (currentPageRowCount == 0) {
+ scannedResult.incrementPageCounter();
+ continue;
+ }
+ int rowCounter = scannedResult.getRowCounter();
+ int availableRows = currentPageRowCount - rowCounter;
+ // getRowCounter holds the total number of rows already placed in the vector. Calculate the
+ // left-over space through getRowCounter only.
+ int requiredRows = columnarBatch.getBatchSize() - columnarBatch.getRowCounter();
+ requiredRows = Math.min(requiredRows, availableRows);
+ if (requiredRows < 1) {
+ return;
+ }
+ fillColumnVectorDetails(columnarBatch, rowCounter, requiredRows);
+ filteredRows = scannedResult.markFilteredRows(columnarBatch, rowCounter, requiredRows,
+ columnarBatch.getRowCounter());
+ fillResultToColumnarBatch(scannedResult, columnarBatch, rowCounter, availableRows,
+ requiredRows);
+ columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows - filteredRows);
}
- fillColumnVectorDetails(columnarBatch, rowCounter, requiredRows);
- filteredRows = scannedResult.markFilteredRows(
- columnarBatch, rowCounter, requiredRows, columnarBatch.getRowCounter());
- fillResultToColumnarBatch(
- scannedResult, columnarBatch, rowCounter, availableRows, requiredRows);
- columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows - filteredRows);
}
}
@@ -198,4 +219,51 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
}
}
+ /**
+ * Fill the vectors directly while decoding the page; empty pages are skipped and at most one page is filled per call.
+ */
+ private void collectResultInColumnarBatchDirect(BlockletScannedResult scannedResult,
+ CarbonColumnarBatch columnarBatch) {
+ int numberOfPages = scannedResult.numberOfpages();
+ while (scannedResult.getCurrentPageCounter() < numberOfPages) {
+ int currentPageRowCount = scannedResult.getCurrentPageRowCount();
+ if (currentPageRowCount == 0) {
+ scannedResult.incrementPageCounter(null);
+ continue;
+ }
+ DeleteDeltaVo deltaVo = scannedResult.getCurrentDeleteDeltaVo();
+ BitSet bitSet = null;
+ int deletedRows = 0;
+ if (deltaVo != null) {
+ bitSet = deltaVo.getBitSet();
+ deletedRows = bitSet.cardinality();
+ }
+ fillColumnVectorDetails(columnarBatch, bitSet);
+ fillResultToColumnarBatch(scannedResult);
+ columnarBatch.setActualSize(currentPageRowCount - deletedRows);
+ scannedResult.setRowCounter(currentPageRowCount - deletedRows);
+ scannedResult.incrementPageCounter(null);
+ return;
+ }
+ }
+
+ private void fillResultToColumnarBatch(BlockletScannedResult scannedResult) {
+ scannedResult.fillDataChunks(dictionaryInfo, noDictionaryInfo, measureColumnInfo,
+ measureInfo.getMeasureOrdinals());
+
+ }
+
+ private void fillColumnVectorDetails(CarbonColumnarBatch columnarBatch,
+ BitSet deltaBitSet) {
+ for (int i = 0; i < allColumnInfo.length; i++) {
+ allColumnInfo[i].vectorOffset = columnarBatch.getRowCounter();
+ allColumnInfo[i].vector = columnarBatch.columnVectors[i];
+ allColumnInfo[i].deletedRows = deltaBitSet;
+ if (null != allColumnInfo[i].dimension) {
+ allColumnInfo[i].vector.setBlockDataType(dimensionInfo.dataType[i]);
+ }
+ }
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 6a6a929..fed0faf 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -478,6 +478,19 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
} else {
blockExecutionInfo.setPrefetchBlocklet(queryModel.isPreFetchData());
}
+ // Direct vector fill is not used when any block has an fg datamap writer path.
+ boolean fgDataMapPathPresent = false;
+ for (TableBlockInfo blockInfo : queryModel.getTableBlockInfos()) {
+ fgDataMapPathPresent = blockInfo.getDataMapWriterPath() != null;
+ if (fgDataMapPathPresent) {
+ queryModel.setDirectVectorFill(false);
+ break;
+ }
+ }
+
+ blockExecutionInfo
+ .setDirectVectorFill(queryModel.isDirectVectorFill());
+
blockExecutionInfo
.setTotalNumberOfMeasureToRead(segmentProperties.getMeasuresOrdinalToChunkMapping().size());
blockExecutionInfo.setComplexDimensionInfoMap(QueryUtil
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
index e737b0e..f0ef23b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
@@ -217,6 +217,11 @@ public class BlockExecutionInfo {
private QueryStatisticsModel queryStatisticsModel;
/**
+ * It fills the vector directly from the decoded column page without any staging or conversion.
+ */
+ private boolean isDirectVectorFill;
+
+ /**
* @param blockIndex the tableBlock to set
*/
public void setDataBlock(AbstractIndex blockIndex) {
@@ -625,4 +630,12 @@ public class BlockExecutionInfo {
public void setQueryStatisticsModel(QueryStatisticsModel queryStatisticsModel) {
this.queryStatisticsModel = queryStatisticsModel;
}
+
+ public boolean isDirectVectorFill() {
+ return isDirectVectorFill && !isRestructuredBlock;
+ }
+
+ public void setDirectVectorFill(boolean directVectorFill) {
+ isDirectVectorFill = directVectorFill;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index 22e1e72..49157f9 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -760,7 +760,7 @@ public class QueryUtil {
vector.putNull(vectorRow);
} else {
if (dt == DataTypes.STRING) {
- vector.putBytes(vectorRow, 0, length, value);
+ vector.putByteArray(vectorRow, 0, length, value);
} else if (dt == DataTypes.BOOLEAN) {
vector.putBoolean(vectorRow, ByteUtil.toBoolean(value[0]));
} else if (dt == DataTypes.BYTE) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 0951da0..d7dcee0 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -125,7 +125,11 @@ public class QueryModel {
private boolean preFetchData = true;
/**
- * It fills the vector directly from decoded column page with out any staging and conversions
+ * It fills the vector directly from the decoded column page without any staging or conversion.
+ * The execution engine can set this field to true in case of vector flow. Note that the execution
+ * engine must make sure that the vector batch size is greater than or equal to the column page size.
+ * In this flow only pages are pruned; each selected page is decoded and its complete data is filled
+ * into the vector, so it is the execution engine's responsibility to filter the rows at row level.
*/
private boolean isDirectVectorFill;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
index 9191d08..4963441 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
@@ -73,6 +73,11 @@ public abstract class BlockletScannedResult {
private int[] pageFilteredRowCount;
/**
+ * Ids of the filtered pages that are to be decoded and loaded into the vector.
+ */
+ private int[] pageIdFiltered;
+
+ /**
* to keep track of number of rows process
*/
protected int rowCounter;
@@ -304,7 +309,7 @@ public abstract class BlockletScannedResult {
j :
pageFilteredRowId[pageCounter][j]);
}
- vector.putBytes(vectorOffset++,
+ vector.putByteArray(vectorOffset++,
data.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
}
}
@@ -342,6 +347,19 @@ public abstract class BlockletScannedResult {
}
/**
+ * Increment the page counter, reset the row counters and load the delete delta of the next filtered page.
+ */
+ public void incrementPageCounter(ColumnVectorInfo[] vectorInfos) {
+ rowCounter = 0;
+ currentRow = -1;
+ pageCounter++;
+ if (null != deletedRecordMap && pageCounter < pageIdFiltered.length) {
+ currentDeleteDeltaVo =
+ deletedRecordMap.get(blockletNumber + "_" + pageIdFiltered[pageCounter]);
+ }
+ }
+
+ /**
* This case is used only in case of compaction, since it does not use filter flow.
*/
public void fillDataChunks() {
@@ -369,6 +387,36 @@ public abstract class BlockletScannedResult {
pageUncompressTime.getCount() + (System.currentTimeMillis() - startTime));
}
+ /**
+ * Fill all the vectors by decoding the current filtered page of each dimension and measure column chunk.
+ */
+ public void fillDataChunks(ColumnVectorInfo[] dictionaryInfo, ColumnVectorInfo[] noDictionaryInfo,
+ ColumnVectorInfo[] msrVectorInfo, int[] measuresOrdinal) {
+ freeDataChunkMemory();
+ if (pageCounter >= pageFilteredRowCount.length) {
+ return;
+ }
+ long startTime = System.currentTimeMillis();
+
+ for (int i = 0; i < this.dictionaryColumnChunkIndexes.length; i++) {
+ dimRawColumnChunks[dictionaryColumnChunkIndexes[i]]
+ .convertToDimColDataChunkAndFillVector(pageIdFiltered[pageCounter], dictionaryInfo[i]);
+ }
+ for (int i = 0; i < this.noDictionaryColumnChunkIndexes.length; i++) {
+ dimRawColumnChunks[noDictionaryColumnChunkIndexes[i]]
+ .convertToDimColDataChunkAndFillVector(pageIdFiltered[pageCounter], noDictionaryInfo[i]);
+ }
+
+ for (int i = 0; i < measuresOrdinal.length; i++) {
+ msrRawColumnChunks[measuresOrdinal[i]]
+ .convertToColumnPageAndFillVector(pageIdFiltered[pageCounter], msrVectorInfo[i]);
+ }
+ QueryStatistic pageUncompressTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+ .get(QueryStatisticsConstants.PAGE_UNCOMPRESS_TIME);
+ pageUncompressTime.addCountStatistic(QueryStatisticsConstants.PAGE_UNCOMPRESS_TIME,
+ pageUncompressTime.getCount() + (System.currentTimeMillis() - startTime));
+ }
+
// free the memory for the last page chunk
private void freeDataChunkMemory() {
for (int i = 0; i < dimensionColumnPages.length; i++) {
@@ -390,6 +438,14 @@ public abstract class BlockletScannedResult {
return pageFilteredRowCount.length;
}
+ public int[] getPageIdFiltered() {
+ return pageIdFiltered;
+ }
+
+ public void setPageIdFiltered(int[] pageIdFiltered) {
+ this.pageIdFiltered = pageIdFiltered;
+ }
+
/**
* Get total rows in the current page
*
@@ -513,7 +569,13 @@ public abstract class BlockletScannedResult {
// if deleted recors map is present for this block
// then get the first page deleted vo
if (null != deletedRecordMap) {
- currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + '_' + pageCounter);
+ String key;
+ if (pageIdFiltered != null) {
+ key = blockletNumber + '_' + pageIdFiltered[pageCounter];
+ } else {
+ key = blockletNumber + '_' + pageCounter;
+ }
+ currentDeleteDeltaVo = deletedRecordMap.get(key);
}
}
@@ -616,6 +678,12 @@ public abstract class BlockletScannedResult {
*/
public void setPageFilteredRowCount(int[] pageFilteredRowCount) {
this.pageFilteredRowCount = pageFilteredRowCount;
+ if (pageIdFiltered == null) {
+ pageIdFiltered = new int[pageFilteredRowCount.length];
+ for (int i = 0; i < pageIdFiltered.length; i++) {
+ pageIdFiltered[i] = i;
+ }
+ }
}
/**
@@ -714,6 +782,10 @@ public abstract class BlockletScannedResult {
return rowsFiltered;
}
+ public DeleteDeltaVo getCurrentDeleteDeltaVo() {
+ return currentDeleteDeltaVo;
+ }
+
/**
* Below method will be used to check row got deleted
*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
index dd0e8b9..f670884 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
@@ -27,18 +27,26 @@ public interface CarbonColumnVector {
void putFloat(int rowId, float value);
+ void putFloats(int rowId, int count, float[] src, int srcIndex);
+
void putShort(int rowId, short value);
void putShorts(int rowId, int count, short value);
+ void putShorts(int rowId, int count, short[] src, int srcIndex);
+
void putInt(int rowId, int value);
void putInts(int rowId, int count, int value);
+ void putInts(int rowId, int count, int[] src, int srcIndex);
+
void putLong(int rowId, long value);
void putLongs(int rowId, int count, long value);
+ void putLongs(int rowId, int count, long[] src, int srcIndex);
+
void putDecimal(int rowId, BigDecimal value, int precision);
void putDecimals(int rowId, int count, BigDecimal value, int precision);
@@ -47,14 +55,18 @@ public interface CarbonColumnVector {
void putDoubles(int rowId, int count, double value);
- void putBytes(int rowId, byte[] value);
+ void putDoubles(int rowId, int count, double[] src, int srcIndex);
- void putBytes(int rowId, int count, byte[] value);
+ void putByteArray(int rowId, byte[] value);
- void putBytes(int rowId, int offset, int length, byte[] value);
+ void putByteArray(int rowId, int offset, int length, byte[] value);
void putByte(int rowId, byte value);
+ void putBytes(int rowId, int count, byte[] value);
+
+ void putBytes(int rowId, int count, byte[] src, int srcIndex);
+
void putNull(int rowId);
void putNulls(int rowId, int count);