Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/08/13 20:01:19 UTC

[2/3] carbondata git commit: [CARBONDATA-1363] Add DataMapWriter interface
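
The DataMapWriter contract itself is defined in core and is not shown in this part of the diff, but its shape can be inferred from the mock in DataMapWriterSuite and the dispatch calls in DataMapWriterListener below. A hedged Java sketch (the method set is taken from this diff; parameter names and comments are mine, and the real type may be an abstract class rather than an interface):

// Inferred from the overrides in DataMapWriterSuite and the calls made by
// DataMapWriterListener; the actual source may differ in details.
package org.apache.carbondata.core.datamap.dev;

import org.apache.carbondata.core.datastore.page.ColumnPage;

public interface DataMapWriter {

  // start/end of one carbondata file (block)
  void onBlockStart(String blockId);

  void onBlockEnd(String blockId);

  // start/end of one blocklet inside the block
  void onBlockletStart(int blockletId);

  void onBlockletEnd(int blockletId);

  // one call per page; pages holds one ColumnPage per indexed column,
  // in the order given by DataMapMeta.getIndexedColumns()
  void onPageAdded(int blockletId, int pageId, ColumnPage[] pages);
}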

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java b/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java
deleted file mode 100644
index 5837f0c..0000000
--- a/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.integration.spark.testsuite.validation;
-
-import org.apache.spark.sql.common.util.CarbonHiveContext;
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.FileHolder;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.reader.CarbonFooterReader;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.format.BlockletIndex;
-import org.apache.carbondata.format.BlockletInfo;
-import org.apache.carbondata.format.DataChunk;
-import org.apache.carbondata.format.Encoding;
-import org.apache.carbondata.format.FileFooter;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-
-public class FileFooterValidator {
-
-  private static FileFooter fileFooter;
-
-  private static boolean setUpIsDone;
-
-  @Before public void setUp() throws Exception {
-
-    if (setUpIsDone) {
-      return;
-    }
-    CarbonHiveContext.sql(
-            "CREATE CUBE validatefooter DIMENSIONS (empno Integer, empname String,"
-            + " designation String,"
-            + " doj Timestamp, workgroupcategory Integer, workgroupcategoryname String, "
-            + "deptno Integer, deptname String, projectcode Integer, projectjoindate Timestamp,"
-            + " projectenddate Timestamp) MEASURES (attendance Integer,utilization Integer,"
-            + "salary Integer) OPTIONS (PARTITIONER [PARTITION_COUNT=1])");
-    CarbonHiveContext.sql(
-            "LOAD DATA fact from './src/test/resources/data.csv' INTO CUBE validatefooter "
-                + "PARTITIONDATA(DELIMITER ',', QUOTECHAR '\"')");
-    String storePath =
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION);
-    CarbonTableIdentifier tableIdentifier =
-            new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "validatefooter", "1");
-    String segmentPath = CarbonStorePath.getCarbonTablePath(storePath, tableIdentifier)
-        .getCarbonDataDirectoryPath("0", "0");
-    CarbonFile carbonFile =
-        FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
-    CarbonFile[] list = carbonFile.listFiles(new CarbonFileFilter() {
-      @Override public boolean accept(CarbonFile file) {
-        if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
-          return true;
-        }
-        return false;
-      }
-    });
-
-    for (CarbonFile file : list) {
-      String fileLocation = file.getAbsolutePath();
-      CarbonFile factFile =
-          FileFactory.getCarbonFile(fileLocation, FileFactory.getFileType(fileLocation));
-      long offset = factFile.getSize() - CarbonCommonConstants.LONG_SIZE_IN_BYTE;
-      FileHolder fileHolder = FileFactory.getFileHolder(FileFactory.getFileType(fileLocation));
-      offset = fileHolder.readLong(fileLocation, offset);
-      CarbonFooterReader metaDataReader = new CarbonFooterReader(fileLocation, offset);
-      fileFooter = metaDataReader.readFooter();
-    }
-    setUpIsDone = true;
-  }
-
-  @AfterClass public static void tearDownAfterClass() {
-    CarbonHiveContext.sql("drop CUBE validatefooter");
-  }
-
-  @Test public void testFileFooterExist() {
-    assertTrue(fileFooter != null);
-  }
-
-  @Test public void testFileFooterVersion() {
-    assertTrue(fileFooter.getVersion() >= 0);
-  }
-
-  @Test public void testFileFooterNumRows() {
-    assertTrue(fileFooter.getNum_rows() > 0);
-  }
-
-  @Test public void testFileFooterTableColumns() {
-    assertTrue(fileFooter.getTable_columns() != null && fileFooter.getTable_columns().size() > 0);
-  }
-
-  @Test public void testFileFooterSegmentInfo() {
-    assertTrue(
-        fileFooter.getSegment_info() != null && fileFooter.getSegment_info().getNum_cols() > 0
-            && fileFooter.getSegment_info().getColumn_cardinalities().size() > 0);
-  }
-
-  @Test public void testFileFooterBlockletIndex() {
-    assertTrue(fileFooter.getBlocklet_index_list() != null
-        && fileFooter.getBlocklet_index_list().size() > 0);
-    for (BlockletIndex blockletIndex : fileFooter.getBlocklet_index_list()) {
-      assertTrue(blockletIndex.getMin_max_index().getMin_values() != null
-          && blockletIndex.getMin_max_index().getMin_values().size() > 0
-          && blockletIndex.getMin_max_index().getMax_values() != null
-          && blockletIndex.getMin_max_index().getMax_values().size() > 0
-          && blockletIndex.getMin_max_index().getMin_values().size() == blockletIndex
-          .getMin_max_index().getMax_values().size());
-      assertTrue(blockletIndex.getB_tree_index().getStart_key() != null
-          && blockletIndex.getB_tree_index().getEnd_key() != null);
-    }
-  }
-
-  @Test public void testFileFooterBlockletInfo() {
-    assertTrue(fileFooter.getBlocklet_info_list() != null
-        && fileFooter.getBlocklet_info_list().size() > 0);
-    for (BlockletInfo blockletInfo : fileFooter.getBlocklet_info_list()) {
-      assertTrue(blockletInfo.getNum_rows() > 0 && blockletInfo.getColumn_data_chunks() != null
-          && blockletInfo.getColumn_data_chunks().size() > 0);
-      for (DataChunk columnDataChunk : blockletInfo.getColumn_data_chunks()) {
-        testColumnDataChunk(columnDataChunk);
-      }
-    }
-  }
-
-  private void testColumnDataChunk(DataChunk columnDatachunk) {
-    assertTrue(columnDatachunk.getEncoders() != null && columnDatachunk.getChunk_meta() != null
-        && columnDatachunk.getChunk_meta().getCompression_codec() != null);
-    // For Measure
-    if (columnDatachunk.getEncoders().contains(Encoding.DELTA)) {
-      assertTrue(
-          columnDatachunk.getPresence() != null && columnDatachunk.getEncoder_meta() != null);
-    } else {
-      assertTrue(columnDatachunk.getSort_state() != null);
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
new file mode 100644
index 0000000..b0e4833
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.events.ChangeEvent
+import org.apache.carbondata.core.indexstore.schema.FilterType
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.util.CarbonProperties
+
+class C2DataMapFactory() extends DataMapFactory {
+
+  override def init(identifier: AbsoluteTableIdentifier,
+      dataMapName: String): Unit = {}
+
+  override def fireEvent(event: ChangeEvent[_]): Unit = ???
+
+  override def clear(segmentId: String): Unit = ???
+
+  override def clear(): Unit = ???
+
+  override def getDataMap(distributable: DataMapDistributable): DataMap = ???
+
+  override def getDataMaps(segmentId: String): util.List[DataMap] = ???
+
+  override def createWriter(segmentId: String): DataMapWriter = DataMapWriterSuite.dataMapWriterC2Mock
+
+  override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, FilterType.EQUALTO)
+}
+
+class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
+
+  def buildTestData(numRows: Int): DataFrame = {
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to numRows)
+      .map(x => ("a", "b", x))
+      .toDF("c1", "c2", "c3")
+  }
+
+  def dropTable(): Unit = {
+    sql("DROP TABLE IF EXISTS carbon1")
+    sql("DROP TABLE IF EXISTS carbon2")
+  }
+
+  override def beforeAll {
+    dropTable()
+  }
+
+  test("test write datamap 2 pages") {
+    // register datamap writer
+    DataMapStoreManager.getInstance().createAndRegisterDataMap(
+      AbsoluteTableIdentifier.from(storeLocation, "default", "carbon1"),
+      classOf[C2DataMapFactory],
+      "test")
+
+    val df = buildTestData(33000)
+
+    // save dataframe to carbon file
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon1")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    assert(DataMapWriterSuite.callbackSeq.head.contains("block start"))
+    assert(DataMapWriterSuite.callbackSeq.last.contains("block end"))
+    assert(DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq(
+      "blocklet start 0",
+      "add page data: blocklet 0, page 0",
+      "add page data: blocklet 0, page 1",
+      "blocklet end: 0"
+    ))
+    DataMapWriterSuite.callbackSeq = Seq()
+  }
+
+  test("test write datamap 2 blocklet") {
+    // register datamap writer
+    DataMapStoreManager.getInstance().createAndRegisterDataMap(
+      AbsoluteTableIdentifier.from(storeLocation, "default", "carbon2"),
+      classOf[C2DataMapFactory],
+      "test")
+
+    CarbonProperties.getInstance()
+      .addProperty("carbon.blockletgroup.size.in.mb", "1")
+
+    val df = buildTestData(300000)
+
+    // save dataframe to carbon file
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon2")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    assert(DataMapWriterSuite.callbackSeq.head.contains("block start"))
+    assert(DataMapWriterSuite.callbackSeq.last.contains("block end"))
+    assert(DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq(
+      "blocklet start 0",
+      "add page data: blocklet 0, page 0",
+      "add page data: blocklet 0, page 1",
+      "add page data: blocklet 0, page 2",
+      "add page data: blocklet 0, page 3",
+      "add page data: blocklet 0, page 4",
+      "add page data: blocklet 0, page 5",
+      "add page data: blocklet 0, page 6",
+      "add page data: blocklet 0, page 7",
+      "blocklet end: 0",
+      "blocklet start 1",
+      "add page data: blocklet 1, page 0",
+      "add page data: blocklet 1, page 1",
+      "blocklet end: 1"
+    ))
+    DataMapWriterSuite.callbackSeq = Seq()
+  }
+
+  override def afterAll {
+    dropTable()
+  }
+}
+
+object DataMapWriterSuite {
+  var callbackSeq: Seq[String] = Seq[String]()
+
+  val dataMapWriterC2Mock = new DataMapWriter {
+
+    override def onPageAdded(
+        blockletId: Int,
+        pageId: Int,
+        pages: Array[ColumnPage]): Unit = {
+      assert(pages.length == 1)
+      assert(pages(0).getDataType == DataType.BYTE_ARRAY)
+      val bytes: Array[Byte] = pages(0).getByteArrayPage()(0)
+      assert(bytes.sameElements(Seq(0, 1, 'b'.toByte)))
+      callbackSeq :+= s"add page data: blocklet $blockletId, page $pageId"
+    }
+
+    override def onBlockletEnd(blockletId: Int): Unit = {
+      callbackSeq :+= s"blocklet end: $blockletId"
+    }
+
+    override def onBlockEnd(blockId: String): Unit = {
+      callbackSeq :+= s"block end $blockId"
+    }
+
+    override def onBlockletStart(blockletId: Int): Unit = {
+      callbackSeq :+= s"blocklet start $blockletId"
+    }
+
+    override def onBlockStart(blockId: String): Unit = {
+      callbackSeq :+= s"block start $blockId"
+    }
+
+  }
+}
\ No newline at end of file
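
As the two tests above illustrate, a datamap is attached to a table by registering its factory before data is loaded; the same registration the Scala tests perform looks roughly like this in Java (a fragment, not a full class; the store location, database, table name and datamap name are illustrative placeholders):

// storeLocation, "mytable" and "mydatamap" are placeholders for this sketch
DataMapStoreManager.getInstance().createAndRegisterDataMap(
    AbsoluteTableIdentifier.from(storeLocation, "default", "mytable"),
    C2DataMapFactory.class,
    "mydatamap");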

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index e0829ed..a6a8835 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -256,10 +256,7 @@ object DataManagementFunc {
       compactionModel.compactionType
     )
 
-    val future: Future[Void] = executor
-        .submit(new CompactionCallable(compactionCallableModel
-        )
-        )
+    val future: Future[Void] = executor.submit(new CompactionCallable(compactionCallableModel))
     futureList.add(future)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index a32146a..90f57a9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -31,9 +31,9 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.fileoperations.FileWriteOperation
-import org.apache.carbondata.core.indexstore.DataMapStoreManager
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 4620db0..1837c04 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.{RuntimeConfig, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
-import org.apache.carbondata.core.indexstore.DataMapStoreManager
+import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala
index e92d06d..697b727 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala
@@ -43,6 +43,7 @@ class DeleteCarbonTableSubqueryTestCase extends Spark2QueryTest with BeforeAndAf
       sql("""select c1 from iud_db_sub.dest"""),
       Seq(Row("c"), Row("d"), Row("e"))
     )
+    sql("drop table if exists iud_db_sub.dest")
   }
 
   test("delete data from  carbon table[where IN (sub query with where clause) ]") {
@@ -54,10 +55,12 @@ class DeleteCarbonTableSubqueryTestCase extends Spark2QueryTest with BeforeAndAf
       sql("""select c1 from iud_db_sub.dest"""),
       Seq(Row("a"), Row("c"), Row("d"), Row("e"))
     )
+    sql("drop table if exists iud_db_sub.dest")
   }
 
   override def afterAll {
-    sql("use default")
+    sql("drop table if exists iud_db_sub.source2")
     sql("drop database  if exists iud_db_sub cascade")
+    sql("use default")
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java
deleted file mode 100644
index 12fe27b..0000000
--- a/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.devapi.DictionaryGenerationException;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
-
-/**
- * Generic DataType interface which will be used while data loading for complex types like Array &
- * Struct
- */
-public interface GenericDataType<T> {
-
-  /**
-   * @return name of the column
-   */
-  String getName();
-
-  /**
-   * @return - columns parent name
-   */
-  String getParentname();
-
-  /**
-   * @param children - To add children dimension for parent complex type
-   */
-  void addChildren(GenericDataType children);
-
-  /**
-   * @param primitiveChild - Returns all primitive type columns in complex type
-   */
-  void getAllPrimitiveChildren(List<GenericDataType> primitiveChild);
-
-  /**
-   * writes to byte stream
-   * @param dataOutputStream
-   * @throws IOException
-   */
-  void writeByteArray(T input, DataOutputStream dataOutputStream)
-      throws IOException, DictionaryGenerationException;
-
-  /**
-   * @return surrogateIndex for primitive column in complex type
-   */
-  int getSurrogateIndex();
-
-  /**
-   * @param surrIndex - surrogate index of primitive column in complex type
-   */
-  void setSurrogateIndex(int surrIndex);
-
-  /**
-   * converts integer surrogate to bit packed surrogate value
-   * @param byteArrayInput
-   * @param dataOutputStream
-   * @param generator
-   * @throws IOException
-   * @throws KeyGenException
-   */
-  void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
-      KeyGenerator[] generator) throws IOException, KeyGenException;
-
-  /**
-   * @return columns count of each complex type
-   */
-  int getColsCount();
-
-  /**
-   * @return column uuid string
-   */
-  String getColumnId();
-
-  /**
-   * set array index to be referred while creating metadata column
-   * @param outputArrayIndex
-   */
-  void setOutputArrayIndex(int outputArrayIndex);
-
-  /**
-   * @return array index count of metadata column
-   */
-  int getMaxOutputArrayIndex();
-
-  /**
-   * Split byte array into complex metadata column and primitive column
-   * @param columnsArray
-   * @param inputArray
-   */
-  void getColumnarDataForComplexType(List<ArrayList<byte[]>> columnsArray, ByteBuffer inputArray);
-
-  /**
-   * @return current read row count
-   */
-  int getDataCounter();
-
-  /**
-   * fill agg key block including complex types
-   * @param aggKeyBlockWithComplex
-   * @param aggKeyBlock
-   */
-  void fillAggKeyBlock(List<Boolean> aggKeyBlockWithComplex, boolean[] aggKeyBlock);
-
-  /**
-   * fill block key size including complex types
-   * @param blockKeySizeWithComplex
-   * @param primitiveBlockKeySize
-   */
-  void fillBlockKeySize(List<Integer> blockKeySizeWithComplex, int[] primitiveBlockKeySize);
-
-  /**
-   * fill cardinality value including complex types
-   * @param dimCardWithComplex
-   * @param maxSurrogateKeyArray
-   */
-  void fillCardinalityAfterDataLoad(List<Integer> dimCardWithComplex, int[] maxSurrogateKeyArray);
-
-  /**
-   * Fill the cardinality of the primitive datatypes
-   * @param dimCardWithComplex
-   */
-  void fillCardinality(List<Integer> dimCardWithComplex);
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
new file mode 100644
index 0000000..4b0113c
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.datamap;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datamap.TableDataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.processing.store.TablePage;
+
+/**
+ * Listener that dispatches data load events to all DataMapWriters registered for one table
+ */
+public class DataMapWriterListener {
+
+  private static final LogService LOG = LogServiceFactory.getLogService(
+      DataMapWriterListener.class.getCanonicalName());
+
+  // map of indexed column names -> list of datamap writers registered for those columns
+  private Map<List<String>, List<DataMapWriter>> registry = new ConcurrentHashMap<>();
+
+  /**
+   * register all datamap writers for the specified table and segment
+   */
+  public void registerAllWriter(AbsoluteTableIdentifier identifier, String segmentId) {
+    List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(identifier);
+    if (tableDataMaps != null) {
+      for (TableDataMap tableDataMap : tableDataMaps) {
+        DataMapFactory factory = tableDataMap.getDataMapFactory();
+        register(factory, segmentId);
+      }
+    }
+  }
+
+  /**
+   * Register a DataMapWriter
+   */
+  private void register(DataMapFactory factory, String segmentId) {
+    assert (factory != null);
+    assert (segmentId != null);
+    DataMapMeta meta = factory.getMeta();
+    if (meta == null) {
+      // if data map does not have meta, no need to register
+      return;
+    }
+    List<String> columns = factory.getMeta().getIndexedColumns();
+    List<DataMapWriter> writers = registry.get(columns);
+    DataMapWriter writer = factory.createWriter(segmentId);
+    if (writers != null) {
+      writers.add(writer);
+    } else {
+      writers = new ArrayList<>();
+      writers.add(writer);
+      registry.put(columns, writers);
+    }
+    LOG.info("DataMapWriter " + writer + " added");
+  }
+
+  public void onBlockStart(String blockId) {
+    for (List<DataMapWriter> writers : registry.values()) {
+      for (DataMapWriter writer : writers) {
+        writer.onBlockStart(blockId);
+      }
+    }
+  }
+
+  public void onBlockEnd(String blockId) {
+    for (List<DataMapWriter> writers : registry.values()) {
+      for (DataMapWriter writer : writers) {
+        writer.onBlockEnd(blockId);
+      }
+    }
+  }
+
+  public void onBlockletStart(int blockletId) {
+    for (List<DataMapWriter> writers : registry.values()) {
+      for (DataMapWriter writer : writers) {
+        writer.onBlockletStart(blockletId);
+      }
+    }
+  }
+
+  public void onBlockletEnd(int blockletId) {
+    for (List<DataMapWriter> writers : registry.values()) {
+      for (DataMapWriter writer : writers) {
+        writer.onBlockletEnd(blockletId);
+      }
+    }
+  }
+
+  /**
+   * Pick the corresponding column pages and deliver them to all registered datamap writers
+   *
+   * @param blockletId sequence number of the blocklet the page belongs to
+   * @param pageId     sequence number of the page, starting from 0
+   * @param tablePage  page data
+   */
+  public void onPageAdded(int blockletId, int pageId, TablePage tablePage) {
+    Set<Map.Entry<List<String>, List<DataMapWriter>>> entries = registry.entrySet();
+    for (Map.Entry<List<String>, List<DataMapWriter>> entry : entries) {
+      List<String> indexedColumns = entry.getKey();
+      ColumnPage[] pages = new ColumnPage[indexedColumns.size()];
+      for (int i = 0; i < indexedColumns.size(); i++) {
+        pages[i] = tablePage.getColumnPage(indexedColumns.get(i));
+      }
+      List<DataMapWriter> writers = entry.getValue();
+      for (DataMapWriter writer : writers) {
+        writer.onPageAdded(blockletId, pageId, pages);
+      }
+    }
+  }
+
+}
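
Combining this listener with the expectations in DataMapWriterSuite, the call order a registered writer observes for a small load is roughly the following (a hedged walk-through using only the methods defined in this file; identifier, segment id, block id and tablePage are illustrative stand-ins supplied by the write path):

// hypothetical driver; ids are illustrative, tablePage comes from the write path
DataMapWriterListener listener = new DataMapWriterListener();
listener.registerAllWriter(identifier, "0");  // register writers for segment "0"

listener.onBlockStart("block-0");             // one carbondata file starts
listener.onBlockletStart(0);
listener.onPageAdded(0, 0, tablePage);        // page 0 of blocklet 0
listener.onPageAdded(0, 1, tablePage);        // page 1 of blocklet 0
listener.onBlockletEnd(0);
listener.onBlockEnd("block-0");               // file is complete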

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index f5fdd4d..02ceb06 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
new file mode 100644
index 0000000..6b54d2d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.datatypes;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+
+/**
+ * Generic DataType interface used during data loading for complex types such as
+ * Array and Struct
+ */
+public interface GenericDataType<T> {
+
+  /**
+   * @return name of the column
+   */
+  String getName();
+
+  /**
+   * @return - column's parent name
+   */
+  String getParentname();
+
+  /**
+   * @param children - child dimension to add under this parent complex type
+   */
+  void addChildren(GenericDataType children);
+
+  /**
+   * @param primitiveChild - list to be filled with all primitive columns of this complex type
+   */
+  void getAllPrimitiveChildren(List<GenericDataType> primitiveChild);
+
+  /**
+   * writes to byte stream
+   * @param dataOutputStream
+   * @throws IOException
+   */
+  void writeByteArray(T input, DataOutputStream dataOutputStream)
+      throws IOException, DictionaryGenerationException;
+
+  /**
+   * @return surrogateIndex for primitive column in complex type
+   */
+  int getSurrogateIndex();
+
+  /**
+   * @param surrIndex - surrogate index of primitive column in complex type
+   */
+  void setSurrogateIndex(int surrIndex);
+
+  /**
+   * converts integer surrogate to bit packed surrogate value
+   * @param byteArrayInput
+   * @param dataOutputStream
+   * @param generator
+   * @throws IOException
+   * @throws KeyGenException
+   */
+  void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
+      KeyGenerator[] generator) throws IOException, KeyGenException;
+
+  /**
+   * @return columns count of each complex type
+   */
+  int getColsCount();
+
+  /**
+   * @return column uuid string
+   */
+  String getColumnId();
+
+  /**
+   * set array index to be referred while creating metadata column
+   * @param outputArrayIndex
+   */
+  void setOutputArrayIndex(int outputArrayIndex);
+
+  /**
+   * @return array index count of metadata column
+   */
+  int getMaxOutputArrayIndex();
+
+  /**
+   * Split byte array into complex metadata column and primitive column
+   * @param columnsArray
+   * @param inputArray
+   */
+  void getColumnarDataForComplexType(List<ArrayList<byte[]>> columnsArray, ByteBuffer inputArray);
+
+  /**
+   * @return current read row count
+   */
+  int getDataCounter();
+
+  /**
+   * fill agg key block including complex types
+   * @param aggKeyBlockWithComplex
+   * @param aggKeyBlock
+   */
+  void fillAggKeyBlock(List<Boolean> aggKeyBlockWithComplex, boolean[] aggKeyBlock);
+
+  /**
+   * fill block key size including complex types
+   * @param blockKeySizeWithComplex
+   * @param primitiveBlockKeySize
+   */
+  void fillBlockKeySize(List<Integer> blockKeySizeWithComplex, int[] primitiveBlockKeySize);
+
+  /**
+   * fill cardinality value including complex types
+   * @param dimCardWithComplex
+   * @param maxSurrogateKeyArray
+   */
+  void fillCardinalityAfterDataLoad(List<Integer> dimCardWithComplex, int[] maxSurrogateKeyArray);
+
+  /**
+   * Fill the cardinality of the primitive datatypes
+   * @param dimCardWithComplex
+   */
+  void fillCardinality(List<Integer> dimCardWithComplex);
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index a24a324..e7e48e9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -28,7 +28,6 @@ import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.devapi.BiDictionary;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.dictionary.client.DictionaryClient;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index 94ee9f6..a61144e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
index d5730a2..8feea6a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
@@ -21,8 +21,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.util.List;
 
-import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
index d30582b..e9b0a78 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
@@ -23,13 +23,13 @@ import java.util.Map;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.dictionary.client.DictionaryClient;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.processing.datatypes.ArrayDataType;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
 import org.apache.carbondata.processing.datatypes.StructDataType;
 import org.apache.carbondata.processing.newflow.DataField;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 9c48af7..a716340 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -35,10 +35,8 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
-import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.page.EncodedTablePage;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
@@ -51,6 +49,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.newflow.sort.SortScopeOptions;
 import org.apache.carbondata.processing.store.file.FileManager;
 import org.apache.carbondata.processing.store.file.IFileManagerComposite;
@@ -74,6 +73,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    * data writer
    */
   private CarbonFactDataWriter dataWriter;
+
   /**
    * File manager
    */
@@ -87,11 +87,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    * blocklet size (for V1 and V2) or page size (for V3). A Producer thread will start to process
    * once this size of input is reached
    */
-  private int blockletSize;
-  /**
-   * keyGenerator
-   */
-  private ColumnarSplitter columnarSplitter;
+  private int pageSize;
   /**
    * keyBlockHolder
    */
@@ -120,7 +116,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   /**
    * a private class that will hold the data for blocklets
    */
-  private BlockletDataHolder blockletDataHolder;
+  private TablePageList tablePageList;
   /**
    * number of cores configured
    */
@@ -146,8 +142,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private ColumnarFormatVersion version;
 
-  private SortScopeOptions.SortScope sortScope;
-
   /**
    * CarbonFactDataHandler constructor
    */
@@ -202,11 +196,12 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         noInvertedIdxCol += (cd.getColName() + ",");
       }
     }
+
     LOGGER.info("Columns considered as NoInverted Index are " + noInvertedIdxCol);
   }
 
   private void initParameters(CarbonFactDataHandlerModel model) {
-    this.sortScope = model.getSortScope();
+    SortScopeOptions.SortScope sortScope = model.getSortScope();
     this.colGrpModel = model.getSegmentProperties().getColumnGroupModel();
 
     //TODO need to pass carbon table identifier to metadata
@@ -254,10 +249,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     consumerExecutorService = Executors.newFixedThreadPool(1);
     consumerExecutorServiceTaskList = new ArrayList<>(1);
     semaphore = new Semaphore(numberOfCores);
-    blockletDataHolder = new BlockletDataHolder();
+    tablePageList = new TablePageList();
 
     // Start the consumer which will take each blocklet/page in order and write to a file
-    Consumer consumer = new Consumer(blockletDataHolder);
+    Consumer consumer = new Consumer(tablePageList);
     consumerExecutorServiceTaskList.add(consumerExecutorService.submit(consumer));
   }
 
@@ -314,20 +309,20 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     this.entryCount++;
     // if entry count reaches to leaf node size then we are ready to write
     // this to leaf node file and update the intermediate files
-    if (this.entryCount == this.blockletSize) {
+    if (this.entryCount == this.pageSize) {
       try {
         semaphore.acquire();
 
         producerExecutorServiceTaskList.add(
             producerExecutorService.submit(
-                new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, false)
+                new Producer(tablePageList, dataRows, ++writerTaskSequenceCounter, false)
             )
         );
         blockletProcessingCount.incrementAndGet();
         // set the entry count to zero
         processedDataCount += entryCount;
         LOGGER.info("Total Number Of records added to store: " + processedDataCount);
-        dataRows = new ArrayList<>(this.blockletSize);
+        dataRows = new ArrayList<>(this.pageSize);
         this.entryCount = 0;
       } catch (InterruptedException e) {
         LOGGER.error(e, e.getMessage());
@@ -339,10 +334,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   /**
    * generate the EncodedTablePage from the input rows (one page in case of V3 format)
    */
-  private EncodedTablePage processDataRows(List<CarbonRow> dataRows)
+  private TablePage processDataRows(List<CarbonRow> dataRows)
       throws CarbonDataWriterException, KeyGenException, MemoryException, IOException {
     if (dataRows.size() == 0) {
-      return EncodedTablePage.newEmptyInstance();
+      return new TablePage(model, 0);
     }
     TablePage tablePage = new TablePage(model, dataRows.size());
     int rowId = 0;
@@ -352,11 +347,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       tablePage.addRow(rowId++, row);
     }
 
-    EncodedTablePage encoded = tablePage.encode();
-    tablePage.freeMemory();
+    tablePage.encode();
 
     LOGGER.info("Number Of records processed: " + dataRows.size());
-    return encoded;
+    return tablePage;
   }
 
   /**
@@ -370,7 +364,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     try {
       semaphore.acquire();
       producerExecutorServiceTaskList.add(producerExecutorService
-          .submit(new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, true)));
+          .submit(new Producer(tablePageList, dataRows, ++writerTaskSequenceCounter, true)));
       blockletProcessingCount.incrementAndGet();
       processedDataCount += entryCount;
       closeWriterExecutionService(producerExecutorService);
@@ -471,19 +465,19 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private void setWritingConfiguration() throws CarbonDataWriterException {
     // get blocklet size
-    this.blockletSize = Integer.parseInt(CarbonProperties.getInstance()
+    this.pageSize = Integer.parseInt(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.BLOCKLET_SIZE,
             CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL));
     if (version == ColumnarFormatVersion.V3) {
-      this.blockletSize = Integer.parseInt(CarbonProperties.getInstance()
+      this.pageSize = Integer.parseInt(CarbonProperties.getInstance()
           .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE,
               CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT));
     }
-    LOGGER.info("Number of rows per column blocklet " + blockletSize);
-    dataRows = new ArrayList<>(this.blockletSize);
+    LOGGER.info("Number of rows per column blocklet " + pageSize);
+    dataRows = new ArrayList<>(this.pageSize);
     int dimSet =
         Integer.parseInt(CarbonCommonConstants.DIMENSION_SPLIT_VALUE_IN_COLUMNAR_DEFAULTVALUE);
-    // if atleast one dimension is present then initialize column splitter otherwise null
+    // if at least one dimension is present then initialize column splitter otherwise null
     int noOfColStore = colGrpModel.getNoOfColumnStore();
     int[] keyBlockSize = new int[noOfColStore + getExpandedComplexColsCount()];
 
@@ -494,16 +488,16 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       //row store will be in single column store
       //e.g if {0,1,2,3,4,5} is dimension and {0,1,2) is row store dimension
       //than below splitter will return column as {0,1,2}{3}{4}{5}
-      this.columnarSplitter = model.getSegmentProperties().getFixedLengthKeySplitter();
+      ColumnarSplitter columnarSplitter = model.getSegmentProperties().getFixedLengthKeySplitter();
       System.arraycopy(columnarSplitter.getBlockKeySize(), 0, keyBlockSize, 0, noOfColStore);
       this.keyBlockHolder =
-          new CarbonKeyBlockHolder[this.columnarSplitter.getBlockKeySize().length];
+          new CarbonKeyBlockHolder[columnarSplitter.getBlockKeySize().length];
     } else {
       this.keyBlockHolder = new CarbonKeyBlockHolder[0];
     }
 
     for (int i = 0; i < keyBlockHolder.length; i++) {
-      this.keyBlockHolder[i] = new CarbonKeyBlockHolder(blockletSize);
+      this.keyBlockHolder[i] = new CarbonKeyBlockHolder(pageSize);
       this.keyBlockHolder[i].resetCounter();
     }
 
@@ -535,7 +529,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         .getBlockKeySize());
     System.arraycopy(blockKeySize, noOfColStore, keyBlockSize, noOfColStore,
         blockKeySize.length - noOfColStore);
-    this.dataWriter = getFactDataWriter(keyBlockSize);
+    this.dataWriter = getFactDataWriter();
     this.dataWriter.setIsNoDictionary(isNoDictionary);
     // initialize the channel;
     this.dataWriter.initializeWriter();
@@ -574,21 +568,19 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   /**
    * Below method will be used to get the fact data writer instance
    *
-   * @param keyBlockSize
    * @return data writer instance
    */
-  private CarbonFactDataWriter<?> getFactDataWriter(int[] keyBlockSize) {
+  private CarbonFactDataWriter<?> getFactDataWriter() {
     return CarbonDataWriterFactory.getInstance()
-        .getFactDataWriter(version, getDataWriterVo(keyBlockSize));
+        .getFactDataWriter(version, getDataWriterVo());
   }
 
   /**
    * Below method will be used to get the writer vo
    *
-   * @param keyBlockSize size of each key block
    * @return data writer vo object
    */
-  private CarbonDataWriterVo getDataWriterVo(int[] keyBlockSize) {
+  private CarbonDataWriterVo getDataWriterVo() {
     CarbonDataWriterVo carbonDataWriterVo = new CarbonDataWriterVo();
     carbonDataWriterVo.setStoreLocation(model.getStoreLocation());
     carbonDataWriterVo.setMeasureCount(model.getMeasureCount());
@@ -608,6 +600,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     carbonDataWriterVo.setBucketNumber(model.getBucketId());
     carbonDataWriterVo.setTaskExtension(model.getTaskExtension());
     carbonDataWriterVo.setSchemaUpdatedTimeStamp(model.getSchemaUpdatedTimeStamp());
+    carbonDataWriterVo.setListener(model.getDataMapWriterlistener());
     return carbonDataWriterVo;
   }
 
@@ -644,14 +637,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   }
 
   /**
-   * This class will hold the holder objects and manage producer and consumer for reading
-   * and writing the blocklet data
+   * This class will hold the table page data
    */
-  private final class BlockletDataHolder {
+  private final class TablePageList {
     /**
-     * array of blocklet data holder objects
+     * array of table page added by Producer and get by Consumer
      */
-    private EncodedTablePage[] encodedTablePages;
+    private TablePage[] tablePages;
     /**
      * flag to check whether the producer has completed processing for holder
      * object which is required to be picked form an index
@@ -662,8 +654,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
      */
     private int currentIndex;
 
-    private BlockletDataHolder() {
-      encodedTablePages = new EncodedTablePage[numberOfCores];
+    private TablePageList() {
+      tablePages = new TablePage[numberOfCores];
       available = new AtomicBoolean(false);
     }
 
@@ -671,32 +663,32 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
      * @return a node holder object
      * @throws InterruptedException if consumer thread is interrupted
      */
-    public synchronized EncodedTablePage get() throws InterruptedException {
-      EncodedTablePage encodedTablePage = encodedTablePages[currentIndex];
+    public synchronized TablePage get() throws InterruptedException {
+      TablePage tablePage = tablePages[currentIndex];
       // if node holder is null means producer thread processing the data which has to
       // be inserted at this current index has not completed yet
-      if (null == encodedTablePage && !processingComplete) {
+      if (null == tablePage && !processingComplete) {
         available.set(false);
       }
       while (!available.get()) {
         wait();
       }
-      encodedTablePage = encodedTablePages[currentIndex];
-      encodedTablePages[currentIndex] = null;
+      tablePage = tablePages[currentIndex];
+      tablePages[currentIndex] = null;
       currentIndex++;
       // reset current index when it reaches length of node holder array
-      if (currentIndex >= encodedTablePages.length) {
+      if (currentIndex >= tablePages.length) {
         currentIndex = 0;
       }
-      return encodedTablePage;
+      return tablePage;
     }
 
     /**
      * @param encodedTablePage
      * @param index
      */
-    public synchronized void put(EncodedTablePage encodedTablePage, int index) {
-      encodedTablePages[index] = encodedTablePage;
+    public synchronized void put(TablePage tablePage, int index) {
+      tablePages[index] = tablePage;
       // notify the consumer thread when index at which object is to be inserted
       // becomes equal to current index from where data has to be picked for writing
       if (index == currentIndex) {
@@ -711,16 +703,16 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private final class Producer implements Callable<Void> {
 
-    private BlockletDataHolder blockletDataHolder;
+    private TablePageList tablePageList;
     private List<CarbonRow> dataRows;
-    private int sequenceNumber;
+    private int pageId;
     private boolean isLastPage;
 
-    private Producer(BlockletDataHolder blockletDataHolder, List<CarbonRow> dataRows,
-        int sequenceNumber, boolean isLastPage) {
-      this.blockletDataHolder = blockletDataHolder;
+    private Producer(TablePageList tablePageList, List<CarbonRow> dataRows,
+        int pageId, boolean isLastPage) {
+      this.tablePageList = tablePageList;
       this.dataRows = dataRows;
-      this.sequenceNumber = sequenceNumber;
+      this.pageId = pageId;
       this.isLastPage = isLastPage;
     }
 
@@ -732,11 +724,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
      */
     @Override public Void call() throws Exception {
       try {
-        EncodedTablePage encodedTablePage = processDataRows(dataRows);
-        encodedTablePage.setIsLastPage(isLastPage);
+        TablePage tablePage = processDataRows(dataRows);
+        tablePage.setIsLastPage(isLastPage);
        // insert the page in the array according to its page id
-        int indexInNodeHolderArray = (sequenceNumber - 1) % numberOfCores;
-        blockletDataHolder.put(encodedTablePage, indexInNodeHolderArray);
+        int indexInNodeHolderArray = (pageId - 1) % numberOfCores;
+        tablePageList.put(tablePage, indexInNodeHolderArray);
         return null;
       } catch (Throwable throwable) {
         LOGGER.error(throwable, "Error in producer");
@@ -752,10 +744,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private final class Consumer implements Callable<Void> {
 
-    private BlockletDataHolder blockletDataHolder;
+    private TablePageList tablePageList;
 
-    private Consumer(BlockletDataHolder blockletDataHolder) {
-      this.blockletDataHolder = blockletDataHolder;
+    private Consumer(TablePageList tablePageList) {
+      this.tablePageList = tablePageList;
     }
 
     /**
@@ -766,11 +758,12 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
      */
     @Override public Void call() throws Exception {
       while (!processingComplete || blockletProcessingCount.get() > 0) {
-        EncodedTablePage encodedTablePage = null;
+        TablePage tablePage = null;
         try {
-          encodedTablePage = blockletDataHolder.get();
-          if (null != encodedTablePage) {
-            dataWriter.writeTablePage(encodedTablePage);
+          tablePage = tablePageList.get();
+          if (null != tablePage) {
+            dataWriter.writeTablePage(tablePage);
+            tablePage.freeMemory();
           }
           blockletProcessingCount.decrementAndGet();
         } catch (Throwable throwable) {

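For reference, the renamed TablePageList behaves as a fixed-size ring buffer between the
Producer and Consumer callables: the Producer inserts each page at slot
(pageId - 1) % numberOfCores, and the Consumer drains the slots strictly in page order via
wait/notify. Below is a minimal, self-contained sketch of the same coordination pattern;
the SlotBuffer class is illustrative only, and it omits the processingComplete handling the
real holder performs.

  // simplified stand-in for TablePageList; generic over the page type
  final class SlotBuffer<T> {
    private final Object[] slots;
    private int currentIndex;

    SlotBuffer(int size) {
      slots = new Object[size];
    }

    // producer side: fill a slot and wake the consumer if it waits on this index
    synchronized void put(T item, int index) {
      slots[index] = item;
      if (index == currentIndex) {
        notifyAll();
      }
    }

    // consumer side: block until the slot at currentIndex is filled, then advance
    @SuppressWarnings("unchecked")
    synchronized T get() throws InterruptedException {
      while (slots[currentIndex] == null) {
        wait();
      }
      T item = (T) slots[currentIndex];
      slots[currentIndex] = null;
      currentIndex = (currentIndex + 1) % slots.length;
      return item;
    }
  }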
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 51ec84b..c059030 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
@@ -39,6 +38,8 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.processing.datamap.DataMapWriterListener;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.model.CarbonLoadModel;
 import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
@@ -159,6 +160,8 @@ public class CarbonFactDataHandlerModel {
 
   private SortScopeOptions.SortScope sortScope;
 
+  private DataMapWriterListener dataMapWriterlistener;
+
   /**
    * Create the model using @{@link CarbonDataLoadConfiguration}
    */
@@ -254,6 +257,11 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.taskExtension = taskExtension;
     carbonFactDataHandlerModel.tableSpec = configuration.getTableSpec();
     carbonFactDataHandlerModel.sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
+
+    DataMapWriterListener listener = new DataMapWriterListener();
+    listener.registerAllWriter(configuration.getTableIdentifier(), configuration.getSegmentId());
+    carbonFactDataHandlerModel.dataMapWriterlistener = listener;
+
     return carbonFactDataHandlerModel;
   }
 
@@ -557,5 +565,9 @@ public class CarbonFactDataHandlerModel {
   public SortScopeOptions.SortScope getSortScope() {
     return sortScope;
   }
+
+  public DataMapWriterListener getDataMapWriterlistener() {
+    return dataMapWriterlistener;
+  }
 }
 

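The model now builds a DataMapWriterListener during createCarbonFactDataHandlerModel and
exposes it through getDataMapWriterlistener(). A hedged sketch of the wiring this enables,
using only calls that appear in this commit (the model variable, and the handler step in
between, are assumed):

  // pass the listener created by the model down to the fact data writer
  DataMapWriterListener listener = model.getDataMapWriterlistener();
  CarbonDataWriterVo writerVo = new CarbonDataWriterVo();
  writerVo.setListener(listener);
  // AbstractFactDataWriter later reads it back: listener = dataWriterVo.getListener()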
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index 03f3e5e..d2363f1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.carbondata.core.datastore.GenericDataType;
+import org.apache.carbondata.core.datastore.DimensionType;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
@@ -44,9 +44,8 @@ import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
 
 import org.apache.spark.sql.types.Decimal;
 
@@ -73,7 +72,12 @@ public class TablePage {
 
   private TablePageKey key;
 
-  private ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
+  private EncodedTablePage encodedTablePage;
+
+  private EncodingStrategy encodingStrategy = new DefaultEncodingStrategy();
+
+  // true if this is the last page of all input rows
+  private boolean isLastPage;
 
   TablePage(CarbonFactDataHandlerModel model, int pageSize) throws MemoryException {
     this.model = model;
@@ -240,14 +244,16 @@ public class TablePage {
     return output;
   }
 
-  EncodedTablePage encode() throws KeyGenException, MemoryException, IOException {
+  void encode() throws KeyGenException, MemoryException, IOException {
     // encode dimensions and measure
     EncodedDimensionPage[] dimensions = encodeAndCompressDimensions();
     EncodedMeasurePage[] measures = encodeAndCompressMeasures();
-    return EncodedTablePage.newInstance(pageSize, dimensions, measures, key);
+    this.encodedTablePage = EncodedTablePage.newInstance(pageSize, dimensions, measures, key);
   }
 
-  private EncodingStrategy encodingStrategy = new DefaultEncodingStrategy();
+  public EncodedTablePage getEncodedTablePage() {
+    return encodedTablePage;
+  }
 
   // apply measure and set encodedData in `encodedData`
   private EncodedMeasurePage[] encodeAndCompressMeasures()
@@ -301,6 +307,52 @@ public class TablePage {
     encodedDimensions.addAll(encodedComplexDimenions);
     return encodedDimensions.toArray(new EncodedDimensionPage[encodedDimensions.size()]);
   }
+
+  /**
+   * return column page of specified column name
+   */
+  public ColumnPage getColumnPage(String columnName) {
+    int dictDimensionIndex = -1;
+    int noDictDimensionIndex = -1;
+    ColumnPage page = null;
+    TableSpec spec = model.getTableSpec();
+    int numDimensions = spec.getNumDimensions();
+    for (int i = 0; i < numDimensions; i++) {
+      DimensionType type = spec.getDimensionSpec(i).getDimensionType();
+      if ((type == DimensionType.GLOBAL_DICTIONARY) || (type == DimensionType.DIRECT_DICTIONARY)) {
+        page = dictDimensionPages[++dictDimensionIndex];
+      } else if (type == DimensionType.PLAIN_VALUE) {
+        page = noDictDimensionPages[++noDictDimensionIndex];
+      } else {
+        // datamap is not supported on complex columns, so skip them
+        continue;
+      }
+      String fieldName = spec.getDimensionSpec(i).getFieldName();
+      if (fieldName.equalsIgnoreCase(columnName)) {
+        return page;
+      }
+    }
+    int numMeasures = spec.getNumMeasures();
+    for (int i = 0; i < numMeasures; i++) {
+      String fieldName = spec.getMeasureSpec(i).getFieldName();
+      if (fieldName.equalsIgnoreCase(columnName)) {
+        return measurePage[i];
+      }
+    }
+    throw new IllegalArgumentException(
+        "DataMap: column '" + columnName + "' must exist in the table schema");
+  }
+
+  public boolean isLastPage() {
+    return isLastPage;
+  }
+
+  public void setIsLastPage(boolean isLastPage) {
+    this.isLastPage = isLastPage;
+  }
+
+  public int getPageSize() {
+    return pageSize;
+  }
 }
 
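The new getColumnPage(String) gives datamap writers access to the raw (pre-encoding)
ColumnPage of an indexed column: dictionary and direct-dictionary dimensions resolve to
dictDimensionPages, plain-value dimensions to noDictDimensionPages, measures to measurePage,
and complex columns are skipped. A hedged usage sketch; the surrounding writer method and
the indexedColumns list are hypothetical:

  void onPageAdded(TablePage tablePage, List<String> indexedColumns) {
    for (String column : indexedColumns) {
      // throws IllegalArgumentException if the column is not part of the schema
      ColumnPage page = tablePage.getColumnPage(column);
      // ... read rows 0 .. tablePage.getPageSize() - 1 from `page` to build the index
    }
  }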
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index a34ed01..bcc0112 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -24,7 +24,6 @@ import java.io.FileFilter;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
-//import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -44,17 +43,14 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.page.EncodedTablePage;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
 import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletBTreeIndex;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonMergerUtil;
@@ -66,6 +62,7 @@ import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
 import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.format.BlockletInfo3;
 import org.apache.carbondata.format.IndexHeader;
+import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.store.file.FileData;
 
 import org.apache.commons.lang3.ArrayUtils;
@@ -97,11 +94,21 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
   protected String carbonDataFileTempPath;
 
   /**
-   * The name of carbonData file
+   * The name of carbonData file (blockId)
    */
   protected String carbonDataFileName;
 
   /**
+   * The sequence number of blocklet inside one block
+   */
+  protected int blockletId = 0;
+
+  /**
+   * The sequence number of page inside one blocklet
+   */
+  protected int pageId = 0;
+
+  /**
    * Local cardinality for the segment
    */
   protected int[] localCardinality;
@@ -132,7 +139,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
   /**
    * data block size for one carbon data file
    */
-  private long dataBlockSize;
+  private long blockSizeThreshold;
   /**
    * file size at any given point
    */
@@ -152,6 +159,11 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    */
   protected List<org.apache.carbondata.format.BlockletIndex> blockletIndex;
 
+  /**
+   * listener to write data map
+   */
+  protected DataMapWriterListener listener;
+
   public AbstractFactDataWriter(CarbonDataWriterVo dataWriterVo) {
     this.dataWriterVo = dataWriterVo;
     this.blockletInfoList =
@@ -163,22 +175,21 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
     this.fileSizeInBytes =
         (long) dataWriterVo.getTableBlocksize() * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR
             * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR;
-    /*
-    size reserved in one file for writing block meta data. It will be in percentage
-   */
+
+    // size reserved in one file for writing block meta data. It will be in percentage
     int spaceReservedForBlockMetaSize = Integer.parseInt(propInstance
         .getProperty(CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE,
             CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE_DEFAULT));
-    this.dataBlockSize = fileSizeInBytes - (fileSizeInBytes * spaceReservedForBlockMetaSize) / 100;
-    LOGGER.info("Total file size: " + fileSizeInBytes + " and dataBlock Size: " + dataBlockSize);
+    this.blockSizeThreshold =
+        fileSizeInBytes - (fileSizeInBytes * spaceReservedForBlockMetaSize) / 100;
+    LOGGER.info("Total file size: " + fileSizeInBytes + " and dataBlock Size: " +
+        blockSizeThreshold);
 
     this.executorService = Executors.newFixedThreadPool(1);
     executorServiceSubmitList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     // in case of compaction we will pass the cardinality.
     this.localCardinality = dataWriterVo.getColCardinality();
-    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
-        dataWriterVo.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + dataWriterVo
-            .getTableName());
+
     //TODO: We should delete the levelmetadata file after reading here.
     // so only data loading flow will need to read from cardinality file.
     if (null == this.localCardinality) {
@@ -202,6 +213,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
     this.dataChunksLength = new ArrayList<>();
     blockletMetadata = new ArrayList<BlockletInfo3>();
     blockletIndex = new ArrayList<>();
+    listener = dataWriterVo.getListener();
   }
 
   /**
@@ -241,18 +253,19 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
   }
 
   /**
-   * This method will be used to update the file channel with new file; new
-   * file will be created once existing file reached the file size limit This
-   * method will first check whether existing file size is exceeded the file
-   * size limit if yes then write the leaf metadata to file then set the
-   * current file size to 0 close the existing file channel get the new file
-   * name and get the channel for new file
+   * This method updates the file channel with a new file once the block size threshold is
+   * exceeded. It first checks whether the existing file size exceeds the limit; if so, it
+   * writes the leaf metadata to the file, resets the current file size to 0, closes the
+   * existing file channel, and obtains the name and channel of the new file
    *
-   * @param blockletDataSize data size of one block
+   * @param blockletSizeToBeAdded data size of the blocklet to be added to the current file
    * @throws CarbonDataWriterException if any problem
    */
-  protected void updateBlockletFileChannel(long blockletDataSize) throws CarbonDataWriterException {
-    if ((currentFileSize + blockletDataSize) >= dataBlockSize && currentFileSize != 0) {
+  protected void createNewFileIfReachThreshold(long blockletSizeToBeAdded)
+      throws CarbonDataWriterException {
+    if ((currentFileSize + blockletSizeToBeAdded) >= blockSizeThreshold && currentFileSize != 0) {
       // set the current file size to zero
       LOGGER.info("Writing data to file as max file size reached for file: "
           + carbonDataFileTempPath + " .Data block size: " + currentFileSize);
@@ -265,16 +278,42 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
       this.dataChunksLength = new ArrayList<>();
       this.blockletMetadata = new ArrayList<>();
       this.blockletIndex = new ArrayList<>();
-      CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
-      // rename carbon data file from in progress status to actual
-      renameCarbonDataFile();
-      executorServiceSubmitList.add(executorService
-          .submit(new CopyThread(this.carbonDataFileTempPath
-              .substring(0, this.carbonDataFileTempPath.lastIndexOf('.')))));
+      commitCurrentFile(false);
       // initialize the new channel
       initializeWriter();
     }
-    currentFileSize += blockletDataSize;
+    currentFileSize += blockletSizeToBeAdded;
+  }
+
+  private void notifyDataMapBlockStart() {
+    if (listener != null) {
+      listener.onBlockStart(carbonDataFileName);
+    }
+  }
+
+  private void notifyDataMapBlockEnd() {
+    if (listener != null) {
+      listener.onBlockEnd(carbonDataFileName);
+    }
+    blockletId = 0;
+  }
+
+  /**
+   * Finish writing the current file: flush the stream, rename the temp file, and copy it
+   * to the final file path
+   * @param copyInCurrentThread set to false to perform the data copy in a new thread
+   */
+  protected void commitCurrentFile(boolean copyInCurrentThread) {
+    notifyDataMapBlockEnd();
+    CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
+    // rename carbon data file from in progress status to actual
+    renameCarbonDataFile();
+    String fileName = this.carbonDataFileTempPath.substring(0,
+        this.carbonDataFileTempPath.lastIndexOf('.'));
+    if (copyInCurrentThread) {
+      copyCarbonDataFileToCarbonStorePath(fileName);
+    } else {
+      executorServiceSubmitList.add(executorService.submit(new CopyThread(fileName)));
+    }
   }
 
   /**
@@ -310,6 +349,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
       throw new CarbonDataWriterException("Problem while getting the FileChannel for Leaf File",
           fileNotFoundException);
     }
+    notifyDataMapBlockStart();
   }
 
   private int initFileCount() {
@@ -433,18 +473,15 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    * @throws CarbonDataWriterException
    */
   public void closeWriter() throws CarbonDataWriterException {
-    CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
     if (this.blockletInfoList.size() > 0) {
-      renameCarbonDataFile();
-      copyCarbonDataFileToCarbonStorePath(
-          this.carbonDataFileTempPath
-              .substring(0, this.carbonDataFileTempPath.lastIndexOf('.')));
+      commitCurrentFile(true);
       try {
         writeIndexFile();
       } catch (IOException e) {
         throw new CarbonDataWriterException("Problem while writing the index file", e);
       }
     }
+    CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
     closeExecutorService();
   }
 
@@ -590,17 +627,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
   }
 
   /**
-   * This method will be used to write leaf data to file
-   * file format
-   * <key><measure1><measure2>....
-   *
-   * @throws CarbonDataWriterException
-   * @throws CarbonDataWriterException throws new CarbonDataWriterException if any problem
-   */
-  public abstract void writeTablePage(EncodedTablePage encodedTablePage)
-      throws CarbonDataWriterException;
-
-  /**
    * Below method will be used to update the min or max value
    * by removing the length from it
    *
@@ -608,10 +634,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    */
   protected byte[] updateMinMaxForNoDictionary(byte[] valueWithLength) {
     return valueWithLength;
-//    ByteBuffer buffer = ByteBuffer.wrap(valueWithLength);
-//    byte[] actualValue = new byte[buffer.getShort()];
-//    buffer.get(actualValue);
-//    return actualValue;
   }
 
   /**
@@ -640,5 +662,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
       copyCarbonDataFileToCarbonStorePath(fileName);
       return null;
     }
+
   }
 }

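With this change the writer's file lifecycle doubles as the datamap event stream:
initializeWriter() fires onBlockStart, and commitCurrentFile() fires onBlockEnd (also
resetting blockletId) before the temp file is renamed and copied. A small sketch of the
rollover decision in createNewFileIfReachThreshold, with illustrative numbers (the 1 GB
block size and 10 percent reserved space are assumptions, not verified defaults):

  long fileSizeInBytes = 1024L * 1024 * 1024;      // configured table block size
  int spaceReservedForBlockMetaSize = 10;          // percent reserved for block metadata
  long blockSizeThreshold =
      fileSizeInBytes - (fileSizeInBytes * spaceReservedForBlockMetaSize) / 100;

  long currentFileSize = 900L * 1024 * 1024;
  long blockletSizeToBeAdded = 64L * 1024 * 1024;
  if ((currentFileSize + blockletSizeToBeAdded) >= blockSizeThreshold && currentFileSize != 0) {
    // commitCurrentFile(false): onBlockEnd, close streams, rename, async copy;
    // initializeWriter() then opens the next file and fires onBlockStart
  }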
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
index 225e031..26fff09 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
@@ -20,6 +20,7 @@ import java.util.List;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
 import org.apache.carbondata.processing.store.file.IFileManagerComposite;
 
@@ -64,6 +65,8 @@ public class CarbonDataWriterVo {
 
   private int taskExtension;
 
+  private DataMapWriterListener listener;
+
   /**
    * @return the storeLocation
    */
@@ -303,4 +306,12 @@ public class CarbonDataWriterVo {
   public void setTaskExtension(int taskExtension) {
     this.taskExtension = taskExtension;
   }
+
+  public void setListener(DataMapWriterListener listener) {
+    this.listener = listener;
+  }
+
+  public DataMapWriterListener getListener() {
+    return listener;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
index f194f74..3b26b7c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
@@ -18,14 +18,15 @@
 package org.apache.carbondata.processing.store.writer;
 
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.page.EncodedTablePage;
+import org.apache.carbondata.processing.store.TablePage;
 
 public interface CarbonFactDataWriter<T> {
 
   /**
    * write an encoded table page
+   * @param tablePage the table page to be written
    */
-  void writeTablePage(EncodedTablePage encodedTablePage) throws CarbonDataWriterException;
+  void writeTablePage(TablePage tablePage) throws CarbonDataWriterException;
 
   /**
    * Below method will be used to write the leaf meta data to file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
index 0f1b52b..f849e21 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.NodeHolder;
 import org.apache.carbondata.core.writer.CarbonFooterWriter;
 import org.apache.carbondata.format.FileFooter;
+import org.apache.carbondata.processing.store.TablePage;
 import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
 
@@ -199,14 +200,14 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
     return holder;
   }
 
-  @Override public void writeTablePage(EncodedTablePage encodedTablePage)
+  @Override public void writeTablePage(TablePage tablePage)
       throws CarbonDataWriterException {
-    if (encodedTablePage.getPageSize() == 0) {
+    if (tablePage.getPageSize() == 0) {
       return;
     }
-    long blockletDataSize = encodedTablePage.getEncodedSize();
-    updateBlockletFileChannel(blockletDataSize);
-    NodeHolder nodeHolder = buildNodeHolder(encodedTablePage);
+    long blockletDataSize = tablePage.getEncodedTablePage().getEncodedSize();
+    createNewFileIfReachThreshold(blockletDataSize);
+    NodeHolder nodeHolder = buildNodeHolder(tablePage.getEncodedTablePage());
     // write data to file and get its offset
     long offset = writeDataToFile(nodeHolder, fileChannel);
     // get the blocklet info for currently added blocklet

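One contract worth noting for writeTablePage implementations: the Consumer (earlier in this
commit) releases the page's memory right after the write returns, so a writer must finish
serializing before returning and must not retain the TablePage. Caller-side sketch,
condensed from the Consumer change:

  // Consumer.call(): the page memory is freed immediately after the write
  dataWriter.writeTablePage(tablePage);
  tablePage.freeMemory();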
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
index e19a5ce..3f49a7b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.util.NodeHolder;
 import org.apache.carbondata.core.writer.CarbonFooterWriter;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.FileFooter;
+import org.apache.carbondata.processing.store.TablePage;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
 import org.apache.carbondata.processing.store.writer.v1.CarbonFactDataWriterImplV1;
 
@@ -63,19 +64,19 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
   /**
    * Below method will be used to write the data to carbon data file
    *
-   * @param encodedTablePage
+   * @param tablePage
    * @throws CarbonDataWriterException any problem in writing operation
    */
-  @Override public void writeTablePage(EncodedTablePage encodedTablePage)
+  @Override public void writeTablePage(TablePage tablePage)
       throws CarbonDataWriterException {
-    NodeHolder nodeHolder = buildNodeHolder(encodedTablePage);
-    if (encodedTablePage.getPageSize() == 0) {
+    NodeHolder nodeHolder = buildNodeHolder(tablePage.getEncodedTablePage());
+    if (tablePage.getPageSize() == 0) {
       return;
     }
     // size to calculate the size of the blocklet
     int size = 0;
     // get the blocklet info object
-    BlockletInfoColumnar blockletInfo = getBlockletInfo(encodedTablePage, 0);
+    BlockletInfoColumnar blockletInfo = getBlockletInfo(tablePage.getEncodedTablePage(), 0);
 
     List<DataChunk2> datachunks = null;
     try {
@@ -105,7 +106,7 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
         nodeHolder.getTotalDimensionArrayLength() + nodeHolder.getTotalMeasureArrayLength() + size;
     // if size of the file already reached threshold size then create a new file and get the file
     // channel object
-    updateBlockletFileChannel(blockletDataSize);
+    createNewFileIfReachThreshold(blockletDataSize);
     // writer the version header in the file if current file size is zero
     // this is done so carbondata file can be read separately
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java
new file mode 100644
index 0000000..68aee95
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.store.writer.v3;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
+import org.apache.carbondata.processing.store.TablePage;
+
+public class BlockletDataHolder {
+  private List<EncodedTablePage> encodedTablePage;
+  private List<TablePage> rawTablePages;
+  private long currentSize;
+
+  public BlockletDataHolder() {
+    this.encodedTablePage = new ArrayList<>();
+    this.rawTablePages = new ArrayList<>();
+  }
+
+  public void clear() {
+    encodedTablePage.clear();
+    rawTablePages.clear();
+    currentSize = 0;
+  }
+
+  public void addPage(TablePage rawTablePage) {
+    EncodedTablePage encodedTablePage = rawTablePage.getEncodedTablePage();
+    this.encodedTablePage.add(encodedTablePage);
+    this.rawTablePages.add(rawTablePage);
+    currentSize += encodedTablePage.getEncodedSize();
+  }
+
+  public long getSize() {
+    // add 15 percent headroom for the data chunk 3 of each column in each page
+    return currentSize + ((currentSize * 15) / 100);
+  }
+
+  public int getNumberOfPagesAdded() {
+    return encodedTablePage.size();
+  }
+
+  public int getTotalRows() {
+    int rows = 0;
+    for (EncodedTablePage page : encodedTablePage) {
+      rows += page.getPageSize();
+    }
+    return rows;
+  }
+
+  public List<EncodedTablePage> getEncodedTablePages() {
+    return encodedTablePage;
+  }
+
+  public List<TablePage> getRawTablePages() {
+    return rawTablePages;
+  }
+}
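
The new BlockletDataHolder batches encoded pages for the V3 writer until the estimated
blocklet size (encoded bytes plus 15 percent headroom for per-column DataChunk3 metadata)
crosses a threshold. A hedged usage sketch; blockletSizeThreshold and writeBlockletToFile
are illustrative names, not part of this commit:

  BlockletDataHolder holder = new BlockletDataHolder();

  void onPageEncoded(TablePage tablePage, long blockletSizeThreshold) {
    holder.addPage(tablePage);                      // accumulates encoded size internally
    if (holder.getSize() >= blockletSizeThreshold) {
      writeBlockletToFile(holder.getEncodedTablePages());
      holder.clear();                               // reset pages and size for next blocklet
    }
  }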