You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/05/08 11:20:57 UTC

[1/2] carbondata git commit: [CARBONDATA-2443][SDK]Multi level complex type support for AVRO based SDK

Repository: carbondata
Updated Branches:
  refs/heads/master 51db049c4 -> 74ea24d14


[CARBONDATA-2443][SDK]Multi level complex type support for AVRO based SDK

Multi level complex type support for AVRO based SDK

This closes #2276


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ec33c112
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ec33c112
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ec33c112

Branch: refs/heads/master
Commit: ec33c11286ebe8009ac07698bf23ffb3bd3e7711
Parents: 51db049
Author: sounakr <so...@gmail.com>
Authored: Mon May 7 06:51:54 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue May 8 16:49:08 2018 +0530

----------------------------------------------------------------------
 .../schema/table/TableSchemaBuilder.java        |  17 +-
 .../TestNonTransactionalCarbonTable.scala       |  10 +-
 .../datasources/SparkCarbonTableFormat.scala    |   2 +-
 .../loading/DataLoadProcessBuilder.java         |  12 +-
 .../converter/impl/FieldEncoderFactory.java     |   2 +-
 .../loading/model/CarbonLoadModel.java          |  14 +-
 .../InputProcessorStepForPartitionImpl.java     | 251 ---------------
 .../InputProcessorStepWithNoConverterImpl.java  | 306 +++++++++++++++++++
 .../util/CarbonDataProcessorUtil.java           |  29 ++
 .../carbondata/sdk/file/AvroCarbonWriter.java   |  60 ++--
 .../sdk/file/CarbonWriterBuilder.java           |   6 +
 11 files changed, 401 insertions(+), 308 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index 42bb958..e3c07fa 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -184,7 +184,8 @@ public class TableSchemaBuilder {
     if (field.getDataType().isComplexType()) {
       String parentFieldName = newColumn.getColumnName();
       if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) {
-        addColumn(new StructField("val",
+        String colName = getColNameForArray(parentFieldName);
+        addColumn(new StructField(colName,
             ((ArrayType) field.getDataType()).getElementType()), field.getFieldName(), false, true);
       } else if (field.getDataType().getName().equalsIgnoreCase("STRUCT")
           && ((StructType) field.getDataType()).getFields().size() > 0) {
@@ -198,6 +199,20 @@ public class TableSchemaBuilder {
     return newColumn;
   }
 
+  /**
+   * Returns the field name to use for the element column of an array column.
+   *
+   * <p>Only the last dot-separated segment of the parent column name is inspected:
+   * <ul>
+   *   <li>the name has no dot, or its last segment is an ordinary field name:
+   *       {@code "val"}</li>
+   *   <li>the last segment is exactly {@code "val"}: {@code "val1"}</li>
+   *   <li>the last segment is {@code "val"} followed by digits {@code n}:
+   *       {@code "val" + (n + 1)}</li>
+   * </ul>
+   * Looking at the last segment only means that a parent name which merely contains the
+   * text "val" somewhere (for example {@code "interval.val"}, {@code "arr.val.street"} or
+   * {@code "a.value"}) is never handed to {@link Integer#parseInt(String)} with a
+   * non-numeric suffix.
+   *
+   * @param parentFieldName column name of the array column whose element is being added
+   * @return field name for the array element column
+   */
+  private String getColNameForArray(String parentFieldName) {
+    int lastDot = parentFieldName.lastIndexOf('.');
+    if (lastDot < 0) {
+      // No parent path at all, so no nesting level has to be encoded in the name.
+      return "val";
+    }
+    String lastSegment = parentFieldName.substring(lastDot + 1);
+    if (!lastSegment.startsWith("val")) {
+      return "val";
+    }
+    String suffix = lastSegment.substring(3);
+    if (suffix.isEmpty()) {
+      // Parent is named "...val": this is the first nested level below it.
+      return "val1";
+    }
+    for (int i = 0; i < suffix.length(); i++) {
+      char c = suffix.charAt(i);
+      if (c < '0' || c > '9') {
+        // Something like "value" or "val_x": an ordinary field name, not a nesting level.
+        return "val";
+      }
+    }
+    return "val" + (Integer.parseInt(suffix) + 1);
+  }
+
   /**
    * Throw exception if {@param field} name is repeated
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index fabcd02..6b02d5a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -678,8 +678,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
     val exception =
       intercept[RuntimeException] {
-      buildTestDataWithBadRecordFail()
-    }
+        buildTestDataWithBadRecordFail()
+      }
     assert(exception.getMessage()
       .contains("Data load failed due to bad record"))
 
@@ -780,7 +780,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
                      |      }
                      |      ]
                      |  }
-                     """.stripMargin
+                   """.stripMargin
 
     val json: String = """ {"name": "bob","age": 10,"address": ["abc", "defg"]} """
 
@@ -835,8 +835,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
     val json =
       """ {"name":"bob", "age":10,
-          |"address" : {"street":"abc", "city":"bang"},
-          |"doorNum" : [1,2,3,4]}""".stripMargin
+        |"address" : {"street":"abc", "city":"bang"},
+        |"doorNum" : [1,2,3,4]}""".stripMargin
 
     val fields = new Array[Field](4)
     fields(0) = new Field("name", DataTypes.STRING)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index 1928b38..d6eab1d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -111,7 +111,7 @@ with Serializable {
     model.setDictionaryServerHost(options.getOrElse("dicthost", null))
     model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
     CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
-    model.setPartitionLoad(true)
+    model.setLoadWithoutCoverterStep(true)
 
     val staticPartition = options.getOrElse("staticpartition", null)
     if (staticPartition != null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 028c404..2f904ed 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -44,8 +44,8 @@ import org.apache.carbondata.processing.loading.steps.CarbonRowDataWriterProcess
 import org.apache.carbondata.processing.loading.steps.DataConverterProcessorStepImpl;
 import org.apache.carbondata.processing.loading.steps.DataWriterBatchProcessorStepImpl;
 import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImpl;
-import org.apache.carbondata.processing.loading.steps.InputProcessorStepForPartitionImpl;
 import org.apache.carbondata.processing.loading.steps.InputProcessorStepImpl;
+import org.apache.carbondata.processing.loading.steps.InputProcessorStepWithNoConverterImpl;
 import org.apache.carbondata.processing.loading.steps.SortProcessorStepImpl;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
@@ -62,8 +62,8 @@ public final class DataLoadProcessBuilder {
       CarbonIterator[] inputIterators) throws Exception {
     CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation);
     SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
-    if (loadModel.isPartitionLoad()) {
-      return buildInternalForPartitionLoad(inputIterators, configuration, sortScope);
+    if (loadModel.isLoadWithoutCoverterStep()) {
+      return buildInternalWithNoConverter(inputIterators, configuration, sortScope);
     } else if (!configuration.isSortTable() ||
         sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) {
       return buildInternalForNoSort(inputIterators, configuration);
@@ -106,14 +106,14 @@ public final class DataLoadProcessBuilder {
   }
 
   /**
-   * Build pipe line for partition load
+   * Build pipe line for Load without Conversion Step.
    */
-  private AbstractDataLoadProcessorStep buildInternalForPartitionLoad(
+  private AbstractDataLoadProcessorStep buildInternalWithNoConverter(
       CarbonIterator[] inputIterators, CarbonDataLoadConfiguration configuration,
       SortScopeOptions.SortScope sortScope) {
     // Wraps with dummy processor.
     AbstractDataLoadProcessorStep inputProcessorStep =
-        new InputProcessorStepForPartitionImpl(configuration, inputIterators);
+        new InputProcessorStepWithNoConverterImpl(configuration, inputIterators);
     if (sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT)) {
       AbstractDataLoadProcessorStep sortProcessorStep =
           new SortProcessorStepImpl(configuration, inputProcessorStep);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 567a8b5..dd28dc6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -128,7 +128,7 @@ public class FieldEncoderFactory {
   /**
    * Create parser for the carbon column.
    */
-  private static GenericDataType createComplexDataType(DataField dataField,
+  public static GenericDataType createComplexDataType(DataField dataField,
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
       AbsoluteTableIdentifier absoluteTableIdentifier, DictionaryClient client, Boolean useOnePass,
       Map<Object, Integer> localCache, int index, String nullFormat, Boolean isEmptyBadRecords) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 2a846e2..0cc0da3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -201,8 +201,10 @@ public class CarbonLoadModel implements Serializable {
 
   /**
    * It directly writes data directly to nosort processor bypassing all other processors.
+   * In this mode there is no data conversion step: the data is written exactly as it is
+   * pushed in.
    */
-  private boolean isPartitionLoad;
+  private boolean isLoadWithoutCoverterStep;
 
   /**
    * Flder path to where data should be written for this load.
@@ -435,7 +437,7 @@ public class CarbonLoadModel implements Serializable {
     copy.batchSortSizeInMb = batchSortSizeInMb;
     copy.isAggLoadRequest = isAggLoadRequest;
     copy.badRecordsLocation = badRecordsLocation;
-    copy.isPartitionLoad = isPartitionLoad;
+    copy.isLoadWithoutCoverterStep = isLoadWithoutCoverterStep;
     copy.sortColumnsBoundsStr = sortColumnsBoundsStr;
     return copy;
   }
@@ -814,12 +816,12 @@ public class CarbonLoadModel implements Serializable {
   }
 
 
-  public boolean isPartitionLoad() {
-    return isPartitionLoad;
+  public boolean isLoadWithoutCoverterStep() {
+    return isLoadWithoutCoverterStep;
   }
 
-  public void setPartitionLoad(boolean partitionLoad) {
-    isPartitionLoad = partitionLoad;
+  public void setLoadWithoutCoverterStep(boolean loadWithoutCoverterStep) {
+    isLoadWithoutCoverterStep = loadWithoutCoverterStep;
   }
 
   public String getDataWritePath() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java
deleted file mode 100644
index 1dc9b27..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.loading.steps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
-import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.loading.DataField;
-import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
-import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * It reads data from record reader and sends data to next step.
- */
-public class InputProcessorStepForPartitionImpl extends AbstractDataLoadProcessorStep {
-
-  private CarbonIterator<Object[]>[] inputIterators;
-
-  private boolean[] noDictionaryMapping;
-
-  private DataType[] dataTypes;
-
-  private int[] orderOfData;
-
-  public InputProcessorStepForPartitionImpl(CarbonDataLoadConfiguration configuration,
-      CarbonIterator<Object[]>[] inputIterators) {
-    super(configuration, null);
-    this.inputIterators = inputIterators;
-  }
-
-  @Override public DataField[] getOutput() {
-    return configuration.getDataFields();
-  }
-
-  @Override public void initialize() throws IOException {
-    super.initialize();
-    // if logger is enabled then raw data will be required.
-    RowConverterImpl rowConverter =
-        new RowConverterImpl(configuration.getDataFields(), configuration, null);
-    rowConverter.initialize();
-    configuration.setCardinalityFinder(rowConverter);
-    noDictionaryMapping =
-        CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
-    dataTypes = new DataType[configuration.getDataFields().length];
-    for (int i = 0; i < dataTypes.length; i++) {
-      if (configuration.getDataFields()[i].getColumn().hasEncoding(Encoding.DICTIONARY)) {
-        dataTypes[i] = DataTypes.INT;
-      } else {
-        dataTypes[i] = configuration.getDataFields()[i].getColumn().getDataType();
-      }
-    }
-    orderOfData = arrangeData(configuration.getDataFields(), configuration.getHeader());
-  }
-
-  private int[] arrangeData(DataField[] dataFields, String[] header) {
-    int[] data = new int[dataFields.length];
-    for (int i = 0; i < dataFields.length; i++) {
-      for (int j = 0; j < header.length; j++) {
-        if (dataFields[i].getColumn().getColName().equalsIgnoreCase(header[j])) {
-          data[i] = j;
-          break;
-        }
-      }
-    }
-    return data;
-  }
-
-  @Override public Iterator<CarbonRowBatch>[] execute() {
-    int batchSize = CarbonProperties.getInstance().getBatchSize();
-    List<CarbonIterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
-    Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
-    for (int i = 0; i < outIterators.length; i++) {
-      outIterators[i] =
-          new InputProcessorIterator(readerIterators[i], batchSize, configuration.isPreFetch(),
-              rowCounter, orderOfData, noDictionaryMapping, dataTypes);
-    }
-    return outIterators;
-  }
-
-  /**
-   * Partition input iterators equally as per the number of threads.
-   *
-   * @return
-   */
-  private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() {
-    // Get the number of cores configured in property.
-    int numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
-    // Get the minimum of number of cores and iterators size to get the number of parallel threads
-    // to be launched.
-    int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores);
-
-    List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber];
-    for (int i = 0; i < parallelThreadNumber; i++) {
-      iterators[i] = new ArrayList<>();
-    }
-    // Equally partition the iterators as per number of threads
-    for (int i = 0; i < inputIterators.length; i++) {
-      iterators[i % parallelThreadNumber].add(inputIterators[i]);
-    }
-    return iterators;
-  }
-
-  @Override protected CarbonRow processRow(CarbonRow row) {
-    return null;
-  }
-
-  @Override public void close() {
-    if (!closed) {
-      super.close();
-      for (CarbonIterator inputIterator : inputIterators) {
-        inputIterator.close();
-      }
-    }
-  }
-
-  @Override protected String getStepName() {
-    return "Input Processor";
-  }
-
-  /**
-   * This iterator wraps the list of iterators and it starts iterating the each
-   * iterator of the list one by one. It also parse the data while iterating it.
-   */
-  private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
-
-    private List<CarbonIterator<Object[]>> inputIterators;
-
-    private CarbonIterator<Object[]> currentIterator;
-
-    private int counter;
-
-    private int batchSize;
-
-    private boolean nextBatch;
-
-    private boolean firstTime;
-
-    private AtomicLong rowCounter;
-
-    private boolean[] noDictionaryMapping;
-
-    private DataType[] dataTypes;
-
-    private int[] orderOfData;
-
-    public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, int batchSize,
-        boolean preFetch, AtomicLong rowCounter, int[] orderOfData, boolean[] noDictionaryMapping,
-        DataType[] dataTypes) {
-      this.inputIterators = inputIterators;
-      this.batchSize = batchSize;
-      this.counter = 0;
-      // Get the first iterator from the list.
-      currentIterator = inputIterators.get(counter++);
-      this.rowCounter = rowCounter;
-      this.nextBatch = false;
-      this.firstTime = true;
-      this.noDictionaryMapping = noDictionaryMapping;
-      this.dataTypes = dataTypes;
-      this.orderOfData = orderOfData;
-    }
-
-    @Override public boolean hasNext() {
-      return nextBatch || internalHasNext();
-    }
-
-    private boolean internalHasNext() {
-      if (firstTime) {
-        firstTime = false;
-        currentIterator.initialize();
-      }
-      boolean hasNext = currentIterator.hasNext();
-      // If iterator is finished then check for next iterator.
-      if (!hasNext) {
-        currentIterator.close();
-        // Check next iterator is available in the list.
-        if (counter < inputIterators.size()) {
-          // Get the next iterator from the list.
-          currentIterator = inputIterators.get(counter++);
-          // Initialize the new iterator
-          currentIterator.initialize();
-          hasNext = internalHasNext();
-        }
-      }
-      return hasNext;
-    }
-
-    @Override public CarbonRowBatch next() {
-      return getBatch();
-    }
-
-    private CarbonRowBatch getBatch() {
-      // Create batch and fill it.
-      CarbonRowBatch carbonRowBatch = new CarbonRowBatch(batchSize);
-      int count = 0;
-      while (internalHasNext() && count < batchSize) {
-        carbonRowBatch.addRow(new CarbonRow(convertToNoDictionaryToBytes(currentIterator.next())));
-        count++;
-      }
-      rowCounter.getAndAdd(carbonRowBatch.getSize());
-      return carbonRowBatch;
-    }
-
-    private Object[] convertToNoDictionaryToBytes(Object[] data) {
-      Object[] newData = new Object[data.length];
-      for (int i = 0; i < noDictionaryMapping.length; i++) {
-        if (noDictionaryMapping[i]) {
-          newData[i] = DataTypeUtil
-              .getBytesDataDataTypeForNoDictionaryColumn(data[orderOfData[i]], dataTypes[i]);
-        } else {
-          newData[i] = data[orderOfData[i]];
-        }
-      }
-      if (newData.length > noDictionaryMapping.length) {
-        for (int i = noDictionaryMapping.length; i < newData.length; i++) {
-          newData[i] = data[orderOfData[i]];
-        }
-      }
-      //      System.out.println(Arrays.toString(data));
-      return newData;
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
new file mode 100644
index 0000000..8c24b7f
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.steps;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
+import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.converter.impl.FieldEncoderFactory;
+import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It reads data from record reader and sends data to next step.
+ */
+public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProcessorStep {
+
+  private CarbonIterator<Object[]>[] inputIterators;
+
+  private boolean[] noDictionaryMapping;
+
+  private DataType[] dataTypes;
+
+  private int[] orderOfData;
+
+  private Map<Integer, GenericDataType> dataFieldsWithComplexDataType;
+
+  /**
+   * Creates the step over the given input iterators.
+   *
+   * @param configuration  load configuration handed to the super class
+   * @param inputIterators iterators supplying the raw input rows
+   */
+  public InputProcessorStepWithNoConverterImpl(
+      CarbonDataLoadConfiguration configuration,
+      CarbonIterator<Object[]>[] inputIterators) {
+    // The second super-constructor argument is deliberately null for this step.
+    super(configuration, null);
+    this.inputIterators = inputIterators;
+  }
+
+  /**
+   * Returns the data fields of the load configuration as the output of this step.
+   */
+  @Override
+  public DataField[] getOutput() {
+    return configuration.getDataFields();
+  }
+
+  /**
+   * Prepares this step: initializes a row converter and registers it as the cardinality
+   * finder, then computes the no-dictionary mapping, the complex-type handlers, the
+   * per-field data types and the position of every data field in the input header.
+   *
+   * @throws IOException if step initialization fails
+   */
+  @Override
+  public void initialize() throws IOException {
+    super.initialize();
+    RowConverterImpl converter =
+        new RowConverterImpl(configuration.getDataFields(), configuration, null);
+    converter.initialize();
+    configuration.setCardinalityFinder(converter);
+    noDictionaryMapping =
+        CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
+
+    dataFieldsWithComplexDataType = new HashMap<>();
+    convertComplexDataType(dataFieldsWithComplexDataType);
+
+    // Dictionary encoded columns are recorded as INT; every other column keeps the data
+    // type declared on the column.
+    dataTypes = new DataType[configuration.getDataFields().length];
+    for (int idx = 0; idx < dataTypes.length; idx++) {
+      boolean dictionaryEncoded =
+          configuration.getDataFields()[idx].getColumn().hasEncoding(Encoding.DICTIONARY);
+      dataTypes[idx] = dictionaryEncoded
+          ? DataTypes.INT
+          : configuration.getDataFields()[idx].getColumn().getDataType();
+    }
+    orderOfData = arrangeData(configuration.getDataFields(), configuration.getHeader());
+  }
+
+  /**
+   * Builds a {@link GenericDataType} for every complex column among the configured data
+   * fields and stores it in the given map, keyed by the column's ordinal.
+   *
+   * @param dataFieldsWithComplexDataType map that receives one entry per complex column
+   */
+  private void convertComplexDataType(Map<Integer, GenericDataType> dataFieldsWithComplexDataType) {
+    DataField[] srcDataField = configuration.getDataFields();
+    String nullFormat =
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
+            .toString();
+    boolean isEmptyBadRecord = Boolean.parseBoolean(
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD)
+            .toString());
+    for (int i = 0; i < srcDataField.length; i++) {
+      if (!srcDataField[i].getColumn().isComplex()) {
+        continue;
+      }
+      // createComplexDataType is a static factory method, so it is invoked on the class
+      // instead of through a FieldEncoderFactory instance. No dictionary cache, dictionary
+      // client or local cache is supplied (null) and one-pass is switched off (false).
+      GenericDataType complexType = FieldEncoderFactory
+          .createComplexDataType(srcDataField[i], null, configuration.getTableIdentifier(), null,
+              false, null, i, nullFormat, isEmptyBadRecord);
+      dataFieldsWithComplexDataType.put(srcDataField[i].getColumn().getOrdinal(), complexType);
+    }
+  }
+
+  /**
+   * Maps every data field to the index of the header entry that carries the same column
+   * name (compared ignoring case). The first matching header entry wins; a field without
+   * any matching header entry keeps the array default of 0.
+   *
+   * @param dataFields fields whose input positions are needed
+   * @param header     column names in input order
+   * @return for each data field, the index of its column in {@code header}
+   */
+  private int[] arrangeData(DataField[] dataFields, String[] header) {
+    int[] positions = new int[dataFields.length];
+    for (int fieldIdx = 0; fieldIdx < dataFields.length; fieldIdx++) {
+      int headerIdx = 0;
+      // Advance to the first header entry whose name matches this field's column name.
+      while (headerIdx < header.length
+          && !dataFields[fieldIdx].getColumn().getColName()
+              .equalsIgnoreCase(header[headerIdx])) {
+        headerIdx++;
+      }
+      if (headerIdx < header.length) {
+        positions[fieldIdx] = headerIdx;
+      }
+    }
+    return positions;
+  }
+
+  /**
+   * Splits the input iterators into groups and wraps each group in an
+   * {@link InputProcessorIterator} that produces row batches of the configured batch size.
+   *
+   * @return one batch iterator per group of input iterators
+   */
+  @Override
+  public Iterator<CarbonRowBatch>[] execute() {
+    int batchSize = CarbonProperties.getInstance().getBatchSize();
+    List<CarbonIterator<Object[]>>[] groups = partitionInputReaderIterators();
+    Iterator<CarbonRowBatch>[] batchIterators = new Iterator[groups.length];
+    int slot = 0;
+    while (slot < batchIterators.length) {
+      batchIterators[slot] = new InputProcessorIterator(
+          groups[slot], batchSize, configuration.isPreFetch(), rowCounter, orderOfData,
+          noDictionaryMapping, dataTypes, configuration.getDataFields(),
+          dataFieldsWithComplexDataType);
+      slot++;
+    }
+    return batchIterators;
+  }
+
+  /**
+   * Distributes the input iterators round-robin over as many lists as there will be
+   * parallel threads, which is the smaller of the configured number of cores and the number
+   * of input iterators.
+   *
+   * @return one list of input iterators per parallel thread
+   */
+  private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() {
+    int configuredCores = CarbonProperties.getInstance().getNumberOfCores();
+    // Never start more threads than there are iterators to read from.
+    int threadCount = Math.min(inputIterators.length, configuredCores);
+
+    List<CarbonIterator<Object[]>>[] groups = new List[threadCount];
+    for (int g = 0; g < threadCount; g++) {
+      groups[g] = new ArrayList<>();
+    }
+    // Round-robin assignment: iterator k goes to group k % threadCount.
+    for (int k = 0; k < inputIterators.length; k++) {
+      List<CarbonIterator<Object[]>> target = groups[k % threadCount];
+      target.add(inputIterators[k]);
+    }
+    return groups;
+  }
+
+  /**
+   * Always returns {@code null}.
+   * NOTE(review): presumably unused because {@code execute()} builds the row batches
+   * itself - confirm against the base class.
+   */
+  @Override
+  protected CarbonRow processRow(CarbonRow row) {
+    return null;
+  }
+
+  /**
+   * Closes this step and every input iterator, unless the step is already marked closed.
+   */
+  @Override
+  public void close() {
+    if (closed) {
+      return;
+    }
+    super.close();
+    for (CarbonIterator inputIterator : inputIterators) {
+      inputIterator.close();
+    }
+  }
+
+  /**
+   * Returns the name of this step.
+   */
+  @Override
+  protected String getStepName() {
+    return "Input Processor";
+  }
+
+  /**
+   * This iterator wraps the list of iterators and it starts iterating the each
+   * iterator of the list one by one. It also parse the data while iterating it.
+   */
+  private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
+
+    private List<CarbonIterator<Object[]>> inputIterators;
+
+    private CarbonIterator<Object[]> currentIterator;
+
+    private int counter;
+
+    private int batchSize;
+
+    private boolean nextBatch;
+
+    private boolean firstTime;
+
+    private AtomicLong rowCounter;
+
+    private boolean[] noDictionaryMapping;
+
+    private DataType[] dataTypes;
+
+    private DataField[] dataFields;
+
+    private int[] orderOfData;
+
+    private Map<Integer, GenericDataType> dataFieldsWithComplexDataType;
+
+    public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, int batchSize,
+        boolean preFetch, AtomicLong rowCounter, int[] orderOfData, boolean[] noDictionaryMapping,
+        DataType[] dataTypes, DataField[] dataFields,
+        Map<Integer, GenericDataType> dataFieldsWithComplexDataType) {
+      this.inputIterators = inputIterators;
+      this.batchSize = batchSize;
+      this.counter = 0;
+      // Get the first iterator from the list.
+      currentIterator = inputIterators.get(counter++);
+      this.rowCounter = rowCounter;
+      this.nextBatch = false;
+      this.firstTime = true;
+      this.noDictionaryMapping = noDictionaryMapping;
+      this.dataTypes = dataTypes;
+      this.dataFields = dataFields;
+      this.orderOfData = orderOfData;
+      this.dataFieldsWithComplexDataType = dataFieldsWithComplexDataType;
+    }
+
+    @Override public boolean hasNext() {
+      return nextBatch || internalHasNext();
+    }
+
+    private boolean internalHasNext() {
+      if (firstTime) {
+        firstTime = false;
+        currentIterator.initialize();
+      }
+      boolean hasNext = currentIterator.hasNext();
+      // If iterator is finished then check for next iterator.
+      if (!hasNext) {
+        currentIterator.close();
+        // Check next iterator is available in the list.
+        if (counter < inputIterators.size()) {
+          // Get the next iterator from the list.
+          currentIterator = inputIterators.get(counter++);
+          // Initialize the new iterator
+          currentIterator.initialize();
+          hasNext = internalHasNext();
+        }
+      }
+      return hasNext;
+    }
+
+    @Override public CarbonRowBatch next() {
+      return getBatch();
+    }
+
+    private CarbonRowBatch getBatch() {
+      // Create batch and fill it.
+      CarbonRowBatch carbonRowBatch = new CarbonRowBatch(batchSize);
+      int count = 0;
+      while (internalHasNext() && count < batchSize) {
+        carbonRowBatch.addRow(
+            new CarbonRow(convertToNoDictionaryToBytes(currentIterator.next(), dataFields)));
+        count++;
+      }
+      rowCounter.getAndAdd(carbonRowBatch.getSize());
+      return carbonRowBatch;
+    }
+
+    private Object[] convertToNoDictionaryToBytes(Object[] data, DataField[] dataFields) {
+      Object[] newData = new Object[data.length];
+      for (int i = 0; i < data.length; i++) {
+        if (i < noDictionaryMapping.length && noDictionaryMapping[i]) {
+          newData[i] = DataTypeUtil
+              .getBytesDataDataTypeForNoDictionaryColumn(data[orderOfData[i]], dataTypes[i]);
+        } else {
+          // if this is a complex column then recursively convert the data into Byte Array.
+          if (dataTypes[i].isComplexType()) {
+            ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
+            DataOutputStream dataOutputStream = new DataOutputStream(byteArray);
+            try {
+              GenericDataType complextType =
+                  dataFieldsWithComplexDataType.get(dataFields[i].getColumn().getOrdinal());
+
+              complextType.writeByteArray(data[orderOfData[i]], dataOutputStream);
+
+              dataOutputStream.close();
+              newData[i] = byteArray.toByteArray();
+            } catch (Exception e) {
+              throw new CarbonDataLoadingException("Loading Exception", e);
+            }
+          } else {
+            newData[i] = data[orderOfData[i]];
+          }
+        }
+      }
+      // System.out.println(Arrays.toString(data));
+      return newData;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index f47853e..f921fd5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -240,6 +240,35 @@ public final class CarbonDataProcessorUtil {
         .toPrimitive(noDictionaryMapping.toArray(new Boolean[noDictionaryMapping.size()]));
   }
 
+  public static void getComplexNoDictionaryMapping(DataField[] dataFields,
+      List<Integer> complexNoDictionary) {
+
+    // save the Ordinal Number in the List.
+    for (DataField field : dataFields) {
+      if (field.getColumn().isComplex()) {
+        // get the children.
+        getComplexNoDictionaryMapping(
+            ((CarbonDimension) field.getColumn()).getListOfChildDimensions(), complexNoDictionary);
+      }
+    }
+  }
+
+  public static void getComplexNoDictionaryMapping(List<CarbonDimension> carbonDimensions,
+      List<Integer> complexNoDictionary) {
+    for (CarbonDimension carbonDimension : carbonDimensions) {
+      if (carbonDimension.isComplex()) {
+        getComplexNoDictionaryMapping(carbonDimension.getListOfChildDimensions(),
+            complexNoDictionary);
+      } else {
+        // This is primitive type. Check the encoding for NoDictionary.
+        if (!carbonDimension.hasEncoding(Encoding.DICTIONARY)) {
+          complexNoDictionary.add(carbonDimension.getOrdinal());
+        }
+      }
+    }
+  }
+
+
   /**
    * Preparing the boolean [] to map whether the dimension use inverted index or not.
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index bc2e9db..946040f 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -26,6 +26,8 @@ import java.util.UUID;
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
+import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
+import org.apache.carbondata.processing.loading.complexobjects.StructObject;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 
 import org.apache.avro.Schema;
@@ -70,15 +72,15 @@ class AvroCarbonWriter extends CarbonWriter {
       avroSchema = avroRecord.getSchema();
     }
     List<Schema.Field> fields = avroSchema.getFields();
-    Object [] csvField = new Object[fields.size()];
+    Object[] csvField = new Object[fields.size()];
     for (int i = 0; i < fields.size(); i++) {
-      csvField[i] = avroFieldToObject(fields.get(i), avroRecord.get(i), 0);
+      csvField[i] = avroFieldToObject(fields.get(i), avroRecord.get(i));
     }
     return csvField;
   }
 
-  private String avroFieldToObject(Schema.Field avroField, Object fieldValue, int delimiterLevel) {
-    StringBuilder out = new StringBuilder();
+  private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) {
+    Object out = new Object();
     Schema.Type type = avroField.schema().getType();
     switch (type) {
       case BOOLEAN:
@@ -86,55 +88,39 @@ class AvroCarbonWriter extends CarbonWriter {
       case LONG:
       case DOUBLE:
       case STRING:
+        out = fieldValue;
+        break;
       case FLOAT:
-        out.append(fieldValue.toString());
+        Float f = (Float) fieldValue;
+        out = f.doubleValue();
         break;
       case RECORD:
         List<Schema.Field> fields = avroField.schema().getFields();
-        String delimiter = null;
-        delimiterLevel ++;
+
+        Object[] structChildObjects = new Object[fields.size()];
         for (int i = 0; i < fields.size(); i++) {
-          if (delimiterLevel == 1) {
-            delimiter = "$";
-          } else if (delimiterLevel > 1) {
-            delimiter = ":";
-          }
-          if (i != (fields.size() - 1)) {
-            out.append(avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i),
-                delimiterLevel)).append(delimiter);
-          } else {
-            out.append(avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i),
-                delimiterLevel));
-          }
+          structChildObjects[i] =
+              avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i));
         }
+        StructObject structObject = new StructObject(structChildObjects);
+        out = structObject;
         break;
       case ARRAY:
         int size = ((ArrayList) fieldValue).size();
-        String delimiterArray = null;
-        delimiterLevel ++;
-        if (delimiterLevel == 1) {
-          delimiterArray = "$";
-        } else if (delimiterLevel > 1) {
-          delimiterArray = ":";
-        }
-
+        Object[] arrayChildObjects = new Object[size];
         for (int i = 0; i < size; i++) {
-          if (i != size - 1) {
-            out.append(avroFieldToObject(
-                new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true),
-                ((ArrayList) fieldValue).get(i), delimiterLevel)).append(delimiterArray);
-          } else {
-            out.append(avroFieldToObject(
-                new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true),
-                ((ArrayList) fieldValue).get(i), delimiterLevel));
-          }
+          arrayChildObjects[i] = (avroFieldToObject(
+              new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true),
+              ((ArrayList) fieldValue).get(i)));
         }
+        ArrayObject arrayObject = new ArrayObject(arrayChildObjects);
+        out = arrayObject;
         break;
 
       default:
         throw new UnsupportedOperationException();
     }
-    return out.toString();
+    return out;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 397f151..76a46d0 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -330,6 +330,12 @@ public class CarbonWriterBuilder {
     Objects.requireNonNull(schema, "schema should not be null");
     Objects.requireNonNull(path, "path should not be null");
     CarbonLoadModel loadModel = createLoadModel();
+
+    // AVRO records are pushed to Carbon as Objects, not as Strings. This was done in order to
+    // handle multi level complex type support. As no conversion is needed, the converter step
+    // is removed from the load. The LoadWithoutConverter flag tells the Loader Builder to
+    // skip the Conversion Step.
+    loadModel.setLoadWithoutCoverterStep(true);
     return new AvroCarbonWriter(loadModel);
   }
 


[2/2] carbondata git commit: [CARBONDATA-2443][SDK]Added the test cases for avro multilevel complex type support

Posted by ra...@apache.org.
[CARBONDATA-2443][SDK]Added the test cases for avro multilevel complex type support

This closes #2276


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/74ea24d1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/74ea24d1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/74ea24d1

Branch: refs/heads/master
Commit: 74ea24d141c9a1dd1dd820750ca0387d2ff64171
Parents: ec33c11
Author: ajantha-bhat <aj...@gmail.com>
Authored: Mon May 7 14:39:40 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue May 8 16:50:38 2018 +0530

----------------------------------------------------------------------
 .../schema/table/TableSchemaBuilder.java        |   2 +-
 .../TestNonTransactionalCarbonTable.scala       | 510 ++++++++++++++++++-
 2 files changed, 499 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/74ea24d1/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index e3c07fa..ca082e1 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -200,7 +200,7 @@ public class TableSchemaBuilder {
   }
 
   private String getColNameForArray(String parentFieldName) {
-    if (!parentFieldName.contains(".val")) {
+    if (!parentFieldName.endsWith(".val")) {
       return "val";
     } else {
       String[] splits = parentFieldName.split("val");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74ea24d1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 6b02d5a..c641ed3 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -735,7 +735,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
     val json = """ {"name":"bob", "age":10, "address" : {"street":"abc", "city":"bang"}} """
 
-
     val fields = new Array[Field](3)
     fields(0) = new Field("name", DataTypes.STRING)
     fields(1) = new Field("age", DataTypes.INT)
@@ -902,9 +901,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         |{"street":"ghi","city":"city3"},
         |{"street":"jkl","city":"city4"}]} """.stripMargin
 
-
-
-
     val fields = new Array[Field](3)
     fields(0) = new Field("name", DataTypes.STRING)
     fields(1) = new Field("age", DataTypes.INT)
@@ -988,20 +984,16 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
                  |	}
                  |} """.stripMargin
 
-
-
-
-
     val fields = new Array[Field](3)
     fields(0) = new Field("name", DataTypes.STRING)
     fields(1) = new Field("age", DataTypes.INT)
 
-    val fld1 = new util.ArrayList[StructField]
-    fld1.add(new StructField("eachDoorNum", DataTypes.INT))
-
     val fld2 = new util.ArrayList[StructField]
     fld2.add(new StructField("street", DataTypes.STRING))
     fld2.add(new StructField("city", DataTypes.STRING))
+
+    val fld1 = new util.ArrayList[StructField]
+    fld1.add(new StructField("eachDoorNum", DataTypes.INT))
     fld2.add(new StructField("doorNum", DataTypes.createArrayType(DataTypes.INT), fld1))
 
     fields(2) = new Field("address","struct",fld2)
@@ -1050,7 +1042,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
     sql("DROP TABLE sdkOutputTable")
     // drop table should not delete the files
-    assert(new File(writerPath).exists())
+    assert(new File(writerPath).listFiles().length > 0)
+    cleanTestData()
   }
 
   test("Read sdk writer Avro output with both Array and Struct Type") {
@@ -1077,6 +1070,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       Row("bob", 10, Row("abc","bang"), mutable.WrappedArray.newBuilder[Int].+=(1,2,3,4))))
     sql("DROP TABLE sdkOutputTable")
     // drop table should not delete the files
+    cleanTestData()
   }
 
 
@@ -1102,6 +1096,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
     sql("DROP TABLE sdkOutputTable")
     // drop table should not delete the files
+    cleanTestData()
   }
 
 
@@ -1128,6 +1123,497 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
     sql("DROP TABLE sdkOutputTable")
     // drop table should not delete the files
+    cleanTestData()
+  }
+
+  // test multi level -- 3 levels [array of struct of array of int]
+  def buildAvroTestDataMultiLevel3(rows: Int, options: util.Map[String, String]): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+
+    val mySchema = """ {
+                     |	"name": "address",
+                     |	"type": "record",
+                     |	"fields": [
+                     |		{
+                     |			"name": "name",
+                     |			"type": "string"
+                     |		},
+                     |		{
+                     |			"name": "age",
+                     |			"type": "int"
+                     |		},
+                     |		{
+                     |			"name": "doorNum",
+                     |			"type": {
+                     |				"type": "array",
+                     |				"items": {
+                     |					"type": "record",
+                     |					"name": "my_address",
+                     |					"fields": [
+                     |						{
+                     |							"name": "street",
+                     |							"type": "string"
+                     |						},
+                     |						{
+                     |							"name": "city",
+                     |							"type": "string"
+                     |						},
+                     |						{
+                     |							"name": "FloorNum",
+                     |							"type": {
+                     |								"type": "array",
+                     |								"items": {
+                     |									"name": "floor",
+                     |									"type": "int"
+                     |								}
+                     |							}
+                     |						}
+                     |					]
+                     |				}
+                     |			}
+                     |		}
+                     |	]
+                     |} """.stripMargin
+    val json =
+      """ {
+        |	"name": "bob",
+        |	"age": 10,
+        |	"doorNum": [
+        |		{
+        |			"street": "abc",
+        |			"city": "city1",
+        |			"FloorNum": [0,1,2]
+        |		},
+        |		{
+        |			"street": "def",
+        |			"city": "city2",
+        |			"FloorNum": [3,4,5]
+        |		},
+        |		{
+        |			"street": "ghi",
+        |			"city": "city3",
+        |			"FloorNum": [6,7,8]
+        |		},
+        |		{
+        |			"street": "jkl",
+        |			"city": "city4",
+        |			"FloorNum": [9,10,11]
+        |		}
+        |	]
+        |} """.stripMargin
+
+    val fields = new Array[Field](3)
+    fields(0) = new Field("name", DataTypes.STRING)
+    fields(1) = new Field("age", DataTypes.INT)
+
+    val fld = new util.ArrayList[StructField]
+    fld.add(new StructField("street", DataTypes.STRING))
+    fld.add(new StructField("city", DataTypes.STRING))
+    fld.add(new StructField("FloorNum", DataTypes.createArrayType(DataTypes.INT)))
+
+    val fld2 = new util.ArrayList[StructField]
+    fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld))
+    fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2)
+
+    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+  }
+
+  def buildAvroTestDataMultiLevel3Type(): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+    buildAvroTestDataMultiLevel3(3, null)
+  }
+
+  // test multi level -- 3 levels [array of struct of array of int]
+  test("test multi level support : array of struct of array of int") {
+    buildAvroTestDataMultiLevel3Type()
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    sql("select * from sdkOutputTable").show(false)
+
+    // TODO: Add a validation
+    /*
+    +----+---+-----------------------------------------------------------------------------------+
+    |name|age|doorNum
+                                                               |
+    +----+---+-----------------------------------------------------------------------------------+
+    |bob |10 |[[abc,city1,WrappedArray(0, 1, 2)], [def,city2,WrappedArray(3, 4, 5)], [ghi,city3,
+    WrappedArray(6, 7, 8)], [jkl,city4,WrappedArray(9, 10, 11)]]|
+    |bob |10 |[[abc,city1,WrappedArray(0, 1, 2)], [def,city2,WrappedArray(3, 4, 5)], [ghi,city3,
+    WrappedArray(6, 7, 8)], [jkl,city4,WrappedArray(9, 10, 11)]]|
+    |bob |10 |[[abc,city1,WrappedArray(0, 1, 2)], [def,city2,WrappedArray(3, 4, 5)], [ghi,city3,
+    WrappedArray(6, 7, 8)], [jkl,city4,WrappedArray(9, 10, 11)]]|
+    +----+---+-----------------------------------------------------------------------------------+*/
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    cleanTestData()
+  }
+
+
+  // test multi level -- 3 levels [array of struct of struct of string, int]
+  def buildAvroTestDataMultiLevel3_1(rows: Int, options: util.Map[String, String]): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+
+    val mySchema = """ {
+                     |	"name": "address",
+                     |	"type": "record",
+                     |	"fields": [
+                     |		{
+                     |			"name": "name",
+                     |			"type": "string"
+                     |		},
+                     |		{
+                     |			"name": "age",
+                     |			"type": "int"
+                     |		},
+                     |		{
+                     |			"name": "doorNum",
+                     |			"type": {
+                     |				"type": "array",
+                     |				"items": {
+                     |					"type": "record",
+                     |					"name": "my_address",
+                     |					"fields": [
+                     |						{
+                     |							"name": "street",
+                     |							"type": "string"
+                     |						},
+                     |						{
+                     |							"name": "city",
+                     |							"type": "string"
+                     |						},
+                     |						{
+                     |							"name": "FloorNum",
+                     |                			"type": {
+                     |                				"type": "record",
+                     |                				"name": "Floor",
+                     |                				"fields": [
+                     |                					{
+                     |                						"name": "wing",
+                     |                						"type": "string"
+                     |                					},
+                     |                					{
+                     |                						"name": "number",
+                     |                						"type": "int"
+                     |                					}
+                     |                				]
+                     |                			}
+                     |						}
+                     |					]
+                     |				}
+                     |			}
+                     |		}
+                     |	]
+                     |} """.stripMargin
+
+
+    val json =
+      """  {
+        |	"name": "bob",
+        |	"age": 10,
+        |	"doorNum": [
+        |		{
+        |			"street": "abc",
+        |			"city": "city1",
+        |			"FloorNum": {"wing" : "a", "number" : 1}
+        |		},
+        |		{
+        |			"street": "def",
+        |			"city": "city2",
+        |			"FloorNum": {"wing" : "b", "number" : 0}
+        |		},
+        |		{
+        |			"street": "ghi",
+        |			"city": "city3",
+        |			"FloorNum": {"wing" : "a", "number" : 2}
+        |		}
+        |	]
+        |}  """.stripMargin
+
+    val fields = new Array[Field](3)
+    fields(0) = new Field("name", DataTypes.STRING)
+    fields(1) = new Field("age", DataTypes.INT)
+
+    val fld = new util.ArrayList[StructField]
+    fld.add(new StructField("street", DataTypes.STRING))
+    fld.add(new StructField("city", DataTypes.STRING))
+
+    val subFld = new util.ArrayList[StructField]
+    subFld.add(new StructField("wing", DataTypes.STRING))
+    subFld.add(new StructField("number", DataTypes.INT))
+    fld.add(new StructField("FloorNum", DataTypes.createStructType(subFld)))
+
+    // array of struct of struct
+    val fld2 = new util.ArrayList[StructField]
+    fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld))
+    fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2)
+
+    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+  }
+
+  def buildAvroTestDataMultiLevel3_1Type(): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+    buildAvroTestDataMultiLevel3_1(3, null)
+  }
+
+  // test multi level -- 3 levels [array of struct of struct of string, int]
+  test("test multi level support : array of struct of struct") {
+    buildAvroTestDataMultiLevel3_1Type()
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    sql("select * from sdkOutputTable").show(false)
+
+    // TODO: Add a validation
+    /*
+    +----+---+---------------------------------------------------------+
+    |name|age|doorNum                                                  |
+    +----+---+---------------------------------------------------------+
+    |bob |10 |[[abc,city1,[a,1]], [def,city2,[b,0]], [ghi,city3,[a,2]]]|
+    |bob |10 |[[abc,city1,[a,1]], [def,city2,[b,0]], [ghi,city3,[a,2]]]|
+    |bob |10 |[[abc,city1,[a,1]], [def,city2,[b,0]], [ghi,city3,[a,2]]]|
+    +----+---+---------------------------------------------------------+
+    */
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    cleanTestData()
+  }
+
+  // test multi level -- 3 levels [array of array of array of int]
+  def buildAvroTestDataMultiLevel3_2(rows: Int, options: util.Map[String, String]): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+
+    val mySchema = """ {
+                     |	"name": "address",
+                     |	"type": "record",
+                     |	"fields": [
+                     |		{
+                     |			"name": "name",
+                     |			"type": "string"
+                     |		},
+                     |		{
+                     |			"name": "age",
+                     |			"type": "int"
+                     |		},
+                     |		{
+                     |			"name": "BuildNum",
+                     |			"type": {
+                     |				"type": "array",
+                     |				"items": {
+                     |					"name": "FloorNum",
+                     |					"type": "array",
+                     |					"items": {
+                     |						"name": "doorNum",
+                     |						"type": "array",
+                     |						"items": {
+                     |							"name": "EachdoorNums",
+                     |							"type": "int",
+                     |							"default": -1
+                     |						}
+                     |					}
+                     |				}
+                     |			}
+                     |		}
+                     |	]
+                     |} """.stripMargin
+
+    val json =
+      """   {
+        |        	"name": "bob",
+        |        	"age": 10,
+        |        	"BuildNum": [[[1,2,3],[4,5,6]],[[10,20,30],[40,50,60]]]
+        |        }   """.stripMargin
+
+    val fields = new Array[Field](3)
+    fields(0) = new Field("name", DataTypes.STRING)
+    fields(1) = new Field("age", DataTypes.INT)
+
+    val subFld = new util.ArrayList[StructField]
+    subFld.add(new StructField("EachDoorNum", DataTypes.INT))
+
+    val fld = new util.ArrayList[StructField]
+    fld.add(new StructField("DoorNum", DataTypes.createArrayType(DataTypes.INT), subFld))
+    // array of array of array of int
+    val doorNum = new util.ArrayList[StructField]
+    doorNum.add(new StructField("FloorNum",
+      DataTypes.createArrayType(DataTypes.createArrayType(DataTypes.INT)), fld))
+    fields(2) = new Field("BuildNum", "array", doorNum)
+
+    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+  }
+
+  def buildAvroTestDataMultiLevel3_2Type(): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+    buildAvroTestDataMultiLevel3_2(3, null)
+  }
+
+  // test multi level -- 3 levels [array of array of array of int]
+  test("test multi level support : array of array of array of int") {
+    buildAvroTestDataMultiLevel3_2Type()
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    sql("select * from sdkOutputTable").show(false)
+
+    // TODO: Add a validation
+    /*
+    +----+---+---------------------------------------------------------------------------+
+    |name|age|BuildNum
+                                               |
+    +----+---+---------------------------------------------------------------------------+
+    |bob |10 |[WrappedArray(WrappedArray(1, 2, 3), WrappedArray(4, 5, 6)), WrappedArray
+    (WrappedArray(10, 20, 30), WrappedArray(40, 50, 60))]|
+    |bob |10 |[WrappedArray(WrappedArray(1, 2, 3), WrappedArray(4, 5, 6)), WrappedArray
+    (WrappedArray(10, 20, 30), WrappedArray(40, 50, 60))]|
+    |bob |10 |[WrappedArray(WrappedArray(1, 2, 3), WrappedArray(4, 5, 6)), WrappedArray
+    (WrappedArray(10, 20, 30), WrappedArray(40, 50, 60))]|
+    +----+---+---------------------------------------------------------------------------+
+   */
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    cleanTestData()
   }
 
+
+
+  // multi level test data -- 4 levels: BuildNum is array of array of array of struct(street, city)
+  def buildAvroTestDataMultiLevel4(rows: Int, options: util.Map[String, String]): Any = { // NOTE: options is not read in this body
+    FileUtils.deleteDirectory(new File(writerPath)) // start from an empty output directory
+
+    val mySchema =  """ {
+                      |	"name": "address",
+                      |	"type": "record",
+                      |	"fields": [
+                      |		{
+                      |			"name": "name",
+                      |			"type": "string"
+                      |		},
+                      |		{
+                      |			"name": "age",
+                      |			"type": "int"
+                      |		},
+                      |		{
+                      |			"name": "BuildNum",
+                      |			"type": {
+                      |				"type": "array",
+                      |				"items": {
+                      |					"name": "FloorNum",
+                      |					"type": "array",
+                      |					"items": {
+                      |						"name": "doorNum",
+                      |						"type": "array",
+                      |						"items": {
+                      |							"name": "my_address",
+                      |							"type": "record",
+                      |							"fields": [
+                      |								{
+                      |									"name": "street",
+                      |									"type": "string"
+                      |								},
+                      |								{
+                      |									"name": "city",
+                      |									"type": "string"
+                      |								}
+                      |							]
+                      |						}
+                      |					}
+                      |				}
+                      |			}
+                      |		}
+                      |	]
+                      |} """.stripMargin // Avro schema: record with fields name, age, BuildNum
+
+    val json =
+      """ {
+        |	"name": "bob",
+        |	"age": 10,
+        |	"BuildNum": [
+        |		[
+        |			[
+        |				{"street":"abc", "city":"city1"},
+        |				{"street":"def", "city":"city2"},
+        |				{"street":"cfg", "city":"city3"}
+        |			],
+        |			[
+        |				 {"street":"abc1", "city":"city3"},
+        |				 {"street":"def1", "city":"city4"},
+        |				 {"street":"cfg1", "city":"city5"}
+        |			]
+        |		],
+        |		[
+        |			[
+        |				 {"street":"abc2", "city":"cityx"},
+        |				 {"street":"abc3", "city":"cityy"},
+        |				 {"street":"abc4", "city":"cityz"}
+        |			],
+        |			[
+        |				 {"street":"a1bc", "city":"cityA"},
+        |				 {"street":"a1bc", "city":"cityB"},
+        |				 {"street":"a1bc", "city":"cityc"}
+        |			]
+        |		]
+        |	]
+        |} """.stripMargin // one record for the schema above: 2 x 2 x 3 address structs
+
+    val fields = new Array[Field](3) // one Field per top-level column: name, age, BuildNum
+    fields(0) = new Field("name", DataTypes.STRING)
+    fields(1) = new Field("age", DataTypes.INT)
+
+    val subFld = new util.ArrayList[StructField]
+    subFld.add(new StructField("EachDoorNum", DataTypes.INT)) // NOTE(review): INT child, but DoorNum holds structs -- leftover? confirm
+
+    val address = new util.ArrayList[StructField] // innermost struct: street, city
+    address.add(new StructField("street", DataTypes.STRING))
+    address.add(new StructField("city", DataTypes.STRING))
+
+    val fld = new util.ArrayList[StructField]
+    fld.add(new StructField("DoorNum",
+        DataTypes.createArrayType(DataTypes.createStructType(address)),
+        subFld))
+    // BuildNum: array of FloorNum, where FloorNum is array of array of struct(street, city)
+    val doorNum = new util.ArrayList[StructField]
+    doorNum.add(new StructField("FloorNum",
+      DataTypes.createArrayType(
+        DataTypes.createArrayType(DataTypes.createStructType(address))), fld))
+    fields(2) = new Field("BuildNum", "array", doorNum)
+
+    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+  }
+
+  def buildAvroTestDataMultiLevel4Type(): Any = {
+    FileUtils.deleteDirectory(new File(writerPath)) // buildAvroTestDataMultiLevel4 repeats this delete before writing
+    buildAvroTestDataMultiLevel4(3, null) // rows = 3, options = null
+  }
+
+  // test multi level -- 4 levels [array of array of array of struct], read back through an external table
+  test("test multi level support : array of array of array of struct") {
+    buildAvroTestDataMultiLevel4Type()
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    sql("select * from sdkOutputTable").show(false)
+
+    // TODO: validate the query result; it is currently only printed via show(false)
+
+    sql("DROP TABLE sdkOutputTable")
+    // dropping the external table is not expected to delete the files, so cleanTestData() is called afterwards
+    cleanTestData()
+  }
+
+
 }
\ No newline at end of file