Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/10/25 16:55:37 UTC
[1/3] carbondata git commit: [CARBONDATA-3012] Added support for full scan queries for vector direct fill.
Repository: carbondata
Updated Branches:
refs/heads/master e0baa9b9f -> 3d3b6ff16
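
This change wires "direct vector fill" into the full-scan path: when a query carries no filter, decoded column pages are copied straight into the output column vectors in bulk instead of being materialized row by row. A minimal standalone sketch of the idea follows; the class and method names in it are hypothetical and only illustrate the bulk-copy pattern, not CarbonData's actual API.

    // Illustrative sketch of the "direct vector fill" idea, not CarbonData's API.
    // All names here (SimpleVector, DirectFillSketch) are hypothetical.
    public class DirectFillSketch {
      static class SimpleVector {
        final int[] ints;
        SimpleVector(int capacity) { ints = new int[capacity]; }
        // Bulk fill: copy a whole decoded page in one call instead of per-row puts.
        void putInts(int rowId, int count, int[] src, int srcIndex) {
          System.arraycopy(src, srcIndex, ints, rowId, count);
        }
      }

      public static void main(String[] args) {
        int[] decodedPage = {10, 20, 30, 40};    // pretend this came from a column page
        SimpleVector vector = new SimpleVector(4);
        vector.putInts(0, decodedPage.length, decodedPage, 0); // one bulk copy, no row loop
        System.out.println(java.util.Arrays.toString(vector.ints));
      }
    }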
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
index 803715c..471f9b2 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
@@ -56,7 +56,9 @@ public class CarbonColumnarBatch {
actualSize = 0;
rowCounter = 0;
rowsFiltered = 0;
- Arrays.fill(filteredRows, false);
+ if (filteredRows != null) {
+ Arrays.fill(filteredRows, false);
+ }
for (int i = 0; i < columnVectors.length; i++) {
columnVectors[i].reset();
}
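
In direct-fill mode the batch carries no row-level filter mask, so filteredRows can legitimately be null; the added guard keeps reset() from throwing. A standalone illustration of the same guard, assuming a hypothetical ResetGuard class:

    import java.util.Arrays;

    // Hypothetical standalone illustration of the null guard added above.
    public class ResetGuard {
      public static void reset(boolean[] filteredRows) {
        if (filteredRows != null) {        // direct-fill batches carry no filter mask
          Arrays.fill(filteredRows, false);
        }
      }

      public static void main(String[] args) {
        reset(null);                        // safe: no-op instead of NullPointerException
        boolean[] mask = {true, true};
        reset(mask);
        System.out.println(Arrays.toString(mask)); // [false, false]
      }
    }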
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonDictionary.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonDictionary.java
index 50d2ac5..2147c43 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonDictionary.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonDictionary.java
@@ -27,4 +27,6 @@ public interface CarbonDictionary {
void setDictionaryUsed();
byte[] getDictionaryValue(int index);
+
+ byte[][] getAllDictionaryValues();
}
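
The new getAllDictionaryValues() hands out the backing byte[][] in one call, so consumers such as CarbonDictionaryWrapper (further down in this commit) no longer copy the dictionary value by value. A sketch contrasting the two access patterns; DictHolder below is a hypothetical stand-in for a CarbonDictionary implementation:

    // Sketch contrasting per-index access with handing out the whole array.
    // DictHolder is hypothetical; the real interface is CarbonDictionary.
    public class DictHolder {
      private final byte[][] dictionary = { "a".getBytes(), "b".getBytes() };

      public byte[] getDictionaryValue(int index) { return dictionary[index]; }

      // New bulk accessor: no loop, no per-element copies on the caller's side.
      public byte[][] getAllDictionaryValues() { return dictionary; }

      public static void main(String[] args) {
        DictHolder d = new DictHolder();
        byte[][] all = d.getAllDictionaryValues();   // one call
        System.out.println(all.length + " entries, first=" + new String(all[0]));
      }
    }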
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
index 59117dd..d127728 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
@@ -16,7 +16,10 @@
*/
package org.apache.carbondata.core.scan.result.vector;
+import java.util.BitSet;
+
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
import org.apache.carbondata.core.scan.filter.GenericQueryType;
import org.apache.carbondata.core.scan.model.ProjectionDimension;
import org.apache.carbondata.core.scan.model.ProjectionMeasure;
@@ -32,6 +35,8 @@ public class ColumnVectorInfo implements Comparable<ColumnVectorInfo> {
public DirectDictionaryGenerator directDictionaryGenerator;
public MeasureDataVectorProcessor.MeasureVectorFiller measureVectorFiller;
public GenericQueryType genericQueryType;
+ public BitSet deletedRows;
+ public DecimalConverterFactory.DecimalConverter decimalConverter;
@Override public int compareTo(ColumnVectorInfo o) {
return ordinal - o.ordinal;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
index f8f663f..5dfd6ca 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
@@ -146,7 +146,7 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
}
}
- @Override public void putBytes(int rowId, byte[] value) {
+ @Override public void putByteArray(int rowId, byte[] value) {
bytes[rowId] = value;
}
@@ -160,7 +160,7 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
}
}
- @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
+ @Override public void putByteArray(int rowId, int offset, int length, byte[] value) {
bytes[rowId] = new byte[length];
System.arraycopy(value, offset, bytes[rowId], 0, length);
}
@@ -227,6 +227,31 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
}
}
+ public Object getDataArray() {
+ if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
+ return byteArr;
+ } else if (dataType == DataTypes.SHORT) {
+ return shorts;
+ } else if (dataType == DataTypes.INT) {
+ return ints;
+ } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
+ return longs;
+ } else if (dataType == DataTypes.FLOAT) {
+ return floats;
+ } else if (dataType == DataTypes.DOUBLE) {
+ return doubles;
+ } else if (dataType instanceof DecimalType) {
+ return decimals;
+ } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
+ if (null != carbonDictionary) {
+ return ints;
+ }
+ return bytes;
+ } else {
+ return data;
+ }
+ }
+
@Override public void reset() {
nullBytes.clear();
if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
@@ -287,4 +312,42 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
* as an optimization to prevent setting nulls.
*/
public final boolean anyNullsSet() { return anyNullsSet; }
+
+ @Override public void putFloats(int rowId, int count, float[] src, int srcIndex) {
+ for (int i = srcIndex; i < count; i++) {
+ floats[rowId ++] = src[i];
+ }
+ }
+
+ @Override public void putShorts(int rowId, int count, short[] src, int srcIndex) {
+ for (int i = srcIndex; i < count; i++) {
+ shorts[rowId ++] = src[i];
+ }
+ }
+
+ @Override public void putInts(int rowId, int count, int[] src, int srcIndex) {
+ for (int i = srcIndex; i < count; i++) {
+ ints[rowId ++] = src[i];
+ }
+ }
+
+ @Override public void putLongs(int rowId, int count, long[] src, int srcIndex) {
+ for (int i = srcIndex; i < count; i++) {
+ longs[rowId ++] = src[i];
+ }
+ }
+
+ @Override public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+ for (int i = srcIndex; i < count; i++) {
+ doubles[rowId ++] = src[i];
+ }
+ }
+
+ @Override public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+ for (int i = srcIndex; i < count; i++) {
+ byteArr[rowId ++] = src[i];
+ }
+ }
+
+
}
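
Each of the new bulk put overloads takes a source array plus srcIndex and writes count values into consecutive rows, which lets a whole decoded page be appended in one call. A minimal sketch of the intended bulk-copy semantics, using System.arraycopy as an equivalent loop-free formulation (the BulkPutSketch name is hypothetical):

    // Standalone sketch of a bulk put: copy `count` values from src[srcIndex..]
    // into consecutive vector rows starting at rowId. Names are hypothetical.
    public class BulkPutSketch {
      private final float[] floats = new float[8];

      public void putFloats(int rowId, int count, float[] src, int srcIndex) {
        System.arraycopy(src, srcIndex, floats, rowId, count);
      }

      public static void main(String[] args) {
        BulkPutSketch v = new BulkPutSketch();
        v.putFloats(2, 3, new float[] {1f, 2f, 3f}, 0);
        System.out.println(java.util.Arrays.toString(v.floats));
      }
    }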
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonDictionaryImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonDictionaryImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonDictionaryImpl.java
index cc3a03c..c8fd573 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonDictionaryImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonDictionaryImpl.java
@@ -51,4 +51,7 @@ public class CarbonDictionaryImpl implements CarbonDictionary {
return dictionary[index];
}
+ @Override public byte[][] getAllDictionaryValues() {
+ return dictionary;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
index 4ec8cb6..62674bc 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
@@ -123,7 +123,9 @@ public class BlockletFullScanner implements BlockletScanner {
}
}
scannedResult.setPageFilteredRowCount(numberOfRows);
- scannedResult.fillDataChunks();
+ if (!blockExecutionInfo.isDirectVectorFill()) {
+ scannedResult.fillDataChunks();
+ }
// adding statistics for carbon scan time
QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
.get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java
index 9635896..2a294ae 100644
--- a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java
@@ -20,11 +20,20 @@ package org.apache.carbondata.core.stats;
import java.util.HashMap;
import java.util.Map;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+
public class QueryStatisticsModel {
+
private QueryStatisticsRecorder recorder;
+
private Map<String, QueryStatistic> statisticsTypeAndObjMap =
new HashMap<String, QueryStatistic>();
+ private boolean isEnabled = Boolean.parseBoolean(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
+ CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT));
+
public QueryStatisticsRecorder getRecorder() {
return recorder;
}
@@ -36,4 +45,8 @@ public class QueryStatisticsModel {
public Map<String, QueryStatistic> getStatisticsTypeAndObjMap() {
return statisticsTypeAndObjMap;
}
+
+ public boolean isEnabled() {
+ return isEnabled;
+ }
}
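
The enabled flag is now parsed once from CarbonProperties at construction time, so statistics code on the query hot path can branch on a cached boolean instead of re-reading the property for every statistic. A standalone sketch of the pattern, using a plain system property as a hypothetical stand-in for CarbonProperties:

    // Hypothetical sketch: cache a boolean property once instead of parsing it per call.
    public class StatsModel {
      private final boolean isEnabled =
          Boolean.parseBoolean(System.getProperty("enable.query.statistics", "false"));

      public boolean isEnabled() { return isEnabled; }

      public void record(String name, long value) {
        if (!isEnabled()) {
          return;                       // cheap guard on the hot path
        }
        System.out.println(name + "=" + value);
      }

      public static void main(String[] args) {
        new StatsModel().record("scan_time_ms", 42);
      }
    }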
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
index 596d1dd..6188948 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
@@ -733,4 +733,12 @@ public final class ByteUtil {
public static float toXorFloat(byte[] value, int offset, int length) {
return Float.intBitsToFloat(toXorInt(value, offset, length));
}
+
+ public static int[] toIntArray(byte[] data, int size) {
+ int[] ints = new int[size];
+ for (int i = 0; i < ints.length; i++) {
+ ints[i] = ByteUtil.valueOf3Bytes(data, i * 3);
+ }
+ return ints;
+ }
}
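
toIntArray expands `size` packed 3-byte values into an int[], delegating the per-value decode to ByteUtil.valueOf3Bytes at offset i * 3. The sketch below shows one plausible big-endian, sign-extending decode; the actual byte order and sign handling are defined by ByteUtil itself, so treat this as an assumption for illustration:

    // Standalone sketch: unpack 3-byte big-endian signed values into ints.
    // The real decoding lives in ByteUtil.valueOf3Bytes; the byte order and
    // sign extension below are assumptions for illustration only.
    public class ThreeByteSketch {
      static int valueOf3Bytes(byte[] data, int offset) {
        return (data[offset] << 16)              // sign-extends via the high byte
            | ((data[offset + 1] & 0xFF) << 8)
            | (data[offset + 2] & 0xFF);
      }

      public static int[] toIntArray(byte[] data, int size) {
        int[] ints = new int[size];
        for (int i = 0; i < ints.length; i++) {
          ints[i] = valueOf3Bytes(data, i * 3);
        }
        return ints;
      }

      public static void main(String[] args) {
        byte[] packed = {0, 0, 1, 0, 1, 0};      // encodes 1 and 256
        System.out.println(java.util.Arrays.toString(toIntArray(packed, 2)));
      }
    }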
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/test/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImplTest.java b/core/src/test/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImplTest.java
index b9e90d6..2662cee 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImplTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImplTest.java
@@ -270,11 +270,11 @@ public class IncludeFilterExecuterImplTest extends TestCase {
long newTime = 0;
long start;
long end;
-
+
// dimension's data number in a blocklet, usually default is 32000
- int dataChunkSize = 32000;
+ int dataChunkSize = 32000;
// repeat query times in the test
- int queryTimes = 10000;
+ int queryTimes = 10000;
// repeated times for a dictionary value
int repeatTimes = 200;
//filtered value count in a blocklet
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
index a4abc61..ecd61bd 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
@@ -807,7 +807,7 @@ public class CarbonUtilTest {
.getFirstIndexUsingBinarySearch(fixedLengthDimensionDataChunk, 1, 3, compareValue, true);
assertEquals(2, result);
}
-
+
@Test
public void testBinaryRangeSearch() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
index b843709..7d6eda0 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
@@ -150,24 +150,24 @@ public class CarbonColumnVectorWrapper implements CarbonColumnVector {
}
}
- @Override public void putBytes(int rowId, byte[] value) {
+ @Override public void putByteArray(int rowId, byte[] value) {
if (!filteredRows[rowId]) {
- columnVector.putBytes(counter++, value);
+ columnVector.putByteArray(counter++, value);
}
}
@Override public void putBytes(int rowId, int count, byte[] value) {
for (int i = 0; i < count; i++) {
if (!filteredRows[rowId]) {
- columnVector.putBytes(counter++, value);
+ columnVector.putByteArray(counter++, value);
}
rowId++;
}
}
- @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
+ @Override public void putByteArray(int rowId, int offset, int length, byte[] value) {
if (!filteredRows[rowId]) {
- columnVector.putBytes(counter++, offset, length, value);
+ columnVector.putByteArray(counter++, offset, length, value);
}
}
@@ -246,4 +246,59 @@ public class CarbonColumnVectorWrapper implements CarbonColumnVector {
return this.columnVector;
}
+ @Override public void putFloats(int rowId, int count, float[] src, int srcIndex) {
+ for (int i = srcIndex; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ columnVector.putFloat(counter++, src[i]);
+ }
+ rowId++;
+ }
+ }
+
+ @Override public void putShorts(int rowId, int count, short[] src, int srcIndex) {
+ for (int i = srcIndex; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ columnVector.putShort(counter++, src[i]);
+ }
+ rowId++;
+ }
+ }
+
+ @Override public void putInts(int rowId, int count, int[] src, int srcIndex) {
+ for (int i = srcIndex; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ columnVector.putInt(counter++, src[i]);
+ }
+ rowId++;
+ }
+ }
+
+ @Override public void putLongs(int rowId, int count, long[] src, int srcIndex) {
+ for (int i = srcIndex; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ columnVector.putLong(counter++, src[i]);
+ }
+ rowId++;
+ }
+ }
+
+ @Override public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+ for (int i = srcIndex; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ columnVector.putDouble(counter++, src[i]);
+ }
+ rowId++;
+ }
+ }
+
+ @Override public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+ for (int i = srcIndex; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ columnVector.putByte(counter++, src[i]);
+ }
+ rowId++;
+ }
+ }
+
+
}
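
In this filtered wrapper the bulk puts still walk row by row: rowId indexes the filter mask while counter tracks the next write slot in the underlying vector, so surviving values stay densely packed. A compact standalone sketch of that two-cursor pattern (all names hypothetical):

    // Hypothetical sketch of the two-cursor filtered copy: rowId walks the
    // source/filter mask, counter walks the destination so no gaps are left.
    public class FilteredCopySketch {
      public static void main(String[] args) {
        boolean[] filteredRows = {false, true, false}; // row 1 is filtered out
        int[] src = {10, 20, 30};
        int[] dest = new int[3];
        int counter = 0;
        for (int rowId = 0; rowId < src.length; rowId++) {
          if (!filteredRows[rowId]) {
            dest[counter++] = src[rowId];      // write densely, skipping filtered rows
          }
        }
        System.out.println(java.util.Arrays.toString(dest)); // [10, 30, 0]
      }
    }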
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
index 39fd19a..ab270fc 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
@@ -71,11 +71,11 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV
values[rowId] = value;
}
- @Override public void putBytes(int rowId, byte[] value) {
+ @Override public void putByteArray(int rowId, byte[] value) {
type.writeSlice(builder, wrappedBuffer(value));
}
- @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
+ @Override public void putByteArray(int rowId, int offset, int length, byte[] value) {
byte[] byteArr = new byte[length];
System.arraycopy(value, offset, byteArr, 0, length);
type.writeSlice(builder, wrappedBuffer(byteArr));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
index 73786c8..c96e643 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
@@ -57,15 +57,25 @@ class AllDataTypesTestCaseFilter extends QueryTest with BeforeAndAfterAll {
test("verify like query ends with filter push down") {
val df = sql("select * from alldatatypestableFilter where empname like '%nandh'").queryExecution
.sparkPlan
- assert(df.asInstanceOf[CarbonDataSourceScan].metadata
- .get("PushedFilters").get.contains("CarbonEndsWith"))
+ if (df.isInstanceOf[CarbonDataSourceScan]) {
+ assert(df.asInstanceOf[CarbonDataSourceScan].metadata
+ .get("PushedFilters").get.contains("CarbonEndsWith"))
+ } else {
+ assert(df.children.head.asInstanceOf[CarbonDataSourceScan].metadata
+ .get("PushedFilters").get.contains("CarbonEndsWith"))
+ }
}
test("verify like query contains with filter push down") {
val df = sql("select * from alldatatypestableFilter where empname like '%nand%'").queryExecution
.sparkPlan
- assert(df.asInstanceOf[CarbonDataSourceScan].metadata
- .get("PushedFilters").get.contains("CarbonContainsWith"))
+ if (df.isInstanceOf[CarbonDataSourceScan]) {
+ assert(df.asInstanceOf[CarbonDataSourceScan].metadata
+ .get("PushedFilters").get.contains("CarbonContainsWith"))
+ } else {
+ assert(df.children.head.asInstanceOf[CarbonDataSourceScan].metadata
+ .get("PushedFilters").get.contains("CarbonContainsWith"))
+ }
}
override def afterAll {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index 5121027..a605134 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -29,7 +29,9 @@ import org.apache.spark.sql.types.Decimal;
class ColumnarVectorWrapper implements CarbonColumnVector {
- private CarbonVectorProxy sparkColumnVectorProxy;
+ private CarbonVectorProxy.ColumnVectorProxy sparkColumnVectorProxy;
+
+ private CarbonVectorProxy carbonVectorProxy;
private boolean[] filteredRows;
@@ -47,8 +49,9 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
ColumnarVectorWrapper(CarbonVectorProxy writableColumnVector,
boolean[] filteredRows, int ordinal) {
- this.sparkColumnVectorProxy = writableColumnVector;
+ this.sparkColumnVectorProxy = writableColumnVector.getColumnVector(ordinal);
this.filteredRows = filteredRows;
+ this.carbonVectorProxy = writableColumnVector;
this.ordinal = ordinal;
}
@@ -167,7 +170,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
}
}
- @Override public void putBytes(int rowId, byte[] value) {
+ @Override public void putByteArray(int rowId, byte[] value) {
if (!filteredRows[rowId]) {
sparkColumnVectorProxy.putByteArray(counter++, value, ordinal);
}
@@ -182,7 +185,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
}
}
- @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
+ @Override public void putByteArray(int rowId, int offset, int length, byte[] value) {
if (!filteredRows[rowId]) {
sparkColumnVectorProxy.putByteArray(counter++, value, offset, length, ordinal);
}
@@ -276,12 +279,67 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
}
public void reserveDictionaryIds() {
- sparkColumnVectorProxy.reserveDictionaryIds(sparkColumnVectorProxy.numRows(), ordinal);
- dictionaryVector = new ColumnarVectorWrapper(sparkColumnVectorProxy, filteredRows, ordinal);
+ sparkColumnVectorProxy.reserveDictionaryIds(carbonVectorProxy.numRows(), ordinal);
+ dictionaryVector = new ColumnarVectorWrapper(carbonVectorProxy, filteredRows, ordinal);
((ColumnarVectorWrapper) dictionaryVector).isDictionary = true;
}
@Override public CarbonColumnVector getDictionaryVector() {
return dictionaryVector;
}
+
+ @Override public void putFloats(int rowId, int count, float[] src, int srcIndex) {
+ for (int i = srcIndex; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ sparkColumnVectorProxy.putFloat(counter++, src[i], ordinal);
+ }
+ rowId++;
+ }
+ }
+
+ @Override public void putShorts(int rowId, int count, short[] src, int srcIndex) {
+ for (int i = srcIndex; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ sparkColumnVectorProxy.putShort(counter++, src[i], ordinal);
+ }
+ rowId++;
+ }
+ }
+
+ @Override public void putInts(int rowId, int count, int[] src, int srcIndex) {
+ for (int i = srcIndex; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ sparkColumnVectorProxy.putInt(counter++, src[i], ordinal);
+ }
+ rowId++;
+ }
+ }
+
+ @Override public void putLongs(int rowId, int count, long[] src, int srcIndex) {
+ for (int i = srcIndex; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ sparkColumnVectorProxy.putLong(counter++, src[i], ordinal);
+ }
+ rowId++;
+ }
+ }
+
+ @Override public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+ for (int i = srcIndex; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ sparkColumnVectorProxy.putDouble(counter++, src[i], ordinal);
+ }
+ rowId++;
+ }
+ }
+
+ @Override public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+ for (int i = srcIndex; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ sparkColumnVectorProxy.putByte(counter++, src[i], ordinal);
+ }
+ rowId++;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java
new file mode 100644
index 0000000..b55749e
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.vectorreader;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+
+import org.apache.spark.sql.CarbonVectorProxy;
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil;
+import org.apache.spark.sql.types.Decimal;
+
+/**
+ * Fills the vector directly without considering any deleted rows.
+ */
+class ColumnarVectorWrapperDirect implements CarbonColumnVector {
+
+ /**
+ * It is one column vector adapter class.
+ */
+ protected CarbonVectorProxy.ColumnVectorProxy sparkColumnVectorProxy;
+
+ /**
+ * It is adapter class of complete ColumnarBatch.
+ */
+ protected CarbonVectorProxy carbonVectorProxy;
+
+ protected int ordinal;
+
+ protected boolean isDictionary;
+
+ private DataType blockDataType;
+
+ private CarbonColumnVector dictionaryVector;
+
+ ColumnarVectorWrapperDirect(CarbonVectorProxy writableColumnVector, int ordinal) {
+ this.sparkColumnVectorProxy = writableColumnVector.getColumnVector(ordinal);
+ this.carbonVectorProxy = writableColumnVector;
+ this.ordinal = ordinal;
+ }
+
+ @Override public void putBoolean(int rowId, boolean value) {
+ sparkColumnVectorProxy.putBoolean(rowId, value, ordinal);
+ }
+
+ @Override public void putFloat(int rowId, float value) {
+ sparkColumnVectorProxy.putFloat(rowId, value, ordinal);
+ }
+
+ @Override public void putShort(int rowId, short value) {
+ sparkColumnVectorProxy.putShort(rowId, value, ordinal);
+ }
+
+ @Override public void putShorts(int rowId, int count, short value) {
+ sparkColumnVectorProxy.putShorts(rowId, count, value, ordinal);
+ }
+
+ @Override public void putInt(int rowId, int value) {
+ if (isDictionary) {
+ sparkColumnVectorProxy.putDictionaryInt(rowId, value, ordinal);
+ } else {
+ sparkColumnVectorProxy.putInt(rowId, value, ordinal);
+ }
+ }
+
+ @Override public void putInts(int rowId, int count, int value) {
+ sparkColumnVectorProxy.putInts(rowId, count, value, ordinal);
+ }
+
+ @Override public void putLong(int rowId, long value) {
+ sparkColumnVectorProxy.putLong(rowId, value, ordinal);
+ }
+
+ @Override public void putLongs(int rowId, int count, long value) {
+ sparkColumnVectorProxy.putLongs(rowId, count, value, ordinal);
+ }
+
+ @Override public void putDecimal(int rowId, BigDecimal value, int precision) {
+ Decimal toDecimal = Decimal.apply(value);
+ sparkColumnVectorProxy.putDecimal(rowId, toDecimal, precision, ordinal);
+ }
+
+ @Override public void putDecimals(int rowId, int count, BigDecimal value, int precision) {
+ Decimal decimal = Decimal.apply(value);
+ for (int i = 0; i < count; i++) {
+ sparkColumnVectorProxy.putDecimal(rowId, decimal, precision, ordinal);
+ rowId++;
+ }
+ }
+
+ @Override public void putDouble(int rowId, double value) {
+ sparkColumnVectorProxy.putDouble(rowId, value, ordinal);
+ }
+
+ @Override public void putDoubles(int rowId, int count, double value) {
+ sparkColumnVectorProxy.putDoubles(rowId, count, value, ordinal);
+ }
+
+ @Override public void putByteArray(int rowId, byte[] value) {
+ sparkColumnVectorProxy.putByteArray(rowId, value, ordinal);
+ }
+
+ @Override
+ public void putBytes(int rowId, int count, byte[] value) {
+ for (int i = 0; i < count; i++) {
+ sparkColumnVectorProxy.putByteArray(rowId, value, ordinal);
+ rowId++;
+ }
+ }
+
+ @Override public void putByteArray(int rowId, int offset, int length, byte[] value) {
+ sparkColumnVectorProxy.putByteArray(rowId, value, offset, length, ordinal);
+ }
+
+ @Override public void putNull(int rowId) {
+ sparkColumnVectorProxy.putNull(rowId, ordinal);
+ }
+
+ @Override public void putNulls(int rowId, int count) {
+ sparkColumnVectorProxy.putNulls(rowId, count, ordinal);
+ }
+
+ @Override public void putNotNull(int rowId) {
+ sparkColumnVectorProxy.putNotNull(rowId, ordinal);
+ }
+
+ @Override public void putNotNull(int rowId, int count) {
+ sparkColumnVectorProxy.putNotNulls(rowId, count, ordinal);
+ }
+
+ @Override public boolean isNull(int rowId) {
+ return sparkColumnVectorProxy.isNullAt(rowId, ordinal);
+ }
+
+ @Override public void putObject(int rowId, Object obj) {
+ //TODO handle complex types
+ }
+
+ @Override public Object getData(int rowId) {
+ //TODO handle complex types
+ return null;
+ }
+
+ @Override public void reset() {
+ if (null != dictionaryVector) {
+ dictionaryVector.reset();
+ }
+ }
+
+ @Override public DataType getType() {
+ return CarbonSparkDataSourceUtil
+ .convertSparkToCarbonDataType(sparkColumnVectorProxy.dataType(ordinal));
+ }
+
+ @Override public DataType getBlockDataType() {
+ return blockDataType;
+ }
+
+ @Override public void setBlockDataType(DataType blockDataType) {
+ this.blockDataType = blockDataType;
+ }
+
+ @Override public void setDictionary(CarbonDictionary dictionary) {
+ sparkColumnVectorProxy.setDictionary(dictionary, ordinal);
+ }
+
+ @Override public boolean hasDictionary() {
+ return sparkColumnVectorProxy.hasDictionary(ordinal);
+ }
+
+ public void reserveDictionaryIds() {
+ sparkColumnVectorProxy.reserveDictionaryIds(carbonVectorProxy.numRows(), ordinal);
+ dictionaryVector = new ColumnarVectorWrapperDirect(carbonVectorProxy, ordinal);
+ ((ColumnarVectorWrapperDirect) dictionaryVector).isDictionary = true;
+ }
+
+ @Override public CarbonColumnVector getDictionaryVector() {
+ return dictionaryVector;
+ }
+
+ @Override public void putByte(int rowId, byte value) {
+ sparkColumnVectorProxy.putByte(rowId, value, ordinal);
+ }
+
+ @Override public void setFilteredRowsExist(boolean filteredRowsExist) {
+
+ }
+
+ @Override public void putFloats(int rowId, int count, float[] src, int srcIndex) {
+ sparkColumnVectorProxy.putFloats(rowId, count, src, srcIndex);
+ }
+
+ @Override public void putShorts(int rowId, int count, short[] src, int srcIndex) {
+ sparkColumnVectorProxy.putShorts(rowId, count, src, srcIndex);
+ }
+
+ @Override public void putInts(int rowId, int count, int[] src, int srcIndex) {
+ sparkColumnVectorProxy.putInts(rowId, count, src, srcIndex);
+ }
+
+ @Override public void putLongs(int rowId, int count, long[] src, int srcIndex) {
+ sparkColumnVectorProxy.putLongs(rowId, count, src, srcIndex);
+ }
+
+ @Override public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+ sparkColumnVectorProxy.putDoubles(rowId, count, src, srcIndex);
+ }
+
+ @Override public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+ sparkColumnVectorProxy.putBytes(rowId, count, src, srcIndex);
+ }
+}
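
Unlike ColumnarVectorWrapper, this direct wrapper carries no filteredRows mask: every put writes at the caller-supplied rowId, which is what makes page-sized bulk copies possible. A usage-shaped sketch of choosing between a direct and a filtered vector follows; the lambda-based vectors are hypothetical, and the real selection is queryModel.isDirectVectorFill() as seen in VectorizedCarbonRecordReader below:

    // Hypothetical sketch of choosing a vector wrapper per query mode.
    // The real reader branches on queryModel.isDirectVectorFill().
    interface PutIntVector { void putInt(int rowId, int value); }

    public class WrapperChoiceSketch {
      static PutIntVector create(boolean directFill, int[] backing, boolean[] filterMask) {
        if (directFill) {
          // Direct: no mask, rowId maps 1:1 onto the output vector.
          return (rowId, value) -> backing[rowId] = value;
        }
        // Filtered: the mask is consulted; a shared counter compacts the writes.
        final int[] counter = {0};
        return (rowId, value) -> {
          if (!filterMask[rowId]) {
            backing[counter[0]++] = value;
          }
        };
      }

      public static void main(String[] args) {
        int[] out = new int[2];
        PutIntVector v = create(true, out, null);
        v.putInt(0, 7);
        v.putInt(1, 8);
        System.out.println(java.util.Arrays.toString(out));
      }
    }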
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 839a8a0..45686ea 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
@@ -281,7 +282,12 @@ public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
schema = schema.add(field);
}
}
- vectorProxy = new CarbonVectorProxy(DEFAULT_MEMORY_MODE,schema,DEFAULT_BATCH_SIZE);
+ short batchSize = DEFAULT_BATCH_SIZE;
+ if (queryModel.isDirectVectorFill()) {
+ batchSize = CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
+ }
+ vectorProxy = new CarbonVectorProxy(DEFAULT_MEMORY_MODE, schema, batchSize);
+
if (partitionColumns != null) {
int partitionIdx = fields.length;
for (int i = 0; i < partitionColumns.fields().length; i++) {
@@ -290,12 +296,22 @@ public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
}
}
CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
- boolean[] filteredRows = new boolean[vectorProxy.numRows()];
- for (int i = 0; i < fields.length; i++) {
- vectors[i] = new ColumnarVectorWrapper(vectorProxy, filteredRows, i);
- if (isNoDictStringField[i]) {
- if (vectors[i] instanceof ColumnarVectorWrapper) {
- ((ColumnarVectorWrapper) vectors[i]).reserveDictionaryIds();
+ boolean[] filteredRows = null;
+ if (queryModel.isDirectVectorFill()) {
+ for (int i = 0; i < fields.length; i++) {
+ vectors[i] = new ColumnarVectorWrapperDirect(vectorProxy, i);
+ if (isNoDictStringField[i]) {
+ ((ColumnarVectorWrapperDirect) vectors[i]).reserveDictionaryIds();
+ }
+ }
+ } else {
+ filteredRows = new boolean[vectorProxy.numRows()];
+ for (int i = 0; i < fields.length; i++) {
+ vectors[i] = new ColumnarVectorWrapper(vectorProxy, filteredRows, i);
+ if (isNoDictStringField[i]) {
+ if (vectors[i] instanceof ColumnarVectorWrapper) {
+ ((ColumnarVectorWrapper) vectors[i]).reserveDictionaryIds();
+ }
}
}
}
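
With direct vector fill the batch is sized to a whole blocklet column page so one decoded page maps onto one batch without splitting. The sketch below restates that choice; the concrete constants (4096 as the reader's default batch size, 32000 rows per column page) are assumptions based on typical CarbonData defaults, not values read from this diff:

    // Sketch of the batch-size choice above; the constant values are assumptions
    // based on typical CarbonData defaults, not taken from this commit.
    public class BatchSizeSketch {
      static final short DEFAULT_BATCH_SIZE = 4 * 1024;   // assumed reader default
      static final short ROWS_PER_COLUMN_PAGE = 32000;    // assumed blocklet page size

      static short chooseBatchSize(boolean directVectorFill) {
        // Direct fill sizes the batch to a whole column page so one decoded
        // page can be copied into one batch without splitting.
        return directVectorFill ? ROWS_PER_COLUMN_PAGE : DEFAULT_BATCH_SIZE;
      }

      public static void main(String[] args) {
        System.out.println(chooseBatchSize(true));   // 32000
        System.out.println(chooseBatchSize(false));  // 4096
      }
    }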
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
index 80e6dbd..03466cc 100644
--- a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
+++ b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
@@ -45,6 +45,8 @@ public class CarbonVectorProxy {
private ColumnarBatch columnarBatch;
+ private ColumnVectorProxy[] columnVectorProxies;
+
/**
* Adapter class which handles the columnar vector reading of the carbondata
* based on the spark ColumnVector and ColumnarBatch API. This proxy class
@@ -57,14 +59,22 @@ public class CarbonVectorProxy {
*/
public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
columnarBatch = ColumnarBatch.allocate(new StructType(structFileds), memMode, rowNum);
+ columnVectorProxies = new ColumnVectorProxy[columnarBatch.numCols()];
+ for (int i = 0; i < columnVectorProxies.length; i++) {
+ columnVectorProxies[i] = new ColumnVectorProxy(columnarBatch, i);
+ }
}
public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
+ columnVectorProxies = new ColumnVectorProxy[columnarBatch.numCols()];
+ for (int i = 0; i < columnVectorProxies.length; i++) {
+ columnVectorProxies[i] = new ColumnVectorProxy(columnarBatch, i);
+ }
}
- public ColumnVector getColumnVector(int ordinal) {
- return columnarBatch.column(ordinal);
+ public ColumnVectorProxy getColumnVector(int ordinal) {
+ return columnVectorProxies[ordinal];
}
/**
@@ -74,9 +84,6 @@ public class CarbonVectorProxy {
columnarBatch.setNumRows(numRows);
}
- public Object reserveDictionaryIds(int capacity , int ordinal) {
- return columnarBatch.column(ordinal).reserveDictionaryIds(capacity);
- }
/**
* Returns the number of rows for read, including filtered rows.
@@ -85,22 +92,6 @@ public class CarbonVectorProxy {
return columnarBatch.capacity();
}
- public void setDictionary(CarbonDictionary dictionary, int ordinal) {
- if (null != dictionary) {
- columnarBatch.column(ordinal)
- .setDictionary(new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary));
- } else {
- columnarBatch.column(ordinal).setDictionary(null);
- }
- }
-
- public void putNull(int rowId, int ordinal) {
- columnarBatch.column(ordinal).putNull(rowId);
- }
-
- public void putNulls(int rowId, int count, int ordinal) {
- columnarBatch.column(ordinal).putNulls(rowId, count);
- }
/**
* Called to close all the columns in this batch. It is not valid to access the data after
@@ -139,9 +130,7 @@ public class CarbonVectorProxy {
return columnarBatch.column(ordinal);
}
- public boolean hasDictionary(int ordinal) {
- return columnarBatch.column(ordinal).hasDictionary();
- }
+
/**
* Resets this column for writing. The currently stored values are no longer accessible.
@@ -150,127 +139,187 @@ public class CarbonVectorProxy {
columnarBatch.reset();
}
- public void putRowToColumnBatch(int rowId, Object value, int offset) {
- org.apache.spark.sql.types.DataType t = dataType(offset);
- if (null == value) {
- putNull(rowId, offset);
- } else {
- if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
- putBoolean(rowId, (boolean) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
- putByte(rowId, (byte) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
- putShort(rowId, (short) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
- putInt(rowId, (int) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
- putLong(rowId, (long) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
- putFloat(rowId, (float) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
- putDouble(rowId, (double) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
- UTF8String v = (UTF8String) value;
- putByteArray(rowId, v.getBytes(), offset);
- } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
- DecimalType dt = (DecimalType) t;
- Decimal d = Decimal.fromDecimal(value);
- if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
- putInt(rowId, (int) d.toUnscaledLong(), offset);
- } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
- putLong(rowId, d.toUnscaledLong(), offset);
- } else {
- final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
- byte[] bytes = integer.toByteArray();
- putByteArray(rowId, bytes, 0, bytes.length, offset);
+
+ public static class ColumnVectorProxy {
+
+ private ColumnVector vector;
+
+ public ColumnVectorProxy(ColumnarBatch columnarBatch, int ordinal) {
+ this.vector = columnarBatch.column(ordinal);
+ }
+
+ public void putRowToColumnBatch(int rowId, Object value, int offset) {
+ org.apache.spark.sql.types.DataType t = dataType(offset);
+ if (null == value) {
+ putNull(rowId, offset);
+ } else {
+ if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
+ putBoolean(rowId, (boolean) value, offset);
+ } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
+ putByte(rowId, (byte) value, offset);
+ } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
+ putShort(rowId, (short) value, offset);
+ } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
+ putInt(rowId, (int) value, offset);
+ } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
+ putLong(rowId, (long) value, offset);
+ } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
+ putFloat(rowId, (float) value, offset);
+ } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
+ putDouble(rowId, (double) value, offset);
+ } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
+ UTF8String v = (UTF8String) value;
+ putByteArray(rowId, v.getBytes(), offset);
+ } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
+ DecimalType dt = (DecimalType) t;
+ Decimal d = Decimal.fromDecimal(value);
+ if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
+ putInt(rowId, (int) d.toUnscaledLong(), offset);
+ } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
+ putLong(rowId, d.toUnscaledLong(), offset);
+ } else {
+ final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
+ byte[] bytes = integer.toByteArray();
+ putByteArray(rowId, bytes, 0, bytes.length, offset);
+ }
+ } else if (t instanceof CalendarIntervalType) {
+ CalendarInterval c = (CalendarInterval) value;
+ vector.getChildColumn(0).putInt(rowId, c.months);
+ vector.getChildColumn(1).putLong(rowId, c.microseconds);
+ } else if (t instanceof org.apache.spark.sql.types.DateType) {
+ putInt(rowId, (int) value, offset);
+ } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
+ putLong(rowId, (long) value, offset);
}
- } else if (t instanceof CalendarIntervalType) {
- CalendarInterval c = (CalendarInterval) value;
- columnarBatch.column(offset).getChildColumn(0).putInt(rowId, c.months);
- columnarBatch.column(offset).getChildColumn(1).putLong(rowId, c.microseconds);
- } else if (t instanceof org.apache.spark.sql.types.DateType) {
- putInt(rowId, (int) value, offset);
- } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
- putLong(rowId, (long) value, offset);
}
}
- }
- public void putBoolean(int rowId, boolean value, int ordinal) {
- columnarBatch.column(ordinal).putBoolean(rowId, (boolean) value);
- }
+ public void putBoolean(int rowId, boolean value, int ordinal) {
+ vector.putBoolean(rowId, value);
+ }
- public void putByte(int rowId, byte value, int ordinal) {
- columnarBatch.column(ordinal).putByte(rowId, (byte) value);
- }
+ public void putByte(int rowId, byte value, int ordinal) {
+ vector.putByte(rowId, value);
+ }
- public void putShort(int rowId, short value, int ordinal) {
- columnarBatch.column(ordinal).putShort(rowId, (short) value);
- }
+ public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+ vector.putBytes(rowId, count, src, srcIndex);
+ }
- public void putInt(int rowId, int value, int ordinal) {
- columnarBatch.column(ordinal).putInt(rowId, (int) value);
- }
+ public void putShort(int rowId, short value, int ordinal) {
+ vector.putShort(rowId, value);
+ }
- public void putFloat(int rowId, float value, int ordinal) {
- columnarBatch.column(ordinal).putFloat(rowId, (float) value);
- }
+ public void putInt(int rowId, int value, int ordinal) {
+ vector.putInt(rowId, value);
+ }
- public void putLong(int rowId, long value, int ordinal) {
- columnarBatch.column(ordinal).putLong(rowId, (long) value);
- }
+ public void putFloat(int rowId, float value, int ordinal) {
+ vector.putFloat(rowId, value);
+ }
- public void putDouble(int rowId, double value, int ordinal) {
- columnarBatch.column(ordinal).putDouble(rowId, (double) value);
- }
+ public void putFloats(int rowId, int count, float[] src, int srcIndex) {
+ vector.putFloats(rowId, count, src, srcIndex);
+ }
- public void putByteArray(int rowId, byte[] value, int ordinal) {
- columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value);
- }
+ public void putLong(int rowId, long value, int ordinal) {
+ vector.putLong(rowId, value);
+ }
- public void putInts(int rowId, int count, int value, int ordinal) {
- columnarBatch.column(ordinal).putInts(rowId, count, value);
- }
+ public void putDouble(int rowId, double value, int ordinal) {
+ vector.putDouble(rowId, value);
+ }
- public void putShorts(int rowId, int count, short value, int ordinal) {
- columnarBatch.column(ordinal).putShorts(rowId, count, value);
- }
+ public void putByteArray(int rowId, byte[] value, int ordinal) {
+ vector.putByteArray(rowId, value);
+ }
- public void putLongs(int rowId, int count, long value, int ordinal) {
- columnarBatch.column(ordinal).putLongs(rowId, count, value);
- }
+ public void putInts(int rowId, int count, int value, int ordinal) {
+ vector.putInts(rowId, count, value);
+ }
- public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
- columnarBatch.column(ordinal).putDecimal(rowId, value, precision);
+ public void putInts(int rowId, int count, int[] src, int srcIndex) {
+ vector.putInts(rowId, count, src, srcIndex);
+ }
- }
+ public void putShorts(int rowId, int count, short value, int ordinal) {
+ vector.putShorts(rowId, count, value);
+ }
- public void putDoubles(int rowId, int count, double value, int ordinal) {
- columnarBatch.column(ordinal).putDoubles(rowId, count, value);
- }
+ public void putShorts(int rowId, int count, short[] src, int srcIndex) {
+ vector.putShorts(rowId, count, src, srcIndex);
+ }
- public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
- columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value, offset, length);
- }
+ public void putLongs(int rowId, int count, long value, int ordinal) {
+ vector.putLongs(rowId, count, value);
+ }
- public boolean isNullAt(int rowId, int ordinal) {
- return columnarBatch
- .column(ordinal).isNullAt(rowId);
- }
+ public void putLongs(int rowId, int count, long[] src, int srcIndex) {
+ vector.putLongs(rowId, count, src, srcIndex);
+ }
- public DataType dataType(int ordinal) {
- return columnarBatch.column(ordinal).dataType();
- }
+ public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
+ vector.putDecimal(rowId, value, precision);
- public void putNotNull(int rowId, int ordinal) {
- columnarBatch.column(ordinal).putNotNull(rowId);
- }
+ }
- public void putNotNulls(int rowId, int count, int ordinal) {
- columnarBatch.column(ordinal).putNotNulls(rowId, count);
- }
+ public void putDoubles(int rowId, int count, double value, int ordinal) {
+ vector.putDoubles(rowId, count, value);
+ }
+
+ public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+ vector.putDoubles(rowId, count, src, srcIndex);
+ }
+
+ public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
+ vector.putByteArray(rowId, value, offset, length);
+ }
+
+ public boolean isNullAt(int rowId, int ordinal) {
+ return vector.isNullAt(rowId);
+ }
+
+ public DataType dataType(int ordinal) {
+ return vector.dataType();
+ }
+
+ public void putNotNull(int rowId, int ordinal) {
+ vector.putNotNull(rowId);
+ }
+
+ public void putNotNulls(int rowId, int count, int ordinal) {
+ vector.putNotNulls(rowId, count);
+ }
+
+ public void putDictionaryInt(int rowId, int value, int ordinal) {
+ vector.getDictionaryIds().putInt(rowId, value);
+ }
+
+ public void setDictionary(CarbonDictionary dictionary, int ordinal) {
+ if (null != dictionary) {
+ vector.setDictionary(new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary));
+ } else {
+ vector.setDictionary(null);
+ }
+ }
+
+ public void putNull(int rowId, int ordinal) {
+ vector.putNull(rowId);
+ }
+
+ public void putNulls(int rowId, int count, int ordinal) {
+ vector.putNulls(rowId, count);
+ }
+
+ public boolean hasDictionary(int ordinal) {
+ return vector.hasDictionary();
+ }
+
+ public Object reserveDictionaryIds(int capacity, int ordinal) {
+ return vector.reserveDictionaryIds(capacity);
+ }
+
+
- public void putDictionaryInt(int rowId, int value, int ordinal) {
- columnarBatch.column(ordinal).getDictionaryIds().putInt(rowId, (int) value);
}
}
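
The refactor resolves one ColumnVectorProxy per column when the batch is constructed, so per-value puts go through a stored field instead of calling columnarBatch.column(ordinal) on every write. A minimal sketch of that resolve-once caching pattern (all names hypothetical):

    // Hypothetical sketch of caching per-column accessors up front so the
    // hot write path avoids a column lookup per value.
    public class ProxyCacheSketch {
      static class Column { int last; void putInt(int rowId, int v) { last = v; } }

      private final Column[] columns = { new Column(), new Column() };
      private final Column[] cached;

      ProxyCacheSketch() {
        cached = new Column[columns.length];
        for (int i = 0; i < columns.length; i++) {
          cached[i] = columns[i];                 // resolve each column once
        }
      }

      Column getColumnVector(int ordinal) { return cached[ordinal]; }

      public static void main(String[] args) {
        ProxyCacheSketch p = new ProxyCacheSketch();
        p.getColumnVector(1).putInt(0, 99);       // no per-write lookup
        System.out.println(p.getColumnVector(1).last);
      }
    }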
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java
index 5a99c68..bd8c57c 100644
--- a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java
+++ b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java
@@ -28,10 +28,7 @@ public class CarbonDictionaryWrapper implements Dictionary {
private byte[][] binaries;
CarbonDictionaryWrapper(CarbonDictionary dictionary) {
- binaries = new byte[dictionary.getDictionarySize()][];
- for (int i = 0; i < binaries.length; i++) {
- binaries[i] = dictionary.getDictionaryValue(i);
- }
+ binaries = dictionary.getAllDictionaryValues();
}
@Override public int decodeToInt(int id) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
index 4a0fb9e..bd74b05 100644
--- a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
+++ b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
@@ -22,7 +22,6 @@ import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.vectorized.Dictionary;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -38,7 +37,7 @@ import org.apache.spark.unsafe.types.UTF8String;
public class CarbonVectorProxy {
private ColumnarBatch columnarBatch;
- private WritableColumnVector[] columnVectors;
+ private ColumnVectorProxy[] columnVectorProxies;
/**
* Adapter class which handles the columnar vector reading of the carbondata
@@ -51,17 +50,25 @@ public class CarbonVectorProxy {
* @param structFileds, metadata related to current schema of table.
*/
public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
- columnVectors = ColumnVectorFactory
- .getColumnVector(memMode, new StructType(structFileds), rowNum);
+ WritableColumnVector[] columnVectors =
+ ColumnVectorFactory.getColumnVector(memMode, new StructType(structFileds), rowNum);
columnarBatch = new ColumnarBatch(columnVectors);
columnarBatch.setNumRows(rowNum);
+ columnVectorProxies = new ColumnVectorProxy[columnarBatch.numCols()];
+ for (int i = 0; i < columnVectorProxies.length; i++) {
+ columnVectorProxies[i] = new ColumnVectorProxy(columnarBatch, i);
+ }
}
public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
- columnVectors = ColumnVectorFactory
+ WritableColumnVector[] columnVectors = ColumnVectorFactory
.getColumnVector(memMode, outputSchema, rowNum);
columnarBatch = new ColumnarBatch(columnVectors);
columnarBatch.setNumRows(rowNum);
+ columnVectorProxies = new ColumnVectorProxy[columnarBatch.numCols()];
+ for (int i = 0; i < columnVectorProxies.length; i++) {
+ columnVectorProxies[i] = new ColumnVectorProxy(columnarBatch, i);
+ }
}
/**
@@ -71,10 +78,6 @@ public class CarbonVectorProxy {
return columnarBatch.numRows();
}
- public Object reserveDictionaryIds(int capacity, int ordinal) {
- return columnVectors[ordinal].reserveDictionaryIds(capacity);
- }
-
/**
* This API will return a columnvector from a batch of column vector rows
* based on the ordinal
@@ -86,21 +89,20 @@ public class CarbonVectorProxy {
return (WritableColumnVector) columnarBatch.column(ordinal);
}
- public WritableColumnVector getColumnVector(int ordinal) {
- return columnVectors[ordinal];
+ public ColumnVectorProxy getColumnVector(int ordinal) {
+ return columnVectorProxies[ordinal];
}
-
/**
* Resets this column for writing. The currently stored values are no longer accessible.
*/
public void reset() {
- for (WritableColumnVector col : columnVectors) {
- col.reset();
+ for (int i = 0; i < columnarBatch.numCols(); i++) {
+ ((WritableColumnVector)columnarBatch.column(i)).reset();
}
}
public void resetDictionaryIds(int ordinal) {
- columnVectors[ordinal].getDictionaryIds().reset();
+ ((WritableColumnVector)columnarBatch.column(ordinal)).getDictionaryIds().reset();
}
/**
@@ -133,146 +135,189 @@ public class CarbonVectorProxy {
columnarBatch.setNumRows(numRows);
}
- public void putRowToColumnBatch(int rowId, Object value, int offset) {
- org.apache.spark.sql.types.DataType t = dataType(offset);
- if (null == value) {
- putNull(rowId, offset);
- } else {
- if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
- putBoolean(rowId, (boolean) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
- putByte(rowId, (byte) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
- putShort(rowId, (short) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
- putInt(rowId, (int) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
- putLong(rowId, (long) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
- putFloat(rowId, (float) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
- putDouble(rowId, (double) value, offset);
- } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
- UTF8String v = (UTF8String) value;
- putByteArray(rowId, v.getBytes(), offset);
- } else if (t instanceof DecimalType) {
- DecimalType dt = (DecimalType) t;
- Decimal d = Decimal.fromDecimal(value);
- if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
- putInt(rowId, (int) d.toUnscaledLong(), offset);
- } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
- putLong(rowId, d.toUnscaledLong(), offset);
- } else {
- final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
- byte[] bytes = integer.toByteArray();
- putByteArray(rowId, bytes, 0, bytes.length, offset);
+
+ public DataType dataType(int ordinal) {
+ return columnarBatch.column(ordinal).dataType();
+ }
+
+ public static class ColumnVectorProxy {
+
+ private WritableColumnVector vector;
+
+ public ColumnVectorProxy(ColumnarBatch columnarBatch, int ordinal) {
+ vector = (WritableColumnVector) columnarBatch.column(ordinal);
+ }
+
+ public void putRowToColumnBatch(int rowId, Object value, int offset) {
+ DataType t = dataType(offset);
+ if (null == value) {
+ putNull(rowId, offset);
+ } else {
+ if (t == DataTypes.BooleanType) {
+ putBoolean(rowId, (boolean) value, offset);
+ } else if (t == DataTypes.ByteType) {
+ putByte(rowId, (byte) value, offset);
+ } else if (t == DataTypes.ShortType) {
+ putShort(rowId, (short) value, offset);
+ } else if (t == DataTypes.IntegerType) {
+ putInt(rowId, (int) value, offset);
+ } else if (t == DataTypes.LongType) {
+ putLong(rowId, (long) value, offset);
+ } else if (t == DataTypes.FloatType) {
+ putFloat(rowId, (float) value, offset);
+ } else if (t == DataTypes.DoubleType) {
+ putDouble(rowId, (double) value, offset);
+ } else if (t == DataTypes.StringType) {
+ UTF8String v = (UTF8String) value;
+ putByteArray(rowId, v.getBytes(), offset);
+ } else if (t instanceof DecimalType) {
+ DecimalType dt = (DecimalType) t;
+ Decimal d = Decimal.fromDecimal(value);
+ if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
+ putInt(rowId, (int) d.toUnscaledLong(), offset);
+ } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
+ putLong(rowId, d.toUnscaledLong(), offset);
+ } else {
+ final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
+ byte[] bytes = integer.toByteArray();
+ putByteArray(rowId, bytes, 0, bytes.length, offset);
+ }
+ } else if (t instanceof CalendarIntervalType) {
+ CalendarInterval c = (CalendarInterval) value;
+ vector.getChild(0).putInt(rowId, c.months);
+ vector.getChild(1).putLong(rowId, c.microseconds);
+ } else if (t instanceof DateType) {
+ putInt(rowId, (int) value, offset);
+ } else if (t instanceof TimestampType) {
+ putLong(rowId, (long) value, offset);
}
- } else if (t instanceof CalendarIntervalType) {
- CalendarInterval c = (CalendarInterval) value;
- columnVectors[offset].getChild(0).putInt(rowId, c.months);
- columnVectors[offset].getChild(1).putLong(rowId, c.microseconds);
- } else if (t instanceof org.apache.spark.sql.types.DateType) {
- putInt(rowId, (int) value, offset);
- } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
- putLong(rowId, (long) value, offset);
}
}
- }
- public void putBoolean(int rowId, boolean value, int ordinal) {
- columnVectors[ordinal].putBoolean(rowId, (boolean) value);
- }
+ public void putBoolean(int rowId, boolean value, int ordinal) {
+ vector.putBoolean(rowId, value);
+ }
- public void putByte(int rowId, byte value, int ordinal) {
- columnVectors[ordinal].putByte(rowId, (byte) value);
- }
+ public void putByte(int rowId, byte value, int ordinal) {
+ vector.putByte(rowId, value);
+ }
- public void putShort(int rowId, short value, int ordinal) {
- columnVectors[ordinal].putShort(rowId, (short) value);
- }
+ public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+ vector.putBytes(rowId, count, src, srcIndex);
+ }
- public void putInt(int rowId, int value, int ordinal) {
- columnVectors[ordinal].putInt(rowId, (int) value);
- }
+ public void putShort(int rowId, short value, int ordinal) {
+ vector.putShort(rowId, value);
+ }
- public void putDictionaryInt(int rowId, int value, int ordinal) {
- columnVectors[ordinal].getDictionaryIds().putInt(rowId, (int) value);
- }
+ public void putInt(int rowId, int value, int ordinal) {
+ vector.putInt(rowId, value);
+ }
- public void putFloat(int rowId, float value, int ordinal) {
- columnVectors[ordinal].putFloat(rowId, (float) value);
- }
+ public void putFloat(int rowId, float value, int ordinal) {
+ vector.putFloat(rowId, value);
+ }
- public void putLong(int rowId, long value, int ordinal) {
- columnVectors[ordinal].putLong(rowId, (long) value);
- }
+ public void putFloats(int rowId, int count, float[] src, int srcIndex) {
+ vector.putFloats(rowId, count, src, srcIndex);
+ }
- public void putDouble(int rowId, double value, int ordinal) {
- columnVectors[ordinal].putDouble(rowId, (double) value);
- }
+ public void putLong(int rowId, long value, int ordinal) {
+ vector.putLong(rowId, value);
+ }
- public void putByteArray(int rowId, byte[] value, int ordinal) {
- columnVectors[ordinal].putByteArray(rowId, (byte[]) value);
- }
+ public void putDouble(int rowId, double value, int ordinal) {
+ vector.putDouble(rowId, value);
+ }
- public void putInts(int rowId, int count, int value, int ordinal) {
- columnVectors[ordinal].putInts(rowId, count, value);
- }
+ public void putByteArray(int rowId, byte[] value, int ordinal) {
+ vector.putByteArray(rowId, value);
+ }
- public void putShorts(int rowId, int count, short value, int ordinal) {
- columnVectors[ordinal].putShorts(rowId, count, value);
- }
+ public void putInts(int rowId, int count, int value, int ordinal) {
+ vector.putInts(rowId, count, value);
+ }
- public void putLongs(int rowId, int count, long value, int ordinal) {
- columnVectors[ordinal].putLongs(rowId, count, value);
- }
+ public void putInts(int rowId, int count, int[] src, int srcIndex) {
+ vector.putInts(rowId, count, src, srcIndex);
+ }
- public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
- columnVectors[ordinal].putDecimal(rowId, value, precision);
+ public void putShorts(int rowId, int count, short value, int ordinal) {
+ vector.putShorts(rowId, count, value);
+ }
- }
+ public void putShorts(int rowId, int count, short[] src, int srcIndex) {
+ vector.putShorts(rowId, count, src, srcIndex);
+ }
- public void putDoubles(int rowId, int count, double value, int ordinal) {
- columnVectors[ordinal].putDoubles(rowId, count, value);
- }
+ public void putLongs(int rowId, int count, long value, int ordinal) {
+ vector.putLongs(rowId, count, value);
+ }
- public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
- columnVectors[ordinal].putByteArray(rowId, (byte[]) value, offset, length);
- }
+ public void putLongs(int rowId, int count, long[] src, int srcIndex) {
+ vector.putLongs(rowId, count, src, srcIndex);
+ }
- public void putNull(int rowId, int ordinal) {
- columnVectors[ordinal].putNull(rowId);
- }
+ public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
+ vector.putDecimal(rowId, value, precision);
- public void putNulls(int rowId, int count, int ordinal) {
- columnVectors[ordinal].putNulls(rowId, count);
- }
+ }
- public void putNotNull(int rowId, int ordinal) {
- columnVectors[ordinal].putNotNull(rowId);
- }
+ public void putDoubles(int rowId, int count, double value, int ordinal) {
+ vector.putDoubles(rowId, count, value);
+ }
- public void putNotNulls(int rowId, int count, int ordinal) {
- columnVectors[ordinal].putNotNulls(rowId, count);
- }
+ public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+ vector.putDoubles(rowId, count, src, srcIndex);
+ }
- public boolean isNullAt(int rowId, int ordinal) {
- return columnVectors[ordinal].isNullAt(rowId);
- }
+ public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
+ vector.putByteArray(rowId, value, offset, length);
+ }
- public boolean hasDictionary(int ordinal) {
- return columnVectors[ordinal].hasDictionary();
- }
+ public boolean isNullAt(int rowId, int ordinal) {
+ return vector.isNullAt(rowId);
+ }
- public void setDictionary(CarbonDictionary dictionary, int ordinal) {
+ public DataType dataType(int ordinal) {
+ return vector.dataType();
+ }
+
+ public void putNotNull(int rowId, int ordinal) {
+ vector.putNotNull(rowId);
+ }
+
+ public void putNotNulls(int rowId, int count, int ordinal) {
+ vector.putNotNulls(rowId, count);
+ }
+
+ public void putDictionaryInt(int rowId, int value, int ordinal) {
+ vector.getDictionaryIds().putInt(rowId, value);
+ }
+
+ public void setDictionary(CarbonDictionary dictionary, int ordinal) {
if (null != dictionary) {
- columnVectors[ordinal].setDictionary(new CarbonDictionaryWrapper(dictionary));
+ vector.setDictionary(new CarbonDictionaryWrapper(dictionary));
} else {
- columnVectors[ordinal].setDictionary(null);
+ vector.setDictionary(null);
+ }
+ }
+
+ public void putNull(int rowId, int ordinal) {
+ vector.putNull(rowId);
+ }
+
+ public void putNulls(int rowId, int count, int ordinal) {
+ vector.putNulls(rowId, count);
+ }
+
+ public boolean hasDictionary(int ordinal) {
+ return vector.hasDictionary();
+ }
+
+ public Object reserveDictionaryIds(int capacity, int ordinal) {
+ return vector.reserveDictionaryIds(capacity);
}
- }
- public DataType dataType(int ordinal) {
- return columnVectors[ordinal].dataType();
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
index 6c65285..3330e8b 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
@@ -705,7 +705,7 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
private void putRowToColumnBatch(int rowId) {
for (int i = 0; i < projection.length; i++) {
Object value = outputValues[i];
- vectorProxy.putRowToColumnBatch(rowId,value,i);
+ vectorProxy.getColumnVector(i).putRowToColumnBatch(rowId,value,i);
}
}
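For reference, the refactored proxy gives each projected column its own ColumnVectorProxy, so callers write through a per-column handle instead of indexing a shared vector array. A hedged sketch of the resulting pattern (batchSize is an assumed local; projection, outputValues and getColumnVector appear in the diffs above):

    for (int rowId = 0; rowId < batchSize; rowId++) {
      for (int i = 0; i < projection.length; i++) {
        // one proxy per column; its put* methods delegate to the wrapped
        // WritableColumnVector
        vectorProxy.getColumnVector(i).putRowToColumnBatch(rowId, outputValues[i], i);
      }
    }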
[3/3] carbondata git commit: [CARBONDATA-3012] Added support for full scan queries for vector direct fill.
Posted by ku...@apache.org.
[CARBONDATA-3012] Added support for full scan queries for vector direct fill.
After decompressing a page in the V3 reader, the data can be filled into a vector immediately, without any condition checks inside loops.
So the complete column page data is set into the column vector in a single batch and handed back to Spark/Presto.
For this purpose, a new method is added to ColumnPageDecoder:
ColumnPage decodeAndFillVector(byte[] input, int offset, int length, ColumnVectorInfo vectorInfo,
BitSet nullBits, boolean isLVEncoded)
The above method takes the vector and fills it in a single loop, without any checks inside the loop.
A new method is also added to DimensionDataChunkStore:
void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
ColumnVectorInfo vectorInfo);
This method likewise fills the vector in a single loop without any checks inside the loop.
This closes #2818
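To make "fill in a single loop" concrete, here is a minimal, self-contained sketch in plain Java (not CarbonData code; LongColumnVector and both fill methods are illustrative names only):

    // A toy stand-in for a writable column vector.
    interface LongColumnVector {
      void putLong(int rowId, long value);
      void putNull(int rowId);
    }

    // Row-wise fill: a null check executes on every iteration.
    static void fillRowWise(long[] decoded, java.util.BitSet nullBits, LongColumnVector out) {
      for (int i = 0; i < decoded.length; i++) {
        if (nullBits.get(i)) {
          out.putNull(i);
        } else {
          out.putLong(i, decoded[i]);
        }
      }
    }

    // Direct fill: one branch-free loop over the data, then nulls are applied
    // afterwards from the page's null BitSet, mirroring the decodeAndFillVector
    // contract described above.
    static void fillDirect(long[] decoded, java.util.BitSet nullBits, LongColumnVector out) {
      for (int i = 0; i < decoded.length; i++) {
        out.putLong(i, decoded[i]);
      }
      for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
        out.putNull(i);
      }
    }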
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3d3b6ff1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3d3b6ff1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3d3b6ff1
Branch: refs/heads/master
Commit: 3d3b6ff1615e08131f6bcaea23dec0116a18081d
Parents: e0baa9b
Author: ravipesala <ra...@gmail.com>
Authored: Tue Oct 16 11:30:43 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Thu Oct 25 22:24:24 2018 +0530
----------------------------------------------------------------------
.../chunk/impl/DimensionRawColumnChunk.java | 17 ++
.../impl/FixedLengthDimensionColumnPage.java | 29 +-
.../chunk/impl/MeasureRawColumnChunk.java | 17 ++
.../impl/VariableLengthDimensionColumnPage.java | 29 +-
.../reader/DimensionColumnChunkReader.java | 7 +
.../chunk/reader/MeasureColumnChunkReader.java | 7 +
.../reader/dimension/AbstractChunkReader.java | 11 +
...essedDimChunkFileBasedPageLevelReaderV3.java | 2 +-
...mpressedDimensionChunkFileBasedReaderV3.java | 78 +++--
.../measure/AbstractMeasureChunkReader.java | 12 +
...CompressedMeasureChunkFileBasedReaderV3.java | 45 ++-
...essedMsrChunkFileBasedPageLevelReaderV3.java | 6 +-
.../chunk/store/DimensionChunkStoreFactory.java | 16 +-
.../chunk/store/DimensionDataChunkStore.java | 7 +
.../impl/LocalDictDimensionDataChunkStore.java | 25 ++
.../safe/AbstractNonDictionaryVectorFiller.java | 282 ++++++++++++++++++
.../SafeFixedLengthDimensionDataChunkStore.java | 51 +++-
...feVariableLengthDimensionDataChunkStore.java | 17 +-
.../UnsafeAbstractDimensionDataChunkStore.java | 6 +
.../datastore/columnar/BlockIndexerStorage.java | 5 +-
.../BlockIndexerStorageForNoDictionary.java | 3 +-
.../columnar/BlockIndexerStorageForShort.java | 3 +-
.../core/datastore/columnar/UnBlockIndexer.java | 3 +
.../core/datastore/impl/FileReaderImpl.java | 1 +
.../core/datastore/page/ColumnPage.java | 130 ++++----
.../page/ColumnPageValueConverter.java | 3 +
.../datastore/page/SafeDecimalColumnPage.java | 25 ++
.../datastore/page/VarLengthColumnPageBase.java | 17 +-
.../page/encoding/ColumnPageDecoder.java | 8 +
.../page/encoding/ColumnPageEncoderMeta.java | 11 +
.../page/encoding/EncodingFactory.java | 44 ++-
.../adaptive/AdaptiveDeltaFloatingCodec.java | 82 +++++
.../adaptive/AdaptiveDeltaIntegralCodec.java | 194 +++++++++++-
.../adaptive/AdaptiveFloatingCodec.java | 84 +++++-
.../adaptive/AdaptiveIntegralCodec.java | 157 ++++++++++
.../encoding/compress/DirectCompressCodec.java | 170 ++++++++++-
.../datastore/page/encoding/rle/RLECodec.java | 9 +
.../DateDirectDictionaryGenerator.java | 2 +-
.../datatype/DecimalConverterFactory.java | 91 +++++-
.../carbondata/core/mutate/DeleteDeltaVo.java | 4 +
.../DictionaryBasedVectorResultCollector.java | 112 +++++--
.../executor/impl/AbstractQueryExecutor.java | 13 +
.../scan/executor/infos/BlockExecutionInfo.java | 13 +
.../core/scan/executor/util/QueryUtil.java | 2 +-
.../carbondata/core/scan/model/QueryModel.java | 6 +-
.../core/scan/result/BlockletScannedResult.java | 76 ++++-
.../scan/result/vector/CarbonColumnVector.java | 18 +-
.../scan/result/vector/CarbonColumnarBatch.java | 4 +-
.../scan/result/vector/CarbonDictionary.java | 2 +
.../scan/result/vector/ColumnVectorInfo.java | 5 +
.../vector/impl/CarbonColumnVectorImpl.java | 67 ++++-
.../vector/impl/CarbonDictionaryImpl.java | 3 +
.../scan/scanner/impl/BlockletFullScanner.java | 4 +-
.../core/stats/QueryStatisticsModel.java | 13 +
.../apache/carbondata/core/util/ByteUtil.java | 8 +
.../executer/IncludeFilterExecuterImplTest.java | 6 +-
.../carbondata/core/util/CarbonUtilTest.java | 2 +-
.../presto/CarbonColumnVectorWrapper.java | 65 +++-
.../presto/readers/SliceStreamReader.java | 4 +-
.../filterexpr/AllDataTypesTestCaseFilter.scala | 18 +-
.../vectorreader/ColumnarVectorWrapper.java | 70 ++++-
.../ColumnarVectorWrapperDirect.java | 229 ++++++++++++++
.../VectorizedCarbonRecordReader.java | 30 +-
.../org/apache/spark/sql/CarbonVectorProxy.java | 295 ++++++++++--------
.../spark/sql/CarbonDictionaryWrapper.java | 5 +-
.../org/apache/spark/sql/CarbonVectorProxy.java | 297 +++++++++++--------
.../stream/CarbonStreamRecordReader.java | 2 +-
67 files changed, 2616 insertions(+), 463 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
index 7b1aca1..d84434e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.scan.result.vector.impl.CarbonDictionaryImpl;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.format.Encoding;
@@ -121,6 +122,22 @@ public class DimensionRawColumnChunk extends AbstractRawColumnChunk {
}
}
+ /**
+ * Convert the raw data of the specified page number to a DimensionColumnDataChunk and fill
+ * the vector.
+ *
+ * @param pageNumber page number to decode and fill the vector
+ * @param vectorInfo vector to be filled with column page
+ */
+ public void convertToDimColDataChunkAndFillVector(int pageNumber, ColumnVectorInfo vectorInfo) {
+ assert pageNumber < pagesCount;
+ try {
+ chunkReader.decodeColumnPageAndFillVector(this, pageNumber, vectorInfo);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Override public void freeMemory() {
super.freeMemory();
if (null != dataChunks) {
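A hedged sketch of how a scanner might drive this new per-page entry point (the getPagesCount() accessor and the prepared vectorInfo are assumptions, not part of this hunk):

    // Decode every page of the raw chunk straight into the caller's vector,
    // skipping the intermediate DimensionColumnPage row store.
    for (int page = 0; page < rawColumnChunk.getPagesCount(); page++) {
      rawColumnChunk.convertToDimColDataChunkAndFillVector(page, vectorInfo);
    }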
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java
index c815e4d..e650e0e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java
@@ -46,11 +46,38 @@ public class FixedLengthDimensionColumnPage extends AbstractDimensionColumnPage
dataChunk.length;
dataChunkStore = DimensionChunkStoreFactory.INSTANCE
.getDimensionChunkStore(columnValueSize, isExplicitSorted, numberOfRows, totalSize,
- DimensionStoreType.FIXED_LENGTH, null);
+ DimensionStoreType.FIXED_LENGTH, null, false);
dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunk);
}
/**
+ * Constructor
+ *
+ * @param dataChunk data chunk
+ * @param invertedIndex inverted index
+ * @param invertedIndexReverse reverse inverted index
+ * @param numberOfRows number of rows
+ * @param columnValueSize size of each column value
+ * @param vectorInfo vector to be filled with decoded column page.
+ */
+ public FixedLengthDimensionColumnPage(byte[] dataChunk, int[] invertedIndex,
+ int[] invertedIndexReverse, int numberOfRows, int columnValueSize,
+ ColumnVectorInfo vectorInfo) {
+ boolean isExplicitSorted = isExplicitSorted(invertedIndex);
+ long totalSize = isExplicitSorted ?
+ dataChunk.length + (2 * numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE) :
+ dataChunk.length;
+ dataChunkStore = DimensionChunkStoreFactory.INSTANCE
+ .getDimensionChunkStore(columnValueSize, isExplicitSorted, numberOfRows, totalSize,
+ DimensionStoreType.FIXED_LENGTH, null, vectorInfo != null);
+ if (vectorInfo == null) {
+ dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunk);
+ } else {
+ dataChunkStore.fillVector(invertedIndex, invertedIndexReverse, dataChunk, vectorInfo);
+ }
+ }
+
+ /**
* Below method will be used to fill the data based on offset and row id
*
* @param rowId row id of the chunk
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
index 9448f30..6a90569 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.datastore.chunk.AbstractRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
/**
* Contains raw measure data
@@ -105,6 +106,22 @@ public class MeasureRawColumnChunk extends AbstractRawColumnChunk {
}
}
+ /**
+ * Convert the raw data of the specified page number to a ColumnPage and fill the
+ * vector.
+ *
+ * @param pageNumber page number to decode and fill the vector
+ * @param vectorInfo vector to be filled with column page
+ */
+ public void convertToColumnPageAndFillVector(int pageNumber, ColumnVectorInfo vectorInfo) {
+ assert pageNumber < pagesCount;
+ try {
+ chunkReader.decodeColumnPageAndFillVector(this, pageNumber, vectorInfo);
+ } catch (IOException | MemoryException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Override public void freeMemory() {
super.freeMemory();
if (null != columnPages) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java
index a404ff7..6cb8174 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java
@@ -30,10 +30,31 @@ public class VariableLengthDimensionColumnPage extends AbstractDimensionColumnPa
/**
* Constructor for this class
+ * @param dataChunks data chunk
+ * @param invertedIndex inverted index
+ * @param invertedIndexReverse reverse inverted index
+ * @param numberOfRows number of rows
+ * @param dictionary carbon local dictionary for string column.
*/
public VariableLengthDimensionColumnPage(byte[] dataChunks, int[] invertedIndex,
int[] invertedIndexReverse, int numberOfRows, DimensionStoreType dimStoreType,
CarbonDictionary dictionary) {
+ this(dataChunks, invertedIndex, invertedIndexReverse, numberOfRows, dimStoreType, dictionary,
+ null);
+ }
+
+ /**
+ * Constructor for this class
+ * @param dataChunks data chunk
+ * @param invertedIndex inverted index
+ * @param invertedIndexReverse reverse inverted index
+ * @param numberOfRows number of rows
+ * @param dictionary carbon local dictionary for string column.
+ * @param vectorInfo vector to be filled with decoded column page.
+ */
+ public VariableLengthDimensionColumnPage(byte[] dataChunks, int[] invertedIndex,
+ int[] invertedIndexReverse, int numberOfRows, DimensionStoreType dimStoreType,
+ CarbonDictionary dictionary, ColumnVectorInfo vectorInfo) {
boolean isExplicitSorted = isExplicitSorted(invertedIndex);
long totalSize = 0;
switch (dimStoreType) {
@@ -54,8 +75,12 @@ public class VariableLengthDimensionColumnPage extends AbstractDimensionColumnPa
}
dataChunkStore = DimensionChunkStoreFactory.INSTANCE
.getDimensionChunkStore(0, isExplicitSorted, numberOfRows, totalSize, dimStoreType,
- dictionary);
- dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunks);
+ dictionary, vectorInfo != null);
+ if (vectorInfo != null) {
+ dataChunkStore.fillVector(invertedIndex, invertedIndexReverse, dataChunks, vectorInfo);
+ } else {
+ dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunks);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
index fd81973..e2d6be7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
/**
* Interface for reading the data chunk
@@ -60,4 +61,10 @@ public interface DimensionColumnChunkReader {
*/
DimensionColumnPage decodeColumnPage(DimensionRawColumnChunk dimensionRawColumnChunk,
int pageNumber) throws IOException, MemoryException;
+
+ /**
+ * Decodes the raw data chunk of the given page number and fills the vector with decoded data.
+ */
+ void decodeColumnPageAndFillVector(DimensionRawColumnChunk dimensionRawColumnChunk,
+ int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
index f1392d0..0fbbe6b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
/**
* Reader interface for reading the measure blocks from file
@@ -58,4 +59,10 @@ public interface MeasureColumnChunkReader {
ColumnPage decodeColumnPage(MeasureRawColumnChunk measureRawColumnChunk,
int pageNumber) throws IOException, MemoryException;
+ /**
+ * Decode raw data and fill the vector
+ */
+ void decodeColumnPageAndFillVector(MeasureRawColumnChunk measureRawColumnChunk,
+ int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException;
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
index b08f9ed..2c42abe 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
@@ -16,10 +16,15 @@
*/
package org.apache.carbondata.core.datastore.chunk.reader.dimension;
+import java.io.IOException;
+
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.util.CarbonProperties;
/**
@@ -79,4 +84,10 @@ public abstract class AbstractChunkReader implements DimensionColumnChunkReader
this.numberOfRows = numberOfRows;
}
+ @Override
+ public void decodeColumnPageAndFillVector(DimensionRawColumnChunk dimensionRawColumnChunk,
+ int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
+ throw new UnsupportedOperationException(
+ "This operation is not supported in this reader " + this.getClass().getName());
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimChunkFileBasedPageLevelReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimChunkFileBasedPageLevelReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimChunkFileBasedPageLevelReaderV3.java
index 6efaf8a..86a4334 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimChunkFileBasedPageLevelReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimChunkFileBasedPageLevelReaderV3.java
@@ -171,6 +171,6 @@ public class CompressedDimChunkFileBasedPageLevelReaderV3
ByteBuffer rawData = dimensionRawColumnChunk.getFileReader()
.readByteBuffer(filePath, offset, length);
- return decodeDimension(dimensionRawColumnChunk, rawData, pageMetadata, 0);
+ return decodeDimension(dimensionRawColumnChunk, rawData, pageMetadata, 0, null);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
index b96e52e..a9f9338 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.chunk.reader.dimension.v3;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.BitSet;
import java.util.List;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -39,6 +40,7 @@ import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.scan.executor.util.QueryUtil;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.format.DataChunk2;
@@ -207,6 +209,12 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
*/
@Override public DimensionColumnPage decodeColumnPage(
DimensionRawColumnChunk rawColumnPage, int pageNumber) throws IOException, MemoryException {
+ return decodeColumnPage(rawColumnPage, pageNumber, null);
+ }
+
+ private DimensionColumnPage decodeColumnPage(
+ DimensionRawColumnChunk rawColumnPage, int pageNumber,
+ ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
// data chunk of blocklet column
DataChunk3 dataChunk3 = rawColumnPage.getDataChunkV3();
// get the data buffer
@@ -221,49 +229,65 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
int offset = (int) rawColumnPage.getOffSet() + dimensionChunksLength
.get(rawColumnPage.getColumnIndex()) + dataChunk3.getPage_offset().get(pageNumber);
// first read the data and uncompressed it
- return decodeDimension(rawColumnPage, rawData, pageMetadata, offset);
+ return decodeDimension(rawColumnPage, rawData, pageMetadata, offset, vectorInfo);
+ }
+
+ @Override
+ public void decodeColumnPageAndFillVector(DimensionRawColumnChunk dimensionRawColumnChunk,
+ int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
+ DimensionColumnPage columnPage =
+ decodeColumnPage(dimensionRawColumnChunk, pageNumber, vectorInfo);
+ columnPage.freeMemory();
}
- private ColumnPage decodeDimensionByMeta(DataChunk2 pageMetadata,
- ByteBuffer pageData, int offset, boolean isLocalDictEncodedPage)
+ private ColumnPage decodeDimensionByMeta(DataChunk2 pageMetadata, ByteBuffer pageData, int offset,
+ boolean isLocalDictEncodedPage, ColumnVectorInfo vectorInfo, BitSet nullBitSet)
throws IOException, MemoryException {
List<Encoding> encodings = pageMetadata.getEncoders();
List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
pageMetadata.getChunk_meta());
ColumnPageDecoder decoder = encodingFactory.createDecoder(encodings, encoderMetas,
- compressorName);
- return decoder
- .decode(pageData.array(), offset, pageMetadata.data_page_length, isLocalDictEncodedPage);
+ compressorName, vectorInfo != null);
+ if (vectorInfo != null) {
+ decoder
+ .decodeAndFillVector(pageData.array(), offset, pageMetadata.data_page_length, vectorInfo,
+ nullBitSet, isLocalDictEncodedPage);
+ return null;
+ } else {
+ return decoder
+ .decode(pageData.array(), offset, pageMetadata.data_page_length, isLocalDictEncodedPage);
+ }
}
protected DimensionColumnPage decodeDimension(DimensionRawColumnChunk rawColumnPage,
- ByteBuffer pageData, DataChunk2 pageMetadata, int offset)
+ ByteBuffer pageData, DataChunk2 pageMetadata, int offset, ColumnVectorInfo vectorInfo)
throws IOException, MemoryException {
List<Encoding> encodings = pageMetadata.getEncoders();
if (CarbonUtil.isEncodedWithMeta(encodings)) {
- ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, offset,
- null != rawColumnPage.getLocalDictionary());
- decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
int[] invertedIndexes = new int[0];
int[] invertedIndexesReverse = new int[0];
// in case of no dictionary measure data types, if it is included in sort columns
// then inverted index to be uncompressed
- if (encodings.contains(Encoding.INVERTED_INDEX)) {
+ boolean isExplicitSorted =
+ CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX);
+ int dataOffset = offset;
+ if (isExplicitSorted) {
offset += pageMetadata.data_page_length;
- if (CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX)) {
- invertedIndexes = CarbonUtil
- .getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
- // get the reverse index
- invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
- }
+ invertedIndexes = CarbonUtil
+ .getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
+ // get the reverse index
+ invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
}
+ BitSet nullBitSet = QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor);
+ ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, dataOffset,
+ null != rawColumnPage.getLocalDictionary(), vectorInfo, nullBitSet);
+ decodedPage.setNullBits(nullBitSet);
return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary(), invertedIndexes,
- invertedIndexesReverse, isEncodedWithAdaptiveMeta(pageMetadata),
- CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX));
+ invertedIndexesReverse, isEncodedWithAdaptiveMeta(pageMetadata), isExplicitSorted);
} else {
// following code is for backward compatibility
- return decodeDimensionLegacy(rawColumnPage, pageData, pageMetadata, offset);
+ return decodeDimensionLegacy(rawColumnPage, pageData, pageMetadata, offset, vectorInfo);
}
}
@@ -283,8 +307,8 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
}
private DimensionColumnPage decodeDimensionLegacy(DimensionRawColumnChunk rawColumnPage,
- ByteBuffer pageData, DataChunk2 pageMetadata, int offset) throws IOException,
- MemoryException {
+ ByteBuffer pageData, DataChunk2 pageMetadata, int offset, ColumnVectorInfo vectorInfo)
+ throws IOException, MemoryException {
byte[] dataPage;
int[] rlePage;
int[] invertedIndexes = new int[0];
@@ -296,8 +320,10 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
invertedIndexes = CarbonUtil
.getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
offset += pageMetadata.rowid_page_length;
- // get the reverse index
- invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
+ if (vectorInfo == null) {
+ // get the reverse index
+ invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
+ }
}
// if rle is applied then read the rle block chunk and then uncompress
//then actual data based on rle block
@@ -324,13 +350,13 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
columnDataChunk =
new VariableLengthDimensionColumnPage(dataPage, invertedIndexes, invertedIndexesReverse,
pageMetadata.getNumberOfRowsInpage(), dimStoreType,
- rawColumnPage.getLocalDictionary());
+ rawColumnPage.getLocalDictionary(), vectorInfo);
} else {
// to store fixed length column chunk values
columnDataChunk =
new FixedLengthDimensionColumnPage(dataPage, invertedIndexes, invertedIndexesReverse,
pageMetadata.getNumberOfRowsInpage(),
- eachColumnValueSize[rawColumnPage.getColumnIndex()]);
+ eachColumnValueSize[rawColumnPage.getColumnIndex()], vectorInfo);
}
return columnDataChunk;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
index 6774fcb..cd233d2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
@@ -16,10 +16,15 @@
*/
package org.apache.carbondata.core.datastore.chunk.reader.measure;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
/**
* Measure block reader abstract class
@@ -48,4 +53,11 @@ public abstract class AbstractMeasureChunkReader implements MeasureColumnChunkRe
this.filePath = filePath;
this.numberOfRows = numberOfRows;
}
+
+ @Override
+ public void decodeColumnPageAndFillVector(MeasureRawColumnChunk measureRawColumnChunk,
+ int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
+ throw new UnsupportedOperationException(
+ "This operation is not supported in this class " + getClass().getName());
+ }
}
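Because the abstract readers throw UnsupportedOperationException, only readers that override decodeColumnPageAndFillVector (the V3 readers in this patch) support direct fill. A hedged sketch of a caller-side guard; the fallback strategy is an assumption, not taken from this patch:

    try {
      reader.decodeColumnPageAndFillVector(rawChunk, pageNumber, vectorInfo);
    } catch (UnsupportedOperationException e) {
      // Legacy formats: decode to a ColumnPage and consume it row by row.
      ColumnPage page = reader.decodeColumnPage(rawChunk, pageNumber);
    }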
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
index 240771a..8394029 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.datastore.chunk.reader.measure.v3;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.BitSet;
import java.util.List;
import org.apache.carbondata.core.datastore.FileReader;
@@ -29,6 +30,7 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.scan.executor.util.QueryUtil;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.format.DataChunk2;
@@ -190,6 +192,18 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
public ColumnPage decodeColumnPage(
MeasureRawColumnChunk rawColumnChunk, int pageNumber)
throws IOException, MemoryException {
+ return decodeColumnPage(rawColumnChunk, pageNumber, null);
+ }
+
+ @Override
+ public void decodeColumnPageAndFillVector(MeasureRawColumnChunk measureRawColumnChunk,
+ int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
+ decodeColumnPage(measureRawColumnChunk, pageNumber, vectorInfo);
+ }
+
+ private ColumnPage decodeColumnPage(
+ MeasureRawColumnChunk rawColumnChunk, int pageNumber, ColumnVectorInfo vectorInfo)
+ throws IOException, MemoryException {
// data chunk of blocklet column
DataChunk3 dataChunk3 = rawColumnChunk.getDataChunkV3();
// data chunk of page
@@ -203,23 +217,34 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
int offset = (int) rawColumnChunk.getOffSet() +
measureColumnChunkLength.get(rawColumnChunk.getColumnIndex()) +
dataChunk3.getPage_offset().get(pageNumber);
- ColumnPage decodedPage = decodeMeasure(pageMetadata, rawColumnChunk.getRawData(), offset);
- decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
+ BitSet nullBitSet = QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor);
+ ColumnPage decodedPage =
+ decodeMeasure(pageMetadata, rawColumnChunk.getRawData(), offset, vectorInfo, nullBitSet);
+ if (decodedPage == null) {
+ return null;
+ }
+ decodedPage.setNullBits(nullBitSet);
return decodedPage;
}
/**
* Decode measure column page with page header and raw data starting from offset
*/
- protected ColumnPage decodeMeasure(DataChunk2 pageMetadata, ByteBuffer pageData, int offset)
- throws MemoryException, IOException {
+ protected ColumnPage decodeMeasure(DataChunk2 pageMetadata, ByteBuffer pageData, int offset,
+ ColumnVectorInfo vectorInfo, BitSet nullBitSet) throws MemoryException, IOException {
List<Encoding> encodings = pageMetadata.getEncoders();
List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
- String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
- pageMetadata.getChunk_meta());
- ColumnPageDecoder codec = encodingFactory.createDecoder(encodings, encoderMetas,
- compressorName);
- return codec.decode(pageData.array(), offset, pageMetadata.data_page_length);
+ String compressorName =
+ CarbonMetadataUtil.getCompressorNameFromChunkMeta(pageMetadata.getChunk_meta());
+ ColumnPageDecoder codec =
+ encodingFactory.createDecoder(encodings, encoderMetas, compressorName, vectorInfo != null);
+ if (vectorInfo != null) {
+ codec
+ .decodeAndFillVector(pageData.array(), offset, pageMetadata.data_page_length, vectorInfo,
+ nullBitSet, false);
+ return null;
+ } else {
+ return codec.decode(pageData.array(), offset, pageMetadata.data_page_length);
+ }
}
-
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
index 924a206..b092350 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.chunk.reader.measure.v3;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.BitSet;
import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
@@ -151,8 +152,9 @@ public class CompressedMsrChunkFileBasedPageLevelReaderV3
ByteBuffer buffer = rawColumnPage.getFileReader()
.readByteBuffer(filePath, offset, pageMetadata.data_page_length);
- ColumnPage decodedPage = decodeMeasure(pageMetadata, buffer, 0);
- decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
+ BitSet nullBitSet = QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor);
+ ColumnPage decodedPage = decodeMeasure(pageMetadata, buffer, 0, null, nullBitSet);
+ decodedPage.setNullBits(nullBitSet);
return decodedPage;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java
index c7bcef1..5346f35 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java
@@ -65,8 +65,8 @@ public class DimensionChunkStoreFactory {
*/
public DimensionDataChunkStore getDimensionChunkStore(int columnValueSize,
boolean isInvertedIndex, int numberOfRows, long totalSize, DimensionStoreType storeType,
- CarbonDictionary dictionary) {
- if (isUnsafe) {
+ CarbonDictionary dictionary, boolean fillDirectVector) {
+ if (isUnsafe && !fillDirectVector) {
switch (storeType) {
case FIXED_LENGTH:
return new UnsafeFixedLengthDimensionDataChunkStore(totalSize, columnValueSize,
@@ -79,24 +79,24 @@ public class DimensionChunkStoreFactory {
numberOfRows);
case LOCAL_DICT:
return new LocalDictDimensionDataChunkStore(
- new UnsafeFixedLengthDimensionDataChunkStore(totalSize,
- 3, isInvertedIndex, numberOfRows),
- dictionary);
+ new UnsafeFixedLengthDimensionDataChunkStore(totalSize, 3, isInvertedIndex,
+ numberOfRows), dictionary);
default:
throw new UnsupportedOperationException("Invalid dimension store type");
}
} else {
switch (storeType) {
case FIXED_LENGTH:
- return new SafeFixedLengthDimensionDataChunkStore(isInvertedIndex, columnValueSize);
+ return new SafeFixedLengthDimensionDataChunkStore(isInvertedIndex, columnValueSize,
+ numberOfRows);
case VARIABLE_SHORT_LENGTH:
return new SafeVariableShortLengthDimensionDataChunkStore(isInvertedIndex, numberOfRows);
case VARIABLE_INT_LENGTH:
return new SafeVariableIntLengthDimensionDataChunkStore(isInvertedIndex, numberOfRows);
case LOCAL_DICT:
return new LocalDictDimensionDataChunkStore(
- new SafeFixedLengthDimensionDataChunkStore(isInvertedIndex,
- 3), dictionary);
+ new SafeFixedLengthDimensionDataChunkStore(isInvertedIndex, 3, numberOfRows),
+ dictionary);
default:
throw new UnsupportedOperationException("Invalid dimension store type");
}
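Note the new flag: when fillDirectVector is true, the factory returns the on-heap (safe) stores even if unsafe storage is enabled, since the decoded page is handed straight to the vector rather than retained. A sketch of a call site, matching how FixedLengthDimensionColumnPage invokes it above:

    DimensionDataChunkStore store = DimensionChunkStoreFactory.INSTANCE
        .getDimensionChunkStore(columnValueSize, isExplicitSorted, numberOfRows,
            totalSize, DimensionStoreType.FIXED_LENGTH, null, /* fillDirectVector */ true);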
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java
index 28aed5b..8972ddb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java
@@ -18,6 +18,7 @@
package org.apache.carbondata.core.datastore.chunk.store;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
/**
* Interface responsibility is to store dimension data in memory.
@@ -35,6 +36,12 @@ public interface DimensionDataChunkStore {
void putArray(int[] invertedIndex, int[] invertedIndexReverse, byte[] data);
/**
+ * Fill the vector with decoded data.
+ */
+ void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
+ ColumnVectorInfo vectorInfo);
+
+ /**
* Below method will be used to get the row
* based on row id passed
*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
index 0d06f61..e70424f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
@@ -21,6 +21,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.chunk.store.DimensionDataChunkStore;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.CarbonUtil;
/**
* Dimension chunk store for local dictionary encoded data.
@@ -49,6 +51,29 @@ public class LocalDictDimensionDataChunkStore implements DimensionDataChunkStore
this.dimensionDataChunkStore.putArray(invertedIndex, invertedIndexReverse, data);
}
+ @Override
+ public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
+ ColumnVectorInfo vectorInfo) {
+ int columnValueSize = dimensionDataChunkStore.getColumnValueSize();
+ int rowsNum = data.length / columnValueSize;
+ CarbonColumnVector vector = vectorInfo.vector;
+ if (!dictionary.isDictionaryUsed()) {
+ vector.setDictionary(dictionary);
+ dictionary.setDictionaryUsed();
+ }
+ for (int i = 0; i < rowsNum; i++) {
+ int surrogate = CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize);
+ if (surrogate == CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY) {
+ vector.putNull(i);
+ vector.getDictionaryVector().putNull(i);
+ } else {
+ vector.putNotNull(i);
+ vector.getDictionaryVector().putInt(i, surrogate);
+ }
+ }
+ }
+
@Override public byte[] getRow(int rowId) {
return dictionary.getDictionaryValue(dimensionDataChunkStore.getSurrogate(rowId));
}
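The fill loop above encodes nulls through a reserved surrogate key and otherwise writes the surrogate into the dictionary vector. A minimal array-based illustration of that convention (standalone sketch, not the CarbonData vector API):

    static void fillFromSurrogates(int[] surrogates, int nullSurrogate,
        byte[][] localDictionary, byte[][] out) {
      for (int i = 0; i < surrogates.length; i++) {
        // the reserved surrogate marks a null; anything else is a dictionary lookup
        out[i] = (surrogates[i] == nullSurrogate) ? null : localDictionary[surrogates[i]];
      }
    }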
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
new file mode 100644
index 0000000..ddfa470
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
+
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+@InterfaceAudience.Internal
+@InterfaceStability.Stable
+public abstract class AbstractNonDictionaryVectorFiller {
+
+ protected int lengthSize;
+ protected int numberOfRows;
+
+ public AbstractNonDictionaryVectorFiller(int lengthSize, int numberOfRows) {
+ this.lengthSize = lengthSize;
+ this.numberOfRows = numberOfRows;
+ }
+
+ public abstract void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer);
+
+ public int getLengthFromBuffer(ByteBuffer buffer) {
+ return buffer.getShort();
+ }
+}
+
+class NonDictionaryVectorFillerFactory {
+
+ public static AbstractNonDictionaryVectorFiller getVectorFiller(DataType type, int lengthSize,
+ int numberOfRows) {
+ if (type == DataTypes.STRING) {
+ return new StringVectorFiller(lengthSize, numberOfRows);
+ } else if (type == DataTypes.VARCHAR) {
+ return new LongStringVectorFiller(lengthSize, numberOfRows);
+ } else if (type == DataTypes.TIMESTAMP) {
+ return new TimeStampVectorFiller(lengthSize, numberOfRows);
+ } else if (type == DataTypes.BOOLEAN) {
+ return new BooleanVectorFiller(lengthSize, numberOfRows);
+ } else if (type == DataTypes.SHORT) {
+ return new ShortVectorFiller(lengthSize, numberOfRows);
+ } else if (type == DataTypes.INT) {
+ return new IntVectorFiller(lengthSize, numberOfRows);
+ } else if (type == DataTypes.LONG) {
+ return new LongVectorFiller(lengthSize, numberOfRows);
+ } else {
+ throw new UnsupportedOperationException("Not supported datatype : " + type);
+ }
+
+ }
+
+}
+
+class StringVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+ public StringVectorFiller(int lengthSize, int numberOfRows) {
+ super(lengthSize, numberOfRows);
+ }
+
+ @Override
+ public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+ // startOffset tracks the position of the current row's length field in the buffer
+ int startOffset = 0;
+ // each value is stored as a length prefix (lengthSize bytes) followed by the
+ // data itself, so the first value's data starts right after the first prefix
+ int currentOffset = lengthSize;
+ ByteUtil.UnsafeComparer comparator = ByteUtil.UnsafeComparer.INSTANCE;
+ for (int i = 0; i < numberOfRows - 1; i++) {
+ buffer.position(startOffset);
+ startOffset += getLengthFromBuffer(buffer) + lengthSize;
+ int length = startOffset - (currentOffset);
+ if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
+ CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, currentOffset, length)) {
+ vector.putNull(i);
+ } else {
+ vector.putByteArray(i, currentOffset, length, data);
+ }
+ currentOffset = startOffset + lengthSize;
+ }
+ // Handle last row
+ int length = (data.length - currentOffset);
+ if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
+ CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, currentOffset, length)) {
+ vector.putNull(numberOfRows - 1);
+ } else {
+ vector.putByteArray(numberOfRows - 1, currentOffset, length, data);
+ }
+ }
+}
+
+class LongStringVectorFiller extends StringVectorFiller {
+ public LongStringVectorFiller(int lengthSize, int numberOfRows) {
+ super(lengthSize, numberOfRows);
+ }
+
+ @Override
+ public int getLengthFromBuffer(ByteBuffer buffer) {
+ return buffer.getInt();
+ }
+}
+
+class BooleanVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+ public BooleanVectorFiller(int lengthSize, int numberOfRows) {
+ super(lengthSize, numberOfRows);
+ }
+
+ @Override
+ public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+ // startOffset tracks the position of the current row's length field
+ int startOffset = 0;
+ int currentOffset = lengthSize;
+ for (int i = 0; i < numberOfRows - 1; i++) {
+ buffer.position(startOffset);
+ startOffset += getLengthFromBuffer(buffer) + lengthSize;
+ int length = startOffset - (currentOffset);
+ if (length == 0) {
+ vector.putNull(i);
+ } else {
+ vector.putBoolean(i, ByteUtil.toBoolean(data[currentOffset]));
+ }
+ currentOffset = startOffset + lengthSize;
+ }
+ int length = (data.length - currentOffset);
+ if (length == 0) {
+ vector.putNull(numberOfRows - 1);
+ } else {
+ vector.putBoolean(numberOfRows - 1, ByteUtil.toBoolean(data[currentOffset]));
+ }
+ }
+}
+
+class ShortVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+ public ShortVectorFiller(int lengthSize, int numberOfRows) {
+ super(lengthSize, numberOfRows);
+ }
+
+ @Override
+ public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+ // startOffset tracks where the current row's length field begins in the buffer
+ int startOffset = 0;
+ int currentOffset = lengthSize;
+ for (int i = 0; i < numberOfRows - 1; i++) {
+ buffer.position(startOffset);
+ startOffset += getLengthFromBuffer(buffer) + lengthSize;
+ int length = startOffset - (currentOffset);
+ if (length == 0) {
+ vector.putNull(i);
+ } else {
+ vector.putShort(i, ByteUtil.toXorShort(data, currentOffset, length));
+ }
+ currentOffset = startOffset + lengthSize;
+ }
+ int length = (data.length - currentOffset);
+ if (length == 0) {
+ vector.putNull(numberOfRows - 1);
+ } else {
+ vector.putShort(numberOfRows - 1, ByteUtil.toXorShort(data, currentOffset, length));
+ }
+ }
+}
+
+class IntVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+ public IntVectorFiller(int lengthSize, int numberOfRows) {
+ super(lengthSize, numberOfRows);
+ }
+
+ @Override
+ public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+ // startOffset tracks where the current row's length field begins in the buffer
+ int startOffset = 0;
+ int currentOffset = lengthSize;
+ for (int i = 0; i < numberOfRows - 1; i++) {
+ buffer.position(startOffset);
+ startOffset += getLengthFromBuffer(buffer) + lengthSize;
+ int length = startOffset - (currentOffset);
+ if (length == 0) {
+ vector.putNull(i);
+ } else {
+ vector.putInt(i, ByteUtil.toXorInt(data, currentOffset, length));
+ }
+ currentOffset = startOffset + lengthSize;
+ }
+ int length = (data.length - currentOffset);
+ if (length == 0) {
+ vector.putNull(numberOfRows - 1);
+ } else {
+ vector.putInt(numberOfRows - 1, ByteUtil.toXorInt(data, currentOffset, length));
+ }
+ }
+}
+
+class LongVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+ public LongVectorFiller(int lengthSize, int numberOfRows) {
+ super(lengthSize, numberOfRows);
+ }
+
+ @Override
+ public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+ // startOffset tracks where the current row's length field begins in the buffer
+ int startOffset = 0;
+ int currentOffset = lengthSize;
+ for (int i = 0; i < numberOfRows - 1; i++) {
+ buffer.position(startOffset);
+ startOffset += getLengthFromBuffer(buffer) + lengthSize;
+ int length = startOffset - (currentOffset);
+ if (length == 0) {
+ vector.putNull(i);
+ } else {
+ vector.putLong(i, DataTypeUtil
+ .getDataBasedOnRestructuredDataType(data, vector.getBlockDataType(), currentOffset,
+ length));
+ }
+ currentOffset = startOffset + lengthSize;
+ }
+ int length = (data.length - currentOffset);
+ if (length == 0) {
+ vector.putNull(numberOfRows - 1);
+ } else {
+ vector.putLong(numberOfRows - 1, DataTypeUtil
+ .getDataBasedOnRestructuredDataType(data, vector.getBlockDataType(), currentOffset,
+ length));
+ }
+ }
+}
+
+class TimeStampVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+ public TimeStampVectorFiller(int lengthSize, int numberOfRows) {
+ super(lengthSize, numberOfRows);
+ }
+
+ @Override
+ public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+ // startOffset tracks where the current row's length field begins in the buffer
+ int startOffset = 0;
+ int currentOffset = lengthSize;
+ for (int i = 0; i < numberOfRows - 1; i++) {
+ buffer.position(startOffset);
+ startOffset += getLengthFromBuffer(buffer) + lengthSize;
+ int length = startOffset - (currentOffset);
+ if (length == 0) {
+ vector.putNull(i);
+ } else {
+ vector.putLong(i, ByteUtil.toXorLong(data, currentOffset, length) * 1000L);
+ }
+ currentOffset = startOffset + lengthSize;
+ }
+ int length = (data.length - currentOffset);
+ if (length == 0) {
+ vector.putNull(numberOfRows - 1);
+ } else {
+ vector.putLong(numberOfRows - 1, ByteUtil.toXorLong(data, currentOffset, length) * 1000L);
+ }
+ }
+}
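The fillers above all assume the same length-value (LV) layout: each value is preceded by its length (2 bytes via getShort(), or 4 bytes for VARCHAR through the LongStringVectorFiller override), and a zero length marks null. A minimal usage sketch, mirroring the call site added in SafeVariableLengthDimensionDataChunkStore later in this patch; the data array, vector, and rowCount are assumed to exist, and readLvEncodedChunk() is a hypothetical helper:

    byte[] data = readLvEncodedChunk();       // hypothetical helper returning an LV-encoded chunk
    ByteBuffer buffer = ByteBuffer.wrap(data);
    AbstractNonDictionaryVectorFiller filler =
        NonDictionaryVectorFillerFactory.getVectorFiller(DataTypes.STRING, 2, rowCount);
    filler.fillVector(data, vector, buffer);

Worked example of the offset walk in StringVectorFiller with 2-byte lengths: for data = {0,3,'a','b','c',0,2,'d','e'} and two rows, the loop reads length 3 at position 0 and fills "abc" from offset 2; the last-row branch then fills "de" from offset 7 with length data.length - 7 = 2. The "* 1000L" in TimeStampVectorFiller suggests stored milliseconds being scaled up for the target vector; that unit reading is an assumption, not something this patch states.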
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
index 41218d0..d30650d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
@@ -17,6 +17,12 @@
package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonUtil;
@@ -30,9 +36,52 @@ public class SafeFixedLengthDimensionDataChunkStore extends SafeAbsractDimension
*/
private int columnValueSize;
- public SafeFixedLengthDimensionDataChunkStore(boolean isInvertedIndex, int columnValueSize) {
+ private int numOfRows;
+
+ public SafeFixedLengthDimensionDataChunkStore(boolean isInvertedIndex, int columnValueSize,
+ int numOfRows) {
super(isInvertedIndex);
this.columnValueSize = columnValueSize;
+ this.numOfRows = numOfRows;
+ }
+
+ @Override
+ public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
+ ColumnVectorInfo vectorInfo) {
+ CarbonColumnVector vector = vectorInfo.vector;
+ fillVector(data, vectorInfo, vector);
+ }
+
+ private void fillVector(byte[] data, ColumnVectorInfo vectorInfo, CarbonColumnVector vector) {
+ DataType dataType = vectorInfo.vector.getBlockDataType();
+ if (dataType == DataTypes.DATE) {
+ for (int i = 0; i < numOfRows; i++) {
+ int surrogateInternal =
+ CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize);
+ if (surrogateInternal == CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY) {
+ vector.putNull(i);
+ } else {
+ vector.putInt(i, surrogateInternal - DateDirectDictionaryGenerator.cutOffDate);
+ }
+ }
+ } else if (dataType == DataTypes.TIMESTAMP) {
+ for (int i = 0; i < numOfRows; i++) {
+ int surrogateInternal =
+ CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize);
+ if (surrogateInternal == CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY) {
+ vector.putNull(i);
+ } else {
+ Object valueFromSurrogate =
+ vectorInfo.directDictionaryGenerator.getValueFromSurrogate(surrogateInternal);
+ vector.putLong(i, (long)valueFromSurrogate);
+ }
+ }
+ } else {
+ for (int i = 0; i < numOfRows; i++) {
+ vector.putInt(i,
+ CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize));
+ }
+ }
}
/**
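The fillVector added above decodes direct-dictionary surrogates inline: DATE surrogates become days relative to the cut-off date, TIMESTAMP surrogates go through the direct-dictionary generator, and other dictionary columns are filled with the raw surrogate int. A hedged illustration of the DATE branch (the surrogate value is invented; the conversion is exactly the putInt call above):

    // say the stored surrogate for row 0 is 17000; the vector receives
    // the day count relative to the dictionary cut-off date
    int surrogate = CarbonUtil.getSurrogateInternal(data, 0, columnValueSize);
    vector.putInt(0, surrogate - DateDirectDictionaryGenerator.cutOffDate);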
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
index 8553506..0fb4854 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
@@ -23,6 +23,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
@@ -91,6 +92,20 @@ public abstract class SafeVariableLengthDimensionDataChunkStore
}
}
+ @Override
+ public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
+ ColumnVectorInfo vectorInfo) {
+ this.invertedIndexReverse = invertedIndex;
+ int lengthSize = getLengthSize();
+ CarbonColumnVector vector = vectorInfo.vector;
+ DataType dt = vector.getType();
+ // wrap the whole data array; the filler reads each row's length field through this buffer
+ ByteBuffer buffer = ByteBuffer.wrap(data);
+ AbstractNonDictionaryVectorFiller vectorFiller =
+ NonDictionaryVectorFillerFactory.getVectorFiller(dt, lengthSize, numberOfRows);
+ vectorFiller.fillVector(data, vector, buffer);
+ }
+
protected abstract int getLengthSize();
protected abstract int getLengthFromBuffer(ByteBuffer buffer);
@@ -150,7 +165,7 @@ public abstract class SafeVariableLengthDimensionDataChunkStore
vector.putNull(vectorRow);
} else {
if (dt == DataTypes.STRING) {
- vector.putBytes(vectorRow, currentDataOffset, length, data);
+ vector.putByteArray(vectorRow, currentDataOffset, length, data);
} else if (dt == DataTypes.BOOLEAN) {
vector.putBoolean(vectorRow, ByteUtil.toBoolean(data[currentDataOffset]));
} else if (dt == DataTypes.SHORT) {
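The override above ties the LV store to the new fillers: getLengthSize() supplies the width of the per-row length field and the factory picks a filler by vector type. A sketch of the subclass contract, assuming the short-length variant returns 2 and the varchar variant returns 4, consistent with getShort()/getInt() in the fillers (the subclasses themselves are outside this hunk):

    @Override
    protected int getLengthSize() {
      return 2; // 4 in the VARCHAR variant, matching LongStringVectorFiller
    }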
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
index 89bce2d..57e9de5 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.memory.MemoryBlock;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
/**
@@ -115,6 +116,11 @@ public abstract class UnsafeAbstractDimensionDataChunkStore implements Dimension
}
}
+ @Override public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
+ ColumnVectorInfo vectorInfo) {
+ throw new UnsupportedOperationException("This method not supposed to be called here");
+ }
+
/**
* Below method will be used to free the memory occupied by the column chunk
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
index 6f3f139..44b3c12 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
@@ -50,8 +50,9 @@ public abstract class BlockIndexerStorage<T> {
*
* @param rowIds
*/
- protected Map<String, short[]> rleEncodeOnRowId(short[] rowIds, short[] rowIdPage,
- short[] rowIdRlePage) {
+ protected Map<String, short[]> rleEncodeOnRowId(short[] rowIds) {
+ short[] rowIdPage;
+ short[] rowIdRlePage;
List<Short> list = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
List<Short> map = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
int k = 0;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
index b3e25d3..bcf5432 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
@@ -39,8 +39,7 @@ public class BlockIndexerStorageForNoDictionary extends BlockIndexerStorage<Obje
Arrays.sort(dataWithRowId);
}
short[] rowIds = extractDataAndReturnRowId(dataWithRowId, dataPage);
- Map<String, short[]> rowIdAndRleRowIdPages =
- rleEncodeOnRowId(rowIds, getRowIdPage(), getRowIdRlePage());
+ Map<String, short[]> rowIdAndRleRowIdPages = rleEncodeOnRowId(rowIds);
rowIdPage = rowIdAndRleRowIdPages.get("rowIdPage");
rowIdRlePage = rowIdAndRleRowIdPages.get("rowRlePage");
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
index f1b9af2..b30396c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
@@ -43,8 +43,7 @@ public class BlockIndexerStorageForShort extends BlockIndexerStorage<byte[][]> {
Arrays.sort(dataWithRowId);
}
short[] rowIds = extractDataAndReturnRowId(dataWithRowId, dataPage);
- Map<String, short[]> rowIdAndRleRowIdPages =
- rleEncodeOnRowId(rowIds, getRowIdPage(), getRowIdRlePage());
+ Map<String, short[]> rowIdAndRleRowIdPages = rleEncodeOnRowId(rowIds);
rowIdPage = rowIdAndRleRowIdPages.get("rowIdPage");
rowIdRlePage = rowIdAndRleRowIdPages.get("rowRlePage");
if (rleOnData) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java
index a7f38cd..48484ce 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java
@@ -28,6 +28,9 @@ public final class UnBlockIndexer {
public static int[] uncompressIndex(int[] indexData, int[] indexMap) {
int actualSize = indexData.length;
int mapLength = indexMap.length;
+ if (indexMap.length == 0) {
+ return indexData;
+ }
for (int i = 0; i < mapLength; i++) {
actualSize += indexData[indexMap[i] + 1] - indexData[indexMap[i]] - 1;
}
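The early return makes the empty-map case explicit: with no RLE-collapsed ranges, indexData is already the full inverted index. For the non-empty case, the size computation implies each mapped position holds a (start, end) pair expanding to end - start + 1 consecutive row ids; a worked example under that reading (the arrays are invented):

    // indexData = {1, 5, 9}, indexMap = {0}
    // actualSize = 3 + (5 - 1 - 1) = 6
    // expanded index = {1, 2, 3, 4, 5, 9}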
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java
index 6fef278..9f0abd9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java
@@ -76,6 +76,7 @@ public class FileReaderImpl implements FileReader {
channel.close();
}
}
+ fileNameAndStreamCache.clear();
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index e8097da..e5312f3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -37,14 +37,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.CarbonProperties;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.BYTE;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.BYTE_ARRAY;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.FLOAT;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.INT;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.LONG;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT_INT;
+import static org.apache.carbondata.core.metadata.datatype.DataTypes.*;
public abstract class ColumnPage {
@@ -90,7 +83,7 @@ public abstract class ColumnPage {
private static ColumnPage createDecimalPage(ColumnPageEncoderMeta columnPageEncoderMeta,
int pageSize) {
- if (unsafe) {
+ if (isUnsafeEnabled(columnPageEncoderMeta)) {
try {
return new UnsafeDecimalColumnPage(columnPageEncoderMeta, pageSize);
} catch (MemoryException e) {
@@ -103,7 +96,7 @@ public abstract class ColumnPage {
private static ColumnPage createVarLengthPage(ColumnPageEncoderMeta columnPageEncoderMeta,
int pageSize) {
- if (unsafe) {
+ if (isUnsafeEnabled(columnPageEncoderMeta)) {
try {
return new UnsafeVarLengthColumnPage(columnPageEncoderMeta, pageSize);
} catch (MemoryException e) {
@@ -116,7 +109,7 @@ public abstract class ColumnPage {
private static ColumnPage createFixLengthPage(
ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
- if (unsafe) {
+ if (isUnsafeEnabled(columnPageEncoderMeta)) {
try {
return new UnsafeFixLengthColumnPage(columnPageEncoderMeta, pageSize);
} catch (MemoryException e) {
@@ -129,7 +122,7 @@ public abstract class ColumnPage {
private static ColumnPage createFixLengthByteArrayPage(
ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize, int eachValueSize) {
- if (unsafe) {
+ if (isUnsafeEnabled(columnPageEncoderMeta)) {
try {
return new UnsafeFixLengthColumnPage(columnPageEncoderMeta, pageSize, eachValueSize);
} catch (MemoryException e) {
@@ -163,7 +156,7 @@ public abstract class ColumnPage {
CarbonCommonConstants.LOCAL_DICTIONARY_DECODER_BASED_FALLBACK_DEFAULT));
ColumnPage actualPage;
ColumnPage encodedPage;
- if (unsafe) {
+ if (isUnsafeEnabled(columnPageEncoderMeta)) {
actualPage = new UnsafeVarLengthColumnPage(columnPageEncoderMeta, pageSize);
encodedPage = new UnsafeFixLengthColumnPage(
new ColumnPageEncoderMeta(columnPageEncoderMeta.getColumnSpec(), DataTypes.BYTE_ARRAY,
@@ -190,7 +183,7 @@ public abstract class ColumnPage {
DataType dataType = columnPageEncoderMeta.getStoreDataType();
TableSpec.ColumnSpec columnSpec = columnPageEncoderMeta.getColumnSpec();
String compressorName = columnPageEncoderMeta.getCompressorName();
- if (unsafe) {
+ if (isUnsafeEnabled(columnPageEncoderMeta)) {
if (dataType == DataTypes.BOOLEAN) {
instance = new UnsafeFixLengthColumnPage(
new ColumnPageEncoderMeta(columnSpec, BYTE, compressorName), pageSize);
@@ -219,21 +212,23 @@ public abstract class ColumnPage {
}
} else {
if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
- instance = newBytePage(columnSpec, new byte[pageSize], compressorName);
+ instance = newBytePage(columnPageEncoderMeta, new byte[pageSize]);
} else if (dataType == DataTypes.SHORT) {
- instance = newShortPage(columnSpec, new short[pageSize], compressorName);
+ instance = newShortPage(columnPageEncoderMeta, new short[pageSize]);
} else if (dataType == DataTypes.SHORT_INT) {
- instance = newShortIntPage(columnSpec, new byte[pageSize * 3], compressorName);
+ instance = newShortIntPage(columnPageEncoderMeta, new byte[pageSize * 3]);
} else if (dataType == DataTypes.INT) {
- instance = newIntPage(columnSpec, new int[pageSize], compressorName);
+ instance = newIntPage(columnPageEncoderMeta, new int[pageSize]);
} else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
- instance = newLongPage(columnSpec, new long[pageSize], compressorName);
+ instance = newLongPage(
+ new ColumnPageEncoderMeta(columnPageEncoderMeta.getColumnSpec(), LONG,
+ columnPageEncoderMeta.getCompressorName()), new long[pageSize]);
} else if (dataType == DataTypes.FLOAT) {
- instance = newFloatPage(columnSpec, new float[pageSize], compressorName);
+ instance = newFloatPage(columnPageEncoderMeta, new float[pageSize]);
} else if (dataType == DataTypes.DOUBLE) {
- instance = newDoublePage(columnSpec, new double[pageSize], compressorName);
+ instance = newDoublePage(columnPageEncoderMeta, new double[pageSize]);
} else if (DataTypes.isDecimal(dataType)) {
- instance = newDecimalPage(columnSpec, new byte[pageSize][], compressorName);
+ instance = newDecimalPage(columnPageEncoderMeta, new byte[pageSize][]);
} else if (dataType == DataTypes.STRING
|| dataType == DataTypes.BYTE_ARRAY
|| dataType == DataTypes.VARCHAR) {
@@ -253,75 +248,67 @@ public abstract class ColumnPage {
return columnPage;
}
- private static ColumnPage newBytePage(TableSpec.ColumnSpec columnSpec, byte[] byteData,
- String compressorName) {
+ private static ColumnPage newBytePage(ColumnPageEncoderMeta meta, byte[] byteData) {
+ ColumnPageEncoderMeta encoderMeta =
+ new ColumnPageEncoderMeta(meta.getColumnSpec(), BYTE, meta.getCompressorName());
+ encoderMeta.setFillCompleteVector(meta.isFillCompleteVector());
ColumnPage columnPage = createPage(
- new ColumnPageEncoderMeta(columnSpec, BYTE, compressorName), byteData.length);
+ encoderMeta, byteData.length);
columnPage.setBytePage(byteData);
return columnPage;
}
- private static ColumnPage newShortPage(TableSpec.ColumnSpec columnSpec, short[] shortData,
- String compressorName) {
- ColumnPage columnPage = createPage(
- new ColumnPageEncoderMeta(columnSpec, SHORT, compressorName), shortData.length);
+ private static ColumnPage newShortPage(ColumnPageEncoderMeta meta, short[] shortData) {
+ ColumnPage columnPage = createPage(meta, shortData.length);
columnPage.setShortPage(shortData);
return columnPage;
}
- private static ColumnPage newShortIntPage(TableSpec.ColumnSpec columnSpec, byte[] shortIntData,
- String compressorName) {
- ColumnPage columnPage = createPage(
- new ColumnPageEncoderMeta(columnSpec, SHORT_INT, compressorName), shortIntData.length / 3);
+ private static ColumnPage newShortIntPage(ColumnPageEncoderMeta meta, byte[] shortIntData) {
+ ColumnPage columnPage = createPage(meta, shortIntData.length / 3);
columnPage.setShortIntPage(shortIntData);
return columnPage;
}
- private static ColumnPage newIntPage(TableSpec.ColumnSpec columnSpec, int[] intData,
- String compressorName) {
- ColumnPage columnPage = createPage(
- new ColumnPageEncoderMeta(columnSpec, INT, compressorName), intData.length);
+ private static ColumnPage newIntPage(ColumnPageEncoderMeta meta, int[] intData) {
+ ColumnPage columnPage = createPage(meta, intData.length);
columnPage.setIntPage(intData);
return columnPage;
}
- private static ColumnPage newLongPage(TableSpec.ColumnSpec columnSpec, long[] longData,
- String compressorName) {
- ColumnPage columnPage = createPage(
- new ColumnPageEncoderMeta(columnSpec, LONG, compressorName), longData.length);
+ private static ColumnPage newLongPage(ColumnPageEncoderMeta meta, long[] longData) {
+ ColumnPage columnPage = createPage(meta, longData.length);
columnPage.setLongPage(longData);
return columnPage;
}
- private static ColumnPage newFloatPage(TableSpec.ColumnSpec columnSpec, float[] floatData,
- String compressorName) {
- ColumnPage columnPage = createPage(
- new ColumnPageEncoderMeta(columnSpec, FLOAT, compressorName), floatData.length);
+ private static ColumnPage newFloatPage(ColumnPageEncoderMeta meta, float[] floatData) {
+ ColumnPage columnPage = createPage(meta, floatData.length);
columnPage.setFloatPage(floatData);
return columnPage;
}
- private static ColumnPage newDoublePage(TableSpec.ColumnSpec columnSpec, double[] doubleData,
- String compressorName) {
- ColumnPage columnPage = createPage(
- new ColumnPageEncoderMeta(columnSpec, DOUBLE, compressorName), doubleData.length);
+ private static ColumnPage newDoublePage(ColumnPageEncoderMeta meta, double[] doubleData) {
+ ColumnPage columnPage = createPage(meta, doubleData.length);
columnPage.setDoublePage(doubleData);
return columnPage;
}
- private static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec, byte[][] byteArray,
- String compressorName) {
+ private static ColumnPage newDecimalPage(ColumnPageEncoderMeta meta, byte[][] byteArray) {
+ ColumnPageEncoderMeta encoderMeta =
+ new ColumnPageEncoderMeta(meta.getColumnSpec(), meta.getColumnSpec().getSchemaDataType(),
+ meta.getCompressorName());
+ encoderMeta.setFillCompleteVector(meta.isFillCompleteVector());
ColumnPage columnPage = createPage(
- new ColumnPageEncoderMeta(columnSpec, columnSpec.getSchemaDataType(), compressorName),
+ encoderMeta,
byteArray.length);
columnPage.setByteArrayPage(byteArray);
return columnPage;
}
- private static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec,
- byte[] lvEncodedByteArray, String compressorName) throws MemoryException {
- return VarLengthColumnPageBase.newDecimalColumnPage(
- columnSpec, lvEncodedByteArray, compressorName);
+ private static ColumnPage newDecimalPage(ColumnPageEncoderMeta meta,
+ byte[] lvEncodedByteArray) throws MemoryException {
+ return VarLengthColumnPageBase.newDecimalColumnPage(meta, lvEncodedByteArray);
}
private static ColumnPage newLVBytesPage(TableSpec.ColumnSpec columnSpec,
@@ -813,25 +800,25 @@ public abstract class ColumnPage {
DataType storeDataType = meta.getStoreDataType();
if (storeDataType == DataTypes.BOOLEAN || storeDataType == DataTypes.BYTE) {
byte[] byteData = compressor.unCompressByte(compressedData, offset, length);
- return newBytePage(columnSpec, byteData, meta.getCompressorName());
+ return newBytePage(meta, byteData);
} else if (storeDataType == DataTypes.SHORT) {
short[] shortData = compressor.unCompressShort(compressedData, offset, length);
- return newShortPage(columnSpec, shortData, meta.getCompressorName());
+ return newShortPage(meta, shortData);
} else if (storeDataType == DataTypes.SHORT_INT) {
byte[] shortIntData = compressor.unCompressByte(compressedData, offset, length);
- return newShortIntPage(columnSpec, shortIntData, meta.getCompressorName());
+ return newShortIntPage(meta, shortIntData);
} else if (storeDataType == DataTypes.INT) {
int[] intData = compressor.unCompressInt(compressedData, offset, length);
- return newIntPage(columnSpec, intData, meta.getCompressorName());
+ return newIntPage(meta, intData);
} else if (storeDataType == DataTypes.LONG) {
long[] longData = compressor.unCompressLong(compressedData, offset, length);
- return newLongPage(columnSpec, longData, meta.getCompressorName());
+ return newLongPage(meta, longData);
} else if (storeDataType == DataTypes.FLOAT) {
float[] floatData = compressor.unCompressFloat(compressedData, offset, length);
- return newFloatPage(columnSpec, floatData, meta.getCompressorName());
+ return newFloatPage(meta, floatData);
} else if (storeDataType == DataTypes.DOUBLE) {
double[] doubleData = compressor.unCompressDouble(compressedData, offset, length);
- return newDoublePage(columnSpec, doubleData, meta.getCompressorName());
+ return newDoublePage(meta, doubleData);
} else if (!isLVEncoded && storeDataType == DataTypes.BYTE_ARRAY && (
columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE
|| columnSpec.getColumnType() == ColumnType.PLAIN_VALUE)) {
@@ -873,8 +860,7 @@ public abstract class ColumnPage {
public static ColumnPage decompressDecimalPage(ColumnPageEncoderMeta meta, byte[] compressedData,
int offset, int length) throws MemoryException {
Compressor compressor = CompressorFactory.getInstance().getCompressor(meta.getCompressorName());
- TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
- ColumnPage decimalPage = null;
+ ColumnPage decimalPage;
DataType storeDataType = meta.getStoreDataType();
if (storeDataType == DataTypes.BYTE) {
byte[] byteData = compressor.unCompressByte(compressedData, offset, length);
@@ -888,7 +874,7 @@ public abstract class ColumnPage {
return decimalPage;
} else if (storeDataType == DataTypes.SHORT_INT) {
byte[] shortIntData = compressor.unCompressByte(compressedData, offset, length);
- decimalPage = createDecimalPage(meta, shortIntData.length);
+ decimalPage = createDecimalPage(meta, shortIntData.length / 3);
decimalPage.setShortIntPage(shortIntData);
return decimalPage;
} else if (storeDataType == DataTypes.INT) {
@@ -903,10 +889,20 @@ public abstract class ColumnPage {
return decimalPage;
} else {
byte[] lvEncodedBytes = compressor.unCompressByte(compressedData, offset, length);
- return newDecimalPage(columnSpec, lvEncodedBytes, meta.getCompressorName());
+ return newDecimalPage(meta, lvEncodedBytes);
}
}
+ /**
+ * Whether unsafe storage should be used. In the complete-vector-fill flow there is no
+ * need for the unsafe path, because the decoded data is not kept in memory for long.
+ * @param meta ColumnPageEncoderMeta
+ * @return true if the unsafe path should be used
+ */
+ protected static boolean isUnsafeEnabled(ColumnPageEncoderMeta meta) {
+ return unsafe && !meta.isFillCompleteVector();
+ }
+
public BitSet getNullBits() {
return nullBitSet;
}
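isUnsafeEnabled is what keeps the direct-fill decode on the on-heap ("safe") pages. A minimal sketch of how the flag travels, assuming columnSpec and compressorName are in scope; the constructor matches the ones used elsewhere in this patch:

    ColumnPageEncoderMeta meta =
        new ColumnPageEncoderMeta(columnSpec, DataTypes.INT, compressorName);
    meta.setFillCompleteVector(true);  // set by the decoder factory for full vector fill
    // createPage(meta, ...) now takes the safe branch even when the unsafe
    // column-page property is enabled, since the decoded page is short-lived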
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
index 53ad956..82ccd22 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
@@ -17,6 +17,8 @@
package org.apache.carbondata.core.datastore.page;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+
// Transformation type that can be applied to ColumnPage
public interface ColumnPageValueConverter {
void encode(int rowId, byte value);
@@ -35,4 +37,5 @@ public interface ColumnPageValueConverter {
double decodeDouble(long value);
double decodeDouble(float value);
double decodeDouble(double value);
+ void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
index d3e945d..1867354 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
@@ -194,6 +194,31 @@ public class SafeDecimalColumnPage extends DecimalColumnPage {
}
@Override
+ public byte[] getBytePage() {
+ return byteData;
+ }
+
+ @Override public short[] getShortPage() {
+ return shortData;
+ }
+
+ @Override public byte[] getShortIntPage() {
+ return shortIntData;
+ }
+
+ @Override public int[] getIntPage() {
+ return intData;
+ }
+
+ @Override public long[] getLongPage() {
+ return longData;
+ }
+
+ @Override public byte[][] getByteArrayPage() {
+ return byteArrayData;
+ }
+
+ @Override
public void freeMemory() {
byteArrayData = null;
super.freeMemory();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index 39b8282..a760b64 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -124,8 +124,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
/**
* Create a new column page for decimal page
*/
- static ColumnPage newDecimalColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes,
- String compressorName) throws MemoryException {
+ static ColumnPage newDecimalColumnPage(ColumnPageEncoderMeta meta,
+ byte[] lvEncodedBytes) throws MemoryException {
+ TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
DecimalConverterFactory.DecimalConverter decimalConverter =
DecimalConverterFactory.INSTANCE.getDecimalConverter(columnSpec.getPrecision(),
columnSpec.getScale());
@@ -133,10 +134,10 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
if (size < 0) {
return getLVBytesColumnPage(columnSpec, lvEncodedBytes,
DataTypes.createDecimalType(columnSpec.getPrecision(), columnSpec.getScale()),
- CarbonCommonConstants.INT_SIZE_IN_BYTE, compressorName);
+ CarbonCommonConstants.INT_SIZE_IN_BYTE, meta.getCompressorName());
} else {
// Here the size is always fixed.
- return getDecimalColumnPage(columnSpec, lvEncodedBytes, size, compressorName);
+ return getDecimalColumnPage(meta, lvEncodedBytes, size);
}
}
@@ -158,8 +159,10 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
lvLength, compressorName);
}
- private static ColumnPage getDecimalColumnPage(TableSpec.ColumnSpec columnSpec,
- byte[] lvEncodedBytes, int size, String compressorName) throws MemoryException {
+ private static ColumnPage getDecimalColumnPage(ColumnPageEncoderMeta meta,
+ byte[] lvEncodedBytes, int size) throws MemoryException {
+ TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
+ String compressorName = meta.getCompressorName();
TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
.newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE);
ColumnPage rowOffset = ColumnPage.newPage(
@@ -176,7 +179,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
rowOffset.putInt(counter, offset);
VarLengthColumnPageBase page;
- if (unsafe) {
+ if (isUnsafeEnabled(meta)) {
page = new UnsafeDecimalColumnPage(
new ColumnPageEncoderMeta(columnSpec, columnSpec.getSchemaDataType(), compressorName),
rowId);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
index 4e491c5..d82a873 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
@@ -18,9 +18,11 @@
package org.apache.carbondata.core.datastore.page.encoding;
import java.io.IOException;
+import java.util.BitSet;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
public interface ColumnPageDecoder {
@@ -29,6 +31,12 @@ public interface ColumnPageDecoder {
*/
ColumnPage decode(byte[] input, int offset, int length) throws MemoryException, IOException;
+ /**
+ * Apply the decoding algorithm on the input byte array and fill the given vector directly.
+ */
+ void decodeAndFillVector(byte[] input, int offset, int length, ColumnVectorInfo vectorInfo,
+ BitSet nullBits, boolean isLVEncoded) throws MemoryException, IOException;
+
ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
throws MemoryException, IOException;
}
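Implementations of the new method follow one pattern: decompress to a page, attach the null bits, and delegate to the codec's converter. A condensed sketch of that pattern, taken from the AdaptiveDeltaFloatingCodec hunk later in this patch (meta and converter are the codec's own fields):

    @Override
    public void decodeAndFillVector(byte[] input, int offset, int length,
        ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
        throws MemoryException, IOException {
      ColumnPage page = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
      page.setNullBits(nullBits);
      converter.decodeAndFillVector(page, vectorInfo);
    }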
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
index e6aafa0..03a43f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
@@ -49,6 +49,9 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
// Make it protected for RLEEncoderMeta
protected String compressorName;
+ // Whether decoding should fill the complete vector directly while decoding the page.
+ private transient boolean fillCompleteVector;
+
public ColumnPageEncoderMeta() {
}
@@ -284,4 +287,12 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
public DataType getSchemaDataType() {
return columnSpec.getSchemaDataType();
}
+
+ public boolean isFillCompleteVector() {
+ return fillCompleteVector;
+ }
+
+ public void setFillCompleteVector(boolean fillCompleteVector) {
+ this.fillCompleteVector = fillCompleteVector;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
index 920a516..d3070b4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
@@ -66,6 +66,21 @@ public abstract class EncodingFactory {
*/
public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas,
String compressor) throws IOException {
+ return createDecoder(encodings, encoderMetas, compressor, false);
+ }
+
+ /**
+ * Return new decoder based on encoder metadata read from file
+ * @param encodings encodings used to decode the page
+ * @param encoderMetas metadata of encodings to decode the data
+ * @param compressor compressor name used to decompress the data
+ * @param fullVectorFill whether the decoder should fill the given vector completely
+ * while decoding the data
+ * @return decoder to decode page.
+ * @throws IOException
+ */
+ public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas,
+ String compressor, boolean fullVectorFill) throws IOException {
assert (encodings.size() >= 1);
assert (encoderMetas.size() == 1);
Encoding encoding = encodings.get(0);
@@ -74,16 +89,19 @@ public abstract class EncodingFactory {
DataInputStream in = new DataInputStream(stream);
if (encoding == DIRECT_COMPRESS || encoding == DIRECT_COMPRESS_VARCHAR) {
ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+ metadata.setFillCompleteVector(fullVectorFill);
metadata.readFields(in);
return new DirectCompressCodec(metadata.getStoreDataType()).createDecoder(metadata);
} else if (encoding == ADAPTIVE_INTEGRAL) {
ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+ metadata.setFillCompleteVector(fullVectorFill);
metadata.readFields(in);
SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
return new AdaptiveIntegralCodec(metadata.getSchemaDataType(), metadata.getStoreDataType(),
stats, encodings.contains(Encoding.INVERTED_INDEX)).createDecoder(metadata);
} else if (encoding == ADAPTIVE_DELTA_INTEGRAL) {
ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+ metadata.setFillCompleteVector(fullVectorFill);
metadata.readFields(in);
SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
return new AdaptiveDeltaIntegralCodec(metadata.getSchemaDataType(),
@@ -91,12 +109,14 @@ public abstract class EncodingFactory {
.createDecoder(metadata);
} else if (encoding == ADAPTIVE_FLOATING) {
ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+ metadata.setFillCompleteVector(fullVectorFill);
metadata.readFields(in);
SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
return new AdaptiveFloatingCodec(metadata.getSchemaDataType(), metadata.getStoreDataType(),
stats, encodings.contains(Encoding.INVERTED_INDEX)).createDecoder(metadata);
} else if (encoding == ADAPTIVE_DELTA_FLOATING) {
ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+ metadata.setFillCompleteVector(fullVectorFill);
metadata.readFields(in);
SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
return new AdaptiveDeltaFloatingCodec(metadata.getSchemaDataType(),
@@ -108,12 +128,13 @@ public abstract class EncodingFactory {
return new RLECodec().createDecoder(metadata);
} else if (encoding == BOOL_BYTE) {
ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+ metadata.setFillCompleteVector(fullVectorFill);
metadata.readFields(in);
return new DirectCompressCodec(metadata.getStoreDataType()).createDecoder(metadata);
} else {
// for backward compatibility
ValueEncoderMeta metadata = CarbonUtil.deserializeEncoderMetaV3(encoderMeta);
- return createDecoderLegacy(metadata, compressor);
+ return createDecoderLegacy(metadata, compressor, fullVectorFill);
}
}
@@ -121,6 +142,14 @@ public abstract class EncodingFactory {
* Old way of creating decoder, based on algorithm
*/
public ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata, String compressor) {
+ return createDecoderLegacy(metadata, compressor, false);
+ }
+
+ /**
+ * Old way of creating decoder, based on algorithm
+ */
+ private ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata, String compressor,
+ boolean fullVectorFill) {
if (null == metadata) {
throw new RuntimeException("internal error");
}
@@ -139,16 +168,19 @@ public abstract class EncodingFactory {
AdaptiveIntegralCodec adaptiveCodec = (AdaptiveIntegralCodec) codec;
ColumnPageEncoderMeta meta =
new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+ meta.setFillCompleteVector(fullVectorFill);
return codec.createDecoder(meta);
} else if (codec instanceof AdaptiveDeltaIntegralCodec) {
AdaptiveDeltaIntegralCodec adaptiveCodec = (AdaptiveDeltaIntegralCodec) codec;
ColumnPageEncoderMeta meta =
new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+ meta.setFillCompleteVector(fullVectorFill);
return codec.createDecoder(meta);
} else if (codec instanceof DirectCompressCodec) {
ColumnPageEncoderMeta meta =
new ColumnPageEncoderMeta(spec, DataType.getDataType(metadata.getType()), stats,
compressor);
+ meta.setFillCompleteVector(fullVectorFill);
return codec.createDecoder(meta);
} else {
throw new RuntimeException("internal error");
@@ -161,30 +193,36 @@ public abstract class EncodingFactory {
AdaptiveFloatingCodec adaptiveCodec = (AdaptiveFloatingCodec) codec;
ColumnPageEncoderMeta meta =
new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+ meta.setFillCompleteVector(fullVectorFill);
return codec.createDecoder(meta);
} else if (codec instanceof DirectCompressCodec) {
ColumnPageEncoderMeta meta =
new ColumnPageEncoderMeta(spec, DataType.getDataType(metadata.getType()), stats,
compressor);
+ meta.setFillCompleteVector(fullVectorFill);
return codec.createDecoder(meta);
} else if (codec instanceof AdaptiveDeltaFloatingCodec) {
AdaptiveDeltaFloatingCodec adaptiveCodec = (AdaptiveDeltaFloatingCodec) codec;
ColumnPageEncoderMeta meta =
new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+ meta.setFillCompleteVector(fullVectorFill);
return codec.createDecoder(meta);
} else {
throw new RuntimeException("internal error");
}
} else if (DataTypes.isDecimal(dataType) || dataType == DataTypes.BYTE_ARRAY) {
// no dictionary dimension
- return new DirectCompressCodec(stats.getDataType())
- .createDecoder(new ColumnPageEncoderMeta(spec, stats.getDataType(), stats, compressor));
+ ColumnPageEncoderMeta meta =
+ new ColumnPageEncoderMeta(spec, stats.getDataType(), stats, compressor);
+ meta.setFillCompleteVector(fullVectorFill);
+ return new DirectCompressCodec(stats.getDataType()).createDecoder(meta);
} else if (dataType == DataTypes.LEGACY_LONG) {
// In case of older versions like in V1 format it has special datatype to handle
AdaptiveIntegralCodec adaptiveCodec =
new AdaptiveIntegralCodec(DataTypes.LONG, DataTypes.LONG, stats, false);
ColumnPageEncoderMeta meta =
new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+ meta.setFillCompleteVector(fullVectorFill);
return adaptiveCodec.createDecoder(meta);
} else {
throw new RuntimeException("unsupported data type: " + stats.getDataType());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
index 9b0b574..1826798 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page.encoding.adaptive;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.List;
import java.util.Map;
@@ -26,6 +27,7 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
import org.apache.carbondata.core.datastore.page.LazyColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
@@ -35,6 +37,9 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.Encoding;
@@ -125,6 +130,18 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
return LazyColumnPage.newPage(page, converter);
}
+ @Override
+ public void decodeAndFillVector(byte[] input, int offset, int length,
+ ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+ throws MemoryException, IOException {
+ ColumnPage page = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+ page.setNullBits(nullBits);
+ if (page instanceof DecimalColumnPage) {
+ vectorInfo.decimalConverter = ((DecimalColumnPage) page).getDecimalConverter();
+ }
+ converter.decodeAndFillVector(page, vectorInfo);
+ }
+
@Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
throws MemoryException, IOException {
return decode(input, offset, length);
@@ -226,6 +243,71 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
}
@Override
+ public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+ CarbonColumnVector vector = vectorInfo.vector;
+ BitSet nullBits = columnPage.getNullBits();
+ DataType pageDataType = columnPage.getDataType();
+ int pageSize = columnPage.getPageSize();
+ BitSet deletedRows = vectorInfo.deletedRows;
+ DataType vectorDataType = vector.getType();
+ if (vectorDataType == DataTypes.FLOAT) {
+ float floatFactor = factor.floatValue();
+ if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+ byte[] byteData = columnPage.getBytePage();
+ for (int i = 0; i < pageSize; i++) {
+ vector.putFloat(i, (max - byteData[i]) / floatFactor);
+ }
+ } else if (pageDataType == DataTypes.SHORT) {
+ short[] shortData = columnPage.getShortPage();
+ for (int i = 0; i < pageSize; i++) {
+ vector.putFloat(i, (max - shortData[i]) / floatFactor);
+ }
+
+ } else if (pageDataType == DataTypes.SHORT_INT) {
+ byte[] shortIntPage = columnPage.getShortIntPage();
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ vector.putFloat(i, (max - shortInt) / floatFactor);
+ }
+ } else {
+ throw new RuntimeException("internal error: " + this.toString());
+ }
+ } else {
+ if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+ byte[] byteData = columnPage.getBytePage();
+ for (int i = 0; i < pageSize; i++) {
+ vector.putDouble(i, (max - byteData[i]) / factor);
+ }
+ } else if (pageDataType == DataTypes.SHORT) {
+ short[] shortData = columnPage.getShortPage();
+ for (int i = 0; i < pageSize; i++) {
+ vector.putDouble(i, (max - shortData[i]) / factor);
+ }
+
+ } else if (pageDataType == DataTypes.SHORT_INT) {
+ byte[] shortIntPage = columnPage.getShortIntPage();
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ vector.putDouble(i, (max - shortInt) / factor);
+ }
+ } else if (pageDataType == DataTypes.INT) {
+ int[] intData = columnPage.getIntPage();
+ for (int i = 0; i < pageSize; i++) {
+ vector.putDouble(i, (max - intData[i]) / factor);
+ }
+ } else {
+ throw new RuntimeException("Unsupported datatype : " + pageDataType);
+ }
+ }
+
+ if (deletedRows == null || deletedRows.isEmpty()) {
+ for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
+ vector.putNull(i);
+ }
+ }
+ }
+
+ @Override
public double decodeDouble(float value) {
throw new RuntimeException("internal error: " + debugInfo());
}
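A worked reading of the decode expression above, assuming factor = 100 (two decimal digits) and a scaled page maximum max = 321, representing 3.21:

    // stored delta = 2  ->  (321 - 2) / 100.0 = 3.19
    vector.putDouble(i, (max - byteData[i]) / factor);

The AdaptiveDeltaIntegralCodec below applies the same max-minus-delta scheme without the factor division, with TIMESTAMP values additionally scaled by 1000.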
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index 0e61b33..0d7ad8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.datastore.page.encoding.adaptive;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.List;
import java.util.Map;
@@ -27,6 +28,7 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
import org.apache.carbondata.core.datastore.page.LazyColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
@@ -35,6 +37,10 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.Encoding;
@@ -119,9 +125,11 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
};
}
- @Override public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta meta) {
+ @Override
+ public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta meta) {
return new ColumnPageDecoder() {
- @Override public ColumnPage decode(byte[] input, int offset, int length)
+ @Override
+ public ColumnPage decode(byte[] input, int offset, int length)
throws MemoryException, IOException {
ColumnPage page = null;
if (DataTypes.isDecimal(meta.getSchemaDataType())) {
@@ -132,7 +140,23 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
return LazyColumnPage.newPage(page, converter);
}
- @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
+ @Override
+ public void decodeAndFillVector(byte[] input, int offset, int length,
+ ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+ throws MemoryException, IOException {
+ ColumnPage page = null;
+ if (DataTypes.isDecimal(meta.getSchemaDataType())) {
+ page = ColumnPage.decompressDecimalPage(meta, input, offset, length);
+ vectorInfo.decimalConverter = ((DecimalColumnPage) page).getDecimalConverter();
+ } else {
+ page = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+ }
+ page.setNullBits(nullBits);
+ converter.decodeAndFillVector(page, vectorInfo);
+ }
+
+ @Override
+ public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
throws MemoryException, IOException {
return decode(input, offset, length);
}
@@ -272,5 +296,169 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
// this codec is for integer type only
throw new RuntimeException("internal error");
}
+
+ @Override
+ public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+ CarbonColumnVector vector = vectorInfo.vector;
+ BitSet nullBits = columnPage.getNullBits();
+ DataType vectorDataType = vector.getType();
+ DataType pageDataType = columnPage.getDataType();
+ int pageSize = columnPage.getPageSize();
+ BitSet deletedRows = vectorInfo.deletedRows;
+ fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
+ if (deletedRows == null || deletedRows.isEmpty()) {
+ for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
+ vector.putNull(i);
+ }
+ }
+ }
+
+ private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
+ DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo) {
+ if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+ byte[] byteData = columnPage.getBytePage();
+ if (vectorDataType == DataTypes.SHORT) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putShort(i, (short) (max - byteData[i]));
+ }
+ } else if (vectorDataType == DataTypes.INT) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putInt(i, (int) (max - byteData[i]));
+ }
+ } else if (vectorDataType == DataTypes.LONG) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, (max - byteData[i]));
+ }
+ } else if (vectorDataType == DataTypes.TIMESTAMP) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, (max - byteData[i]) * 1000);
+ }
+ } else if (vectorDataType == DataTypes.BOOLEAN) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putByte(i, (byte) (max - byteData[i]));
+ }
+ } else if (DataTypes.isDecimal(vectorDataType)) {
+ DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+ int precision = vectorInfo.measure.getMeasure().getPrecision();
+ for (int i = 0; i < pageSize; i++) {
+ BigDecimal decimal = decimalConverter.getDecimal(max - byteData[i]);
+ vector.putDecimal(i, decimal, precision);
+ }
+ } else {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putDouble(i, (max - byteData[i]));
+ }
+ }
+ } else if (pageDataType == DataTypes.SHORT) {
+ short[] shortData = columnPage.getShortPage();
+ if (vectorDataType == DataTypes.SHORT) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putShort(i, (short) (max - shortData[i]));
+ }
+ } else if (vectorDataType == DataTypes.INT) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putInt(i, (int) (max - shortData[i]));
+ }
+ } else if (vectorDataType == DataTypes.LONG) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, (max - shortData[i]));
+ }
+ } else if (vectorDataType == DataTypes.TIMESTAMP) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, (max - shortData[i]) * 1000);
+ }
+ } else if (DataTypes.isDecimal(vectorDataType)) {
+ DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+ int precision = vectorInfo.measure.getMeasure().getPrecision();
+ for (int i = 0; i < pageSize; i++) {
+ BigDecimal decimal = decimalConverter.getDecimal(max - shortData[i]);
+ vector.putDecimal(i, decimal, precision);
+ }
+ } else {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putDouble(i, (max - shortData[i]));
+ }
+ }
+
+ } else if (pageDataType == DataTypes.SHORT_INT) {
+ byte[] shortIntPage = columnPage.getShortIntPage();
+ if (vectorDataType == DataTypes.INT) {
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ vector.putInt(i, (int) (max - shortInt));
+ }
+ } else if (vectorDataType == DataTypes.LONG) {
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ vector.putLong(i, (max - shortInt));
+ }
+ } else if (vectorDataType == DataTypes.TIMESTAMP) {
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ vector.putLong(i, (max - shortInt) * 1000);
+ }
+ } else if (DataTypes.isDecimal(vectorDataType)) {
+ DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+ int precision = vectorInfo.measure.getMeasure().getPrecision();
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ BigDecimal decimal = decimalConverter.getDecimal(max - shortInt);
+ vector.putDecimal(i, decimal, precision);
+ }
+ } else {
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ vector.putDouble(i, (max - shortInt));
+ }
+ }
+ } else if (pageDataType == DataTypes.INT) {
+ int[] intData = columnPage.getIntPage();
+ if (vectorDataType == DataTypes.INT) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putInt(i, (int) (max - intData[i]));
+ }
+ } else if (vectorDataType == DataTypes.LONG) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, (max - intData[i]));
+ }
+ } else if (vectorDataType == DataTypes.TIMESTAMP) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, (max - intData[i]) * 1000);
+ }
+ } else if (DataTypes.isDecimal(vectorDataType)) {
+ DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+ int precision = vectorInfo.measure.getMeasure().getPrecision();
+ for (int i = 0; i < pageSize; i++) {
+ BigDecimal decimal = decimalConverter.getDecimal(max - intData[i]);
+ vector.putDecimal(i, decimal, precision);
+ }
+ } else {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putDouble(i, (max - intData[i]));
+ }
+ }
+ } else if (pageDataType == DataTypes.LONG) {
+ long[] longData = columnPage.getLongPage();
+ if (vectorDataType == DataTypes.LONG) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, (max - longData[i]));
+ }
+ } else if (vectorDataType == DataTypes.TIMESTAMP) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, (max - longData[i]) * 1000);
+ }
+ } else if (DataTypes.isDecimal(vectorDataType)) {
+ DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+ int precision = vectorInfo.measure.getMeasure().getPrecision();
+ for (int i = 0; i < pageSize; i++) {
+ BigDecimal decimal = decimalConverter.getDecimal(max - longData[i]);
+ vector.putDecimal(i, decimal, precision);
+ }
+ }
+ } else {
+ throw new RuntimeException("Unsupported datatype : " + pageDataType);
+ }
+ }
+
};
}
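
The converter above reverses the delta encoding: each stored value is the
distance below the page maximum, so every branch reconstructs the original
as max - stored. A minimal self-contained sketch of that round trip (class
name and values are illustrative, not part of this patch):

    // Sketch of adaptive-delta decoding, assuming the page stored each
    // value as (max - actual) narrowed to a byte, as the byte branch does.
    public final class DeltaDecodeSketch {
      static long[] decode(byte[] stored, long max) {
        long[] out = new long[stored.length];
        for (int i = 0; i < stored.length; i++) {
          out[i] = max - stored[i]; // reverse the delta encoding
        }
        return out;
      }

      public static void main(String[] args) {
        // Values {100, 98, 95} with max = 100 were stored as {0, 2, 5}.
        System.out.println(java.util.Arrays.toString(
            decode(new byte[] {0, 2, 5}, 100L))); // [100, 98, 95]
      }
    }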
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
index 836af26..38bf9b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page.encoding.adaptive;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.List;
import java.util.Map;
@@ -26,6 +27,7 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
import org.apache.carbondata.core.datastore.page.LazyColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
@@ -34,6 +36,9 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.Encoding;
@@ -113,7 +118,20 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
return LazyColumnPage.newPage(page, converter);
}
- @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
+ @Override
+ public void decodeAndFillVector(byte[] input, int offset, int length,
+ ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+ throws MemoryException, IOException {
+ ColumnPage page = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+ page.setNullBits(nullBits);
+ if (page instanceof DecimalColumnPage) {
+ vectorInfo.decimalConverter = ((DecimalColumnPage) page).getDecimalConverter();
+ }
+ converter.decodeAndFillVector(page, vectorInfo);
+ }
+
+ @Override
+ public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
throws MemoryException, IOException {
return decode(input, offset, length);
}
@@ -226,5 +244,69 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
public double decodeDouble(double value) {
throw new RuntimeException("internal error: " + debugInfo());
}
+
+ @Override
+ public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+ CarbonColumnVector vector = vectorInfo.vector;
+ BitSet nullBits = columnPage.getNullBits();
+ DataType pageDataType = columnPage.getDataType();
+ int pageSize = columnPage.getPageSize();
+ BitSet deletedRows = vectorInfo.deletedRows;
+ DataType vectorDataType = vector.getType();
+ if (vectorDataType == DataTypes.FLOAT) {
+ if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+ byte[] byteData = columnPage.getBytePage();
+ for (int i = 0; i < pageSize; i++) {
+ vector.putFloat(i, (byteData[i] / floatFactor));
+ }
+ } else if (pageDataType == DataTypes.SHORT) {
+ short[] shortData = columnPage.getShortPage();
+ for (int i = 0; i < pageSize; i++) {
+ vector.putFloat(i, (shortData[i] / floatFactor));
+ }
+
+ } else if (pageDataType == DataTypes.SHORT_INT) {
+ byte[] shortIntPage = columnPage.getShortIntPage();
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ vector.putFloat(i, (shortInt / floatFactor));
+ }
+ } else {
+ throw new RuntimeException("internal error: " + this.toString());
+ }
+ } else {
+ if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+ byte[] byteData = columnPage.getBytePage();
+ for (int i = 0; i < pageSize; i++) {
+ vector.putDouble(i, (byteData[i] / factor));
+ }
+ } else if (pageDataType == DataTypes.SHORT) {
+ short[] shortData = columnPage.getShortPage();
+ for (int i = 0; i < pageSize; i++) {
+ vector.putDouble(i, (shortData[i] / factor));
+ }
+
+ } else if (pageDataType == DataTypes.SHORT_INT) {
+ byte[] shortIntPage = columnPage.getShortIntPage();
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ vector.putDouble(i, (shortInt / factor));
+ }
+ } else if (pageDataType == DataTypes.INT) {
+ int[] intData = columnPage.getIntPage();
+ for (int i = 0; i < pageSize; i++) {
+ vector.putDouble(i, (intData[i] / factor));
+ }
+ } else {
+ throw new RuntimeException("Unsupported datatype : " + pageDataType);
+ }
+ }
+
+ if (deletedRows == null || deletedRows.isEmpty()) {
+ for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
+ vector.putNull(i);
+ }
+ }
+ }
};
}
\ No newline at end of file
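
The converter above mirrors this codec's encode-time trick: a double is
multiplied by a power-of-ten factor so it fits an integral type, and decode
divides the factor back out. A hedged round-trip sketch (factor and values
are illustrative):

    // Scale-by-factor round trip behind the floating codec.
    public final class FloatingRoundTrip {
      public static void main(String[] args) {
        double factor = 100d;                             // two decimal digits
        double original = 1.25;
        int stored = (int) Math.round(original * factor); // 125
        double decoded = stored / factor;                 // 1.25 again
        System.out.println(decoded == original);          // true
      }
    }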
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index f1c0ea0..bdf5373 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page.encoding.adaptive;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.List;
import java.util.Map;
@@ -26,6 +27,7 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
import org.apache.carbondata.core.datastore.page.LazyColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
@@ -34,6 +36,10 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.Encoding;
@@ -111,6 +117,21 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
return LazyColumnPage.newPage(page, converter);
}
+ @Override
+ public void decodeAndFillVector(byte[] input, int offset, int length,
+ ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+ throws MemoryException, IOException {
+ ColumnPage page = null;
+ if (DataTypes.isDecimal(meta.getSchemaDataType())) {
+ page = ColumnPage.decompressDecimalPage(meta, input, offset, length);
+ vectorInfo.decimalConverter = ((DecimalColumnPage) page).getDecimalConverter();
+ } else {
+ page = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+ }
+ page.setNullBits(nullBits);
+ converter.decodeAndFillVector(page, vectorInfo);
+ }
+
@Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
throws MemoryException, IOException {
return decode(input, offset, length);
@@ -248,6 +269,142 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
public double decodeDouble(double value) {
throw new RuntimeException("internal error: " + debugInfo());
}
+
+ @Override
+ public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+ CarbonColumnVector vector = vectorInfo.vector;
+ BitSet nullBits = columnPage.getNullBits();
+ DataType vectorDataType = vector.getType();
+ DataType pageDataType = columnPage.getDataType();
+ int pageSize = columnPage.getPageSize();
+ BitSet deletedRows = vectorInfo.deletedRows;
+ fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
+ if (deletedRows == null || deletedRows.isEmpty()) {
+ for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
+ vector.putNull(i);
+ }
+ }
+ }
+
+ private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
+ DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo) {
+ if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+ byte[] byteData = columnPage.getBytePage();
+ if (vectorDataType == DataTypes.SHORT) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putShort(i, (short) byteData[i]);
+ }
+ } else if (vectorDataType == DataTypes.INT) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putInt(i, (int) byteData[i]);
+ }
+ } else if (vectorDataType == DataTypes.LONG) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, byteData[i]);
+ }
+ } else if (vectorDataType == DataTypes.TIMESTAMP) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, byteData[i] * 1000);
+ }
+ } else if (vectorDataType == DataTypes.BOOLEAN) {
+ vector.putBytes(0, pageSize, byteData, 0);
+ } else if (DataTypes.isDecimal(vectorDataType)) {
+ DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+ decimalConverter.fillVector(byteData, pageSize, vectorInfo, columnPage.getNullBits());
+ } else {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putDouble(i, byteData[i]);
+ }
+ }
+ } else if (pageDataType == DataTypes.SHORT) {
+ short[] shortData = columnPage.getShortPage();
+ if (vectorDataType == DataTypes.SHORT) {
+ vector.putShorts(0, pageSize, shortData, 0);
+ } else if (vectorDataType == DataTypes.INT) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putInt(i, (int) shortData[i]);
+ }
+ } else if (vectorDataType == DataTypes.LONG) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, shortData[i]);
+ }
+ } else if (vectorDataType == DataTypes.TIMESTAMP) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, shortData[i] * 1000);
+ }
+ } else if (DataTypes.isDecimal(vectorDataType)) {
+ DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+ decimalConverter.fillVector(shortData, pageSize, vectorInfo, columnPage.getNullBits());
+ } else {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putDouble(i, shortData[i]);
+ }
+ }
+
+ } else if (pageDataType == DataTypes.SHORT_INT) {
+ byte[] shortIntPage = columnPage.getShortIntPage();
+ if (vectorDataType == DataTypes.INT) {
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ vector.putInt(i, shortInt);
+ }
+ } else if (vectorDataType == DataTypes.LONG) {
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ vector.putLong(i, shortInt);
+ }
+ } else if (vectorDataType == DataTypes.TIMESTAMP) {
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ vector.putLong(i, shortInt * 1000L);
+ }
+ } else if (DataTypes.isDecimal(vectorDataType)) {
+ DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+ int[] shortIntData = ByteUtil.toIntArray(shortIntPage, pageSize);
+ decimalConverter.fillVector(shortIntData, pageSize, vectorInfo, columnPage.getNullBits());
+ } else {
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ vector.putDouble(i, shortInt);
+ }
+ }
+ } else if (pageDataType == DataTypes.INT) {
+ int[] intData = columnPage.getIntPage();
+ if (vectorDataType == DataTypes.INT) {
+ vector.putInts(0, pageSize, intData, 0);
+ } else if (vectorDataType == DataTypes.LONG) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, intData[i]);
+ }
+ } else if (vectorDataType == DataTypes.TIMESTAMP) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, intData[i] * 1000L);
+ }
+ } else if (DataTypes.isDecimal(vectorDataType)) {
+ DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+ decimalConverter.fillVector(intData, pageSize, vectorInfo, columnPage.getNullBits());
+ } else {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putDouble(i, intData[i]);
+ }
+ }
+ } else if (pageDataType == DataTypes.LONG) {
+ long[] longData = columnPage.getLongPage();
+ if (vectorDataType == DataTypes.LONG) {
+ vector.putLongs(0, pageSize, longData, 0);
+ } else if (vectorDataType == DataTypes.TIMESTAMP) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, longData[i] * 1000);
+ }
+ } else if (DataTypes.isDecimal(vectorDataType)) {
+ DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+ decimalConverter.fillVector(longData, pageSize, vectorInfo, columnPage.getNullBits());
+ }
+ } else {
+ double[] doubleData = columnPage.getDoublePage();
+ vector.putDoubles(0, pageSize, doubleData, 0);
+ }
+ }
};
}
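
Note the fast path in the converter above: when the decoded page array
already matches the vector type, one bulk putShorts/putInts/putLongs call
replaces the per-row loop. A hedged illustration of the new array overloads
(OnHeapIntVector is a hypothetical stand-in, not a class from this patch):

    // Hypothetical vector: one arraycopy instead of pageSize putInt calls.
    public final class OnHeapIntVector {
      private final int[] data;

      OnHeapIntVector(int capacity) {
        this.data = new int[capacity];
      }

      void putInt(int rowId, int value) {
        data[rowId] = value; // slow path: one call per row
      }

      void putInts(int rowId, int count, int[] src, int srcIndex) {
        System.arraycopy(src, srcIndex, data, rowId, count); // fast path
      }
    }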
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
index aa03ec1..4d1e6e7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page.encoding.compress;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.List;
import java.util.Map;
@@ -26,6 +27,7 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
import org.apache.carbondata.core.datastore.page.LazyColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
@@ -34,6 +36,10 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.format.Encoding;
/**
@@ -95,10 +101,25 @@ public class DirectCompressCodec implements ColumnPageCodec {
return LazyColumnPage.newPage(decodedPage, converter);
}
+ @Override
+ public void decodeAndFillVector(byte[] input, int offset, int length,
+ ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+ throws MemoryException, IOException {
+ ColumnPage decodedPage;
+ if (DataTypes.isDecimal(dataType)) {
+ decodedPage = ColumnPage.decompressDecimalPage(meta, input, offset, length);
+ vectorInfo.decimalConverter = ((DecimalColumnPage) decodedPage).getDecimalConverter();
+ } else {
+ decodedPage = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+ }
+ decodedPage.setNullBits(nullBits);
+ converter.decodeAndFillVector(decodedPage, vectorInfo);
+ }
+
@Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
- throws MemoryException, IOException {
- return LazyColumnPage.newPage(
- ColumnPage.decompress(meta, input, offset, length, isLVEncoded), converter);
+ throws MemoryException, IOException {
+ return LazyColumnPage
+ .newPage(ColumnPage.decompress(meta, input, offset, length, isLVEncoded), converter);
}
};
}
@@ -178,6 +199,149 @@ public class DirectCompressCodec implements ColumnPageCodec {
public double decodeDouble(double value) {
return value;
}
+
+ @Override
+ public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+ CarbonColumnVector vector = vectorInfo.vector;
+ BitSet nullBits = columnPage.getNullBits();
+ DataType vectorDataType = vector.getType();
+ DataType pageDataType = columnPage.getDataType();
+ int pageSize = columnPage.getPageSize();
+ BitSet deletedRows = vectorInfo.deletedRows;
+ fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
+ if (deletedRows == null || deletedRows.isEmpty()) {
+ for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
+ vector.putNull(i);
+ }
+ }
+ }
+
+ private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
+ DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo) {
+ if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+ byte[] byteData = columnPage.getBytePage();
+ if (vectorDataType == DataTypes.SHORT) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putShort(i, (short) byteData[i]);
+ }
+ } else if (vectorDataType == DataTypes.INT) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putInt(i, (int) byteData[i]);
+ }
+ } else if (vectorDataType == DataTypes.LONG) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, byteData[i]);
+ }
+ } else if (vectorDataType == DataTypes.TIMESTAMP) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, byteData[i] * 1000);
+ }
+ } else if (vectorDataType == DataTypes.BOOLEAN || vectorDataType == DataTypes.BYTE) {
+ vector.putBytes(0, pageSize, byteData, 0);
+ } else if (DataTypes.isDecimal(vectorDataType)) {
+ DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+ decimalConverter.fillVector(byteData, pageSize, vectorInfo, columnPage.getNullBits());
+ } else {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putDouble(i, byteData[i]);
+ }
+ }
+ } else if (pageDataType == DataTypes.SHORT) {
+ short[] shortData = columnPage.getShortPage();
+ if (vectorDataType == DataTypes.SHORT) {
+ vector.putShorts(0, pageSize, shortData, 0);
+ } else if (vectorDataType == DataTypes.INT) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putInt(i, (int) shortData[i]);
+ }
+ } else if (vectorDataType == DataTypes.LONG) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, shortData[i]);
+ }
+ } else if (vectorDataType == DataTypes.TIMESTAMP) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, shortData[i] * 1000);
+ }
+ } else if (DataTypes.isDecimal(vectorDataType)) {
+ DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+ decimalConverter.fillVector(shortData, pageSize, vectorInfo, columnPage.getNullBits());
+ } else {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putDouble(i, shortData[i]);
+ }
+ }
+
+ } else if (pageDataType == DataTypes.SHORT_INT) {
+ byte[] shortIntPage = columnPage.getShortIntPage();
+ if (vectorDataType == DataTypes.INT) {
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ vector.putInt(i, shortInt);
+ }
+ } else if (vectorDataType == DataTypes.LONG) {
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ vector.putLong(i, shortInt);
+ }
+ } else if (vectorDataType == DataTypes.TIMESTAMP) {
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ vector.putLong(i, shortInt * 1000L);
+ }
+ } else if (DataTypes.isDecimal(vectorDataType)) {
+ DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+ int[] shortIntData = ByteUtil.toIntArray(shortIntPage, pageSize);
+ decimalConverter.fillVector(shortIntData, pageSize, vectorInfo, columnPage.getNullBits());
+ } else {
+ for (int i = 0; i < pageSize; i++) {
+ int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+ vector.putDouble(i, shortInt);
+ }
+ }
+ } else if (pageDataType == DataTypes.INT) {
+ int[] intData = columnPage.getIntPage();
+ if (vectorDataType == DataTypes.INT) {
+ vector.putInts(0, pageSize, intData, 0);
+ } else if (vectorDataType == DataTypes.LONG) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, intData[i]);
+ }
+ } else if (vectorDataType == DataTypes.TIMESTAMP) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, intData[i] * 1000L);
+ }
+ } else if (DataTypes.isDecimal(vectorDataType)) {
+ DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+ decimalConverter.fillVector(intData, pageSize, vectorInfo, columnPage.getNullBits());
+ } else {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putDouble(i, intData[i]);
+ }
+ }
+ } else if (pageDataType == DataTypes.LONG) {
+ long[] longData = columnPage.getLongPage();
+ if (vectorDataType == DataTypes.LONG) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, longData[i]);
+ }
+ vector.putLongs(0, pageSize, longData, 0);
+ } else if (vectorDataType == DataTypes.TIMESTAMP) {
+ for (int i = 0; i < pageSize; i++) {
+ vector.putLong(i, longData[i] * 1000);
+ }
+ } else if (DataTypes.isDecimal(vectorDataType)) {
+ DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+ decimalConverter.fillVector(longData, pageSize, vectorInfo, columnPage.getNullBits());
+ }
+ } else if (DataTypes.isDecimal(pageDataType)) {
+ DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+ decimalConverter.fillVector(columnPage.getByteArrayPage(), pageSize, vectorInfo,
+ columnPage.getNullBits());
+ } else {
+ double[] doubleData = columnPage.getDoublePage();
+ vector.putDoubles(0, pageSize, doubleData, 0);
+ }
+ }
};
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
index e7d4118..c9b47db 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
@@ -23,6 +23,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.List;
import java.util.Map;
@@ -35,6 +36,7 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.format.Encoding;
/**
@@ -314,6 +316,13 @@ public class RLECodec implements ColumnPageCodec {
return resultPage;
}
+ @Override
+ public void decodeAndFillVector(byte[] input, int offset, int length,
+ ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+ throws MemoryException, IOException {
+ throw new UnsupportedOperationException("RLE codec does not support direct vector fill");
+ }
+
@Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
throws MemoryException, IOException {
return decode(input, offset, length);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
index a49eced..67d70e3 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
@@ -35,7 +35,7 @@ import org.apache.log4j.Logger;
*/
public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator {
- private static final int cutOffDate = Integer.MAX_VALUE >> 1;
+ public static final int cutOffDate = Integer.MAX_VALUE >> 1;
private static final long SECONDS_PER_DAY = 60 * 60 * 24L;
public static final long MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L;
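
Widening cutOffDate to public lets vector fill code undo the date surrogate
offset itself. A hedged sketch of the relationship (the real encode path
stays in this generator; the arithmetic only shows why the offset keeps
pre-1970 dates positive):

    import static org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator.cutOffDate;

    public final class DateSurrogateSketch {
      public static void main(String[] args) {
        int daysSinceEpoch = -1;                     // 1969-12-31
        int surrogate = daysSinceEpoch + cutOffDate; // stays positive
        System.out.println(surrogate - cutOffDate);  // -1 restored
      }
    }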
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
index a8da6d4..89a3168 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
@@ -19,7 +19,10 @@ package org.apache.carbondata.core.metadata.datatype;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;
+import java.util.BitSet;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.util.DataTypeUtil;
/**
@@ -72,6 +75,8 @@ public final class DecimalConverterFactory {
BigDecimal getDecimal(Object valueToBeConverted);
+ void fillVector(Object valuesToBeConverted, int size, ColumnVectorInfo info, BitSet nullBitset);
+
int getSize();
DecimalConverterType getDecimalConverterType();
@@ -80,7 +85,7 @@ public final class DecimalConverterFactory {
public static class DecimalIntConverter implements DecimalConverter {
- private int scale;
+ protected int scale;
DecimalIntConverter(int scale) {
this.scale = scale;
@@ -95,6 +100,51 @@ public final class DecimalConverterFactory {
return BigDecimal.valueOf((Long) valueToBeConverted, scale);
}
+ @Override public void fillVector(Object valuesToBeConverted, int size, ColumnVectorInfo info,
+ BitSet nullBitset) {
+ // TODO we need to find a way to set the vector directly without conversion. This way is very
+ // inefficient.
+ CarbonColumnVector vector = info.vector;
+ int precision = info.measure.getMeasure().getPrecision();
+ if (valuesToBeConverted instanceof byte[]) {
+ byte[] data = (byte[]) valuesToBeConverted;
+ for (int i = 0; i < size; i++) {
+ if (nullBitset.get(i)) {
+ vector.putNull(i);
+ } else {
+ vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision);
+ }
+ }
+ } else if (valuesToBeConverted instanceof short[]) {
+ short[] data = (short[]) valuesToBeConverted;
+ for (int i = 0; i < size; i++) {
+ if (nullBitset.get(i)) {
+ vector.putNull(i);
+ } else {
+ vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision);
+ }
+ }
+ } else if (valuesToBeConverted instanceof int[]) {
+ int[] data = (int[]) valuesToBeConverted;
+ for (int i = 0; i < size; i++) {
+ if (nullBitset.get(i)) {
+ vector.putNull(i);
+ } else {
+ vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision);
+ }
+ }
+ } else if (valuesToBeConverted instanceof long[]) {
+ long[] data = (long[]) valuesToBeConverted;
+ for (int i = 0; i < size; i++) {
+ if (nullBitset.get(i)) {
+ vector.putNull(i);
+ } else {
+ vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision);
+ }
+ }
+ }
+ }
+
@Override public int getSize() {
return 4;
}
@@ -104,12 +154,10 @@ public final class DecimalConverterFactory {
}
}
- public static class DecimalLongConverter implements DecimalConverter {
-
- private int scale;
+ public static class DecimalLongConverter extends DecimalIntConverter {
DecimalLongConverter(int scale) {
- this.scale = scale;
+ super(scale);
}
@Override public Object convert(BigDecimal decimal) {
@@ -173,6 +221,23 @@ public final class DecimalConverterFactory {
return new BigDecimal(bigInteger, scale);
}
+ @Override public void fillVector(Object valuesToBeConverted, int size, ColumnVectorInfo info,
+ BitSet nullBitset) {
+ CarbonColumnVector vector = info.vector;
+ int precision = info.measure.getMeasure().getPrecision();
+ if (valuesToBeConverted instanceof byte[][]) {
+ byte[][] data = (byte[][]) valuesToBeConverted;
+ for (int i = 0; i < size; i++) {
+ if (nullBitset.get(i)) {
+ vector.putNull(i);
+ } else {
+ BigInteger bigInteger = new BigInteger(data[i]);
+ vector.putDecimal(i, new BigDecimal(bigInteger, scale), precision);
+ }
+ }
+ }
+ }
+
@Override public int getSize() {
return numBytes;
}
@@ -194,6 +259,22 @@ public final class DecimalConverterFactory {
return DataTypeUtil.byteToBigDecimal((byte[]) valueToBeConverted);
}
+ @Override public void fillVector(Object valuesToBeConverted, int size, ColumnVectorInfo info,
+ BitSet nullBitset) {
+ CarbonColumnVector vector = info.vector;
+ int precision = info.measure.getMeasure().getPrecision();
+ if (valuesToBeConverted instanceof byte[][]) {
+ byte[][] data = (byte[][]) valuesToBeConverted;
+ for (int i = 0; i < size; i++) {
+ if (nullBitset.get(i)) {
+ vector.putNull(i);
+ } else {
+ vector.putDecimal(i, DataTypeUtil.byteToBigDecimal(data[i]), precision);
+ }
+ }
+ }
+ }
+
@Override public int getSize() {
return -1;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java
index d68e4e9..ac50d7c 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java
@@ -57,4 +57,8 @@ public class DeleteDeltaVo {
public boolean containsRow(int counter) {
return bitSet.get(counter);
}
+
+ public BitSet getBitSet() {
+ return bitSet;
+ }
}
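
Exposing the underlying BitSet lets the direct fill path derive a page's
deleted-row count in one call instead of probing containsRow per row. A
hedged usage sketch:

    import org.apache.carbondata.core.mutate.DeleteDeltaVo;

    final class DeletedRowCount {
      // One cardinality() call replaces pageSize containsRow() probes.
      static int deletedRows(DeleteDeltaVo deltaVo) {
        return deltaVo.getBitSet().cardinality();
      }
    }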
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
index 8695d90..430a555 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
@@ -18,10 +18,13 @@ package org.apache.carbondata.core.scan.collector.impl;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.BitSet;
import java.util.List;
+import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.mutate.DeleteDeltaVo;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.model.ProjectionDimension;
import org.apache.carbondata.core.scan.model.ProjectionMeasure;
@@ -30,11 +33,19 @@ import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.scan.result.vector.MeasureDataVectorProcessor;
+import org.apache.log4j.Logger;
+
/**
* It is not a collector it is just a scanned result holder.
*/
public class DictionaryBasedVectorResultCollector extends AbstractScannedResultCollector {
+ /**
+ * logger for this result collector
+ */
+ private static final Logger LOGGER =
+ LogServiceFactory.getLogService(DictionaryBasedVectorResultCollector.class.getName());
+
protected ProjectionDimension[] queryDimensions;
protected ProjectionMeasure[] queryMeasures;
@@ -51,8 +62,14 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
private ColumnVectorInfo[] implictColumnInfo;
+ private boolean isDirectVectorFill;
+
public DictionaryBasedVectorResultCollector(BlockExecutionInfo blockExecutionInfos) {
super(blockExecutionInfos);
+ this.isDirectVectorFill = blockExecutionInfos.isDirectVectorFill();
+ if (this.isDirectVectorFill) {
+ LOGGER.info("Direct pagewise vector fill collector is used to scan and collect the data");
+ }
// initialize only if the current block is not a restructured block else the initialization
// will be taken care by RestructureBasedVectorResultCollector
if (!blockExecutionInfos.isRestructuredBlock()) {
@@ -141,29 +158,33 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
@Override
public void collectResultInColumnarBatch(BlockletScannedResult scannedResult,
CarbonColumnarBatch columnarBatch) {
- int numberOfPages = scannedResult.numberOfpages();
- int filteredRows = 0;
- while (scannedResult.getCurrentPageCounter() < numberOfPages) {
- int currentPageRowCount = scannedResult.getCurrentPageRowCount();
- if (currentPageRowCount == 0) {
- scannedResult.incrementPageCounter();
- continue;
- }
- int rowCounter = scannedResult.getRowCounter();
- int availableRows = currentPageRowCount - rowCounter;
- // getRowCounter holds total number or rows being placed in Vector. Calculate the
- // Left over space through getRowCounter only.
- int requiredRows = columnarBatch.getBatchSize() - columnarBatch.getRowCounter();
- requiredRows = Math.min(requiredRows, availableRows);
- if (requiredRows < 1) {
- return;
+ if (isDirectVectorFill) {
+ collectResultInColumnarBatchDirect(scannedResult, columnarBatch);
+ } else {
+ int numberOfPages = scannedResult.numberOfpages();
+ int filteredRows = 0;
+ while (scannedResult.getCurrentPageCounter() < numberOfPages) {
+ int currentPageRowCount = scannedResult.getCurrentPageRowCount();
+ if (currentPageRowCount == 0) {
+ scannedResult.incrementPageCounter();
+ continue;
+ }
+ int rowCounter = scannedResult.getRowCounter();
+ int availableRows = currentPageRowCount - rowCounter;
+ // getRowCounter holds the total number of rows placed in the vector. Calculate the
+ // left-over space through getRowCounter only.
+ int requiredRows = columnarBatch.getBatchSize() - columnarBatch.getRowCounter();
+ requiredRows = Math.min(requiredRows, availableRows);
+ if (requiredRows < 1) {
+ return;
+ }
+ fillColumnVectorDetails(columnarBatch, rowCounter, requiredRows);
+ filteredRows = scannedResult.markFilteredRows(columnarBatch, rowCounter, requiredRows,
+ columnarBatch.getRowCounter());
+ fillResultToColumnarBatch(scannedResult, columnarBatch, rowCounter, availableRows,
+ requiredRows);
+ columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows - filteredRows);
}
- fillColumnVectorDetails(columnarBatch, rowCounter, requiredRows);
- filteredRows = scannedResult.markFilteredRows(
- columnarBatch, rowCounter, requiredRows, columnarBatch.getRowCounter());
- fillResultToColumnarBatch(
- scannedResult, columnarBatch, rowCounter, availableRows, requiredRows);
- columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows - filteredRows);
}
}
@@ -198,4 +219,51 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
}
}
+ /**
+ * Fill the vector during the page decoding.
+ */
+ private void collectResultInColumnarBatchDirect(BlockletScannedResult scannedResult,
+ CarbonColumnarBatch columnarBatch) {
+ int numberOfPages = scannedResult.numberOfpages();
+ while (scannedResult.getCurrentPageCounter() < numberOfPages) {
+ int currentPageRowCount = scannedResult.getCurrentPageRowCount();
+ if (currentPageRowCount == 0) {
+ scannedResult.incrementPageCounter(null);
+ continue;
+ }
+ DeleteDeltaVo deltaVo = scannedResult.getCurrentDeleteDeltaVo();
+ BitSet bitSet = null;
+ int deletedRows = 0;
+ if (deltaVo != null) {
+ bitSet = deltaVo.getBitSet();
+ deletedRows = bitSet.cardinality();
+ }
+ fillColumnVectorDetails(columnarBatch, bitSet);
+ fillResultToColumnarBatch(scannedResult);
+ columnarBatch.setActualSize(currentPageRowCount - deletedRows);
+ scannedResult.setRowCounter(currentPageRowCount - deletedRows);
+ scannedResult.incrementPageCounter(null);
+ return;
+ }
+ }
+
+ private void fillResultToColumnarBatch(BlockletScannedResult scannedResult) {
+ scannedResult.fillDataChunks(dictionaryInfo, noDictionaryInfo, measureColumnInfo,
+ measureInfo.getMeasureOrdinals());
+ }
+
+ private void fillColumnVectorDetails(CarbonColumnarBatch columnarBatch,
+ BitSet deltaBitSet) {
+ for (int i = 0; i < allColumnInfo.length; i++) {
+ allColumnInfo[i].vectorOffset = columnarBatch.getRowCounter();
+ allColumnInfo[i].vector = columnarBatch.columnVectors[i];
+ allColumnInfo[i].deletedRows = deltaBitSet;
+ if (null != allColumnInfo[i].dimension) {
+ allColumnInfo[i].vector.setBlockDataType(dimensionInfo.dataType[i]);
+ }
+ }
+ }
+
+
}
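
A hedged note on the contract the direct fill path assumes: each collect
call decodes exactly one filtered page straight into the batch, so the
caller must size the batch to hold a full page. Sketch of the engine-side
glue (createBatch is a hypothetical factory, not part of this patch):

    // Assumes the engine sizes its batch from the column page size
    // (32000 rows by default in the V3 format).
    void collectOnePage(DictionaryBasedVectorResultCollector collector,
        BlockletScannedResult scannedResult) {
      CarbonColumnarBatch batch = createBatch(32000); // hypothetical factory
      collector.collectResultInColumnarBatch(scannedResult, batch);
      // batch.getActualSize() == page row count minus deleted rows
    }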
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 6a6a929..fed0faf 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -478,6 +478,19 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
} else {
blockExecutionInfo.setPrefetchBlocklet(queryModel.isPreFetchData());
}
+ // In case of FG datamap, the query should not go to direct fill.
+ boolean fgDataMapPathPresent = false;
+ for (TableBlockInfo blockInfo : queryModel.getTableBlockInfos()) {
+ fgDataMapPathPresent = blockInfo.getDataMapWriterPath() != null;
+ if (fgDataMapPathPresent) {
+ queryModel.setDirectVectorFill(false);
+ break;
+ }
+ }
+
+ blockExecutionInfo
+ .setDirectVectorFill(queryModel.isDirectVectorFill());
+
blockExecutionInfo
.setTotalNumberOfMeasureToRead(segmentProperties.getMeasuresOrdinalToChunkMapping().size());
blockExecutionInfo.setComplexDimensionInfoMap(QueryUtil
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
index e737b0e..f0ef23b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
@@ -217,6 +217,11 @@ public class BlockExecutionInfo {
private QueryStatisticsModel queryStatisticsModel;
/**
+ * It fills the vector directly from the decoded column page without any staging or conversion
+ */
+ private boolean isDirectVectorFill;
+
+ /**
* @param blockIndex the tableBlock to set
*/
public void setDataBlock(AbstractIndex blockIndex) {
@@ -625,4 +630,12 @@ public class BlockExecutionInfo {
public void setQueryStatisticsModel(QueryStatisticsModel queryStatisticsModel) {
this.queryStatisticsModel = queryStatisticsModel;
}
+
+ public boolean isDirectVectorFill() {
+ return isDirectVectorFill && !isRestructuredBlock;
+ }
+
+ public void setDirectVectorFill(boolean directVectorFill) {
+ isDirectVectorFill = directVectorFill;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index 22e1e72..49157f9 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -760,7 +760,7 @@ public class QueryUtil {
vector.putNull(vectorRow);
} else {
if (dt == DataTypes.STRING) {
- vector.putBytes(vectorRow, 0, length, value);
+ vector.putByteArray(vectorRow, 0, length, value);
} else if (dt == DataTypes.BOOLEAN) {
vector.putBoolean(vectorRow, ByteUtil.toBoolean(value[0]));
} else if (dt == DataTypes.BYTE) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 0951da0..d7dcee0 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -125,7 +125,11 @@ public class QueryModel {
private boolean preFetchData = true;
/**
- * It fills the vector directly from decoded column page with out any staging and conversions
+ * It fills the vector directly from the decoded column page without any staging or conversion.
+ * The execution engine can set this field to true in the vector flow. Note that the execution
+ * engine must make sure the vector batch size is greater than or equal to the column page size.
+ * In this flow only pages are pruned; each page is decoded and its complete data is filled into
+ * the vector, so it is the execution engine's responsibility to filter rows at the row level.
*/
private boolean isDirectVectorFill;
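
Given the batch-size obligation described above, an engine-side opt-in
might look like this hedged sketch (the size variables are the engine's
own choices, labeled as assumptions):

    // Enable whole-page decode into vectors only when the engine's vector
    // batch can hold a complete column page, as the javadoc requires.
    if (vectorBatchSize >= columnPageSize) { // engine-chosen sizes (assumption)
      queryModel.setDirectVectorFill(true);
    }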
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
index 9191d08..4963441 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
@@ -73,6 +73,11 @@ public abstract class BlockletScannedResult {
private int[] pageFilteredRowCount;
/**
+ * Filtered pages to be decoded and loaded to vector.
+ */
+ private int[] pageIdFiltered;
+
+ /**
* to keep track of number of rows process
*/
protected int rowCounter;
@@ -304,7 +309,7 @@ public abstract class BlockletScannedResult {
j :
pageFilteredRowId[pageCounter][j]);
}
- vector.putBytes(vectorOffset++,
+ vector.putByteArray(vectorOffset++,
data.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
}
}
@@ -342,6 +347,19 @@ public abstract class BlockletScannedResult {
}
/**
+ * Just increment the page counter and reset the remaining counters.
+ */
+ public void incrementPageCounter(ColumnVectorInfo[] vectorInfos) {
+ rowCounter = 0;
+ currentRow = -1;
+ pageCounter++;
+ if (null != deletedRecordMap && pageCounter < pageIdFiltered.length) {
+ currentDeleteDeltaVo =
+ deletedRecordMap.get(blockletNumber + "_" + pageIdFiltered[pageCounter]);
+ }
+ }
+
+ /**
* This case is used only in case of compaction, since it does not use filter flow.
*/
public void fillDataChunks() {
@@ -369,6 +387,36 @@ public abstract class BlockletScannedResult {
pageUncompressTime.getCount() + (System.currentTimeMillis() - startTime));
}
+ /**
+ * Fill all the vectors with data by decompressing/decoding the column page
+ */
+ public void fillDataChunks(ColumnVectorInfo[] dictionaryInfo, ColumnVectorInfo[] noDictionaryInfo,
+ ColumnVectorInfo[] msrVectorInfo, int[] measuresOrdinal) {
+ freeDataChunkMemory();
+ if (pageCounter >= pageFilteredRowCount.length) {
+ return;
+ }
+ long startTime = System.currentTimeMillis();
+
+ for (int i = 0; i < this.dictionaryColumnChunkIndexes.length; i++) {
+ dimRawColumnChunks[dictionaryColumnChunkIndexes[i]]
+ .convertToDimColDataChunkAndFillVector(pageIdFiltered[pageCounter], dictionaryInfo[i]);
+ }
+ for (int i = 0; i < this.noDictionaryColumnChunkIndexes.length; i++) {
+ dimRawColumnChunks[noDictionaryColumnChunkIndexes[i]]
+ .convertToDimColDataChunkAndFillVector(pageIdFiltered[pageCounter], noDictionaryInfo[i]);
+ }
+
+ for (int i = 0; i < measuresOrdinal.length; i++) {
+ msrRawColumnChunks[measuresOrdinal[i]]
+ .convertToColumnPageAndFillVector(pageIdFiltered[pageCounter], msrVectorInfo[i]);
+ }
+ QueryStatistic pageUncompressTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+ .get(QueryStatisticsConstants.PAGE_UNCOMPRESS_TIME);
+ pageUncompressTime.addCountStatistic(QueryStatisticsConstants.PAGE_UNCOMPRESS_TIME,
+ pageUncompressTime.getCount() + (System.currentTimeMillis() - startTime));
+ }
+
// free the memory for the last page chunk
private void freeDataChunkMemory() {
for (int i = 0; i < dimensionColumnPages.length; i++) {
@@ -390,6 +438,14 @@ public abstract class BlockletScannedResult {
return pageFilteredRowCount.length;
}
+ public int[] getPageIdFiltered() {
+ return pageIdFiltered;
+ }
+
+ public void setPageIdFiltered(int[] pageIdFiltered) {
+ this.pageIdFiltered = pageIdFiltered;
+ }
+
/**
* Get total rows in the current page
*
@@ -513,7 +569,13 @@ public abstract class BlockletScannedResult {
// if deleted recors map is present for this block
// then get the first page deleted vo
if (null != deletedRecordMap) {
- currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + '_' + pageCounter);
+ String key;
+ if (pageIdFiltered != null) {
+ key = blockletNumber + '_' + pageIdFiltered[pageCounter];
+ } else {
+ key = blockletNumber + '_' + pageCounter;
+ }
+ currentDeleteDeltaVo = deletedRecordMap.get(key);
}
}
@@ -616,6 +678,12 @@ public abstract class BlockletScannedResult {
*/
public void setPageFilteredRowCount(int[] pageFilteredRowCount) {
this.pageFilteredRowCount = pageFilteredRowCount;
+ if (pageIdFiltered == null) {
+ pageIdFiltered = new int[pageFilteredRowCount.length];
+ for (int i = 0; i < pageIdFiltered.length; i++) {
+ pageIdFiltered[i] = i;
+ }
+ }
}
/**
@@ -714,6 +782,10 @@ public abstract class BlockletScannedResult {
return rowsFiltered;
}
+ public DeleteDeltaVo getCurrentDeleteDeltaVo() {
+ return currentDeleteDeltaVo;
+ }
+
/**
* Below method will be used to check row got deleted
*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
index dd0e8b9..f670884 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
@@ -27,18 +27,26 @@ public interface CarbonColumnVector {
void putFloat(int rowId, float value);
+ void putFloats(int rowId, int count, float[] src, int srcIndex);
+
void putShort(int rowId, short value);
void putShorts(int rowId, int count, short value);
+ void putShorts(int rowId, int count, short[] src, int srcIndex);
+
void putInt(int rowId, int value);
void putInts(int rowId, int count, int value);
+ void putInts(int rowId, int count, int[] src, int srcIndex);
+
void putLong(int rowId, long value);
void putLongs(int rowId, int count, long value);
+ void putLongs(int rowId, int count, long[] src, int srcIndex);
+
void putDecimal(int rowId, BigDecimal value, int precision);
void putDecimals(int rowId, int count, BigDecimal value, int precision);
@@ -47,14 +55,18 @@ public interface CarbonColumnVector {
void putDoubles(int rowId, int count, double value);
- void putBytes(int rowId, byte[] value);
+ void putDoubles(int rowId, int count, double[] src, int srcIndex);
- void putBytes(int rowId, int count, byte[] value);
+ void putByteArray(int rowId, byte[] value);
- void putBytes(int rowId, int offset, int length, byte[] value);
+ void putByteArray(int rowId, int offset, int length, byte[] value);
void putByte(int rowId, byte value);
+ void putBytes(int rowId, int count, byte[] value);
+
+ void putBytes(int rowId, int count, byte[] src, int srcIndex);
+
void putNull(int rowId);
void putNulls(int rowId, int count);