Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/05/08 11:20:57 UTC
[1/2] carbondata git commit: [CARBONDATA-2443][SDK]Multi level complex type support for AVRO based SDK
Repository: carbondata
Updated Branches:
refs/heads/master 51db049c4 -> 74ea24d14
[CARBONDATA-2443][SDK]Multi level complex type support for AVRO based SDK
Multi level complex type support for AVRO based SDK
This closes #2276
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ec33c112
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ec33c112
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ec33c112
Branch: refs/heads/master
Commit: ec33c11286ebe8009ac07698bf23ffb3bd3e7711
Parents: 51db049
Author: sounakr <so...@gmail.com>
Authored: Mon May 7 06:51:54 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue May 8 16:49:08 2018 +0530
----------------------------------------------------------------------
.../schema/table/TableSchemaBuilder.java | 17 +-
.../TestNonTransactionalCarbonTable.scala | 10 +-
.../datasources/SparkCarbonTableFormat.scala | 2 +-
.../loading/DataLoadProcessBuilder.java | 12 +-
.../converter/impl/FieldEncoderFactory.java | 2 +-
.../loading/model/CarbonLoadModel.java | 14 +-
.../InputProcessorStepForPartitionImpl.java | 251 ---------------
.../InputProcessorStepWithNoConverterImpl.java | 306 +++++++++++++++++++
.../util/CarbonDataProcessorUtil.java | 29 ++
.../carbondata/sdk/file/AvroCarbonWriter.java | 60 ++--
.../sdk/file/CarbonWriterBuilder.java | 6 +
11 files changed, 401 insertions(+), 308 deletions(-)
----------------------------------------------------------------------
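For orientation, a minimal sketch of the kind of multi-level record this change lets the SDK accept. It uses only the standard Avro API; the person/addresses names are illustrative and not taken from the patch:

    import java.util.ArrayList;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;

    public class NestedAvroRecordSketch {
      public static void main(String[] args) {
        // Two-level nesting: an array whose items are records (a struct inside an array).
        String avroSchema = "{\"type\":\"record\",\"name\":\"person\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"addresses\",\"type\":{\"type\":\"array\",\"items\":"
            + "{\"type\":\"record\",\"name\":\"addr\",\"fields\":["
            + "{\"name\":\"street\",\"type\":\"string\"},"
            + "{\"name\":\"city\",\"type\":\"string\"}]}}}]}";
        Schema schema = new Schema.Parser().parse(avroSchema);

        Schema addrSchema = schema.getField("addresses").schema().getElementType();
        GenericData.Record addr = new GenericData.Record(addrSchema);
        addr.put("street", "abc");
        addr.put("city", "bang");

        ArrayList<Object> addresses = new ArrayList<>();
        addresses.add(addr);

        GenericData.Record record = new GenericData.Record(schema);
        record.put("name", "bob");
        record.put("addresses", addresses);
        // With this patch, AvroCarbonWriter walks such a record recursively and
        // emits StructObject/ArrayObject trees instead of delimited strings, so
        // the nesting depth is no longer capped by the two delimiter levels.
      }
    }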
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index 42bb958..e3c07fa 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -184,7 +184,8 @@ public class TableSchemaBuilder {
if (field.getDataType().isComplexType()) {
String parentFieldName = newColumn.getColumnName();
if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) {
- addColumn(new StructField("val",
+ String colName = getColNameForArray(parentFieldName);
+ addColumn(new StructField(colName,
((ArrayType) field.getDataType()).getElementType()), field.getFieldName(), false, true);
} else if (field.getDataType().getName().equalsIgnoreCase("STRUCT")
&& ((StructType) field.getDataType()).getFields().size() > 0) {
@@ -198,6 +199,20 @@ public class TableSchemaBuilder {
return newColumn;
}
+ private String getColNameForArray(String parentFieldName) {
+ if (!parentFieldName.contains(".val")) {
+ return "val";
+ } else {
+ String[] splits = parentFieldName.split("val");
+ if (splits.length == 1) {
+ return "val" + 1;
+ } else {
+ return "val" + (Integer.parseInt(parentFieldName
+ .substring(parentFieldName.lastIndexOf("val") + 3, parentFieldName.length())) + 1);
+ }
+ }
+ }
+
/**
* Throw exception if {@param field} name is repeated
*/
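Reading the new helper: each nested array level gets a synthesized child column name. For a column arr declared as array<array<array<int>>>, the generated names would be val for the first level, then val1, then val2 (the numeric suffix is parsed from the parent's trailing valN and incremented). This is a reading of the code above, not documented behavior; note that the second commit below tightens the contains(".val") check to endsWith(".val").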
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index fabcd02..6b02d5a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -678,8 +678,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
val exception =
intercept[RuntimeException] {
- buildTestDataWithBadRecordFail()
- }
+ buildTestDataWithBadRecordFail()
+ }
assert(exception.getMessage()
.contains("Data load failed due to bad record"))
@@ -780,7 +780,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
| }
| ]
| }
- """.stripMargin
+ """.stripMargin
val json: String = """ {"name": "bob","age": 10,"address": ["abc", "defg"]} """
@@ -835,8 +835,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
val json =
""" {"name":"bob", "age":10,
- |"address" : {"street":"abc", "city":"bang"},
- |"doorNum" : [1,2,3,4]}""".stripMargin
+ |"address" : {"street":"abc", "city":"bang"},
+ |"doorNum" : [1,2,3,4]}""".stripMargin
val fields = new Array[Field](4)
fields(0) = new Field("name", DataTypes.STRING)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index 1928b38..d6eab1d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -111,7 +111,7 @@ with Serializable {
model.setDictionaryServerHost(options.getOrElse("dicthost", null))
model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
- model.setPartitionLoad(true)
+ model.setLoadWithoutCoverterStep(true)
val staticPartition = options.getOrElse("staticpartition", null)
if (staticPartition != null) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 028c404..2f904ed 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -44,8 +44,8 @@ import org.apache.carbondata.processing.loading.steps.CarbonRowDataWriterProcess
import org.apache.carbondata.processing.loading.steps.DataConverterProcessorStepImpl;
import org.apache.carbondata.processing.loading.steps.DataWriterBatchProcessorStepImpl;
import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImpl;
-import org.apache.carbondata.processing.loading.steps.InputProcessorStepForPartitionImpl;
import org.apache.carbondata.processing.loading.steps.InputProcessorStepImpl;
+import org.apache.carbondata.processing.loading.steps.InputProcessorStepWithNoConverterImpl;
import org.apache.carbondata.processing.loading.steps.SortProcessorStepImpl;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -62,8 +62,8 @@ public final class DataLoadProcessBuilder {
CarbonIterator[] inputIterators) throws Exception {
CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation);
SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
- if (loadModel.isPartitionLoad()) {
- return buildInternalForPartitionLoad(inputIterators, configuration, sortScope);
+ if (loadModel.isLoadWithoutCoverterStep()) {
+ return buildInternalWithNoConverter(inputIterators, configuration, sortScope);
} else if (!configuration.isSortTable() ||
sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) {
return buildInternalForNoSort(inputIterators, configuration);
@@ -106,14 +106,14 @@ public final class DataLoadProcessBuilder {
}
/**
- * Build pipe line for partition load
+ * Build the pipeline for a load without the conversion step.
*/
- private AbstractDataLoadProcessorStep buildInternalForPartitionLoad(
+ private AbstractDataLoadProcessorStep buildInternalWithNoConverter(
CarbonIterator[] inputIterators, CarbonDataLoadConfiguration configuration,
SortScopeOptions.SortScope sortScope) {
// Wraps with dummy processor.
AbstractDataLoadProcessorStep inputProcessorStep =
- new InputProcessorStepForPartitionImpl(configuration, inputIterators);
+ new InputProcessorStepWithNoConverterImpl(configuration, inputIterators);
if (sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT)) {
AbstractDataLoadProcessorStep sortProcessorStep =
new SortProcessorStepImpl(configuration, inputProcessorStep);
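In effect, when isLoadWithoutCoverterStep is set the builder wires the pipeline as InputProcessorStepWithNoConverterImpl -> SortProcessorStepImpl (shown here for LOCAL_SORT) -> the usual writer steps (the remainder of the method is elided from this hunk), with no DataConverterProcessorStepImpl in between, so rows reach the sort and write phases exactly as the input iterators produced them.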
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 567a8b5..dd28dc6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -128,7 +128,7 @@ public class FieldEncoderFactory {
/**
* Create parser for the carbon column.
*/
- private static GenericDataType createComplexDataType(DataField dataField,
+ public static GenericDataType createComplexDataType(DataField dataField,
Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
AbsoluteTableIdentifier absoluteTableIdentifier, DictionaryClient client, Boolean useOnePass,
Map<Object, Integer> localCache, int index, String nullFormat, Boolean isEmptyBadRecords) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 2a846e2..0cc0da3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -201,8 +201,10 @@ public class CarbonLoadModel implements Serializable {
/**
* It writes data directly to the no-sort processor, bypassing all other processors.
+ * With this flag there is no data conversion step; the data is written exactly as
+ * it is pushed in.
*/
- private boolean isPartitionLoad;
+ private boolean isLoadWithoutCoverterStep;
/**
* Folder path to which data should be written for this load.
@@ -435,7 +437,7 @@ public class CarbonLoadModel implements Serializable {
copy.batchSortSizeInMb = batchSortSizeInMb;
copy.isAggLoadRequest = isAggLoadRequest;
copy.badRecordsLocation = badRecordsLocation;
- copy.isPartitionLoad = isPartitionLoad;
+ copy.isLoadWithoutCoverterStep = isLoadWithoutCoverterStep;
copy.sortColumnsBoundsStr = sortColumnsBoundsStr;
return copy;
}
@@ -814,12 +816,12 @@ public class CarbonLoadModel implements Serializable {
}
- public boolean isPartitionLoad() {
- return isPartitionLoad;
+ public boolean isLoadWithoutCoverterStep() {
+ return isLoadWithoutCoverterStep;
}
- public void setPartitionLoad(boolean partitionLoad) {
- isPartitionLoad = partitionLoad;
+ public void setLoadWithoutCoverterStep(boolean loadWithoutCoverterStep) {
+ isLoadWithoutCoverterStep = loadWithoutCoverterStep;
}
public String getDataWritePath() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java
deleted file mode 100644
index 1dc9b27..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.loading.steps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
-import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.loading.DataField;
-import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
-import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * It reads data from record reader and sends data to next step.
- */
-public class InputProcessorStepForPartitionImpl extends AbstractDataLoadProcessorStep {
-
- private CarbonIterator<Object[]>[] inputIterators;
-
- private boolean[] noDictionaryMapping;
-
- private DataType[] dataTypes;
-
- private int[] orderOfData;
-
- public InputProcessorStepForPartitionImpl(CarbonDataLoadConfiguration configuration,
- CarbonIterator<Object[]>[] inputIterators) {
- super(configuration, null);
- this.inputIterators = inputIterators;
- }
-
- @Override public DataField[] getOutput() {
- return configuration.getDataFields();
- }
-
- @Override public void initialize() throws IOException {
- super.initialize();
- // if logger is enabled then raw data will be required.
- RowConverterImpl rowConverter =
- new RowConverterImpl(configuration.getDataFields(), configuration, null);
- rowConverter.initialize();
- configuration.setCardinalityFinder(rowConverter);
- noDictionaryMapping =
- CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
- dataTypes = new DataType[configuration.getDataFields().length];
- for (int i = 0; i < dataTypes.length; i++) {
- if (configuration.getDataFields()[i].getColumn().hasEncoding(Encoding.DICTIONARY)) {
- dataTypes[i] = DataTypes.INT;
- } else {
- dataTypes[i] = configuration.getDataFields()[i].getColumn().getDataType();
- }
- }
- orderOfData = arrangeData(configuration.getDataFields(), configuration.getHeader());
- }
-
- private int[] arrangeData(DataField[] dataFields, String[] header) {
- int[] data = new int[dataFields.length];
- for (int i = 0; i < dataFields.length; i++) {
- for (int j = 0; j < header.length; j++) {
- if (dataFields[i].getColumn().getColName().equalsIgnoreCase(header[j])) {
- data[i] = j;
- break;
- }
- }
- }
- return data;
- }
-
- @Override public Iterator<CarbonRowBatch>[] execute() {
- int batchSize = CarbonProperties.getInstance().getBatchSize();
- List<CarbonIterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
- Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
- for (int i = 0; i < outIterators.length; i++) {
- outIterators[i] =
- new InputProcessorIterator(readerIterators[i], batchSize, configuration.isPreFetch(),
- rowCounter, orderOfData, noDictionaryMapping, dataTypes);
- }
- return outIterators;
- }
-
- /**
- * Partition input iterators equally as per the number of threads.
- *
- * @return
- */
- private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() {
- // Get the number of cores configured in property.
- int numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
- // Get the minimum of number of cores and iterators size to get the number of parallel threads
- // to be launched.
- int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores);
-
- List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber];
- for (int i = 0; i < parallelThreadNumber; i++) {
- iterators[i] = new ArrayList<>();
- }
- // Equally partition the iterators as per number of threads
- for (int i = 0; i < inputIterators.length; i++) {
- iterators[i % parallelThreadNumber].add(inputIterators[i]);
- }
- return iterators;
- }
-
- @Override protected CarbonRow processRow(CarbonRow row) {
- return null;
- }
-
- @Override public void close() {
- if (!closed) {
- super.close();
- for (CarbonIterator inputIterator : inputIterators) {
- inputIterator.close();
- }
- }
- }
-
- @Override protected String getStepName() {
- return "Input Processor";
- }
-
- /**
- * This iterator wraps the list of iterators and it starts iterating the each
- * iterator of the list one by one. It also parse the data while iterating it.
- */
- private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
-
- private List<CarbonIterator<Object[]>> inputIterators;
-
- private CarbonIterator<Object[]> currentIterator;
-
- private int counter;
-
- private int batchSize;
-
- private boolean nextBatch;
-
- private boolean firstTime;
-
- private AtomicLong rowCounter;
-
- private boolean[] noDictionaryMapping;
-
- private DataType[] dataTypes;
-
- private int[] orderOfData;
-
- public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, int batchSize,
- boolean preFetch, AtomicLong rowCounter, int[] orderOfData, boolean[] noDictionaryMapping,
- DataType[] dataTypes) {
- this.inputIterators = inputIterators;
- this.batchSize = batchSize;
- this.counter = 0;
- // Get the first iterator from the list.
- currentIterator = inputIterators.get(counter++);
- this.rowCounter = rowCounter;
- this.nextBatch = false;
- this.firstTime = true;
- this.noDictionaryMapping = noDictionaryMapping;
- this.dataTypes = dataTypes;
- this.orderOfData = orderOfData;
- }
-
- @Override public boolean hasNext() {
- return nextBatch || internalHasNext();
- }
-
- private boolean internalHasNext() {
- if (firstTime) {
- firstTime = false;
- currentIterator.initialize();
- }
- boolean hasNext = currentIterator.hasNext();
- // If iterator is finished then check for next iterator.
- if (!hasNext) {
- currentIterator.close();
- // Check next iterator is available in the list.
- if (counter < inputIterators.size()) {
- // Get the next iterator from the list.
- currentIterator = inputIterators.get(counter++);
- // Initialize the new iterator
- currentIterator.initialize();
- hasNext = internalHasNext();
- }
- }
- return hasNext;
- }
-
- @Override public CarbonRowBatch next() {
- return getBatch();
- }
-
- private CarbonRowBatch getBatch() {
- // Create batch and fill it.
- CarbonRowBatch carbonRowBatch = new CarbonRowBatch(batchSize);
- int count = 0;
- while (internalHasNext() && count < batchSize) {
- carbonRowBatch.addRow(new CarbonRow(convertToNoDictionaryToBytes(currentIterator.next())));
- count++;
- }
- rowCounter.getAndAdd(carbonRowBatch.getSize());
- return carbonRowBatch;
- }
-
- private Object[] convertToNoDictionaryToBytes(Object[] data) {
- Object[] newData = new Object[data.length];
- for (int i = 0; i < noDictionaryMapping.length; i++) {
- if (noDictionaryMapping[i]) {
- newData[i] = DataTypeUtil
- .getBytesDataDataTypeForNoDictionaryColumn(data[orderOfData[i]], dataTypes[i]);
- } else {
- newData[i] = data[orderOfData[i]];
- }
- }
- if (newData.length > noDictionaryMapping.length) {
- for (int i = noDictionaryMapping.length; i < newData.length; i++) {
- newData[i] = data[orderOfData[i]];
- }
- }
- // System.out.println(Arrays.toString(data));
- return newData;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
new file mode 100644
index 0000000..8c24b7f
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.steps;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
+import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.converter.impl.FieldEncoderFactory;
+import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It reads data from the record reader and sends it to the next step.
+ */
+public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProcessorStep {
+
+ private CarbonIterator<Object[]>[] inputIterators;
+
+ private boolean[] noDictionaryMapping;
+
+ private DataType[] dataTypes;
+
+ private int[] orderOfData;
+
+ private Map<Integer, GenericDataType> dataFieldsWithComplexDataType;
+
+ public InputProcessorStepWithNoConverterImpl(CarbonDataLoadConfiguration configuration,
+ CarbonIterator<Object[]>[] inputIterators) {
+ super(configuration, null);
+ this.inputIterators = inputIterators;
+ }
+
+ @Override public DataField[] getOutput() {
+ return configuration.getDataFields();
+ }
+
+ @Override public void initialize() throws IOException {
+ super.initialize();
+ // if logger is enabled then raw data will be required.
+ RowConverterImpl rowConverter =
+ new RowConverterImpl(configuration.getDataFields(), configuration, null);
+ rowConverter.initialize();
+ configuration.setCardinalityFinder(rowConverter);
+ noDictionaryMapping =
+ CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
+
+ dataFieldsWithComplexDataType = new HashMap<>();
+ convertComplexDataType(dataFieldsWithComplexDataType);
+
+ dataTypes = new DataType[configuration.getDataFields().length];
+ for (int i = 0; i < dataTypes.length; i++) {
+ if (configuration.getDataFields()[i].getColumn().hasEncoding(Encoding.DICTIONARY)) {
+ dataTypes[i] = DataTypes.INT;
+ } else {
+ dataTypes[i] = configuration.getDataFields()[i].getColumn().getDataType();
+ }
+ }
+ orderOfData = arrangeData(configuration.getDataFields(), configuration.getHeader());
+ }
+
+ private void convertComplexDataType(Map<Integer, GenericDataType> dataFieldsWithComplexDataType) {
+ DataField[] srcDataField = configuration.getDataFields();
+ FieldEncoderFactory fieldConverterFactory = FieldEncoderFactory.getInstance();
+ String nullFormat =
+ configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
+ .toString();
+ boolean isEmptyBadRecord = Boolean.parseBoolean(
+ configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD)
+ .toString());
+ for (int i = 0; i < srcDataField.length; i++) {
+ if (srcDataField[i].getColumn().isComplex()) {
+ // create a ComplexDataType
+ dataFieldsWithComplexDataType.put(srcDataField[i].getColumn().getOrdinal(),
+ fieldConverterFactory
+ .createComplexDataType(srcDataField[i], null, configuration.getTableIdentifier(),
+ null, false, null, i, nullFormat, isEmptyBadRecord));
+ }
+ }
+ }
+
+ private int[] arrangeData(DataField[] dataFields, String[] header) {
+ int[] data = new int[dataFields.length];
+ for (int i = 0; i < dataFields.length; i++) {
+ for (int j = 0; j < header.length; j++) {
+ if (dataFields[i].getColumn().getColName().equalsIgnoreCase(header[j])) {
+ data[i] = j;
+ break;
+ }
+ }
+ }
+ return data;
+ }
+
+ @Override public Iterator<CarbonRowBatch>[] execute() {
+ int batchSize = CarbonProperties.getInstance().getBatchSize();
+ List<CarbonIterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
+ Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
+ for (int i = 0; i < outIterators.length; i++) {
+ outIterators[i] =
+ new InputProcessorIterator(readerIterators[i], batchSize, configuration.isPreFetch(),
+ rowCounter, orderOfData, noDictionaryMapping, dataTypes,
+ configuration.getDataFields(), dataFieldsWithComplexDataType);
+ }
+ return outIterators;
+ }
+
+ /**
+ * Partition input iterators equally as per the number of threads.
+ *
+ * @return
+ */
+ private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() {
+ // Get the number of cores configured in property.
+ int numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
+ // Get the minimum of number of cores and iterators size to get the number of parallel threads
+ // to be launched.
+ int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores);
+
+ List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber];
+ for (int i = 0; i < parallelThreadNumber; i++) {
+ iterators[i] = new ArrayList<>();
+ }
+ // Equally partition the iterators as per number of threads
+ for (int i = 0; i < inputIterators.length; i++) {
+ iterators[i % parallelThreadNumber].add(inputIterators[i]);
+ }
+ return iterators;
+ }
+
+ @Override protected CarbonRow processRow(CarbonRow row) {
+ return null;
+ }
+
+ @Override public void close() {
+ if (!closed) {
+ super.close();
+ for (CarbonIterator inputIterator : inputIterators) {
+ inputIterator.close();
+ }
+ }
+ }
+
+ @Override protected String getStepName() {
+ return "Input Processor";
+ }
+
+ /**
+ * This iterator wraps the list of iterators and iterates over each iterator in the
+ * list one by one. It also parses the data while iterating.
+ */
+ private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
+
+ private List<CarbonIterator<Object[]>> inputIterators;
+
+ private CarbonIterator<Object[]> currentIterator;
+
+ private int counter;
+
+ private int batchSize;
+
+ private boolean nextBatch;
+
+ private boolean firstTime;
+
+ private AtomicLong rowCounter;
+
+ private boolean[] noDictionaryMapping;
+
+ private DataType[] dataTypes;
+
+ private DataField[] dataFields;
+
+ private int[] orderOfData;
+
+ private Map<Integer, GenericDataType> dataFieldsWithComplexDataType;
+
+ public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, int batchSize,
+ boolean preFetch, AtomicLong rowCounter, int[] orderOfData, boolean[] noDictionaryMapping,
+ DataType[] dataTypes, DataField[] dataFields,
+ Map<Integer, GenericDataType> dataFieldsWithComplexDataType) {
+ this.inputIterators = inputIterators;
+ this.batchSize = batchSize;
+ this.counter = 0;
+ // Get the first iterator from the list.
+ currentIterator = inputIterators.get(counter++);
+ this.rowCounter = rowCounter;
+ this.nextBatch = false;
+ this.firstTime = true;
+ this.noDictionaryMapping = noDictionaryMapping;
+ this.dataTypes = dataTypes;
+ this.dataFields = dataFields;
+ this.orderOfData = orderOfData;
+ this.dataFieldsWithComplexDataType = dataFieldsWithComplexDataType;
+ }
+
+ @Override public boolean hasNext() {
+ return nextBatch || internalHasNext();
+ }
+
+ private boolean internalHasNext() {
+ if (firstTime) {
+ firstTime = false;
+ currentIterator.initialize();
+ }
+ boolean hasNext = currentIterator.hasNext();
+ // If iterator is finished then check for next iterator.
+ if (!hasNext) {
+ currentIterator.close();
+ // Check next iterator is available in the list.
+ if (counter < inputIterators.size()) {
+ // Get the next iterator from the list.
+ currentIterator = inputIterators.get(counter++);
+ // Initialize the new iterator
+ currentIterator.initialize();
+ hasNext = internalHasNext();
+ }
+ }
+ return hasNext;
+ }
+
+ @Override public CarbonRowBatch next() {
+ return getBatch();
+ }
+
+ private CarbonRowBatch getBatch() {
+ // Create batch and fill it.
+ CarbonRowBatch carbonRowBatch = new CarbonRowBatch(batchSize);
+ int count = 0;
+ while (internalHasNext() && count < batchSize) {
+ carbonRowBatch.addRow(
+ new CarbonRow(convertToNoDictionaryToBytes(currentIterator.next(), dataFields)));
+ count++;
+ }
+ rowCounter.getAndAdd(carbonRowBatch.getSize());
+ return carbonRowBatch;
+ }
+
+ private Object[] convertToNoDictionaryToBytes(Object[] data, DataField[] dataFields) {
+ Object[] newData = new Object[data.length];
+ for (int i = 0; i < data.length; i++) {
+ if (i < noDictionaryMapping.length && noDictionaryMapping[i]) {
+ newData[i] = DataTypeUtil
+ .getBytesDataDataTypeForNoDictionaryColumn(data[orderOfData[i]], dataTypes[i]);
+ } else {
+ // if this is a complex column then recursively convert the data into a byte array.
+ if (dataTypes[i].isComplexType()) {
+ ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new DataOutputStream(byteArray);
+ try {
+ GenericDataType complextType =
+ dataFieldsWithComplexDataType.get(dataFields[i].getColumn().getOrdinal());
+
+ complextType.writeByteArray(data[orderOfData[i]], dataOutputStream);
+
+ dataOutputStream.close();
+ newData[i] = byteArray.toByteArray();
+ } catch (Exception e) {
+ throw new CarbonDataLoadingException("Loading Exception", e);
+ }
+ } else {
+ newData[i] = data[orderOfData[i]];
+ }
+ }
+ }
+ // System.out.println(Arrays.toString(data));
+ return newData;
+ }
+
+ }
+
+}
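One detail worth noting in the new step: for complex columns, convertToNoDictionaryToBytes does not pass the StructObject/ArrayObject tree through as-is. It looks up the column's GenericDataType (built once in convertComplexDataType via FieldEncoderFactory.createComplexDataType) and calls writeByteArray to flatten the nested object tree into a single byte[] per complex column before the row is added to the batch.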
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index f47853e..f921fd5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -240,6 +240,35 @@ public final class CarbonDataProcessorUtil {
.toPrimitive(noDictionaryMapping.toArray(new Boolean[noDictionaryMapping.size()]));
}
+ public static void getComplexNoDictionaryMapping(DataField[] dataFields,
+ List<Integer> complexNoDictionary) {
+
+ // save the Ordinal Number in the List.
+ for (DataField field : dataFields) {
+ if (field.getColumn().isComplex()) {
+ // get the childs.
+ getComplexNoDictionaryMapping(
+ ((CarbonDimension) field.getColumn()).getListOfChildDimensions(), complexNoDictionary);
+ }
+ }
+ }
+
+ public static void getComplexNoDictionaryMapping(List<CarbonDimension> carbonDimensions,
+ List<Integer> complexNoDictionary) {
+ for (CarbonDimension carbonDimension : carbonDimensions) {
+ if (carbonDimension.isComplex()) {
+ getComplexNoDictionaryMapping(carbonDimension.getListOfChildDimensions(),
+ complexNoDictionary);
+ } else {
+ // This is primitive type. Check the encoding for NoDictionary.
+ if (!carbonDimension.hasEncoding(Encoding.DICTIONARY)) {
+ complexNoDictionary.add(carbonDimension.getOrdinal());
+ }
+ }
+ }
+ }
+
+
/**
* Preparing the boolean [] to map whether the dimension use inverted index or not.
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index bc2e9db..946040f 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -26,6 +26,8 @@ import java.util.UUID;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
+import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
+import org.apache.carbondata.processing.loading.complexobjects.StructObject;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.avro.Schema;
@@ -70,15 +72,15 @@ class AvroCarbonWriter extends CarbonWriter {
avroSchema = avroRecord.getSchema();
}
List<Schema.Field> fields = avroSchema.getFields();
- Object [] csvField = new Object[fields.size()];
+ Object[] csvField = new Object[fields.size()];
for (int i = 0; i < fields.size(); i++) {
- csvField[i] = avroFieldToObject(fields.get(i), avroRecord.get(i), 0);
+ csvField[i] = avroFieldToObject(fields.get(i), avroRecord.get(i));
}
return csvField;
}
- private String avroFieldToObject(Schema.Field avroField, Object fieldValue, int delimiterLevel) {
- StringBuilder out = new StringBuilder();
+ private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) {
+ Object out = new Object();
Schema.Type type = avroField.schema().getType();
switch (type) {
case BOOLEAN:
@@ -86,55 +88,39 @@ class AvroCarbonWriter extends CarbonWriter {
case LONG:
case DOUBLE:
case STRING:
+ out = fieldValue;
+ break;
case FLOAT:
- out.append(fieldValue.toString());
+ Float f = (Float) fieldValue;
+ out = f.doubleValue();
break;
case RECORD:
List<Schema.Field> fields = avroField.schema().getFields();
- String delimiter = null;
- delimiterLevel ++;
+
+ Object[] structChildObjects = new Object[fields.size()];
for (int i = 0; i < fields.size(); i++) {
- if (delimiterLevel == 1) {
- delimiter = "$";
- } else if (delimiterLevel > 1) {
- delimiter = ":";
- }
- if (i != (fields.size() - 1)) {
- out.append(avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i),
- delimiterLevel)).append(delimiter);
- } else {
- out.append(avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i),
- delimiterLevel));
- }
+ structChildObjects[i] =
+ avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i));
}
+ StructObject structObject = new StructObject(structChildObjects);
+ out = structObject;
break;
case ARRAY:
int size = ((ArrayList) fieldValue).size();
- String delimiterArray = null;
- delimiterLevel ++;
- if (delimiterLevel == 1) {
- delimiterArray = "$";
- } else if (delimiterLevel > 1) {
- delimiterArray = ":";
- }
-
+ Object[] arrayChildObjects = new Object[size];
for (int i = 0; i < size; i++) {
- if (i != size - 1) {
- out.append(avroFieldToObject(
- new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true),
- ((ArrayList) fieldValue).get(i), delimiterLevel)).append(delimiterArray);
- } else {
- out.append(avroFieldToObject(
- new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true),
- ((ArrayList) fieldValue).get(i), delimiterLevel));
- }
+ arrayChildObjects[i] = (avroFieldToObject(
+ new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true),
+ ((ArrayList) fieldValue).get(i)));
}
+ ArrayObject arrayObject = new ArrayObject(arrayChildObjects);
+ out = arrayObject;
break;
default:
throw new UnsupportedOperationException();
}
- return out.toString();
+ return out;
}
/**
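The shape of the change in avroFieldToObject, in miniature (sample values taken from the tests in the second commit below):

    // Before: a RECORD {"street":"abc","city":"bang"} was flattened to the
    // delimited String "abc$bang" ('$' at level 1, ':' deeper), which capped
    // the usable nesting depth at two levels.
    // After:  it becomes new StructObject(new Object[]{"abc", "bang"}), and an
    // ARRAY ["abc", "defg"] becomes new ArrayObject(new Object[]{"abc", "defg"}),
    // so arbitrarily deep nesting survives into the load pipeline.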
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 397f151..76a46d0 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -330,6 +330,12 @@ public class CarbonWriterBuilder {
Objects.requireNonNull(schema, "schema should not be null");
Objects.requireNonNull(path, "path should not be null");
CarbonLoadModel loadModel = createLoadModel();
+
+ // Avro records are pushed to Carbon as Objects, not as Strings, in order to support
+ // multi-level complex types. Since no conversion is needed, the converter step is
+ // removed from the load: the loadWithoutCoverterStep flag tells the load builder to
+ // skip the conversion step.
+ loadModel.setLoadWithoutCoverterStep(true);
return new AvroCarbonWriter(loadModel);
}
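A hedged usage sketch of the Avro path through the builder. The method names withSchema, outputPath and buildWriterForAvroInput are assumed from the SDK of this era (only the body of the build method is shown in the hunk above), so treat them as an assumption rather than a quote:

    CarbonWriter writer = CarbonWriter.builder()
        .withSchema(schema)           // org.apache.carbondata.sdk.file.Schema
        .outputPath(writerPath)
        .buildWriterForAvroInput();   // sets loadWithoutCoverterStep internally, per this patch
    writer.write(avroRecord);         // an Avro GenericData.Record
    writer.close();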
[2/2] carbondata git commit: [CARBONDATA-2443][SDK]Added the test cases for avro multilevel complex type support
Posted by ra...@apache.org.
[CARBONDATA-2443][SDK]Added the test cases for avro multilevel complex type support
This closes #2276
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/74ea24d1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/74ea24d1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/74ea24d1
Branch: refs/heads/master
Commit: 74ea24d141c9a1dd1dd820750ca0387d2ff64171
Parents: ec33c11
Author: ajantha-bhat <aj...@gmail.com>
Authored: Mon May 7 14:39:40 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue May 8 16:50:38 2018 +0530
----------------------------------------------------------------------
.../schema/table/TableSchemaBuilder.java | 2 +-
.../TestNonTransactionalCarbonTable.scala | 510 ++++++++++++++++++-
2 files changed, 499 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74ea24d1/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index e3c07fa..ca082e1 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -200,7 +200,7 @@ public class TableSchemaBuilder {
}
private String getColNameForArray(String parentFieldName) {
- if (!parentFieldName.contains(".val")) {
+ if (!parentFieldName.endsWith(".val")) {
return "val";
} else {
String[] splits = parentFieldName.split("val");
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74ea24d1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 6b02d5a..c641ed3 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -735,7 +735,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
val json = """ {"name":"bob", "age":10, "address" : {"street":"abc", "city":"bang"}} """
-
val fields = new Array[Field](3)
fields(0) = new Field("name", DataTypes.STRING)
fields(1) = new Field("age", DataTypes.INT)
@@ -902,9 +901,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
|{"street":"ghi","city":"city3"},
|{"street":"jkl","city":"city4"}]} """.stripMargin
-
-
-
val fields = new Array[Field](3)
fields(0) = new Field("name", DataTypes.STRING)
fields(1) = new Field("age", DataTypes.INT)
@@ -988,20 +984,16 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
| }
|} """.stripMargin
-
-
-
-
val fields = new Array[Field](3)
fields(0) = new Field("name", DataTypes.STRING)
fields(1) = new Field("age", DataTypes.INT)
- val fld1 = new util.ArrayList[StructField]
- fld1.add(new StructField("eachDoorNum", DataTypes.INT))
-
val fld2 = new util.ArrayList[StructField]
fld2.add(new StructField("street", DataTypes.STRING))
fld2.add(new StructField("city", DataTypes.STRING))
+
+ val fld1 = new util.ArrayList[StructField]
+ fld1.add(new StructField("eachDoorNum", DataTypes.INT))
fld2.add(new StructField("doorNum", DataTypes.createArrayType(DataTypes.INT), fld1))
fields(2) = new Field("address","struct",fld2)
@@ -1050,7 +1042,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
- assert(new File(writerPath).exists())
+ assert(new File(writerPath).listFiles().length > 0)
+ cleanTestData()
}
test("Read sdk writer Avro output with both Array and Struct Type") {
@@ -1077,6 +1070,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
Row("bob", 10, Row("abc","bang"), mutable.WrappedArray.newBuilder[Int].+=(1,2,3,4))))
sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
+ cleanTestData()
}
@@ -1102,6 +1096,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
+ cleanTestData()
}
@@ -1128,6 +1123,497 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
+ cleanTestData()
+ }
+
+ // test multi level -- 3 levels [array of struct of array of int]
+ def buildAvroTestDataMultiLevel3(rows: Int, options: util.Map[String, String]): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+
+ val mySchema = """ {
+ | "name": "address",
+ | "type": "record",
+ | "fields": [
+ | {
+ | "name": "name",
+ | "type": "string"
+ | },
+ | {
+ | "name": "age",
+ | "type": "int"
+ | },
+ | {
+ | "name": "doorNum",
+ | "type": {
+ | "type": "array",
+ | "items": {
+ | "type": "record",
+ | "name": "my_address",
+ | "fields": [
+ | {
+ | "name": "street",
+ | "type": "string"
+ | },
+ | {
+ | "name": "city",
+ | "type": "string"
+ | },
+ | {
+ | "name": "FloorNum",
+ | "type": {
+ | "type": "array",
+ | "items": {
+ | "name": "floor",
+ | "type": "int"
+ | }
+ | }
+ | }
+ | ]
+ | }
+ | }
+ | }
+ | ]
+ |} """.stripMargin
+ val json =
+ """ {
+ | "name": "bob",
+ | "age": 10,
+ | "doorNum": [
+ | {
+ | "street": "abc",
+ | "city": "city1",
+ | "FloorNum": [0,1,2]
+ | },
+ | {
+ | "street": "def",
+ | "city": "city2",
+ | "FloorNum": [3,4,5]
+ | },
+ | {
+ | "street": "ghi",
+ | "city": "city3",
+ | "FloorNum": [6,7,8]
+ | },
+ | {
+ | "street": "jkl",
+ | "city": "city4",
+ | "FloorNum": [9,10,11]
+ | }
+ | ]
+ |} """.stripMargin
+
+ val fields = new Array[Field](3)
+ fields(0) = new Field("name", DataTypes.STRING)
+ fields(1) = new Field("age", DataTypes.INT)
+
+ val fld = new util.ArrayList[StructField]
+ fld.add(new StructField("street", DataTypes.STRING))
+ fld.add(new StructField("city", DataTypes.STRING))
+ fld.add(new StructField("FloorNum", DataTypes.createArrayType(DataTypes.INT)))
+
+ val fld2 = new util.ArrayList[StructField]
+ fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld))
+ fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2)
+
+ WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+ }
+
+ def buildAvroTestDataMultiLevel3Type(): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+ buildAvroTestDataMultiLevel3(3, null)
+ }
+
+ // test multi level -- 3 levels [array of struct of array of int]
+ test("test multi level support : array of struct of array of int") {
+ buildAvroTestDataMultiLevel3Type()
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ sql("select * from sdkOutputTable").show(false)
+
+ // TODO: Add a validation
+ /*
+ +----+---+-----------------------------------------------------------------------------------+
+ |name|age|doorNum
+ |
+ +----+---+-----------------------------------------------------------------------------------+
+ |bob |10 |[[abc,city1,WrappedArray(0, 1, 2)], [def,city2,WrappedArray(3, 4, 5)], [ghi,city3,
+ WrappedArray(6, 7, 8)], [jkl,city4,WrappedArray(9, 10, 11)]]|
+ |bob |10 |[[abc,city1,WrappedArray(0, 1, 2)], [def,city2,WrappedArray(3, 4, 5)], [ghi,city3,
+ WrappedArray(6, 7, 8)], [jkl,city4,WrappedArray(9, 10, 11)]]|
+ |bob |10 |[[abc,city1,WrappedArray(0, 1, 2)], [def,city2,WrappedArray(3, 4, 5)], [ghi,city3,
+ WrappedArray(6, 7, 8)], [jkl,city4,WrappedArray(9, 10, 11)]]|
+ +----+---+-----------------------------------------------------------------------------------+*/
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ cleanTestData()
+ }
+
+
+ // test multi level -- 3 levels [array of struct of struct of string, int]
+ def buildAvroTestDataMultiLevel3_1(rows: Int, options: util.Map[String, String]): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+
+ val mySchema = """ {
+ | "name": "address",
+ | "type": "record",
+ | "fields": [
+ | {
+ | "name": "name",
+ | "type": "string"
+ | },
+ | {
+ | "name": "age",
+ | "type": "int"
+ | },
+ | {
+ | "name": "doorNum",
+ | "type": {
+ | "type": "array",
+ | "items": {
+ | "type": "record",
+ | "name": "my_address",
+ | "fields": [
+ | {
+ | "name": "street",
+ | "type": "string"
+ | },
+ | {
+ | "name": "city",
+ | "type": "string"
+ | },
+ | {
+ | "name": "FloorNum",
+ | "type": {
+ | "type": "record",
+ | "name": "Floor",
+ | "fields": [
+ | {
+ | "name": "wing",
+ | "type": "string"
+ | },
+ | {
+ | "name": "number",
+ | "type": "int"
+ | }
+ | ]
+ | }
+ | }
+ | ]
+ | }
+ | }
+ | }
+ | ]
+ |} """.stripMargin
+
+
+ val json =
+ """ {
+ | "name": "bob",
+ | "age": 10,
+ | "doorNum": [
+ | {
+ | "street": "abc",
+ | "city": "city1",
+ | "FloorNum": {"wing" : "a", "number" : 1}
+ | },
+ | {
+ | "street": "def",
+ | "city": "city2",
+ | "FloorNum": {"wing" : "b", "number" : 0}
+ | },
+ | {
+ | "street": "ghi",
+ | "city": "city3",
+ | "FloorNum": {"wing" : "a", "number" : 2}
+ | }
+ | ]
+ |} """.stripMargin
+
+ val fields = new Array[Field](3)
+ fields(0) = new Field("name", DataTypes.STRING)
+ fields(1) = new Field("age", DataTypes.INT)
+
+ val fld = new util.ArrayList[StructField]
+ fld.add(new StructField("street", DataTypes.STRING))
+ fld.add(new StructField("city", DataTypes.STRING))
+
+ val subFld = new util.ArrayList[StructField]
+ subFld.add(new StructField("wing", DataTypes.STRING))
+ subFld.add(new StructField("number", DataTypes.INT))
+ fld.add(new StructField("FloorNum", DataTypes.createStructType(subFld)))
+
+ // array of struct of struct
+ val fld2 = new util.ArrayList[StructField]
+ fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld))
+ fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2)
+
+ WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+ }
+
+ def buildAvroTestDataMultiLevel3_1Type(): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+ buildAvroTestDataMultiLevel3_1(3, null)
+ }
+
+ // test multi level -- 3 levels [array of struct of struct of string, int]
+ test("test multi level support : array of struct of struct") {
+ buildAvroTestDataMultiLevel3_1Type()
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ sql("select * from sdkOutputTable").show(false)
+
+ // TODO: Add a validation
+ /*
+ +----+---+---------------------------------------------------------+
+ |name|age|doorNum |
+ +----+---+---------------------------------------------------------+
+ |bob |10 |[[abc,city1,[a,1]], [def,city2,[b,0]], [ghi,city3,[a,2]]]|
+ |bob |10 |[[abc,city1,[a,1]], [def,city2,[b,0]], [ghi,city3,[a,2]]]|
+ |bob |10 |[[abc,city1,[a,1]], [def,city2,[b,0]], [ghi,city3,[a,2]]]|
+ +----+---+---------------------------------------------------------+
+ */
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ cleanTestData()
+ }
+
+ // test multi level -- 3 levels [array of array of array of int]
+ def buildAvroTestDataMultiLevel3_2(rows: Int, options: util.Map[String, String]): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+
+ val mySchema = """ {
+ | "name": "address",
+ | "type": "record",
+ | "fields": [
+ | {
+ | "name": "name",
+ | "type": "string"
+ | },
+ | {
+ | "name": "age",
+ | "type": "int"
+ | },
+ | {
+ | "name": "BuildNum",
+ | "type": {
+ | "type": "array",
+ | "items": {
+ | "name": "FloorNum",
+ | "type": "array",
+ | "items": {
+ | "name": "doorNum",
+ | "type": "array",
+ | "items": {
+ | "name": "EachdoorNums",
+ | "type": "int",
+ | "default": -1
+ | }
+ | }
+ | }
+ | }
+ | }
+ | ]
+ |} """.stripMargin
+
+ val json =
+ """ {
+ | "name": "bob",
+ | "age": 10,
+ | "BuildNum": [[[1,2,3],[4,5,6]],[[10,20,30],[40,50,60]]]
+ | } """.stripMargin
+
+ val fields = new Array[Field](3)
+ fields(0) = new Field("name", DataTypes.STRING)
+ fields(1) = new Field("age", DataTypes.INT)
+
+ val subFld = new util.ArrayList[StructField]
+ subFld.add(new StructField("EachDoorNum", DataTypes.INT))
+
+ val fld = new util.ArrayList[StructField]
+ fld.add(new StructField("DoorNum", DataTypes.createArrayType(DataTypes.INT), subFld))
+ // array of struct of struct
+ val doorNum = new util.ArrayList[StructField]
+ doorNum.add(new StructField("FloorNum",
+ DataTypes.createArrayType(DataTypes.createArrayType(DataTypes.INT)), fld))
+ fields(2) = new Field("BuildNum", "array", doorNum)
+
+ WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+ }
+
+ def buildAvroTestDataMultiLevel3_2Type(): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+ buildAvroTestDataMultiLevel3_2(3, null)
+ }
+
+ // test multi level -- 3 levels [array of array of array of int]
+ test("test multi level support : array of array of array of int") {
+ buildAvroTestDataMultiLevel3_2Type()
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ sql("select * from sdkOutputTable").show(false)
+
+ // TODO: Add a validation
+ /*
+ +----+---+---------------------------------------------------------------------------+
+ |name|age|BuildNum
+ |
+ +----+---+---------------------------------------------------------------------------+
+ |bob |10 |[WrappedArray(WrappedArray(1, 2, 3), WrappedArray(4, 5, 6)), WrappedArray
+ (WrappedArray(10, 20, 30), WrappedArray(40, 50, 60))]|
+ |bob |10 |[WrappedArray(WrappedArray(1, 2, 3), WrappedArray(4, 5, 6)), WrappedArray
+ (WrappedArray(10, 20, 30), WrappedArray(40, 50, 60))]|
+ |bob |10 |[WrappedArray(WrappedArray(1, 2, 3), WrappedArray(4, 5, 6)), WrappedArray
+ (WrappedArray(10, 20, 30), WrappedArray(40, 50, 60))]|
+ +----+---+---------------------------------------------------------------------------+
+ */
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ cleanTestData()
}
+
+
+ // test multi level -- 4 levels [array of array of array of struct]
+ def buildAvroTestDataMultiLevel4(rows: Int, options: util.Map[String, String]): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+
+ val mySchema = """ {
+ | "name": "address",
+ | "type": "record",
+ | "fields": [
+ | {
+ | "name": "name",
+ | "type": "string"
+ | },
+ | {
+ | "name": "age",
+ | "type": "int"
+ | },
+ | {
+ | "name": "BuildNum",
+ | "type": {
+ | "type": "array",
+ | "items": {
+ | "name": "FloorNum",
+ | "type": "array",
+ | "items": {
+ | "name": "doorNum",
+ | "type": "array",
+ | "items": {
+ | "name": "my_address",
+ | "type": "record",
+ | "fields": [
+ | {
+ | "name": "street",
+ | "type": "string"
+ | },
+ | {
+ | "name": "city",
+ | "type": "string"
+ | }
+ | ]
+ | }
+ | }
+ | }
+ | }
+ | }
+ | ]
+ |} """.stripMargin
+
+ val json =
+ """ {
+ | "name": "bob",
+ | "age": 10,
+ | "BuildNum": [
+ | [
+ | [
+ | {"street":"abc", "city":"city1"},
+ | {"street":"def", "city":"city2"},
+ | {"street":"cfg", "city":"city3"}
+ | ],
+ | [
+ | {"street":"abc1", "city":"city3"},
+ | {"street":"def1", "city":"city4"},
+ | {"street":"cfg1", "city":"city5"}
+ | ]
+ | ],
+ | [
+ | [
+ | {"street":"abc2", "city":"cityx"},
+ | {"street":"abc3", "city":"cityy"},
+ | {"street":"abc4", "city":"cityz"}
+ | ],
+ | [
+ | {"street":"a1bc", "city":"cityA"},
+ | {"street":"a1bc", "city":"cityB"},
+ | {"street":"a1bc", "city":"cityc"}
+ | ]
+ | ]
+ | ]
+ |} """.stripMargin
+
+ val fields = new Array[Field](3)
+ fields(0) = new Field("name", DataTypes.STRING)
+ fields(1) = new Field("age", DataTypes.INT)
+
+ val subFld = new util.ArrayList[StructField]
+ subFld.add(new StructField("EachDoorNum", DataTypes.INT))
+
+ val address = new util.ArrayList[StructField]
+ address.add(new StructField("street", DataTypes.STRING))
+ address.add(new StructField("city", DataTypes.STRING))
+
+ val fld = new util.ArrayList[StructField]
+ fld.add(new StructField("DoorNum",
+ DataTypes.createArrayType(DataTypes.createStructType(address)),
+ subFld))
+ // array of struct of struct
+ val doorNum = new util.ArrayList[StructField]
+ doorNum.add(new StructField("FloorNum",
+ DataTypes.createArrayType(
+ DataTypes.createArrayType(DataTypes.createStructType(address))), fld))
+ fields(2) = new Field("BuildNum", "array", doorNum)
+
+ WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+ }
+
+ def buildAvroTestDataMultiLevel4Type(): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+ buildAvroTestDataMultiLevel4(3, null)
+ }
+
+ // test multi level -- 4 levels [array of array of array of struct]
+ test("test multi level support : array of array of array of struct") {
+ buildAvroTestDataMultiLevel4Type()
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ sql("select * from sdkOutputTable").show(false)
+
+ // TODO: Add a validation
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ cleanTestData()
+ }
+
+
}
\ No newline at end of file