You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/05/17 14:31:02 UTC
[49/50] [abbrv] carbondata git commit: [CARBONDATA-2477]Fixed No
dictionary Complex type with double/date/decimal data type
[CARBONDATA-2477] Fixed no-dictionary complex type with double/date/decimal data type
Problem: Creating a table through the SDK with a no-dictionary complex type fails when a child of the complex type has a double, date, or decimal data type.
Solution: Complex type validation rejected double/date/decimal children; this restriction has been removed.
Changed the no-dictionary complex type storage format: each value's length is now stored as a short instead of an int to reduce storage space (note: a signed short length limits each value to 32,767 bytes).
This closes #2304
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6297ea0b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6297ea0b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6297ea0b
Branch: refs/heads/spark-2.3
Commit: 6297ea0b4092539fa0aa2c6f772d6984850c6110
Parents: cf1b50b
Author: kumarvishal09 <ku...@gmail.com>
Authored: Mon May 14 14:17:38 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu May 17 19:05:30 2018 +0530
----------------------------------------------------------------------
.../carbondata/core/datastore/ColumnType.java | 14 ++-
.../core/datastore/page/ColumnPage.java | 82 ++++++++++++++++--
.../core/datastore/page/ComplexColumnPage.java | 16 +++-
.../core/datastore/page/LazyColumnPage.java | 13 ++-
.../datastore/page/SafeFixLengthColumnPage.java | 25 +++++-
.../datastore/page/SafeVarLengthColumnPage.java | 21 +++++
.../page/UnsafeFixLengthColumnPage.java | 39 ++++++++-
.../datastore/page/VarLengthColumnPageBase.java | 90 ++++++++++++++++++--
.../page/encoding/ColumnPageEncoder.java | 9 +-
.../scan/complextypes/PrimitiveQueryType.java | 4 +-
.../core/scan/complextypes/StructQueryType.java | 8 +-
.../apache/carbondata/core/util/ByteUtil.java | 9 ++
...ransactionalCarbonTableWithComplexType.scala | 76 ++++++++++++++++-
.../processing/datatypes/ArrayDataType.java | 7 ++
.../processing/datatypes/GenericDataType.java | 4 +
.../processing/datatypes/PrimitiveDataType.java | 17 ++--
.../processing/datatypes/StructDataType.java | 30 +++----
.../carbondata/processing/store/TablePage.java | 6 +-
.../sdk/file/CarbonWriterBuilder.java | 9 --
19 files changed, 407 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java b/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java
index f98307b..8bbf12d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java
@@ -31,7 +31,13 @@ public enum ColumnType {
COMPLEX,
// measure column, numerical data type
- MEASURE;
+ MEASURE,
+
+ COMPLEX_STRUCT,
+
+ COMPLEX_ARRAY,
+
+ COMPLEX_PRIMITIVE;
public static ColumnType valueOf(int ordinal) {
if (ordinal == GLOBAL_DICTIONARY.ordinal()) {
@@ -44,6 +50,12 @@ public enum ColumnType {
return COMPLEX;
} else if (ordinal == MEASURE.ordinal()) {
return MEASURE;
+ } else if (ordinal == COMPLEX_STRUCT.ordinal()) {
+ return COMPLEX_STRUCT;
+ } else if (ordinal == COMPLEX_ARRAY.ordinal()) {
+ return COMPLEX_ARRAY;
+ } else if (ordinal == COMPLEX_PRIMITIVE.ordinal()) {
+ return COMPLEX_PRIMITIVE;
} else {
throw new RuntimeException("create ColumnType with invalid ordinal: " + ordinal);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 68269fb..69ed437 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -22,6 +22,7 @@ import java.math.BigDecimal;
import java.util.BitSet;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.ColumnType;
import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
@@ -153,6 +154,19 @@ public abstract class ColumnPage {
}
}
+ private static ColumnPage createFixLengthByteArrayPage(TableSpec.ColumnSpec columnSpec,
+ DataType dataType, int pageSize, int eachValueSize) {
+ if (unsafe) {
+ try {
+ return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize, eachValueSize);
+ } catch (MemoryException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize, eachValueSize);
+ }
+ }
+
private static ColumnPage createPage(TableSpec.ColumnSpec columnSpec, DataType dataType,
int pageSize) {
if (DataTypes.isDecimal(dataType)) {
@@ -281,8 +295,31 @@ public abstract class ColumnPage {
}
private static ColumnPage newLVBytesPage(TableSpec.ColumnSpec columnSpec,
- byte[] lvEncodedByteArray) throws MemoryException {
- return VarLengthColumnPageBase.newLVBytesColumnPage(columnSpec, lvEncodedByteArray);
+ byte[] lvEncodedByteArray, int lvLength) throws MemoryException {
+ return VarLengthColumnPageBase.newLVBytesColumnPage(columnSpec, lvEncodedByteArray, lvLength);
+ }
+
+ private static ColumnPage newComplexLVBytesPage(TableSpec.ColumnSpec columnSpec,
+ byte[] lvEncodedByteArray, int lvLength) throws MemoryException {
+ return VarLengthColumnPageBase
+ .newComplexLVBytesColumnPage(columnSpec, lvEncodedByteArray, lvLength);
+ }
+
+ private static ColumnPage newFixedByteArrayPage(TableSpec.ColumnSpec columnSpec,
+ byte[] lvEncodedByteArray, int eachValueSize) throws MemoryException {
+ int pageSize = lvEncodedByteArray.length / eachValueSize;
+ ColumnPage fixLengthByteArrayPage =
+ createFixLengthByteArrayPage(columnSpec, columnSpec.getSchemaDataType(), pageSize,
+ eachValueSize);
+ byte[] data = null;
+ int offset = 0;
+ for (int i = 0; i < pageSize; i++) {
+ data = new byte[eachValueSize];
+ System.arraycopy(lvEncodedByteArray, offset, data, 0, eachValueSize);
+ fixLengthByteArrayPage.putBytes(i, data);
+ offset += eachValueSize;
+ }
+ return fixLengthByteArrayPage;
}
/**
@@ -607,6 +644,20 @@ public abstract class ColumnPage {
public abstract byte[] getLVFlattenedBytePage() throws IOException;
/**
+ * For complex type columns
+ * @return
+ * @throws IOException
+ */
+ public abstract byte[] getComplexChildrenLVFlattenedBytePage() throws IOException;
+
+ /**
+ * For complex type columns
+ * @return
+ * @throws IOException
+ */
+ public abstract byte[] getComplexParentFlattenedBytePage() throws IOException;
+
+ /**
* For decimals
*/
public abstract byte[] getDecimalPage();
@@ -638,6 +689,13 @@ public abstract class ColumnPage {
return compressor.compressDouble(getDoublePage());
} else if (DataTypes.isDecimal(dataType)) {
return compressor.compressByte(getDecimalPage());
+ } else if (dataType == DataTypes.BYTE_ARRAY
+ && columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE) {
+ return compressor.compressByte(getComplexChildrenLVFlattenedBytePage());
+ } else if (dataType == DataTypes.BYTE_ARRAY && (
+ columnSpec.getColumnType() == ColumnType.COMPLEX_STRUCT
+ || columnSpec.getColumnType() == ColumnType.COMPLEX_ARRAY)) {
+ return compressor.compressByte(getComplexParentFlattenedBytePage());
} else if (dataType == DataTypes.BYTE_ARRAY) {
return compressor.compressByte(getLVFlattenedBytePage());
} else {
@@ -676,12 +734,26 @@ public abstract class ColumnPage {
} else if (storeDataType == DataTypes.DOUBLE) {
double[] doubleData = compressor.unCompressDouble(compressedData, offset, length);
return newDoublePage(columnSpec, doubleData);
+ } else if (storeDataType == DataTypes.BYTE_ARRAY
+ && columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE) {
+ byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
+ return newComplexLVBytesPage(columnSpec, lvVarBytes,
+ CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
+ } else if (storeDataType == DataTypes.BYTE_ARRAY
+ && columnSpec.getColumnType() == ColumnType.COMPLEX_STRUCT) {
+ byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
+ return newFixedByteArrayPage(columnSpec, lvVarBytes,
+ CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
+ } else if (storeDataType == DataTypes.BYTE_ARRAY
+ && columnSpec.getColumnType() == ColumnType.COMPLEX_ARRAY) {
+ byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
+ return newFixedByteArrayPage(columnSpec, lvVarBytes, CarbonCommonConstants.LONG_SIZE_IN_BYTE);
} else if (storeDataType == DataTypes.BYTE_ARRAY) {
byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
- return newLVBytesPage(columnSpec, lvVarBytes);
+ return newLVBytesPage(columnSpec, lvVarBytes, CarbonCommonConstants.INT_SIZE_IN_BYTE);
} else {
- throw new UnsupportedOperationException("unsupport uncompress column page: " +
- meta.getStoreDataType());
+ throw new UnsupportedOperationException(
+ "unsupport uncompress column page: " + meta.getStoreDataType());
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
index 8cb18e9..07dc837 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.ColumnType;
// Represent a complex column page, e.g. Array, Struct type column
public class ComplexColumnPage {
@@ -34,17 +35,20 @@ public class ComplexColumnPage {
private List<ArrayList<byte[]>> complexColumnData;
// depth is the number of column after complex type is expanded. It is from 1 to N
- private final int depth;
-
private final int pageSize;
- public ComplexColumnPage(int pageSize, int depth) {
+ private int depth;
+
+ private List<ColumnType> complexColumnType;
+
+ public ComplexColumnPage(int pageSize, List<ColumnType> complexColumnType) {
this.pageSize = pageSize;
- this.depth = depth;
+ this.depth = complexColumnType.size();
complexColumnData = new ArrayList<>(depth);
for (int i = 0; i < depth; i++) {
complexColumnData.add(new ArrayList<byte[]>());
}
+ this.complexColumnType = complexColumnType;
}
public void putComplexData(int rowId, int depth, List<byte[]> value) {
@@ -79,4 +83,8 @@ public class ComplexColumnPage {
public int getPageSize() {
return pageSize;
}
+
+ public ColumnType getComplexColumnType(int isDepth) {
+ return complexColumnType.get(isDepth);
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
index ce8aaae..255e078 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
@@ -17,6 +17,7 @@
package org.apache.carbondata.core.datastore.page;
+import java.io.IOException;
import java.math.BigDecimal;
import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -171,7 +172,17 @@ public class LazyColumnPage extends ColumnPage {
}
@Override
- public byte[] getLVFlattenedBytePage() {
+ public byte[] getLVFlattenedBytePage() throws IOException {
+ throw new UnsupportedOperationException("internal error");
+ }
+
+ @Override
+ public byte[] getComplexChildrenLVFlattenedBytePage() {
+ throw new UnsupportedOperationException("internal error");
+ }
+
+ @Override
+ public byte[] getComplexParentFlattenedBytePage() throws IOException {
throw new UnsupportedOperationException("internal error");
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
index 1e4445a..2304614 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
@@ -17,6 +17,7 @@
package org.apache.carbondata.core.datastore.page;
+import java.io.IOException;
import java.math.BigDecimal;
import org.apache.carbondata.core.datastore.TableSpec;
@@ -37,11 +38,19 @@ public class SafeFixLengthColumnPage extends ColumnPage {
private float[] floatData;
private double[] doubleData;
private byte[] shortIntData;
+ private byte[][] fixedLengthdata;
+
SafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
super(columnSpec, dataType, pageSize);
}
+ SafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize,
+ int eachRowSize) {
+ super(columnSpec, dataType, pageSize);
+ this.fixedLengthdata = new byte[pageSize][];
+ }
+
/**
* Set byte value at rowId
*/
@@ -87,7 +96,7 @@ public class SafeFixLengthColumnPage extends ColumnPage {
*/
@Override
public void putBytes(int rowId, byte[] bytes) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ this.fixedLengthdata[rowId] = bytes;
}
@Override
@@ -173,7 +182,7 @@ public class SafeFixLengthColumnPage extends ColumnPage {
@Override
public byte[] getBytes(int rowId) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ return this.fixedLengthdata[rowId];
}
/**
@@ -241,10 +250,20 @@ public class SafeFixLengthColumnPage extends ColumnPage {
}
@Override
- public byte[] getLVFlattenedBytePage() {
+ public byte[] getLVFlattenedBytePage() throws IOException {
throw new UnsupportedOperationException("invalid data type: " + dataType);
}
+ @Override
+ public byte[] getComplexChildrenLVFlattenedBytePage() {
+ throw new UnsupportedOperationException("internal error");
+ }
+
+ @Override
+ public byte[] getComplexParentFlattenedBytePage() throws IOException {
+ throw new UnsupportedOperationException("internal error");
+ }
+
/**
* Set byte values to page
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
index 782b9dc..7b1ad20 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
@@ -82,6 +82,27 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
}
@Override
+ public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(stream);
+ for (byte[] byteArrayDatum : byteArrayData) {
+ out.writeShort((short)byteArrayDatum.length);
+ out.write(byteArrayDatum);
+ }
+ return stream.toByteArray();
+ }
+
+ @Override
+ public byte[] getComplexParentFlattenedBytePage() throws IOException {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(stream);
+ for (byte[] byteArrayDatum : byteArrayData) {
+ out.write(byteArrayDatum);
+ }
+ return stream.toByteArray();
+ }
+
+ @Override
public byte[][] getByteArrayPage() {
return byteArrayData;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
index c88dc0b..6847ab9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
@@ -44,6 +44,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
// base offset of memoryBlock
private long baseOffset;
+ private int eachRowSize;
+
private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
private static final int byteBits = DataTypes.BYTE.getSizeBits();
@@ -77,6 +79,19 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
}
}
+ UnsafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize,
+ int eachRowSize)
+ throws MemoryException {
+ this(columnSpec, dataType, pageSize);
+ this.eachRowSize = eachRowSize;
+ if (dataType == DataTypes.BYTE_ARRAY) {
+ memoryBlock =
+ UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) pageSize * eachRowSize);
+ baseAddress = memoryBlock.getBaseObject();
+ baseOffset = memoryBlock.getBaseOffset();
+ }
+ }
+
@Override
public void putByte(int rowId, byte value) {
long offset = rowId << byteBits;
@@ -118,7 +133,11 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
@Override
public void putBytes(int rowId, byte[] bytes) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ // copy the data to memory
+ long offset = (long)rowId * eachRowSize;
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
+ baseOffset + offset, bytes.length);
}
@Override
@@ -183,7 +202,14 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
@Override
public byte[] getBytes(int rowId) {
- throw new UnsupportedOperationException("invalid data type: " + dataType);
+ // creating a row
+ byte[] data = new byte[eachRowSize];
+ //copy the row from memory block based on offset
+ // offset position will be index * each column value length
+ CarbonUnsafe.getUnsafe().copyMemory(memoryBlock.getBaseObject(),
+ baseOffset + ((long)rowId * eachRowSize), data,
+ CarbonUnsafe.BYTE_ARRAY_OFFSET, eachRowSize);
+ return data;
}
@Override public byte[] getDecimalPage() {
@@ -267,6 +293,15 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
public byte[] getLVFlattenedBytePage() {
throw new UnsupportedOperationException("invalid data type: " + dataType);
}
+ @Override
+ public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException {
+ throw new UnsupportedOperationException("invalid data type: " + dataType);
+ }
+
+ @Override
+ public byte[] getComplexParentFlattenedBytePage() {
+ throw new UnsupportedOperationException("internal error");
+ }
@Override
public void setBytePage(byte[] byteData) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index c6062c1..901758a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.memory.MemoryBlock;
@@ -115,7 +116,8 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
int size = decimalConverter.getSize();
if (size < 0) {
return getLVBytesColumnPage(columnSpec, lvEncodedBytes,
- DataTypes.createDecimalType(columnSpec.getPrecision(), columnSpec.getScale()));
+ DataTypes.createDecimalType(columnSpec.getPrecision(), columnSpec.getScale()),
+ CarbonCommonConstants.INT_SIZE_IN_BYTE);
} else {
// Here the size is always fixed.
return getDecimalColumnPage(columnSpec, lvEncodedBytes, size);
@@ -125,9 +127,17 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
/**
* Create a new column page based on the LV (Length Value) encoded bytes
*/
- static ColumnPage newLVBytesColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes)
- throws MemoryException {
- return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY);
+ static ColumnPage newLVBytesColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes,
+ int lvLength) throws MemoryException {
+ return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY, lvLength);
+ }
+
+ /**
+ * Create a new column page based on the LV (Length Value) encoded bytes
+ */
+ static ColumnPage newComplexLVBytesColumnPage(TableSpec.ColumnSpec columnSpec,
+ byte[] lvEncodedBytes, int lvLength) throws MemoryException {
+ return getComplexLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY, lvLength);
}
private static ColumnPage getDecimalColumnPage(TableSpec.ColumnSpec columnSpec,
@@ -161,7 +171,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
}
private static ColumnPage getLVBytesColumnPage(TableSpec.ColumnSpec columnSpec,
- byte[] lvEncodedBytes, DataType dataType)
+ byte[] lvEncodedBytes, DataType dataType, int lvLength)
throws MemoryException {
// extract length and data, set them to rowOffset and unsafe memory correspondingly
int rowId = 0;
@@ -176,11 +186,48 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
length = ByteUtil.toInt(lvEncodedBytes, lvEncodedOffset);
rowOffset.add(offset);
rowLength.add(length);
- lvEncodedOffset += 4 + length;
+ lvEncodedOffset += lvLength + length;
+ rowId++;
+ }
+ rowOffset.add(offset);
+ VarLengthColumnPageBase page =
+ getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType, lvLength, rowId, rowOffset,
+ rowLength, offset);
+ return page;
+ }
+
+ private static ColumnPage getComplexLVBytesColumnPage(TableSpec.ColumnSpec columnSpec,
+ byte[] lvEncodedBytes, DataType dataType, int lvLength)
+ throws MemoryException {
+ // extract length and data, set them to rowOffset and unsafe memory correspondingly
+ int rowId = 0;
+ List<Integer> rowOffset = new ArrayList<>();
+ List<Integer> rowLength = new ArrayList<>();
+ int length;
+ int offset;
+ int lvEncodedOffset = 0;
+
+ // extract Length field in input and calculate total length
+ for (offset = 0; lvEncodedOffset < lvEncodedBytes.length; offset += length) {
+ length = ByteUtil.toShort(lvEncodedBytes, lvEncodedOffset);
+ rowOffset.add(offset);
+ rowLength.add(length);
+ lvEncodedOffset += lvLength + length;
rowId++;
}
rowOffset.add(offset);
+ VarLengthColumnPageBase page =
+ getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType, lvLength, rowId, rowOffset,
+ rowLength, offset);
+ return page;
+ }
+
+ private static VarLengthColumnPageBase getVarLengthColumnPage(TableSpec.ColumnSpec columnSpec,
+ byte[] lvEncodedBytes, DataType dataType, int lvLength, int rowId, List<Integer> rowOffset,
+ List<Integer> rowLength, int offset) throws MemoryException {
+ int lvEncodedOffset;
+ int length;
int numRows = rowId;
VarLengthColumnPageBase page;
@@ -202,10 +249,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
lvEncodedOffset = 0;
for (int i = 0; i < numRows; i++) {
length = rowLength.get(i);
- page.putBytes(i, lvEncodedBytes, lvEncodedOffset + 4, length);
- lvEncodedOffset += 4 + length;
+ page.putBytes(i, lvEncodedBytes, lvEncodedOffset + lvLength, length);
+ lvEncodedOffset += lvLength + length;
}
-
return page;
}
@@ -353,6 +399,32 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
return data;
}
+ @Override public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException {
+ // output LV encoded byte array
+ int offset = 0;
+ byte[] data = new byte[totalLength + pageSize * 2];
+ for (int rowId = 0; rowId < pageSize; rowId++) {
+ short length = (short) (rowOffset[rowId + 1] - rowOffset[rowId]);
+ ByteUtil.setShort(data, offset, length);
+ copyBytes(rowId, data, offset + 2, length);
+ offset += 2 + length;
+ }
+ return data;
+ }
+
+ @Override
+ public byte[] getComplexParentFlattenedBytePage() throws IOException {
+ // output LV encoded byte array
+ int offset = 0;
+ byte[] data = new byte[totalLength];
+ for (int rowId = 0; rowId < pageSize; rowId++) {
+ short length = (short) (rowOffset[rowId + 1] - rowOffset[rowId]);
+ copyBytes(rowId, data, offset, length);
+ offset += length;
+ }
+ return data;
+ }
+
@Override
public void convertValue(ColumnPageValueConverter codec) {
throw new UnsupportedOperationException("invalid data type: " + dataType);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
index dfdca02..8bff5cc 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
@@ -141,15 +141,16 @@ public abstract class ColumnPageEncoder {
Iterator<byte[][]> iterator = input.iterator();
while (iterator.hasNext()) {
byte[][] subColumnPage = iterator.next();
- encodedPages[index++] = encodeChildColumn(subColumnPage);
+ encodedPages[index] = encodeChildColumn(subColumnPage, input.getComplexColumnType(index));
+ index++;
}
return encodedPages;
}
- private static EncodedColumnPage encodeChildColumn(byte[][] data)
+ private static EncodedColumnPage encodeChildColumn(byte[][] data, ColumnType complexDataType)
throws IOException, MemoryException {
- TableSpec.ColumnSpec spec = TableSpec.ColumnSpec.newInstance("complex_inner_column",
- DataTypes.BYTE_ARRAY, ColumnType.COMPLEX);
+ TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
+ .newInstance("complex_inner_column", DataTypes.BYTE_ARRAY, complexDataType);
ColumnPage page = ColumnPage.wrapByteArrayPage(spec, data);
ColumnPageEncoder encoder = new DirectCompressCodec(DataTypes.BYTE_ARRAY).createEncoder(null);
return encoder.encode(page);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
index edae4da..899957e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
@@ -96,7 +96,7 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
int pageNumber, DataOutputStream dataOutputStream) throws IOException {
byte[] currentVal = copyBlockDataChunk(rawColumnChunks, rowNumber, pageNumber);
if (!this.isDictionary) {
- dataOutputStream.writeInt(currentVal.length);
+ dataOutputStream.writeShort(currentVal.length);
}
dataOutputStream.write(currentVal);
}
@@ -120,7 +120,7 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
actualData = directDictionaryGenerator.getValueFromSurrogate(surrgateValue);
} else if (!isDictionary) {
// No Dictionary Columns
- int size = dataBuffer.getInt();
+ int size = dataBuffer.getShort();
byte[] value = new byte[size];
dataBuffer.get(value, 0, size);
if (dataType == DataTypes.DATE) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
index 9ff8252..301eb5a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
@@ -81,8 +81,8 @@ public class StructQueryType extends ComplexQueryType implements GenericQueryTyp
int pageNumber, DataOutputStream dataOutputStream) throws IOException {
byte[] input = copyBlockDataChunk(dimensionColumnDataChunks, rowNumber, pageNumber);
ByteBuffer byteArray = ByteBuffer.wrap(input);
- int childElement = byteArray.getInt();
- dataOutputStream.writeInt(childElement);
+ int childElement = byteArray.getShort();
+ dataOutputStream.writeShort(childElement);
if (childElement > 0) {
for (int i = 0; i < childElement; i++) {
children.get(i)
@@ -102,13 +102,11 @@ public class StructQueryType extends ComplexQueryType implements GenericQueryTyp
}
@Override public Object getDataBasedOnDataType(ByteBuffer dataBuffer) {
- int childLength = dataBuffer.getInt();
+ int childLength = dataBuffer.getShort();
Object[] fields = new Object[childLength];
for (int i = 0; i < childLength; i++) {
fields[i] = children.get(i).getDataBasedOnDataType(dataBuffer);
}
-
return DataTypeUtil.getDataTypeConverter().wrapWithGenericRow(fields);
}
-
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
index 661384c..1df60c1 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
@@ -521,6 +521,10 @@ public final class ByteUtil {
(((int)bytes[offset + 2] & 0xff) << 8) + ((int)bytes[offset + 3] & 0xff);
}
+ public static int toShort(byte[] bytes, int offset) {
+ return (((int)bytes[offset] & 0xff) << 8) + ((int)bytes[offset + 1] & 0xff);
+ }
+
public static void setInt(byte[] data, int offset, int value) {
data[offset] = (byte) (value >> 24);
data[offset + 1] = (byte) (value >> 16);
@@ -528,6 +532,11 @@ public final class ByteUtil {
data[offset + 3] = (byte) value;
}
+ public static void setShort(byte[] data, int offset, int value) { // writes the low 16 bits of value big-endian into data[offset] and data[offset + 1]
+ data[offset] = (byte) (value >> 8); // high byte of the 16-bit value
+ data[offset + 1] = (byte) value; // low byte; no range check - bits above the low 16 are silently dropped
+ }
+
/**
* long => byte[]
*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
index ccfb231..7f9023b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
@@ -17,9 +17,11 @@
package org.apache.carbondata.spark.testsuite.createTable
-import java.io.{File}
+import java.io.File
import java.util
+import java.util.ArrayList
+import scala.collection.mutable.ArrayBuffer
import org.apache.avro
import org.apache.commons.io.FileUtils
@@ -28,6 +30,7 @@ import org.apache.spark.sql.test.util.QueryTest
import org.junit.Assert
import org.scalatest.BeforeAndAfterAll
import tech.allegro.schema.json2avro.converter.JsonAvroConverter
+
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
@@ -195,8 +198,8 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
val fld = new util.ArrayList[StructField]
fld.add(new StructField("DoorNum",
- DataTypes.createArrayType(DataTypes.createStructType(address)),
- subFld))
+ DataTypes.createArrayType(DataTypes.createStructType(address)),
+ subFld))
// array of struct of struct
val doorNum = new util.ArrayList[StructField]
doorNum.add(new StructField("FloorNum",
@@ -229,4 +232,71 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
// drop table should not delete the files
cleanTestData()
}
+
+ test("test multi level support : struct with Double data type") {
+ cleanTestData()
+ val mySchema = """ {
+ | "name": "address",
+ | "type": "record",
+ | "fields": [
+ | {
+ | "name": "name",
+ | "type": "string"
+ | },
+ | {
+ | "name": "age",
+ | "type": "int"
+ | },
+ | {
+ | "name" :"my_address",
+ | "type" :{
+ | "name": "my_address",
+ | "type": "record",
+ | "fields": [
+ | {
+ | "name": "Temperaturetest",
+ | "type": "double"
+ | }
+ | ]
+ | }
+ | }
+ | ]
+ |} """.stripMargin
+
+ val jsonvalue=
+ """{
+ |"name" :"babu",
+ |"age" :12,
+ |"my_address" :{ "Temperaturetest" :123 }
+ |}
+ """.stripMargin
+ val pschema= org.apache.avro.Schema.parse(mySchema)
+
+ val records=new JsonAvroConverter().convertToGenericDataRecord(jsonvalue.getBytes(CharEncoding.UTF_8),pschema)
+
+ val fieds = new Array[Field](3)
+ fieds(0)=new Field("name",DataTypes.STRING)
+ fieds(1)=new Field("age",DataTypes.INT)
+
+ val fld = new util.ArrayList[StructField]
+ fld.add(new StructField("Temperature", DataTypes.DOUBLE))
+ fieds(2) = new Field("my_address", "struct", fld)
+
+
+ val writer=CarbonWriter.builder().withSchema(new Schema(fieds)).outputPath(writerPath).buildWriterForAvroInput()
+ writer.write(records)
+ writer.close()
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ val rows = sql("select name, age, my_address from sdkOutputTable").collect()
+ Assert.assertTrue(rows.length == 1 && rows(0).getString(0) == "babu" && rows(0).getInt(1) == 12)
+ Assert.assertEquals(123.0, rows(0).getStruct(2).getDouble(0), 0.0) // double child of a no-dictionary struct must round-trip
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ cleanTestData()
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index cc2619e..4ce80a6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import org.apache.carbondata.core.datastore.ColumnType;
import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
@@ -286,4 +287,10 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
public GenericDataType<ArrayObject> deepCopy() {
return new ArrayDataType(this.outputArrayIndex, this.dataCounter, this.children.deepCopy());
}
+
+ @Override public void getChildrenType(List<ColumnType> type) {
+ // pre-order: record this array level before descending into its element type
+ type.add(ColumnType.COMPLEX_ARRAY);
+ children.getChildrenType(type);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
index f48a91d..8b1ccf2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import org.apache.carbondata.core.datastore.ColumnType;
import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
@@ -155,4 +156,7 @@ public interface GenericDataType<T> {
* clone self for multithread access (for complex type processing in table page)
*/
GenericDataType<T> deepCopy();
+
+ void getChildrenType(List<ColumnType> type); // appends this node's own ColumnType to type, then (recursively) those of its children; despite the name, the node itself is included
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index 481c811..7450b82 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.cache.CacheType;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.ColumnType;
import org.apache.carbondata.core.devapi.BiDictionary;
import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.dictionary.client.DictionaryClient;
@@ -362,17 +363,17 @@ public class PrimitiveDataType implements GenericDataType<Object> {
private void updateValueToByteStream(DataOutputStream dataOutputStream, byte[] value)
throws IOException {
- dataOutputStream.writeInt(value.length);
+ dataOutputStream.writeShort(value.length);
dataOutputStream.write(value);
}
private void updateNullValue(DataOutputStream dataOutputStream, BadRecordLogHolder logHolder)
throws IOException {
if (this.carbonDimension.getDataType() == DataTypes.STRING) {
- dataOutputStream.writeInt(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
+ dataOutputStream.writeShort(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
dataOutputStream.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
} else {
- dataOutputStream.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
+ dataOutputStream.writeShort(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY);
}
String message = logHolder.getColumnMessageMap().get(carbonDimension.getColName());
@@ -396,8 +397,8 @@ public class PrimitiveDataType implements GenericDataType<Object> {
KeyGenerator[] generator)
throws IOException, KeyGenException {
if (!this.isDictionary) {
- int sizeOfData = byteArrayInput.getInt();
- dataOutputStream.writeInt(sizeOfData);
+ int sizeOfData = byteArrayInput.getShort();
+ dataOutputStream.writeShort(sizeOfData);
byte[] bb = new byte[sizeOfData];
byteArrayInput.get(bb, 0, sizeOfData);
dataOutputStream.write(bb);
@@ -438,7 +439,7 @@ public class PrimitiveDataType implements GenericDataType<Object> {
@Override public void getColumnarDataForComplexType(List<ArrayList<byte[]>> columnsArray,
ByteBuffer inputArray) {
if (!isDictionary) {
- byte[] key = new byte[inputArray.getInt()];
+ byte[] key = new byte[inputArray.getShort()];
inputArray.get(key);
columnsArray.get(outputArrayIndex).add(key);
} else {
@@ -506,4 +507,8 @@ public class PrimitiveDataType implements GenericDataType<Object> {
return dataType;
}
+
+ @Override public void getChildrenType(List<ColumnType> type) { // leaf of the complex-type tree: contributes exactly one entry
+ type.add(ColumnType.COMPLEX_PRIMITIVE);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index bb3da6c..b66eef7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import org.apache.carbondata.core.datastore.ColumnType;
import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
@@ -153,7 +154,7 @@ public class StructDataType implements GenericDataType<StructObject> {
@Override public void writeByteArray(StructObject input, DataOutputStream dataOutputStream,
BadRecordLogHolder logHolder) throws IOException, DictionaryGenerationException {
- dataOutputStream.writeInt(children.size());
+ dataOutputStream.writeShort(children.size());
if (input == null) {
for (int i = 0; i < children.size(); i++) {
children.get(i).writeByteArray(null, dataOutputStream, logHolder);
@@ -191,9 +192,8 @@ public class StructDataType implements GenericDataType<StructObject> {
@Override public void parseComplexValue(ByteBuffer byteArrayInput,
DataOutputStream dataOutputStream, KeyGenerator[] generator)
throws IOException, KeyGenException {
- int childElement = byteArrayInput.getInt();
- dataOutputStream.writeInt(childElement);
-
+ short childElement = byteArrayInput.getShort();
+ dataOutputStream.writeShort(childElement);
for (int i = 0; i < childElement; i++) {
if (children.get(i) instanceof PrimitiveDataType) {
if (children.get(i).getIsColumnDictionary()) {
@@ -254,17 +254,10 @@ public class StructDataType implements GenericDataType<StructObject> {
@Override
public void getColumnarDataForComplexType(List<ArrayList<byte[]>> columnsArray,
ByteBuffer inputArray) {
-
- ByteBuffer b = ByteBuffer.allocate(8);
- int childElement = inputArray.getInt();
- b.putInt(childElement);
- if (childElement == 0) {
- b.putInt(0);
- } else {
- b.putInt(children.get(0).getDataCounter());
- }
+ ByteBuffer b = ByteBuffer.allocate(2);
+ int childElement = inputArray.getShort();
+ b.putShort((short)childElement);
columnsArray.get(this.outputArrayIndex).add(b.array());
-
for (int i = 0; i < childElement; i++) {
if (children.get(i) instanceof PrimitiveDataType) {
PrimitiveDataType child = ((PrimitiveDataType) children.get(i));
@@ -301,7 +294,7 @@ public class StructDataType implements GenericDataType<StructObject> {
*/
@Override
public void fillBlockKeySize(List<Integer> blockKeySizeWithComplex, int[] primitiveBlockKeySize) {
- blockKeySizeWithComplex.add(8);
+ blockKeySizeWithComplex.add(2);
for (int i = 0; i < children.size(); i++) {
children.get(i).fillBlockKeySize(blockKeySizeWithComplex, primitiveBlockKeySize);
}
@@ -327,4 +320,11 @@ public class StructDataType implements GenericDataType<StructObject> {
}
return new StructDataType(childrenClone, this.outputArrayIndex, this.dataCounter);
}
+
+ @Override public void getChildrenType(List<ColumnType> type) {
+ type.add(ColumnType.COMPLEX_STRUCT); // pre-order: this struct level precedes all of its children
+ for (GenericDataType child : children) {
+ child.getChildrenType(type);
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index 26a634b..5408193 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -50,7 +50,6 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.processing.datatypes.GenericDataType;
-
/**
* Represent a page data for all columns, we store its data in columnar layout, so that
* all processing apply to TablePage can be done in vectorized fashion.
@@ -205,8 +204,9 @@ public class TablePage {
// initialize the page if first row
if (rowId == 0) {
- int depthInComplexColumn = complexDataType.getColsCount();
- complexDimensionPages[index] = new ComplexColumnPage(pageSize, depthInComplexColumn);
+ List<ColumnType> complexColumnType = new ArrayList<>();
+ complexDataType.getChildrenType(complexColumnType);
+ complexDimensionPages[index] = new ComplexColumnPage(pageSize, complexColumnType);
}
int depthInComplexColumn = complexDimensionPages[index].getDepth();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index a01f0d7..36be65f 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -459,7 +459,6 @@ public class CarbonWriterBuilder {
}
if (field.getChildren() != null && field.getChildren().size() > 0) {
if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) {
- checkForUnsupportedDataTypes(field.getChildren().get(0).getDataType());
// Loop through the inner columns and for a StructData
DataType complexType =
DataTypes.createArrayType(field.getChildren().get(0).getDataType());
@@ -470,7 +469,6 @@ public class CarbonWriterBuilder {
List<StructField> structFieldsArray =
new ArrayList<StructField>(field.getChildren().size());
for (StructField childFld : field.getChildren()) {
- checkForUnsupportedDataTypes(childFld.getDataType());
structFieldsArray
.add(new StructField(childFld.getFieldName(), childFld.getDataType()));
}
@@ -496,13 +494,6 @@ public class CarbonWriterBuilder {
}
}
- private void checkForUnsupportedDataTypes(DataType dataType) {
- if (dataType == DataTypes.DOUBLE || dataType == DataTypes.DATE || DataTypes
- .isDecimal(dataType)) {
- throw new RuntimeException("Unsupported data type: " + dataType.getName());
- }
- }
-
/**
* Save the schema of the {@param table} to {@param persistFilePath}
* @param table table object containing schema