You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/05/27 12:50:21 UTC

[3/4] carbondata git commit: add TablePage

add TablePage


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/353272ef
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/353272ef
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/353272ef

Branch: refs/heads/master
Commit: 353272efa51129df032d98c896f1bace837895e7
Parents: ffaeb80
Author: jackylk <ja...@huawei.com>
Authored: Sat May 27 20:02:24 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Sat May 27 20:02:24 2017 +0800

----------------------------------------------------------------------
 .../core/constants/IgnoreDictionary.java        |  50 --
 .../columnar/BlockIndexerStorageForShort.java   |   2 +-
 .../core/datastore/page/KeyColumnPage.java      |  42 ++
 .../schema/table/column/CarbonColumn.java       |   9 +-
 .../table/column/CarbonImplicitDimension.java   |   2 +-
 .../apache/carbondata/core/util/CarbonUtil.java |   4 +-
 .../examples/CarbonSessionExample.scala         |   2 +-
 .../hadoop/test/util/StoreCreator.java          |   7 -
 .../presto/CarbondataSplitManager.java          |   2 +-
 .../complexType/TestComplexTypeQuery.scala      |   2 +
 .../dataload/TestLoadDataWithHiveSyntax.scala   |  13 +
 .../partition/TestQueryForPartitionTable.scala  |   3 +
 .../spark/rdd/CarbonDataRDDFactory.scala        |   3 -
 .../spark/sql/CarbonDatasourceRelation.scala    |   2 +-
 .../scala/org/apache/spark/sql/CarbonScan.scala |   2 +-
 .../readsupport/SparkRowReadSupportImpl.java    |  10 -
 .../spark/rdd/CarbonDataRDDFactory.scala        |   4 -
 .../scala/org/apache/spark/sql/CarbonScan.scala |   2 +-
 .../execution/command/AlterTableCommands.scala  |   4 +-
 .../apache/spark/sql/hive/CarbonMetastore.scala |   2 +-
 .../carbondata/query/TestNotEqualToFilter.scala |   3 -
 .../merger/CompactionResultSortProcessor.java   |  18 +-
 .../merger/RowResultMergerProcessor.java        |  11 +-
 .../merger/TupleConversionAdapter.java          |  67 ---
 .../processing/model/CarbonDataLoadSchema.java  | 125 -----
 .../processing/model/CarbonLoadModel.java       | 103 +----
 .../newflow/CarbonDataLoadConfiguration.java    | 158 ++++---
 .../newflow/DataLoadProcessBuilder.java         |   3 +-
 .../converter/impl/FieldEncoderFactory.java     |   2 +-
 .../converter/impl/RowConverterImpl.java        |   1 +
 .../processing/newflow/row/CarbonRow.java       |   1 +
 .../newflow/row/WriteStepRowUtil.java           |  86 ++++
 .../CarbonRowDataWriterProcessorStepImpl.java   |  47 +-
 .../steps/DataWriterBatchProcessorStepImpl.java |  50 +-
 .../steps/DataWriterProcessorStepImpl.java      |  22 +-
 .../sortandgroupby/sortdata/RowComparator.java  |   6 +-
 .../sortandgroupby/sortdata/SortParameters.java |   7 +-
 .../store/CarbonFactDataHandlerColumnar.java    | 462 +++++--------------
 .../store/CarbonFactDataHandlerModel.java       | 156 ++-----
 .../processing/store/CarbonFactHandler.java     |   3 +-
 .../carbondata/processing/store/TablePage.java  | 204 ++++++++
 .../store/colgroup/ColGroupBlockStorage.java    |   8 -
 .../store/writer/AbstractFactDataWriter.java    |  13 +-
 .../store/writer/CarbonDataWriterVo.java        |  11 -
 .../writer/v3/CarbonFactDataWriterImplV3.java   |   4 +-
 .../util/CarbonDataProcessorUtil.java           | 103 +----
 .../processing/util/NonDictionaryUtil.java      | 322 +------------
 .../carbondata/processing/StoreCreator.java     |   2 +
 .../store/colgroup/ColGroupMinMaxTest.java      |  26 --
 49 files changed, 673 insertions(+), 1518 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/core/src/main/java/org/apache/carbondata/core/constants/IgnoreDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/IgnoreDictionary.java b/core/src/main/java/org/apache/carbondata/core/constants/IgnoreDictionary.java
deleted file mode 100644
index 8f81cab..0000000
--- a/core/src/main/java/org/apache/carbondata/core/constants/IgnoreDictionary.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.constants;
-
-/**
- * This enum is used for determining the indexes of the
- * dimension,ignoreDictionary,measure columns.
- */
-public enum IgnoreDictionary {
-  /**
-   * POSITION WHERE DIMENSIONS R STORED IN OBJECT ARRAY.
-   */
-  DIMENSION_INDEX_IN_ROW(0),
-
-  /**
-   * POSITION WHERE BYTE[] (high cardinality) IS STORED IN OBJECT ARRAY.
-   */
-  BYTE_ARRAY_INDEX_IN_ROW(1),
-
-  /**
-   * POSITION WHERE MEASURES R STORED IN OBJECT ARRAY.
-   */
-  MEASURES_INDEX_IN_ROW(2);
-
-  private final int index;
-
-  IgnoreDictionary(int index) {
-    this.index = index;
-  }
-
-  public int getIndex() {
-    return this.index;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
index 7b47fee..aaea62b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
@@ -92,7 +92,7 @@ public class BlockIndexerStorageForShort implements IndexStorage<short[]> {
    *
    * @param indexes
    */
-  public void compressMyOwnWay(short[] indexes) {
+  private void compressMyOwnWay(short[] indexes) {
     List<Short> list = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
     List<Short> map = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
     int k = 0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/core/src/main/java/org/apache/carbondata/core/datastore/page/KeyColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/KeyColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/KeyColumnPage.java
new file mode 100644
index 0000000..c2c448c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/KeyColumnPage.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+// Represent a MDK columnar data in one page.
+public class KeyColumnPage {
+
+  private byte[][][] keyVector;
+
+  public KeyColumnPage(int pageSize, int numColumn) {
+    keyVector = new byte[numColumn][][];
+    for (int i = 0; i < numColumn; i++) {
+      keyVector[i] = new byte[pageSize][];
+    }
+  }
+
+  public void putKey(int rowId, byte[][] key) {
+    for (int i = 0; i < keyVector.length; i++) {
+      keyVector[i][rowId] = key[i];
+    }
+  }
+
+  public byte[][] getKeyVector(int columnIndex) {
+    return keyVector[columnIndex];
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
index 11dc21b..7cca993 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
@@ -134,11 +134,18 @@ public class CarbonColumn implements Serializable {
   /**
    * @return if column is dimension return true, else false.
    */
-  public Boolean isDimesion() {
+  public Boolean isDimension() {
     return columnSchema.isDimensionColumn();
   }
 
   /**
+   * @return true if column is measure, otherwise false
+   */
+  public Boolean isMeasure() {
+    return !isDimension();
+  }
+
+  /**
    * return the visibility
    * @return
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java
index 882a376..dead205 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java
@@ -106,7 +106,7 @@ public class CarbonImplicitDimension extends CarbonDimension {
   /**
    * @return if column is dimension return true, else false.
    */
-  @Override public Boolean isDimesion() {
+  @Override public Boolean isDimension() {
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 8e4df1a..4874f78 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -977,8 +977,8 @@ public final class CarbonUtil {
    * @return surrogate key
    */
   public static int getSurrogateKey(byte[] data, ByteBuffer buffer) {
-    int lenght = 4 - data.length;
-    for (int i = 0; i < lenght; i++) {
+    int length = 4 - data.length;
+    for (int i = 0; i < length; i++) {
       buffer.put((byte) 0);
     }
     buffer.put(data);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
index 8f8e891..99cbd74 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -68,7 +68,7 @@ object CarbonSessionExample {
          | complexData ARRAY<STRING>
          | )
          | STORED BY 'carbondata'
-         | TBLPROPERTIES('DICTIONARY_INCLUDE'='dateField, charField')
+         | TBLPROPERTIES('SORT_COLUMNS'='', 'DICTIONARY_INCLUDE'='dateField, charField')
        """.stripMargin)
 
     val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index 1448957..88c182e 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -382,8 +382,6 @@ public class StoreCreator {
     CarbonProperties.getInstance().addProperty("is.compressed.keyblock", "false");
     CarbonProperties.getInstance().addProperty("carbon.leaf.node.size", "120000");
 
-    String fileNamePrefix = "";
-
     String graphPath =
         outPutLoc + File.separator + loadModel.getDatabaseName() + File.separator + tableName
             + File.separator + 0 + File.separator + 1 + File.separator + tableName + ".ktr";
@@ -421,11 +419,6 @@ public class StoreCreator {
     info.setDatabaseName(databaseName);
     info.setTableName(tableName);
 
-//    DataGraphExecuter graphExecuter = new DataGraphExecuter(dataProcessTaskStatus);
-//    graphExecuter
-//        .executeGraph(graphPath, info, loadModel.getSchema());
-    //    LoadMetadataDetails[] loadDetails =
-    //        CarbonUtil.readLoadMetadata(loadModel.schema.getCarbonTable().getMetaDataFilepath());
     writeLoadMetadata(loadModel.getCarbonDataLoadSchema(), loadModel.getTableName(), loadModel.getTableName(),
         new ArrayList<LoadMetadataDetails>());
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
index f3efb36..e39ee58 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
@@ -137,7 +137,7 @@ public class CarbondataSplitManager implements ConnectorSplitManager {
       ColumnExpression colExpression =
           new ColumnExpression(cdch.getColumnName(), target.get().getDataType());
       //colExpression.setColIndex(cs.getSchemaOrdinal());
-      colExpression.setDimension(target.get().isDimesion());
+      colExpression.setDimension(target.get().isDimension());
       colExpression.setDimension(
           carbonTable.getDimensionByName(carbonTable.getFactTableName(), cdch.getColumnName()));
       colExpression.setCarbonColumn(target.get());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
index c2c15eb..1326889 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
@@ -35,6 +35,8 @@ class TestComplexTypeQuery extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists structusingstructHive")
     sql("drop table if exists structusingarraycarbon")
     sql("drop table if exists structusingarrayhive")
+    sql("drop table if exists complexcarbonwithspecialchardelimeter")
+    sql("drop table if exists complexhivewithspecialchardelimeter")
     sql(
       "create table complexcarbontable(deviceInformationId int, channelsId string, ROMSize " +
       "string, ROMName String, purchasedate string, mobile struct<imei:string, imsi:string>, MAC " +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntax.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntax.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntax.scala
index 561b0d1..ba46286 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntax.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntax.scala
@@ -77,6 +77,8 @@ class TestLoadDataWithHiveSyntax extends QueryTest with BeforeAndAfterAll {
   }
 
   test("create table with smallint type and query smallint table") {
+    sql("drop table if exists smallinttable")
+    sql("drop table if exists smallinthivetable")
     sql(
       "create table smallinttable(empno smallint, empname String, designation string, " +
         "doj String, workgroupcategory int, workgroupcategoryname String,deptno int, " +
@@ -107,6 +109,8 @@ class TestLoadDataWithHiveSyntax extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test data loading and validate query output") {
+    sql("drop table if exists testtable")
+    sql("drop table if exists testhivetable")
     //Create test cube and hive table
     sql(
       "CREATE table testtable (empno string, empname String, designation String, doj String, " +
@@ -148,6 +152,8 @@ class TestLoadDataWithHiveSyntax extends QueryTest with BeforeAndAfterAll {
     * however deletion and creation of cube with same name
     */
   test("test data loading with different case file header and validate query output") {
+    sql("drop table if exists testtable1")
+    sql("drop table if exists testhivetable1")
     //Create test cube and hive table
     sql(
       "CREATE table testtable1 (empno string, empname String, designation String, doj String, " +
@@ -217,6 +223,7 @@ class TestLoadDataWithHiveSyntax extends QueryTest with BeforeAndAfterAll {
   }
 
   test("complex types data loading") {
+    sql("drop table if exists complexcarbontable")
     sql("create table complexcarbontable(deviceInformationId int, channelsId string," +
       "ROMSize string, purchasedate string, mobile struct<imei:string, imsi:string>," +
       "MAC array<string>, locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, " +
@@ -240,6 +247,7 @@ class TestLoadDataWithHiveSyntax extends QueryTest with BeforeAndAfterAll {
     "complex types data loading with more unused columns and different order of complex columns " +
       "in csv and create table"
   ) {
+    sql("drop table if exists complexcarbontable")
     sql("create table complexcarbontable(deviceInformationId int, channelsId string," +
       "mobile struct<imei:string, imsi:string>, ROMSize string, purchasedate string," +
       "MAC array<string>, locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, " +
@@ -309,6 +317,7 @@ class TestLoadDataWithHiveSyntax extends QueryTest with BeforeAndAfterAll {
 
 
   test("complex types data loading with hive column having more than required column values") {
+    sql("drop table if exists complexcarbontable")
     sql("create table complexcarbontable(deviceInformationId int, channelsId string," +
       "ROMSize string, purchasedate string, mobile struct<imei:string, imsi:string>," +
       "MAC array<string>, locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, " +
@@ -329,6 +338,7 @@ class TestLoadDataWithHiveSyntax extends QueryTest with BeforeAndAfterAll {
   }
 
   test("complex types & no dictionary columns data loading") {
+    sql("drop table if exists complexcarbontable")
     sql("create table complexcarbontable(deviceInformationId int, channelsId string," +
       "ROMSize string, purchasedate string, mobile struct<imei:string, imsi:string>," +
       "MAC array<string>, locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, " +
@@ -350,6 +360,7 @@ class TestLoadDataWithHiveSyntax extends QueryTest with BeforeAndAfterAll {
   }
 
   test("array<string> and string datatype for same column is not working properly") {
+    sql("drop table if exists complexcarbontable")
     sql("create table complexcarbontable(deviceInformationId int, MAC array<string>, channelsId string, "+
         "ROMSize string, purchasedate string, gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' "+
         "TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId')")
@@ -370,6 +381,7 @@ class TestLoadDataWithHiveSyntax extends QueryTest with BeforeAndAfterAll {
     "test carbon table data loading when table name is in different case with create table, for " +
       "UpperCase"
   ) {
+    sql("drop table if exists UPPERCASEcube")
     sql("create table UPPERCASEcube(empno Int, empname String, designation String, " +
       "doj String, workgroupcategory Int, workgroupcategoryname String, deptno Int, " +
       "deptname String, projectcode Int, projectjoindate String, projectenddate String, " +
@@ -386,6 +398,7 @@ class TestLoadDataWithHiveSyntax extends QueryTest with BeforeAndAfterAll {
     "test carbon table data loading when table name is in different case with create table ,for " +
       "LowerCase"
   ) {
+    sql("drop table if exists lowercaseCUBE")
     sql("create table lowercaseCUBE(empno Int, empname String, designation String, " +
       "doj String, workgroupcategory Int, workgroupcategoryname String, deptno Int, " +
       "deptname String, projectcode Int, projectjoindate String, projectenddate String, " +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestQueryForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestQueryForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestQueryForPartitionTable.scala
index 62b3ea7..57861a9 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestQueryForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestQueryForPartitionTable.scala
@@ -62,6 +62,7 @@ class TestQueryForPartitionTable  extends QueryTest with BeforeAndAfterAll {
     // In
     checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from hashTable where empno in (11, 13)"),
       sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where empno in (11, 13)"))
+    sql("drop table hashTable")
   }
 
   test("detail query on partition table: range partition") {
@@ -97,6 +98,7 @@ class TestQueryForPartitionTable  extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from rangeTable where doj < '2014-08-15 00:00:00'"),
       sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where doj < '2014-08-15 00:00:00'"))
 
+    sql("drop table rangeTable")
   }
 
   test("detail query on partition table: list partition") {
@@ -134,6 +136,7 @@ class TestQueryForPartitionTable  extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from listTable where workgroupcategory < 2"),
       sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where workgroupcategory < 2"))
 
+    sql("drop table listTable")
   }
 
   override def afterAll = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 7752cb3..7cbed58 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -88,9 +88,6 @@ object CarbonDataRDDFactory {
         carbonLoadModel
           .setSegmentUpdateStatusManager((alterTableModel.segmentUpdateStatusManager.get))
         carbonLoadModel
-          .setSegmentUpdateDetails(alterTableModel.segmentUpdateStatusManager.get
-            .getUpdateStatusDetails.toList.asJava)
-        carbonLoadModel
           .setLoadMetadataDetails(alterTableModel.segmentUpdateStatusManager.get
             .getLoadMetadataDetails.toList.asJava)
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index 53dd7e9..7e449b6 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@ -251,7 +251,7 @@ case class CarbonRelation(
     val columns = tableMeta.carbonTable.getCreateOrderColumn(tableMeta.carbonTable.getFactTableName)
         .asScala
     columns.filter(!_.isInvisible).map { column =>
-      if (column.isDimesion()) {
+      if (column.isDimension()) {
         val output: DataType = column.getDataType.toString.toLowerCase match {
           case "array" =>
             CarbonMetastoreTypes.toDataType(s"array<${getArrayChildren(column.getColName)}>")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
index 5006508..17d6065 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
@@ -97,7 +97,7 @@ case class CarbonScan(
     columnProjection.foreach { attr =>
       val carbonColumn = carbonTable.getColumnByName(carbonTable.getFactTableName, attr.name)
       if (carbonColumn != null) {
-        if (carbonColumn.isDimesion()) {
+        if (carbonColumn.isDimension()) {
           val dim = new QueryDimension(attr.name)
           dim.setQueryOrder(queryOrder)
           queryOrder = queryOrder + 1

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index c68887f..a4bc636 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -19,7 +19,6 @@ package org.apache.carbondata.spark.readsupport;
 import java.io.IOException;
 
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
 
@@ -28,17 +27,8 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 
 public class SparkRowReadSupportImpl extends DictionaryDecodeReadSupport<InternalRow> {
 
-  boolean[] isMeasure;
-
   @Override public void initialize(CarbonColumn[] carbonColumns,
       AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
-    //can initialize and generate schema here.
-    isMeasure = new boolean[carbonColumns.length];
-    dataTypes = new DataType[carbonColumns.length];
-    for (int i = 0; i < carbonColumns.length; i++) {
-      isMeasure[i] = !carbonColumns[i].isDimesion();
-      dataTypes[i] = carbonColumns[i].getDataType();
-    }
   }
 
   @Override public InternalRow readRow(Object[] data) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index e4a89c4..6138b15 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -86,10 +86,6 @@ object CarbonDataRDDFactory {
           .setSegmentUpdateStatusManager(alterTableModel.segmentUpdateStatusManager.get)
 
         carbonLoadModel
-          .setSegmentUpdateDetails(alterTableModel.segmentUpdateStatusManager.get
-            .getUpdateStatusDetails.toList.asJava)
-
-        carbonLoadModel
           .setLoadMetadataDetails(alterTableModel.segmentUpdateStatusManager.get
             .getLoadMetadataDetails.toList.asJava)
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
index 0bcbcfc..b91acef 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
@@ -80,7 +80,7 @@ case class CarbonScan(
     attributesRaw.foreach { attr =>
       val carbonColumn = carbonTable.getColumnByName(carbonTable.getFactTableName, attr.name)
       if (carbonColumn != null) {
-        if (carbonColumn.isDimesion()) {
+        if (carbonColumn.isDimension()) {
           val dim = new QueryDimension(attr.name)
           dim.setQueryOrder(queryOrder)
           queryOrder = queryOrder + 1

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
index 4ac3ea2..a09e14b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
@@ -293,7 +293,7 @@ private[sql] case class AlterTableDropColumns(
         tableColumns.foreach { tableColumn =>
           // column should not be already deleted and should exist in the table
           if (!tableColumn.isInvisible && column.equalsIgnoreCase(tableColumn.getColName)) {
-            if (tableColumn.isDimesion) {
+            if (tableColumn.isDimension) {
               keyColumnCountToBeDeleted += 1
               if (tableColumn.hasEncoding(Encoding.DICTIONARY)) {
                 dictionaryColumns ++= Seq(tableColumn.getColumnSchema)
@@ -309,7 +309,7 @@ private[sql] case class AlterTableDropColumns(
       // take the total key column count. key column to be deleted should not
       // be >= key columns in schema
       val totalKeyColumnInSchema = tableColumns.count {
-        tableColumn => !tableColumn.isInvisible && tableColumn.isDimesion
+        tableColumn => !tableColumn.isInvisible && tableColumn.isDimension
       }
       if (keyColumnCountToBeDeleted >= totalKeyColumnInSchema) {
         sys.error(s"Alter drop operation failed. AtLeast one key column should exist after drop.")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index f076d9d..8151e6e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -882,7 +882,7 @@ case class CarbonRelation(
       .asScala
     // convert each column to Attribute
     columns.filter(!_.isInvisible).map { column =>
-      if (column.isDimesion()) {
+      if (column.isDimension()) {
         val output: DataType = column.getDataType.toString.toLowerCase match {
           case "array" =>
             CarbonMetastoreTypes.toDataType(s"array<${getArrayChildren(column.getColName)}>")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/TestNotEqualToFilter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/TestNotEqualToFilter.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/TestNotEqualToFilter.scala
index cc56509..fb0188c 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/TestNotEqualToFilter.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/TestNotEqualToFilter.scala
@@ -63,9 +63,6 @@ class TestNotEqualToFilter extends QueryTest with BeforeAndAfterAll {
   }
 
   test("select Id from test_not_equal_to_carbon where id != '7'") {
-   // sql("select id from test_not_equal_to_carbon").show(200,false)
-   // sql("select id from test_not_equal_to_hive").show(200,false)
-    sql("select Id from test_not_equal_to_carbon where id > '1.5rre'").show(200, false)
     checkAnswer(
       sql("select Id from test_not_equal_to_carbon where id != '7'"),
       sql("select Id from test_not_equal_to_hive where id != '7'")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 3c71274..e0904c5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -92,7 +92,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
   /**
    * agg type defined for measures
    */
-  private DataType[] aggType;
+  private DataType[] dataTypes;
   /**
    * segment id
    */
@@ -241,7 +241,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
     int measureIndexInRow = 1;
     for (int i = 0; i < measureCount; i++) {
       preparedRow[dimensionColumnCount + i] =
-          getConvertedMeasureValue(row[measureIndexInRow++], aggType[i]);
+          getConvertedMeasureValue(row[measureIndexInRow++], dataTypes[i]);
     }
     return preparedRow;
   }
@@ -273,13 +273,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
       intermediateFileMerger.finish();
       finalMerger.startFinalMerge();
       while (finalMerger.hasNext()) {
-        Object[] rowRead = finalMerger.next();
-        CarbonRow row = new CarbonRow(rowRead);
-        // convert the row from surrogate key to MDKey
-        Object[] outputRow = CarbonDataProcessorUtil
-            .convertToMDKeyAndFillRow(row, segmentProperties, measureCount, noDictionaryCount,
-                segmentProperties.getComplexDimensions().size());
-        dataHandler.addDataToStore(outputRow);
+        Object[] row = finalMerger.next();
+        dataHandler.addDataToStore(new CarbonRow(row));
       }
       dataHandler.finish();
     } catch (CarbonDataWriterException e) {
@@ -307,7 +302,6 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    */
   private void addRowForSorting(Object[] row) throws Exception {
     try {
-      // prepare row array using RemoveDictionaryUtil class
       sortDataRows.addRow(row);
     } catch (CarbonSortKeyAndGroupByException e) {
       LOGGER.error(e);
@@ -377,7 +371,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
     finalMerger =
         new SingleThreadFinalSortFilesMerger(sortTempFileLocation, tableName, dimensionColumnCount,
             segmentProperties.getComplexDimensions().size(), measureCount, noDictionaryCount,
-            aggType, noDictionaryColMapping, noDictionarySortColumnMapping);
+            dataTypes, noDictionaryColMapping, noDictionarySortColumnMapping);
   }
 
   /**
@@ -412,6 +406,6 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    * initialise aggregation type for measures for their storage format
    */
   private void initAggType() {
-    aggType = CarbonDataProcessorUtil.initDataType(carbonTable, tableName, measureCount);
+    dataTypes = CarbonDataProcessorUtil.initDataType(carbonTable, tableName, measureCount);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index ce85227..5ba2f98 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -34,6 +34,8 @@ import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.processing.merger.exeception.SliceMergerException;
 import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.WriteStepRowUtil;
 import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
 import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
 import org.apache.carbondata.processing.store.CarbonFactHandler;
@@ -51,8 +53,6 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
    */
   private AbstractQueue<RawResultIterator> recordHolderHeap;
 
-  private TupleConversionAdapter tupleConvertor;
-
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(RowResultMergerProcessor.class.getName());
 
@@ -72,7 +72,6 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
         carbonFactDataHandlerModel);
     carbonFactDataHandlerModel.setCompactionFlow(true);
     dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
-    tupleConvertor = new TupleConversionAdapter(segProp);
   }
 
   private void initRecordHolderHeap(List<RawResultIterator> rawResultIteratorList) {
@@ -169,11 +168,9 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
    * @throws SliceMergerException
    */
   private void addRow(Object[] carbonTuple) throws SliceMergerException {
-    Object[] rowInWritableFormat;
-
-    rowInWritableFormat = tupleConvertor.getObjectArray(carbonTuple);
+    CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segprop);
     try {
-      this.dataHandler.addDataToStore(rowInWritableFormat);
+      this.dataHandler.addDataToStore(row);
     } catch (CarbonDataWriterException e) {
       throw new SliceMergerException("Problem in merging the slice", e);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/merger/TupleConversionAdapter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/TupleConversionAdapter.java b/processing/src/main/java/org/apache/carbondata/processing/merger/TupleConversionAdapter.java
deleted file mode 100644
index f508a87..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/TupleConversionAdapter.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.merger;
-
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
-
-/**
- * This class will be used to convert the Result into the format used in data writer.
- */
-class TupleConversionAdapter {
-
-  private final SegmentProperties segmentproperties;
-
-  private int noDictionaryPresentIndex;
-
-  private int measureCount;
-
-  private boolean isNoDictionaryPresent;
-
-  public TupleConversionAdapter(SegmentProperties segmentProperties) {
-    this.measureCount = segmentProperties.getMeasures().size();
-    this.isNoDictionaryPresent = segmentProperties.getNumberOfNoDictionaryDimension() > 0;
-    if (isNoDictionaryPresent) {
-      noDictionaryPresentIndex++;
-    }
-    this.segmentproperties = segmentProperties;
-  }
-
-  /**
-   * Converting the raw result to the format understandable by the data writer.
-   * @param carbonTuple
-   * @return
-   */
-  public Object[] getObjectArray(Object[] carbonTuple) {
-    Object[] row = new Object[measureCount + noDictionaryPresentIndex + 1];
-    int index = 0;
-    // put measures.
-
-    for (int j = 1; j <= measureCount; j++) {
-      row[index++] = carbonTuple[j];
-    }
-
-    // put No dictionary byte []
-    if (isNoDictionaryPresent) {
-      row[index++] = ((ByteArrayWrapper) carbonTuple[0]).getNoDictionaryKeys();
-    }
-
-    // put No Dictionary Dims
-    row[index++] = ((ByteArrayWrapper) carbonTuple[0]).getDictionaryKey();
-    return row;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/model/CarbonDataLoadSchema.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonDataLoadSchema.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonDataLoadSchema.java
index 86692db..cbdd7b4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonDataLoadSchema.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonDataLoadSchema.java
@@ -17,8 +17,6 @@
 package org.apache.carbondata.processing.model;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 
@@ -39,27 +37,12 @@ public class CarbonDataLoadSchema implements Serializable {
   private CarbonTable carbonTable;
 
   /**
-   * dimension table and relation info
-   */
-  private List<DimensionRelation> dimensionRelationList;
-
-  /**
    * CarbonDataLoadSchema constructor which takes CarbonTable
    *
    * @param carbonTable
    */
   public CarbonDataLoadSchema(CarbonTable carbonTable) {
     this.carbonTable = carbonTable;
-    this.dimensionRelationList = new ArrayList<DimensionRelation>();
-  }
-
-  /**
-   * get dimension relation list
-   *
-   * @return dimensionRelationList
-   */
-  public List<DimensionRelation> getDimensionRelationList() {
-    return dimensionRelationList;
   }
 
   /**
@@ -71,112 +54,4 @@ public class CarbonDataLoadSchema implements Serializable {
     return carbonTable;
   }
 
-  /**
-   * Dimension Relation object which will be filled from
-   * Load DML Command to support normalized table data load
-   */
-  public static class DimensionRelation implements Serializable {
-    /**
-     * default serializer
-     */
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * dimension tableName
-     */
-    private String tableName;
-
-    /**
-     * relation with fact and dimension table
-     */
-    private Relation relation;
-
-    /**
-     * Columns to selected from dimension table.
-     * Hierarchy in-memory table should be prepared
-     * based on selected columns
-     */
-    private List<String> columns;
-
-    /**
-     * constructor
-     *
-     * @param tableName       - dimension table name
-     * @param relation        - fact foreign key with dimension primary key mapping
-     * @param columns         - list of columns to be used from this dimension table
-     */
-    public DimensionRelation(String tableName, Relation relation,
-        List<String> columns) {
-      this.tableName = tableName;
-      this.relation = relation;
-      this.columns = columns;
-    }
-
-    /**
-     * @return tableName
-     */
-    public String getTableName() {
-      return tableName;
-    }
-
-    /**
-     * @return relation
-     */
-    public Relation getRelation() {
-      return relation;
-    }
-
-    /**
-     * @return columns
-     */
-    public List<String> getColumns() {
-      return columns;
-    }
-  }
-
-  /**
-   * Relation class to specify fact foreignkey column with
-   * dimension primary key column
-   */
-  public static class Relation implements Serializable {
-    /**
-     * default serializer
-     */
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * Fact foreign key column
-     */
-    private String factForeignKeyColumn;
-
-    /**
-     * dimension primary key column
-     */
-    private String dimensionPrimaryKeyColumn;
-
-    /**
-     * constructor
-     *
-     * @param factForeignKeyColumn      - Fact Table Foreign key
-     * @param dimensionPrimaryKeyColumn - Dimension Table primary key
-     */
-    public Relation(String factForeignKeyColumn, String dimensionPrimaryKeyColumn) {
-      this.factForeignKeyColumn = factForeignKeyColumn;
-      this.dimensionPrimaryKeyColumn = dimensionPrimaryKeyColumn;
-    }
-
-    /**
-     * @return factForeignKeyColumn
-     */
-    public String getFactForeignKeyColumn() {
-      return factForeignKeyColumn;
-    }
-
-    /**
-     * @return dimensionPrimaryKeyColumn
-     */
-    public String getDimensionPrimaryKeyColumn() {
-      return dimensionPrimaryKeyColumn;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
index bcba974..86163b4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
@@ -22,14 +22,11 @@ import java.util.HashMap;
 import java.util.List;
 
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 
 public class CarbonLoadModel implements Serializable {
-  /**
-   *
-   */
+
   private static final long serialVersionUID = 6580168429197697465L;
 
   private String databaseName;
@@ -38,8 +35,6 @@ public class CarbonLoadModel implements Serializable {
 
   private String factFilePath;
 
-  private String dimFolderPath;
-
   private String colDictFilePath;
 
   private String partitionId;
@@ -56,7 +51,6 @@ public class CarbonLoadModel implements Serializable {
 
   private boolean isRetentionRequest;
 
-  private List<String> factFilesToProcess;
   private String csvHeader;
   private String[] csvHeaderColumns;
   private String csvDelimiter;
@@ -65,7 +59,6 @@ public class CarbonLoadModel implements Serializable {
 
   private boolean isDirectLoad;
   private List<LoadMetadataDetails> loadMetadataDetails;
-  private transient List<SegmentUpdateDetails> segmentUpdateDetails;
   private transient SegmentUpdateStatusManager segmentUpdateStatusManager;
 
   private String blocksID;
@@ -132,25 +125,10 @@ public class CarbonLoadModel implements Serializable {
   private String maxColumns;
 
   /**
-   * the key of RDD Iterator in RDD iterator Map
-   */
-  private String rddIteratorKey;
-
-  private String carbondataFileName = "";
-
-  /**
    * defines the string to specify whether empty data is good or bad
    */
   private String isEmptyDataBadRecord;
 
-  public String getCarbondataFileName() {
-    return carbondataFileName;
-  }
-
-  public void setCarbondataFileName(String carbondataFileName) {
-    this.carbondataFileName = carbondataFileName;
-  }
-
   /**
    * Use one pass to generate dictionary
    */
@@ -180,6 +158,7 @@ public class CarbonLoadModel implements Serializable {
    * Batch sort size in mb.
    */
   private String batchSortSizeInMb;
+
   /**
    * get escape char
    *
@@ -198,24 +177,6 @@ public class CarbonLoadModel implements Serializable {
     this.escapeChar = escapeChar;
   }
 
-  /**
-   * get blocck id
-   *
-   * @return
-   */
-  public String getBlocksID() {
-    return blocksID;
-  }
-
-  /**
-   * set block id for carbon load model
-   *
-   * @param blocksID
-   */
-  public void setBlocksID(String blocksID) {
-    this.blocksID = blocksID;
-  }
-
   public String getCsvDelimiter() {
     return csvDelimiter;
   }
@@ -256,10 +217,6 @@ public class CarbonLoadModel implements Serializable {
     this.allDictPath = allDictPath;
   }
 
-  public List<String> getFactFilesToProcess() {
-    return factFilesToProcess;
-  }
-
   public String getCsvHeader() {
     return csvHeader;
   }
@@ -428,7 +385,6 @@ public class CarbonLoadModel implements Serializable {
     copyObj.carbonDataLoadSchema = carbonDataLoadSchema;
     copyObj.csvHeader = header;
     copyObj.csvHeaderColumns = csvHeaderColumns;
-    copyObj.factFilesToProcess = filesForPartition;
     copyObj.isDirectLoad = true;
     copyObj.csvDelimiter = delimiter;
     copyObj.complexDelimiterLevel1 = complexDelimiterLevel1;
@@ -479,20 +435,6 @@ public class CarbonLoadModel implements Serializable {
   }
 
   /**
-   * @return the aggLoadRequest
-   */
-  public boolean isAggLoadRequest() {
-    return aggLoadRequest;
-  }
-
-  /**
-   * @param aggLoadRequest the aggLoadRequest to set
-   */
-  public void setAggLoadRequest(boolean aggLoadRequest) {
-    this.aggLoadRequest = aggLoadRequest;
-  }
-
-  /**
    * @param storePath The storePath to set.
    */
   public void setStorePath(String storePath) {
@@ -500,13 +442,6 @@ public class CarbonLoadModel implements Serializable {
   }
 
   /**
-   * @return Returns the aggTableName.
-   */
-  public String getAggTableName() {
-    return aggTableName;
-  }
-
-  /**
    * @return Returns the factStoreLocation.
    */
   public String getStorePath() {
@@ -514,13 +449,6 @@ public class CarbonLoadModel implements Serializable {
   }
 
   /**
-   * @param aggTableName The aggTableName to set.
-   */
-  public void setAggTableName(String aggTableName) {
-    this.aggTableName = aggTableName;
-  }
-
-  /**
    * isRetentionRequest
    *
    * @return
@@ -548,24 +476,6 @@ public class CarbonLoadModel implements Serializable {
   }
 
   /**
-   * getSegmentUpdateDetails
-   *
-   * @return
-   */
-  public List<SegmentUpdateDetails> getSegmentUpdateDetails() {
-    return segmentUpdateDetails;
-  }
-
-  /**
-   * setSegmentUpdateDetails
-   *
-   * @param segmentUpdateDetails
-   */
-  public void setSegmentUpdateDetails(List<SegmentUpdateDetails> segmentUpdateDetails) {
-    this.segmentUpdateDetails = segmentUpdateDetails;
-  }
-
-  /**
    * getSegmentUpdateStatusManager
    *
    * @return
@@ -729,15 +639,6 @@ public class CarbonLoadModel implements Serializable {
     this.badRecordsAction = badRecordsAction;
   }
 
-  public String getRddIteratorKey() {
-    return rddIteratorKey;
-  }
-
-  public void setRddIteratorKey(String rddIteratorKey) {
-    this.rddIteratorKey = rddIteratorKey;
-
-  }
-
   public boolean getUseOnePass() {
     return useOnePass;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
index e1eb071..12bcb27 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
@@ -22,18 +22,18 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.BucketingInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.processing.newflow.converter.DictionaryCardinalityFinder;
 
 public class CarbonDataLoadConfiguration {
 
   private DataField[] dataFields;
 
-  private DataField[] dimensionFields;
-
-  private DataField[] measureFields;
-
   private AbsoluteTableIdentifier tableIdentifier;
 
   private String[] header;
@@ -46,10 +46,6 @@ public class CarbonDataLoadConfiguration {
 
   private BucketingInfo bucketingInfo;
 
-  private String numberOfColumns;
-
-  private String maxColumns;
-
   private Map<String, Object> dataLoadProperties = new HashMap<>();
 
   /**
@@ -73,6 +69,10 @@ public class CarbonDataLoadConfiguration {
 
   private int measureCount;
 
+  private int noDictionaryCount;
+
+  private int complexColumnCount;
+
   /**
    * schema updated time stamp to be used for restructure scenarios
    */
@@ -87,32 +87,29 @@ public class CarbonDataLoadConfiguration {
   public CarbonDataLoadConfiguration() {
   }
 
-  private void initDimensionFields() {
-    List<Integer> dimensionIndexes = new ArrayList<>(dataFields.length);
-    for (int i = 0; i < dataFields.length; i++) {
-      if (dataFields[i].getColumn().isDimesion()) {
-        dimensionIndexes.add(i);
+  public void setDataFields(DataField[] dataFields) {
+    this.dataFields = dataFields;
+
+    // set counts for each column category
+    for (DataField dataField : dataFields) {
+      CarbonColumn column = dataField.getColumn();
+      if (column.isDimension()) {
         dimensionCount++;
+        if (!dataField.hasDictionaryEncoding()) {
+          noDictionaryCount++;
+        }
       }
-    }
-    dimensionFields = new DataField[dimensionCount];
-    for (int i = 0; i < dimensionCount; i++) {
-      dimensionFields[i] = dataFields[dimensionIndexes.get(i)];
-    }
-  }
-
-  private void initMeasureFields() {
-    List<Integer> measureIndexes = new ArrayList<>(dataFields.length);
-    for (int i = 0; i < dataFields.length; i++) {
-      if (!dataFields[i].getColumn().isDimesion()) {
-        measureIndexes.add(i);
+      if (column.isComplex()) {
+        complexColumnCount++;
+      }
+      if (column.isMeasure()) {
         measureCount++;
       }
     }
-    measureFields = new DataField[measureCount];
-    for (int i = 0; i < measureCount; i++) {
-      measureFields[i] = dataFields[measureIndexes.get(i)];
-    }
+  }
+
+  public DataField[] getDataFields() {
+    return dataFields;
   }
 
   public int getDimensionCount() {
@@ -120,13 +117,15 @@ public class CarbonDataLoadConfiguration {
   }
 
   public int getNoDictionaryCount() {
-    int dimCount = 0;
-    for (int i = 0; i < dataFields.length; i++) {
-      if (dataFields[i].getColumn().isDimesion() && !dataFields[i].hasDictionaryEncoding()) {
-        dimCount++;
-      }
-    }
-    return dimCount;
+    return noDictionaryCount;
+  }
+
+  public int getComplexColumnCount() {
+    return complexColumnCount;
+  }
+
+  public int getMeasureCount() {
+    return measureCount;
   }
 
   public void setNumberOfSortColumns(int numberOfSortColumns) {
@@ -149,30 +148,6 @@ public class CarbonDataLoadConfiguration {
     return this.numberOfNoDictSortColumns;
   }
 
-  public int getComplexDimensionCount() {
-    int dimCount = 0;
-    for (int i = 0; i < dataFields.length; i++) {
-      if (dataFields[i].getColumn().isComplex()) {
-        dimCount++;
-      }
-    }
-    return dimCount;
-  }
-
-  public int getMeasureCount() {
-    return measureCount;
-  }
-
-  public DataField[] getDataFields() {
-    return dataFields;
-  }
-
-  public void setDataFields(DataField[] dataFields) {
-    this.dataFields = dataFields;
-    initDimensionFields();
-    initMeasureFields();
-  }
-
   public String[] getHeader() {
     return header;
   }
@@ -213,14 +188,6 @@ public class CarbonDataLoadConfiguration {
     this.taskNo = taskNo;
   }
 
-  public void setMaxColumns(String maxColumns) {
-    this.maxColumns = maxColumns;
-  }
-
-  public void setNumberOfColumns(int numberOfColumns) {
-    this.numberOfColumns = String.valueOf(numberOfColumns);
-  }
-
   public void setDataLoadProperty(String key, Object value) {
     dataLoadProperties.put(key, value);
   }
@@ -269,14 +236,6 @@ public class CarbonDataLoadConfiguration {
     this.preFetch = preFetch;
   }
 
-  public DataField[] getDimensionFields() {
-    return dimensionFields;
-  }
-
-  public DataField[] getMeasureFields() {
-    return measureFields;
-  }
-
   public long getSchemaUpdatedTimeStamp() {
     return schemaUpdatedTimeStamp;
   }
@@ -292,4 +251,51 @@ public class CarbonDataLoadConfiguration {
   public void setCardinalityFinder(DictionaryCardinalityFinder cardinalityFinder) {
     this.cardinalityFinder = cardinalityFinder;
   }
+
+  public DataType[] getMeasureDataType() {
+    List<Integer> measureIndexes = new ArrayList<>(dataFields.length);
+    int measureCount = 0;
+    for (int i = 0; i < dataFields.length; i++) {
+      if (!dataFields[i].getColumn().isDimension()) {
+        measureIndexes.add(i);
+        measureCount++;
+      }
+    }
+
+    DataType[] type = new DataType[measureCount];
+    for (int i = 0; i < type.length; i++) {
+      type[i] = dataFields[measureIndexes.get(i)].getColumn().getDataType();
+    }
+    return type;
+  }
+
+  public int[] calcDimensionLengths() {
+    int[] dimLensWithComplex = getCardinalityFinder().getCardinality();
+    if (!isSortTable()) {
+      for (int i = 0; i < dimLensWithComplex.length; i++) {
+        if (dimLensWithComplex[i] != 0) {
+          dimLensWithComplex[i] = Integer.MAX_VALUE;
+        }
+      }
+    }
+    List<Integer> dimsLenList = new ArrayList<Integer>();
+    for (int eachDimLen : dimLensWithComplex) {
+      if (eachDimLen != 0) dimsLenList.add(eachDimLen);
+    }
+    int[] dimLens = new int[dimsLenList.size()];
+    for (int i = 0; i < dimsLenList.size(); i++) {
+      dimLens[i] = dimsLenList.get(i);
+    }
+    return dimLens;
+  }
+
+  public KeyGenerator[] createKeyGeneratorForComplexDimension() {
+    int[] dimLens = calcDimensionLengths();
+    KeyGenerator[] complexKeyGenerators = new KeyGenerator[dimLens.length];
+    for (int i = 0; i < dimLens.length; i++) {
+      complexKeyGenerators[i] =
+          KeyGeneratorFactory.getKeyGenerator(new int[] { dimLens[i] });
+    }
+    return complexKeyGenerators;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index a261f3f..e257ce3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -55,8 +55,7 @@ public final class DataLoadProcessBuilder {
 
   public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String storeLocation,
       CarbonIterator[] inputIterators) throws Exception {
-    CarbonDataLoadConfiguration configuration =
-        createConfiguration(loadModel, storeLocation);
+    CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation);
     SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
     if (!configuration.isSortTable() || sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) {
       return buildInternalForNoSort(inputIterators, configuration);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
index 158f3f0..e9b0a78 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
@@ -67,7 +67,7 @@ public class FieldEncoderFactory {
       Map<Object, Integer> localCache, boolean isEmptyBadRecord)
       throws IOException {
     // Converters are only needed for dimensions and measures it return null.
-    if (dataField.getColumn().isDimesion()) {
+    if (dataField.getColumn().isDimension()) {
       if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY) &&
           !dataField.getColumn().isComplex()) {
         return new DirectDictionaryFieldConverterImpl(dataField, nullFormat, index,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
index 5a476da..dfc7495 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
@@ -148,6 +148,7 @@ public class RowConverterImpl implements RowConverter {
 
   @Override
   public CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException {
+    // TODO: copy the row only if it is a bad record
     CarbonRow copy = row.getCopy();
     logHolder.setLogged(false);
     logHolder.clear();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
index 93911d0..ac7b7f7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
@@ -69,4 +69,5 @@ public class CarbonRow {
   @Override public String toString() {
     return Arrays.toString(data);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/newflow/row/WriteStepRowUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/WriteStepRowUtil.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/WriteStepRowUtil.java
new file mode 100644
index 0000000..1290243
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/WriteStepRowUtil.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.row;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+
+// Utility to create and retrieve data from CarbonRow in write step.
+public class WriteStepRowUtil {
+
+  // In write step, the element of CarbonRow is:
+  // 0: Dictionary dimension columns, encoded as int for each column
+  // 1: No dictionary and complex columns, they are all together encoded as one element (byte[][])
+  // 2: Measure columns, encoded as Object for each column
+
+  public static final int DICTIONARY_DIMENSION = 0;
+  public static final int NO_DICTIONARY_AND_COMPLEX = 1;
+  public static final int MEASURE = 2;
+
+  public static CarbonRow fromColumnCategory(int[] dictDimensions, byte[][] noDictAndComplex,
+      Object[] measures) {
+    Object[] row = new Object[3];
+    row[DICTIONARY_DIMENSION] = dictDimensions;
+    row[NO_DICTIONARY_AND_COMPLEX] = noDictAndComplex;
+    row[MEASURE] = measures;
+    return new CarbonRow(row);
+  }
+
+  public static CarbonRow fromMergerRow(Object[] row, SegmentProperties segmentProperties) {
+    Object[] converted = new Object[3];
+
+    // dictionary dimension
+    byte[] mdk = ((ByteArrayWrapper) row[0]).getDictionaryKey();
+    long[] keys = segmentProperties.getDimensionKeyGenerator().getKeyArray(mdk);
+    int[] dictDimensions = new int[keys.length];
+    for (int i = 0; i < keys.length; i++) {
+      dictDimensions[i] = Long.valueOf(keys[i]).intValue();
+    }
+    converted[DICTIONARY_DIMENSION] = dictDimensions;
+
+    // no dictionary and complex dimension
+    converted[NO_DICTIONARY_AND_COMPLEX] = ((ByteArrayWrapper) row[0]).getNoDictionaryKeys();
+
+    // measure
+    int measureCount = row.length - 1;
+    Object[] measures = new Object[measureCount];
+    System.arraycopy(row, 1, measures, 0, measureCount);
+    converted[MEASURE] = measures;
+
+    return new CarbonRow(converted);
+  }
+
+  private static int[] getDictDimension(CarbonRow row) {
+    return (int[]) row.getData()[DICTIONARY_DIMENSION];
+  }
+
+  public static byte[] getMdk(CarbonRow row, KeyGenerator keyGenerator) throws KeyGenException {
+    return keyGenerator.generateKey(getDictDimension(row));
+  }
+
+  public static byte[][] getNoDictAndComplexDimension(CarbonRow row) {
+    return (byte[][]) row.getData()[NO_DICTIONARY_AND_COMPLEX];
+  }
+
+  public static Object[] getMeasure(CarbonRow row) {
+    return (Object[]) row.getData()[MEASURE];
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
index 0f0a5b0..1c9649c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -26,9 +26,7 @@ import java.util.concurrent.Future;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
@@ -39,6 +37,7 @@ import org.apache.carbondata.processing.newflow.DataField;
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.newflow.row.CarbonRow;
 import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+import org.apache.carbondata.processing.newflow.row.WriteStepRowUtil;
 import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
 import org.apache.carbondata.processing.store.CarbonFactHandler;
 import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
@@ -54,10 +53,6 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(CarbonRowDataWriterProcessorStepImpl.class.getName());
 
-  private SegmentProperties segmentProperties;
-
-  private KeyGenerator keyGenerator;
-
   private int dimensionWithComplexCount;
 
   private int noDictWithComplextCount;
@@ -111,21 +106,16 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
       writeCounter = new long[iterators.length];
       dimensionWithComplexCount = configuration.getDimensionCount();
       noDictWithComplextCount =
-          configuration.getNoDictionaryCount() + configuration.getComplexDimensionCount();
+          configuration.getNoDictionaryCount() + configuration.getComplexColumnCount();
       dimensionCount = configuration.getDimensionCount() - noDictWithComplextCount;
       isNoDictionaryDimensionColumn =
           CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
-      measureDataType = CarbonDataProcessorUtil
-          .getMeasureDataType(configuration.getMeasureCount(), configuration.getMeasureFields());
-
+      measureDataType = configuration.getMeasureDataType();
       CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
           .createCarbonFactDataHandlerModel(configuration,
               getStoreLocation(tableIdentifier, String.valueOf(0)), 0, 0);
       measureCount = dataHandlerModel.getMeasureCount();
       outputLength = measureCount + (this.noDictWithComplextCount > 0 ? 1 : 0) + 1;
-      segmentProperties = dataHandlerModel.getSegmentProperties();
-      keyGenerator = segmentProperties.getDimensionKeyGenerator();
-
       CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
           .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
               System.currentTimeMillis());
@@ -232,17 +222,17 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
    * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    * | Part                     | Object item                    | describe                 |
    * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
-   * | Object[0 ~ d-1]          | int, byte[], ...               | measures                 |
+   * | Object[d+1]              | byte[]                         | mdkey                    |
    * ----------------------------------------------------------------------------------------
    * | Object[d]                | byte[b+c][]                    | no dict + complex dim    |
    * ----------------------------------------------------------------------------------------
-   * | Object[d+1]              | byte[]                         | mdkey                    |
+   * | Object[0 ~ d-1]          | int, byte[], ...               | measures                 |
    * ----------------------------------------------------------------------------------------
    *
    * @param row
    * @return
    */
-  private Object[] convertRow(CarbonRow row) throws KeyGenException {
+  private CarbonRow convertRow(CarbonRow row) throws KeyGenException {
     int dictIndex = 0;
     int nonDicIndex = 0;
     int[] dim = new int[this.dimensionCount];
@@ -261,34 +251,31 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
       nonDicArray[nonDicIndex++] = (byte[]) row.getObject(dimCount);
     }
 
-    int l = 0;
-    Object[] outputRow = new Object[outputLength];
-    for (; l < this.measureCount; l++) {
-      Object value = row.getObject(l + this.dimensionWithComplexCount);
+    Object[] measures = new Object[outputLength];
+    for (int i = 0; i < this.measureCount; i++) {
+      Object value = row.getObject(i + this.dimensionWithComplexCount);
       if (null != value) {
-        if (measureDataType[l] == DataType.DECIMAL) {
+        if (measureDataType[i] == DataType.DECIMAL) {
           BigDecimal val = (BigDecimal) value;
-          outputRow[l] = DataTypeUtil.bigDecimalToByte(val);
+          measures[i] = DataTypeUtil.bigDecimalToByte(val);
         } else {
-          outputRow[l] = value;
+          measures[i] = value;
         }
       } else {
-        outputRow[l] = null;
+        measures[i] = null;
       }
     }
 
-    if (this.noDictWithComplextCount > 0) {
-      outputRow[l++] = nonDicArray;
-    }
-    outputRow[l] = keyGenerator.generateKey(dim);
-    return outputRow;
+    return WriteStepRowUtil.fromColumnCategory(dim, nonDicArray, measures);
   }
 
   private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler, int iteratorIndex)
       throws CarbonDataLoadingException {
     try {
       while (batch.hasNext()) {
-        dataHandler.addDataToStore(convertRow(batch.next()));
+        CarbonRow row = batch.next();
+        CarbonRow converted = convertRow(row);
+        dataHandler.addDataToStore(converted);
         readCounter[iteratorIndex]++;
       }
       writeCounter[iteratorIndex] += batch.getSize();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
index ae2b625..b1f4761 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
@@ -22,9 +22,6 @@ import java.util.Iterator;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.IgnoreDictionary;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
@@ -48,18 +45,6 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
 
-  private int noDictionaryCount;
-
-  private int complexDimensionCount;
-
-  private int measureCount;
-
-  private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
-
-  private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
-
-  private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
-
   public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
       AbstractDataLoadProcessorStep child) {
     super(configuration, child);
@@ -88,13 +73,6 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
         configuration.getTableIdentifier().getCarbonTableIdentifier();
     String tableName = tableIdentifier.getTableName();
     try {
-      CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
-          .createCarbonFactDataHandlerModel(configuration,
-              getStoreLocation(tableIdentifier, String.valueOf(0)), 0, 0);
-      noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
-      complexDimensionCount = configuration.getComplexDimensionCount();
-      measureCount = dataHandlerModel.getMeasureCount();
-
       CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
           .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
               System.currentTimeMillis());
@@ -109,7 +87,7 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
           CarbonFactHandler dataHandler = CarbonFactHandlerFactory
               .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
           dataHandler.initialise();
-          processBatch(next, dataHandler, model.getSegmentProperties());
+          processBatch(next, dataHandler);
           finish(tableName, dataHandler);
         }
         i++;
@@ -152,34 +130,12 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
     }
   }
 
-  private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler,
-      SegmentProperties segmentProperties) throws Exception {
+  private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler) throws Exception {
     int batchSize = 0;
-    KeyGenerator keyGenerator = segmentProperties.getDimensionKeyGenerator();
     while (batch.hasNext()) {
       CarbonRow row = batch.next();
+      dataHandler.addDataToStore(row);
       batchSize++;
-      /*
-      * The order of the data is as follows,
-      * Measuredata, nodictionary/complex byte array data, dictionary(MDK generated key)
-      */
-      int len;
-      // adding one for the high cardinality dims byte array.
-      if (noDictionaryCount > 0 || complexDimensionCount > 0) {
-        len = measureCount + 1 + 1;
-      } else {
-        len = measureCount + 1;
-      }
-      Object[] outputRow = new Object[len];;
-
-      int l = 0;
-      Object[] measures = row.getObjectArray(measureIndex);
-      for (int i = 0; i < measureCount; i++) {
-        outputRow[l++] = measures[i];
-      }
-      outputRow[l] = row.getObject(noDimByteArrayIndex);
-      outputRow[len - 1] = keyGenerator.generateKey(row.getIntArray(dimsArrayIndex));
-      dataHandler.addDataToStore(outputRow);
     }
     rowCounter.getAndAdd(batchSize);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
index a85a34f..cf63a8c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
@@ -22,7 +22,6 @@ import java.util.Iterator;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
@@ -46,14 +45,6 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(DataWriterProcessorStepImpl.class.getName());
 
-  private SegmentProperties segmentProperties;
-
-  private int noDictionaryCount;
-
-  private int complexDimensionCount;
-
-  private int measureCount;
-
   private long readCounter;
 
   public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
@@ -84,13 +75,6 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
         configuration.getTableIdentifier().getCarbonTableIdentifier();
     String tableName = tableIdentifier.getTableName();
     try {
-      CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
-          .createCarbonFactDataHandlerModel(configuration,
-              getStoreLocation(tableIdentifier, String.valueOf(0)), 0, 0);
-      noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
-      complexDimensionCount = configuration.getComplexDimensionCount();
-      measureCount = dataHandlerModel.getMeasureCount();
-      segmentProperties = dataHandlerModel.getSegmentProperties();
       CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
           .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
               System.currentTimeMillis());
@@ -170,12 +154,8 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     try {
       while (batch.hasNext()) {
         CarbonRow row = batch.next();
+        dataHandler.addDataToStore(row);
         readCounter++;
-        // convert the row from surrogate key to MDKey
-        Object[] outputRow = CarbonDataProcessorUtil
-            .convertToMDKeyAndFillRow(row, segmentProperties, measureCount, noDictionaryCount,
-                complexDimensionCount);
-        dataHandler.addDataToStore(outputRow);
       }
     } catch (Exception e) {
       throw new CarbonDataLoadingException("unable to generate the mdkey", e);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
index 2584048..4b15b6c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
@@ -20,8 +20,8 @@ package org.apache.carbondata.processing.sortandgroupby.sortdata;
 import java.nio.ByteBuffer;
 import java.util.Comparator;
 
-import org.apache.carbondata.core.constants.IgnoreDictionary;
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.processing.newflow.row.WriteStepRowUtil;
 import org.apache.carbondata.processing.util.NonDictionaryUtil;
 
 public class RowComparator implements Comparator<Object[]> {
@@ -56,7 +56,7 @@ public class RowComparator implements Comparator<Object[]> {
     for (boolean isNoDictionary : noDictionarySortColumnMaping) {
 
       if (isNoDictionary) {
-        byte[] byteArr1 = (byte[]) rowA[IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex()];
+        byte[] byteArr1 = (byte[]) rowA[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
 
         ByteBuffer buff1 = ByteBuffer.wrap(byteArr1);
 
@@ -64,7 +64,7 @@ public class RowComparator implements Comparator<Object[]> {
         NonDictionaryUtil
             .extractSingleHighCardDims(byteArr1, noDictionaryindex, noDictionaryCount, buff1);
 
-        byte[] byteArr2 = (byte[]) rowB[IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex()];
+        byte[] byteArr2 = (byte[]) rowB[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
 
         ByteBuffer buff2 = ByteBuffer.wrap(byteArr2);