Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/11 05:18:00 UTC

[1/3] carbondata git commit: [CARBONDATA-1232] Datamap implementation for Blocklet

Repository: carbondata
Updated Branches:
  refs/heads/datamap 3ecb3ec58 -> 6d71d9c47


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index c328a64..01b4e61 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -17,14 +17,31 @@
 
 package org.apache.carbondata.presto.impl;
 
-import com.facebook.presto.spi.SchemaTableName;
-import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.inject.Inject;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.*;
-import org.apache.carbondata.core.datastore.block.*;
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.datastore.DataRefNodeFinder;
+import org.apache.carbondata.core.datastore.IndexKey;
+import org.apache.carbondata.core.datastore.SegmentTaskIndexStore;
+import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
+import org.apache.carbondata.core.datastore.block.AbstractIndex;
+import org.apache.carbondata.core.datastore.block.BlockletInfos;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -52,18 +69,24 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CacheClient;
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Inject;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.thrift.TBase;
 
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
 import static java.util.Objects.requireNonNull;
 
 /** CarbonTableReader will be a facade of these utils
@@ -312,8 +335,9 @@ public class CarbonTableReader {
           TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
 
           if (IUDTable) {
-            if (CarbonUtil.isInvalidTableBlock(tableBlockInfo, invalidBlockVOForSegmentId,
-                updateStatusManager)) {
+            if (CarbonUtil
+                .isInvalidTableBlock(tableBlockInfo.getSegmentId(), tableBlockInfo.getFilePath(),
+                    invalidBlockVOForSegmentId, updateStatusManager)) {
               continue;
             }
           }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 277005b..da0d082 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -29,7 +29,8 @@ import org.apache.spark.sql.execution.command.CarbonMergerMapping
 
 import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
 import org.apache.carbondata.processing.model.CarbonLoadModel
@@ -71,7 +72,7 @@ class CarbonIUDMergerRDD[K, V](
 
     var blocksOfLastSegment: List[TableBlockInfo] = null
 
-    CarbonInputFormat.setSegmentsToAccess(
+    CarbonTableInputFormat.setSegmentsToAccess(
       job.getConfiguration, carbonMergerMapping.validSegments.toList.asJava)
 
     // get splits

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index caa389a..0d50882 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -43,8 +43,8 @@ import org.apache.carbondata.core.mutate.UpdateVO
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo}
 import org.apache.carbondata.processing.merger._
 import org.apache.carbondata.processing.model.CarbonLoadModel
@@ -286,7 +286,7 @@ class CarbonMergerRDD[K, V](
     for (eachSeg <- carbonMergerMapping.validSegments) {
 
       // map for keeping the relation of a task and its blocks.
-      job.getConfiguration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg)
+      job.getConfiguration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg)
 
       if (updateStatusManager.getUpdateStatusDetails.length != 0) {
          updateDetails = updateStatusManager.getInvalidTimestampRange(eachSeg)
@@ -308,7 +308,8 @@ class CarbonMergerRDD[K, V](
           updateStatusManager.getDeleteDeltaFilePath(entry.getPath.toString)
         )
         ((!updated) || ((updated) && (!CarbonUtil
-          .isInvalidTableBlock(blockInfo, updateDetails, updateStatusManager))))
+          .isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
+            updateDetails, updateStatusManager))))
       })
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 4807b90..62a6a92 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -40,6 +40,7 @@ import org.apache.carbondata.core.scan.model.QueryModel
 import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
 import org.apache.carbondata.hadoop._
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
 
 
@@ -245,21 +246,22 @@ class CarbonScanRDD(
     iterator.asInstanceOf[Iterator[InternalRow]]
   }
 
-  private def prepareInputFormatForDriver(conf: Configuration): CarbonInputFormat[Object] = {
-    CarbonInputFormat.setCarbonTable(conf, carbonTable)
+  private def prepareInputFormatForDriver(conf: Configuration): CarbonTableInputFormat[Object] = {
+    CarbonTableInputFormat.setCarbonTable(conf, carbonTable)
     createInputFormat(conf)
   }
 
-  private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[Object] = {
-    CarbonInputFormat.setCarbonReadSupport(conf, readSupport)
+  private def prepareInputFormatForExecutor(conf: Configuration): CarbonTableInputFormat[Object] = {
+    CarbonTableInputFormat.setCarbonReadSupport(conf, readSupport)
     createInputFormat(conf)
   }
 
-  private def createInputFormat(conf: Configuration): CarbonInputFormat[Object] = {
-    val format = new CarbonInputFormat[Object]
-    CarbonInputFormat.setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath))
-    CarbonInputFormat.setFilterPredicates(conf, filterExpression)
-    CarbonInputFormat.setColumnProjection(conf, columnProjection)
+  private def createInputFormat(conf: Configuration): CarbonTableInputFormat[Object] = {
+    val format = new CarbonTableInputFormat[Object]
+    CarbonTableInputFormat.setTablePath(conf,
+      identifier.appendWithLocalPrefix(identifier.getTablePath))
+    CarbonTableInputFormat.setFilterPredicates(conf, filterExpression)
+    CarbonTableInputFormat.setColumnProjection(conf, columnProjection)
     format
   }
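
The same configuration can be made from Java against the renamed API. Below is a minimal sketch, not part of this patch: the helper class name, the Expression/CarbonProjection inputs and the use of Job.getInstance are assumptions for illustration; the static setters are the ones exercised in createInputFormat above.

// Minimal sketch (illustrative): configuring the new CarbonTableInputFormat from Java,
// mirroring createInputFormat above. Inputs are assumed to be supplied by the caller.
import java.io.IOException;
import java.util.List;

import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.hadoop.CarbonProjection;
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;

public final class CarbonTableInputFormatExample {

  public static List<InputSplit> planSplits(AbsoluteTableIdentifier identifier,
      Expression filter, CarbonProjection projection) throws IOException, InterruptedException {
    Job job = Job.getInstance(new Configuration());
    Configuration conf = job.getConfiguration();
    // same static setters as in createInputFormat above, just called from Java
    CarbonTableInputFormat.setTablePath(conf,
        identifier.appendWithLocalPrefix(identifier.getTablePath()));
    CarbonTableInputFormat.setFilterPredicates(conf, filter);
    CarbonTableInputFormat.setColumnProjection(conf, projection);
    CarbonTableInputFormat<Object> format = new CarbonTableInputFormat<>();
    return format.getSplits(job);
  }
}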
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
index 2ca3b8c..4950227 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.hadoop.CarbonInputFormat
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 
 
 /**
@@ -38,8 +38,8 @@ object QueryPlanUtil {
    * createCarbonInputFormat from query model
    */
   def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
-  (CarbonInputFormat[Array[Object]], Job) = {
-    val carbonInputFormat = new CarbonInputFormat[Array[Object]]()
+  (CarbonTableInputFormat[Array[Object]], Job) = {
+    val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
     val jobConf: JobConf = new JobConf(new Configuration)
     val job: Job = new Job(jobConf)
     FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
@@ -47,8 +47,8 @@ object QueryPlanUtil {
   }
 
   def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier,
-      conf: Configuration) : CarbonInputFormat[V] = {
-    val carbonInputFormat = new CarbonInputFormat[V]()
+      conf: Configuration) : CarbonTableInputFormat[V] = {
+    val carbonInputFormat = new CarbonTableInputFormat[V]()
     val job: Job = new Job(conf)
     FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
     carbonInputFormat

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 0e6153f..e3362e4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -38,7 +38,8 @@ import org.apache.spark.util.SerializableConfiguration
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader}
 import org.apache.carbondata.processing.merger.TableMeta
 import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
@@ -88,17 +89,17 @@ private[sql] case class CarbonDatasourceHadoopRelation(
     filters.flatMap { filter =>
       CarbonFilters.createCarbonFilter(dataSchema, filter)
     }.reduceOption(new AndExpression(_, _))
-      .foreach(CarbonInputFormat.setFilterPredicates(conf, _))
+      .foreach(CarbonTableInputFormat.setFilterPredicates(conf, _))
 
     val projection = new CarbonProjection
     requiredColumns.foreach(projection.addColumn)
-    CarbonInputFormat.setColumnProjection(conf, projection)
-    CarbonInputFormat.setCarbonReadSupport(conf, classOf[SparkRowReadSupportImpl])
+    CarbonTableInputFormat.setColumnProjection(conf, projection)
+    CarbonTableInputFormat.setCarbonReadSupport(conf, classOf[SparkRowReadSupportImpl])
 
     new CarbonHadoopFSRDD[Row](sqlContext.sparkContext,
       new SerializableConfiguration(conf),
       absIdentifier,
-      classOf[CarbonInputFormat[Row]],
+      classOf[CarbonTableInputFormat[Row]],
       classOf[Row]
     )
   }
@@ -118,7 +119,7 @@ class CarbonHadoopFSRDD[V: ClassTag](
   @transient sc: SparkContext,
   conf: SerializableConfiguration,
   identifier: AbsoluteTableIdentifier,
-  inputFormatClass: Class[_ <: CarbonInputFormat[V]],
+  inputFormatClass: Class[_ <: CarbonTableInputFormat[V]],
   valueClass: Class[V])
   extends RDD[V](sc, Nil) with SparkHadoopMapReduceUtil {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
index a292cde..c38f0e1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -133,13 +133,6 @@ private[sql] case class ProjectForUpdateCommand(
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
 
-
-   //  sqlContext.sparkContext.setLocalProperty(org.apache.spark.sql.execution.SQLExecution
-    //  .EXECUTION_ID_KEY, null)
-    // DataFrame(sqlContext, plan).show(truncate = false)
-    // return Seq.empty
-
-
     val res = plan find {
       case relation: LogicalRelation if (relation.relation
         .isInstanceOf[CarbonDatasourceRelation]) =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
index 70c7caf..e0a8b58 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.hadoop.CarbonInputFormat
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 
 /**
  * All the utility functions for carbon plan creation
@@ -37,8 +37,8 @@ object QueryPlanUtil {
    * createCarbonInputFormat from query model
    */
   def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
-  (CarbonInputFormat[Array[Object]], Job) = {
-    val carbonInputFormat = new CarbonInputFormat[Array[Object]]()
+  (CarbonTableInputFormat[Array[Object]], Job) = {
+    val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
     val jobConf: JobConf = new JobConf(new Configuration)
     val job: Job = new Job(jobConf)
     FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
@@ -46,8 +46,8 @@ object QueryPlanUtil {
   }
 
   def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier,
-      conf: Configuration) : CarbonInputFormat[V] = {
-    val carbonInputFormat = new CarbonInputFormat[V]()
+      conf: Configuration) : CarbonTableInputFormat[V] = {
+    val carbonInputFormat = new CarbonTableInputFormat[V]()
     val job: Job = new Job(conf)
     FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
     carbonInputFormat

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 04a94ce..9fde546 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -40,7 +40,8 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory.FileType
 import org.apache.carbondata.core.fileoperations.FileWriteOperation
-import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.indexstore.{DataMapStoreManager, DataMapType, TableDataMap}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.datatype.DataType.DECIMAL
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -511,8 +512,10 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
     val dbName = tableIdentifier.database.get
     val tableName = tableIdentifier.table
 
-    val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
-      new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
+    val tablePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
+      new CarbonTableIdentifier(dbName, tableName, ""))
+    val metadataFilePath = tablePath.getMetadataDirectoryPath
+    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath.getPath)
 
     val fileType = FileFactory.getFileType(metadataFilePath)
 
@@ -528,6 +531,7 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
         case Some(tableMeta) =>
           metadata.tablesMeta -= tableMeta
           CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
+          DataMapStoreManager.getInstance.clearDataMap(identifier, "blocklet")
           updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
         case None =>
           LOGGER.info(s"Metadata does not contain entry for table $tableName in database $dbName")
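
From Java, the equivalent cleanup added above would look like the sketch below (not part of this patch); the tablePath argument and the helper class name are assumptions for illustration.

// Minimal sketch (illustrative): clearing the blocklet datamap when a table is dropped,
// mirroring the CarbonMetastore change above. `tablePath` is an assumed input.
import org.apache.carbondata.core.indexstore.DataMapStoreManager;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;

public final class ClearBlockletDataMapExample {
  public static void clear(String tablePath) {
    AbsoluteTableIdentifier identifier = AbsoluteTableIdentifier.fromTablePath(tablePath);
    // "blocklet" is the datamap name used by this patch when the table metadata is removed
    DataMapStoreManager.getInstance().clearDataMap(identifier, "blocklet");
  }
}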

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index 8cdcd26..99bfd44 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.processing.merger;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -379,4 +380,35 @@ public class CarbonCompactionUtil {
     }
     return restructuredBlockExists;
   }
+
+  /**
+   * This method will check for any restructured block in the blocks selected for compaction
+   *
+   * @param segmentMapping       segment id mapped to the blocks of that segment, grouped by task
+   * @param tableLastUpdatedTime latest schema modification time of the table
+   * @return true if any selected block carries an older footer schema timestamp than the table
+   */
+  public static boolean checkIfAnyRestructuredBlockExists(Map<String, TaskBlockInfo> segmentMapping,
+      long tableLastUpdatedTime) {
+    boolean restructuredBlockExists = false;
+    for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
+      String segmentId = taskMap.getKey();
+      TaskBlockInfo taskBlockInfo = taskMap.getValue();
+      Collection<List<TableBlockInfo>> infoList = taskBlockInfo.getAllTableBlockInfoList();
+      for (List<TableBlockInfo> listMetadata : infoList) {
+        for (TableBlockInfo blockInfo : listMetadata) {
+          // if schema modified timestamp is greater than footer stored schema timestamp,
+          // it indicates it is a restructured block
+          if (tableLastUpdatedTime > blockInfo.getDetailInfo().getSchemaUpdatedTimeStamp()) {
+            restructuredBlockExists = true;
+            break;
+          }
+        }
+      }
+      if (restructuredBlockExists) {
+        break;
+      }
+    }
+    return restructuredBlockExists;
+  }
 }


[2/3] carbondata git commit: [CARBONDATA-1232] Datamap implementation for Blocklet

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
new file mode 100644
index 0000000..defe766
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.row;
+
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+
+/**
+ * It is just a normal row to store data. Implementations may be safe (on-heap) or unsafe (off-heap).
+ * TODO: move this class to a global row and use it across loading after DataType is changed to a class
+ */
+public abstract class DataMapRow {
+
+  protected DataMapSchema[] schemas;
+
+  public DataMapRow(DataMapSchema[] schemas) {
+    this.schemas = schemas;
+  }
+
+  public abstract byte[] getByteArray(int ordinal);
+
+  public abstract DataMapRow getRow(int ordinal);
+
+  public abstract void setRow(DataMapRow row, int ordinal);
+
+  public abstract void setByteArray(byte[] byteArray, int ordinal);
+
+  public abstract int getInt(int ordinal);
+
+  public abstract void setInt(int value, int ordinal);
+
+  public abstract void setByte(byte value, int ordinal);
+
+  public abstract byte getByte(int ordinal);
+
+  public abstract void setShort(short value, int ordinal);
+
+  public abstract short getShort(int ordinal);
+
+  public abstract void setLong(long value, int ordinal);
+
+  public abstract long getLong(int ordinal);
+
+  public abstract void setFloat(float value, int ordinal);
+
+  public abstract float getFloat(int ordinal);
+
+  public abstract void setDouble(double value, int ordinal);
+
+  public abstract double getDouble(int ordinal);
+
+  public int getTotalSizeInBytes() {
+    int len = 0;
+    for (int i = 0; i < schemas.length; i++) {
+      len += getSizeInBytes(i);
+    }
+    return len;
+  }
+
+  public int getSizeInBytes(int ordinal) {
+    switch (schemas[ordinal].getSchemaType()) {
+      case FIXED:
+        return schemas[ordinal].getLength();
+      case VARIABLE:
+        return getByteArray(ordinal).length + 2;
+      case STRUCT:
+        return getRow(ordinal).getTotalSizeInBytes();
+      default:
+        throw new UnsupportedOperationException("wrong type");
+    }
+  }
+
+  public int getColumnCount() {
+    return schemas.length;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
new file mode 100644
index 0000000..adec346
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.row;
+
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Data map row.
+ */
+public class DataMapRowImpl extends DataMapRow {
+
+  private Object[] data;
+
+  public DataMapRowImpl(DataMapSchema[] schemas) {
+    super(schemas);
+    this.data = new Object[schemas.length];
+  }
+
+  @Override public byte[] getByteArray(int ordinal) {
+    return (byte[]) data[ordinal];
+  }
+
+  @Override public DataMapRow getRow(int ordinal) {
+    return (DataMapRow) data[ordinal];
+  }
+
+  @Override public void setByteArray(byte[] byteArray, int ordinal) {
+    assert (schemas[ordinal].getDataType() == DataType.BYTE_ARRAY);
+    data[ordinal] = byteArray;
+  }
+
+  @Override public int getInt(int ordinal) {
+    return (Integer) data[ordinal];
+  }
+
+  @Override public void setInt(int value, int ordinal) {
+    assert (schemas[ordinal].getDataType() == DataType.INT);
+    data[ordinal] = value;
+  }
+
+  @Override public void setByte(byte value, int ordinal) {
+    assert (schemas[ordinal].getDataType() == DataType.BYTE);
+    data[ordinal] = value;
+  }
+
+  @Override public byte getByte(int ordinal) {
+    return (Byte) data[ordinal];
+  }
+
+  @Override public void setShort(short value, int ordinal) {
+    assert (schemas[ordinal].getDataType() == DataType.SHORT);
+    data[ordinal] = value;
+  }
+
+  @Override public short getShort(int ordinal) {
+    return (Short) data[ordinal];
+  }
+
+  @Override public void setLong(long value, int ordinal) {
+    assert (schemas[ordinal].getDataType() == DataType.LONG);
+    data[ordinal] = value;
+  }
+
+  @Override public long getLong(int ordinal) {
+    return (Long) data[ordinal];
+  }
+
+  @Override public void setFloat(float value, int ordinal) {
+    assert (schemas[ordinal].getDataType() == DataType.FLOAT);
+    data[ordinal] = value;
+  }
+
+  @Override public float getFloat(int ordinal) {
+    return (Float) data[ordinal];
+  }
+
+  @Override public void setDouble(double value, int ordinal) {
+    assert (schemas[ordinal].getDataType() == DataType.DOUBLE);
+    data[ordinal] = value;
+  }
+
+  @Override public void setRow(DataMapRow row, int ordinal) {
+    assert (schemas[ordinal].getDataType() == DataType.STRUCT);
+    data[ordinal] = row;
+  }
+
+  @Override public double getDouble(int ordinal) {
+    return (Double) data[ordinal];
+  }
+
+}
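
A minimal usage sketch (not part of this patch) of the heap-backed row. The schema classes it uses are added later in this commit, the helper class name is illustrative, and the concrete values are examples only.

// Minimal sketch (illustrative): building a heap-backed DataMapRow from a schema array
// and filling it by ordinal. Schema classes are the ones added in this commit.
import org.apache.carbondata.core.indexstore.row.DataMapRow;
import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
import org.apache.carbondata.core.metadata.datatype.DataType;

public final class DataMapRowExample {
  public static void main(String[] args) {
    DataMapSchema[] schemas = new DataMapSchema[] {
        new DataMapSchema.FixedDataMapSchema(DataType.INT),          // ordinal 0: fixed length
        new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY) // ordinal 1: length-prefixed
    };
    DataMapRow row = new DataMapRowImpl(schemas);
    row.setInt(42, 0);
    row.setByteArray(new byte[] {1, 2, 3}, 1);
    // typically: fixed int (4) + byte array (3) + 2-byte length prefix = 9
    System.out.println(row.getTotalSizeInBytes());
  }
}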

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
new file mode 100644
index 0000000..ef78514
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore.row;
+
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.memory.MemoryBlock;
+
+import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
+import static org.apache.carbondata.core.memory.CarbonUnsafe.unsafe;
+
+/**
+ * Unsafe implementation of data map row.
+ */
+public class UnsafeDataMapRow extends DataMapRow {
+
+  private MemoryBlock block;
+
+  private int pointer;
+
+  public UnsafeDataMapRow(DataMapSchema[] schemas, MemoryBlock block, int pointer) {
+    super(schemas);
+    this.block = block;
+    this.pointer = pointer;
+  }
+
+  @Override public byte[] getByteArray(int ordinal) {
+    int length;
+    int position = getPosition(ordinal);
+    switch (schemas[ordinal].getSchemaType()) {
+      case VARIABLE:
+        length = unsafe.getShort(block.getBaseObject(), block.getBaseOffset() + pointer + position);
+        position += 2;
+        break;
+      default:
+        length = schemas[ordinal].getLength();
+    }
+    byte[] data = new byte[length];
+    unsafe.copyMemory(block.getBaseObject(), block.getBaseOffset() + pointer + position, data,
+        BYTE_ARRAY_OFFSET, data.length);
+    return data;
+  }
+
+  @Override public DataMapRow getRow(int ordinal) {
+    DataMapSchema[] childSchemas =
+        ((DataMapSchema.StructDataMapSchema) schemas[ordinal]).getChildSchemas();
+    return new UnsafeDataMapRow(childSchemas, block, pointer + getPosition(ordinal));
+  }
+
+  @Override public void setByteArray(byte[] byteArray, int ordinal) {
+    throw new UnsupportedOperationException("Not supported to set on unsafe row");
+  }
+
+  @Override public int getInt(int ordinal) {
+    return unsafe
+        .getInt(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+  }
+
+  @Override public void setInt(int value, int ordinal) {
+    throw new UnsupportedOperationException("Not supported to set on unsafe row");
+  }
+
+  @Override public void setByte(byte value, int ordinal) {
+    throw new UnsupportedOperationException("Not supported to set on unsafe row");
+  }
+
+  @Override public byte getByte(int ordinal) {
+    return unsafe
+        .getByte(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+  }
+
+  @Override public void setShort(short value, int ordinal) {
+    throw new UnsupportedOperationException("Not supported to set on unsafe row");
+  }
+
+  @Override public short getShort(int ordinal) {
+    return unsafe
+        .getShort(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+  }
+
+  @Override public void setLong(long value, int ordinal) {
+    throw new UnsupportedOperationException("Not supported to set on unsafe row");
+  }
+
+  @Override public long getLong(int ordinal) {
+    return unsafe
+        .getLong(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+  }
+
+  @Override public void setFloat(float value, int ordinal) {
+    throw new UnsupportedOperationException("Not supported to set on unsafe row");
+  }
+
+  @Override public float getFloat(int ordinal) {
+    return unsafe
+        .getFloat(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+  }
+
+  @Override public void setDouble(double value, int ordinal) {
+    throw new UnsupportedOperationException("Not supported to set on unsafe row");
+  }
+
+  @Override public double getDouble(int ordinal) {
+    return unsafe
+        .getDouble(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+  }
+
+  @Override public void setRow(DataMapRow row, int ordinal) {
+    throw new UnsupportedOperationException("Not supported to set on unsafe row");
+  }
+
+  private int getPosition(int ordinal) {
+    int position = 0;
+    for (int i = 0; i < ordinal; i++) {
+      position += getSizeInBytes(i);
+    }
+    return position;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java
new file mode 100644
index 0000000..80c68ac
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.schema;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Schema of a DataMapRow column. It has three types right now: fixed, variable and struct.
+ */
+public abstract class DataMapSchema {
+
+  protected DataType dataType;
+
+  public DataMapSchema(DataType dataType) {
+    this.dataType = dataType;
+  }
+
+  /**
+   * Data type of this column.
+   *
+   * @return the data type
+   */
+  public DataType getDataType() {
+    return dataType;
+  }
+
+  /**
+   * Gives the configured length for a fixed schema, otherwise the size in bytes of the data type.
+   *
+   * @return length in bytes
+   */
+  public abstract int getLength();
+
+  /**
+   * Schema type (fixed, variable or struct).
+   * @return the schema type
+   */
+  public abstract DataMapSchemaType getSchemaType();
+
+  /**
+   * It always has a fixed length; the length cannot be updated later.
+   * Usage examples: all primitive types like short, int etc.
+   */
+  public static class FixedDataMapSchema extends DataMapSchema {
+
+    private int length;
+
+    public FixedDataMapSchema(DataType dataType) {
+      super(dataType);
+    }
+
+    public FixedDataMapSchema(DataType dataType, int length) {
+      super(dataType);
+      this.length = length;
+    }
+
+    @Override public int getLength() {
+      if (length == 0) {
+        return dataType.getSizeInBytes();
+      } else {
+        return length;
+      }
+    }
+
+    @Override public DataMapSchemaType getSchemaType() {
+      return DataMapSchemaType.FIXED;
+    }
+  }
+
+  public static class VariableDataMapSchema extends DataMapSchema {
+
+    public VariableDataMapSchema(DataType dataType) {
+      super(dataType);
+    }
+
+    @Override public int getLength() {
+      return dataType.getSizeInBytes();
+    }
+
+    @Override public DataMapSchemaType getSchemaType() {
+      return DataMapSchemaType.VARIABLE;
+    }
+  }
+
+  public static class StructDataMapSchema extends DataMapSchema {
+
+    private DataMapSchema[] childSchemas;
+
+    public StructDataMapSchema(DataType dataType, DataMapSchema[] childSchemas) {
+      super(dataType);
+      this.childSchemas = childSchemas;
+    }
+
+    @Override public int getLength() {
+      return dataType.getSizeInBytes();
+    }
+
+    public DataMapSchema[] getChildSchemas() {
+      return childSchemas;
+    }
+
+    @Override public DataMapSchemaType getSchemaType() {
+      return DataMapSchemaType.STRUCT;
+    }
+  }
+
+  public enum DataMapSchemaType {
+    FIXED, VARIABLE, STRUCT
+  }
+}
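
A minimal sketch (not part of this patch) showing how a STRUCT column nests a child row; sizes follow DataMapRow.getSizeInBytes above, the helper class name is illustrative, and the values are examples only.

// Minimal sketch (illustrative): a STRUCT schema whose child columns form a nested DataMapRow.
import org.apache.carbondata.core.indexstore.row.DataMapRow;
import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
import org.apache.carbondata.core.metadata.datatype.DataType;

public final class StructSchemaExample {
  public static void main(String[] args) {
    DataMapSchema[] childSchemas = new DataMapSchema[] {
        new DataMapSchema.FixedDataMapSchema(DataType.LONG),
        new DataMapSchema.FixedDataMapSchema(DataType.SHORT)
    };
    DataMapSchema[] schemas = new DataMapSchema[] {
        new DataMapSchema.StructDataMapSchema(DataType.STRUCT, childSchemas)
    };
    DataMapRowImpl child = new DataMapRowImpl(childSchemas);
    child.setLong(128L, 0);
    child.setShort((short) 7, 1);
    DataMapRow row = new DataMapRowImpl(schemas);
    row.setRow(child, 0); // setRow expects a STRUCT column
    // the STRUCT column's size is the child row's total size (long + short)
    System.out.println(row.getTotalSizeInBytes());
  }
}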

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
new file mode 100644
index 0000000..9d77010
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.schema;
+
+/**
+ * Types of filters of select query
+ */
+public enum FilterType {
+  EQUALTO, GREATER_THAN, LESS_THAN, GREATER_THAN_EQUAL, LESS_THAN_EQUAL, LIKE
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
index bfa9d7e..f81f805 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
@@ -17,16 +17,22 @@
 
 package org.apache.carbondata.core.metadata.blocklet;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.metadata.blocklet.datachunk.DataChunk;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
 
+import org.apache.hadoop.io.Writable;
+
 /**
  * class to store the information about the blocklet
  */
-public class BlockletInfo implements Serializable {
+public class BlockletInfo implements Serializable, Writable {
 
   /**
    * serialization id
@@ -189,4 +195,49 @@ public class BlockletInfo implements Serializable {
     this.numberOfPages = numberOfPages;
   }
 
+  @Override public void write(DataOutput output) throws IOException {
+    output.writeLong(dimensionOffset);
+    output.writeLong(measureOffsets);
+    int dsize = dimensionChunkOffsets != null ? dimensionChunkOffsets.size() : 0;
+    output.writeShort(dsize);
+    for (int i = 0; i < dsize; i++) {
+      output.writeLong(dimensionChunkOffsets.get(i));
+    }
+    for (int i = 0; i < dsize; i++) {
+      output.writeInt(dimensionChunksLength.get(i));
+    }
+    int mSize = measureChunkOffsets != null ? measureChunkOffsets.size() : 0;
+    output.writeShort(mSize);
+    for (int i = 0; i < mSize; i++) {
+      output.writeLong(measureChunkOffsets.get(i));
+    }
+    for (int i = 0; i < mSize; i++) {
+      output.writeInt(measureChunksLength.get(i));
+    }
+  }
+
+  @Override public void readFields(DataInput input) throws IOException {
+    dimensionOffset = input.readLong();
+    measureOffsets = input.readLong();
+    short dimensionChunkOffsetsSize = input.readShort();
+    dimensionChunkOffsets = new ArrayList<>(dimensionChunkOffsetsSize);
+    for (int i = 0; i < dimensionChunkOffsetsSize; i++) {
+      dimensionChunkOffsets.add(input.readLong());
+    }
+    dimensionChunksLength = new ArrayList<>(dimensionChunkOffsetsSize);
+    for (int i = 0; i < dimensionChunkOffsetsSize; i++) {
+      dimensionChunksLength.add(input.readInt());
+    }
+
+    short measureChunkOffsetsSize = input.readShort();
+    measureChunkOffsets = new ArrayList<>(measureChunkOffsetsSize);
+    for (int i = 0; i < measureChunkOffsetsSize; i++) {
+      measureChunkOffsets.add(input.readLong());
+    }
+    measureChunksLength = new ArrayList<>(measureChunkOffsetsSize);
+    for (int i = 0; i < measureChunkOffsetsSize; i++) {
+      measureChunksLength.add(input.readInt());
+    }
+
+  }
 }
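
A minimal round-trip sketch (not part of this patch) for the new Writable methods. It assumes BlockletInfo keeps a public no-arg constructor, as Writable implementations normally do, and only the fields covered by write()/readFields() survive the round trip; the helper class name is illustrative.

// Minimal sketch (illustrative): serializing and deserializing BlockletInfo with plain java.io streams.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;

public final class BlockletInfoWritableExample {
  public static BlockletInfo roundTrip(BlockletInfo original) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    original.write(new DataOutputStream(bytes));  // writes offsets and chunk offset/length lists
    BlockletInfo copy = new BlockletInfo();       // assumes a public no-arg constructor
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    return copy;
  }
}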

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java
index cd86a07..ae99ed8 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.core.metadata.index;
 
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
 
 /**
@@ -45,6 +46,11 @@ public class BlockIndexInfo {
   private BlockletIndex blockletIndex;
 
   /**
+   * to store blocklet info like offsets and lengths of each column.
+   */
+  private BlockletInfo blockletInfo;
+
+  /**
    * Constructor
    *
    * @param numberOfRows  number of rows
@@ -61,6 +67,20 @@ public class BlockIndexInfo {
   }
 
   /**
+   *
+   * @param numberOfRows  number of rows
+   * @param fileName      carbon data file name
+   * @param offset        offset within the carbon data file
+   * @param blockletIndex blocklet index
+   * @param blockletInfo  blocklet info like offsets and lengths of each column
+   */
+  public BlockIndexInfo(long numberOfRows, String fileName, long offset,
+      BlockletIndex blockletIndex, BlockletInfo blockletInfo) {
+    this(numberOfRows, fileName, offset, blockletIndex);
+    this.blockletInfo = blockletInfo;
+  }
+
+  /**
    * @return the numberOfRows
    */
   public long getNumberOfRows() {
@@ -87,4 +107,11 @@ public class BlockIndexInfo {
   public BlockletIndex getBlockletIndex() {
     return blockletIndex;
   }
+
+  /**
+   * @return BlockletInfo
+   */
+  public BlockletInfo getBlockletInfo() {
+    return blockletInfo;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index ff54673..e0ee5bb 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -21,8 +21,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -41,6 +43,7 @@ import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.blockletindex.IndexWrapper;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -116,23 +119,40 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     // so block will be loaded in sorted order this will be required for
     // query execution
     Collections.sort(queryModel.getTableBlockInfos());
-    // get the table blocks
-    CacheProvider cacheProvider = CacheProvider.getInstance();
-    BlockIndexStore<TableBlockUniqueIdentifier, AbstractIndex> cache =
-        (BlockIndexStore) cacheProvider
-            .createCache(CacheType.EXECUTOR_BTREE, queryModel.getTable().getStorePath());
-    // remove the invalid table blocks, block which is deleted or compacted
-    cache.removeTableBlocks(queryModel.getInvalidSegmentIds(),
-        queryModel.getAbsoluteTableIdentifier());
-    List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers =
-        prepareTableBlockUniqueIdentifier(queryModel.getTableBlockInfos(),
-            queryModel.getAbsoluteTableIdentifier());
-    cache.removeTableBlocksIfHorizontalCompactionDone(queryModel);
-    queryProperties.dataBlocks = cache.getAll(tableBlockUniqueIdentifiers);
-    queryStatistic
-        .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, System.currentTimeMillis());
-    queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
 
+    if (queryModel.getTableBlockInfos().get(0).getDetailInfo() != null) {
+      List<AbstractIndex> indexList = new ArrayList<>();
+      Map<String, List<TableBlockInfo>> listMap = new LinkedHashMap<>();
+      for (TableBlockInfo blockInfo: queryModel.getTableBlockInfos()) {
+        List<TableBlockInfo> tableBlockInfos = listMap.get(blockInfo.getFilePath());
+        if (tableBlockInfos == null) {
+          tableBlockInfos = new ArrayList<>();
+          listMap.put(blockInfo.getFilePath(), tableBlockInfos);
+        }
+        tableBlockInfos.add(blockInfo);
+      }
+      for (List<TableBlockInfo> tableBlockInfos: listMap.values()) {
+        indexList.add(new IndexWrapper(tableBlockInfos));
+      }
+      queryProperties.dataBlocks = indexList;
+    } else {
+      // get the table blocks
+      CacheProvider cacheProvider = CacheProvider.getInstance();
+      BlockIndexStore<TableBlockUniqueIdentifier, AbstractIndex> cache =
+          (BlockIndexStore) cacheProvider
+              .createCache(CacheType.EXECUTOR_BTREE, queryModel.getTable().getStorePath());
+      // remove the invalid table blocks, block which is deleted or compacted
+      cache.removeTableBlocks(queryModel.getInvalidSegmentIds(),
+          queryModel.getAbsoluteTableIdentifier());
+      List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers =
+          prepareTableBlockUniqueIdentifier(queryModel.getTableBlockInfos(),
+              queryModel.getAbsoluteTableIdentifier());
+      cache.removeTableBlocksIfHorizontalCompactionDone(queryModel);
+      queryProperties.dataBlocks = cache.getAll(tableBlockUniqueIdentifiers);
+      queryStatistic
+          .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, System.currentTimeMillis());
+      queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
+    }
     // calculating the total number of aggregated columns
     int aggTypeCount = queryModel.getQueryMeasures().size();
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
index 8704496..a874835 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -156,7 +156,7 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
     int columnIndex = dimColumnEvaluatorInfo.getColumnIndex();
     int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping().get(columnIndex);
 
-    boolean isScanRequired =
+    boolean isScanRequired =  blockIndex >= blkMaxVal.length ||
         isScanRequired(blkMaxVal[blockIndex], blkMinVal[blockIndex], filterValues);
     if (isScanRequired) {
       bitSet.set(0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
index 6823531..c2e077e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
@@ -287,7 +287,7 @@ public class RangeValueFilterExecuterImpl extends ValueBasedFilterExecuterImpl {
     BitSet bitSet = new BitSet(1);
     byte[][] filterValues = this.filterRangesValues;
     int columnIndex = this.dimColEvaluatorInfo.getColumnIndex();
-    boolean isScanRequired =
+    boolean isScanRequired = columnIndex >= blockMinValue.length ||
         isScanRequired(blockMinValue[columnIndex], blockMaxValue[columnIndex], filterValues);
     if (isScanRequired) {
       bitSet.set(0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
index be82be7..73352cb 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
@@ -79,7 +79,7 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
 
   @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
     BitSet bitSet = new BitSet(1);
-    boolean isScanRequired =
+    boolean isScanRequired =  dimensionBlocksIndex[0] >= blockMaxValue.length ||
         isScanRequired(blockMaxValue[dimensionBlocksIndex[0]], filterRangeValues);
     if (isScanRequired) {
       bitSet.set(0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
index 53da6c5..6e8e188 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
@@ -81,7 +81,7 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
 
   @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
     BitSet bitSet = new BitSet(1);
-    boolean isScanRequired =
+    boolean isScanRequired =  dimensionBlocksIndex[0] >= blockMaxValue.length ||
         isScanRequired(blockMaxValue[dimensionBlocksIndex[0]], filterRangeValues);
     if (isScanRequired) {
       bitSet.set(0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
index d694960..d6f7c86 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -81,7 +81,7 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
 
   @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
     BitSet bitSet = new BitSet(1);
-    boolean isScanRequired =
+    boolean isScanRequired = dimensionBlocksIndex[0] >= blockMaxValue.length ||
         isScanRequired(blockMinValue[dimensionBlocksIndex[0]], filterRangeValues);
     if (isScanRequired) {
       bitSet.set(0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index b3dd921..597ba52 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -82,7 +82,7 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
 
   @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
     BitSet bitSet = new BitSet(1);
-    boolean isScanRequired =
+    boolean isScanRequired = dimensionBlocksIndex[0] >= blockMaxValue.length ||
         isScanRequired(blockMinValue[dimensionBlocksIndex[0]], filterRangeValues);
     if (isScanRequired) {
       bitSet.set(0);

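For illustration, a minimal self-contained sketch of the guard added to the range filter executers above (the overlaps() helper is a placeholder for the real min/max comparison, and the motivation of incomplete per-column statistics is an assumption): when the dimension ordinal lies outside the available min/max arrays, the scan is forced instead of throwing ArrayIndexOutOfBoundsException.

    import java.util.BitSet;

    public final class MinMaxPruneSketch {
      // Returns a single-bit BitSet: set when the block/blocklet must be scanned.
      // If the column ordinal lies outside the available statistics, pruning is
      // skipped and the scan is forced, mirroring the guard in the hunks above.
      static BitSet isScanRequired(int columnIndex, byte[][] blockMinValue,
          byte[][] blockMaxValue, byte[][] filterValues) {
        BitSet bitSet = new BitSet(1);
        boolean scanRequired = columnIndex >= blockMinValue.length
            || overlaps(blockMinValue[columnIndex], blockMaxValue[columnIndex], filterValues);
        if (scanRequired) {
          bitSet.set(0);
        }
        return bitSet;
      }

      // Placeholder for the real filter-range vs. min/max comparison.
      private static boolean overlaps(byte[] min, byte[] max, byte[][] filterValues) {
        return true;
      }
    }
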
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
index fdb5483..ff4f5dd 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
@@ -165,6 +165,9 @@ public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Obje
         new BlocksChunkHolder(blockExecutionInfo.getTotalNumberDimensionBlock(),
             blockExecutionInfo.getTotalNumberOfMeasureBlock(), fileReader);
     blocksChunkHolder.setDataBlock(dataBlockIterator.next());
+    if (blocksChunkHolder.getDataBlock().getColumnsMaxValue() == null) {
+      return blocksChunkHolder;
+    }
     if (blockletScanner.isScanRequired(blocksChunkHolder)) {
       return blocksChunkHolder;
     }

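A hypothetical standalone illustration of the early exit above (ScanDecisionSketch and its Supplier parameter are not part of the patch): when a data block exposes no column max values, as with blocklets served through the blocklet datamap before statistics are loaded, pruning is skipped and the chunk holder is scanned unconditionally.

    import java.util.function.Supplier;

    final class ScanDecisionSketch {
      static boolean shouldScan(byte[][] columnsMaxValue, Supplier<Boolean> isScanRequired) {
        if (columnsMaxValue == null) {
          // No statistics available for this block, so pruning is not possible.
          return true;
        }
        return isScanRequired.get();
      }
    }
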
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
index 92e9594..95030d3 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNodeWrapper;
 import org.apache.carbondata.core.mutate.DeleteDeltaVo;
 import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
@@ -127,20 +128,27 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
         // set the deleted row to block execution info
         blockInfo.setDeletedRecordsMap(deletedRowsMap);
       }
-      DataRefNode startDataBlock = finder
-          .findFirstDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getStartKey());
-      while (startDataBlock.nodeNumber() < blockInfo.getStartBlockletIndex()) {
-        startDataBlock = startDataBlock.getNextDataRefNode();
-      }
-      long numberOfBlockToScan = blockInfo.getNumberOfBlockletToScan();
-      //if number of block is less than 0 then take end block.
-      if (numberOfBlockToScan <= 0) {
-        DataRefNode endDataBlock = finder
-            .findLastDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getEndKey());
-        numberOfBlockToScan = endDataBlock.nodeNumber() - startDataBlock.nodeNumber() + 1;
+      DataRefNode dataRefNode = blockInfo.getDataBlock().getDataRefNode();
+      if (dataRefNode instanceof BlockletDataRefNodeWrapper) {
+        BlockletDataRefNodeWrapper wrapper = (BlockletDataRefNodeWrapper) dataRefNode;
+        blockInfo.setFirstDataBlock(wrapper);
+        blockInfo.setNumberOfBlockToScan(wrapper.numberOfNodes());
+
+      } else {
+        DataRefNode startDataBlock =
+            finder.findFirstDataBlock(dataRefNode, blockInfo.getStartKey());
+        while (startDataBlock.nodeNumber() < blockInfo.getStartBlockletIndex()) {
+          startDataBlock = startDataBlock.getNextDataRefNode();
+        }
+        long numberOfBlockToScan = blockInfo.getNumberOfBlockletToScan();
+        //if number of block is less than 0 then take end block.
+        if (numberOfBlockToScan <= 0) {
+          DataRefNode endDataBlock = finder.findLastDataBlock(dataRefNode, blockInfo.getEndKey());
+          numberOfBlockToScan = endDataBlock.nodeNumber() - startDataBlock.nodeNumber() + 1;
+        }
+        blockInfo.setFirstDataBlock(startDataBlock);
+        blockInfo.setNumberOfBlockToScan(numberOfBlockToScan);
       }
-      blockInfo.setFirstDataBlock(startDataBlock);
-      blockInfo.setNumberOfBlockToScan(numberOfBlockToScan);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index 97b1a1f..34c7709 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -122,6 +122,57 @@ public abstract class AbstractDataFileFooterConverter {
   }
 
   /**
+   * Below method will be used to get the index info from index file
+   *
+   * @param filePath           file path of the index file
+   * @return list of index info
+   * @throws IOException problem while reading the index file
+   */
+  public List<DataFileFooter> getIndexInfo(String filePath) throws IOException {
+    CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+    List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>();
+    String parentPath = filePath.substring(0, filePath.lastIndexOf("/"));
+    try {
+      // open the reader
+      indexReader.openThriftReader(filePath);
+      // get the index header
+      org.apache.carbondata.format.IndexHeader readIndexHeader = indexReader.readIndexHeader();
+      List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+      List<org.apache.carbondata.format.ColumnSchema> table_columns =
+          readIndexHeader.getTable_columns();
+      for (int i = 0; i < table_columns.size(); i++) {
+        columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)));
+      }
+      // get the segment info
+      SegmentInfo segmentInfo = getSegmentInfo(readIndexHeader.getSegment_info());
+      BlockletIndex blockletIndex = null;
+      DataFileFooter dataFileFooter = null;
+      // read the block info from file
+      while (indexReader.hasNext()) {
+        BlockIndex readBlockIndexInfo = indexReader.readBlockIndexInfo();
+        blockletIndex = getBlockletIndex(readBlockIndexInfo.getBlock_index());
+        dataFileFooter = new DataFileFooter();
+        TableBlockInfo tableBlockInfo = new TableBlockInfo();
+        tableBlockInfo.setBlockOffset(readBlockIndexInfo.getOffset());
+        tableBlockInfo.setVersion(
+            ColumnarFormatVersion.valueOf((short) readIndexHeader.getVersion()));
+        int blockletSize = getBlockletSize(readBlockIndexInfo);
+        tableBlockInfo.getBlockletInfos().setNoOfBlockLets(blockletSize);
+        tableBlockInfo.setFilePath(parentPath + "/" + readBlockIndexInfo.file_name);
+        dataFileFooter.setBlockletIndex(blockletIndex);
+        dataFileFooter.setColumnInTable(columnSchemaList);
+        dataFileFooter.setNumberOfRows(readBlockIndexInfo.getNum_rows());
+        dataFileFooter.setBlockInfo(new BlockInfo(tableBlockInfo));
+        dataFileFooter.setSegmentInfo(segmentInfo);
+        dataFileFooters.add(dataFileFooter);
+      }
+    } finally {
+      indexReader.closeThriftReader();
+    }
+    return dataFileFooters;
+  }
+
+  /**
    * the methods returns the number of blocklets in a block
    *
    * @param readBlockIndexInfo
@@ -148,6 +199,8 @@ public abstract class AbstractDataFileFooterConverter {
   public abstract DataFileFooter readDataFileFooter(TableBlockInfo tableBlockInfo)
       throws IOException;
 
+  public abstract List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException;
+
   /**
    * Below method will be used to get blocklet index for data file meta
    *

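A minimal usage sketch for the new getIndexInfo() (the index file path is hypothetical, and the DataFileFooter getter is assumed to mirror the setNumberOfRows() call above):

    import java.io.IOException;
    import java.util.List;

    import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
    import org.apache.carbondata.core.util.DataFileFooterConverterV3;

    public class IndexInfoReadSketch {
      public static void main(String[] args) throws IOException {
        // Hypothetical carbonindex file written by a data load.
        String indexFilePath =
            "/store/default/t1/Fact/Part0/Segment_0/0_batchno0-0-1499692000000.carbonindex";
        List<DataFileFooter> footers = new DataFileFooterConverterV3().getIndexInfo(indexFilePath);
        // One footer per carbondata file referenced by the index file, each carrying
        // the blocklet index (min/max, start/end keys) and the row count.
        for (DataFileFooter footer : footers) {
          System.out.println("rows: " + footer.getNumberOfRows()); // getter assumed
        }
      }
    }
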
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 200d5ca..51296d8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -51,10 +51,13 @@ import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.blocklet.SegmentInfo;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
@@ -912,10 +915,26 @@ public final class CarbonUtil {
    * Below method will be used to read the data file matadata
    */
   public static DataFileFooter readMetadatFile(TableBlockInfo tableBlockInfo) throws IOException {
-    AbstractDataFileFooterConverter fileFooterConverter =
-        DataFileFooterConverterFactory.getInstance()
-            .getDataFileFooterConverter(tableBlockInfo.getVersion());
-    return fileFooterConverter.readDataFileFooter(tableBlockInfo);
+    BlockletDetailInfo detailInfo = tableBlockInfo.getDetailInfo();
+    if (detailInfo == null) {
+      AbstractDataFileFooterConverter fileFooterConverter =
+          DataFileFooterConverterFactory.getInstance()
+              .getDataFileFooterConverter(tableBlockInfo.getVersion());
+      return fileFooterConverter.readDataFileFooter(tableBlockInfo);
+    } else {
+      DataFileFooter fileFooter = new DataFileFooter();
+      fileFooter.setSchemaUpdatedTimeStamp(detailInfo.getSchemaUpdatedTimeStamp());
+      ColumnarFormatVersion version =
+          ColumnarFormatVersion.valueOf(detailInfo.getVersionNumber());
+      AbstractDataFileFooterConverter dataFileFooterConverter =
+          DataFileFooterConverterFactory.getInstance().getDataFileFooterConverter(version);
+      fileFooter.setColumnInTable(dataFileFooterConverter.getSchema(tableBlockInfo));
+      SegmentInfo segmentInfo = new SegmentInfo();
+      segmentInfo.setColumnCardinality(detailInfo.getDimLens());
+      segmentInfo.setNumberOfColumns(detailInfo.getRowCount());
+      fileFooter.setSegmentInfo(segmentInfo);
+      return fileFooter;
+    }
   }
 
   /**
@@ -1553,24 +1572,23 @@ public final class CarbonUtil {
   }
 
   /**
-   * @param tableInfo
    * @param invalidBlockVOForSegmentId
    * @param updateStatusMngr
    * @return
    */
-  public static boolean isInvalidTableBlock(TableBlockInfo tableInfo,
+  public static boolean isInvalidTableBlock(String segmentId, String filePath,
       UpdateVO invalidBlockVOForSegmentId, SegmentUpdateStatusManager updateStatusMngr) {
 
-    if (!updateStatusMngr.isBlockValid(tableInfo.getSegmentId(),
-        CarbonTablePath.getCarbonDataFileName(tableInfo.getFilePath()) + CarbonTablePath
+    if (!updateStatusMngr.isBlockValid(segmentId,
+        CarbonTablePath.getCarbonDataFileName(filePath) + CarbonTablePath
             .getCarbonDataExtension())) {
       return true;
     }
 
     if (null != invalidBlockVOForSegmentId) {
-      Long blockTimeStamp = Long.parseLong(tableInfo.getFilePath()
-          .substring(tableInfo.getFilePath().lastIndexOf('-') + 1,
-              tableInfo.getFilePath().lastIndexOf('.')));
+      Long blockTimeStamp = Long.parseLong(filePath
+          .substring(filePath.lastIndexOf('-') + 1,
+              filePath.lastIndexOf('.')));
       if ((blockTimeStamp > invalidBlockVOForSegmentId.getFactTimestamp() && (
           invalidBlockVOForSegmentId.getUpdateDeltaStartTimestamp() != null
               && blockTimeStamp < invalidBlockVOForSegmentId.getUpdateDeltaStartTimestamp()))) {

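A call-site sketch for the reworked isInvalidTableBlock() (the helper class is not part of the patch): the segment id and file path now come straight from the split, so a full TableBlockInfo no longer has to be built just for the validity check.

    import org.apache.carbondata.core.mutate.UpdateVO;
    import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
    import org.apache.carbondata.core.util.CarbonUtil;

    public final class InvalidBlockCheckSketch {
      // Returns true when the block file should be skipped because an IUD operation
      // invalidated it, mirroring the new String-based signature introduced above.
      static boolean shouldSkip(String segmentId, String filePath,
          SegmentUpdateStatusManager updateStatusManager) {
        UpdateVO invalidRange = updateStatusManager.getInvalidTimestampRange(segmentId);
        return CarbonUtil.isInvalidTableBlock(segmentId, filePath, invalidRange,
            updateStatusManager);
      }
    }
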
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
index 0f82b95..3ac6987 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
@@ -121,4 +121,8 @@ public class DataFileFooterConverter extends AbstractDataFileFooterConverter {
     blockletInfo.setNumberOfRows(blockletInfoThrift.getNum_rows());
     return blockletInfo;
   }
+
+  @Override public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
index 4882b0f..8cd437f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
@@ -140,4 +140,7 @@ public class DataFileFooterConverter2 extends AbstractDataFileFooterConverter {
     return numberOfDimensionColumns;
   }
 
+  @Override public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
index 143c1b1..ccb8b29 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
@@ -85,6 +85,17 @@ public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter {
     return dataFileFooter;
   }
 
+  @Override public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException {
+    CarbonHeaderReader carbonHeaderReader = new CarbonHeaderReader(tableBlockInfo.getFilePath());
+    FileHeader fileHeader = carbonHeaderReader.readHeader();
+    List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+    List<org.apache.carbondata.format.ColumnSchema> table_columns = fileHeader.getColumn_schema();
+    for (int i = 0; i < table_columns.size(); i++) {
+      columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)));
+    }
+    return columnSchemaList;
+  }
+
   /**
    * Below method is to convert the blocklet info of the thrift to wrapper
    * blocklet info

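A minimal usage sketch for the new getSchema() override (the data file path is hypothetical and getColumnName() on the wrapper ColumnSchema is assumed): only the file header is read, which is what CarbonUtil.readMetadatFile above relies on when a BlockletDetailInfo is already available.

    import java.io.IOException;
    import java.util.List;

    import org.apache.carbondata.core.datastore.block.TableBlockInfo;
    import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
    import org.apache.carbondata.core.util.DataFileFooterConverterV3;

    public class SchemaReadSketch {
      public static void main(String[] args) throws IOException {
        TableBlockInfo blockInfo = new TableBlockInfo();
        // Hypothetical carbondata file path; only its header is read, not the footer.
        blockInfo.setFilePath(
            "/store/default/t1/Fact/Part0/Segment_0/part-0-0_batchno0-0-1499692000000.carbondata");
        List<ColumnSchema> columns = new DataFileFooterConverterV3().getSchema(blockInfo);
        for (ColumnSchema column : columns) {
          System.out.println(column.getColumnName()); // getter assumed
        }
      }
    }
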
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/format/src/main/thrift/carbondata_index.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondata_index.thrift b/format/src/main/thrift/carbondata_index.thrift
index c055031..4df085a 100644
--- a/format/src/main/thrift/carbondata_index.thrift
+++ b/format/src/main/thrift/carbondata_index.thrift
@@ -41,4 +41,5 @@ struct BlockIndex{
   2: required string file_name; // Block file name
   3: required i64 offset; // Offset of the footer
   4: required carbondata.BlockletIndex block_index;	// Blocklet index
+  5: optional carbondata.BlockletInfo3 blocklet_info;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 1e69648..d03ae3a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -19,7 +19,14 @@ package org.apache.carbondata.hadoop;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.DataRefNode;
@@ -367,8 +374,9 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
         if (isIUDTable) {
           // In case IUD is not performed in this table avoid searching for
           // invalidated blocks.
-          if (CarbonUtil.isInvalidTableBlock(tableBlockInfo, invalidBlockVOForSegmentId,
-              updateStatusManager)) {
+          if (CarbonUtil
+              .isInvalidTableBlock(tableBlockInfo.getSegmentId(), tableBlockInfo.getFilePath(),
+                  invalidBlockVOForSegmentId, updateStatusManager)) {
             continue;
           }
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 631bc2c..56bade7 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.BlockletInfos;
 import org.apache.carbondata.core.datastore.block.Distributable;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.util.ByteUtil;
@@ -77,6 +78,8 @@ public class CarbonInputSplit extends FileSplit
    */
   private String[] deleteDeltaFiles;
 
+  private BlockletDetailInfo detailInfo;
+
   public CarbonInputSplit() {
     segmentId = null;
     taskId = "0";
@@ -138,10 +141,12 @@ public class CarbonInputSplit extends FileSplit
       BlockletInfos blockletInfos =
           new BlockletInfos(split.getNumberOfBlocklets(), 0, split.getNumberOfBlocklets());
       try {
-        tableBlockInfoList.add(
+        TableBlockInfo blockInfo =
             new TableBlockInfo(split.getPath().toString(), split.getStart(), split.getSegmentId(),
                 split.getLocations(), split.getLength(), blockletInfos, split.getVersion(),
-                split.getDeleteDeltaFiles()));
+                split.getDeleteDeltaFiles());
+        blockInfo.setDetailInfo(split.getDetailInfo());
+        tableBlockInfoList.add(blockInfo);
       } catch (IOException e) {
         throw new RuntimeException("fail to get location of split: " + split, e);
       }
@@ -153,9 +158,12 @@ public class CarbonInputSplit extends FileSplit
     BlockletInfos blockletInfos =
         new BlockletInfos(inputSplit.getNumberOfBlocklets(), 0, inputSplit.getNumberOfBlocklets());
     try {
-      return new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(),
-          inputSplit.getSegmentId(), inputSplit.getLocations(), inputSplit.getLength(),
-          blockletInfos, inputSplit.getVersion(), inputSplit.getDeleteDeltaFiles());
+      TableBlockInfo blockInfo =
+          new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(),
+              inputSplit.getSegmentId(), inputSplit.getLocations(), inputSplit.getLength(),
+              blockletInfos, inputSplit.getVersion(), inputSplit.getDeleteDeltaFiles());
+      blockInfo.setDetailInfo(inputSplit.getDetailInfo());
+      return blockInfo;
     } catch (IOException e) {
       throw new RuntimeException("fail to get location of split: " + inputSplit, e);
     }
@@ -180,6 +188,11 @@ public class CarbonInputSplit extends FileSplit
     for (int i = 0; i < numberOfDeleteDeltaFiles; i++) {
       deleteDeltaFiles[i] = in.readUTF();
     }
+    boolean detailInfoExists = in.readBoolean();
+    if (detailInfoExists) {
+      detailInfo = new BlockletDetailInfo();
+      detailInfo.readFields(in);
+    }
   }
 
   @Override public void write(DataOutput out) throws IOException {
@@ -197,6 +210,10 @@ public class CarbonInputSplit extends FileSplit
         out.writeUTF(deleteDeltaFiles[i]);
       }
     }
+    out.writeBoolean(detailInfo != null);
+    if (detailInfo != null) {
+      detailInfo.write(out);
+    }
   }
 
   public List<String> getInvalidSegments() {
@@ -310,4 +327,16 @@ public class CarbonInputSplit extends FileSplit
   public String[] getDeleteDeltaFiles() {
     return deleteDeltaFiles;
   }
+
+  public void setDeleteDeltaFiles(String[] deleteDeltaFiles) {
+    this.deleteDeltaFiles = deleteDeltaFiles;
+  }
+
+  public BlockletDetailInfo getDetailInfo() {
+    return detailInfo;
+  }
+
+  public void setDetailInfo(BlockletDetailInfo detailInfo) {
+    this.detailInfo = detailInfo;
+  }
 }

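The detailInfo field is serialized with a presence flag so splits built without it still deserialize cleanly. A generic, self-contained sketch of that pattern (illustrative classes, not part of the patch):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    class OptionalFieldSplit {
      private DetailPayload detail;   // may be null for splits built without the datamap

      void write(DataOutput out) throws IOException {
        // Boolean marker first, so the reader knows whether a payload follows.
        out.writeBoolean(detail != null);
        if (detail != null) {
          detail.write(out);
        }
      }

      void readFields(DataInput in) throws IOException {
        if (in.readBoolean()) {
          detail = new DetailPayload();
          detail.readFields(in);
        } else {
          detail = null;
        }
      }

      // Minimal nested payload standing in for BlockletDetailInfo.
      static class DetailPayload {
        private int rowCount;

        void write(DataOutput out) throws IOException {
          out.writeInt(rowCount);
        }

        void readFields(DataInput in) throws IOException {
          rowCount = in.readInt();
        }
      }
    }
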
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index ae9c676..e73c04a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -18,152 +18,556 @@
 package org.apache.carbondata.hadoop.api;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.DataMapStoreManager;
+import org.apache.carbondata.core.indexstore.DataMapType;
+import org.apache.carbondata.core.indexstore.TableDataMap;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.mutate.data.BlockMappingVO;
 import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.partition.PartitionUtil;
+import org.apache.carbondata.core.scan.partition.Partitioner;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
 import org.apache.carbondata.hadoop.CarbonProjection;
-import org.apache.carbondata.hadoop.internal.CarbonInputSplit;
-import org.apache.carbondata.hadoop.internal.segment.Segment;
-import org.apache.carbondata.hadoop.internal.segment.SegmentManager;
-import org.apache.carbondata.hadoop.internal.segment.SegmentManagerFactory;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.hadoop.util.SchemaReader;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Input format of CarbonData file.
+ *
  * @param <T>
  */
 public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
 
+  // comma separated list of input segment numbers
+  public static final String INPUT_SEGMENT_NUMBERS =
+      "mapreduce.input.carboninputformat.segmentnumbers";
+  // comma separated list of input files
+  public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files";
+  private static final Log LOG = LogFactory.getLog(CarbonTableInputFormat.class);
   private static final String FILTER_PREDICATE =
       "mapreduce.input.carboninputformat.filter.predicate";
+  private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
+  private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table";
+  private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
 
-  private SegmentManager segmentManager;
+  /**
+   * It is optional; if the user does not set it, the schema is read from the store
+   *
+   * @param configuration
+   * @param carbonTable
+   * @throws IOException
+   */
+  public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable)
+      throws IOException {
+    if (null != carbonTable) {
+      configuration.set(CARBON_TABLE, ObjectSerializationUtil.convertObjectToString(carbonTable));
+    }
+  }
 
-  public CarbonTableInputFormat() {
-    this.segmentManager = SegmentManagerFactory.getGlobalSegmentManager();
+  public static CarbonTable getCarbonTable(Configuration configuration) throws IOException {
+    String carbonTableStr = configuration.get(CARBON_TABLE);
+    if (carbonTableStr == null) {
+      populateCarbonTable(configuration);
+      // read it from schema file in the store
+      carbonTableStr = configuration.get(CARBON_TABLE);
+      return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
+    }
+    return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
   }
 
-  @Override
-  public RecordReader<Void, T> createRecordReader(InputSplit split,
-      TaskAttemptContext context) throws IOException, InterruptedException {
-    switch (((CarbonInputSplit)split).formatType()) {
-      case COLUMNAR:
-        // TODO: create record reader for columnar format
-        break;
-      default:
-        throw new RuntimeException("Unsupported format type");
+  /**
+   * this method will read the schema from the physical file and populate into CARBON_TABLE
+   *
+   * @param configuration
+   * @throws IOException
+   */
+  private static void populateCarbonTable(Configuration configuration) throws IOException {
+    String dirs = configuration.get(INPUT_DIR, "");
+    String[] inputPaths = StringUtils.split(dirs);
+    if (inputPaths.length == 0) {
+      throw new InvalidPathException("No input paths specified in job");
     }
-    return null;
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
+    // read the schema file to get the absoluteTableIdentifier having the correct table id
+    // persisted in the schema
+    CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
+    setCarbonTable(configuration, carbonTable);
   }
 
-  @Override
-  public List<InputSplit> getSplits(JobContext job) throws IOException {
+  public static void setTablePath(Configuration configuration, String tablePath)
+      throws IOException {
+    configuration.set(FileInputFormat.INPUT_DIR, tablePath);
+  }
 
-    // work as following steps:
-    // get all current valid segment
-    // for each segment, get all input split
+  /**
+   * It sets unresolved filter expression.
+   *
+   * @param configuration
+   * @param filterExpression
+   */
+  public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
+    if (filterExpression == null) {
+      return;
+    }
+    try {
+      String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
+      configuration.set(FILTER_PREDICATE, filterString);
+    } catch (Exception e) {
+      throw new RuntimeException("Error while setting filter expression to Job", e);
+    }
+  }
 
-    List<InputSplit> output = new LinkedList<>();
-    Expression filter = getFilter(job.getConfiguration());
-    Segment[] segments = segmentManager.getAllValidSegments();
-    FilterResolverIntf filterResolver = CarbonInputFormatUtil.resolveFilter(filter, null);
-    for (Segment segment: segments) {
-      List<InputSplit> splits = segment.getSplits(job, filterResolver);
-      output.addAll(splits);
+  public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
+    if (projection == null || projection.isEmpty()) {
+      return;
+    }
+    String[] allColumns = projection.getAllColumns();
+    StringBuilder builder = new StringBuilder();
+    for (String column : allColumns) {
+      builder.append(column).append(",");
     }
-    return output;
+    String columnString = builder.toString();
+    columnString = columnString.substring(0, columnString.length() - 1);
+    configuration.set(COLUMN_PROJECTION, columnString);
   }
 
-  /**
-   * set the table path into configuration
-   * @param conf configuration of the job
-   * @param tablePath table path string
-   */
-  public void setTablePath(Configuration conf, String tablePath) {
+  public static String getColumnProjection(Configuration configuration) {
+    return configuration.get(COLUMN_PROJECTION);
+  }
+
+  public static void setCarbonReadSupport(Configuration configuration,
+      Class<? extends CarbonReadSupport> readSupportClass) {
+    if (readSupportClass != null) {
+      configuration.set(CARBON_READ_SUPPORT, readSupportClass.getName());
+    }
+  }
 
+  private static CarbonTablePath getTablePath(AbsoluteTableIdentifier absIdentifier) {
+    return CarbonStorePath.getCarbonTablePath(absIdentifier);
   }
 
   /**
-   * return the table path in the configuration
-   * @param conf configuration of the job
-   * @return table path string
+   * Set list of segments to access
    */
-  public String getTablePath(Configuration conf) {
-    return null;
+  public static void setSegmentsToAccess(Configuration configuration, List<String> validSegments) {
+    configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.getSegmentString(validSegments));
   }
 
   /**
-   * set projection columns into configuration
-   * @param conf configuration of the job
-   * @param projection projection
+   * Set list of files to access
    */
-  public void setProjection(Configuration conf, CarbonProjection projection) {
+  public static void setFilesToAccess(Configuration configuration, List<String> validFiles) {
+    configuration.set(INPUT_FILES, CarbonUtil.getSegmentString(validFiles));
+  }
 
+  private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
+      throws IOException {
+    return getCarbonTable(configuration).getAbsoluteTableIdentifier();
   }
 
   /**
-   * return the projection in the configuration
-   * @param conf configuration of the job
-   * @return projection
+   * {@inheritDoc}
+   * Configurations FileInputFormat.INPUT_DIR
+   * are used to get table path to read.
+   *
+   * @param job
+   * @return List<InputSplit> list of CarbonInputSplit
+   * @throws IOException
    */
-  public CarbonProjection getProjection(Configuration conf) {
-    return null;
+  @Override public List<InputSplit> getSplits(JobContext job) throws IOException {
+    AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
+    TableDataMap blockletMap =
+        DataMapStoreManager.getInstance().getDataMap(identifier, "blocklet", DataMapType.BLOCKLET);
+    List<String> invalidSegments = new ArrayList<>();
+    List<UpdateVO> invalidTimestampsList = new ArrayList<>();
+    List<String> validSegments = Arrays.asList(getSegmentsToAccess(job));
+    // get all valid segments and set them into the configuration
+    if (validSegments.size() == 0) {
+      SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
+      SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
+          segmentStatusManager.getValidAndInvalidSegments();
+      SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
+      validSegments = segments.getValidSegments();
+      if (validSegments.size() == 0) {
+        return new ArrayList<>(0);
+      }
+
+      // remove entry in the segment index if there are invalid segments
+      invalidSegments.addAll(segments.getInvalidSegments());
+      for (String invalidSegmentId : invalidSegments) {
+        invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
+      }
+      if (invalidSegments.size() > 0) {
+        List<TableSegmentUniqueIdentifier> invalidSegmentsIds =
+            new ArrayList<>(invalidSegments.size());
+        for (String segId : invalidSegments) {
+          invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(identifier, segId));
+        }
+        blockletMap.clear(invalidSegments);
+      }
+    }
+
+    // process and resolve the expression
+    Expression filter = getFilterPredicates(job.getConfiguration());
+    CarbonTable carbonTable = getCarbonTable(job.getConfiguration());
+    // this will be null in case of corrupt schema file.
+    if (null == carbonTable) {
+      throw new IOException("Missing/Corrupt schema file for table.");
+    }
+
+    CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+
+    // prune partitions for filter query on partition table
+    BitSet matchedPartitions = null;
+    if (null != filter) {
+      PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName());
+      if (null != partitionInfo) {
+        Partitioner partitioner = PartitionUtil.getPartitioner(partitionInfo);
+        matchedPartitions = new FilterExpressionProcessor()
+            .getFilteredPartitions(filter, partitionInfo, partitioner);
+        if (matchedPartitions.cardinality() == 0) {
+          // no partition is required
+          return new ArrayList<InputSplit>();
+        }
+        if (matchedPartitions.cardinality() == partitioner.numPartitions()) {
+          // all partitions are required, no need to prune partitions
+          matchedPartitions = null;
+        }
+      }
+    }
+
+    FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier);
+
+    // do block filtering and get split
+    List<InputSplit> splits = getSplits(job, filterInterface, validSegments, matchedPartitions);
+    // pass the invalid segment to task side in order to remove index entry in task side
+    if (invalidSegments.size() > 0) {
+      for (InputSplit split : splits) {
+        ((org.apache.carbondata.hadoop.CarbonInputSplit) split).setInvalidSegments(invalidSegments);
+        ((org.apache.carbondata.hadoop.CarbonInputSplit) split)
+            .setInvalidTimestampRange(invalidTimestampsList);
+      }
+    }
+    return splits;
   }
 
   /**
-   * set filter expression into the configuration
-   * @param conf configuration of the job
-   * @param filter filter expression
+   * {@inheritDoc}
+   * Configurations FileInputFormat.INPUT_DIR, CarbonInputFormat.INPUT_SEGMENT_NUMBERS
+   * are used to get table path to read.
+   *
+   * @return
+   * @throws IOException
    */
-  public void setFilter(Configuration conf, Expression filter) {
+  private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver,
+      List<String> validSegments, BitSet matchedPartitions) throws IOException {
+
+    List<InputSplit> result = new LinkedList<InputSplit>();
+    UpdateVO invalidBlockVOForSegmentId = null;
+    Boolean isIUDTable = false;
+
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        getCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
+    SegmentUpdateStatusManager updateStatusManager =
+        new SegmentUpdateStatusManager(absoluteTableIdentifier);
+
+    isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
+
+    //for each segment fetch blocks matching filter in Driver BTree
+    List<org.apache.carbondata.hadoop.CarbonInputSplit> dataBlocksOfSegment =
+        getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, matchedPartitions,
+            validSegments);
+    for (org.apache.carbondata.hadoop.CarbonInputSplit inputSplit : dataBlocksOfSegment) {
+
+      // Get the UpdateVO for those tables on which IUD operations being performed.
+      if (isIUDTable) {
+        invalidBlockVOForSegmentId =
+            updateStatusManager.getInvalidTimestampRange(inputSplit.getSegmentId());
+      }
+      if (isIUDTable) {
+        // In case IUD is not performed in this table avoid searching for
+        // invalidated blocks.
+        if (CarbonUtil
+            .isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getPath().toString(),
+                invalidBlockVOForSegmentId, updateStatusManager)) {
+          continue;
+        }
+      }
+      String[] deleteDeltaFilePath = null;
+      try {
+        deleteDeltaFilePath =
+            updateStatusManager.getDeleteDeltaFilePath(inputSplit.getPath().toString());
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+      inputSplit.setDeleteDeltaFiles(deleteDeltaFilePath);
+      result.add(inputSplit);
+    }
+    return result;
+  }
+
+  protected Expression getFilterPredicates(Configuration configuration) {
     try {
-      String filterString = ObjectSerializationUtil.convertObjectToString(filter);
-      conf.set(FILTER_PREDICATE, filterString);
-    } catch (Exception e) {
-      throw new RuntimeException("Error while setting filter expression to Job", e);
+      String filterExprString = configuration.get(FILTER_PREDICATE);
+      if (filterExprString == null) {
+        return null;
+      }
+      Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
+      return (Expression) filter;
+    } catch (IOException e) {
+      throw new RuntimeException("Error while reading filter expression", e);
     }
   }
 
   /**
-   * return filter expression in the configuration
-   * @param conf configuration of the job
-   * @return filter expression
+   * get data blocks of given segment
    */
-  public Expression getFilter(Configuration conf) {
-    Object filter;
-    String filterExprString = conf.get(FILTER_PREDICATE);
-    if (filterExprString == null) {
-      return null;
+  private List<org.apache.carbondata.hadoop.CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
+      AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
+      BitSet matchedPartitions, List<String> segmentIds) throws IOException {
+
+    QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
+    QueryStatistic statistic = new QueryStatistic();
+
+    // get tokens for all the required FileSystem for table path
+    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+        new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
+
+    TableDataMap blockletMap = DataMapStoreManager.getInstance()
+        .getDataMap(absoluteTableIdentifier, "blocklet", DataMapType.BLOCKLET);
+    List<Blocklet> prunedBlocklets = blockletMap.prune(segmentIds, resolver);
+
+    List<org.apache.carbondata.hadoop.CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
+    for (Blocklet blocklet : prunedBlocklets) {
+      int taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(
+          CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath().toString()));
+
+      // matchedPartitions variable will be null in two cases as follows
+      // 1. the table is not a partition table
+      // 2. the table is a partition table, and all partitions are matched by query
+      // for a partition table, the task id in the carbondata file name is the partition id.
+      // if this partition is not required, it is skipped here.
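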
+      if (matchedPartitions == null || matchedPartitions.get(taskId)) {
+        resultFilterredBlocks.add(convertToCarbonInputSplit(blocklet));
+      }
     }
+    statistic
+        .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
+    recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
+    return resultFilterredBlocks;
+  }
+
+  private org.apache.carbondata.hadoop.CarbonInputSplit convertToCarbonInputSplit(Blocklet blocklet)
+      throws IOException {
+    blocklet.updateLocations();
+    org.apache.carbondata.hadoop.CarbonInputSplit split =
+        org.apache.carbondata.hadoop.CarbonInputSplit.from(blocklet.getSegmentId(),
+            new FileSplit(blocklet.getPath(), 0, blocklet.getLength(), blocklet.getLocations()),
+            ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()));
+    split.setDetailInfo(blocklet.getDetailInfo());
+    return split;
+  }
+
+  @Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
+      TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+    Configuration configuration = taskAttemptContext.getConfiguration();
+    QueryModel queryModel = getQueryModel(inputSplit, taskAttemptContext);
+    CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
+    return new CarbonRecordReader<T>(queryModel, readSupport);
+  }
+
+  public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+      throws IOException {
+    Configuration configuration = taskAttemptContext.getConfiguration();
+    CarbonTable carbonTable = getCarbonTable(configuration);
+    // getting the table absoluteTableIdentifier from the carbonTable
+    // to avoid unnecessary deserialization
+    AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
+
+    // query plan includes projection column
+    String projection = getColumnProjection(configuration);
+    CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
+    QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable);
+
+    // set the filter to the query model in order to filter blocklet before scan
+    Expression filter = getFilterPredicates(configuration);
+    CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+    FilterResolverIntf filterIntf = CarbonInputFormatUtil.resolveFilter(filter, identifier);
+    queryModel.setFilterExpressionResolverTree(filterIntf);
+
+    // update the file level index store if there are invalid segment
+    if (inputSplit instanceof CarbonMultiBlockSplit) {
+      CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit;
+      List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments();
+      if (invalidSegments.size() > 0) {
+        queryModel.setInvalidSegmentIds(invalidSegments);
+      }
+      List<UpdateVO> invalidTimestampRangeList =
+          split.getAllSplits().get(0).getInvalidTimestampRange();
+      if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) {
+        queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList);
+      }
+    }
+    return queryModel;
+  }
+
+  public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
+    String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
+    //By default it uses dictionary decoder read class
+    CarbonReadSupport<T> readSupport = null;
+    if (readSupportClass != null) {
+      try {
+        Class<?> myClass = Class.forName(readSupportClass);
+        Constructor<?> constructor = myClass.getConstructors()[0];
+        Object object = constructor.newInstance();
+        if (object instanceof CarbonReadSupport) {
+          readSupport = (CarbonReadSupport) object;
+        }
+      } catch (ClassNotFoundException ex) {
+        LOG.error("Class " + readSupportClass + "not found", ex);
+      } catch (Exception ex) {
+        LOG.error("Error while creating " + readSupportClass, ex);
+      }
+    } else {
+      readSupport = new DictionaryDecodeReadSupport<>();
+    }
+    return readSupport;
+  }
+
+  @Override protected boolean isSplitable(JobContext context, Path filename) {
     try {
-      filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
-    } catch (IOException e) {
-      throw new RuntimeException("Error while reading filter expression", e);
+      // Don't split the file if it is local file system
+      FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
+      if (fileSystem instanceof LocalFileSystem) {
+        return false;
+      }
+    } catch (Exception e) {
+      return true;
+    }
+    return true;
+  }
+
+  /**
+   * required to be moved to core
+   *
+   * @return updateExtension
+   */
+  private String getUpdateExtension() {
+    // TODO: required to modify when supporting update, mostly will be update timestamp
+    return "update";
+  }
+
+  /**
+   * return valid segment to access
+   */
+  private String[] getSegmentsToAccess(JobContext job) {
+    String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
+    if (segmentString.trim().isEmpty()) {
+      return new String[0];
     }
-    assert (filter instanceof Expression);
-    return (Expression) filter;
+    return segmentString.split(",");
   }
 
   /**
-   * Optional API. It can be used by query optimizer to select index based on filter
-   * in the configuration of the job. After selecting index internally, index' name will be set
-   * in the configuration.
+   * Get the row count of the Block and mapping of segment and Block count.
    *
-   * The process of selection is simple, just use the default index. Subclass can provide a more
-   * advanced selection logic like cost based.
-   * @param conf job configuration
+   * @param job
+   * @param identifier
+   * @return
+   * @throws IOException
+   * @throws KeyGenException
    */
-  public void selectIndex(Configuration conf) {
-    // set the default index in configuration
+  public BlockMappingVO getBlockRowCount(JobContext job, AbsoluteTableIdentifier identifier)
+      throws IOException, KeyGenException {
+    TableDataMap blockletMap =
+        DataMapStoreManager.getInstance().getDataMap(identifier, "blocklet", DataMapType.BLOCKLET);
+    SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
+    SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments =
+        new SegmentStatusManager(identifier).getValidAndInvalidSegments();
+    Map<String, Long> blockRowCountMapping =
+        new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    Map<String, Long> segmentAndBlockCountMapping =
+        new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    List<Blocklet> blocklets = blockletMap.prune(validAndInvalidSegments.getValidSegments(), null);
+    for (Blocklet blocklet : blocklets) {
+      String blockName = blocklet.getPath().toString();
+      blockName = CarbonTablePath.getCarbonDataFileName(blockName);
+      blockName = blockName + CarbonTablePath.getCarbonDataExtension();
+
+      long rowCount = blocklet.getDetailInfo().getRowCount();
+
+      String key = CarbonUpdateUtil.getSegmentBlockNameKey(blocklet.getSegmentId(), blockName);
+
+      // if the block is invalid then don't add the count
+      SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key);
+
+      if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getStatus())) {
+        Long blockCount = blockRowCountMapping.get(key);
+        if (blockCount == null) {
+          blockCount = 0L;
+          Long count = segmentAndBlockCountMapping.get(blocklet.getSegmentId());
+          if (count == null) {
+            count = 0L;
+          }
+          segmentAndBlockCountMapping.put(blocklet.getSegmentId(), count + 1);
+        }
+        blockCount += rowCount;
+        blockRowCountMapping.put(key, blockCount);
+      }
+    }
+    return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
   }
 }

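A driver-side usage sketch for the rewritten input format (table path and column name are hypothetical; CarbonProjection.addColumn is assumed API): the format reads the schema from the store when no CarbonTable is set, prunes the blocklet datamap per valid segment, and returns one CarbonInputSplit per surviving blocklet.

    import java.util.List;

    import org.apache.carbondata.core.scan.expression.Expression;
    import org.apache.carbondata.hadoop.CarbonProjection;
    import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;

    public class TableInputFormatDriverSketch {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        // Hypothetical store path; the schema file under it is read when no CarbonTable is set.
        CarbonTableInputFormat.setTablePath(job.getConfiguration(), "/store/default/t1");

        CarbonProjection projection = new CarbonProjection();
        projection.addColumn("name");   // hypothetical column; addColumn is assumed API
        CarbonTableInputFormat.setColumnProjection(job.getConfiguration(), projection);

        // Optionally set an unresolved filter Expression; null is simply ignored.
        Expression filter = null;
        CarbonTableInputFormat.setFilterPredicates(job.getConfiguration(), filter);

        // Splits are produced by pruning the blocklet datamap for each valid segment.
        List<InputSplit> splits = new CarbonTableInputFormat<Object[]>().getSplits(job);
        System.out.println("pruned splits: " + splits.size());
      }
    }
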
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 8270304..8269757 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -34,7 +34,7 @@ import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
 import org.apache.carbondata.core.scan.model.QueryDimension;
 import org.apache.carbondata.core.scan.model.QueryMeasure;
 import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.hadoop.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
@@ -77,9 +77,10 @@ public class CarbonInputFormatUtil {
     return plan;
   }
 
-  public static <V> CarbonInputFormat<V> createCarbonInputFormat(AbsoluteTableIdentifier identifier,
+  public static <V> CarbonTableInputFormat<V> createCarbonInputFormat(
+      AbsoluteTableIdentifier identifier,
       Job job) throws IOException {
-    CarbonInputFormat<V> carbonInputFormat = new CarbonInputFormat<>();
+    CarbonTableInputFormat<V> carbonInputFormat = new CarbonTableInputFormat<>();
     FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
     return carbonInputFormat;
   }


[3/3] carbondata git commit: [CARBONDATA-1232] Datamap implementation for Blocklet

Posted by ja...@apache.org.
[CARBONDATA-1232] Datamap implementation for Blocklet

This closes #1099


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6d71d9c4
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6d71d9c4
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6d71d9c4

Branch: refs/heads/datamap
Commit: 6d71d9c474e50792fc0fba3a321c2de927b05c84
Parents: 3ecb3ec
Author: ravipesala <ra...@gmail.com>
Authored: Sat Jun 17 22:53:57 2017 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Tue Jul 11 13:17:43 2017 +0800

----------------------------------------------------------------------
 .../carbondata/core/cache/CacheProvider.java    |   3 +
 .../apache/carbondata/core/cache/CacheType.java |   6 +
 .../core/datastore/block/TableBlockInfo.java    |  19 +
 .../core/datastore/block/TaskBlockInfo.java     |   4 +
 .../carbondata/core/indexstore/Blocklet.java    |  55 +-
 .../indexstore/BlockletDataMapIndexStore.java   | 180 ++++++
 .../core/indexstore/BlockletDetailInfo.java     | 117 ++++
 .../carbondata/core/indexstore/DataMap.java     |   8 +-
 .../core/indexstore/DataMapFactory.java         |  87 +++
 .../core/indexstore/DataMapStoreManager.java    |  90 ++-
 .../carbondata/core/indexstore/DataMapType.java |  14 +-
 .../TableBlockIndexUniqueIdentifier.java        | 103 ++++
 .../core/indexstore/TableDataMap.java           |  97 +++-
 .../core/indexstore/UnsafeMemoryDMStore.java    | 207 +++++++
 .../blockletindex/BlockletDMComparator.java     | 134 +++++
 .../blockletindex/BlockletDataMap.java          | 445 +++++++++++++++
 .../blockletindex/BlockletDataMapFactory.java   | 115 ++++
 .../BlockletDataRefNodeWrapper.java             | 137 +++++
 .../indexstore/blockletindex/IndexWrapper.java  |  49 ++
 .../core/indexstore/row/DataMapRow.java         |  89 +++
 .../core/indexstore/row/DataMapRowImpl.java     | 106 ++++
 .../core/indexstore/row/UnsafeDataMapRow.java   | 133 +++++
 .../core/indexstore/schema/DataMapSchema.java   | 124 ++++
 .../core/indexstore/schema/FilterType.java      |  24 +
 .../core/metadata/blocklet/BlockletInfo.java    |  53 +-
 .../core/metadata/index/BlockIndexInfo.java     |  27 +
 .../executor/impl/AbstractQueryExecutor.java    |  52 +-
 .../executer/IncludeFilterExecuterImpl.java     |   2 +-
 .../executer/RangeValueFilterExecuterImpl.java  |   2 +-
 .../RowLevelRangeGrtThanFiterExecuterImpl.java  |   2 +-
 ...elRangeGrtrThanEquaToFilterExecuterImpl.java |   2 +-
 ...velRangeLessThanEqualFilterExecuterImpl.java |   2 +-
 .../RowLevelRangeLessThanFiterExecuterImpl.java |   2 +-
 .../processor/AbstractDataBlockIterator.java    |   3 +
 .../AbstractDetailQueryResultIterator.java      |  34 +-
 .../util/AbstractDataFileFooterConverter.java   |  53 ++
 .../apache/carbondata/core/util/CarbonUtil.java |  40 +-
 .../core/util/DataFileFooterConverter.java      |   4 +
 .../core/util/DataFileFooterConverter2.java     |   3 +
 .../core/util/DataFileFooterConverterV3.java    |  11 +
 format/src/main/thrift/carbondata_index.thrift  |   1 +
 .../carbondata/hadoop/CarbonInputFormat.java    |  14 +-
 .../carbondata/hadoop/CarbonInputSplit.java     |  39 +-
 .../hadoop/api/CarbonTableInputFormat.java      | 562 ++++++++++++++++---
 .../hadoop/util/CarbonInputFormatUtil.java      |   7 +-
 .../presto/impl/CarbonTableReader.java          |  56 +-
 .../spark/rdd/CarbonIUDMergerRDD.scala          |   5 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   9 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  20 +-
 .../carbondata/spark/util/QueryPlanUtil.scala   |  10 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |  13 +-
 .../sql/execution/command/IUDCommands.scala     |   7 -
 .../carbondata/spark/util/QueryPlanUtil.scala   |  10 +-
 .../apache/spark/sql/hive/CarbonMetastore.scala |  10 +-
 .../processing/merger/CarbonCompactionUtil.java |  32 ++
 55 files changed, 3172 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
index 25a8976..5c4b265 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.datastore.BlockIndexStore;
 import org.apache.carbondata.core.datastore.SegmentTaskIndexStore;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexStore;
 import org.apache.carbondata.core.util.CarbonProperties;
 
 /**
@@ -126,6 +127,8 @@ public class CacheProvider {
     } else if (cacheType.equals(cacheType.DRIVER_BTREE)) {
       cacheObject =
           new SegmentTaskIndexStore(carbonStorePath, carbonLRUCache);
+    } else if (cacheType.equals(cacheType.DRIVER_BLOCKLET_DATAMAP)) {
+      cacheObject = new BlockletDataMapIndexStore(carbonStorePath, carbonLRUCache);
     }
     cacheTypeToCacheMap.put(cacheType, cacheObject);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
index 2d6570d..ab51ff2 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
@@ -56,6 +56,12 @@ public class CacheType<K, V> {
       DRIVER_BTREE = new CacheType("driver_btree");
 
   /**
+   * Driver blocklet datamap cache which maintains the blocklet datamaps loaded in the driver
+   */
+  public static final CacheType<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
+      DRIVER_BLOCKLET_DATAMAP = new CacheType("driver_blocklet_datamap");
+
+  /**
    * cacheName which is unique name for a cache
    */
   private String cacheName;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index 44347cf..f003882 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -77,6 +78,8 @@ public class TableBlockInfo implements Distributable, Serializable {
    */
   private String[] deletedDeltaFilePath;
 
+  private BlockletDetailInfo detailInfo;
+
   public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
       long blockLength, ColumnarFormatVersion version, String[] deletedDeltaFilePath) {
     this.filePath = FileFactory.getUpdatedFilePath(filePath);
@@ -88,6 +91,10 @@ public class TableBlockInfo implements Distributable, Serializable {
     this.deletedDeltaFilePath = deletedDeltaFilePath;
   }
 
+  public TableBlockInfo() {
+
+  }
+
   /**
    * constructor to initialize the TableBlockInfo with BlockletInfos
    *
@@ -319,4 +326,16 @@ public class TableBlockInfo implements Distributable, Serializable {
   public String[] getDeletedDeltaFilePath() {
     return deletedDeltaFilePath;
   }
+
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
+  }
+
+  public BlockletDetailInfo getDetailInfo() {
+    return detailInfo;
+  }
+
+  public void setDetailInfo(BlockletDetailInfo detailInfo) {
+    this.detailInfo = detailInfo;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
index eb707c2..4fcec87 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.core.datastore.block;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -45,6 +46,9 @@ public class TaskBlockInfo {
     return taskBlockInfoMapping.keySet();
   }
 
+  public Collection<List<TableBlockInfo>> getAllTableBlockInfoList() {
+    return taskBlockInfoMapping.values();
+  }
 
   /**
    * returns TableBlockInfoList of given task

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
index 597c46c..66da4d0 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
@@ -16,27 +16,76 @@
  */
 package org.apache.carbondata.core.indexstore;
 
+import java.io.IOException;
 import java.io.Serializable;
 
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
 /**
  * Blocklet
  */
 public class Blocklet implements Serializable {
 
-  private String path;
+  private Path path;
+
+  private String segmentId;
 
   private String blockletId;
 
+  private BlockletDetailInfo detailInfo;
+
+  private long length;
+
+  private String[] location;
+
   public Blocklet(String path, String blockletId) {
-    this.path = path;
+    this.path = new Path(path);
     this.blockletId = blockletId;
   }
 
-  public String getPath() {
+  public Path getPath() {
     return path;
   }
 
   public String getBlockletId() {
     return blockletId;
   }
+
+  public BlockletDetailInfo getDetailInfo() {
+    return detailInfo;
+  }
+
+  public void setDetailInfo(BlockletDetailInfo detailInfo) {
+    this.detailInfo = detailInfo;
+  }
+
+  public void updateLocations() throws IOException {
+    FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
+    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
+    LocatedFileStatus fileStatus = iter.next();
+    location = fileStatus.getBlockLocations()[0].getHosts();
+    length = fileStatus.getLen();
+  }
+
+  public String[] getLocations() throws IOException {
+    return location;
+  }
+
+  public long getLength() throws IOException {
+    return length;
+  }
+
+  public String getSegmentId() {
+    return segmentId;
+  }
+
+  public void setSegmentId(String segmentId) {
+    this.segmentId = segmentId;
+  }
+
 }
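
A Blocklet now carries its segment id, block locations and length, so splits can be
built from datamap results without re-listing files per task. A small sketch of
filling and reading those fields; the file path below is hypothetical and must point
to an existing carbondata file for updateLocations() to succeed:

    import java.io.IOException;

    import org.apache.carbondata.core.indexstore.Blocklet;

    public class BlockletUsageSketch {
      public static void main(String[] args) throws IOException {
        Blocklet blocklet =
            new Blocklet("/tmp/store/default/t1/Fact/Part0/Segment_0/part-0-0.carbondata", "0");
        blocklet.setSegmentId("0");
        // resolves block locations and file length from the file system
        blocklet.updateLocations();
        System.out.println(blocklet.getSegmentId() + " -> " + blocklet.getLength()
            + " bytes on " + String.join(",", blocklet.getLocations()));
      }
    }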

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
new file mode 100644
index 0000000..fc8c273
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
+
+/**
+ * Class to handle loading, unloading, clearing and storing of the blocklet
+ * datamaps of a table
+ */
+public class BlockletDataMapIndexStore
+    implements Cache<TableBlockIndexUniqueIdentifier, BlockletDataMap> {
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(BlockletDataMapIndexStore.class.getName());
+  /**
+   * carbon store path
+   */
+  protected String carbonStorePath;
+  /**
+   * CarbonLRU cache
+   */
+  protected CarbonLRUCache lruCache;
+
+  /**
+   * map of segment identifier to lock object. It is filled while the datamap for a
+   * block index file is being loaded, so that only a segment level lock is applied
+   * and other segments can be loaded concurrently
+   */
+  private Map<String, Object> segmentLockMap;
+
+  /**
+   * constructor to initialize the BlockletDataMapIndexStore
+   *
+   * @param carbonStorePath
+   * @param lruCache
+   */
+  public BlockletDataMapIndexStore(String carbonStorePath, CarbonLRUCache lruCache) {
+    this.carbonStorePath = carbonStorePath;
+    this.lruCache = lruCache;
+    segmentLockMap = new ConcurrentHashMap<String, Object>();
+  }
+
+  @Override public BlockletDataMap get(TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier)
+      throws IOException {
+    String lruCacheKey = tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier();
+    BlockletDataMap dataMap = (BlockletDataMap) lruCache.get(lruCacheKey);
+    if (dataMap == null) {
+      try {
+        dataMap = loadAndGetDataMap(tableSegmentUniqueIdentifier);
+      } catch (IndexBuilderException e) {
+        throw new IOException(e.getMessage(), e);
+      } catch (Throwable e) {
+        throw new IOException("Problem in loading segment block.", e);
+      }
+    }
+    return dataMap;
+  }
+
+  @Override public List<BlockletDataMap> getAll(
+      List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException {
+    List<BlockletDataMap> blockletDataMaps = new ArrayList<>(tableSegmentUniqueIdentifiers.size());
+    try {
+      for (TableBlockIndexUniqueIdentifier identifier : tableSegmentUniqueIdentifiers) {
+        blockletDataMaps.add(get(identifier));
+      }
+    } catch (Throwable e) {
+      for (BlockletDataMap dataMap : blockletDataMaps) {
+        dataMap.clear();
+      }
+      throw new IOException("Problem in loading segment blocks.", e);
+    }
+    return blockletDataMaps;
+  }
+
+  /**
+   * returns the BlockletDataMap if it is already present in the cache, otherwise null
+   *
+   * @param tableSegmentUniqueIdentifier
+   * @return
+   */
+  @Override public BlockletDataMap getIfPresent(
+      TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) {
+    BlockletDataMap dataMap = (BlockletDataMap) lruCache
+        .get(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+    return dataMap;
+  }
+
+  /**
+   * removes the datamap of the given segment from the cache
+   *
+   * @param tableSegmentUniqueIdentifier
+   */
+  @Override public void invalidate(TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) {
+    lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+  }
+
+  /**
+   * Below method will be used to load the BlockletDataMap for the given block index
+   * identifier. The load is done under a segment level lock and the loaded datamap
+   * is put into the LRU cache.
+   *
+   * @return loaded BlockletDataMap
+   * @throws IOException
+   */
+  private BlockletDataMap loadAndGetDataMap(
+      TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) throws IOException {
+    String uniqueTableSegmentIdentifier =
+        tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier();
+    Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier);
+    if (lock == null) {
+      lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier);
+    }
+    BlockletDataMap dataMap = null;
+    synchronized (lock) {
+      dataMap = new BlockletDataMap();
+      dataMap.init(tableSegmentUniqueIdentifier.getFilePath());
+      lruCache.put(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier(), dataMap,
+          dataMap.getMemorySize());
+    }
+    return dataMap;
+  }
+
+  /**
+   * Below method will be used to get the segment level lock object
+   *
+   * @param uniqueIdentifier
+   * @return lock object
+   */
+  private synchronized Object addAndGetSegmentLock(String uniqueIdentifier) {
+    // return the segment lock object if it is already present,
+    // otherwise add a new lock and return it
+    Object segmentLoaderLockObject = segmentLockMap.get(uniqueIdentifier);
+    if (null == segmentLoaderLockObject) {
+      segmentLoaderLockObject = new Object();
+      segmentLockMap.put(uniqueIdentifier, segmentLoaderLockObject);
+    }
+    return segmentLoaderLockObject;
+  }
+
+  /**
+   * The method clears the access count of table segments
+   *
+   * @param tableSegmentUniqueIdentifiers
+   */
+  @Override public void clearAccessCount(
+      List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) {
+    for (TableBlockIndexUniqueIdentifier segmentUniqueIdentifier : tableSegmentUniqueIdentifiers) {
+      BlockletDataMap cacheable =
+          (BlockletDataMap) lruCache.get(segmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+      cacheable.clear();
+    }
+  }
+}
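
A minimal sketch of how this cache is exercised; the CarbonLRUCache and the table
identifier are assumed to exist already, and the carbonindex file name is a
hypothetical example:

    import java.io.IOException;

    import org.apache.carbondata.core.cache.CarbonLRUCache;
    import org.apache.carbondata.core.indexstore.BlockletDataMapIndexStore;
    import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
    import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;

    public class DataMapIndexStoreSketch {
      public static BlockletDataMap load(CarbonLRUCache lruCache,
          AbsoluteTableIdentifier table) throws IOException {
        BlockletDataMapIndexStore store =
            new BlockletDataMapIndexStore(table.getTablePath(), lruCache);
        TableBlockIndexUniqueIdentifier id =
            new TableBlockIndexUniqueIdentifier(table, "0", "0.carbonindex");
        // the first call loads the datamap from the index file and caches it in the LRU;
        // later calls for the same identifier are served from the cache
        return store.get(id);
      }
    }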

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
new file mode 100644
index 0000000..68dedd8
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Blocklet detail information to be sent to each executor
+ */
+public class BlockletDetailInfo implements Serializable, Writable {
+
+  private int rowCount;
+
+  private short pagesCount;
+
+  private short versionNumber;
+
+  private int[] dimLens;
+
+  private long schemaUpdatedTimeStamp;
+
+  private BlockletInfo blockletInfo;
+
+  public int getRowCount() {
+    return rowCount;
+  }
+
+  public void setRowCount(int rowCount) {
+    this.rowCount = rowCount;
+  }
+
+  public int getPagesCount() {
+    return pagesCount;
+  }
+
+  public void setPagesCount(short pagesCount) {
+    this.pagesCount = pagesCount;
+  }
+
+  public short getVersionNumber() {
+    return versionNumber;
+  }
+
+  public void setVersionNumber(short versionNumber) {
+    this.versionNumber = versionNumber;
+  }
+
+  public BlockletInfo getBlockletInfo() {
+    return blockletInfo;
+  }
+
+  public void setBlockletInfo(BlockletInfo blockletInfo) {
+    this.blockletInfo = blockletInfo;
+  }
+
+  public int[] getDimLens() {
+    return dimLens;
+  }
+
+  public void setDimLens(int[] dimLens) {
+    this.dimLens = dimLens;
+  }
+
+  public long getSchemaUpdatedTimeStamp() {
+    return schemaUpdatedTimeStamp;
+  }
+
+  public void setSchemaUpdatedTimeStamp(long schemaUpdatedTimeStamp) {
+    this.schemaUpdatedTimeStamp = schemaUpdatedTimeStamp;
+  }
+
+  @Override public void write(DataOutput out) throws IOException {
+    out.writeInt(rowCount);
+    out.writeShort(pagesCount);
+    out.writeShort(versionNumber);
+    out.writeShort(dimLens.length);
+    for (int i = 0; i < dimLens.length; i++) {
+      out.writeInt(dimLens[i]);
+    }
+    out.writeLong(schemaUpdatedTimeStamp);
+    blockletInfo.write(out);
+  }
+
+  @Override public void readFields(DataInput in) throws IOException {
+    rowCount = in.readInt();
+    pagesCount = in.readShort();
+    versionNumber = in.readShort();
+    dimLens = new int[in.readShort()];
+    for (int i = 0; i < dimLens.length; i++) {
+      dimLens[i] = in.readInt();
+    }
+    schemaUpdatedTimeStamp = in.readLong();
+    blockletInfo = new BlockletInfo();
+    blockletInfo.readFields(in);
+  }
+}
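
A short sketch of how the detail info might be populated and attached to a
TableBlockInfo so the executor can reuse it instead of re-reading the file footer;
the numeric values are arbitrary examples, and blockletInfo would additionally need
to be set before Writable serialization since write() delegates to it:

    import org.apache.carbondata.core.datastore.block.TableBlockInfo;
    import org.apache.carbondata.core.indexstore.BlockletDetailInfo;

    public class DetailInfoSketch {
      public static TableBlockInfo withDetailInfo(TableBlockInfo blockInfo) {
        BlockletDetailInfo detailInfo = new BlockletDetailInfo();
        detailInfo.setRowCount(32000);
        detailInfo.setPagesCount((short) 1);
        detailInfo.setVersionNumber((short) 3);
        detailInfo.setDimLens(new int[] {10, 20});
        detailInfo.setSchemaUpdatedTimeStamp(System.currentTimeMillis());
        // the executor reads these values from the split instead of parsing the footer
        blockInfo.setDetailInfo(detailInfo);
        return blockInfo;
      }
    }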

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
index 2651f15..1276494 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java
@@ -21,7 +21,7 @@ import java.util.List;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
 /**
- * Interface for adding and retrieving index data.
+ * Datamap is an entity which can store and retrieve index data.
  */
 public interface DataMap {
 
@@ -47,6 +47,12 @@ public interface DataMap {
   List<Blocklet> prune(FilterResolverIntf filterExp);
 
   /**
+   * Convert datamap to distributable object
+   * @return
+   */
+  DataMapDistributable toDistributable();
+
+  /**
    * Clear complete index table and release memory.
    */
   void clear();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
new file mode 100644
index 0000000..72f714f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.util.List;
+
+import org.apache.carbondata.core.events.ChangeEvent;
+import org.apache.carbondata.core.indexstore.schema.FilterType;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+/**
+ * Interface for the datamap factory; it is responsible for creating datamap instances.
+ */
+public interface DataMapFactory {
+
+  /**
+   * Initialization of Datamap factory
+   * @param identifier
+   * @param dataMapName
+   */
+  void init(AbsoluteTableIdentifier identifier, String dataMapName);
+  /**
+   * Get the datamap writer for each segmentid.
+   *
+   * @param identifier
+   * @param segmentId
+   * @return
+   */
+  DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier,
+      String segmentId);
+
+  /**
+   * Get the datamap for segmentid
+   *
+   * @param segmentId
+   * @return
+   */
+  List<DataMap> getDataMaps(String segmentId);
+
+  /**
+   * Get datamap for distributable object.
+   *
+   * @param distributable
+   * @return
+   */
+  DataMap getDataMap(DataMapDistributable distributable);
+
+  /**
+   * This method checks whether the filter columns and the type of filter are
+   * supported by this datamap
+   *
+   * @param filterType
+   * @return
+   */
+  boolean isFiltersSupported(FilterType filterType);
+
+  /**
+   *
+   * @param event
+   */
+  void fireEvent(ChangeEvent event);
+
+  /**
+   * Clears datamap of the segment
+   */
+  void clear(String segmentId);
+
+  /**
+   * Clear all datamaps from memory
+   */
+  void clear();
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
index 06638ad..1a36187 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java
@@ -16,7 +16,9 @@
  */
 package org.apache.carbondata.core.indexstore;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
@@ -24,13 +26,16 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 
 /**
- * It maintains all the index tables in it.
+ * It maintains all the DataMaps registered for each table.
  */
-public class DataMapStoreManager {
+public final class DataMapStoreManager {
 
   private static DataMapStoreManager instance = new DataMapStoreManager();
 
-  private Map<DataMapType, Map<String, TableDataMap>> dataMapMappping = new HashMap<>();
+  /**
+   * Contains the list of datamaps for each table.
+   */
+  private Map<AbsoluteTableIdentifier, List<TableDataMap>> dataMapMappping = new HashMap<>();
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(DataMapStoreManager.class.getName());
@@ -48,56 +53,85 @@ public class DataMapStoreManager {
    */
   public TableDataMap getDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
       DataMapType mapType) {
-    Map<String, TableDataMap> map = dataMapMappping.get(mapType);
-    TableDataMap dataMap = null;
-    if (map == null) {
+    List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier);
+    TableDataMap dataMap;
+    if (tableDataMaps == null) {
+      createTableDataMap(identifier, mapType, dataMapName);
+      tableDataMaps = dataMapMappping.get(identifier);
+    }
+    dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
+    if (dataMap == null) {
       throw new RuntimeException("Datamap does not exist");
-    } else {
-      dataMap = map.get(dataMapName);
-      if (dataMap == null) {
-        throw new RuntimeException("Datamap does not exist");
-      }
     }
-    // Initialize datamap
-    dataMap.init(identifier, dataMapName);
     return dataMap;
   }
 
   /**
-   * Create new datamap instance using datamap type and path
+   * Create new datamap instance using datamap name, datamap type and table identifier
    *
    * @param mapType
    * @return
    */
-  public TableDataMap createTableDataMap(AbsoluteTableIdentifier identifier, DataMapType mapType,
-      String dataMapName) {
-    Map<String, TableDataMap> map = dataMapMappping.get(mapType);
-    if (map == null) {
-      map = new HashMap<>();
-      dataMapMappping.put(mapType, map);
+  private TableDataMap createTableDataMap(AbsoluteTableIdentifier identifier,
+      DataMapType mapType, String dataMapName) {
+    List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier);
+    if (tableDataMaps == null) {
+      tableDataMaps = new ArrayList<>();
+      dataMapMappping.put(identifier, tableDataMaps);
     }
-    TableDataMap dataMap = map.get(dataMapName);
+    TableDataMap dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
     if (dataMap != null) {
       throw new RuntimeException("Already datamap exists in that path with type " + mapType);
     }
 
     try {
-      //TODO create datamap using @mapType.getClassName())
+      DataMapFactory dataMapFactory = mapType.getClassObject().newInstance();
+      dataMapFactory.init(identifier, dataMapName);
+      dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory);
     } catch (Exception e) {
       LOGGER.error(e);
+      throw new RuntimeException(e);
+    }
+    tableDataMaps.add(dataMap);
+    return dataMap;
+  }
+
+  private TableDataMap getAbstractTableDataMap(String dataMapName,
+      List<TableDataMap> tableDataMaps) {
+    TableDataMap dataMap = null;
+    for (TableDataMap tableDataMap: tableDataMaps) {
+      if (tableDataMap.getDataMapName().equals(dataMapName)) {
+        dataMap = tableDataMap;
+        break;
+      }
     }
-    dataMap.init(identifier, dataMapName);
-    map.put(dataMapName, dataMap);
     return dataMap;
   }
 
-  public void clearDataMap(String dataMapName, DataMapType mapType) {
-    Map<String, TableDataMap> map = dataMapMappping.get(mapType);
-    if (map != null && map.get(dataMapName) != null) {
-      map.remove(dataMapName).clear();
+  /**
+   * Clears the datamap with the given name for the specified table from memory
+   * @param identifier
+   * @param dataMapName
+   */
+  public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
+    List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier);
+    if (tableDataMaps != null) {
+      int i = 0;
+      for (TableDataMap tableDataMap: tableDataMaps) {
+        if (tableDataMap != null && dataMapName.equals(tableDataMap.getDataMapName())) {
+          tableDataMap.clear(new ArrayList<String>());
+          tableDataMaps.remove(i);
+          break;
+        }
+        i++;
+      }
     }
   }
 
+  /**
+   * Returns the singleton instance
+   * @return
+   */
   public static DataMapStoreManager getInstance() {
     return instance;
   }
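
With this change a TableDataMap is created lazily on first access and cached per
table. A hedged usage sketch; the datamap name "blocklet" is an arbitrary example:

    import org.apache.carbondata.core.indexstore.DataMapStoreManager;
    import org.apache.carbondata.core.indexstore.DataMapType;
    import org.apache.carbondata.core.indexstore.TableDataMap;
    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;

    public class StoreManagerSketch {
      public static TableDataMap blockletDataMap(AbsoluteTableIdentifier identifier) {
        // creates the datamap through BlockletDataMapFactory on the first call,
        // returns the already registered instance on later calls
        return DataMapStoreManager.getInstance()
            .getDataMap(identifier, "blocklet", DataMapType.BLOCKLET);
      }
    }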

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
index b6a0f5b..0059b29 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java
@@ -16,19 +16,21 @@
  */
 package org.apache.carbondata.core.indexstore;
 
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
+
 /**
  * Datamap type
  */
 public enum DataMapType {
-  BLOCKLET("org.apache.carbondata.datamap.BlockletDataMap");
+  BLOCKLET(BlockletDataMapFactory.class);
 
-  private String className;
+  private Class<? extends DataMapFactory> classObject;
 
-  DataMapType(String className) {
-    this.className = className;
+  DataMapType(Class<? extends DataMapFactory> classObject) {
+    this.classObject = classObject;
   }
 
-  public String getClassName() {
-    return className;
+  public Class<? extends DataMapFactory> getClassObject() {
+    return classObject;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
new file mode 100644
index 0000000..7e2bc0e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+
+/**
+ * Class holds the absoluteTableIdentifier, segmentId and index file name to uniquely
+ * identify a blocklet index file within a segment
+ */
+public class TableBlockIndexUniqueIdentifier {
+  /**
+   * table fully qualified identifier
+   */
+  private AbsoluteTableIdentifier absoluteTableIdentifier;
+
+  private String segmentId;
+
+  private String carbonIndexFileName;
+
+  /**
+   * Constructor to initialize the class instance
+   *
+   * @param absoluteTableIdentifier
+   * @param segmentId
+   */
+  public TableBlockIndexUniqueIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier,
+      String segmentId, String carbonIndexFileName) {
+    this.absoluteTableIdentifier = absoluteTableIdentifier;
+    this.segmentId = segmentId;
+    this.carbonIndexFileName = carbonIndexFileName;
+  }
+
+  /**
+   * returns AbsoluteTableIdentifier
+   *
+   * @return
+   */
+  public AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
+    return absoluteTableIdentifier;
+  }
+
+  public String getSegmentId() {
+    return segmentId;
+  }
+
+  /**
+   * method returns the key which uniquely identifies this index file in the cache
+   *
+   * @return
+   */
+  public String getUniqueTableSegmentIdentifier() {
+    CarbonTableIdentifier carbonTableIdentifier =
+        absoluteTableIdentifier.getCarbonTableIdentifier();
+    return carbonTableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR
+        + carbonTableIdentifier.getTableName() + CarbonCommonConstants.UNDERSCORE
+        + carbonTableIdentifier.getTableId() + CarbonCommonConstants.FILE_SEPARATOR + segmentId
+        + CarbonCommonConstants.FILE_SEPARATOR + carbonIndexFileName;
+  }
+
+  public String getFilePath() {
+    return absoluteTableIdentifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + "/"
+        + carbonIndexFileName;
+  }
+
+  @Override public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TableBlockIndexUniqueIdentifier that = (TableBlockIndexUniqueIdentifier) o;
+
+    if (!absoluteTableIdentifier.equals(that.absoluteTableIdentifier)) {
+      return false;
+    }
+    if (!segmentId.equals(that.segmentId)) {
+      return false;
+    }
+    return carbonIndexFileName.equals(that.carbonIndexFileName);
+  }
+
+  @Override public int hashCode() {
+    int result = absoluteTableIdentifier.hashCode();
+    result = 31 * result + segmentId.hashCode();
+    result = 31 * result + carbonIndexFileName.hashCode();
+    return result;
+  }
+}
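
The unique key concatenates database name, table name, table id, segment id and
index file name. A small sketch that derives the cache key, assuming the table
identifier and index file name are already known:

    import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;

    public class IdentifierSketch {
      public static String cacheKeyFor(AbsoluteTableIdentifier table, String segmentId,
          String indexFileName) {
        TableBlockIndexUniqueIdentifier id =
            new TableBlockIndexUniqueIdentifier(table, segmentId, indexFileName);
        // e.g. "default/t1_<tableId>/0/<indexFileName>" with the separators defined above
        return id.getUniqueTableSegmentIdentifier();
      }
    }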

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
index e1532c8..39ca4c5 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java
@@ -16,38 +16,34 @@
  */
 package org.apache.carbondata.core.indexstore;
 
+import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.events.ChangeEvent;
 import org.apache.carbondata.core.events.EventListener;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-
 /**
  * DataMap at the table level, user can add any number of datamaps for one table. Depends
  * on the filter condition it can prune the blocklets.
  */
-public interface TableDataMap extends EventListener {
+public final class TableDataMap implements EventListener {
 
-  /**
-   * It is called to initialize and load the required table datamap metadata.
-   */
-  void init(AbsoluteTableIdentifier identifier, String dataMapName);
+  private AbsoluteTableIdentifier identifier;
 
-  /**
-   * Gives the writer to write the metadata information of this datamap at table level.
-   *
-   * @return
-   */
-  DataMapWriter getWriter();
+  private String dataMapName;
+
+  private DataMapFactory dataMapFactory;
 
   /**
-   * Create the datamap using the segmentid  and name.
-   *
-   * @param identifier
-   * @param segmentId
-   * @return
+   * It is called to initialize and load the required table datamap metadata.
    */
-  DataMap createDataMap(AbsoluteTableIdentifier identifier, String segmentId);
+  public TableDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
+      DataMapFactory dataMapFactory) {
+    this.identifier = identifier;
+    this.dataMapName = dataMapName;
+    this.dataMapFactory = dataMapFactory;
+  }
 
   /**
    * Pass the valid segments and prune the datamap using filter expression
@@ -56,7 +52,24 @@ public interface TableDataMap extends EventListener {
    * @param filterExp
    * @return
    */
-  List<Blocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp);
+  public List<Blocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp) {
+    List<Blocklet> blocklets = new ArrayList<>();
+    for (String segmentId : segmentIds) {
+      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
+      for (DataMap dataMap : dataMaps) {
+        List<Blocklet> pruneBlocklets = dataMap.prune(filterExp);
+        blocklets.addAll(addSegmentId(pruneBlocklets, segmentId));
+      }
+    }
+    return blocklets;
+  }
+
+  private List<Blocklet> addSegmentId(List<Blocklet> pruneBlocklets, String segmentId) {
+    for (Blocklet blocklet : pruneBlocklets) {
+      blocklet.setSegmentId(segmentId);
+    }
+    return pruneBlocklets;
+  }
 
   /**
    * This is used for making the datamap distributable.
@@ -65,7 +78,16 @@ public interface TableDataMap extends EventListener {
    *
    * @return
    */
-  List<DataMapDistributable> toDistributable(List<String> segmentIds);
+  public List<DataMapDistributable> toDistributable(List<String> segmentIds) {
+    List<DataMapDistributable> distributables = new ArrayList<>();
+    for (String segmentsId : segmentIds) {
+      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentsId);
+      for (DataMap dataMap : dataMaps) {
+        distributables.add(dataMap.toDistributable());
+      }
+    }
+    return distributables;
+  }
 
   /**
    * This method is used from any machine after it is distributed. It takes the distributable object
@@ -75,20 +97,37 @@ public interface TableDataMap extends EventListener {
    * @param filterExp
    * @return
    */
-  List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf filterExp);
+  public List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf filterExp) {
+    return dataMapFactory.getDataMap(distributable).prune(filterExp);
+  }
+
+  @Override public void fireEvent(ChangeEvent event) {
+    dataMapFactory.fireEvent(event);
+  }
 
   /**
-   * This method checks whether the columns and the type of filters supported
-   * for this datamap or not
-   *
-   * @param filterExp
-   * @return
+   * Clear only the datamaps of the segments
+   * @param segmentIds
    */
-  boolean isFiltersSupported(FilterResolverIntf filterExp);
+  public void clear(List<String> segmentIds) {
+    for (String segmentId: segmentIds) {
+      dataMapFactory.clear(segmentId);
+    }
+  }
 
   /**
-   * Clears table level datamap
+   * Clears all datamaps of this table
+   */
+  public void clear() {
+    dataMapFactory.clear();
+  }
+  /**
+   * Get the unique name of datamap
+   *
+   * @return
    */
-  void clear();
+  public String getDataMapName() {
+    return dataMapName;
+  }
 
 }
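
A hedged sketch of pruning through the concrete TableDataMap; the filter resolver is
taken as a parameter since building one is outside this diff, and the segment ids are
examples:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.carbondata.core.indexstore.Blocklet;
    import org.apache.carbondata.core.indexstore.TableDataMap;
    import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;

    public class PruneSketch {
      public static List<Blocklet> pruneSegments(TableDataMap dataMap,
          FilterResolverIntf filter) {
        // prunes only the listed valid segments; each returned blocklet carries its segment id
        return dataMap.prune(Arrays.asList("0", "1"), filter);
      }
    }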

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
new file mode 100644
index 0000000..8246f99
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.row.UnsafeDataMapRow;
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.memory.MemoryAllocator;
+import org.apache.carbondata.core.memory.MemoryAllocatorFactory;
+import org.apache.carbondata.core.memory.MemoryBlock;
+
+import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
+import static org.apache.carbondata.core.memory.CarbonUnsafe.unsafe;
+
+/**
+ * Stores the datamap row ({@link DataMapRow}) data in unsafe memory.
+ */
+public class UnsafeMemoryDMStore {
+
+  private MemoryBlock memoryBlock;
+
+  private static int capacity = 8 * 1024 * 1024;
+
+  private int allocatedSize;
+
+  private int runningLength;
+
+  private MemoryAllocator memoryAllocator;
+
+  private boolean isMemoryFreed;
+
+  private DataMapSchema[] schema;
+
+  private int[] pointers;
+
+  private int rowCount;
+
+  public UnsafeMemoryDMStore(DataMapSchema[] schema) {
+    this.schema = schema;
+    this.memoryAllocator = MemoryAllocatorFactory.INSATANCE.getMemoryAllocator();
+    this.allocatedSize = capacity;
+    this.memoryBlock = memoryAllocator.allocate(allocatedSize);
+    this.pointers = new int[1000];
+  }
+
+  /**
+   * Checks whether the allocated memory is sufficient; if not, allocates a larger
+   * block and copies the existing data into it.
+   *
+   * @param rowSize
+   */
+  private void ensureSize(int rowSize) {
+    if (runningLength + rowSize >= allocatedSize) {
+      MemoryBlock allocate =
+          MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(allocatedSize + capacity);
+      unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
+          allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
+      memoryAllocator.free(memoryBlock);
+      allocatedSize = allocatedSize + capacity;
+      memoryBlock = allocate;
+    }
+    if (this.pointers.length <= rowCount + 1) {
+      int[] newPointer = new int[pointers.length + 1000];
+      System.arraycopy(pointers, 0, newPointer, 0, pointers.length);
+      this.pointers = newPointer;
+    }
+  }
+
+  /**
+   * Add the index row to unsafe.
+   *
+   * @param indexRow
+   */
+  public void addIndexRowToUnsafe(DataMapRow indexRow) {
+    // First calculate the required memory to keep the row in unsafe
+    int rowSize = indexRow.getTotalSizeInBytes();
+    // Check whether allocated memory is sufficient or not.
+    ensureSize(rowSize);
+    int pointer = runningLength;
+
+    for (int i = 0; i < schema.length; i++) {
+      addToUnsafe(schema[i], indexRow, i);
+    }
+    pointers[rowCount++] = pointer;
+  }
+
+  private void addToUnsafe(DataMapSchema schema, DataMapRow row, int index) {
+    switch (schema.getSchemaType()) {
+      case FIXED:
+        switch (schema.getDataType()) {
+          case BYTE:
+            unsafe.putByte(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                row.getByte(index));
+            runningLength += row.getSizeInBytes(index);
+            break;
+          case SHORT:
+            unsafe
+                .putShort(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                    row.getShort(index));
+            runningLength += row.getSizeInBytes(index);
+            break;
+          case INT:
+            unsafe.putInt(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                row.getInt(index));
+            runningLength += row.getSizeInBytes(index);
+            break;
+          case LONG:
+            unsafe.putLong(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                row.getLong(index));
+            runningLength += row.getSizeInBytes(index);
+            break;
+          case FLOAT:
+            unsafe
+                .putFloat(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                    row.getFloat(index));
+            runningLength += row.getSizeInBytes(index);
+            break;
+          case DOUBLE:
+            unsafe
+                .putDouble(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
+                    row.getDouble(index));
+            runningLength += row.getSizeInBytes(index);
+            break;
+          case BYTE_ARRAY:
+            byte[] data = row.getByteArray(index);
+            unsafe.copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
+                memoryBlock.getBaseOffset() + runningLength, data.length);
+            runningLength += row.getSizeInBytes(index);
+            break;
+        }
+        break;
+      case VARIABLE:
+        byte[] data = row.getByteArray(index);
+        unsafe.putShort(memoryBlock.getBaseObject(),
+            memoryBlock.getBaseOffset() + runningLength, (short) data.length);
+        runningLength += 2;
+        unsafe.copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
+            memoryBlock.getBaseOffset() + runningLength, data.length);
+        runningLength += data.length;
+        break;
+      case STRUCT:
+        DataMapSchema[] childSchemas =
+            ((DataMapSchema.StructDataMapSchema) schema).getChildSchemas();
+        DataMapRow struct = row.getRow(index);
+        for (int i = 0; i < childSchemas.length; i++) {
+          addToUnsafe(childSchemas[i], struct, i);
+        }
+        break;
+    }
+  }
+
+  public DataMapRow getUnsafeRow(int index) {
+    assert (index < rowCount);
+    return new UnsafeDataMapRow(schema, memoryBlock, pointers[index]);
+  }
+
+  public void finishWriting() {
+    if (runningLength < allocatedSize) {
+      MemoryBlock allocate =
+          MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(runningLength);
+      unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
+          allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
+      memoryAllocator.free(memoryBlock);
+      memoryBlock = allocate;
+    }
+    // Compact pointers.
+    if (rowCount < pointers.length) {
+      int[] newPointer = new int[rowCount];
+      System.arraycopy(pointers, 0, newPointer, 0, rowCount);
+      this.pointers = newPointer;
+    }
+  }
+
+  public void freeMemory() {
+    if (!isMemoryFreed) {
+      memoryAllocator.free(memoryBlock);
+      isMemoryFreed = true;
+    }
+  }
+
+  public int getMemoryUsed() {
+    return runningLength;
+  }
+
+  public DataMapSchema[] getSchema() {
+    return schema;
+  }
+
+  public int getRowCount() {
+    return rowCount;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java
new file mode 100644
index 0000000..9a50600
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * Data map comparator
+ */
+public class BlockletDMComparator implements Comparator<DataMapRow> {
+
+  /**
+   * no dictionary column values are of variable length, so their entry in
+   * eachColumnValueSize will be -1
+   */
+  private static final int NO_DICTIONARY_COLUMN_VALUE = -1;
+
+  /**
+   * size of a short value in bytes
+   */
+  private static final short SHORT_SIZE_IN_BYTES = 2;
+
+  private int[] eachColumnValueSize;
+
+  /**
+   * the number of no dictionary columns in SORT_COLUMNS
+   */
+  private int numberOfNoDictSortColumns;
+
+  /**
+   * the number of columns in SORT_COLUMNS
+   */
+  private int numberOfSortColumns;
+
+  public BlockletDMComparator(int[] eachColumnValueSize, int numberOfSortColumns,
+      int numberOfNoDictSortColumns) {
+    this.eachColumnValueSize = eachColumnValueSize;
+    this.numberOfNoDictSortColumns = numberOfNoDictSortColumns;
+    this.numberOfSortColumns = numberOfSortColumns;
+  }
+
+  @Override public int compare(DataMapRow first, DataMapRow second) {
+    int dictionaryKeyOffset = 0;
+    int nonDictionaryKeyOffset = 0;
+    int compareResult = 0;
+    int processedNoDictionaryColumn = numberOfNoDictSortColumns;
+    byte[][] firstBytes = splitKey(first.getByteArray(0));
+    byte[][] secondBytes = splitKey(second.getByteArray(0));
+    byte[] firstNoDictionaryKeys = firstBytes[1];
+    ByteBuffer firstNoDictionaryKeyBuffer = ByteBuffer.wrap(firstNoDictionaryKeys);
+    byte[] secondNoDictionaryKeys = secondBytes[1];
+    ByteBuffer secondNoDictionaryKeyBuffer = ByteBuffer.wrap(secondNoDictionaryKeys);
+    int actualOffset = 0;
+    int actualOffset1 = 0;
+    int firstNoDcitionaryLength = 0;
+    int secondNodeDictionaryLength = 0;
+
+    for (int i = 0; i < numberOfSortColumns; i++) {
+
+      if (eachColumnValueSize[i] != NO_DICTIONARY_COLUMN_VALUE) {
+        byte[] firstDictionaryKeys = firstBytes[0];
+        byte[] secondDictionaryKeys = secondBytes[0];
+        compareResult = ByteUtil.UnsafeComparer.INSTANCE
+            .compareTo(firstDictionaryKeys, dictionaryKeyOffset, eachColumnValueSize[i],
+                secondDictionaryKeys, dictionaryKeyOffset, eachColumnValueSize[i]);
+        dictionaryKeyOffset += eachColumnValueSize[i];
+      } else {
+        if (processedNoDictionaryColumn > 1) {
+          actualOffset = firstNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset);
+          firstNoDcitionaryLength =
+              firstNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset + SHORT_SIZE_IN_BYTES)
+                  - actualOffset;
+          actualOffset1 = secondNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset);
+          secondNodeDictionaryLength =
+              secondNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset + SHORT_SIZE_IN_BYTES)
+                  - actualOffset1;
+          compareResult = ByteUtil.UnsafeComparer.INSTANCE
+              .compareTo(firstNoDictionaryKeys, actualOffset, firstNoDcitionaryLength,
+                  secondNoDictionaryKeys, actualOffset1, secondNodeDictionaryLength);
+          nonDictionaryKeyOffset += SHORT_SIZE_IN_BYTES;
+          processedNoDictionaryColumn--;
+        } else {
+          actualOffset = firstNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset);
+          actualOffset1 = secondNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset);
+          firstNoDcitionaryLength = firstNoDictionaryKeys.length - actualOffset;
+          secondNodeDictionaryLength = secondNoDictionaryKeys.length - actualOffset1;
+          compareResult = ByteUtil.UnsafeComparer.INSTANCE
+              .compareTo(firstNoDictionaryKeys, actualOffset, firstNoDcitionaryLength,
+                  secondNoDictionaryKeys, actualOffset1, secondNodeDictionaryLength);
+        }
+      }
+      if (compareResult != 0) {
+        return compareResult;
+      }
+    }
+
+    return 0;
+  }
+
+  /**
+   * Splits the index key into its dictionary and no-dictionary parts.
+   * @param startKey
+   * @return
+   */
+  private byte[][] splitKey(byte[] startKey) {
+    ByteBuffer buffer = ByteBuffer.wrap(startKey);
+    buffer.rewind();
+    int dictionaryKeySize = buffer.getInt();
+    int nonDictionaryKeySize = buffer.getInt();
+    byte[] dictionaryKey = new byte[dictionaryKeySize];
+    buffer.get(dictionaryKey);
+    byte[] nonDictionaryKey = new byte[nonDictionaryKeySize];
+    buffer.get(nonDictionaryKey);
+    return new byte[][] {dictionaryKey, nonDictionaryKey};
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
new file mode 100644
index 0000000..79aa091
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cacheable;
+import org.apache.carbondata.core.datastore.IndexKey;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.core.indexstore.DataMap;
+import org.apache.carbondata.core.indexstore.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.DataMapWriter;
+import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+
+/**
+ * Datamap implementation for blocklet.
+ */
+public class BlockletDataMap implements DataMap, Cacheable {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(BlockletDataMap.class.getName());
+
+  private static int KEY_INDEX = 0;
+
+  private static int MIN_VALUES_INDEX = 1;
+
+  private static int MAX_VALUES_INDEX = 2;
+
+  private static int ROW_COUNT_INDEX = 3;
+
+  private static int FILE_PATH_INDEX = 4;
+
+  private static int PAGE_COUNT_INDEX = 5;
+
+  private static int VERSION_INDEX = 6;
+
+  private static int SCHEMA_UPDATED_TIME_INDEX = 7;
+
+  private static int BLOCK_INFO_INDEX = 8;
+
+  private UnsafeMemoryDMStore unsafeMemoryDMStore;
+
+  private SegmentProperties segmentProperties;
+
+  private int[] columnCardinality;
+
+  @Override public DataMapWriter getWriter() {
+    return null;
+  }
+
+  @Override public void init(String path) {
+    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+    try {
+      List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(path);
+      for (DataFileFooter fileFooter : indexInfo) {
+        List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
+        if (segmentProperties == null) {
+          columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
+          segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
+          createSchema(segmentProperties);
+        }
+        TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
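+        // the index file entry only carries the block index; read the complete footer
+        // (including the blocklet list) from the carbondata file itself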
+        fileFooter = CarbonUtil.readMetadatFile(blockInfo);
+
+        loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath());
+      }
+      if (unsafeMemoryDMStore != null) {
+        unsafeMemoryDMStore.finishWriting();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void loadToUnsafe(DataFileFooter fileFooter, SegmentProperties segmentProperties,
+      String filePath) {
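+    // every blocklet in this footer becomes one index row in the unsafe store,
+    // laid out according to the schema created in createSchema()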
+    int[] minMaxLen = segmentProperties.getEachDimColumnValueSize();
+    List<BlockletInfo> blockletList = fileFooter.getBlockletList();
+    DataMapSchema[] schema = unsafeMemoryDMStore.getSchema();
+    for (int index = 0; index < blockletList.size(); index++) {
+      DataMapRow row = new DataMapRowImpl(schema);
+      int ordinal = 0;
+      BlockletInfo blockletInfo = blockletList.get(index);
+
+      // add start key as index key
+      row.setByteArray(blockletInfo.getBlockletIndex().getBtreeIndex().getStartKey(), ordinal++);
+
+      BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex();
+      row.setRow(addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMinValues()), ordinal);
+      ordinal++;
+      row.setRow(addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMaxValues()), ordinal);
+      ordinal++;
+
+      row.setInt(blockletInfo.getNumberOfRows(), ordinal++);
+
+      // add file path
+      byte[] filePathBytes = filePath.getBytes();
+      row.setByteArray(filePathBytes, ordinal++);
+
+      // add pages
+      row.setShort((short) blockletInfo.getNumberOfPages(), ordinal++);
+
+      // add version number
+      row.setShort(fileFooter.getVersionId().number(), ordinal++);
+
+      // add schema updated time
+      row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++);
+
+      // add blocklet info
+      byte[] serializedData;
+      try {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        DataOutput dataOutput = new DataOutputStream(stream);
+        blockletInfo.write(dataOutput);
+        serializedData = stream.toByteArray();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      row.setByteArray(serializedData, ordinal);
+      unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+    }
+  }
+
+  private DataMapRow addMinMax(int[] minMaxLen, DataMapSchema dataMapSchema, byte[][] minValues) {
+    DataMapSchema[] minSchemas =
+        ((DataMapSchema.StructDataMapSchema) dataMapSchema).getChildSchemas();
+    DataMapRow minRow = new DataMapRowImpl(minSchemas);
+    int minOrdinal = 0;
+    // fill each dimension's min (or max) value into the struct row
+    for (int i = 0; i < minMaxLen.length; i++) {
+      minRow.setByteArray(minValues[i], minOrdinal++);
+    }
+    return minRow;
+  }
+
+  private void createSchema(SegmentProperties segmentProperties) {
+    List<DataMapSchema> indexSchemas = new ArrayList<>();
+
+    // Index key
+    indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY));
+    int[] minMaxLen = segmentProperties.getEachDimColumnValueSize();
+    // do it 2 times, one for min and one for max.
+    for (int k = 0; k < 2; k++) {
+      DataMapSchema[] mapSchemas = new DataMapSchema[minMaxLen.length];
+      for (int i = 0; i < minMaxLen.length; i++) {
+        if (minMaxLen[i] <= 0) {
+          mapSchemas[i] = new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY);
+        } else {
+          mapSchemas[i] = new DataMapSchema.FixedDataMapSchema(DataType.BYTE_ARRAY, minMaxLen[i]);
+        }
+      }
+      DataMapSchema mapSchema = new DataMapSchema.StructDataMapSchema(DataType.STRUCT, mapSchemas);
+      indexSchemas.add(mapSchema);
+    }
+
+    // for number of rows.
+    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.INT));
+
+    // for table block path
+    indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY));
+
+    // for number of pages.
+    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.SHORT));
+
+    // for version number.
+    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.SHORT));
+
+    // for schema updated time.
+    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.LONG));
+
+    //for blocklet info
+    indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY));
+
+    unsafeMemoryDMStore =
+        new UnsafeMemoryDMStore(indexSchemas.toArray(new DataMapSchema[indexSchemas.size()]));
+  }
+
+  @Override public List<Blocklet> prune(FilterResolverIntf filterExp) {
+
+    // getting the start and end index key based on filter for hitting the
+    // selected block reference nodes based on filter resolver tree.
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("preparing the start and end key for finding"
+          + "start and end block as per filter resolver");
+    }
+    List<Blocklet> blocklets = new ArrayList<>();
+    Comparator<DataMapRow> comparator =
+        new BlockletDMComparator(segmentProperties.getEachDimColumnValueSize(),
+            segmentProperties.getNumberOfSortColumns(),
+            segmentProperties.getNumberOfNoDictSortColumns());
+    List<IndexKey> listOfStartEndKeys = new ArrayList<IndexKey>(2);
+    FilterUtil
+        .traverseResolverTreeAndGetStartAndEndKey(segmentProperties, filterExp, listOfStartEndKeys);
+    // reading the first value from list which has start key
+    IndexKey searchStartKey = listOfStartEndKeys.get(0);
+    // reading the last value from list which has end key
+    IndexKey searchEndKey = listOfStartEndKeys.get(1);
+    if (null == searchStartKey && null == searchEndKey) {
+      try {
+        // TODO need to handle for no dictionary dimensions
+        searchStartKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
+        // TODO need to handle for no dictionary dimensions
+        searchEndKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties);
+      } catch (KeyGenException e) {
+        return null;
+      }
+    }
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Successfully retrieved the start and end key. Dictionary Start Key: " + searchStartKey
+              .getDictionaryKeys() + " No Dictionary Start Key: " + searchStartKey
+              .getNoDictionaryKeys() + " Dictionary End Key: " + searchEndKey.getDictionaryKeys()
+              + " No Dictionary End Key: " + searchEndKey.getNoDictionaryKeys());
+    }
+    if (filterExp == null) {
+      int rowCount = unsafeMemoryDMStore.getRowCount();
+      for (int i = 0; i < rowCount; i++) {
+        DataMapRow unsafeRow = unsafeMemoryDMStore.getUnsafeRow(i);
+        blocklets.add(createBlocklet(unsafeRow, i));
+      }
+    } else {
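+      // binary search the sorted index rows for the key range, then check each candidate
+      // blocklet's min/max values against the filter before adding it to the result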
+      int startIndex = findStartIndex(convertToRow(searchStartKey), comparator);
+      int endIndex = findEndIndex(convertToRow(searchEndKey), comparator);
+      FilterExecuter filterExecuter =
+          FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
+      while (startIndex <= endIndex) {
+        DataMapRow unsafeRow = unsafeMemoryDMStore.getUnsafeRow(startIndex);
+        BitSet bitSet = filterExecuter.isScanRequired(getMinMaxValue(unsafeRow, MAX_VALUES_INDEX),
+            getMinMaxValue(unsafeRow, MIN_VALUES_INDEX));
+        if (!bitSet.isEmpty()) {
+          blocklets.add(createBlocklet(unsafeRow, startIndex));
+        }
+        startIndex++;
+      }
+    }
+
+    return blocklets;
+  }
+
+  private byte[][] getMinMaxValue(DataMapRow row, int index) {
+    DataMapRow minMaxRow = row.getRow(index);
+    byte[][] minMax = new byte[minMaxRow.getColumnCount()][];
+    for (int i = 0; i < minMax.length; i++) {
+      minMax[i] = minMaxRow.getByteArray(i);
+    }
+    return minMax;
+  }
+
+  private Blocklet createBlocklet(DataMapRow row, int blockletId) {
+    Blocklet blocklet =
+        new Blocklet(new String(row.getByteArray(FILE_PATH_INDEX)), blockletId + "");
+    BlockletDetailInfo detailInfo = new BlockletDetailInfo();
+    detailInfo.setRowCount(row.getInt(ROW_COUNT_INDEX));
+    detailInfo.setPagesCount(row.getShort(PAGE_COUNT_INDEX));
+    detailInfo.setVersionNumber(row.getShort(VERSION_INDEX));
+    detailInfo.setDimLens(columnCardinality);
+    detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPDATED_TIME_INDEX));
+    BlockletInfo blockletInfo = new BlockletInfo();
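+    // deserialize the BlockletInfo that loadToUnsafe() serialized with write(DataOutput)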
+    try {
+      byte[] byteArray = row.getByteArray(BLOCK_INFO_INDEX);
+      ByteArrayInputStream stream = new ByteArrayInputStream(byteArray);
+      DataInputStream inputStream = new DataInputStream(stream);
+      blockletInfo.readFields(inputStream);
+      inputStream.close();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    detailInfo.setBlockletInfo(blockletInfo);
+    blocklet.setDetailInfo(detailInfo);
+    return blocklet;
+  }
+
+  /**
+   * Binary search used to get the first tentative index row based on
+   * search key
+   *
+   * @param key search key
+   * @return index of the first tentative matching row
+   */
+  private int findStartIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
+    int childNodeIndex;
+    int low = 0;
+    int high = unsafeMemoryDMStore.getRowCount() - 1;
+    int mid = 0;
+    int compareRes = -1;
+    //
+    while (low <= high) {
+      mid = (low + high) >>> 1;
+      // compare the entries
+      compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+      if (compareRes < 0) {
+        high = mid - 1;
+      } else if (compareRes > 0) {
+        low = mid + 1;
+      } else {
+        // if key is matched then get the first entry
+        int currentPos = mid;
+        while (currentPos - 1 >= 0
+            && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos - 1)) == 0) {
+          currentPos--;
+        }
+        mid = currentPos;
+        break;
+      }
+    }
+    // if no exact match was found (compare result less than zero) and mid is greater
+    // than 0, step back one entry because duplicate keys may be present in the
+    // previous row
+    if (compareRes < 0) {
+      if (mid > 0) {
+        mid--;
+      }
+      childNodeIndex = mid;
+    } else {
+      childNodeIndex = mid;
+    }
+    // get the leaf child
+    return childNodeIndex;
+  }
+
+  /**
+   * Binary search used to get the last tentative index row based on
+   * search key
+   *
+   * @param key search key
+   * @return index of the last tentative matching row
+   */
+  private int findEndIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
+    int childNodeIndex;
+    int low = 0;
+    int high = unsafeMemoryDMStore.getRowCount() - 1;
+    int mid = 0;
+    int compareRes = -1;
+    //
+    while (low <= high) {
+      mid = (low + high) >>> 1;
+      // compare the entries
+      compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+      if (compareRes < 0) {
+        high = mid - 1;
+      } else if (compareRes > 0) {
+        low = mid + 1;
+      } else {
+        int currentPos = mid;
+        // if key is matched then move to the last matching entry
+        while (currentPos + 1 < unsafeMemoryDMStore.getRowCount()
+            && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos + 1)) == 0) {
+          currentPos++;
+        }
+        mid = currentPos;
+        break;
+      }
+    }
+    // if no exact match was found (compare result less than zero) and mid is greater
+    // than 0, step back one entry because duplicate keys may be present in the
+    // previous row
+    if (compareRes < 0) {
+      if (mid > 0) {
+        mid--;
+      }
+      childNodeIndex = mid;
+    } else {
+      childNodeIndex = mid;
+    }
+    return childNodeIndex;
+  }
+
+  private DataMapRow convertToRow(IndexKey key) {
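+    // serialize the key in the layout that BlockletDMComparator.splitKey() expects:
+    // [dict key length (int)][no-dict key length (int)][dict key bytes][no-dict key bytes]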
+    ByteBuffer buffer =
+        ByteBuffer.allocate(key.getDictionaryKeys().length + key.getNoDictionaryKeys().length + 8);
+    buffer.putInt(key.getDictionaryKeys().length);
+    buffer.putInt(key.getNoDictionaryKeys().length);
+    buffer.put(key.getDictionaryKeys());
+    buffer.put(key.getNoDictionaryKeys());
+    DataMapRowImpl dataMapRow = new DataMapRowImpl(unsafeMemoryDMStore.getSchema());
+    dataMapRow.setByteArray(buffer.array(), 0);
+    return dataMapRow;
+  }
+
+  @Override public void clear() {
+    unsafeMemoryDMStore.freeMemory();
+    unsafeMemoryDMStore = null;
+    segmentProperties = null;
+  }
+
+  @Override public long getFileTimeStamp() {
+    return 0;
+  }
+
+  @Override public int getAccessCount() {
+    return 0;
+  }
+
+  @Override public long getMemorySize() {
+    return unsafeMemoryDMStore.getMemoryUsed();
+  }
+
+  @Override public DataMapDistributable toDistributable() {
+    // TODO
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
new file mode 100644
index 0000000..2fe6643
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.events.ChangeEvent;
+import org.apache.carbondata.core.indexstore.DataMap;
+import org.apache.carbondata.core.indexstore.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.DataMapFactory;
+import org.apache.carbondata.core.indexstore.DataMapWriter;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.schema.FilterType;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+/**
+ * DataMapFactory implementation that creates and caches blocklet level DataMaps.
+ */
+public class BlockletDataMapFactory implements DataMapFactory {
+
+  private AbsoluteTableIdentifier identifier;
+
+  private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
+
+  private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache;
+
+  public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
+    this.identifier = identifier;
+    cache = CacheProvider.getInstance()
+        .createCache(CacheType.DRIVER_BLOCKLET_DATAMAP, identifier.getStorePath());
+  }
+
+  public DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId) {
+    return null;
+  }
+
+  public List<DataMap> getDataMaps(String segmentId) {
+    List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+        segmentMap.get(segmentId);
+    if (tableBlockIndexUniqueIdentifiers == null) {
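+      // first access for this segment: list all .carbonindex files in the segment folder
+      // and build one identifier per index file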
+      tableBlockIndexUniqueIdentifiers = new ArrayList<>();
+      String path = identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId;
+      FileFactory.FileType fileType = FileFactory.getFileType(path);
+      CarbonFile carbonFile = FileFactory.getCarbonFile(path, fileType);
+      CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile file) {
+          return file.getName().endsWith(".carbonindex");
+        }
+      });
+      for (int i = 0; i < listFiles.length; i++) {
+        tableBlockIndexUniqueIdentifiers.add(
+            new TableBlockIndexUniqueIdentifier(identifier, segmentId, listFiles[i].getName()));
+      }
+    }
+
+    try {
+      return cache.getAll(tableBlockIndexUniqueIdentifiers);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override public boolean isFiltersSupported(FilterType filterType) {
+    return true;
+  }
+
+  public void clear(String segmentId) {
+    List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId);
+    if (blockIndexes != null) {
+      for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
+        DataMap dataMap = cache.getIfPresent(blockIndex);
+        if (dataMap != null) {
+          dataMap.clear();
+          cache.invalidate(blockIndex);
+        }
+    }
+  }
+
+  @Override public void clear() {
+    // iterate over a copy of the key set because clear(segmentId) removes entries from segmentMap
+    for (String segmentId : new ArrayList<>(segmentMap.keySet())) {
+      clear(segmentId);
+    }
+  }
+
+  @Override public DataMap getDataMap(DataMapDistributable distributable) {
+    return null;
+  }
+
+  @Override public void fireEvent(ChangeEvent event) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
new file mode 100644
index 0000000..5509c75
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.reader.CarbonDataReaderFactory;
+import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
+import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+
+/**
+ * Wraps the blocklet data map output so it can be consumed as a DataRefNode.
+ */
+public class BlockletDataRefNodeWrapper implements DataRefNode {
+
+  private List<TableBlockInfo> blockInfos;
+
+  private int index;
+
+  private int[] dimensionLens;
+
+  private BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache;
+
+  public BlockletDataRefNodeWrapper(List<TableBlockInfo> blockInfos, int index,
+      int[] dimensionLens) {
+    this.blockInfos = blockInfos;
+    this.index = index;
+    this.dimensionLens = dimensionLens;
+  }
+
+  @Override public DataRefNode getNextDataRefNode() {
+    if (index + 1 < blockInfos.size()) {
+      return new BlockletDataRefNodeWrapper(blockInfos, index + 1, dimensionLens);
+    }
+    return null;
+  }
+
+  @Override public int nodeSize() {
+    return blockInfos.get(index).getDetailInfo().getRowCount();
+  }
+
+  @Override public long nodeNumber() {
+    return index;
+  }
+
+  @Override public byte[][] getColumnsMaxValue() {
+    return null;
+  }
+
+  @Override public byte[][] getColumnsMinValue() {
+    return null;
+  }
+
+  @Override
+  public DimensionRawColumnChunk[] getDimensionChunks(FileHolder fileReader, int[][] blockIndexes)
+      throws IOException {
+    DimensionColumnChunkReader dimensionChunksReader = getDimensionColumnChunkReader();
+    return dimensionChunksReader.readRawDimensionChunks(fileReader, blockIndexes);
+  }
+
+  @Override
+  public DimensionRawColumnChunk getDimensionChunk(FileHolder fileReader, int blockIndexes)
+      throws IOException {
+    DimensionColumnChunkReader dimensionChunksReader = getDimensionColumnChunkReader();
+    return dimensionChunksReader.readRawDimensionChunk(fileReader, blockIndexes);
+  }
+
+  @Override
+  public MeasureRawColumnChunk[] getMeasureChunks(FileHolder fileReader, int[][] blockIndexes)
+      throws IOException {
+    MeasureColumnChunkReader measureColumnChunkReader = getMeasureColumnChunkReader();
+    return measureColumnChunkReader.readRawMeasureChunks(fileReader, blockIndexes);
+  }
+
+  @Override public MeasureRawColumnChunk getMeasureChunk(FileHolder fileReader, int blockIndex)
+      throws IOException {
+    MeasureColumnChunkReader measureColumnChunkReader = getMeasureColumnChunkReader();
+    return measureColumnChunkReader.readRawMeasureChunk(fileReader, blockIndex);
+  }
+
+  private DimensionColumnChunkReader getDimensionColumnChunkReader() throws IOException {
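+    // pick the reader that matches the columnar format version recorded in the blocklet detail info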
+    ColumnarFormatVersion version =
+        ColumnarFormatVersion.valueOf(blockInfos.get(index).getDetailInfo().getVersionNumber());
+    DimensionColumnChunkReader dimensionColumnChunkReader = CarbonDataReaderFactory.getInstance()
+        .getDimensionColumnChunkReader(version,
+            blockInfos.get(index).getDetailInfo().getBlockletInfo(), dimensionLens,
+            blockInfos.get(index).getFilePath());
+    return dimensionColumnChunkReader;
+  }
+
+  private MeasureColumnChunkReader getMeasureColumnChunkReader() throws IOException {
+    ColumnarFormatVersion version =
+        ColumnarFormatVersion.valueOf(blockInfos.get(index).getDetailInfo().getVersionNumber());
+    return CarbonDataReaderFactory.getInstance().getMeasureColumnChunkReader(version,
+        blockInfos.get(index).getDetailInfo().getBlockletInfo(),
+        blockInfos.get(index).getFilePath());
+  }
+
+  @Override
+  public void setDeleteDeltaDataCache(BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache) {
+    this.deleteDeltaDataCache = deleteDeltaDataCache;
+  }
+
+  @Override public BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache() {
+    return deleteDeltaDataCache;
+  }
+
+  @Override public int numberOfPages() {
+    return blockInfos.get(index).getDetailInfo().getPagesCount();
+  }
+
+  public int numberOfNodes() {
+    return blockInfos.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
new file mode 100644
index 0000000..b8cffc6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.block.AbstractIndex;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * Wrapper of abstract index
+ * TODO it could be removed after refactor
+ */
+public class IndexWrapper extends AbstractIndex {
+
+  public IndexWrapper(List<TableBlockInfo> blockInfos) {
+    DataFileFooter fileFooter = null;
+    try {
+      fileFooter = CarbonUtil.readMetadatFile(blockInfos.get(0));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    segmentProperties = new SegmentProperties(fileFooter.getColumnInTable(),
+        fileFooter.getSegmentInfo().getColumnCardinality());
+    dataRefNode = new BlockletDataRefNodeWrapper(blockInfos, 0,
+        segmentProperties.getDimensionColumnsValueSize());
+  }
+
+  @Override public void buildIndex(List<DataFileFooter> footerList) {
+  }
+}