Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/08/13 20:01:18 UTC

[1/3] carbondata git commit: [CARBONDATA-1363] Add DataMapWriter interface

Repository: carbondata
Updated Branches:
  refs/heads/master 85cbad246 -> f089287ce


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index adb97ae..5edd675 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.BlockletInfo3;
 import org.apache.carbondata.format.FileFooter3;
+import org.apache.carbondata.processing.store.TablePage;
 import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
 
@@ -57,22 +58,25 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
   /**
    * persist the page data to be written in the file
    */
-  private DataWriterHolder dataWriterHolder;
+  private BlockletDataHolder blockletDataHolder;
 
-  private long blockletSize;
+  /**
+   * Threshold of blocklet size, in bytes (configured in MB via BLOCKLET_SIZE_IN_MB)
+   */
+  private long blockletSizeThreshold;
 
   public CarbonFactDataWriterImplV3(CarbonDataWriterVo dataWriterVo) {
     super(dataWriterVo);
-    blockletSize = Long.parseLong(CarbonProperties.getInstance()
+    blockletSizeThreshold = Long.parseLong(CarbonProperties.getInstance()
         .getProperty(CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB,
             CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB_DEFAULT_VALUE))
         * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR
         * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR;
-    if (blockletSize > fileSizeInBytes) {
-      blockletSize = fileSizeInBytes;
-      LOGGER.info("Blocklet size configure for table is: " + blockletSize);
+    if (blockletSizeThreshold > fileSizeInBytes) {
+      blockletSizeThreshold = fileSizeInBytes;
+      LOGGER.info("Blocklet size configured for table is: " + blockletSizeThreshold);
     }
-    dataWriterHolder = new DataWriterHolder();
+    blockletDataHolder = new BlockletDataHolder();
   }
 
   @Override protected void writeBlockletInfoToFile(FileChannel channel, String filePath)
@@ -100,88 +104,118 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
   }
 
   /**
-   * Below method will be used to write one table page data
+   * Write one table page's data; invoked by the Consumer thread
+   * @param tablePage
    */
-  @Override public void writeTablePage(EncodedTablePage encodedTablePage)
+  @Override public void writeTablePage(TablePage tablePage)
       throws CarbonDataWriterException {
     // condition for writing all the pages
-    if (!encodedTablePage.isLastPage()) {
+    if (!tablePage.isLastPage()) {
       boolean isAdded = false;
       // check if size more than blocklet size then write the page to file
-      if (dataWriterHolder.getSize() + encodedTablePage.getEncodedSize() >= blockletSize) {
-        // if one page size is more than blocklet size
-        if (dataWriterHolder.getEncodedTablePages().size() == 0) {
+      if (blockletDataHolder.getSize() + tablePage.getEncodedTablePage().getEncodedSize() >=
+          blockletSizeThreshold) {
+        // if blocklet size exceeds threshold, write blocklet data
+        if (blockletDataHolder.getEncodedTablePages().size() == 0) {
           isAdded = true;
-          dataWriterHolder.addPage(encodedTablePage);
+          addPageData(tablePage);
         }
 
-        LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getNumberOfPagesAdded()
-            + " :Rows Added: " + dataWriterHolder.getTotalRows());
+        LOGGER.info("Number of Pages for blocklet is: " + blockletDataHolder.getNumberOfPagesAdded()
+            + " :Rows Added: " + blockletDataHolder.getTotalRows());
+
         // write the data
         writeBlockletToFile();
+
       }
       if (!isAdded) {
-        dataWriterHolder.addPage(encodedTablePage);
+        addPageData(tablePage);
       }
     } else {
       //for last blocklet check if the last page will exceed the blocklet size then write
       // existing pages and then last page
-      if (encodedTablePage.getPageSize() > 0) {
-        dataWriterHolder.addPage(encodedTablePage);
+
+      if (tablePage.getPageSize() > 0) {
+        addPageData(tablePage);
       }
-      if (dataWriterHolder.getNumberOfPagesAdded() > 0) {
-        LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getNumberOfPagesAdded()
-            + " :Rows Added: " + dataWriterHolder.getTotalRows());
+      if (blockletDataHolder.getNumberOfPagesAdded() > 0) {
+        LOGGER.info("Number of Pages for blocklet is: " + blockletDataHolder.getNumberOfPagesAdded()
+            + " :Rows Added: " + blockletDataHolder.getTotalRows());
         writeBlockletToFile();
       }
     }
   }
 
+  private void addPageData(TablePage tablePage) {
+    blockletDataHolder.addPage(tablePage);
+    if (listener != null) {
+      if (pageId == 0) {
+        listener.onBlockletStart(blockletId);
+      }
+      listener.onPageAdded(blockletId, pageId++, tablePage);
+    }
+  }
+
   /**
-   * Write one blocklet data to file
+   * Write the collected blocklet data (blockletDataHolder) to file
    */
   private void writeBlockletToFile() {
     // get the list of all encoded table page
-    List<EncodedTablePage> encodedTablePageList = dataWriterHolder.getEncodedTablePages();
+    List<EncodedTablePage> encodedTablePageList = blockletDataHolder.getEncodedTablePages();
     int numDimensions = encodedTablePageList.get(0).getNumDimensions();
     int numMeasures = encodedTablePageList.get(0).getNumMeasures();
-    long blockletDataSize = 0;
     // get data chunks for all the column
     byte[][] dataChunkBytes = new byte[numDimensions + numMeasures][];
+    long metadataSize = fillDataChunk(encodedTablePageList, dataChunkBytes);
+    // calculate the total size of data to be written
+    long blockletSize = blockletDataHolder.getSize() + metadataSize;
+    // if the data size would exceed the block size, create a new file
+    createNewFileIfReachThreshold(blockletSize);
+
+    // write data to file
+    try {
+      if (fileChannel.size() == 0) {
+        // write the header if file is empty
+        writeHeaderToFile(fileChannel);
+      }
+      writeBlockletToFile(fileChannel, dataChunkBytes);
+      if (listener != null) {
+        listener.onBlockletEnd(blockletId++);
+      }
+      pageId = 0;
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem when writing file", e);
+    }
+    // clear the data holder
+    blockletDataHolder.clear();
+
+  }
+
+  /**
+   * Fill dataChunkBytes and return total size of page metadata
+   */
+  private long fillDataChunk(List<EncodedTablePage> encodedTablePageList, byte[][] dataChunkBytes) {
+    int size = 0;
+    int numDimensions = encodedTablePageList.get(0).getNumDimensions();
+    int numMeasures = encodedTablePageList.get(0).getNumMeasures();
     int measureStartIndex = numDimensions;
     // calculate the size of data chunks
     try {
       for (int i = 0; i < numDimensions; i++) {
         dataChunkBytes[i] = CarbonUtil.getByteArray(
             CarbonMetadataUtil.getDimensionDataChunk3(encodedTablePageList, i));
-        blockletDataSize += dataChunkBytes[i].length;
+        size += dataChunkBytes[i].length;
       }
       for (int i = 0; i < numMeasures; i++) {
         dataChunkBytes[measureStartIndex] = CarbonUtil.getByteArray(
             CarbonMetadataUtil.getMeasureDataChunk3(encodedTablePageList, i));
-        blockletDataSize += dataChunkBytes[measureStartIndex].length;
+        size += dataChunkBytes[measureStartIndex].length;
         measureStartIndex++;
       }
     } catch (IOException e) {
       throw new CarbonDataWriterException("Problem while getting the data chunks", e);
     }
-    // calculate the total size of data to be written
-    blockletDataSize += dataWriterHolder.getSize();
-    // to check if data size will exceed the block size then create a new file
-    updateBlockletFileChannel(blockletDataSize);
-
-    // write data to file
-    try {
-      if (fileChannel.size() == 0) {
-        // write the header if file is empty
-        writeHeaderToFile(fileChannel);
-      }
-      writeBlockletToFile(fileChannel, dataChunkBytes);
-    } catch (IOException e) {
-      throw new CarbonDataWriterException("Problem when writing file", e);
-    }
-    // clear the data holder
-    dataWriterHolder.clear();
+    return size;
   }
 
   /**
@@ -210,7 +244,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
     List<Long> currentDataChunksOffset = new ArrayList<>();
     // to maintain the length of each data chunk in blocklet
     List<Integer> currentDataChunksLength = new ArrayList<>();
-    List<EncodedTablePage> encodedTablePages = dataWriterHolder.getEncodedTablePages();
+    List<EncodedTablePage> encodedTablePages = blockletDataHolder.getEncodedTablePages();
     int numberOfDimension = encodedTablePages.get(0).getNumDimensions();
     int numberOfMeasures = encodedTablePages.get(0).getNumMeasures();
     ByteBuffer buffer = null;
@@ -258,7 +292,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
             encodedTablePages, dataWriterVo.getSegmentProperties().getMeasures()));
     BlockletInfo3 blockletInfo3 =
         new BlockletInfo3(numberOfRows, currentDataChunksOffset, currentDataChunksLength,
-            dimensionOffset, measureOffset, dataWriterHolder.getEncodedTablePages().size());
+            dimensionOffset, measureOffset, blockletDataHolder.getEncodedTablePages().size());
     blockletMetadata.add(blockletInfo3);
   }
 
@@ -328,10 +362,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
    * @throws CarbonDataWriterException
    */
   public void closeWriter() throws CarbonDataWriterException {
-    CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
-    renameCarbonDataFile();
-    copyCarbonDataFileToCarbonStorePath(
-        this.carbonDataFileTempPath.substring(0, this.carbonDataFileTempPath.lastIndexOf('.')));
+    commitCurrentFile(true);
     try {
       writeIndexFile();
     } catch (IOException e) {

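For context on the writer change above: addPageData() fires onBlockletStart only for the first page of a blocklet, and writeBlockletToFile() fires onBlockletEnd and resets the page counter. Below is a minimal, self-contained Java sketch of that callback ordering; the Listener interface is a simplified stand-in for DataMapWriterListener, not the real API, and it models only the notification logic (not the page buffering in blockletDataHolder).

import java.util.ArrayList;
import java.util.List;

public class CallbackOrderSketch {
  interface Listener {
    void onBlockletStart(int blockletId);
    void onPageAdded(int blockletId, int pageId, String page);
    void onBlockletEnd(int blockletId);
  }

  private final Listener listener;
  private int blockletId = 0;
  private int pageId = 0;

  CallbackOrderSketch(Listener listener) {
    this.listener = listener;
  }

  // mirrors addPageData(): the start event fires only for the first page
  void addPage(String page) {
    if (listener != null) {
      if (pageId == 0) {
        listener.onBlockletStart(blockletId);
      }
      listener.onPageAdded(blockletId, pageId++, page);
    }
  }

  // mirrors writeBlockletToFile(): end event, advance blocklet, reset pages
  void flushBlocklet() {
    if (listener != null) {
      listener.onBlockletEnd(blockletId++);
    }
    pageId = 0;
  }

  public static void main(String[] args) {
    List<String> events = new ArrayList<>();
    CallbackOrderSketch writer = new CallbackOrderSketch(new Listener() {
      public void onBlockletStart(int b) { events.add("blocklet start " + b); }
      public void onPageAdded(int b, int p, String d) { events.add("page " + p); }
      public void onBlockletEnd(int b) { events.add("blocklet end " + b); }
    });
    writer.addPage("page0");
    writer.addPage("page1");
    writer.flushBlocklet();
    System.out.println(events);  // [blocklet start 0, page 0, page 1, blocklet end 0]
  }
}
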
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
deleted file mode 100644
index 246fa86..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.store.writer.v3;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.datastore.page.EncodedTablePage;
-
-public class DataWriterHolder {
-  private List<EncodedTablePage> encodedTablePage;
-  private long currentSize;
-
-  public DataWriterHolder() {
-    this.encodedTablePage = new ArrayList<EncodedTablePage>();
-  }
-
-  public void clear() {
-    encodedTablePage.clear();
-    currentSize = 0;
-  }
-
-  public void addPage(EncodedTablePage encodedTablePage) {
-    this.encodedTablePage.add(encodedTablePage);
-    currentSize += encodedTablePage.getEncodedSize();
-  }
-
-  public long getSize() {
-    // increasing it by 15 percent for data chunk 3 of each column each page
-    return currentSize + ((currentSize * 15) / 100);
-  }
-
-  public int getNumberOfPagesAdded() {
-    return encodedTablePage.size();
-  }
-
-  public int getTotalRows() {
-    int rows = 0;
-    for (EncodedTablePage nh : encodedTablePage) {
-      rows += nh.getPageSize();
-    }
-    return rows;
-  }
-
-  public List<EncodedTablePage> getEncodedTablePages() {
-    return encodedTablePage;
-  }
-}

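One detail of the deleted holder worth noting: getSize() padded the accumulated encoded size by 15 percent to reserve room for the DataChunk3 metadata written per column per page (its replacement, BlockletDataHolder, is not shown in this commit). A worked example of that integer arithmetic:

public class SizeHeadroomExample {
  public static void main(String[] args) {
    long currentSize = 64L * 1024 * 1024;  // 64 MB of encoded page data
    // same integer arithmetic as the deleted getSize()
    long estimated = currentSize + ((currentSize * 15) / 100);
    System.out.println(estimated);  // 77175193 bytes, roughly 73.6 MB
  }
}
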
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index b46a42c..f823ade 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -34,7 +34,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.DimensionType;
-import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -50,6 +49,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datatypes.ArrayDataType;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
 import org.apache.carbondata.processing.datatypes.StructDataType;
 import org.apache.carbondata.processing.model.CarbonDataLoadSchema;


[2/3] carbondata git commit: [CARBONDATA-1363] Add DataMapWriter interface

Posted by ra...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java b/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java
deleted file mode 100644
index 5837f0c..0000000
--- a/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.integration.spark.testsuite.validation;
-
-import org.apache.spark.sql.common.util.CarbonHiveContext;
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.FileHolder;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.reader.CarbonFooterReader;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.format.BlockletIndex;
-import org.apache.carbondata.format.BlockletInfo;
-import org.apache.carbondata.format.DataChunk;
-import org.apache.carbondata.format.Encoding;
-import org.apache.carbondata.format.FileFooter;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-
-public class FileFooterValidator {
-
-  private static FileFooter fileFooter;
-
-  private static boolean setUpIsDone;
-
-  @Before public void setUp() throws Exception {
-
-    if (setUpIsDone) {
-      return;
-    }
-    CarbonHiveContext.sql(
-            "CREATE CUBE validatefooter DIMENSIONS (empno Integer, empname String,"
-            + " designation String,"
-            + " doj Timestamp, workgroupcategory Integer, workgroupcategoryname String, "
-            + "deptno Integer, deptname String, projectcode Integer, projectjoindate Timestamp,"
-            + " projectenddate Timestamp) MEASURES (attendance Integer,utilization Integer,"
-            + "salary Integer) OPTIONS (PARTITIONER [PARTITION_COUNT=1])");
-    CarbonHiveContext.sql(
-            "LOAD DATA fact from './src/test/resources/data.csv' INTO CUBE validatefooter "
-                + "PARTITIONDATA(DELIMITER ',', QUOTECHAR '\"')");
-    String storePath =
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION);
-    CarbonTableIdentifier tableIdentifier =
-            new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "validatefooter", "1");
-    String segmentPath = CarbonStorePath.getCarbonTablePath(storePath, tableIdentifier)
-        .getCarbonDataDirectoryPath("0", "0");
-    CarbonFile carbonFile =
-        FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
-    CarbonFile[] list = carbonFile.listFiles(new CarbonFileFilter() {
-      @Override public boolean accept(CarbonFile file) {
-        if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
-          return true;
-        }
-        return false;
-      }
-    });
-
-    for (CarbonFile file : list) {
-      String fileLocation = file.getAbsolutePath();
-      CarbonFile factFile =
-          FileFactory.getCarbonFile(fileLocation, FileFactory.getFileType(fileLocation));
-      long offset = factFile.getSize() - CarbonCommonConstants.LONG_SIZE_IN_BYTE;
-      FileHolder fileHolder = FileFactory.getFileHolder(FileFactory.getFileType(fileLocation));
-      offset = fileHolder.readLong(fileLocation, offset);
-      CarbonFooterReader metaDataReader = new CarbonFooterReader(fileLocation, offset);
-      fileFooter = metaDataReader.readFooter();
-    }
-    setUpIsDone = true;
-  }
-
-  @AfterClass public static void tearDownAfterClass() {
-    CarbonHiveContext.sql("drop CUBE validatefooter");
-  }
-
-  @Test public void testFileFooterExist() {
-    assertTrue(fileFooter != null);
-  }
-
-  @Test public void testFileFooterVersion() {
-    assertTrue(fileFooter.getVersion() >= 0);
-  }
-
-  @Test public void testFileFooterNumRows() {
-    assertTrue(fileFooter.getNum_rows() > 0);
-  }
-
-  @Test public void testFileFooterTableColumns() {
-    assertTrue(fileFooter.getTable_columns() != null && fileFooter.getTable_columns().size() > 0);
-  }
-
-  @Test public void testFileFooterSegmentInfo() {
-    assertTrue(
-        fileFooter.getSegment_info() != null && fileFooter.getSegment_info().getNum_cols() > 0
-            && fileFooter.getSegment_info().getColumn_cardinalities().size() > 0);
-  }
-
-  @Test public void testFileFooterBlockletIndex() {
-    assertTrue(fileFooter.getBlocklet_index_list() != null
-        && fileFooter.getBlocklet_index_list().size() > 0);
-    for (BlockletIndex blockletIndex : fileFooter.getBlocklet_index_list()) {
-      assertTrue(blockletIndex.getMin_max_index().getMin_values() != null
-          && blockletIndex.getMin_max_index().getMin_values().size() > 0
-          && blockletIndex.getMin_max_index().getMax_values() != null
-          && blockletIndex.getMin_max_index().getMax_values().size() > 0
-          && blockletIndex.getMin_max_index().getMin_values().size() == blockletIndex
-          .getMin_max_index().getMax_values().size());
-      assertTrue(blockletIndex.getB_tree_index().getStart_key() != null
-          && blockletIndex.getB_tree_index().getEnd_key() != null);
-    }
-  }
-
-  @Test public void testFileFooterBlockletInfo() {
-    assertTrue(fileFooter.getBlocklet_info_list() != null
-        && fileFooter.getBlocklet_info_list().size() > 0);
-    for (BlockletInfo blockletInfo : fileFooter.getBlocklet_info_list()) {
-      assertTrue(blockletInfo.getNum_rows() > 0 && blockletInfo.getColumn_data_chunks() != null
-          && blockletInfo.getColumn_data_chunks().size() > 0);
-      for (DataChunk columnDataChunk : blockletInfo.getColumn_data_chunks()) {
-        testColumnDataChunk(columnDataChunk);
-      }
-    }
-  }
-
-  private void testColumnDataChunk(DataChunk columnDatachunk) {
-    assertTrue(columnDatachunk.getEncoders() != null && columnDatachunk.getChunk_meta() != null
-        && columnDatachunk.getChunk_meta().getCompression_codec() != null);
-    // For Measure
-    if (columnDatachunk.getEncoders().contains(Encoding.DELTA)) {
-      assertTrue(
-          columnDatachunk.getPresence() != null && columnDatachunk.getEncoder_meta() != null);
-    } else {
-      assertTrue(columnDatachunk.getSort_state() != null);
-    }
-  }
-}
\ No newline at end of file

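The deleted validator's setup also documents how a CarbonData footer is located: the last 8 bytes of a .carbondata file hold the byte offset where the Thrift FileFooter starts, which the test then handed to CarbonFooterReader. A sketch of that lookup in plain java.io, assuming the trailing long is big-endian as DataInput reads it:

import java.io.IOException;
import java.io.RandomAccessFile;

public class FooterOffsetSketch {
  // Reads the trailing long of a .carbondata file; the deleted test did the
  // same via FileHolder.readLong() before calling CarbonFooterReader.
  public static long footerOffset(String carbonDataFile) throws IOException {
    try (RandomAccessFile raf = new RandomAccessFile(carbonDataFile, "r")) {
      raf.seek(raf.length() - Long.BYTES);  // LONG_SIZE_IN_BYTE in the test
      return raf.readLong();
    }
  }
}
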
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
new file mode 100644
index 0000000..b0e4833
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.events.ChangeEvent
+import org.apache.carbondata.core.indexstore.schema.FilterType
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.util.CarbonProperties
+
+class C2DataMapFactory() extends DataMapFactory {
+
+  override def init(identifier: AbsoluteTableIdentifier,
+      dataMapName: String): Unit = {}
+
+  override def fireEvent(event: ChangeEvent[_]): Unit = ???
+
+  override def clear(segmentId: String): Unit = ???
+
+  override def clear(): Unit = ???
+
+  override def getDataMap(distributable: DataMapDistributable): DataMap = ???
+
+  override def getDataMaps(segmentId: String): util.List[DataMap] = ???
+
+  override def createWriter(segmentId: String): DataMapWriter = DataMapWriterSuite.dataMapWriterC2Mock
+
+  override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, FilterType.EQUALTO)
+}
+
+class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
+
+  def buildTestData(numRows: Int): DataFrame = {
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to numRows)
+      .map(x => ("a", "b", x))
+      .toDF("c1", "c2", "c3")
+  }
+
+  def dropTable(): Unit = {
+    sql("DROP TABLE IF EXISTS carbon1")
+    sql("DROP TABLE IF EXISTS carbon2")
+  }
+
+  override def beforeAll {
+    dropTable()
+  }
+
+  test("test write datamap 2 pages") {
+    // register datamap writer
+    DataMapStoreManager.getInstance().createAndRegisterDataMap(
+      AbsoluteTableIdentifier.from(storeLocation, "default", "carbon1"),
+      classOf[C2DataMapFactory],
+      "test")
+
+    val df = buildTestData(33000)
+
+    // save dataframe to carbon file
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon1")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    assert(DataMapWriterSuite.callbackSeq.head.contains("block start"))
+    assert(DataMapWriterSuite.callbackSeq.last.contains("block end"))
+    assert(DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq(
+      "blocklet start 0",
+      "add page data: blocklet 0, page 0",
+      "add page data: blocklet 0, page 1",
+      "blocklet end: 0"
+    ))
+    DataMapWriterSuite.callbackSeq = Seq()
+  }
+
+  test("test write datamap 2 blocklet") {
+    // register datamap writer
+    DataMapStoreManager.getInstance().createAndRegisterDataMap(
+      AbsoluteTableIdentifier.from(storeLocation, "default", "carbon2"),
+      classOf[C2DataMapFactory],
+      "test")
+
+    CarbonProperties.getInstance()
+      .addProperty("carbon.blockletgroup.size.in.mb", "1")
+
+    val df = buildTestData(300000)
+
+    // save dataframe to carbon file
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon2")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    assert(DataMapWriterSuite.callbackSeq.head.contains("block start"))
+    assert(DataMapWriterSuite.callbackSeq.last.contains("block end"))
+    assert(DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq(
+      "blocklet start 0",
+      "add page data: blocklet 0, page 0",
+      "add page data: blocklet 0, page 1",
+      "add page data: blocklet 0, page 2",
+      "add page data: blocklet 0, page 3",
+      "add page data: blocklet 0, page 4",
+      "add page data: blocklet 0, page 5",
+      "add page data: blocklet 0, page 6",
+      "add page data: blocklet 0, page 7",
+      "blocklet end: 0",
+      "blocklet start 1",
+      "add page data: blocklet 1, page 0",
+      "add page data: blocklet 1, page 1",
+      "blocklet end: 1"
+    ))
+    DataMapWriterSuite.callbackSeq = Seq()
+  }
+
+  override def afterAll {
+    dropTable()
+  }
+}
+
+object DataMapWriterSuite {
+  var callbackSeq: Seq[String] = Seq[String]()
+
+  val dataMapWriterC2Mock = new DataMapWriter {
+
+    override def onPageAdded(
+        blockletId: Int,
+        pageId: Int,
+        pages: Array[ColumnPage]): Unit = {
+      assert(pages.length == 1)
+      assert(pages(0).getDataType == DataType.BYTE_ARRAY)
+      val bytes: Array[Byte] = pages(0).getByteArrayPage()(0)
+      assert(bytes.sameElements(Seq(0, 1, 'b'.toByte)))
+      callbackSeq :+= s"add page data: blocklet $blockletId, page $pageId"
+    }
+
+    override def onBlockletEnd(blockletId: Int): Unit = {
+      callbackSeq :+= s"blocklet end: $blockletId"
+    }
+
+    override def onBlockEnd(blockId: String): Unit = {
+      callbackSeq :+= s"block end $blockId"
+    }
+
+    override def onBlockletStart(blockletId: Int): Unit = {
+      callbackSeq :+= s"blocklet start $blockletId"
+    }
+
+    override def onBlockStart(blockId: String): Unit = {
+      callbackSeq :+= s"block start $blockId"
+    }
+
+  }
+}
\ No newline at end of file

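For readers who prefer Java, the Scala mock above corresponds to something like the following writer, which only records the callback sequence. This is a sketch assuming DataMapWriter is an interface exposing exactly the five callbacks the mock overrides:

import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.core.datamap.dev.DataMapWriter;
import org.apache.carbondata.core.datastore.page.ColumnPage;

public class RecordingDataMapWriter implements DataMapWriter {
  private final List<String> events = new ArrayList<>();

  @Override public void onBlockStart(String blockId) {
    events.add("block start " + blockId);
  }

  @Override public void onBlockEnd(String blockId) {
    events.add("block end " + blockId);
  }

  @Override public void onBlockletStart(int blockletId) {
    events.add("blocklet start " + blockletId);
  }

  @Override public void onBlockletEnd(int blockletId) {
    events.add("blocklet end: " + blockletId);
  }

  @Override public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages) {
    events.add("add page data: blocklet " + blockletId + ", page " + pageId);
  }

  public List<String> events() {
    return events;
  }
}
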
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index e0829ed..a6a8835 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -256,10 +256,7 @@ object DataManagementFunc {
       compactionModel.compactionType
     )
 
-    val future: Future[Void] = executor
-        .submit(new CompactionCallable(compactionCallableModel
-        )
-        )
+    val future: Future[Void] = executor.submit(new CompactionCallable(compactionCallableModel))
     futureList.add(future)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index a32146a..90f57a9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -31,9 +31,9 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.fileoperations.FileWriteOperation
-import org.apache.carbondata.core.indexstore.DataMapStoreManager
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 4620db0..1837c04 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.{RuntimeConfig, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
-import org.apache.carbondata.core.indexstore.DataMapStoreManager
+import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala
index e92d06d..697b727 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala
@@ -43,6 +43,7 @@ class DeleteCarbonTableSubqueryTestCase extends Spark2QueryTest with BeforeAndAf
       sql("""select c1 from iud_db_sub.dest"""),
       Seq(Row("c"), Row("d"), Row("e"))
     )
+    sql("drop table if exists iud_db_sub.dest")
   }
 
   test("delete data from  carbon table[where IN (sub query with where clause) ]") {
@@ -54,10 +55,12 @@ class DeleteCarbonTableSubqueryTestCase extends Spark2QueryTest with BeforeAndAf
       sql("""select c1 from iud_db_sub.dest"""),
       Seq(Row("a"), Row("c"), Row("d"), Row("e"))
     )
+    sql("drop table if exists iud_db_sub.dest")
   }
 
   override def afterAll {
-    sql("use default")
+    sql("drop table if exists iud_db_sub.source2")
     sql("drop database  if exists iud_db_sub cascade")
+    sql("use default")
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java
deleted file mode 100644
index 12fe27b..0000000
--- a/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.devapi.DictionaryGenerationException;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
-
-/**
- * Generic DataType interface which will be used while data loading for complex types like Array &
- * Struct
- */
-public interface GenericDataType<T> {
-
-  /**
-   * @return name of the column
-   */
-  String getName();
-
-  /**
-   * @return - columns parent name
-   */
-  String getParentname();
-
-  /**
-   * @param children - To add children dimension for parent complex type
-   */
-  void addChildren(GenericDataType children);
-
-  /**
-   * @param primitiveChild - Returns all primitive type columns in complex type
-   */
-  void getAllPrimitiveChildren(List<GenericDataType> primitiveChild);
-
-  /**
-   * writes to byte stream
-   * @param dataOutputStream
-   * @throws IOException
-   */
-  void writeByteArray(T input, DataOutputStream dataOutputStream)
-      throws IOException, DictionaryGenerationException;
-
-  /**
-   * @return surrogateIndex for primitive column in complex type
-   */
-  int getSurrogateIndex();
-
-  /**
-   * @param surrIndex - surrogate index of primitive column in complex type
-   */
-  void setSurrogateIndex(int surrIndex);
-
-  /**
-   * converts integer surrogate to bit packed surrogate value
-   * @param byteArrayInput
-   * @param dataOutputStream
-   * @param generator
-   * @throws IOException
-   * @throws KeyGenException
-   */
-  void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
-      KeyGenerator[] generator) throws IOException, KeyGenException;
-
-  /**
-   * @return columns count of each complex type
-   */
-  int getColsCount();
-
-  /**
-   * @return column uuid string
-   */
-  String getColumnId();
-
-  /**
-   * set array index to be referred while creating metadata column
-   * @param outputArrayIndex
-   */
-  void setOutputArrayIndex(int outputArrayIndex);
-
-  /**
-   * @return array index count of metadata column
-   */
-  int getMaxOutputArrayIndex();
-
-  /**
-   * Split byte array into complex metadata column and primitive column
-   * @param columnsArray
-   * @param inputArray
-   */
-  void getColumnarDataForComplexType(List<ArrayList<byte[]>> columnsArray, ByteBuffer inputArray);
-
-  /**
-   * @return current read row count
-   */
-  int getDataCounter();
-
-  /**
-   * fill agg key block including complex types
-   * @param aggKeyBlockWithComplex
-   * @param aggKeyBlock
-   */
-  void fillAggKeyBlock(List<Boolean> aggKeyBlockWithComplex, boolean[] aggKeyBlock);
-
-  /**
-   * fill block key size including complex types
-   * @param blockKeySizeWithComplex
-   * @param primitiveBlockKeySize
-   */
-  void fillBlockKeySize(List<Integer> blockKeySizeWithComplex, int[] primitiveBlockKeySize);
-
-  /**
-   * fill cardinality value including complex types
-   * @param dimCardWithComplex
-   * @param maxSurrogateKeyArray
-   */
-  void fillCardinalityAfterDataLoad(List<Integer> dimCardWithComplex, int[] maxSurrogateKeyArray);
-
-  /**
-   * Fill the cardinality of the primitive datatypes
-   * @param dimCardWithComplex
-   */
-  void fillCardinality(List<Integer> dimCardWithComplex);
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
new file mode 100644
index 0000000..4b0113c
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.datamap;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datamap.TableDataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.processing.store.TablePage;
+
+/**
+ * Listener that dispatches data load events to all DataMapWriters registered for one table
+ */
+public class DataMapWriterListener {
+
+  private static final LogService LOG = LogServiceFactory.getLogService(
+      DataMapWriterListener.class.getCanonicalName());
+
+  // indexed column names -> list of data map writers
+  private Map<List<String>, List<DataMapWriter>> registry = new ConcurrentHashMap<>();
+
+  /**
+   * Register all datamap writers for the specified table and segment
+   */
+  public void registerAllWriter(AbsoluteTableIdentifier identifier, String segmentId) {
+    List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(identifier);
+    if (tableDataMaps != null) {
+      for (TableDataMap tableDataMap : tableDataMaps) {
+        DataMapFactory factory = tableDataMap.getDataMapFactory();
+        register(factory, segmentId);
+      }
+    }
+  }
+
+  /**
+   * Register a DataMapWriter
+   */
+  private void register(DataMapFactory factory, String segmentId) {
+    assert (factory != null);
+    assert (segmentId != null);
+    DataMapMeta meta = factory.getMeta();
+    if (meta == null) {
+      // if data map does not have meta, no need to register
+      return;
+    }
+    List<String> columns = factory.getMeta().getIndexedColumns();
+    List<DataMapWriter> writers = registry.get(columns);
+    DataMapWriter writer = factory.createWriter(segmentId);
+    if (writers != null) {
+      writers.add(writer);
+    } else {
+      writers = new ArrayList<>();
+      writers.add(writer);
+      registry.put(columns, writers);
+    }
+    LOG.info("DataMapWriter " + writer + " added");
+  }
+
+  public void onBlockStart(String blockId) {
+    for (List<DataMapWriter> writers : registry.values()) {
+      for (DataMapWriter writer : writers) {
+        writer.onBlockStart(blockId);
+      }
+    }
+  }
+
+  public void onBlockEnd(String blockId) {
+    for (List<DataMapWriter> writers : registry.values()) {
+      for (DataMapWriter writer : writers) {
+        writer.onBlockEnd(blockId);
+      }
+    }
+  }
+
+  public void onBlockletStart(int blockletId) {
+    for (List<DataMapWriter> writers : registry.values()) {
+      for (DataMapWriter writer : writers) {
+        writer.onBlockletStart(blockletId);
+      }
+    }
+  }
+
+  public void onBlockletEnd(int blockletId) {
+    for (List<DataMapWriter> writers : registry.values()) {
+      for (DataMapWriter writer : writers) {
+        writer.onBlockletEnd(blockletId);
+      }
+    }
+  }
+
+  /**
+   * Pick the corresponding column pages and deliver them to every registered datamap
+   *
+   * @param pageId     sequence number of page, start from 0
+   * @param tablePage  page data
+   */
+  public void onPageAdded(int blockletId, int pageId, TablePage tablePage) {
+    Set<Map.Entry<List<String>, List<DataMapWriter>>> entries = registry.entrySet();
+    for (Map.Entry<List<String>, List<DataMapWriter>> entry : entries) {
+      List<String> indexedColumns = entry.getKey();
+      ColumnPage[] pages = new ColumnPage[indexedColumns.size()];
+      for (int i = 0; i < indexedColumns.size(); i++) {
+        pages[i] = tablePage.getColumnPage(indexedColumns.get(i));
+      }
+      List<DataMapWriter> writers = entry.getValue();
+      for (DataMapWriter writer : writers) {
+        writer.onPageAdded(blockletId, pageId, pages);
+      }
+    }
+  }
+
+}

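Tying this back to part 1: the V3 writer drives these callbacks during data load. A hypothetical caller, sketched under the assumption that the block id is an arbitrary string and the TablePage instances come from the fact data handler:

import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.processing.datamap.DataMapWriterListener;
import org.apache.carbondata.processing.store.TablePage;

public class ListenerUsageSketch {
  // Pushes one block holding a single two-page blocklet through the listener.
  static void writeOneBlock(AbsoluteTableIdentifier identifier,
      TablePage page0, TablePage page1) {
    DataMapWriterListener listener = new DataMapWriterListener();
    listener.registerAllWriter(identifier, "0");  // segment "0"

    listener.onBlockStart("block-0");             // placeholder block id
    listener.onBlockletStart(0);
    listener.onPageAdded(0, 0, page0);
    listener.onPageAdded(0, 1, page1);
    listener.onBlockletEnd(0);
    listener.onBlockEnd("block-0");
  }
}
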
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index f5fdd4d..02ceb06 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
new file mode 100644
index 0000000..6b54d2d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.datatypes;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+
+/**
+ * Generic DataType interface which will be used while data loading for complex types like Array &
+ * Struct
+ */
+public interface GenericDataType<T> {
+
+  /**
+   * @return name of the column
+   */
+  String getName();
+
+  /**
+   * @return - columns parent name
+   */
+  String getParentname();
+
+  /**
+   * @param children - To add children dimension for parent complex type
+   */
+  void addChildren(GenericDataType children);
+
+  /**
+   * @param primitiveChild - Returns all primitive type columns in complex type
+   */
+  void getAllPrimitiveChildren(List<GenericDataType> primitiveChild);
+
+  /**
+   * writes to byte stream
+   * @param dataOutputStream
+   * @throws IOException
+   */
+  void writeByteArray(T input, DataOutputStream dataOutputStream)
+      throws IOException, DictionaryGenerationException;
+
+  /**
+   * @return surrogateIndex for primitive column in complex type
+   */
+  int getSurrogateIndex();
+
+  /**
+   * @param surrIndex - surrogate index of primitive column in complex type
+   */
+  void setSurrogateIndex(int surrIndex);
+
+  /**
+   * converts integer surrogate to bit packed surrogate value
+   * @param byteArrayInput
+   * @param dataOutputStream
+   * @param generator
+   * @throws IOException
+   * @throws KeyGenException
+   */
+  void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
+      KeyGenerator[] generator) throws IOException, KeyGenException;
+
+  /**
+   * @return columns count of each complex type
+   */
+  int getColsCount();
+
+  /**
+   * @return column uuid string
+   */
+  String getColumnId();
+
+  /**
+   * set array index to be referred while creating metadata column
+   * @param outputArrayIndex
+   */
+  void setOutputArrayIndex(int outputArrayIndex);
+
+  /**
+   * @return array index count of metadata column
+   */
+  int getMaxOutputArrayIndex();
+
+  /**
+   * Split byte array into complex metadata column and primitive column
+   * @param columnsArray
+   * @param inputArray
+   */
+  void getColumnarDataForComplexType(List<ArrayList<byte[]>> columnsArray, ByteBuffer inputArray);
+
+  /**
+   * @return current read row count
+   */
+  int getDataCounter();
+
+  /**
+   * fill agg key block including complex types
+   * @param aggKeyBlockWithComplex
+   * @param aggKeyBlock
+   */
+  void fillAggKeyBlock(List<Boolean> aggKeyBlockWithComplex, boolean[] aggKeyBlock);
+
+  /**
+   * fill block key size including complex types
+   * @param blockKeySizeWithComplex
+   * @param primitiveBlockKeySize
+   */
+  void fillBlockKeySize(List<Integer> blockKeySizeWithComplex, int[] primitiveBlockKeySize);
+
+  /**
+   * fill cardinality value including complex types
+   * @param dimCardWithComplex
+   * @param maxSurrogateKeyArray
+   */
+  void fillCardinalityAfterDataLoad(List<Integer> dimCardWithComplex, int[] maxSurrogateKeyArray);
+
+  /**
+   * Fill the cardinality of the primitive datatypes
+   * @param dimCardWithComplex
+   */
+  void fillCardinality(List<Integer> dimCardWithComplex);
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index a24a324..e7e48e9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -28,7 +28,6 @@ import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.devapi.BiDictionary;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.dictionary.client.DictionaryClient;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index 94ee9f6..a61144e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
index d5730a2..8feea6a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
@@ -21,8 +21,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.util.List;
 
-import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
index d30582b..e9b0a78 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
@@ -23,13 +23,13 @@ import java.util.Map;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.dictionary.client.DictionaryClient;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.processing.datatypes.ArrayDataType;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
 import org.apache.carbondata.processing.datatypes.StructDataType;
 import org.apache.carbondata.processing.newflow.DataField;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 9c48af7..a716340 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -35,10 +35,8 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
-import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.page.EncodedTablePage;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
@@ -51,6 +49,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.newflow.sort.SortScopeOptions;
 import org.apache.carbondata.processing.store.file.FileManager;
 import org.apache.carbondata.processing.store.file.IFileManagerComposite;
@@ -74,6 +73,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    * data writer
    */
   private CarbonFactDataWriter dataWriter;
+
   /**
    * File manager
    */
@@ -87,11 +87,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    * blocklet size (for V1 and V2) or page size (for V3). A Producer thread will start to process
    * once this size of input is reached
    */
-  private int blockletSize;
-  /**
-   * keyGenerator
-   */
-  private ColumnarSplitter columnarSplitter;
+  private int pageSize;
   /**
    * keyBlockHolder
    */
@@ -120,7 +116,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   /**
    * a private class that will hold the data for blocklets
    */
-  private BlockletDataHolder blockletDataHolder;
+  private TablePageList tablePageList;
   /**
    * number of cores configured
    */
@@ -146,8 +142,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private ColumnarFormatVersion version;
 
-  private SortScopeOptions.SortScope sortScope;
-
   /**
    * CarbonFactDataHandler constructor
    */
@@ -202,11 +196,12 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         noInvertedIdxCol += (cd.getColName() + ",");
       }
     }
+
     LOGGER.info("Columns considered as NoInverted Index are " + noInvertedIdxCol);
   }
 
   private void initParameters(CarbonFactDataHandlerModel model) {
-    this.sortScope = model.getSortScope();
+    SortScopeOptions.SortScope sortScope = model.getSortScope();
     this.colGrpModel = model.getSegmentProperties().getColumnGroupModel();
 
     //TODO need to pass carbon table identifier to metadata
@@ -254,10 +249,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     consumerExecutorService = Executors.newFixedThreadPool(1);
     consumerExecutorServiceTaskList = new ArrayList<>(1);
     semaphore = new Semaphore(numberOfCores);
-    blockletDataHolder = new BlockletDataHolder();
+    tablePageList = new TablePageList();
 
     // Start the consumer which will take each blocklet/page in order and write to a file
-    Consumer consumer = new Consumer(blockletDataHolder);
+    Consumer consumer = new Consumer(tablePageList);
     consumerExecutorServiceTaskList.add(consumerExecutorService.submit(consumer));
   }
 
@@ -314,20 +309,20 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     this.entryCount++;
     // if entry count reaches the page size then we are ready to write
     // this page to file and update the intermediate files
-    if (this.entryCount == this.blockletSize) {
+    if (this.entryCount == this.pageSize) {
       try {
         semaphore.acquire();
 
         producerExecutorServiceTaskList.add(
             producerExecutorService.submit(
-                new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, false)
+                new Producer(tablePageList, dataRows, ++writerTaskSequenceCounter, false)
             )
         );
         blockletProcessingCount.incrementAndGet();
         // set the entry count to zero
         processedDataCount += entryCount;
         LOGGER.info("Total Number Of records added to store: " + processedDataCount);
-        dataRows = new ArrayList<>(this.blockletSize);
+        dataRows = new ArrayList<>(this.pageSize);
         this.entryCount = 0;
       } catch (InterruptedException e) {
         LOGGER.error(e, e.getMessage());
@@ -339,10 +334,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   /**
   * generate the TablePage from the input rows (one page in case of V3 format)
    */
-  private EncodedTablePage processDataRows(List<CarbonRow> dataRows)
+  private TablePage processDataRows(List<CarbonRow> dataRows)
       throws CarbonDataWriterException, KeyGenException, MemoryException, IOException {
     if (dataRows.size() == 0) {
-      return EncodedTablePage.newEmptyInstance();
+      return new TablePage(model, 0);
     }
     TablePage tablePage = new TablePage(model, dataRows.size());
     int rowId = 0;
@@ -352,11 +347,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       tablePage.addRow(rowId++, row);
     }
 
-    EncodedTablePage encoded = tablePage.encode();
-    tablePage.freeMemory();
+    tablePage.encode();
 
     LOGGER.info("Number Of records processed: " + dataRows.size());
-    return encoded;
+    return tablePage;
   }
 
   /**
@@ -370,7 +364,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     try {
       semaphore.acquire();
       producerExecutorServiceTaskList.add(producerExecutorService
-          .submit(new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, true)));
+          .submit(new Producer(tablePageList, dataRows, ++writerTaskSequenceCounter, true)));
       blockletProcessingCount.incrementAndGet();
       processedDataCount += entryCount;
       closeWriterExecutionService(producerExecutorService);
@@ -471,19 +465,19 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private void setWritingConfiguration() throws CarbonDataWriterException {
     // get blocklet size
-    this.blockletSize = Integer.parseInt(CarbonProperties.getInstance()
+    this.pageSize = Integer.parseInt(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.BLOCKLET_SIZE,
             CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL));
     if (version == ColumnarFormatVersion.V3) {
-      this.blockletSize = Integer.parseInt(CarbonProperties.getInstance()
+      this.pageSize = Integer.parseInt(CarbonProperties.getInstance()
           .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE,
               CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT));
     }
-    LOGGER.info("Number of rows per column blocklet " + blockletSize);
-    dataRows = new ArrayList<>(this.blockletSize);
+    LOGGER.info("Number of rows per column blocklet " + pageSize);
+    dataRows = new ArrayList<>(this.pageSize);
     int dimSet =
         Integer.parseInt(CarbonCommonConstants.DIMENSION_SPLIT_VALUE_IN_COLUMNAR_DEFAULTVALUE);
-    // if atleast one dimension is present then initialize column splitter otherwise null
+    // if at least one dimension is present then initialize column splitter otherwise null
     int noOfColStore = colGrpModel.getNoOfColumnStore();
     int[] keyBlockSize = new int[noOfColStore + getExpandedComplexColsCount()];
 
@@ -494,16 +488,16 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       //row store will be in single column store
       //e.g if {0,1,2,3,4,5} is dimension and {0,1,2) is row store dimension
       //than below splitter will return column as {0,1,2}{3}{4}{5}
-      this.columnarSplitter = model.getSegmentProperties().getFixedLengthKeySplitter();
+      ColumnarSplitter columnarSplitter = model.getSegmentProperties().getFixedLengthKeySplitter();
       System.arraycopy(columnarSplitter.getBlockKeySize(), 0, keyBlockSize, 0, noOfColStore);
       this.keyBlockHolder =
-          new CarbonKeyBlockHolder[this.columnarSplitter.getBlockKeySize().length];
+          new CarbonKeyBlockHolder[columnarSplitter.getBlockKeySize().length];
     } else {
       this.keyBlockHolder = new CarbonKeyBlockHolder[0];
     }
 
     for (int i = 0; i < keyBlockHolder.length; i++) {
-      this.keyBlockHolder[i] = new CarbonKeyBlockHolder(blockletSize);
+      this.keyBlockHolder[i] = new CarbonKeyBlockHolder(pageSize);
       this.keyBlockHolder[i].resetCounter();
     }
 
@@ -535,7 +529,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         .getBlockKeySize());
     System.arraycopy(blockKeySize, noOfColStore, keyBlockSize, noOfColStore,
         blockKeySize.length - noOfColStore);
-    this.dataWriter = getFactDataWriter(keyBlockSize);
+    this.dataWriter = getFactDataWriter();
     this.dataWriter.setIsNoDictionary(isNoDictionary);
     // initialize the channel;
     this.dataWriter.initializeWriter();
@@ -574,21 +568,19 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   /**
    * Below method will be used to get the fact data writer instance
    *
-   * @param keyBlockSize
    * @return data writer instance
    */
-  private CarbonFactDataWriter<?> getFactDataWriter(int[] keyBlockSize) {
+  private CarbonFactDataWriter<?> getFactDataWriter() {
     return CarbonDataWriterFactory.getInstance()
-        .getFactDataWriter(version, getDataWriterVo(keyBlockSize));
+        .getFactDataWriter(version, getDataWriterVo());
   }
 
   /**
    * Below method will be used to get the writer vo
    *
-   * @param keyBlockSize size of each key block
    * @return data writer vo object
    */
-  private CarbonDataWriterVo getDataWriterVo(int[] keyBlockSize) {
+  private CarbonDataWriterVo getDataWriterVo() {
     CarbonDataWriterVo carbonDataWriterVo = new CarbonDataWriterVo();
     carbonDataWriterVo.setStoreLocation(model.getStoreLocation());
     carbonDataWriterVo.setMeasureCount(model.getMeasureCount());
@@ -608,6 +600,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     carbonDataWriterVo.setBucketNumber(model.getBucketId());
     carbonDataWriterVo.setTaskExtension(model.getTaskExtension());
     carbonDataWriterVo.setSchemaUpdatedTimeStamp(model.getSchemaUpdatedTimeStamp());
+    carbonDataWriterVo.setListener(model.getDataMapWriterlistener());
     return carbonDataWriterVo;
   }
 
@@ -644,14 +637,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   }
 
   /**
-   * This class will hold the holder objects and manage producer and consumer for reading
-   * and writing the blocklet data
+   * This class will hold the table page data
    */
-  private final class BlockletDataHolder {
+  private final class TablePageList {
     /**
-     * array of blocklet data holder objects
+     * array of table pages added by the Producer and taken by the Consumer
      */
-    private EncodedTablePage[] encodedTablePages;
+    private TablePage[] tablePages;
     /**
      * flag to check whether the producer has completed processing for holder
      * object which is required to be picked form an index
@@ -662,8 +654,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
      */
     private int currentIndex;
 
-    private BlockletDataHolder() {
-      encodedTablePages = new EncodedTablePage[numberOfCores];
+    private TablePageList() {
+      tablePages = new TablePage[numberOfCores];
       available = new AtomicBoolean(false);
     }
 
@@ -671,32 +663,32 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     * @return a table page object
      * @throws InterruptedException if consumer thread is interrupted
      */
-    public synchronized EncodedTablePage get() throws InterruptedException {
-      EncodedTablePage encodedTablePage = encodedTablePages[currentIndex];
+    public synchronized TablePage get() throws InterruptedException {
+      TablePage tablePage = tablePages[currentIndex];
      // if the table page is null, it means the producer thread processing the data
      // which has to be inserted at this current index has not completed yet
-      if (null == encodedTablePage && !processingComplete) {
+      if (null == tablePage && !processingComplete) {
         available.set(false);
       }
       while (!available.get()) {
         wait();
       }
-      encodedTablePage = encodedTablePages[currentIndex];
-      encodedTablePages[currentIndex] = null;
+      tablePage = tablePages[currentIndex];
+      tablePages[currentIndex] = null;
       currentIndex++;
      // reset current index when it reaches the length of the table page array
-      if (currentIndex >= encodedTablePages.length) {
+      if (currentIndex >= tablePages.length) {
         currentIndex = 0;
       }
-      return encodedTablePage;
+      return tablePage;
     }
 
     /**
     * @param tablePage
      * @param index
      */
-    public synchronized void put(EncodedTablePage encodedTablePage, int index) {
-      encodedTablePages[index] = encodedTablePage;
+    public synchronized void put(TablePage tablePage, int index) {
+      tablePages[index] = tablePage;
       // notify the consumer thread when index at which object is to be inserted
       // becomes equal to current index from where data has to be picked for writing
       if (index == currentIndex) {
@@ -711,16 +703,16 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private final class Producer implements Callable<Void> {
 
-    private BlockletDataHolder blockletDataHolder;
+    private TablePageList tablePageList;
     private List<CarbonRow> dataRows;
-    private int sequenceNumber;
+    private int pageId;
     private boolean isLastPage;
 
-    private Producer(BlockletDataHolder blockletDataHolder, List<CarbonRow> dataRows,
-        int sequenceNumber, boolean isLastPage) {
-      this.blockletDataHolder = blockletDataHolder;
+    private Producer(TablePageList tablePageList, List<CarbonRow> dataRows,
+        int pageId, boolean isLastPage) {
+      this.tablePageList = tablePageList;
       this.dataRows = dataRows;
-      this.sequenceNumber = sequenceNumber;
+      this.pageId = pageId;
       this.isLastPage = isLastPage;
     }
 
@@ -732,11 +724,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
      */
     @Override public Void call() throws Exception {
       try {
-        EncodedTablePage encodedTablePage = processDataRows(dataRows);
-        encodedTablePage.setIsLastPage(isLastPage);
+        TablePage tablePage = processDataRows(dataRows);
+        tablePage.setIsLastPage(isLastPage);
        // insert the object in the array according to the page id
-        int indexInNodeHolderArray = (sequenceNumber - 1) % numberOfCores;
-        blockletDataHolder.put(encodedTablePage, indexInNodeHolderArray);
+        int indexInNodeHolderArray = (pageId - 1) % numberOfCores;
+        tablePageList.put(tablePage, indexInNodeHolderArray);
         return null;
       } catch (Throwable throwable) {
         LOGGER.error(throwable, "Error in producer");
@@ -752,10 +744,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private final class Consumer implements Callable<Void> {
 
-    private BlockletDataHolder blockletDataHolder;
+    private TablePageList tablePageList;
 
-    private Consumer(BlockletDataHolder blockletDataHolder) {
-      this.blockletDataHolder = blockletDataHolder;
+    private Consumer(TablePageList tablePageList) {
+      this.tablePageList = tablePageList;
     }
 
     /**
@@ -766,11 +758,12 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
      */
     @Override public Void call() throws Exception {
       while (!processingComplete || blockletProcessingCount.get() > 0) {
-        EncodedTablePage encodedTablePage = null;
+        TablePage tablePage = null;
         try {
-          encodedTablePage = blockletDataHolder.get();
-          if (null != encodedTablePage) {
-            dataWriter.writeTablePage(encodedTablePage);
+          tablePage = tablePageList.get();
+          if (null != tablePage) {
+            dataWriter.writeTablePage(tablePage);
+            tablePage.freeMemory();
           }
           blockletProcessingCount.decrementAndGet();
         } catch (Throwable throwable) {

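TablePageList above hands pages from multiple producer threads to a single consumer in
strict page order, using a fixed ring of slots indexed by (pageId - 1) % numberOfCores.
A minimal, self-contained sketch of that handoff pattern in plain Java follows; the class
and method names are illustrative, and the last-page and error handling of the real code
are omitted.

import java.util.concurrent.atomic.AtomicBoolean;

// Producers deposit pages into a fixed-size ring by page id; a single consumer
// drains them strictly in page-id order, blocking until the next page arrives.
final class OrderedPageExchange<T> {
  private final T[] slots;
  private final AtomicBoolean available = new AtomicBoolean(false);
  private int currentIndex;

  @SuppressWarnings("unchecked")
  OrderedPageExchange(int capacity) {
    slots = (T[]) new Object[capacity];
  }

  // called by producer threads; pageId is 1-based
  synchronized void put(T page, int pageId) {
    int index = (pageId - 1) % slots.length;
    slots[index] = page;
    // wake the consumer only when the slot it is waiting on has been filled
    if (index == currentIndex) {
      available.set(true);
      notifyAll();
    }
  }

  // called by the single consumer thread
  synchronized T take() throws InterruptedException {
    if (slots[currentIndex] == null) {
      available.set(false);
    }
    while (!available.get()) {
      wait();
    }
    T page = slots[currentIndex];
    slots[currentIndex] = null;
    currentIndex = (currentIndex + 1) % slots.length;
    return page;
  }
}
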
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 51ec84b..c059030 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
@@ -39,6 +38,8 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.processing.datamap.DataMapWriterListener;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.model.CarbonLoadModel;
 import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
@@ -159,6 +160,8 @@ public class CarbonFactDataHandlerModel {
 
   private SortScopeOptions.SortScope sortScope;
 
+  private DataMapWriterListener dataMapWriterlistener;
+
   /**
    * Create the model using @{@link CarbonDataLoadConfiguration}
    */
@@ -254,6 +257,11 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.taskExtension = taskExtension;
     carbonFactDataHandlerModel.tableSpec = configuration.getTableSpec();
     carbonFactDataHandlerModel.sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
+
+    DataMapWriterListener listener = new DataMapWriterListener();
+    listener.registerAllWriter(configuration.getTableIdentifier(), configuration.getSegmentId());
+    carbonFactDataHandlerModel.dataMapWriterlistener = listener;
+
     return carbonFactDataHandlerModel;
   }
 
@@ -557,5 +565,9 @@ public class CarbonFactDataHandlerModel {
   public SortScopeOptions.SortScope getSortScope() {
     return sortScope;
   }
+
+  public DataMapWriterListener getDataMapWriterlistener() {
+    return dataMapWriterlistener;
+  }
 }
 

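The listener wiring above is the core of this change: each load creates a
DataMapWriterListener, registers every datamap writer for the table and segment, and the
fact data writer later fires block-level events through it. A simplified, hypothetical
sketch of that fan-out pattern follows; the real DataMapWriter interface added by this
commit carries more callbacks and richer arguments than shown here.

import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the datamap writer callbacks used in this sketch.
interface BlockEventWriter {
  void onBlockStart(String blockId);
  void onBlockEnd(String blockId);
}

// Fans out block lifecycle events to every registered writer, in registration order.
final class WriterListenerSketch {
  private final List<BlockEventWriter> writers = new ArrayList<>();

  // the real listener discovers writers for a table/segment via registerAllWriter
  void register(BlockEventWriter writer) {
    writers.add(writer);
  }

  void onBlockStart(String blockId) {
    for (BlockEventWriter writer : writers) {
      writer.onBlockStart(blockId);
    }
  }

  void onBlockEnd(String blockId) {
    for (BlockEventWriter writer : writers) {
      writer.onBlockEnd(blockId);
    }
  }
}
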
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index 03f3e5e..d2363f1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.carbondata.core.datastore.GenericDataType;
+import org.apache.carbondata.core.datastore.DimensionType;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
@@ -44,9 +44,8 @@ import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
 
 import org.apache.spark.sql.types.Decimal;
 
@@ -73,7 +72,12 @@ public class TablePage {
 
   private TablePageKey key;
 
-  private ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
+  private EncodedTablePage encodedTablePage;
+
+  private EncodingStrategy encodingStrategy = new DefaultEncodingStrategy();
+
+  // true if this is the last page of all input rows
+  private boolean isLastPage;
 
   TablePage(CarbonFactDataHandlerModel model, int pageSize) throws MemoryException {
     this.model = model;
@@ -240,14 +244,16 @@ public class TablePage {
     return output;
   }
 
-  EncodedTablePage encode() throws KeyGenException, MemoryException, IOException {
+  void encode() throws KeyGenException, MemoryException, IOException {
     // encode dimensions and measure
     EncodedDimensionPage[] dimensions = encodeAndCompressDimensions();
     EncodedMeasurePage[] measures = encodeAndCompressMeasures();
-    return EncodedTablePage.newInstance(pageSize, dimensions, measures, key);
+    this.encodedTablePage = EncodedTablePage.newInstance(pageSize, dimensions, measures, key);
   }
 
-  private EncodingStrategy encodingStrategy = new DefaultEncodingStrategy();
+  public EncodedTablePage getEncodedTablePage() {
+    return encodedTablePage;
+  }
 
   // apply measure and set encodedData in `encodedData`
   private EncodedMeasurePage[] encodeAndCompressMeasures()
@@ -301,6 +307,52 @@ public class TablePage {
     encodedDimensions.addAll(encodedComplexDimenions);
     return encodedDimensions.toArray(new EncodedDimensionPage[encodedDimensions.size()]);
   }
+
+  /**
+   * return the column page for the specified column name
+   */
+  public ColumnPage getColumnPage(String columnName) {
+    int dictDimensionIndex = -1;
+    int noDictDimensionIndex = -1;
+    ColumnPage page = null;
+    TableSpec spec = model.getTableSpec();
+    int numDimensions = spec.getNumDimensions();
+    for (int i = 0; i < numDimensions; i++) {
+      DimensionType type = spec.getDimensionSpec(i).getDimensionType();
+      if ((type == DimensionType.GLOBAL_DICTIONARY) || (type == DimensionType.DIRECT_DICTIONARY)) {
+        page = dictDimensionPages[++dictDimensionIndex];
+      } else if (type == DimensionType.PLAIN_VALUE) {
+        page = noDictDimensionPages[++noDictDimensionIndex];
+      } else {
+        // do not support datamap on complex column
+        continue;
+      }
+      String fieldName = spec.getDimensionSpec(i).getFieldName();
+      if (fieldName.equalsIgnoreCase(columnName)) {
+        return page;
+      }
+    }
+    int numMeasures = spec.getNumMeasures();
+    for (int i = 0; i < numMeasures; i++) {
+      String fieldName = spec.getMeasureSpec(i).getFieldName();
+      if (fieldName.equalsIgnoreCase(columnName)) {
+        return measurePage[i];
+      }
+    }
+    throw new IllegalArgumentException("DataMap: must have '" + columnName + "' column in schema");
+  }
+
+  public boolean isLastPage() {
+    return isLastPage;
+  }
+
+  public void setIsLastPage(boolean isWriteAll) {
+    this.isLastPage = isWriteAll;
+  }
+
+  public int getPageSize() {
+    return pageSize;
+  }
 }
 
 

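getColumnPage above walks the dimension specs with two separate cursors, because
dictionary and plain-value dimension pages are kept in separate arrays, and then falls
back to the measure pages. A simplified, self-contained version of the two-cursor lookup
follows; the types are stand-ins and the measure fallback is omitted for brevity.

import java.util.List;

final class PageLookupSketch {
  enum DimensionType { GLOBAL_DICTIONARY, DIRECT_DICTIONARY, PLAIN_VALUE, COMPLEX }

  static final class DimensionSpec {
    final String fieldName;
    final DimensionType type;
    DimensionSpec(String fieldName, DimensionType type) {
      this.fieldName = fieldName;
      this.type = type;
    }
  }

  // dictPages and plainPages run parallel to the dictionary/plain dimensions in spec order
  static Object findPage(List<DimensionSpec> specs, Object[] dictPages, Object[] plainPages,
      String columnName) {
    int dictIndex = -1;
    int plainIndex = -1;
    for (DimensionSpec spec : specs) {
      Object page;
      switch (spec.type) {
        case GLOBAL_DICTIONARY:
        case DIRECT_DICTIONARY:
          page = dictPages[++dictIndex];
          break;
        case PLAIN_VALUE:
          page = plainPages[++plainIndex];
          break;
        default:
          // complex columns are not supported for datamaps
          continue;
      }
      if (spec.fieldName.equalsIgnoreCase(columnName)) {
        return page;
      }
    }
    throw new IllegalArgumentException("No column named '" + columnName + "' in schema");
  }
}
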
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index a34ed01..bcc0112 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -24,7 +24,6 @@ import java.io.FileFilter;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
-//import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -44,17 +43,14 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.page.EncodedTablePage;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
 import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletBTreeIndex;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonMergerUtil;
@@ -66,6 +62,7 @@ import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
 import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.format.BlockletInfo3;
 import org.apache.carbondata.format.IndexHeader;
+import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.store.file.FileData;
 
 import org.apache.commons.lang3.ArrayUtils;
@@ -97,11 +94,21 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
   protected String carbonDataFileTempPath;
 
   /**
-   * The name of carbonData file
+   * The name of carbonData file (blockId)
    */
   protected String carbonDataFileName;
 
   /**
+   * The sequence number of blocklet inside one block
+   */
+  protected int blockletId = 0;
+
+  /**
+   * The sequence number of page inside one blocklet
+   */
+  protected int pageId = 0;
+
+  /**
    * Local cardinality for the segment
    */
   protected int[] localCardinality;
@@ -132,7 +139,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
   /**
    * data block size for one carbon data file
    */
-  private long dataBlockSize;
+  private long blockSizeThreshold;
   /**
    * file size at any given point
    */
@@ -152,6 +159,11 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    */
   protected List<org.apache.carbondata.format.BlockletIndex> blockletIndex;
 
+  /**
+   * listener to write data map
+   */
+  protected DataMapWriterListener listener;
+
   public AbstractFactDataWriter(CarbonDataWriterVo dataWriterVo) {
     this.dataWriterVo = dataWriterVo;
     this.blockletInfoList =
@@ -163,22 +175,21 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
     this.fileSizeInBytes =
         (long) dataWriterVo.getTableBlocksize() * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR
             * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR;
-    /*
-    size reserved in one file for writing block meta data. It will be in percentage
-   */
+
+    // size reserved in one file for writing block meta data. It will be in percentage
     int spaceReservedForBlockMetaSize = Integer.parseInt(propInstance
         .getProperty(CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE,
             CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE_DEFAULT));
-    this.dataBlockSize = fileSizeInBytes - (fileSizeInBytes * spaceReservedForBlockMetaSize) / 100;
-    LOGGER.info("Total file size: " + fileSizeInBytes + " and dataBlock Size: " + dataBlockSize);
+    this.blockSizeThreshold =
+        fileSizeInBytes - (fileSizeInBytes * spaceReservedForBlockMetaSize) / 100;
+    LOGGER.info("Total file size: " + fileSizeInBytes + " and dataBlock Size: " +
+        blockSizeThreshold);
 
     this.executorService = Executors.newFixedThreadPool(1);
     executorServiceSubmitList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     // in case of compaction we will pass the cardinality.
     this.localCardinality = dataWriterVo.getColCardinality();
-    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
-        dataWriterVo.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + dataWriterVo
-            .getTableName());
+
     //TODO: We should delete the levelmetadata file after reading here.
     // so only data loading flow will need to read from cardinality file.
     if (null == this.localCardinality) {
@@ -202,6 +213,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
     this.dataChunksLength = new ArrayList<>();
     blockletMetadata = new ArrayList<BlockletInfo3>();
     blockletIndex = new ArrayList<>();
+    listener = dataWriterVo.getListener();
   }
 
   /**
@@ -241,18 +253,19 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
   }
 
   /**
-   * This method will be used to update the file channel with new file; new
-   * file will be created once existing file reached the file size limit This
+   * This method will be used to update the file channel with a new file once the block size
+   * threshold is exceeded. It first checks whether the existing file size has exceeded the
    * threshold; if yes, it writes the leaf metadata to the file, sets the
    * current file size to 0, closes the existing file channel, and gets the new file
    * name and the channel for the new file.
    *
-   * @param blockletDataSize data size of one block
+   * @param blockletSizeToBeAdded data size of the blocklet to be added
    * @throws CarbonDataWriterException if any problem
    */
-  protected void updateBlockletFileChannel(long blockletDataSize) throws CarbonDataWriterException {
-    if ((currentFileSize + blockletDataSize) >= dataBlockSize && currentFileSize != 0) {
+  protected void createNewFileIfReachThreshold(long blockletSizeToBeAdded)
+      throws CarbonDataWriterException {
+    if ((currentFileSize + blockletSizeToBeAdded) >= blockSizeThreshold && currentFileSize != 0) {
       // set the current file size to zero
       LOGGER.info("Writing data to file as max file size reached for file: "
           + carbonDataFileTempPath + " .Data block size: " + currentFileSize);
@@ -265,16 +278,42 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
       this.dataChunksLength = new ArrayList<>();
       this.blockletMetadata = new ArrayList<>();
       this.blockletIndex = new ArrayList<>();
-      CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
-      // rename carbon data file from in progress status to actual
-      renameCarbonDataFile();
-      executorServiceSubmitList.add(executorService
-          .submit(new CopyThread(this.carbonDataFileTempPath
-              .substring(0, this.carbonDataFileTempPath.lastIndexOf('.')))));
+      commitCurrentFile(false);
       // initialize the new channel
       initializeWriter();
     }
-    currentFileSize += blockletDataSize;
+    currentFileSize += blockletSizeToBeAdded;
+  }
+
+  private void notifyDataMapBlockStart() {
+    if (listener != null) {
+      listener.onBlockStart(carbonDataFileName);
+    }
+  }
+
+  private void notifyDataMapBlockEnd() {
+    if (listener != null) {
+      listener.onBlockEnd(carbonDataFileName);
+    }
+    blockletId = 0;
+  }
+
+  /**
+   * Finish writing the current file: flush the stream, then copy and rename the temp file
+   * to the final file.
+   * @param copyInCurrentThread set to false to perform the data copy in a new thread
+   */
+  protected void commitCurrentFile(boolean copyInCurrentThread) {
+    notifyDataMapBlockEnd();
+    CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
+    // rename carbon data file from in progress status to actual
+    renameCarbonDataFile();
+    String fileName = this.carbonDataFileTempPath.substring(0,
+        this.carbonDataFileTempPath.lastIndexOf('.'));
+    if (copyInCurrentThread) {
+      copyCarbonDataFileToCarbonStorePath(fileName);
+    } else {
+      executorServiceSubmitList.add(executorService.submit(new CopyThread(fileName)));
+    }
   }
 
   /**
@@ -310,6 +349,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
       throw new CarbonDataWriterException("Problem while getting the FileChannel for Leaf File",
           fileNotFoundException);
     }
+    notifyDataMapBlockStart();
   }
 
   private int initFileCount() {
@@ -433,18 +473,15 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    * @throws CarbonDataWriterException
    */
   public void closeWriter() throws CarbonDataWriterException {
-    CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
     if (this.blockletInfoList.size() > 0) {
-      renameCarbonDataFile();
-      copyCarbonDataFileToCarbonStorePath(
-          this.carbonDataFileTempPath
-              .substring(0, this.carbonDataFileTempPath.lastIndexOf('.')));
+      commitCurrentFile(true);
       try {
         writeIndexFile();
       } catch (IOException e) {
         throw new CarbonDataWriterException("Problem while writing the index file", e);
       }
     }
+    CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
     closeExecutorService();
   }
 
@@ -590,17 +627,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
   }
 
   /**
-   * This method will be used to write leaf data to file
-   * file format
-   * <key><measure1><measure2>....
-   *
-   * @throws CarbonDataWriterException
-   * @throws CarbonDataWriterException throws new CarbonDataWriterException if any problem
-   */
-  public abstract void writeTablePage(EncodedTablePage encodedTablePage)
-      throws CarbonDataWriterException;
-
-  /**
    * Below method will be used to update the min or max value
    * by removing the length from it
    *
@@ -608,10 +634,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    */
   protected byte[] updateMinMaxForNoDictionary(byte[] valueWithLength) {
     return valueWithLength;
-//    ByteBuffer buffer = ByteBuffer.wrap(valueWithLength);
-//    byte[] actualValue = new byte[buffer.getShort()];
-//    buffer.get(actualValue);
-//    return actualValue;
   }
 
   /**
@@ -640,5 +662,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
       copyCarbonDataFileToCarbonStorePath(fileName);
       return null;
     }
+
   }
 }

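The rollover flow above now goes through commitCurrentFile, which brackets each physical
file with the datamap block-start and block-end notifications. A minimal sketch of that
accumulate-commit-reopen cycle follows, with illustrative names and console output
standing in for the real stream, rename, and copy steps.

// A writer accumulates blocklets into the current file; once adding the next blocklet
// would cross the block size threshold, it commits the file and opens a fresh one.
final class RolloverSketch {
  private final long blockSizeThreshold;
  private long currentFileSize;
  private int fileCount;

  RolloverSketch(long blockSizeThreshold) {
    this.blockSizeThreshold = blockSizeThreshold;
  }

  void addBlocklet(long blockletSize) {
    if (currentFileSize + blockletSize >= blockSizeThreshold && currentFileSize != 0) {
      commitCurrentFile();
      openNewFile();
    }
    currentFileSize += blockletSize;
  }

  private void commitCurrentFile() {
    // real code: fire onBlockEnd, close streams, rename the temp file, then copy it
    // to the store path either inline or on a background thread
    System.out.println("commit file-" + fileCount + " (" + currentFileSize + " bytes)");
    currentFileSize = 0;
  }

  private void openNewFile() {
    fileCount++;
    // real code: create the file channel and fire onBlockStart
    System.out.println("open file-" + fileCount);
  }
}
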
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
index 225e031..26fff09 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
@@ -20,6 +20,7 @@ import java.util.List;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
 import org.apache.carbondata.processing.store.file.IFileManagerComposite;
 
@@ -64,6 +65,8 @@ public class CarbonDataWriterVo {
 
   private int taskExtension;
 
+  private DataMapWriterListener listener;
+
   /**
    * @return the storeLocation
    */
@@ -303,4 +306,12 @@ public class CarbonDataWriterVo {
   public void setTaskExtension(int taskExtension) {
     this.taskExtension = taskExtension;
   }
+
+  public void setListener(DataMapWriterListener listener) {
+    this.listener = listener;
+  }
+
+  public DataMapWriterListener getListener() {
+    return listener;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
index f194f74..3b26b7c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
@@ -18,14 +18,15 @@
 package org.apache.carbondata.processing.store.writer;
 
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.page.EncodedTablePage;
+import org.apache.carbondata.processing.store.TablePage;
 
 public interface CarbonFactDataWriter<T> {
 
   /**
    * write an encoded table page
+   * @param tablePage
    */
-  void writeTablePage(EncodedTablePage encodedTablePage) throws CarbonDataWriterException;
+  void writeTablePage(TablePage tablePage) throws CarbonDataWriterException;
 
   /**
    * Below method will be used to write the leaf meta data to file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
index 0f1b52b..f849e21 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.NodeHolder;
 import org.apache.carbondata.core.writer.CarbonFooterWriter;
 import org.apache.carbondata.format.FileFooter;
+import org.apache.carbondata.processing.store.TablePage;
 import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
 
@@ -199,14 +200,14 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
     return holder;
   }
 
-  @Override public void writeTablePage(EncodedTablePage encodedTablePage)
+  @Override public void writeTablePage(TablePage tablePage)
       throws CarbonDataWriterException {
-    if (encodedTablePage.getPageSize() == 0) {
+    if (tablePage.getPageSize() == 0) {
       return;
     }
-    long blockletDataSize = encodedTablePage.getEncodedSize();
-    updateBlockletFileChannel(blockletDataSize);
-    NodeHolder nodeHolder = buildNodeHolder(encodedTablePage);
+    long blockletDataSize = tablePage.getEncodedTablePage().getEncodedSize();
+    createNewFileIfReachThreshold(blockletDataSize);
+    NodeHolder nodeHolder = buildNodeHolder(tablePage.getEncodedTablePage());
     // write data to file and get its offset
     long offset = writeDataToFile(nodeHolder, fileChannel);
     // get the blocklet info for currently added blocklet

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
index e19a5ce..3f49a7b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.util.NodeHolder;
 import org.apache.carbondata.core.writer.CarbonFooterWriter;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.FileFooter;
+import org.apache.carbondata.processing.store.TablePage;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
 import org.apache.carbondata.processing.store.writer.v1.CarbonFactDataWriterImplV1;
 
@@ -63,19 +64,19 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
   /**
    * Below method will be used to write the data to carbon data file
    *
-   * @param encodedTablePage
+   * @param tablePage
    * @throws CarbonDataWriterException any problem in writing operation
    */
-  @Override public void writeTablePage(EncodedTablePage encodedTablePage)
+  @Override public void writeTablePage(TablePage tablePage)
       throws CarbonDataWriterException {
-    NodeHolder nodeHolder = buildNodeHolder(encodedTablePage);
-    if (encodedTablePage.getPageSize() == 0) {
+    NodeHolder nodeHolder = buildNodeHolder(tablePage.getEncodedTablePage());
+    if (tablePage.getPageSize() == 0) {
       return;
     }
     // size to calculate the size of the blocklet
     int size = 0;
     // get the blocklet info object
-    BlockletInfoColumnar blockletInfo = getBlockletInfo(encodedTablePage, 0);
+    BlockletInfoColumnar blockletInfo = getBlockletInfo(tablePage.getEncodedTablePage(), 0);
 
     List<DataChunk2> datachunks = null;
     try {
@@ -105,7 +106,7 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
         nodeHolder.getTotalDimensionArrayLength() + nodeHolder.getTotalMeasureArrayLength() + size;
     // if size of the file already reached threshold size then create a new file and get the file
     // channel object
-    updateBlockletFileChannel(blockletDataSize);
+    createNewFileIfReachThreshold(blockletDataSize);
     // writer the version header in the file if current file size is zero
     // this is done so carbondata file can be read separately
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java
new file mode 100644
index 0000000..68aee95
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.store.writer.v3;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
+import org.apache.carbondata.processing.store.TablePage;
+
+public class BlockletDataHolder {
+  private List<EncodedTablePage> encodedTablePage;
+  private List<TablePage> rawTablePages;
+  private long currentSize;
+
+  public BlockletDataHolder() {
+    this.encodedTablePage = new ArrayList<>();
+    this.rawTablePages = new ArrayList<>();
+  }
+
+  public void clear() {
+    encodedTablePage.clear();
+    rawTablePages.clear();
+    currentSize = 0;
+  }
+
+  public void addPage(TablePage rawTablePage) {
+    EncodedTablePage encodedTablePage = rawTablePage.getEncodedTablePage();
+    this.encodedTablePage.add(encodedTablePage);
+    this.rawTablePages.add(rawTablePage);
+    currentSize += encodedTablePage.getEncodedSize();
+  }
+
+  public long getSize() {
+    // increase by 15 percent to account for data chunk 3 of each column in each page
+    return currentSize + ((currentSize * 15) / 100);
+  }
+
+  public int getNumberOfPagesAdded() {
+    return encodedTablePage.size();
+  }
+
+  public int getTotalRows() {
+    int rows = 0;
+    for (EncodedTablePage nh : encodedTablePage) {
+      rows += nh.getPageSize();
+    }
+    return rows;
+  }
+
+  public List<EncodedTablePage> getEncodedTablePages() {
+    return encodedTablePage;
+  }
+
+  public List<TablePage> getRawTablePages() {
+    return rawTablePages;
+  }
+}

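BlockletDataHolder buffers the encoded pages of the blocklet under construction, keeping
the raw TablePages alongside so datamap writers can still read the un-encoded column data,
and reports its size with 15 percent headroom for per-column page metadata. A
self-contained sketch of the accumulate-then-flush loop such a holder supports, using
hypothetical names:

import java.util.ArrayList;
import java.util.List;

final class BlockletAccumulatorSketch {
  // stand-in for an encoded page; only its encoded size matters for this sketch
  static final class Page {
    final long encodedSize;
    Page(long encodedSize) { this.encodedSize = encodedSize; }
  }

  private final List<Page> pages = new ArrayList<>();
  private long currentSize;

  void add(Page page) {
    pages.add(page);
    currentSize += page.encodedSize;
  }

  // mirror the 15 percent headroom the holder reserves for page metadata
  long estimatedSize() {
    return currentSize + (currentSize * 15) / 100;
  }

  // accumulate pages; flush a blocklet whenever the estimate crosses the threshold
  static void run(List<Page> incoming, long blockletSizeThreshold) {
    BlockletAccumulatorSketch holder = new BlockletAccumulatorSketch();
    for (Page page : incoming) {
      holder.add(page);
      if (holder.estimatedSize() >= blockletSizeThreshold) {
        System.out.println("flush blocklet with " + holder.pages.size() + " pages");
        holder.pages.clear();
        holder.currentSize = 0;
      }
    }
  }
}
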

[3/3] carbondata git commit: [CARBONDATA-1363] Add DataMapWriter interface

Posted by ra...@apache.org.
[CARBONDATA-1363] Add DataMapWriter interface

This closes #1238


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f089287c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f089287c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f089287c

Branch: refs/heads/master
Commit: f089287cef1d685b81e8fa26868325503acdb635
Parents: 85cbad2
Author: Jacky Li <ja...@qq.com>
Authored: Thu Aug 10 13:36:18 2017 +0800
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Mon Aug 14 01:30:40 2017 +0530

----------------------------------------------------------------------
 .../core/datamap/DataMapDistributable.java      |  56 +++
 .../carbondata/core/datamap/DataMapMeta.java    |  42 ++
 .../core/datamap/DataMapStoreManager.java       | 144 +++++++
 .../carbondata/core/datamap/TableDataMap.java   | 142 +++++++
 .../carbondata/core/datamap/dev/DataMap.java    |  57 +++
 .../core/datamap/dev/DataMapFactory.java        |  73 ++++
 .../core/datamap/dev/DataMapWriter.java         |  58 +++
 .../core/datastore/page/EncodedTablePage.java   |  11 -
 .../indexstore/BlockletDataMapIndexStore.java   |  13 +-
 .../carbondata/core/indexstore/DataMap.java     |  60 ---
 .../core/indexstore/DataMapDistributable.java   |  56 ---
 .../core/indexstore/DataMapFactory.java         |  87 ----
 .../core/indexstore/DataMapStoreManager.java    | 139 -------
 .../carbondata/core/indexstore/DataMapType.java |  36 --
 .../core/indexstore/DataMapWriter.java          |  50 ---
 .../core/indexstore/TableDataMap.java           | 133 ------
 .../blockletindex/BlockletDataMap.java          |  45 +-
 .../blockletindex/BlockletDataMapFactory.java   |  46 +-
 .../core/metadata/AbsoluteTableIdentifier.java  |   4 +
 .../core/metadata/CarbonTableIdentifier.java    |   6 +-
 .../apache/carbondata/core/util/CarbonUtil.java |   6 +
 .../hadoop/api/CarbonTableInputFormat.java      |  16 +-
 .../spark/load/CarbonLoaderUtilTest.java        | 417 -------------------
 .../validation/FileFooterValidator.java         | 155 -------
 .../testsuite/datamap/DataMapWriterSuite.scala  | 180 ++++++++
 .../spark/rdd/DataManagementFunc.scala          |   5 +-
 .../spark/sql/hive/CarbonFileMetastore.scala    |   2 +-
 .../spark/sql/hive/CarbonHiveMetaStore.scala    |   2 +-
 .../iud/DeleteCarbonTableSubqueryTestCase.scala |   5 +-
 .../core/datastore/GenericDataType.java         | 145 -------
 .../datamap/DataMapWriterListener.java          | 138 ++++++
 .../processing/datatypes/ArrayDataType.java     |   1 -
 .../processing/datatypes/GenericDataType.java   | 145 +++++++
 .../processing/datatypes/PrimitiveDataType.java |   1 -
 .../processing/datatypes/StructDataType.java    |   1 -
 .../impl/ComplexFieldConverterImpl.java         |   2 +-
 .../converter/impl/FieldEncoderFactory.java     |   2 +-
 .../store/CarbonFactDataHandlerColumnar.java    | 131 +++---
 .../store/CarbonFactDataHandlerModel.java       |  14 +-
 .../carbondata/processing/store/TablePage.java  |  66 ++-
 .../store/writer/AbstractFactDataWriter.java    | 115 +++--
 .../store/writer/CarbonDataWriterVo.java        |  11 +
 .../store/writer/CarbonFactDataWriter.java      |   5 +-
 .../writer/v1/CarbonFactDataWriterImplV1.java   |  11 +-
 .../writer/v2/CarbonFactDataWriterImplV2.java   |  13 +-
 .../store/writer/v3/BlockletDataHolder.java     |  72 ++++
 .../writer/v3/CarbonFactDataWriterImplV3.java   | 131 +++---
 .../store/writer/v3/DataWriterHolder.java       |  62 ---
 .../util/CarbonDataProcessorUtil.java           |   2 +-
 49 files changed, 1502 insertions(+), 1612 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
new file mode 100644
index 0000000..517f629
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import org.apache.carbondata.core.datastore.block.Distributable;
+
+/**
+ * Distributable class for datamap.
+ */
+public abstract class DataMapDistributable implements Distributable {
+
+  private String tablePath;
+
+  private String segmentId;
+
+  private String dataMapName;
+
+  public String getTablePath() {
+    return tablePath;
+  }
+
+  public void setTablePath(String tablePath) {
+    this.tablePath = tablePath;
+  }
+
+  public String getSegmentId() {
+    return segmentId;
+  }
+
+  public void setSegmentId(String segmentId) {
+    this.segmentId = segmentId;
+  }
+
+  public String getDataMapName() {
+    return dataMapName;
+  }
+
+  public void setDataMapName(String dataMapName) {
+    this.dataMapName = dataMapName;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
new file mode 100644
index 0000000..7746acf
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap;
+
+import java.util.List;
+
+import org.apache.carbondata.core.indexstore.schema.FilterType;
+
+public class DataMapMeta {
+
+  private List<String> indexedColumns;
+
+  private FilterType optimizedOperation;
+
+  public DataMapMeta(List<String> indexedColumns, FilterType optimizedOperation) {
+    this.indexedColumns = indexedColumns;
+    this.optimizedOperation = optimizedOperation;
+  }
+
+  public List<String> getIndexedColumns() {
+    return indexedColumns;
+  }
+
+  public FilterType getOptimizedOperation() {
+    return optimizedOperation;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
new file mode 100644
index 0000000..f5bc22f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+/**
+ * It maintains all the DataMaps in it.
+ */
+public final class DataMapStoreManager {
+
+  private static DataMapStoreManager instance = new DataMapStoreManager();
+
+  /**
+   * Contains the list of datamaps for each table.
+   */
+  private Map<String, List<TableDataMap>> allDataMaps = new ConcurrentHashMap<>();
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DataMapStoreManager.class.getName());
+
+  private DataMapStoreManager() {
+
+  }
+
+  public List<TableDataMap> getAllDataMap(AbsoluteTableIdentifier identifier) {
+    return allDataMaps.get(identifier.uniqueName());
+  }
+
+  /**
+   * Get the datamap for reading data, creating and registering it if the table
+   * has no datamaps registered yet.
+   *
+   * @param identifier table identifier
+   * @param dataMapName name of the datamap
+   * @param factoryClass factory class used to create the datamap if needed
+   * @return table level datamap
+   */
+  public TableDataMap getDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
+      Class<? extends DataMapFactory> factoryClass) {
+    String table = identifier.uniqueName();
+    List<TableDataMap> tableDataMaps = allDataMaps.get(table);
+    TableDataMap dataMap;
+    if (tableDataMaps == null) {
+      dataMap = createAndRegisterDataMap(identifier, factoryClass, dataMapName);
+    } else {
+      dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
+    }
+    if (dataMap == null) {
+      throw new RuntimeException("Datamap does not exist");
+    }
+    return dataMap;
+  }
+
+  /**
+   * Create a new datamap instance and register it in the store manager.
+   * The datamap is created using the datamap name, the datamap factory class and the table identifier.
+   */
+  public TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier,
+      Class<? extends DataMapFactory> factoryClass, String dataMapName) {
+    String table = identifier.uniqueName();
+    List<TableDataMap> tableDataMaps = allDataMaps.get(table);
+    if (tableDataMaps == null) {
+      tableDataMaps = new ArrayList<>();
+      allDataMaps.put(table, tableDataMaps);
+    }
+    TableDataMap dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
+    if (dataMap != null) {
+      throw new RuntimeException("Already datamap exists in that path with type " + dataMapName);
+    }
+
+    try {
+      DataMapFactory dataMapFactory = factoryClass.newInstance();
+      dataMapFactory.init(identifier, dataMapName);
+      dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory);
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw new RuntimeException(e);
+    }
+    tableDataMaps.add(dataMap);
+    return dataMap;
+  }
+
+  private TableDataMap getAbstractTableDataMap(String dataMapName,
+      List<TableDataMap> tableDataMaps) {
+    TableDataMap dataMap = null;
+    for (TableDataMap tableDataMap: tableDataMaps) {
+      if (tableDataMap.getDataMapName().equals(dataMapName)) {
+        dataMap = tableDataMap;
+        break;
+      }
+    }
+    return dataMap;
+  }
+
+  /**
+   * Clear the datamap with the given name for the given table from memory
+   * @param identifier
+   * @param dataMapName
+   */
+  public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
+    List<TableDataMap> tableDataMaps = allDataMaps.get(identifier.uniqueName());
+    if (tableDataMaps != null) {
+      int i = 0;
+      for (TableDataMap tableDataMap: tableDataMaps) {
+        if (tableDataMap != null && dataMapName.equals(tableDataMap.getDataMapName())) {
+          tableDataMap.clear();
+          tableDataMaps.remove(i);
+          break;
+        }
+        i++;
+      }
+    }
+  }
+
+  /**
+   * Returns the singleton instance
+   * @return
+   */
+  public static DataMapStoreManager getInstance() {
+    return instance;
+  }
+
+}
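
A usage sketch for the store manager, assuming `identifier` is an
AbsoluteTableIdentifier obtained elsewhere and MyDataMapFactory is a hypothetical
DataMapFactory implementation:

    // Fetch the table level datamap; it is created and registered on first use.
    TableDataMap dataMap = DataMapStoreManager.getInstance()
        .getDataMap(identifier, "my_datamap", MyDataMapFactory.class);

    // Release the datamap from memory when it is no longer needed.
    DataMapStoreManager.getInstance().clearDataMap(identifier, "my_datamap");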

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
new file mode 100644
index 0000000..b55c5d9
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.events.ChangeEvent;
+import org.apache.carbondata.core.events.EventListener;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+/**
+ * DataMap at the table level; a user can add any number of datamaps for one table. Depending
+ * on the filter condition it can prune the blocklets.
+ */
+public final class TableDataMap implements EventListener {
+
+  private AbsoluteTableIdentifier identifier;
+
+  private String dataMapName;
+
+  private DataMapFactory dataMapFactory;
+
+  /**
+   * It is called to initialize and load the required table datamap metadata.
+   */
+  public TableDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
+      DataMapFactory dataMapFactory) {
+    this.identifier = identifier;
+    this.dataMapName = dataMapName;
+    this.dataMapFactory = dataMapFactory;
+  }
+
+  /**
+   * Prune the datamaps of the given valid segments using the filter expression
+   *
+   * @param segmentIds
+   * @param filterExp
+   * @return
+   */
+  public List<Blocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp)
+      throws IOException {
+    List<Blocklet> blocklets = new ArrayList<>();
+    for (String segmentId : segmentIds) {
+      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
+      for (DataMap dataMap : dataMaps) {
+        List<Blocklet> pruneBlocklets = dataMap.prune(filterExp);
+        blocklets.addAll(addSegmentId(pruneBlocklets, segmentId));
+      }
+    }
+    return blocklets;
+  }
+
+  private List<Blocklet> addSegmentId(List<Blocklet> pruneBlocklets, String segmentId) {
+    for (Blocklet blocklet : pruneBlocklets) {
+      blocklet.setSegmentId(segmentId);
+    }
+    return pruneBlocklets;
+  }
+
+  /**
+   * This is used for making the datamap distributable.
+   * It takes the valid segments and returns all the datamaps as distributable objects so that
+   * they can be distributed across machines.
+   *
+   * @return
+   */
+  public List<DataMapDistributable> toDistributable(List<String> segmentIds) throws IOException {
+    List<DataMapDistributable> distributables = new ArrayList<>();
+    for (String segmentsId : segmentIds) {
+      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentsId);
+      for (DataMap dataMap : dataMaps) {
+        distributables.add(dataMap.toDistributable());
+      }
+    }
+    return distributables;
+  }
+
+  /**
+   * This method is called on an executor after distribution. It uses the distributable object
+   * and the filter expression to prune.
+   *
+   * @param distributable
+   * @param filterExp
+   * @return
+   */
+  public List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf filterExp) {
+    return dataMapFactory.getDataMap(distributable).prune(filterExp);
+  }
+
+  @Override public void fireEvent(ChangeEvent event) {
+    dataMapFactory.fireEvent(event);
+  }
+
+  /**
+   * Clear only the datamaps of the segments
+   * @param segmentIds
+   */
+  public void clear(List<String> segmentIds) {
+    for (String segmentId: segmentIds) {
+      dataMapFactory.clear(segmentId);
+    }
+  }
+
+  /**
+   * Clear all datamaps
+   */
+  public void clear() {
+    dataMapFactory.clear();
+  }
+
+  /**
+   * Get the unique name of datamap
+   *
+   * @return
+   */
+  public String getDataMapName() {
+    return dataMapName;
+  }
+
+  public DataMapFactory getDataMapFactory() {
+    return dataMapFactory;
+  }
+}
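
The two prune overloads support both driver side and distributed pruning. A sketch,
assuming `segmentIds` (the valid segments) and `resolver` (a FilterResolverIntf)
come from the caller, as CarbonTableInputFormat does later in this patch; both
calls below can throw IOException:

    // Driver side: prune all valid segments at once. Each returned Blocklet
    // is tagged with its segment id by addSegmentId().
    List<Blocklet> prunedBlocklets = dataMap.prune(segmentIds, resolver);

    // Distributed: convert the datamaps to distributable units, ship each to
    // an executor, and call prune(distributable, resolver) there.
    List<DataMapDistributable> distributables = dataMap.toDistributable(segmentIds);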

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
new file mode 100644
index 0000000..526572a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+/**
+ * Datamap is an entity which can store and retrieve index data.
+ */
+public interface DataMap {
+
+  /**
+   * It is called to load the data map to memory or to initialize it.
+   */
+  void init(String path) throws MemoryException, IOException;
+
+  /**
+   * Prune the datamap with the filter expression. It returns the list of
+   * blocklets where matching data may exist.
+   *
+   * @param filterExp
+   * @return
+   */
+  List<Blocklet> prune(FilterResolverIntf filterExp);
+
+  /**
+   * Convert datamap to distributable object
+   * @return
+   */
+  DataMapDistributable toDistributable();
+
+  /**
+   * Clear complete index table and release memory.
+   */
+  void clear();
+
+}
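
A skeleton implementation to show the expected lifecycle: init once per index
location, prune per query, clear on unload. The class and its internals are
hypothetical placeholders, not the BlockletDataMap implementation later in this
patch:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.carbondata.core.datamap.DataMapDistributable;
    import org.apache.carbondata.core.datamap.dev.DataMap;
    import org.apache.carbondata.core.indexstore.Blocklet;
    import org.apache.carbondata.core.memory.MemoryException;
    import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;

    public class SimpleDataMap implements DataMap {

      // blocklet entries kept in memory after init(); a real implementation
      // would use a purpose-built index structure instead of a flat list
      private List<Blocklet> allBlocklets = new ArrayList<>();

      @Override public void init(String path) throws MemoryException, IOException {
        // load or rebuild the index from the given path
      }

      @Override public List<Blocklet> prune(FilterResolverIntf filterExp) {
        // placeholder: evaluate filterExp against the index and return only
        // the blocklets that may contain matching rows; returning everything
        // is always correct, just not selective
        return allBlocklets;
      }

      @Override public DataMapDistributable toDistributable() {
        // placeholder: wrap enough location info for pruning on any machine
        return null;
      }

      @Override public void clear() {
        allBlocklets.clear();
      }
    }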

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
new file mode 100644
index 0000000..873457c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.events.ChangeEvent;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+/**
+ * Interface for datamap factory, it is responsible for creating the datamap.
+ */
+public interface DataMapFactory {
+
+  /**
+   * Initialize the datamap factory with the table identifier and datamap name
+   */
+  void init(AbsoluteTableIdentifier identifier, String dataMapName);
+
+  /**
+   * Return a new writer for this datamap
+   */
+  DataMapWriter createWriter(String segmentId);
+
+  /**
+   * Get the datamap for segmentid
+   */
+  List<DataMap> getDataMaps(String segmentId) throws IOException;
+
+  /**
+   * Get datamap for distributable object.
+   */
+  DataMap getDataMap(DataMapDistributable distributable);
+
+  /**
+   * Notify the datamap of table or segment change events
+   *
+   * @param event
+   */
+  void fireEvent(ChangeEvent event);
+
+  /**
+   * Clear the datamap of the given segment
+   */
+  void clear(String segmentId);
+
+  /**
+   * Clear all datamaps from memory
+   */
+  void clear();
+
+  /**
+   * Return metadata of this datamap
+   */
+  DataMapMeta getMeta();
+}
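
A factory sketch wiring the pieces together. All names are hypothetical, the
segment bookkeeping is reduced to a plain map, and SimpleDataMap /
SimpleDataMapWriter refer to the sketches next to the DataMap and DataMapWriter
interfaces:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.carbondata.core.datamap.DataMapDistributable;
    import org.apache.carbondata.core.datamap.DataMapMeta;
    import org.apache.carbondata.core.datamap.dev.DataMap;
    import org.apache.carbondata.core.datamap.dev.DataMapFactory;
    import org.apache.carbondata.core.datamap.dev.DataMapWriter;
    import org.apache.carbondata.core.events.ChangeEvent;
    import org.apache.carbondata.core.indexstore.schema.FilterType;
    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;

    public class MyDataMapFactory implements DataMapFactory {

      private AbsoluteTableIdentifier identifier;
      private String dataMapName;
      // one list of datamaps per segment id
      private Map<String, List<DataMap>> segmentDataMaps = new HashMap<>();

      @Override public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
        this.identifier = identifier;
        this.dataMapName = dataMapName;
      }

      @Override public DataMapWriter createWriter(String segmentId) {
        return new SimpleDataMapWriter();
      }

      @Override public List<DataMap> getDataMaps(String segmentId) throws IOException {
        List<DataMap> dataMaps = segmentDataMaps.get(segmentId);
        return dataMaps == null ? new ArrayList<DataMap>() : dataMaps;
      }

      @Override public DataMap getDataMap(DataMapDistributable distributable) {
        // placeholder: rebuild a single datamap from its location info
        return null;
      }

      @Override public void fireEvent(ChangeEvent event) {
        // react to table or segment changes if needed
      }

      @Override public void clear(String segmentId) {
        segmentDataMaps.remove(segmentId);
      }

      @Override public void clear() {
        segmentDataMaps.clear();
      }

      @Override public DataMapMeta getMeta() {
        // assumed column name and FilterType constant, for illustration only
        return new DataMapMeta(Arrays.asList("name"), FilterType.EQUALTO);
      }
    }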

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
new file mode 100644
index 0000000..28163d7
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev;
+
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+
+/**
+ * Data Map writer
+ */
+public interface DataMapWriter {
+
+  /**
+   *  Start of new block notification.
+   *  @param blockId file name of the carbondata file
+   */
+  void onBlockStart(String blockId);
+
+  /**
+   * End of block notification
+   */
+  void onBlockEnd(String blockId);
+
+  /**
+   * Start of new blocklet notification.
+   * @param blockletId sequence number of blocklet in the block
+   */
+  void onBlockletStart(int blockletId);
+
+  /**
+   * End of blocklet notification
+   * @param blockletId sequence number of blocklet in the block
+   */
+  void onBlockletEnd(int blockletId);
+
+  /**
+   * Add the column pages of a row batch to the datamap; the order of pages is the same as
+   * `indexedColumns` in the DataMapMeta returned by DataMapFactory.
+   *
+   * Implementations should copy the content of `pages` as needed, because the `pages` memory
+   * may be freed after this method returns, if an unsafe column page is used.
+   */
+  void onPageAdded(int blockletId, int pageId, ColumnPage[] pages);
+
+}
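
The writer callbacks mirror the write path's structure: block, then blocklet,
then page. A hypothetical writer sketch; the essential point is copying page
content inside onPageAdded, since pages backed by unsafe memory may be freed
once the callback returns:

    import org.apache.carbondata.core.datamap.dev.DataMapWriter;
    import org.apache.carbondata.core.datastore.page.ColumnPage;

    public class SimpleDataMapWriter implements DataMapWriter {

      @Override public void onBlockStart(String blockId) {
        // open per-block state, e.g. an index output for this carbondata file
      }

      @Override public void onBlockEnd(String blockId) {
        // flush and close the per-block state
      }

      @Override public void onBlockletStart(int blockletId) {
        // reset per-blocklet accumulators
      }

      @Override public void onBlockletEnd(int blockletId) {
        // finalize the index entry for this blocklet
      }

      @Override public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages) {
        // pages arrive in the same order as the indexed columns declared in
        // DataMapMeta; copy whatever is needed before returning, because the
        // page memory may be freed afterwards when unsafe pages are used
        for (ColumnPage page : pages) {
          // e.g. accumulate min/max per blocklet; the accessor to use depends
          // on the column's data type, so none is assumed here
        }
      }
    }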

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java
index ea9c373..0aac1d9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java
@@ -42,9 +42,6 @@ public class EncodedTablePage {
   // number of row in this page
   private int pageSize;
 
-  // true if it is last page of all input rows
-  private boolean isLastPage;
-
   // size in bytes of all encoded columns (including data and metadate)
   private int encodedSize;
 
@@ -128,14 +125,6 @@ public class EncodedTablePage {
     return pageKey;
   }
 
-  public boolean isLastPage() {
-    return isLastPage;
-  }
-
-  public void setIsLastPage(boolean isWriteAll) {
-    this.isLastPage = isWriteAll;
-  }
-
   public EncodedMeasurePage getMeasure(int measureIndex) {
     return measures[measureIndex];
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index fc8c273..9d4af7b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -26,8 +26,8 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CarbonLRUCache;
-import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
+import org.apache.carbondata.core.memory.MemoryException;
 
 /**
  * Class to handle loading, unloading,clearing,storing of the table
@@ -73,10 +73,9 @@ public class BlockletDataMapIndexStore
     if (dataMap == null) {
       try {
         dataMap = loadAndGetDataMap(tableSegmentUniqueIdentifier);
-      } catch (IndexBuilderException e) {
-        throw new IOException(e.getMessage(), e);
-      } catch (Throwable e) {
-        throw new IOException("Problem in loading segment block.", e);
+      } catch (MemoryException e) {
+        LOGGER.error("memory exception when loading datamap: " + e.getMessage());
+        throw new RuntimeException(e.getMessage(), e);
       }
     }
     return dataMap;
@@ -93,6 +92,7 @@ public class BlockletDataMapIndexStore
       for (BlockletDataMap dataMap : blockletDataMaps) {
         dataMap.clear();
       }
+      LOGGER.error(e);
       throw new IOException("Problem in loading segment blocks.", e);
     }
     return blockletDataMaps;
@@ -130,7 +130,8 @@ public class BlockletDataMapIndexStore
    * @throws IOException
    */
   private BlockletDataMap loadAndGetDataMap(
-      TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) throws IOException {
+      TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier)
+      throws IOException, MemoryException {
     String uniqueTableSegmentIdentifier =
         tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier();
     Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
deleted file mode 100644
index 1276494..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import java.util.List;
-
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-
-/**
- * Datamap is an entity which can store and retrieve index data.
- */
-public interface DataMap {
-
-  /**
-   * Give the writer to write the data.
-   *
-   * @return
-   */
-  DataMapWriter getWriter();
-
-  /**
-   * It is called to load the data map to memory or to initialize it.
-   */
-  void init(String path);
-
-  /**
-   * Prune the datamap with filter expression. It returns the list of
-   * blocklets where these filters can exist.
-   *
-   * @param filterExp
-   * @return
-   */
-  List<Blocklet> prune(FilterResolverIntf filterExp);
-
-  /**
-   * Convert datamap to distributable object
-   * @return
-   */
-  DataMapDistributable toDistributable();
-
-  /**
-   * Clear complete index table and release memory.
-   */
-  void clear();
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapDistributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapDistributable.java
deleted file mode 100644
index 4c379f3..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapDistributable.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import org.apache.carbondata.core.datastore.block.Distributable;
-
-/**
- * Distributable class for datamap.
- */
-public abstract class DataMapDistributable implements Distributable {
-
-  private String tablePath;
-
-  private String segmentId;
-
-  private String dataMapName;
-
-  public String getTablePath() {
-    return tablePath;
-  }
-
-  public void setTablePath(String tablePath) {
-    this.tablePath = tablePath;
-  }
-
-  public String getSegmentId() {
-    return segmentId;
-  }
-
-  public void setSegmentId(String segmentId) {
-    this.segmentId = segmentId;
-  }
-
-  public String getDataMapName() {
-    return dataMapName;
-  }
-
-  public void setDataMapName(String dataMapName) {
-    this.dataMapName = dataMapName;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
deleted file mode 100644
index 72f714f..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import java.util.List;
-
-import org.apache.carbondata.core.events.ChangeEvent;
-import org.apache.carbondata.core.indexstore.schema.FilterType;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-
-/**
- * Interface for datamap factory, it is responsible for creating the datamap.
- */
-public interface DataMapFactory {
-
-  /**
-   * Initialization of Datamap factory
-   * @param identifier
-   * @param dataMapName
-   */
-  void init(AbsoluteTableIdentifier identifier, String dataMapName);
-  /**
-   * Get the datamap writer for each segmentid.
-   *
-   * @param identifier
-   * @param segmentId
-   * @return
-   */
-  DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier,
-      String segmentId);
-
-  /**
-   * Get the datamap for segmentid
-   *
-   * @param segmentId
-   * @return
-   */
-  List<DataMap> getDataMaps(String segmentId);
-
-  /**
-   * Get datamap for distributable object.
-   *
-   * @param distributable
-   * @return
-   */
-  DataMap getDataMap(DataMapDistributable distributable);
-
-  /**
-   * This method checks whether the columns and the type of filters supported
-   * for this datamap or not
-   *
-   * @param filterType
-   * @return
-   */
-  boolean isFiltersSupported(FilterType filterType);
-
-  /**
-   *
-   * @param event
-   */
-  void fireEvent(ChangeEvent event);
-
-  /**
-   * Clears datamap of the segment
-   */
-  void clear(String segmentId);
-
-  /**
-   * Clear all datamaps from memory
-   */
-  void clear();
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
deleted file mode 100644
index 1664a6a..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-
-/**
- * It maintains all the DataMaps in it.
- */
-public final class DataMapStoreManager {
-
-  private static DataMapStoreManager instance = new DataMapStoreManager();
-
-  /**
-   * Contains the list of datamaps for each table.
-   */
-  private Map<AbsoluteTableIdentifier, List<TableDataMap>> dataMapMappping = new HashMap<>();
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DataMapStoreManager.class.getName());
-
-  private DataMapStoreManager() {
-
-  }
-
-  /**
-   * Get the datamap for reading data.
-   *
-   * @param dataMapName
-   * @param mapType
-   * @return
-   */
-  public TableDataMap getDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
-      DataMapType mapType) {
-    List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier);
-    TableDataMap dataMap;
-    if (tableDataMaps == null) {
-      createTableDataMap(identifier, mapType, dataMapName);
-      tableDataMaps = dataMapMappping.get(identifier);
-    }
-    dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
-    if (dataMap == null) {
-      throw new RuntimeException("Datamap does not exist");
-    }
-    return dataMap;
-  }
-
-  /**
-   * Create new datamap instance using datamap name, datamap type and table identifier
-   *
-   * @param mapType
-   * @return
-   */
-  private TableDataMap createTableDataMap(AbsoluteTableIdentifier identifier,
-      DataMapType mapType, String dataMapName) {
-    List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier);
-    if (tableDataMaps == null) {
-      tableDataMaps = new ArrayList<>();
-      dataMapMappping.put(identifier, tableDataMaps);
-    }
-    TableDataMap dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
-    if (dataMap != null) {
-      throw new RuntimeException("Already datamap exists in that path with type " + mapType);
-    }
-
-    try {
-      DataMapFactory dataMapFactory = mapType.getClassObject().newInstance();
-      dataMapFactory.init(identifier, dataMapName);
-      dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory);
-    } catch (Exception e) {
-      LOGGER.error(e);
-      throw new RuntimeException(e);
-    }
-    tableDataMaps.add(dataMap);
-    return dataMap;
-  }
-
-  private TableDataMap getAbstractTableDataMap(String dataMapName,
-      List<TableDataMap> tableDataMaps) {
-    TableDataMap dataMap = null;
-    for (TableDataMap tableDataMap: tableDataMaps) {
-      if (tableDataMap.getDataMapName().equals(dataMapName)) {
-        dataMap = tableDataMap;
-        break;
-      }
-    }
-    return dataMap;
-  }
-
-  /**
-   * Clear the datamap/datamaps of a mentioned datamap name and table from memory
-   * @param identifier
-   * @param dataMapName
-   */
-  public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
-    List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier);
-    if (tableDataMaps != null) {
-      int i = 0;
-      for (TableDataMap tableDataMap: tableDataMaps) {
-        if (tableDataMap != null && dataMapName.equals(tableDataMap.getDataMapName())) {
-          tableDataMap.clear();
-          tableDataMaps.remove(i);
-          break;
-        }
-        i++;
-      }
-    }
-  }
-
-  /**
-   * Returns the singleton instance
-   * @return
-   */
-  public static DataMapStoreManager getInstance() {
-    return instance;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
deleted file mode 100644
index 0059b29..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
-
-/**
- * Datamap type
- */
-public enum DataMapType {
-  BLOCKLET(BlockletDataMapFactory.class);
-
-  private Class<? extends DataMapFactory> classObject;
-
-  DataMapType(Class<? extends DataMapFactory> classObject) {
-    this.classObject = classObject;
-  }
-
-  public Class<? extends DataMapFactory> getClassObject() {
-    return classObject;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapWriter.java
deleted file mode 100644
index bd8be09..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapWriter.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import java.io.DataOutput;
-
-/**
- * Data Map writer
- */
-public interface DataMapWriter<T> {
-
-  /**
-   * Initialize the data map writer with output stream
-   *
-   * @param outStream
-   */
-  void init(DataOutput outStream);
-
-  /**
-   * Add the index row to the in-memory store.
-   */
-  void writeData(T data);
-
-  /**
-   * Get the added row count
-   *
-   * @return
-   */
-  int getRowCount();
-
-  /**
-   * Finish writing of data map table, otherwise it will not be allowed to read.
-   */
-  void finish();
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
deleted file mode 100644
index 39ca4c5..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.events.ChangeEvent;
-import org.apache.carbondata.core.events.EventListener;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-/**
- * DataMap at the table level, user can add any number of datamaps for one table. Depends
- * on the filter condition it can prune the blocklets.
- */
-public final class TableDataMap implements EventListener {
-
-  private AbsoluteTableIdentifier identifier;
-
-  private String dataMapName;
-
-  private DataMapFactory dataMapFactory;
-
-  /**
-   * It is called to initialize and load the required table datamap metadata.
-   */
-  public TableDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
-      DataMapFactory dataMapFactory) {
-    this.identifier = identifier;
-    this.dataMapName = dataMapName;
-    this.dataMapFactory = dataMapFactory;
-  }
-
-  /**
-   * Pass the valid segments and prune the datamap using filter expression
-   *
-   * @param segmentIds
-   * @param filterExp
-   * @return
-   */
-  public List<Blocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp) {
-    List<Blocklet> blocklets = new ArrayList<>();
-    for (String segmentId : segmentIds) {
-      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
-      for (DataMap dataMap : dataMaps) {
-        List<Blocklet> pruneBlocklets = dataMap.prune(filterExp);
-        blocklets.addAll(addSegmentId(pruneBlocklets, segmentId));
-      }
-    }
-    return blocklets;
-  }
-
-  private List<Blocklet> addSegmentId(List<Blocklet> pruneBlocklets, String segmentId) {
-    for (Blocklet blocklet : pruneBlocklets) {
-      blocklet.setSegmentId(segmentId);
-    }
-    return pruneBlocklets;
-  }
-
-  /**
-   * This is used for making the datamap distributable.
-   * It takes the valid segments and returns all the datamaps as distributable objects so that
-   * it can be distributed across machines.
-   *
-   * @return
-   */
-  public List<DataMapDistributable> toDistributable(List<String> segmentIds) {
-    List<DataMapDistributable> distributables = new ArrayList<>();
-    for (String segmentsId : segmentIds) {
-      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentsId);
-      for (DataMap dataMap : dataMaps) {
-        distributables.add(dataMap.toDistributable());
-      }
-    }
-    return distributables;
-  }
-
-  /**
-   * This method is used from any machine after it is distributed. It takes the distributable object
-   * to prune the filters.
-   *
-   * @param distributable
-   * @param filterExp
-   * @return
-   */
-  public List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf filterExp) {
-    return dataMapFactory.getDataMap(distributable).prune(filterExp);
-  }
-
-  @Override public void fireEvent(ChangeEvent event) {
-    dataMapFactory.fireEvent(event);
-  }
-
-  /**
-   * Clear only the datamaps of the segments
-   * @param segmentIds
-   */
-  public void clear(List<String> segmentIds) {
-    for (String segmentId: segmentIds) {
-      dataMapFactory.clear(segmentId);
-    }
-  }
-
-  /**
-   * Clears all datamap
-   */
-  public void clear() {
-    dataMapFactory.clear();
-  }
-  /**
-   * Get the unique name of datamap
-   *
-   * @return
-   */
-  public String getDataMapName() {
-    return dataMapName;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 4b5be11..2e82c46 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -31,14 +31,13 @@ import java.util.List;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cacheable;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
-import org.apache.carbondata.core.indexstore.DataMap;
-import org.apache.carbondata.core.indexstore.DataMapDistributable;
-import org.apache.carbondata.core.indexstore.DataMapWriter;
 import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
 import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
@@ -64,6 +63,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(BlockletDataMap.class.getName());
 
+  public static final String NAME = "clustered.btree.blocklet";
+
   private static int KEY_INDEX = 0;
 
   private static int MIN_VALUES_INDEX = 1;
@@ -88,31 +89,23 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   private int[] columnCardinality;
 
-  @Override public DataMapWriter getWriter() {
-    return null;
-  }
-
-  @Override public void init(String path) {
+  @Override public void init(String path) throws IOException, MemoryException {
     DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
-    try {
-      List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(path);
-      for (DataFileFooter fileFooter : indexInfo) {
-        List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
-        if (segmentProperties == null) {
-          columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
-          segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
-          createSchema(segmentProperties);
-        }
-        TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
-        fileFooter = CarbonUtil.readMetadatFile(blockInfo);
-
-        loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath());
-      }
-      if (unsafeMemoryDMStore != null) {
-        unsafeMemoryDMStore.finishWriting();
+    List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(path);
+    for (DataFileFooter fileFooter : indexInfo) {
+      List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
+      if (segmentProperties == null) {
+        columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
+        segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
+        createSchema(segmentProperties);
       }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
+      TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
+      fileFooter = CarbonUtil.readMetadatFile(blockInfo);
+
+      loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath());
+    }
+    if (unsafeMemoryDMStore != null) {
+      unsafeMemoryDMStore.finishWriting();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 2fe6643..e189931 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -25,16 +25,16 @@ import java.util.Map;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.events.ChangeEvent;
-import org.apache.carbondata.core.indexstore.DataMap;
-import org.apache.carbondata.core.indexstore.DataMapDistributable;
-import org.apache.carbondata.core.indexstore.DataMapFactory;
-import org.apache.carbondata.core.indexstore.DataMapWriter;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
-import org.apache.carbondata.core.indexstore.schema.FilterType;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 
 /**
@@ -44,21 +44,25 @@ public class BlockletDataMapFactory implements DataMapFactory {
 
   private AbsoluteTableIdentifier identifier;
 
+  // segmentId -> list of index file
   private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
 
   private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache;
 
+  @Override
   public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
     this.identifier = identifier;
     cache = CacheProvider.getInstance()
         .createCache(CacheType.DRIVER_BLOCKLET_DATAMAP, identifier.getStorePath());
   }
 
-  public DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId) {
-    return null;
+  @Override
+  public DataMapWriter createWriter(String segmentId) {
+    throw new UnsupportedOperationException("not implemented");
   }
 
-  public List<DataMap> getDataMaps(String segmentId) {
+  @Override
+  public List<DataMap> getDataMaps(String segmentId) throws IOException {
     List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
         segmentMap.get(segmentId);
     if (tableBlockIndexUniqueIdentifiers == null) {
@@ -77,17 +81,10 @@ public class BlockletDataMapFactory implements DataMapFactory {
       }
     }
 
-    try {
-      return cache.getAll(tableBlockIndexUniqueIdentifiers);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override public boolean isFiltersSupported(FilterType filterType) {
-    return true;
+    return cache.getAll(tableBlockIndexUniqueIdentifiers);
   }
 
+  @Override
   public void clear(String segmentId) {
     List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId);
     if (blockIndexes != null) {
@@ -99,17 +96,26 @@ public class BlockletDataMapFactory implements DataMapFactory {
     }
   }
 
-  @Override public void clear() {
+  @Override
+  public void clear() {
     for (String segmentId: segmentMap.keySet()) {
       clear(segmentId);
     }
   }
 
-  @Override public DataMap getDataMap(DataMapDistributable distributable) {
+  @Override
+  public DataMap getDataMap(DataMapDistributable distributable) {
     return null;
   }
 
-  @Override public void fireEvent(ChangeEvent event) {
+  @Override
+  public void fireEvent(ChangeEvent event) {
 
   }
+
+  @Override
+  public DataMapMeta getMeta() {
+    // TODO: pass SORT_COLUMNS into this class
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
index 22faaf2..31ad03b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
@@ -156,4 +156,8 @@ public class AbsoluteTableIdentifier implements Serializable {
     }
     return true;
   }
+
+  public String uniqueName() {
+    return storePath + "/" + carbonTableIdentifier.toString().toLowerCase();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
index 31a0b23..cc65d9b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
@@ -128,9 +128,9 @@ public class CarbonTableIdentifier implements Serializable {
     return true;
   }
 
-  /*
- * @return table unidque name
- */
+  /**
+   * return unique table name
+   */
   @Override public String toString() {
     return databaseName + '_' + tableName;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index edc4c28..15512a8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1950,5 +1950,11 @@ public final class CarbonUtil {
         throw new IllegalArgumentException("Invalid data type: " + meta.getType());
     }
   }
+
+  public static void requireNotNull(Object obj) {
+    if (obj == null) {
+      throw new IllegalArgumentException("parameter not be null");
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 54ad18b..19e264b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -30,11 +30,12 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
 import org.apache.carbondata.core.indexstore.Blocklet;
-import org.apache.carbondata.core.indexstore.DataMapStoreManager;
-import org.apache.carbondata.core.indexstore.DataMapType;
-import org.apache.carbondata.core.indexstore.TableDataMap;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
@@ -246,7 +247,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   @Override public List<InputSplit> getSplits(JobContext job) throws IOException {
     AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
     TableDataMap blockletMap =
-        DataMapStoreManager.getInstance().getDataMap(identifier, "blocklet", DataMapType.BLOCKLET);
+        DataMapStoreManager.getInstance().getDataMap(identifier, BlockletDataMap.NAME,
+            BlockletDataMapFactory.class);
     List<String> invalidSegments = new ArrayList<>();
     List<UpdateVO> invalidTimestampsList = new ArrayList<>();
     List<String> validSegments = Arrays.asList(getSegmentsToAccess(job));
@@ -403,7 +405,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
         new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
 
     TableDataMap blockletMap = DataMapStoreManager.getInstance()
-        .getDataMap(absoluteTableIdentifier, "blocklet", DataMapType.BLOCKLET);
+        .getDataMap(absoluteTableIdentifier, BlockletDataMap.NAME, BlockletDataMapFactory.class);
     List<Blocklet> prunedBlocklets = blockletMap.prune(segmentIds, resolver);
 
     List<org.apache.carbondata.hadoop.CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
@@ -549,8 +551,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
    */
   public BlockMappingVO getBlockRowCount(JobContext job, AbsoluteTableIdentifier identifier)
       throws IOException, KeyGenException {
-    TableDataMap blockletMap =
-        DataMapStoreManager.getInstance().getDataMap(identifier, "blocklet", DataMapType.BLOCKLET);
+    TableDataMap blockletMap = DataMapStoreManager.getInstance()
+        .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class);
     SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
     SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments =
         new SegmentStatusManager(identifier).getValidAndInvalidSegments();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java b/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
deleted file mode 100644
index 76c7e6f..0000000
--- a/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
+++ /dev/null
@@ -1,417 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.integration.spark.load;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.datastore.block.Distributable;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.spark.load.CarbonLoaderUtil;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test class to test block distribution functionality
- */
-public class CarbonLoaderUtilTest {
-  List<Distributable> blockInfos = null;
-  int noOfNodesInput = -1;
-  List<String> activeNode = null;
-  Map<String, List<Distributable>> expected = null;
-  Map<String, List<Distributable>> mapOfNodes = null;
-
-  @Test public void nodeBlockMapping() throws Exception {
-
-    // scenario when the 3 nodes and 3 executors
-    initSet1();
-    Map<String, List<Distributable>> mapOfNodes =
-            CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
-    // node allocation
-    Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size());
-    // block allocation
-    boolean isEqual = compareResult(expected, mapOfNodes);
-    Assert.assertTrue("Block Allocation", isEqual);
-
-    // 2 node and 3 executors
-    initSet2();
-    mapOfNodes = CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
-    // node allocation
-    Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size());
-    // block allocation
-    isEqual = compareResult(expected, mapOfNodes);
-    Assert.assertTrue("Block Allocation", isEqual);
-
-    // 3 data node and 2 executors
-    initSet3();
-    mapOfNodes = CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
-    // node allocation
-    Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size());
-    // block allocation
-    isEqual = compareResult(expected, mapOfNodes);
-    Assert.assertTrue("Block Allocation", isEqual);
-  }
-
-  /**
-   * compares the block allocation across nodes
-   *
-   * @param expectedResult expected node-to-block mapping
-   * @param actualResult   actual node-to-block mapping returned by nodeBlockMapping
-   * @return true if each node carries the same number of blocks in both mappings
-   */
-  private boolean compareResult(Map<String, List<Distributable>> expectedResult,
-                                Map<String, List<Distributable>> actualResult) {
-    expectedResult = sortByListSize(expectedResult);
-    actualResult = sortByListSize(actualResult);
-    List<Map.Entry<String, List<Distributable>>> expectedList = new LinkedList<>(expectedResult.entrySet());
-    List<Map.Entry<String, List<Distributable>>> mapOfNodesList = new LinkedList<>(actualResult.entrySet());
-    boolean isEqual = expectedList.size() == mapOfNodesList.size();
-    if (isEqual) {
-      for (int i = 0; i < expectedList.size(); i++) {
-        int size1 = expectedList.get(i).getValue().size();
-        int size2 = mapOfNodesList.get(i).getValue().size();
-        isEqual = size1 == size2;
-        if (!isEqual) {
-          break;
-        }
-      }
-    }
-    return isEqual;
-  }
-
-  /**
-   * sort by list size
-   *
-   * @param map node-to-block mapping to sort
-   * @return the same entries as a LinkedHashMap, ordered by descending block-list size
-   */
-  private static Map<String, List<Distributable>> sortByListSize(
-          Map<String, List<Distributable>> map) {
-    List<Map.Entry<String, List<Distributable>>> list = new LinkedList<>(map.entrySet());
-    Collections.sort(list, new Comparator<Map.Entry<String, List<Distributable>>>() {
-      public int compare(Map.Entry<String, List<Distributable>> entry1,
-          Map.Entry<String, List<Distributable>> entry2) {
-        if (entry1 == null && entry2 == null) {
-          return 0;
-        } else if (entry1 == null) {
-          return 1;
-        } else if (entry2 == null) {
-          return -1;
-        }
-        return entry2.getValue().size() - entry1.getValue().size();
-      }
-    });
-
-    Map<String, List<Distributable>> res = new LinkedHashMap<>();
-    for (Map.Entry<String, List<Distributable>> entry : list) {
-      res.put(entry.getKey(), entry.getValue());
-    }
-    return res;
-  }
-
-  void initSet1() {
-    blockInfos = new ArrayList<>();
-    activeNode = new ArrayList<>();
-    activeNode.add("node-7");
-    activeNode.add("node-9");
-    activeNode.add("node-11");
-    String[] location = { "node-7", "node-9", "node-11" };
-    blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 2, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0));
-    expected = new HashMap<>();
-    expected.put("node-7", blockInfos.subList(0, 2));
-    expected.put("node-9", blockInfos.subList(2, 4));
-    expected.put("node-11", blockInfos.subList(4, 6));
-  }
-
-  void initSet2() {
-    blockInfos = new ArrayList<>();
-    activeNode = new ArrayList<>();
-    activeNode.add("node-7");
-    activeNode.add("node-9");
-    activeNode.add("node-11");
-    String[] location = { "node-7", "node-11" };
-    blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 2, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0));
-    expected = new HashMap<>();
-    expected.put("node-7", blockInfos.subList(0, 2));
-    expected.put("node-9", blockInfos.subList(2, 4));
-    expected.put("node-11", blockInfos.subList(4, 6));
-  }
-
-  void initSet3() {
-    blockInfos = new ArrayList<>();
-    activeNode = new ArrayList<>();
-    activeNode.add("node-7");
-    activeNode.add("node-11");
-    String[] location = { "node-7", "node-9", "node-11" };
-    blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 2, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0));
-    expected = new HashMap<>();
-    expected.put("node-7", blockInfos.subList(0, 3));
-    expected.put("node-11", blockInfos.subList(3, 6));
-  }
-
-
-  /**
-   * Test case with 4 blocks and 4 nodes with replication factor 3.
-   *
-   * @throws Exception
-   */
-  @Test public void nodeBlockMappingTestWith4Blocks4nodes() throws Exception {
-
-    Map<TableBlockInfo, List<String>> inputMap = new HashMap<TableBlockInfo, List<String>>(5);
-
-    TableBlockInfo block1 =
-        new TableBlockInfo("path1", 123, "1", new String[] { "1", "2", "3" }, 111);
-    TableBlockInfo block2 =
-        new TableBlockInfo("path2", 123, "2", new String[] { "2", "3", "4" }, 111);
-    TableBlockInfo block3 =
-        new TableBlockInfo("path3", 123, "3", new String[] { "3", "4", "1" }, 111);
-    TableBlockInfo block4 =
-        new TableBlockInfo("path4", 123, "4", new String[] { "1", "2", "4" }, 111);
-
-    inputMap.put(block1, Arrays.asList(new String[]{"1","2","3"}));
-    inputMap.put(block2, Arrays.asList(new String[]{"2","3","4"}));
-    inputMap.put(block3, Arrays.asList(new String[]{"3","4","1"}));
-    inputMap.put(block4, Arrays.asList(new String[]{"1","2","4"}));
-
-    List<TableBlockInfo> inputBlocks = new ArrayList<>(6);
-    inputBlocks.add(block1);
-    inputBlocks.add(block2);
-    inputBlocks.add(block3);
-    inputBlocks.add(block4);
-
-    Map<String, List<TableBlockInfo>> outputMap
-        = CarbonLoaderUtil.nodeBlockMapping(inputBlocks, 4);
-
-    Assert.assertTrue(calculateBlockDistribution(inputMap, outputMap, 4, 4));
-
-    Assert.assertTrue(calculateBlockLocality(inputMap, outputMap, 4, 4));
-  }
-
-  private boolean calculateBlockLocality(Map<TableBlockInfo, List<String>> inputMap,
-      Map<String, List<TableBlockInfo>> outputMap, int numberOfBlocks, int numberOfNodes) {
-
-    double notInNodeLocality = 0;
-    for (Map.Entry<String, List<TableBlockInfo>> entry : outputMap.entrySet()) {
-
-      List<TableBlockInfo> blockListOfANode = entry.getValue();
-
-      for (TableBlockInfo eachBlock : blockListOfANode) {
-
-        // for each block check the node locality
-
-        List<String> blockLocality = inputMap.get(eachBlock);
-        if (!blockLocality.contains(entry.getKey())) {
-          notInNodeLocality++;
-        }
-      }
-    }
-
-    System.out.println(
-        ((notInNodeLocality / numberOfBlocks) * 100) + " " + "is the node locality mismatch");
-    if ((notInNodeLocality / numberOfBlocks) * 100 > 30) {
-      return false;
-    }
-    return true;
-  }
-
-  private boolean calculateBlockDistribution(Map<TableBlockInfo, List<String>> inputMap,
-      Map<String, List<TableBlockInfo>> outputMap, int numberOfBlocks, int numberOfNodes) {
-
-    int nodesPerBlock = numberOfBlocks / numberOfNodes;
-
-    for (Map.Entry<String, List<TableBlockInfo>> entry : outputMap.entrySet()) {
-
-      if (entry.getValue().size() < nodesPerBlock) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Test case with 5 blocks and 3 nodes
-   *
-   * @throws Exception
-   */
-  @Test public void nodeBlockMappingTestWith5blocks3nodes() throws Exception {
-
-    Map<TableBlockInfo, List<String>> inputMap = new HashMap<TableBlockInfo, List<String>>(5);
-
-    TableBlockInfo block1 =
-        new TableBlockInfo("part-0-0-1462341987000", 123, "1", new String[] { "1", "2", "3" }, 111);
-    TableBlockInfo block2 =
-        new TableBlockInfo("part-1-0-1462341987000", 123, "2", new String[] { "1", "2", "3" }, 111);
-    TableBlockInfo block3 =
-        new TableBlockInfo("part-2-0-1462341987000", 123, "3", new String[] { "1", "2", "3" }, 111);
-    TableBlockInfo block4 =
-        new TableBlockInfo("part-3-0-1462341987000", 123, "4", new String[] { "1", "2", "3" }, 111);
-    TableBlockInfo block5 =
-        new TableBlockInfo("part-4-0-1462341987000", 123, "5", new String[] { "1", "2", "3" }, 111);
-
-    inputMap.put(block1, Arrays.asList(new String[]{"1","2","3"}));
-    inputMap.put(block2, Arrays.asList(new String[]{"1","2","3"}));
-    inputMap.put(block3, Arrays.asList(new String[]{"1","2","3"}));
-    inputMap.put(block4, Arrays.asList(new String[]{"1","2","3"}));
-    inputMap.put(block5, Arrays.asList(new String[]{"1","2","3"}));
-
-    List<TableBlockInfo> inputBlocks = new ArrayList<>(6);
-    inputBlocks.add(block1);
-    inputBlocks.add(block2);
-    inputBlocks.add(block3);
-    inputBlocks.add(block4);
-    inputBlocks.add(block5);
-
-    Map<String, List<TableBlockInfo>> outputMap = CarbonLoaderUtil.nodeBlockMapping(inputBlocks, 3);
-
-    Assert.assertTrue(calculateBlockDistribution(inputMap, outputMap, 5, 3));
-
-    Assert.assertTrue(calculateBlockLocality(inputMap, outputMap, 5, 3));
-
-  }
-
-  /**
-   * Test case with 6 blocks and 4 nodes where the 4th node does not have any local data.
-   *
-   * @throws Exception
-   */
-  @Test public void nodeBlockMappingTestWith6Blocks4nodes() throws Exception {
-
-    Map<TableBlockInfo, List<String>> inputMap = new HashMap<TableBlockInfo, List<String>>(5);
-
-    TableBlockInfo block1 =
-        new TableBlockInfo("part-0-0-1462341987000", 123, "1", new String[] { "1", "2", "3" }, 111);
-    TableBlockInfo block2 =
-        new TableBlockInfo("part-1-0-1462341987000", 123, "2", new String[] { "1", "2", "3" }, 111);
-    TableBlockInfo block3 =
-        new TableBlockInfo("part-2-0-1462341987000", 123, "3", new String[] { "1", "2", "3" }, 111);
-    TableBlockInfo block4 =
-        new TableBlockInfo("part-3-0-1462341987000", 123, "4", new String[] { "1", "2", "3" }, 111);
-    TableBlockInfo block5 =
-        new TableBlockInfo("part-4-0-1462341987000", 123, "5", new String[] { "1", "2", "3" }, 111);
-    TableBlockInfo block6 =
-        new TableBlockInfo("part-5-0-1462341987000", 123, "6", new String[] { "1", "2", "3" }, 111);
-
-    inputMap.put(block1, Arrays.asList(new String[]{"1","2","3"}));
-    inputMap.put(block2, Arrays.asList(new String[]{"1","2","3"}));
-    inputMap.put(block3, Arrays.asList(new String[]{"1","2","3"}));
-    inputMap.put(block4, Arrays.asList(new String[]{"1","2","3"}));
-    inputMap.put(block5, Arrays.asList(new String[]{"1","2","3"}));
-    inputMap.put(block6, Arrays.asList(new String[]{"1","2","3"}));
-
-
-    List<TableBlockInfo> inputBlocks = new ArrayList<>(6);
-    inputBlocks.add(block1);
-    inputBlocks.add(block2);
-    inputBlocks.add(block3);
-    inputBlocks.add(block4);
-    inputBlocks.add(block5);
-    inputBlocks.add(block6);
-
-    Map<String, List<TableBlockInfo>> outputMap = CarbonLoaderUtil.nodeBlockMapping(inputBlocks, 4);
-
-    Assert.assertTrue(calculateBlockDistribution(inputMap, outputMap, 6, 4));
-
-    Assert.assertTrue(calculateBlockLocality(inputMap, outputMap, 6, 4));
-
-  }
-
-  /**
-   * Test case with 10 blocks and 4 nodes with a 10% / 60% / 30% locality distribution across nodes.
-   *
-   * @throws Exception
-   */
-  @Test public void nodeBlockMappingTestWith10Blocks4nodes() throws Exception {
-
-    Map<TableBlockInfo, List<String>> inputMap = new HashMap<TableBlockInfo, List<String>>(5);
-
-    TableBlockInfo block1 =
-        new TableBlockInfo("part-1-0-1462341987000", 123, "1", new String[] { "2", "4" }, 111);
-    TableBlockInfo block2 =
-        new TableBlockInfo("part-2-0-1462341987000", 123, "2", new String[] { "2", "4" }, 111);
-    TableBlockInfo block3 =
-        new TableBlockInfo("part-3-0-1462341987000", 123, "3", new String[] { "2", "4" }, 111);
-    TableBlockInfo block4 =
-        new TableBlockInfo("part-4-0-1462341987000", 123, "4", new String[] { "2", "4" }, 111);
-    TableBlockInfo block5 =
-        new TableBlockInfo("part-5-0-1462341987000", 123, "5", new String[] { "2", "4" }, 111);
-    TableBlockInfo block6 =
-        new TableBlockInfo("part-6-0-1462341987000", 123, "6", new String[] { "2", "4" }, 111);
-    TableBlockInfo block7 =
-        new TableBlockInfo("part-7-0-1462341987000", 123, "7", new String[] { "3", "4" }, 111);
-    TableBlockInfo block8 =
-        new TableBlockInfo("part-8-0-1462341987000", 123, "8", new String[] { "3", "4" }, 111);
-    TableBlockInfo block9 =
-        new TableBlockInfo("part-9-0-1462341987000", 123, "9", new String[] { "3", "4" }, 111);
-    TableBlockInfo block10 =
-        new TableBlockInfo("part-10-0-1462341987000", 123, "9", new String[] { "1", "4" }, 111);
-
-    inputMap.put(block1, Arrays.asList(new String[]{"2","4"}));
-    inputMap.put(block2, Arrays.asList(new String[]{"2","4"}));
-    inputMap.put(block3, Arrays.asList(new String[]{"2","4"}));
-    inputMap.put(block4, Arrays.asList(new String[]{"2","4"}));
-    inputMap.put(block5, Arrays.asList(new String[]{"2","4"}));
-    inputMap.put(block6, Arrays.asList(new String[]{"2","4"}));
-    inputMap.put(block7, Arrays.asList(new String[]{"3","4"}));
-    inputMap.put(block8, Arrays.asList(new String[]{"3","4"}));
-    inputMap.put(block9, Arrays.asList(new String[]{"3","4"}));
-    inputMap.put(block10, Arrays.asList(new String[]{"1","4"}));
-
-    List<TableBlockInfo> inputBlocks = new ArrayList<>(6);
-    inputBlocks.add(block1);
-    inputBlocks.add(block2);
-    inputBlocks.add(block3);
-    inputBlocks.add(block4);
-    inputBlocks.add(block5);
-    inputBlocks.add(block6);
-    inputBlocks.add(block7);
-    inputBlocks.add(block8);
-    inputBlocks.add(block9);
-    inputBlocks.add(block10);
-
-    Map<String, List<TableBlockInfo>> outputMap = CarbonLoaderUtil.nodeBlockMapping(inputBlocks, 4);
-
-    Assert.assertTrue(calculateBlockDistribution(inputMap, outputMap, 10, 4));
-
-    Assert.assertTrue(calculateBlockLocality(inputMap, outputMap, 10, 4));
-  }
-
-}
\ No newline at end of file
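
For readers wondering what coverage is lost with this deletion: the removed
tests asserted, among other things, that CarbonLoaderUtil.nodeBlockMapping
gives each node at least totalBlocks / totalNodes blocks, and that no more
than 30% of blocks are assigned to a node outside the block's replica
locations. A self-contained sketch of that locality check follows, using plain
strings in place of TableBlockInfo (all names here are illustrative, not
CarbonData APIs):

  import java.util.Arrays;
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Map;
  import java.util.Set;

  public class LocalityCheckSketch {
    // Percentage of assigned blocks whose node is not among the block's
    // replica locations; the deleted test failed when this exceeded 30%.
    static double localityMismatchPercent(Map<String, List<String>> nodeToBlocks,
        Map<String, Set<String>> blockToReplicaNodes, int totalBlocks) {
      int mismatches = 0;
      for (Map.Entry<String, List<String>> entry : nodeToBlocks.entrySet()) {
        for (String block : entry.getValue()) {
          if (!blockToReplicaNodes.get(block).contains(entry.getKey())) {
            mismatches++;
          }
        }
      }
      return 100.0 * mismatches / totalBlocks;
    }

    public static void main(String[] args) {
      Map<String, Set<String>> replicas = new HashMap<>();
      replicas.put("block-1", new HashSet<>(Arrays.asList("node-7", "node-9")));
      replicas.put("block-2", new HashSet<>(Arrays.asList("node-9", "node-11")));
      Map<String, List<String>> assignment = new HashMap<>();
      assignment.put("node-7", Arrays.asList("block-1"));
      assignment.put("node-11", Arrays.asList("block-2"));
      // Both blocks are node-local here, so the mismatch is 0% and the
      // 30% threshold used by the deleted test would pass.
      System.out.println(localityMismatchPercent(assignment, replicas, 2) + "% mismatch");
    }
  }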