Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/08/13 20:01:20 UTC

[3/3] carbondata git commit: [CARBONDATA-1363] Add DataMapWriter interface

[CARBONDATA-1363] Add DataMapWriter interface

This closes #1238


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f089287c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f089287c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f089287c

Branch: refs/heads/master
Commit: f089287cef1d685b81e8fa26868325503acdb635
Parents: 85cbad2
Author: Jacky Li <ja...@qq.com>
Authored: Thu Aug 10 13:36:18 2017 +0800
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Mon Aug 14 01:30:40 2017 +0530

----------------------------------------------------------------------
 .../core/datamap/DataMapDistributable.java      |  56 +++
 .../carbondata/core/datamap/DataMapMeta.java    |  42 ++
 .../core/datamap/DataMapStoreManager.java       | 144 +++++++
 .../carbondata/core/datamap/TableDataMap.java   | 142 +++++++
 .../carbondata/core/datamap/dev/DataMap.java    |  57 +++
 .../core/datamap/dev/DataMapFactory.java        |  73 ++++
 .../core/datamap/dev/DataMapWriter.java         |  58 +++
 .../core/datastore/page/EncodedTablePage.java   |  11 -
 .../indexstore/BlockletDataMapIndexStore.java   |  13 +-
 .../carbondata/core/indexstore/DataMap.java     |  60 ---
 .../core/indexstore/DataMapDistributable.java   |  56 ---
 .../core/indexstore/DataMapFactory.java         |  87 ----
 .../core/indexstore/DataMapStoreManager.java    | 139 -------
 .../carbondata/core/indexstore/DataMapType.java |  36 --
 .../core/indexstore/DataMapWriter.java          |  50 ---
 .../core/indexstore/TableDataMap.java           | 133 ------
 .../blockletindex/BlockletDataMap.java          |  45 +-
 .../blockletindex/BlockletDataMapFactory.java   |  46 +-
 .../core/metadata/AbsoluteTableIdentifier.java  |   4 +
 .../core/metadata/CarbonTableIdentifier.java    |   6 +-
 .../apache/carbondata/core/util/CarbonUtil.java |   6 +
 .../hadoop/api/CarbonTableInputFormat.java      |  16 +-
 .../spark/load/CarbonLoaderUtilTest.java        | 417 -------------------
 .../validation/FileFooterValidator.java         | 155 -------
 .../testsuite/datamap/DataMapWriterSuite.scala  | 180 ++++++++
 .../spark/rdd/DataManagementFunc.scala          |   5 +-
 .../spark/sql/hive/CarbonFileMetastore.scala    |   2 +-
 .../spark/sql/hive/CarbonHiveMetaStore.scala    |   2 +-
 .../iud/DeleteCarbonTableSubqueryTestCase.scala |   5 +-
 .../core/datastore/GenericDataType.java         | 145 -------
 .../datamap/DataMapWriterListener.java          | 138 ++++++
 .../processing/datatypes/ArrayDataType.java     |   1 -
 .../processing/datatypes/GenericDataType.java   | 145 +++++++
 .../processing/datatypes/PrimitiveDataType.java |   1 -
 .../processing/datatypes/StructDataType.java    |   1 -
 .../impl/ComplexFieldConverterImpl.java         |   2 +-
 .../converter/impl/FieldEncoderFactory.java     |   2 +-
 .../store/CarbonFactDataHandlerColumnar.java    | 131 +++---
 .../store/CarbonFactDataHandlerModel.java       |  14 +-
 .../carbondata/processing/store/TablePage.java  |  66 ++-
 .../store/writer/AbstractFactDataWriter.java    | 115 +++--
 .../store/writer/CarbonDataWriterVo.java        |  11 +
 .../store/writer/CarbonFactDataWriter.java      |   5 +-
 .../writer/v1/CarbonFactDataWriterImplV1.java   |  11 +-
 .../writer/v2/CarbonFactDataWriterImplV2.java   |  13 +-
 .../store/writer/v3/BlockletDataHolder.java     |  72 ++++
 .../writer/v3/CarbonFactDataWriterImplV3.java   | 131 +++---
 .../store/writer/v3/DataWriterHolder.java       |  62 ---
 .../util/CarbonDataProcessorUtil.java           |   2 +-
 49 files changed, 1502 insertions(+), 1612 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
new file mode 100644
index 0000000..517f629
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import org.apache.carbondata.core.datastore.block.Distributable;
+
+/**
+ * Distributable class for datamap.
+ */
+public abstract class DataMapDistributable implements Distributable {
+
+  private String tablePath;
+
+  private String segmentId;
+
+  private String dataMapName;
+
+  public String getTablePath() {
+    return tablePath;
+  }
+
+  public void setTablePath(String tablePath) {
+    this.tablePath = tablePath;
+  }
+
+  public String getSegmentId() {
+    return segmentId;
+  }
+
+  public void setSegmentId(String segmentId) {
+    this.segmentId = segmentId;
+  }
+
+  public String getDataMapName() {
+    return dataMapName;
+  }
+
+  public void setDataMapName(String dataMapName) {
+    this.dataMapName = dataMapName;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
new file mode 100644
index 0000000..7746acf
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap;
+
+import java.util.List;
+
+import org.apache.carbondata.core.indexstore.schema.FilterType;
+
+public class DataMapMeta {
+
+  private List<String> indexedColumns;
+
+  private FilterType optimizedOperation;
+
+  public DataMapMeta(List<String> indexedColumns, FilterType optimizedOperation) {
+    this.indexedColumns = indexedColumns;
+    this.optimizedOperation = optimizedOperation;
+  }
+
+  public List<String> getIndexedColumns() {
+    return indexedColumns;
+  }
+
+  public FilterType getOptimizedOperation() {
+    return optimizedOperation;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
new file mode 100644
index 0000000..f5bc22f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+/**
+ * It maintains all the datamaps of all tables in memory.
+ */
+public final class DataMapStoreManager {
+
+  private static DataMapStoreManager instance = new DataMapStoreManager();
+
+  /**
+   * Contains the list of datamaps for each table.
+   */
+  private Map<String, List<TableDataMap>> allDataMaps = new ConcurrentHashMap<>();
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DataMapStoreManager.class.getName());
+
+  private DataMapStoreManager() {
+
+  }
+
+  public List<TableDataMap> getAllDataMap(AbsoluteTableIdentifier identifier) {
+    return allDataMaps.get(identifier.uniqueName());
+  }
+
+  /**
+   * Get the datamap for reading data.
+   *
+   * @param dataMapName
+   * @param factoryClass
+   * @return
+   */
+  public TableDataMap getDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
+      Class<? extends DataMapFactory> factoryClass) {
+    String table = identifier.uniqueName();
+    List<TableDataMap> tableDataMaps = allDataMaps.get(table);
+    TableDataMap dataMap;
+    if (tableDataMaps == null) {
+      dataMap = createAndRegisterDataMap(identifier, factoryClass, dataMapName);
+    } else {
+      dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
+    }
+    if (dataMap == null) {
+      throw new RuntimeException("Datamap does not exist");
+    }
+    return dataMap;
+  }
+
+  /**
+   * Return a new datamap instance and register it in the store manager.
+   * The datamap is created using datamap name, datamap factory class and table identifier.
+   */
+  public TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier,
+      Class<? extends DataMapFactory> factoryClass, String dataMapName) {
+    String table = identifier.uniqueName();
+    List<TableDataMap> tableDataMaps = allDataMaps.get(table);
+    if (tableDataMaps == null) {
+      tableDataMaps = new ArrayList<>();
+      allDataMaps.put(table, tableDataMaps);
+    }
+    TableDataMap dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
+    if (dataMap != null) {
+      throw new RuntimeException("Datamap already exists with name " + dataMapName);
+    }
+
+    try {
+      DataMapFactory dataMapFactory = factoryClass.newInstance();
+      dataMapFactory.init(identifier, dataMapName);
+      dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory);
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw new RuntimeException(e);
+    }
+    tableDataMaps.add(dataMap);
+    return dataMap;
+  }
+
+  private TableDataMap getAbstractTableDataMap(String dataMapName,
+      List<TableDataMap> tableDataMaps) {
+    TableDataMap dataMap = null;
+    for (TableDataMap tableDataMap: tableDataMaps) {
+      if (tableDataMap.getDataMapName().equals(dataMapName)) {
+        dataMap = tableDataMap;
+        break;
+      }
+    }
+    return dataMap;
+  }
+
+  /**
+   * Clear the datamap with the given name for the given table from memory
+   * @param identifier
+   * @param dataMapName
+   */
+  public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
+    List<TableDataMap> tableDataMaps = allDataMaps.get(identifier.uniqueName());
+    if (tableDataMaps != null) {
+      int i = 0;
+      for (TableDataMap tableDataMap: tableDataMaps) {
+        if (tableDataMap != null && dataMapName.equals(tableDataMap.getDataMapName())) {
+          tableDataMap.clear();
+          tableDataMaps.remove(i);
+          break;
+        }
+        i++;
+      }
+    }
+  }
+
+  /**
+   * Returns the singleton instance
+   * @return
+   */
+  public static DataMapStoreManager getInstance() {
+    return instance;
+  }
+
+}
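
The following is an illustrative sketch (not part of this commit) of how a caller can obtain a
TableDataMap through the new manager, using the blocklet datamap that ships with this change.
The surrounding helper class is hypothetical; DataMapStoreManager, TableDataMap,
BlockletDataMap and BlockletDataMapFactory are the real classes from this patch.

import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;

public class DataMapLookupExample {
  // Returns the table-level blocklet datamap; getDataMap() creates and registers
  // it on first access and returns the cached instance on later calls.
  public static TableDataMap blockletDataMap(AbsoluteTableIdentifier identifier) {
    return DataMapStoreManager.getInstance()
        .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class);
  }
}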

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
new file mode 100644
index 0000000..b55c5d9
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.events.ChangeEvent;
+import org.apache.carbondata.core.events.EventListener;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+/**
+ * DataMap at the table level; the user can add any number of datamaps for one table. Depending
+ * on the filter condition it can prune the blocklets.
+ */
+public final class TableDataMap implements EventListener {
+
+  private AbsoluteTableIdentifier identifier;
+
+  private String dataMapName;
+
+  private DataMapFactory dataMapFactory;
+
+  /**
+   * It is called to initialize and load the required table datamap metadata.
+   */
+  public TableDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
+      DataMapFactory dataMapFactory) {
+    this.identifier = identifier;
+    this.dataMapName = dataMapName;
+    this.dataMapFactory = dataMapFactory;
+  }
+
+  /**
+   * Pass the valid segments and prune the datamap using filter expression
+   *
+   * @param segmentIds
+   * @param filterExp
+   * @return
+   */
+  public List<Blocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp)
+      throws IOException {
+    List<Blocklet> blocklets = new ArrayList<>();
+    for (String segmentId : segmentIds) {
+      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
+      for (DataMap dataMap : dataMaps) {
+        List<Blocklet> pruneBlocklets = dataMap.prune(filterExp);
+        blocklets.addAll(addSegmentId(pruneBlocklets, segmentId));
+      }
+    }
+    return blocklets;
+  }
+
+  private List<Blocklet> addSegmentId(List<Blocklet> pruneBlocklets, String segmentId) {
+    for (Blocklet blocklet : pruneBlocklets) {
+      blocklet.setSegmentId(segmentId);
+    }
+    return pruneBlocklets;
+  }
+
+  /**
+   * This is used for making the datamap distributable.
+   * It takes the valid segments and returns all the datamaps as distributable objects so that
+   * they can be distributed across machines.
+   *
+   * @return
+   */
+  public List<DataMapDistributable> toDistributable(List<String> segmentIds) throws IOException {
+    List<DataMapDistributable> distributables = new ArrayList<>();
+    for (String segmentsId : segmentIds) {
+      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentsId);
+      for (DataMap dataMap : dataMaps) {
+        distributables.add(dataMap.toDistributable());
+      }
+    }
+    return distributables;
+  }
+
+  /**
+   * This method is used on any machine after the datamap is distributed. It takes the
+   * distributable object and prunes it with the given filter.
+   *
+   * @param distributable
+   * @param filterExp
+   * @return
+   */
+  public List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf filterExp) {
+    return dataMapFactory.getDataMap(distributable).prune(filterExp);
+  }
+
+  @Override public void fireEvent(ChangeEvent event) {
+    dataMapFactory.fireEvent(event);
+  }
+
+  /**
+   * Clear only the datamaps of the segments
+   * @param segmentIds
+   */
+  public void clear(List<String> segmentIds) {
+    for (String segmentId: segmentIds) {
+      dataMapFactory.clear(segmentId);
+    }
+  }
+
+  /**
+   * Clears all datamaps
+   */
+  public void clear() {
+    dataMapFactory.clear();
+  }
+  /**
+   * Get the unique name of datamap
+   *
+   * @return
+   */
+  public String getDataMapName() {
+    return dataMapName;
+  }
+
+  public DataMapFactory getDataMapFactory() {
+    return dataMapFactory;
+  }
+}
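
As a sketch of the driver-side flow this class enables (the helper class and variable names are
hypothetical; the prune() signature is the one defined above):

import java.io.IOException;
import java.util.List;

import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;

public class PruneExample {
  // Prunes the valid segments down to the blocklets that may satisfy the filter.
  // TableDataMap.prune() consults every DataMap of every segment and tags the
  // returned blocklets with their segment id.
  public static List<Blocklet> pruneSegments(TableDataMap dataMap, List<String> validSegments,
      FilterResolverIntf filter) throws IOException {
    return dataMap.prune(validSegments, filter);
  }
}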

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
new file mode 100644
index 0000000..526572a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+/**
+ * Datamap is an entity which can store and retrieve index data.
+ */
+public interface DataMap {
+
+  /**
+   * It is called to load the data map to memory or to initialize it.
+   */
+  void init(String path) throws MemoryException, IOException;
+
+  /**
+   * Prune the datamap with filter expression. It returns the list of
+   * blocklets where these filters can exist.
+   *
+   * @param filterExp
+   * @return
+   */
+  List<Blocklet> prune(FilterResolverIntf filterExp);
+
+  /**
+   * Convert datamap to distributable object
+   * @return
+   */
+  DataMapDistributable toDistributable();
+
+  /**
+   * Clear complete index table and release memory.
+   */
+  void clear();
+
+}
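
For illustration only, a minimal DataMap that performs no real pruning (always a safe answer,
since pruning is only an optimization). The class name is hypothetical and the sketch is not
part of this commit:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;

public class NoPruneDataMap implements DataMap {

  private List<Blocklet> allBlocklets = new ArrayList<>();

  @Override public void init(String path) throws MemoryException, IOException {
    // A real implementation would load its index file(s) from `path` here
    // and populate allBlocklets.
  }

  @Override public List<Blocklet> prune(FilterResolverIntf filterExp) {
    // Returning every known blocklet never loses data; it only skips the optimization.
    return allBlocklets;
  }

  @Override public DataMapDistributable toDistributable() {
    // Not distributable in this sketch.
    return null;
  }

  @Override public void clear() {
    allBlocklets.clear();
  }
}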

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
new file mode 100644
index 0000000..873457c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.events.ChangeEvent;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+/**
+ * Interface for the datamap factory; it is responsible for creating datamaps.
+ */
+public interface DataMapFactory {
+
+  /**
+   * Initialization of Datamap factory with the identifier and datamap name
+   */
+  void init(AbsoluteTableIdentifier identifier, String dataMapName);
+
+  /**
+   * Return a new writer for this datamap
+   */
+  DataMapWriter createWriter(String segmentId);
+
+  /**
+   * Get all datamaps of the given segment id
+   */
+  List<DataMap> getDataMaps(String segmentId) throws IOException;
+
+  /**
+   * Get datamap for distributable object.
+   */
+  DataMap getDataMap(DataMapDistributable distributable);
+
+  /**
+   *
+   * @param event
+   */
+  void fireEvent(ChangeEvent event);
+
+  /**
+   * Clears datamap of the segment
+   */
+  void clear(String segmentId);
+
+  /**
+   * Clear all datamaps from memory
+   */
+  void clear();
+
+  /**
+   * Return metadata of this datamap
+   */
+  DataMapMeta getMeta();
+}
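
A matching factory sketch (again hypothetical), serving the NoPruneDataMap shown earlier for
every segment; a real factory would create per-segment datamaps and a working writer:

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapMeta;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datamap.dev.DataMapFactory;
import org.apache.carbondata.core.datamap.dev.DataMapWriter;
import org.apache.carbondata.core.events.ChangeEvent;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;

public class NoPruneDataMapFactory implements DataMapFactory {

  private AbsoluteTableIdentifier identifier;
  private String dataMapName;

  @Override public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
    this.identifier = identifier;
    this.dataMapName = dataMapName;
  }

  @Override public DataMapWriter createWriter(String segmentId) {
    // See the DataMapWriter sketch after the interface below.
    throw new UnsupportedOperationException("writer not implemented in this sketch");
  }

  @Override public List<DataMap> getDataMaps(String segmentId) throws IOException {
    return Collections.<DataMap>singletonList(new NoPruneDataMap());
  }

  @Override public DataMap getDataMap(DataMapDistributable distributable) {
    return new NoPruneDataMap();
  }

  @Override public void fireEvent(ChangeEvent event) {
  }

  @Override public void clear(String segmentId) {
  }

  @Override public void clear() {
  }

  @Override public DataMapMeta getMeta() {
    // A real factory would report its indexed columns and optimized filter type;
    // BlockletDataMapFactory below also returns null for now.
    return null;
  }
}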

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
new file mode 100644
index 0000000..28163d7
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev;
+
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+
+/**
+ * Data Map writer
+ */
+public interface DataMapWriter {
+
+  /**
+   *  Start of new block notification.
+   *  @param blockId file name of the carbondata file
+   */
+  void onBlockStart(String blockId);
+
+  /**
+   * End of block notification
+   */
+  void onBlockEnd(String blockId);
+
+  /**
+   * Start of new blocklet notification.
+   * @param blockletId sequence number of blocklet in the block
+   */
+  void onBlockletStart(int blockletId);
+
+  /**
+   * End of blocklet notification
+   * @param blockletId sequence number of blocklet in the block
+   */
+  void onBlockletEnd(int blockletId);
+
+  /**
+   * Add a row of column pages to the datamap. The order of pages is the same as
+   * `indexedColumns` in the DataMapMeta returned by DataMapFactory.
+   *
+   * Implementations should copy the content of `pages` as needed, because the `pages`
+   * memory may be freed after this method returns when unsafe column pages are used.
+   */
+  void onPageAdded(int blockletId, int pageId, ColumnPage[] pages);
+
+}
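
To make the callback contract concrete, a minimal, hypothetical writer that only logs the
notifications it receives during data load (in the spirit of the new DataMapWriterSuite test
listed in the diffstat above):

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datamap.dev.DataMapWriter;
import org.apache.carbondata.core.datastore.page.ColumnPage;

public class LoggingDataMapWriter implements DataMapWriter {

  private static final LogService LOGGER =
      LogServiceFactory.getLogService(LoggingDataMapWriter.class.getName());

  @Override public void onBlockStart(String blockId) {
    LOGGER.info("start of block " + blockId);
  }

  @Override public void onBlockEnd(String blockId) {
    LOGGER.info("end of block " + blockId);
  }

  @Override public void onBlockletStart(int blockletId) {
    LOGGER.info("start of blocklet " + blockletId);
  }

  @Override public void onBlockletEnd(int blockletId) {
    LOGGER.info("end of blocklet " + blockletId);
  }

  @Override public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages) {
    // `pages` follows the order of indexedColumns in the factory's DataMapMeta.
    // Copy any values needed here, since the page memory may be freed once this
    // method returns when unsafe column pages are used.
    LOGGER.info("page " + pageId + " of blocklet " + blockletId + " carries "
        + pages.length + " indexed column pages");
  }
}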

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java
index ea9c373..0aac1d9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java
@@ -42,9 +42,6 @@ public class EncodedTablePage {
   // number of row in this page
   private int pageSize;
 
-  // true if it is last page of all input rows
-  private boolean isLastPage;
-
   // size in bytes of all encoded columns (including data and metadate)
   private int encodedSize;
 
@@ -128,14 +125,6 @@ public class EncodedTablePage {
     return pageKey;
   }
 
-  public boolean isLastPage() {
-    return isLastPage;
-  }
-
-  public void setIsLastPage(boolean isWriteAll) {
-    this.isLastPage = isWriteAll;
-  }
-
   public EncodedMeasurePage getMeasure(int measureIndex) {
     return measures[measureIndex];
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index fc8c273..9d4af7b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -26,8 +26,8 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CarbonLRUCache;
-import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
+import org.apache.carbondata.core.memory.MemoryException;
 
 /**
  * Class to handle loading, unloading,clearing,storing of the table
@@ -73,10 +73,9 @@ public class BlockletDataMapIndexStore
     if (dataMap == null) {
       try {
         dataMap = loadAndGetDataMap(tableSegmentUniqueIdentifier);
-      } catch (IndexBuilderException e) {
-        throw new IOException(e.getMessage(), e);
-      } catch (Throwable e) {
-        throw new IOException("Problem in loading segment block.", e);
+      } catch (MemoryException e) {
+        LOGGER.error("memory exception when loading datamap: " + e.getMessage());
+        throw new RuntimeException(e.getMessage(), e);
       }
     }
     return dataMap;
@@ -93,6 +92,7 @@ public class BlockletDataMapIndexStore
       for (BlockletDataMap dataMap : blockletDataMaps) {
         dataMap.clear();
       }
+      e.printStackTrace();
       throw new IOException("Problem in loading segment blocks.", e);
     }
     return blockletDataMaps;
@@ -130,7 +130,8 @@ public class BlockletDataMapIndexStore
    * @throws IOException
    */
   private BlockletDataMap loadAndGetDataMap(
-      TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) throws IOException {
+      TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier)
+      throws IOException, MemoryException {
     String uniqueTableSegmentIdentifier =
         tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier();
     Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
deleted file mode 100644
index 1276494..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import java.util.List;
-
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-
-/**
- * Datamap is an entity which can store and retrieve index data.
- */
-public interface DataMap {
-
-  /**
-   * Give the writer to write the data.
-   *
-   * @return
-   */
-  DataMapWriter getWriter();
-
-  /**
-   * It is called to load the data map to memory or to initialize it.
-   */
-  void init(String path);
-
-  /**
-   * Prune the datamap with filter expression. It returns the list of
-   * blocklets where these filters can exist.
-   *
-   * @param filterExp
-   * @return
-   */
-  List<Blocklet> prune(FilterResolverIntf filterExp);
-
-  /**
-   * Convert datamap to distributable object
-   * @return
-   */
-  DataMapDistributable toDistributable();
-
-  /**
-   * Clear complete index table and release memory.
-   */
-  void clear();
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapDistributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapDistributable.java
deleted file mode 100644
index 4c379f3..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapDistributable.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import org.apache.carbondata.core.datastore.block.Distributable;
-
-/**
- * Distributable class for datamap.
- */
-public abstract class DataMapDistributable implements Distributable {
-
-  private String tablePath;
-
-  private String segmentId;
-
-  private String dataMapName;
-
-  public String getTablePath() {
-    return tablePath;
-  }
-
-  public void setTablePath(String tablePath) {
-    this.tablePath = tablePath;
-  }
-
-  public String getSegmentId() {
-    return segmentId;
-  }
-
-  public void setSegmentId(String segmentId) {
-    this.segmentId = segmentId;
-  }
-
-  public String getDataMapName() {
-    return dataMapName;
-  }
-
-  public void setDataMapName(String dataMapName) {
-    this.dataMapName = dataMapName;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
deleted file mode 100644
index 72f714f..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import java.util.List;
-
-import org.apache.carbondata.core.events.ChangeEvent;
-import org.apache.carbondata.core.indexstore.schema.FilterType;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-
-/**
- * Interface for datamap factory, it is responsible for creating the datamap.
- */
-public interface DataMapFactory {
-
-  /**
-   * Initialization of Datamap factory
-   * @param identifier
-   * @param dataMapName
-   */
-  void init(AbsoluteTableIdentifier identifier, String dataMapName);
-  /**
-   * Get the datamap writer for each segmentid.
-   *
-   * @param identifier
-   * @param segmentId
-   * @return
-   */
-  DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier,
-      String segmentId);
-
-  /**
-   * Get the datamap for segmentid
-   *
-   * @param segmentId
-   * @return
-   */
-  List<DataMap> getDataMaps(String segmentId);
-
-  /**
-   * Get datamap for distributable object.
-   *
-   * @param distributable
-   * @return
-   */
-  DataMap getDataMap(DataMapDistributable distributable);
-
-  /**
-   * This method checks whether the columns and the type of filters supported
-   * for this datamap or not
-   *
-   * @param filterType
-   * @return
-   */
-  boolean isFiltersSupported(FilterType filterType);
-
-  /**
-   *
-   * @param event
-   */
-  void fireEvent(ChangeEvent event);
-
-  /**
-   * Clears datamap of the segment
-   */
-  void clear(String segmentId);
-
-  /**
-   * Clear all datamaps from memory
-   */
-  void clear();
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
deleted file mode 100644
index 1664a6a..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-
-/**
- * It maintains all the DataMaps in it.
- */
-public final class DataMapStoreManager {
-
-  private static DataMapStoreManager instance = new DataMapStoreManager();
-
-  /**
-   * Contains the list of datamaps for each table.
-   */
-  private Map<AbsoluteTableIdentifier, List<TableDataMap>> dataMapMappping = new HashMap<>();
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DataMapStoreManager.class.getName());
-
-  private DataMapStoreManager() {
-
-  }
-
-  /**
-   * Get the datamap for reading data.
-   *
-   * @param dataMapName
-   * @param mapType
-   * @return
-   */
-  public TableDataMap getDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
-      DataMapType mapType) {
-    List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier);
-    TableDataMap dataMap;
-    if (tableDataMaps == null) {
-      createTableDataMap(identifier, mapType, dataMapName);
-      tableDataMaps = dataMapMappping.get(identifier);
-    }
-    dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
-    if (dataMap == null) {
-      throw new RuntimeException("Datamap does not exist");
-    }
-    return dataMap;
-  }
-
-  /**
-   * Create new datamap instance using datamap name, datamap type and table identifier
-   *
-   * @param mapType
-   * @return
-   */
-  private TableDataMap createTableDataMap(AbsoluteTableIdentifier identifier,
-      DataMapType mapType, String dataMapName) {
-    List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier);
-    if (tableDataMaps == null) {
-      tableDataMaps = new ArrayList<>();
-      dataMapMappping.put(identifier, tableDataMaps);
-    }
-    TableDataMap dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
-    if (dataMap != null) {
-      throw new RuntimeException("Already datamap exists in that path with type " + mapType);
-    }
-
-    try {
-      DataMapFactory dataMapFactory = mapType.getClassObject().newInstance();
-      dataMapFactory.init(identifier, dataMapName);
-      dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory);
-    } catch (Exception e) {
-      LOGGER.error(e);
-      throw new RuntimeException(e);
-    }
-    tableDataMaps.add(dataMap);
-    return dataMap;
-  }
-
-  private TableDataMap getAbstractTableDataMap(String dataMapName,
-      List<TableDataMap> tableDataMaps) {
-    TableDataMap dataMap = null;
-    for (TableDataMap tableDataMap: tableDataMaps) {
-      if (tableDataMap.getDataMapName().equals(dataMapName)) {
-        dataMap = tableDataMap;
-        break;
-      }
-    }
-    return dataMap;
-  }
-
-  /**
-   * Clear the datamap/datamaps of a mentioned datamap name and table from memory
-   * @param identifier
-   * @param dataMapName
-   */
-  public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
-    List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier);
-    if (tableDataMaps != null) {
-      int i = 0;
-      for (TableDataMap tableDataMap: tableDataMaps) {
-        if (tableDataMap != null && dataMapName.equals(tableDataMap.getDataMapName())) {
-          tableDataMap.clear();
-          tableDataMaps.remove(i);
-          break;
-        }
-        i++;
-      }
-    }
-  }
-
-  /**
-   * Returns the singleton instance
-   * @return
-   */
-  public static DataMapStoreManager getInstance() {
-    return instance;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
deleted file mode 100644
index 0059b29..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
-
-/**
- * Datamap type
- */
-public enum DataMapType {
-  BLOCKLET(BlockletDataMapFactory.class);
-
-  private Class<? extends DataMapFactory> classObject;
-
-  DataMapType(Class<? extends DataMapFactory> classObject) {
-    this.classObject = classObject;
-  }
-
-  public Class<? extends DataMapFactory> getClassObject() {
-    return classObject;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapWriter.java
deleted file mode 100644
index bd8be09..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapWriter.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import java.io.DataOutput;
-
-/**
- * Data Map writer
- */
-public interface DataMapWriter<T> {
-
-  /**
-   * Initialize the data map writer with output stream
-   *
-   * @param outStream
-   */
-  void init(DataOutput outStream);
-
-  /**
-   * Add the index row to the in-memory store.
-   */
-  void writeData(T data);
-
-  /**
-   * Get the added row count
-   *
-   * @return
-   */
-  int getRowCount();
-
-  /**
-   * Finish writing of data map table, otherwise it will not be allowed to read.
-   */
-  void finish();
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
deleted file mode 100644
index 39ca4c5..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.events.ChangeEvent;
-import org.apache.carbondata.core.events.EventListener;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-/**
- * DataMap at the table level, user can add any number of datamaps for one table. Depends
- * on the filter condition it can prune the blocklets.
- */
-public final class TableDataMap implements EventListener {
-
-  private AbsoluteTableIdentifier identifier;
-
-  private String dataMapName;
-
-  private DataMapFactory dataMapFactory;
-
-  /**
-   * It is called to initialize and load the required table datamap metadata.
-   */
-  public TableDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
-      DataMapFactory dataMapFactory) {
-    this.identifier = identifier;
-    this.dataMapName = dataMapName;
-    this.dataMapFactory = dataMapFactory;
-  }
-
-  /**
-   * Pass the valid segments and prune the datamap using filter expression
-   *
-   * @param segmentIds
-   * @param filterExp
-   * @return
-   */
-  public List<Blocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp) {
-    List<Blocklet> blocklets = new ArrayList<>();
-    for (String segmentId : segmentIds) {
-      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
-      for (DataMap dataMap : dataMaps) {
-        List<Blocklet> pruneBlocklets = dataMap.prune(filterExp);
-        blocklets.addAll(addSegmentId(pruneBlocklets, segmentId));
-      }
-    }
-    return blocklets;
-  }
-
-  private List<Blocklet> addSegmentId(List<Blocklet> pruneBlocklets, String segmentId) {
-    for (Blocklet blocklet : pruneBlocklets) {
-      blocklet.setSegmentId(segmentId);
-    }
-    return pruneBlocklets;
-  }
-
-  /**
-   * This is used for making the datamap distributable.
-   * It takes the valid segments and returns all the datamaps as distributable objects so that
-   * it can be distributed across machines.
-   *
-   * @return
-   */
-  public List<DataMapDistributable> toDistributable(List<String> segmentIds) {
-    List<DataMapDistributable> distributables = new ArrayList<>();
-    for (String segmentsId : segmentIds) {
-      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentsId);
-      for (DataMap dataMap : dataMaps) {
-        distributables.add(dataMap.toDistributable());
-      }
-    }
-    return distributables;
-  }
-
-  /**
-   * This method is used from any machine after it is distributed. It takes the distributable object
-   * to prune the filters.
-   *
-   * @param distributable
-   * @param filterExp
-   * @return
-   */
-  public List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf filterExp) {
-    return dataMapFactory.getDataMap(distributable).prune(filterExp);
-  }
-
-  @Override public void fireEvent(ChangeEvent event) {
-    dataMapFactory.fireEvent(event);
-  }
-
-  /**
-   * Clear only the datamaps of the segments
-   * @param segmentIds
-   */
-  public void clear(List<String> segmentIds) {
-    for (String segmentId: segmentIds) {
-      dataMapFactory.clear(segmentId);
-    }
-  }
-
-  /**
-   * Clears all datamap
-   */
-  public void clear() {
-    dataMapFactory.clear();
-  }
-  /**
-   * Get the unique name of datamap
-   *
-   * @return
-   */
-  public String getDataMapName() {
-    return dataMapName;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 4b5be11..2e82c46 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -31,14 +31,13 @@ import java.util.List;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cacheable;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
-import org.apache.carbondata.core.indexstore.DataMap;
-import org.apache.carbondata.core.indexstore.DataMapDistributable;
-import org.apache.carbondata.core.indexstore.DataMapWriter;
 import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
 import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
@@ -64,6 +63,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(BlockletDataMap.class.getName());
 
+  public static final String NAME = "clustered.btree.blocklet";
+
   private static int KEY_INDEX = 0;
 
   private static int MIN_VALUES_INDEX = 1;
@@ -88,31 +89,23 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   private int[] columnCardinality;
 
-  @Override public DataMapWriter getWriter() {
-    return null;
-  }
-
-  @Override public void init(String path) {
+  @Override public void init(String path) throws IOException, MemoryException {
     DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
-    try {
-      List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(path);
-      for (DataFileFooter fileFooter : indexInfo) {
-        List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
-        if (segmentProperties == null) {
-          columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
-          segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
-          createSchema(segmentProperties);
-        }
-        TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
-        fileFooter = CarbonUtil.readMetadatFile(blockInfo);
-
-        loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath());
-      }
-      if (unsafeMemoryDMStore != null) {
-        unsafeMemoryDMStore.finishWriting();
+    List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(path);
+    for (DataFileFooter fileFooter : indexInfo) {
+      List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
+      if (segmentProperties == null) {
+        columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
+        segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
+        createSchema(segmentProperties);
       }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
+      TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
+      fileFooter = CarbonUtil.readMetadatFile(blockInfo);
+
+      loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath());
+    }
+    if (unsafeMemoryDMStore != null) {
+      unsafeMemoryDMStore.finishWriting();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 2fe6643..e189931 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -25,16 +25,16 @@ import java.util.Map;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.events.ChangeEvent;
-import org.apache.carbondata.core.indexstore.DataMap;
-import org.apache.carbondata.core.indexstore.DataMapDistributable;
-import org.apache.carbondata.core.indexstore.DataMapFactory;
-import org.apache.carbondata.core.indexstore.DataMapWriter;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
-import org.apache.carbondata.core.indexstore.schema.FilterType;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 
 /**
@@ -44,21 +44,25 @@ public class BlockletDataMapFactory implements DataMapFactory {
 
   private AbsoluteTableIdentifier identifier;
 
+  // segmentId -> list of index files
   private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
 
   private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache;
 
+  @Override
   public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
     this.identifier = identifier;
     cache = CacheProvider.getInstance()
         .createCache(CacheType.DRIVER_BLOCKLET_DATAMAP, identifier.getStorePath());
   }
 
-  public DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId) {
-    return null;
+  @Override
+  public DataMapWriter createWriter(String segmentId) {
+    throw new UnsupportedOperationException("not implemented");
   }
 
-  public List<DataMap> getDataMaps(String segmentId) {
+  @Override
+  public List<DataMap> getDataMaps(String segmentId) throws IOException {
     List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
         segmentMap.get(segmentId);
     if (tableBlockIndexUniqueIdentifiers == null) {
@@ -77,17 +81,10 @@ public class BlockletDataMapFactory implements DataMapFactory {
       }
     }
 
-    try {
-      return cache.getAll(tableBlockIndexUniqueIdentifiers);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override public boolean isFiltersSupported(FilterType filterType) {
-    return true;
+    return cache.getAll(tableBlockIndexUniqueIdentifiers);
   }
 
+  @Override
   public void clear(String segmentId) {
     List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId);
     if (blockIndexes != null) {
@@ -99,17 +96,26 @@ public class BlockletDataMapFactory implements DataMapFactory {
     }
   }
 
-  @Override public void clear() {
+  @Override
+  public void clear() {
     for (String segmentId: segmentMap.keySet()) {
       clear(segmentId);
     }
   }
 
-  @Override public DataMap getDataMap(DataMapDistributable distributable) {
+  @Override
+  public DataMap getDataMap(DataMapDistributable distributable) {
     return null;
   }
 
-  @Override public void fireEvent(ChangeEvent event) {
+  @Override
+  public void fireEvent(ChangeEvent event) {
 
   }
+
+  @Override
+  public DataMapMeta getMeta() {
+    // TODO: pass SORT_COLUMNS into this class
+    return null;
+  }
 }
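
With this refactor a DataMapFactory supplies a writer per segment, loads its DataMaps per segment with IOException surfaced to the caller, and reports its metadata via getMeta(). A partial, hypothetical sketch limited to the methods visible in this diff; the class name and method bodies are illustrative, not part of the commit:

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapMeta;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datamap.dev.DataMapFactory;
import org.apache.carbondata.core.datamap.dev.DataMapWriter;
import org.apache.carbondata.core.events.ChangeEvent;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;

public class MinimalDataMapFactory implements DataMapFactory {

  private AbsoluteTableIdentifier identifier;

  @Override public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
    this.identifier = identifier;
  }

  @Override public DataMapWriter createWriter(String segmentId) {
    // A real factory returns a writer that receives events during data load.
    throw new UnsupportedOperationException("not implemented in this sketch");
  }

  @Override public List<DataMap> getDataMaps(String segmentId) throws IOException {
    // IOException now flows to the caller instead of being wrapped in RuntimeException.
    return Collections.emptyList();
  }

  @Override public DataMap getDataMap(DataMapDistributable distributable) {
    return null;
  }

  @Override public void fireEvent(ChangeEvent event) {
  }

  @Override public void clear(String segmentId) {
  }

  @Override public void clear() {
  }

  @Override public DataMapMeta getMeta() {
    // Like BlockletDataMapFactory above, this sketch returns null until the
    // indexed columns are wired in.
    return null;
  }
}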

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
index 22faaf2..31ad03b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
@@ -156,4 +156,8 @@ public class AbsoluteTableIdentifier implements Serializable {
     }
     return true;
   }
+
+  public String uniqueName() {
+    return storePath + "/" + carbonTableIdentifier.toString().toLowerCase();
+  }
 }
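
uniqueName() joins the store path with the lower-cased CarbonTableIdentifier.toString() (databaseName + '_' + tableName, per the next hunk), giving a per-table key. A worked, hypothetical example assuming the existing public constructors of both identifier classes; all argument values are made up:

CarbonTableIdentifier tableId = new CarbonTableIdentifier("Default", "SalesTable", "tableId-1");
AbsoluteTableIdentifier id = new AbsoluteTableIdentifier("/opt/carbon/store", tableId);
// toString() -> "Default_SalesTable", lower-cased and prefixed with the store path:
String key = id.uniqueName();   // "/opt/carbon/store/default_salestable"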

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
index 31a0b23..cc65d9b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
@@ -128,9 +128,9 @@ public class CarbonTableIdentifier implements Serializable {
     return true;
   }
 
-  /*
- * @return table unidque name
- */
+  /**
+   * return unique table name
+   */
   @Override public String toString() {
     return databaseName + '_' + tableName;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index edc4c28..15512a8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1950,5 +1950,11 @@ public final class CarbonUtil {
         throw new IllegalArgumentException("Invalid data type: " + meta.getType());
     }
   }
+
+  public static void requireNotNull(Object obj) {
+    if (obj == null) {
+      throw new IllegalArgumentException("parameter must not be null");
+    }
+  }
 }
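
requireNotNull() is a lightweight precondition check along the lines of java.util.Objects.requireNonNull, but throwing IllegalArgumentException. A hedged usage fragment; the enclosing method and its parameters are invented for illustration:

// Illustrative guard at the top of a public API method.
public void registerDataMap(String dataMapName, Object factory) {
  CarbonUtil.requireNotNull(dataMapName);
  CarbonUtil.requireNotNull(factory);
  // both arguments are guaranteed non-null past this point
}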
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 54ad18b..19e264b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -30,11 +30,12 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
 import org.apache.carbondata.core.indexstore.Blocklet;
-import org.apache.carbondata.core.indexstore.DataMapStoreManager;
-import org.apache.carbondata.core.indexstore.DataMapType;
-import org.apache.carbondata.core.indexstore.TableDataMap;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
@@ -246,7 +247,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   @Override public List<InputSplit> getSplits(JobContext job) throws IOException {
     AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
     TableDataMap blockletMap =
-        DataMapStoreManager.getInstance().getDataMap(identifier, "blocklet", DataMapType.BLOCKLET);
+        DataMapStoreManager.getInstance().getDataMap(identifier, BlockletDataMap.NAME,
+            BlockletDataMapFactory.class);
     List<String> invalidSegments = new ArrayList<>();
     List<UpdateVO> invalidTimestampsList = new ArrayList<>();
     List<String> validSegments = Arrays.asList(getSegmentsToAccess(job));
@@ -403,7 +405,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
         new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
 
     TableDataMap blockletMap = DataMapStoreManager.getInstance()
-        .getDataMap(absoluteTableIdentifier, "blocklet", DataMapType.BLOCKLET);
+        .getDataMap(absoluteTableIdentifier, BlockletDataMap.NAME, BlockletDataMapFactory.class);
     List<Blocklet> prunedBlocklets = blockletMap.prune(segmentIds, resolver);
 
     List<org.apache.carbondata.hadoop.CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
@@ -549,8 +551,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
    */
   public BlockMappingVO getBlockRowCount(JobContext job, AbsoluteTableIdentifier identifier)
       throws IOException, KeyGenException {
-    TableDataMap blockletMap =
-        DataMapStoreManager.getInstance().getDataMap(identifier, "blocklet", DataMapType.BLOCKLET);
+    TableDataMap blockletMap = DataMapStoreManager.getInstance()
+        .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class);
     SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
     SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments =
         new SegmentStatusManager(identifier).getValidAndInvalidSegments();
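
The input format now resolves the blocklet datamap by datamap name plus factory class instead of the removed DataMapType enum. The lookup-and-prune flow used in getSplits(), shown as a standalone fragment; identifier, segmentIds and resolver are placeholders for the values already available in that method:

TableDataMap blockletMap = DataMapStoreManager.getInstance()
    .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class);
// Prune to the blocklets that can match the filter; only these become input splits.
List<Blocklet> prunedBlocklets = blockletMap.prune(segmentIds, resolver);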

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java b/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
deleted file mode 100644
index 76c7e6f..0000000
--- a/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
+++ /dev/null
@@ -1,417 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.integration.spark.load;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.datastore.block.Distributable;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.spark.load.CarbonLoaderUtil;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test class to test block distribution functionality
- */
-public class CarbonLoaderUtilTest {
-  List<Distributable> blockInfos = null;
-  int noOfNodesInput = -1;
-  List<String> activeNode = null;
-  Map<String, List<Distributable>> expected = null;
-  Map<String, List<Distributable>> mapOfNodes = null;
-
-  @Test public void nodeBlockMapping() throws Exception {
-
-    // scenario when the 3 nodes and 3 executors
-    initSet1();
-    Map<String, List<Distributable>> mapOfNodes =
-            CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
-    // node allocation
-    Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size());
-    // block allocation
-    boolean isEqual = compareResult(expected, mapOfNodes);
-    Assert.assertTrue("Block Allocation", isEqual);
-
-    // 2 node and 3 executors
-    initSet2();
-    mapOfNodes = CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
-    // node allocation
-    Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size());
-    // block allocation
-    isEqual = compareResult(expected, mapOfNodes);
-    Assert.assertTrue("Block Allocation", isEqual);
-
-    // 3 data node and 2 executors
-    initSet3();
-    mapOfNodes = CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
-    // node allocation
-    Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size());
-    // block allocation
-    isEqual = compareResult(expected, mapOfNodes);
-    Assert.assertTrue("Block Allocation", isEqual);
-  }
-
-  /**
-   * compares the blocks allocation
-   *
-   * @param expectedResult
-   * @param actualResult
-   * @return
-   */
-  private boolean compareResult(Map<String, List<Distributable>> expectedResult,
-                                Map<String, List<Distributable>> actualResult) {
-    expectedResult = sortByListSize(expectedResult);
-    actualResult = sortByListSize(actualResult);
-    List<List<Distributable>> expectedList = new LinkedList(expectedResult.entrySet());
-    List<List<Distributable>> mapOfNodesList = new LinkedList(actualResult.entrySet());
-    boolean isEqual = expectedList.size() == mapOfNodesList.size();
-    if (isEqual) {
-      for (int i = 0; i < expectedList.size(); i++) {
-        int size1 = ((List) ((Map.Entry) (expectedList.get(i))).getValue()).size();
-        int size2 = ((List) ((Map.Entry) (mapOfNodesList.get(i))).getValue()).size();
-        isEqual = size1 == size2;
-        if (!isEqual) {
-          break;
-        }
-      }
-    }
-    return isEqual;
-  }
-
-  /**
-   * sort by list size
-   *
-   * @param map
-   * @return
-   */
-  private static Map<String, List<Distributable>> sortByListSize(
-          Map<String, List<Distributable>> map) {
-    List<List<Distributable>> list = new LinkedList(map.entrySet());
-    Collections.sort(list, new Comparator() {
-      public int compare(Object obj1, Object obj2) {
-        if (obj1 == null && obj2 == null) {
-          return 0;
-        } else if (obj1 == null) {
-          return 1;
-        } else if (obj2 == null) {
-          return -1;
-        }
-        int size1 = ((List) ((Map.Entry) (obj1)).getValue()).size();
-        int size2 = ((List) ((Map.Entry) (obj2)).getValue()).size();
-        return size2 - size1;
-      }
-    });
-
-    Map res = new LinkedHashMap();
-    for (Iterator it = list.iterator(); it.hasNext(); ) {
-      Map.Entry entry = (Map.Entry) it.next();
-      res.put(entry.getKey(), entry.getValue());
-    }
-    return res;
-  }
-
-  void initSet1() {
-    blockInfos = new ArrayList<>();
-    activeNode = new ArrayList<>();
-    activeNode.add("node-7");
-    activeNode.add("node-9");
-    activeNode.add("node-11");
-    String[] location = { "node-7", "node-9", "node-11" };
-    blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 2, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0));
-    expected = new HashMap<>();
-    expected.put("node-7", blockInfos.subList(0, 2));
-    expected.put("node-9", blockInfos.subList(2, 4));
-    expected.put("node-11", blockInfos.subList(4, 6));
-  }
-
-  void initSet2() {
-    blockInfos = new ArrayList<>();
-    activeNode = new ArrayList<>();
-    activeNode.add("node-7");
-    activeNode.add("node-9");
-    activeNode.add("node-11");
-    String[] location = { "node-7", "node-11" };
-    blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 2, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0));
-    expected = new HashMap<>();
-    expected.put("node-7", blockInfos.subList(0, 2));
-    expected.put("node-9", blockInfos.subList(2, 4));
-    expected.put("node-11", blockInfos.subList(4, 6));
-  }
-
-  void initSet3() {
-    blockInfos = new ArrayList<>();
-    activeNode = new ArrayList<>();
-    activeNode.add("node-7");
-    activeNode.add("node-11");
-    String[] location = { "node-7", "node-9", "node-11" };
-    blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 2, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0));
-    blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0));
-    expected = new HashMap<>();
-    expected.put("node-7", blockInfos.subList(0, 3));
-    expected.put("node-11", blockInfos.subList(3, 6));
-  }
-
-
-  /**
-   * Test case with 4 blocks and 4 nodes with 3 replication.
-   *
-   * @throws Exception
-   */
-  @Test public void nodeBlockMapping() throws Exception {
-
-    Map<TableBlockInfo, List<String>> inputMap = new HashMap<TableBlockInfo, List<String>>(5);
-
-    TableBlockInfo block1 =
-        new TableBlockInfo("path1", 123, "1", new String[] { "1", "2", "3" }, 111);
-    TableBlockInfo block2 =
-        new TableBlockInfo("path2", 123, "2", new String[] { "2", "3", "4" }, 111);
-    TableBlockInfo block3 =
-        new TableBlockInfo("path3", 123, "3", new String[] { "3", "4", "1" }, 111);
-    TableBlockInfo block4 =
-        new TableBlockInfo("path4", 123, "4", new String[] { "1", "2", "4" }, 111);
-
-    inputMap.put(block1, Arrays.asList(new String[]{"1","2","3"}));
-    inputMap.put(block2, Arrays.asList(new String[]{"2","3","4"}));
-    inputMap.put(block3, Arrays.asList(new String[]{"3","4","1"}));
-    inputMap.put(block4, Arrays.asList(new String[]{"1","2","4"}));
-
-    List<TableBlockInfo> inputBlocks = new ArrayList(6);
-    inputBlocks.add(block1);
-    inputBlocks.add(block2);
-    inputBlocks.add(block3);
-    inputBlocks.add(block4);
-
-    Map<String, List<TableBlockInfo>> outputMap
-        = CarbonLoaderUtil.nodeBlockMapping(inputBlocks, 4);
-
-    Assert.assertTrue(calculateBlockDistribution(inputMap, outputMap, 4, 4));
-
-    Assert.assertTrue(calculateBlockLocality(inputMap, outputMap, 4, 4));
-  }
-
-  private boolean calculateBlockLocality(Map<TableBlockInfo, List<String>> inputMap,
-      Map<String, List<TableBlockInfo>> outputMap, int numberOfBlocks, int numberOfNodes) {
-
-    double notInNodeLocality = 0;
-    for (Map.Entry<String, List<TableBlockInfo>> entry : outputMap.entrySet()) {
-
-      List<TableBlockInfo> blockListOfANode = entry.getValue();
-
-      for (TableBlockInfo eachBlock : blockListOfANode) {
-
-        // for each block check the node locality
-
-        List<String> blockLocality = inputMap.get(eachBlock);
-        if (!blockLocality.contains(entry.getKey())) {
-          notInNodeLocality++;
-        }
-      }
-    }
-
-    System.out.println(
-        ((notInNodeLocality / numberOfBlocks) * 100) + " " + "is the node locality mismatch");
-    if ((notInNodeLocality / numberOfBlocks) * 100 > 30) {
-      return false;
-    }
-    return true;
-  }
-
-  private boolean calculateBlockDistribution(Map<TableBlockInfo, List<String>> inputMap,
-      Map<String, List<TableBlockInfo>> outputMap, int numberOfBlocks, int numberOfNodes) {
-
-    int nodesPerBlock = numberOfBlocks / numberOfNodes;
-
-    for (Map.Entry<String, List<TableBlockInfo>> entry : outputMap.entrySet()) {
-
-      if (entry.getValue().size() < nodesPerBlock) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Test case with 5 blocks and 3 nodes
-   *
-   * @throws Exception
-   */
-  @Test public void nodeBlockMappingTestWith5blocks3nodes() throws Exception {
-
-    Map<TableBlockInfo, List<String>> inputMap = new HashMap<TableBlockInfo, List<String>>(5);
-
-    TableBlockInfo block1 =
-        new TableBlockInfo("part-0-0-1462341987000", 123, "1", new String[] { "1", "2", "3" }, 111);
-    TableBlockInfo block2 =
-        new TableBlockInfo("part-1-0-1462341987000", 123, "2", new String[] { "1", "2", "3" }, 111);
-    TableBlockInfo block3 =
-        new TableBlockInfo("part-2-0-1462341987000", 123, "3", new String[] { "1", "2", "3" }, 111);
-    TableBlockInfo block4 =
-        new TableBlockInfo("part-3-0-1462341987000", 123, "4", new String[] { "1", "2", "3" }, 111);
-    TableBlockInfo block5 =
-        new TableBlockInfo("part-4-0-1462341987000", 123, "5", new String[] { "1", "2", "3" }, 111);
-
-    inputMap.put(block1, Arrays.asList(new String[]{"1","2","3"}));
-    inputMap.put(block2, Arrays.asList(new String[]{"1","2","3"}));
-    inputMap.put(block3, Arrays.asList(new String[]{"1","2","3"}));
-    inputMap.put(block4, Arrays.asList(new String[]{"1","2","3"}));
-    inputMap.put(block5, Arrays.asList(new String[]{"1","2","3"}));
-
-    List<TableBlockInfo> inputBlocks = new ArrayList(6);
-    inputBlocks.add(block1);
-    inputBlocks.add(block2);
-    inputBlocks.add(block3);
-    inputBlocks.add(block4);
-    inputBlocks.add(block5);
-
-    Map<String, List<TableBlockInfo>> outputMap = CarbonLoaderUtil.nodeBlockMapping(inputBlocks, 3);
-
-    Assert.assertTrue(calculateBlockDistribution(inputMap, outputMap, 5, 3));
-
-    Assert.assertTrue(calculateBlockLocality(inputMap, outputMap, 5, 3));
-
-  }
-
-  /**
-   * Test case with 6 blocks and 4 nodes where 4 th node doesnt have any local data.
-   *
-   * @throws Exception
-   */
-  @Test public void nodeBlockMappingTestWith6Blocks4nodes() throws Exception {
-
-    Map<TableBlockInfo, List<String>> inputMap = new HashMap<TableBlockInfo, List<String>>(5);
-
-    TableBlockInfo block1 =
-        new TableBlockInfo("part-0-0-1462341987000", 123, "1", new String[] { "1", "2", "3" }, 111);
-    TableBlockInfo block2 =
-        new TableBlockInfo("part-1-0-1462341987000", 123, "2", new String[] { "1", "2", "3" }, 111);
-    TableBlockInfo block3 =
-        new TableBlockInfo("part-2-0-1462341987000", 123, "3", new String[] { "1", "2", "3" }, 111);
-    TableBlockInfo block4 =
-        new TableBlockInfo("part-3-0-1462341987000", 123, "4", new String[] { "1", "2", "3" }, 111);
-    TableBlockInfo block5 =
-        new TableBlockInfo("part-4-0-1462341987000", 123, "5", new String[] { "1", "2", "3" }, 111);
-    TableBlockInfo block6 =
-        new TableBlockInfo("part-5-0-1462341987000", 123, "6", new String[] { "1", "2", "3" }, 111);
-
-    inputMap.put(block1, Arrays.asList(new String[]{"1","2","3"}));
-    inputMap.put(block2, Arrays.asList(new String[]{"1","2","3"}));
-    inputMap.put(block3, Arrays.asList(new String[]{"1","2","3"}));
-    inputMap.put(block4, Arrays.asList(new String[]{"1","2","3"}));
-    inputMap.put(block5, Arrays.asList(new String[]{"1","2","3"}));
-    inputMap.put(block6, Arrays.asList(new String[]{"1","2","3"}));
-
-
-    List<TableBlockInfo> inputBlocks = new ArrayList(6);
-    inputBlocks.add(block1);
-    inputBlocks.add(block2);
-    inputBlocks.add(block3);
-    inputBlocks.add(block4);
-    inputBlocks.add(block5);
-    inputBlocks.add(block6);
-
-    Map<String, List<TableBlockInfo>> outputMap = CarbonLoaderUtil.nodeBlockMapping(inputBlocks, 4);
-
-    Assert.assertTrue(calculateBlockDistribution(inputMap, outputMap, 6, 4));
-
-    Assert.assertTrue(calculateBlockLocality(inputMap, outputMap, 6, 4));
-
-  }
-
-  /**
-   * Test case with 10 blocks and 4 nodes with 10,60,30 % distribution
-   *
-   * @throws Exception
-   */
-  @Test public void nodeBlockMappingTestWith10Blocks4nodes() throws Exception {
-
-    Map<TableBlockInfo, List<String>> inputMap = new HashMap<TableBlockInfo, List<String>>(5);
-
-    TableBlockInfo block1 =
-        new TableBlockInfo("part-1-0-1462341987000", 123, "1", new String[] { "2", "4" }, 111);
-    TableBlockInfo block2 =
-        new TableBlockInfo("part-2-0-1462341987000", 123, "2", new String[] { "2", "4" }, 111);
-    TableBlockInfo block3 =
-        new TableBlockInfo("part-3-0-1462341987000", 123, "3", new String[] { "2", "4" }, 111);
-    TableBlockInfo block4 =
-        new TableBlockInfo("part-4-0-1462341987000", 123, "4", new String[] { "2", "4" }, 111);
-    TableBlockInfo block5 =
-        new TableBlockInfo("part-5-0-1462341987000", 123, "5", new String[] { "2", "4" }, 111);
-    TableBlockInfo block6 =
-        new TableBlockInfo("part-6-0-1462341987000", 123, "6", new String[] { "2", "4" }, 111);
-    TableBlockInfo block7 =
-        new TableBlockInfo("part-7-0-1462341987000", 123, "7", new String[] { "3", "4" }, 111);
-    TableBlockInfo block8 =
-        new TableBlockInfo("part-8-0-1462341987000", 123, "8", new String[] { "3", "4" }, 111);
-    TableBlockInfo block9 =
-        new TableBlockInfo("part-9-0-1462341987000", 123, "9", new String[] { "3", "4" }, 111);
-    TableBlockInfo block10 =
-        new TableBlockInfo("part-10-0-1462341987000", 123, "9", new String[] { "1", "4" }, 111);
-
-    inputMap.put(block1, Arrays.asList(new String[]{"2","4"}));
-    inputMap.put(block2, Arrays.asList(new String[]{"2","4"}));
-    inputMap.put(block3, Arrays.asList(new String[]{"2","4"}));
-    inputMap.put(block4, Arrays.asList(new String[]{"2","4"}));
-    inputMap.put(block5, Arrays.asList(new String[]{"2","4"}));
-    inputMap.put(block6, Arrays.asList(new String[]{"2","4"}));
-    inputMap.put(block7, Arrays.asList(new String[]{"3","4"}));
-    inputMap.put(block8, Arrays.asList(new String[]{"3","4"}));
-    inputMap.put(block9, Arrays.asList(new String[]{"3","4"}));
-    inputMap.put(block10, Arrays.asList(new String[]{"1","4"}));
-
-    List<TableBlockInfo> inputBlocks = new ArrayList(6);
-    inputBlocks.add(block1);
-    inputBlocks.add(block2);
-    inputBlocks.add(block3);
-    inputBlocks.add(block4);
-    inputBlocks.add(block5);
-    inputBlocks.add(block6);
-    inputBlocks.add(block7);
-    inputBlocks.add(block8);
-    inputBlocks.add(block9);
-    inputBlocks.add(block10);
-
-    Map<String, List<TableBlockInfo>> outputMap = CarbonLoaderUtil.nodeBlockMapping(inputBlocks, 4);
-
-    Assert.assertTrue(calculateBlockDistribution(inputMap, outputMap, 10, 4));
-
-    Assert.assertTrue(calculateBlockLocality(inputMap, outputMap, 10, 4));
-  }
-
-}
\ No newline at end of file