You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/22 05:56:01 UTC

[2/2] carbondata git commit: [CARBONDATA-1543] Supported DataMap chooser and expression for supporting multiple datamaps in single query

[CARBONDATA-1543] Supported DataMap chooser and expression for supporting multiple datamaps in single query

This PR supports 3 features.

1.Load datamaps from the DataMapSchema which are created through DDL.
2.DataMap Chooser: It chooses the datamap out of available datamaps based on simple logic. Like if there is filter condition on column1 then for supposing 2 datamaps(1. column1 2. column1+column2) are supporting this column then we choose the datamap which has fewer columns that is the first datamap.
3.Expression support: Based on the filter expressions we convert them to the possible DataMap expressions and do apply expression on it.
For example, there are 2 datamaps available on table1
Datamap1 : column1
Datamap2 : column2
Query: select * from table1 where column1 ='a' and column2 =b
For the above query, we create datamap expression as AndDataMapExpression(Datamap1, DataMap2). So for the above query both the datamaps are included and the output of them will be applied AND condition to improve the performance

This closes #1510


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e616162c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e616162c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e616162c

Branch: refs/heads/datamap
Commit: e616162c023f055189bf6685471fc448b54c20ef
Parents: 88c0527
Author: Ravindra Pesala <ra...@gmail.com>
Authored: Tue Nov 21 15:49:11 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Feb 22 13:55:31 2018 +0800

----------------------------------------------------------------------
 .../exceptions/MetadataProcessException.java    |  37 +++
 .../carbondata/core/datamap/DataMapChooser.java | 284 +++++++++++++++++++
 .../core/datamap/DataMapDistributable.java      |  21 +-
 .../core/datamap/DataMapStoreManager.java       | 147 +++++++---
 .../carbondata/core/datamap/TableDataMap.java   |  23 +-
 .../core/datamap/dev/DataMapFactory.java        |   3 +-
 .../datamap/dev/expr/AndDataMapExprWrapper.java |  97 +++++++
 .../dev/expr/DataMapDistributableWrapper.java   |  56 ++++
 .../datamap/dev/expr/DataMapExprWrapper.java    |  77 +++++
 .../dev/expr/DataMapExprWrapperImpl.java        |  86 ++++++
 .../datamap/dev/expr/OrDataMapExprWrapper.java  |  94 ++++++
 .../carbondata/core/datastore/TableSpec.java    |  14 +-
 .../carbondata/core/indexstore/Blocklet.java    |  20 ++
 .../core/indexstore/ExtendedBlocklet.java       |  33 ++-
 .../core/indexstore/FineGrainBlocklet.java      |   8 +
 .../blockletindex/BlockletDataMap.java          |   2 -
 .../blockletindex/BlockletDataMapFactory.java   |   8 +-
 .../conditional/StartsWithExpression.java       |  72 +++++
 .../scan/filter/FilterExpressionProcessor.java  |  20 +-
 .../core/scan/filter/intf/ExpressionType.java   |   6 +-
 .../statusmanager/SegmentStatusManager.java     |   5 +-
 .../datamap/examples/MinMaxDataMapFactory.java  |   3 +-
 .../hadoop/api/CarbonTableInputFormat.java      |  30 +-
 .../carbondata/hadoop/api/DataMapJob.java       |   2 +-
 .../hadoop/api/DistributableDataMapFormat.java  |  32 ++-
 .../sdv/generated/MergeIndexTestCase.scala      |  18 +-
 .../preaggregate/TestPreAggCreateCommand.scala  |   7 +-
 .../timeseries/TestTimeSeriesCreateTable.scala  |   6 +-
 ...CompactionSupportGlobalSortBigFileTest.scala |   2 +-
 .../testsuite/dataload/TestLoadDataFrame.scala  |  24 +-
 .../testsuite/datamap/CGDataMapTestCase.scala   |  62 ++--
 .../testsuite/datamap/DataMapWriterSuite.scala  |  17 +-
 .../testsuite/datamap/FGDataMapTestCase.scala   |  68 +++--
 .../testsuite/datamap/TestDataMapCommand.scala  |  73 ++---
 .../iud/InsertOverwriteConcurrentTest.scala     |   8 +-
 .../carbondata/spark/rdd/SparkDataMapJob.scala  |   6 +-
 .../org/apache/spark/sql/CarbonSource.scala     |   8 +-
 .../spark/sql/SparkUnknownExpression.scala      |   6 +-
 .../datamap/CarbonCreateDataMapCommand.scala    |  40 ++-
 .../datamap/CarbonDropDataMapCommand.scala      |   3 +-
 .../CreatePreAggregateTableCommand.scala        |  13 +-
 .../preaaggregate/PreAggregateUtil.scala        |  11 +-
 .../table/CarbonCreateTableCommand.scala        |   2 +-
 .../strategy/CarbonLateDecodeStrategy.scala     |   2 +-
 .../spark/sql/optimizer/CarbonFilters.scala     |  20 +-
 .../datamap/DataMapWriterListener.java          |   6 +-
 .../loading/DataLoadProcessBuilder.java         |   2 +-
 .../store/CarbonFactDataHandlerModel.java       |  12 +-
 48 files changed, 1294 insertions(+), 302 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/common/src/main/java/org/apache/carbondata/common/exceptions/MetadataProcessException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/MetadataProcessException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/MetadataProcessException.java
new file mode 100644
index 0000000..aaeee5e
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/exceptions/MetadataProcessException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common.exceptions;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+/**
+ * This exception will be thrown when failed to process metadata while executing
+ * carbon command
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class MetadataProcessException extends RuntimeException {
+  public MetadataProcessException(String message) {
+    super(message);
+  }
+
+  public MetadataProcessException(String message, Throwable cause) {
+    super(message + ": " + cause.getMessage(), cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
new file mode 100644
index 0000000..5155009
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datamap.dev.expr.AndDataMapExprWrapper;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapperImpl;
+import org.apache.carbondata.core.datamap.dev.expr.OrDataMapExprWrapper;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.TrueConditionalResolverImpl;
+
+/**
+ * This chooser does 2 jobs.
+ * 1. Based on filter expression it converts the available datamaps to datamap expression.
+ *   For example, there are 2 datamaps available on table1
+ *   Datamap1 : column1
+ *   Datamap2 : column2
+ *   Query: select * from table1 where column1 ='a' and column2 =b
+ *   For the above query, we create datamap expression as AndDataMapExpression(Datamap1, DataMap2).
+ *   So for the above query both the datamaps are included and the output of them will be
+ *   applied AND condition to improve the performance
+ *
+ * 2. It chooses the datamap out of available datamaps based on simple logic.
+ *   Like if there is filter condition on column1 then for
+ *   supposing 2 datamaps(1. column1 2. column1+column2) are supporting this column then we choose
+ *   the datamap which has fewer columns that is the first datamap.
+ */
+@InterfaceAudience.Internal
+public class DataMapChooser {
+
+  private static DataMapChooser INSTANCE;
+
+  private DataMapChooser() { }
+
+  public static DataMapChooser get() {
+    if (INSTANCE == null) {
+      INSTANCE = new DataMapChooser();
+    }
+    return INSTANCE;
+  }
+
+  /**
+   * Return a chosen datamap based on input filter. See {@link DataMapChooser}
+   */
+  public DataMapExprWrapper choose(CarbonTable carbonTable, FilterResolverIntf resolverIntf) {
+    if (resolverIntf != null) {
+      Expression expression = resolverIntf.getFilterExpression();
+      // First check for FG datamaps if any exist
+      List<TableDataMap> allDataMapFG =
+          DataMapStoreManager.getInstance().getAllDataMap(carbonTable, DataMapType.FG);
+      ExpressionTuple tuple = selectDataMap(expression, allDataMapFG);
+      if (tuple.dataMapExprWrapper == null) {
+        // Check for CG datamap
+        List<TableDataMap> allDataMapCG =
+            DataMapStoreManager.getInstance().getAllDataMap(carbonTable, DataMapType.CG);
+        tuple = selectDataMap(expression, allDataMapCG);
+      }
+      if (tuple.dataMapExprWrapper != null) {
+        return tuple.dataMapExprWrapper;
+      }
+    }
+    // Return the default datamap if no other datamap exists.
+    return new DataMapExprWrapperImpl(DataMapStoreManager.getInstance()
+        .getDefaultDataMap(carbonTable.getAbsoluteTableIdentifier()), resolverIntf);
+  }
+
+  private ExpressionTuple selectDataMap(Expression expression, List<TableDataMap> allDataMap) {
+    switch (expression.getFilterExpressionType()) {
+      case AND:
+        if (expression instanceof AndExpression) {
+          AndExpression andExpression = (AndExpression) expression;
+          ExpressionTuple left = selectDataMap(andExpression.getLeft(), allDataMap);
+          ExpressionTuple right = selectDataMap(andExpression.getRight(), allDataMap);
+          Set<ExpressionType> filterExpressionTypes = new HashSet<>();
+          // If both left and right has datamap then we can either merge both datamaps to single
+          // datamap if possible. Otherwise apply AND expression.
+          if (left.dataMapExprWrapper != null && right.dataMapExprWrapper != null) {
+            filterExpressionTypes.add(
+                left.dataMapExprWrapper.getFilterResolverIntf().getFilterExpression()
+                    .getFilterExpressionType());
+            filterExpressionTypes.add(
+                right.dataMapExprWrapper.getFilterResolverIntf().getFilterExpression()
+                    .getFilterExpressionType());
+            List<ColumnExpression> columnExpressions = new ArrayList<>();
+            columnExpressions.addAll(left.columnExpressions);
+            columnExpressions.addAll(right.columnExpressions);
+            // Check if we can merge them to single datamap.
+            TableDataMap dataMap =
+                chooseDataMap(allDataMap, columnExpressions, filterExpressionTypes);
+            if (dataMap != null) {
+              ExpressionTuple tuple = new ExpressionTuple();
+              tuple.columnExpressions = columnExpressions;
+              tuple.dataMapExprWrapper = new DataMapExprWrapperImpl(dataMap,
+                  new TrueConditionalResolverImpl(expression, false, false));
+              return tuple;
+            } else {
+              // Apply AND expression.
+              ExpressionTuple tuple = new ExpressionTuple();
+              tuple.columnExpressions = columnExpressions;
+              tuple.dataMapExprWrapper =
+                  new AndDataMapExprWrapper(left.dataMapExprWrapper, right.dataMapExprWrapper,
+                      new TrueConditionalResolverImpl(expression, false, false));
+              return tuple;
+            }
+          } else if (left.dataMapExprWrapper != null && right.dataMapExprWrapper == null) {
+            return left;
+          } else if (left.dataMapExprWrapper == null && right.dataMapExprWrapper != null) {
+            return right;
+          } else {
+            return left;
+          }
+        }
+        break;
+      case OR:
+        if (expression instanceof OrExpression) {
+          OrExpression orExpression = (OrExpression) expression;
+          ExpressionTuple left = selectDataMap(orExpression.getLeft(), allDataMap);
+          ExpressionTuple right = selectDataMap(orExpression.getRight(), allDataMap);
+          Set<ExpressionType> filterExpressionTypes = new HashSet<>();
+          // If both left and right has datamap then we can either merge both datamaps to single
+          // datamap if possible. Otherwise apply OR expression.
+          if (left.dataMapExprWrapper != null && right.dataMapExprWrapper != null) {
+            filterExpressionTypes.add(
+                left.dataMapExprWrapper.getFilterResolverIntf().getFilterExpression()
+                    .getFilterExpressionType());
+            filterExpressionTypes.add(
+                right.dataMapExprWrapper.getFilterResolverIntf().getFilterExpression()
+                    .getFilterExpressionType());
+            List<ColumnExpression> columnExpressions = new ArrayList<>();
+            columnExpressions.addAll(left.columnExpressions);
+            columnExpressions.addAll(right.columnExpressions);
+            TableDataMap dataMap =
+                chooseDataMap(allDataMap, columnExpressions, filterExpressionTypes);
+            if (dataMap != null) {
+              ExpressionTuple tuple = new ExpressionTuple();
+              tuple.columnExpressions = columnExpressions;
+              tuple.dataMapExprWrapper = new DataMapExprWrapperImpl(dataMap,
+                  new TrueConditionalResolverImpl(expression, false, false));
+              return tuple;
+            } else {
+              ExpressionTuple tuple = new ExpressionTuple();
+              tuple.columnExpressions = columnExpressions;
+              tuple.dataMapExprWrapper =
+                  new OrDataMapExprWrapper(left.dataMapExprWrapper, right.dataMapExprWrapper,
+                      new TrueConditionalResolverImpl(expression, false, false));
+              return tuple;
+            }
+          } else {
+            left.dataMapExprWrapper = null;
+            return left;
+          }
+        }
+        break;
+      default:
+        ExpressionTuple tuple = new ExpressionTuple();
+        extractColumnExpression(expression, tuple.columnExpressions);
+        Set<ExpressionType> filterExpressionTypes = new HashSet<>();
+        filterExpressionTypes.add(expression.getFilterExpressionType());
+        TableDataMap dataMap =
+            chooseDataMap(allDataMap, tuple.columnExpressions, filterExpressionTypes);
+        if (dataMap != null) {
+          tuple.dataMapExprWrapper = new DataMapExprWrapperImpl(dataMap,
+              new TrueConditionalResolverImpl(expression, false, false));
+        }
+        return tuple;
+    }
+    return new ExpressionTuple();
+  }
+
+  private void extractColumnExpression(Expression expression,
+      List<ColumnExpression> columnExpressions) {
+    if (expression instanceof ColumnExpression) {
+      columnExpressions.add((ColumnExpression) expression);
+    } else if (expression != null) {
+      List<Expression> children = expression.getChildren();
+      if (children != null && children.size() > 0) {
+        for (Expression exp : children) {
+          extractColumnExpression(exp, columnExpressions);
+        }
+      }
+    }
+  }
+
+  private TableDataMap chooseDataMap(List<TableDataMap> allDataMap,
+      List<ColumnExpression> columnExpressions, Set<ExpressionType> expressionTypes) {
+    List<DataMapTuple> tuples = new ArrayList<>();
+    for (TableDataMap dataMap : allDataMap) {
+      if (contains(dataMap.getDataMapFactory().getMeta(), columnExpressions, expressionTypes)) {
+        tuples.add(
+            new DataMapTuple(dataMap.getDataMapFactory().getMeta().getIndexedColumns().size(),
+                dataMap));
+      }
+    }
+    if (tuples.size() > 0) {
+      Collections.sort(tuples);
+      return tuples.get(0).dataMap;
+    }
+    return null;
+  }
+
+  private boolean contains(DataMapMeta mapMeta, List<ColumnExpression> columnExpressions,
+      Set<ExpressionType> expressionTypes) {
+    if (mapMeta.getIndexedColumns().size() == 0 || columnExpressions.size() == 0) {
+      return false;
+    }
+    boolean contains = true;
+    for (ColumnExpression expression : columnExpressions) {
+      if (!mapMeta.getIndexedColumns().contains(expression.getColumnName()) || !mapMeta
+          .getOptimizedOperation().containsAll(expressionTypes)) {
+        contains = false;
+        break;
+      }
+    }
+    return contains;
+  }
+
+  private static class ExpressionTuple {
+
+    DataMapExprWrapper dataMapExprWrapper;
+
+    List<ColumnExpression> columnExpressions = new ArrayList<>();
+
+  }
+
+  private static class DataMapTuple implements Comparable<DataMapTuple> {
+
+    int order;
+
+    TableDataMap dataMap;
+
+    public DataMapTuple(int order, TableDataMap dataMap) {
+      this.order = order;
+      this.dataMap = dataMap;
+    }
+
+    @Override public int compareTo(DataMapTuple o) {
+      return order - o.order;
+    }
+
+    @Override public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      DataMapTuple that = (DataMapTuple) o;
+
+      if (order != that.order) return false;
+      return dataMap != null ? dataMap.equals(that.dataMap) : that.dataMap == null;
+    }
+
+    @Override public int hashCode() {
+      int result = order;
+      result = 31 * result + (dataMap != null ? dataMap.hashCode() : 0);
+      return result;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
index 50af789..828cdbb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.io.Serializable;
 
 import org.apache.carbondata.core.datastore.block.Distributable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 
 import org.apache.hadoop.mapreduce.InputSplit;
 
@@ -33,11 +34,9 @@ public abstract class DataMapDistributable extends InputSplit
 
   private String segmentId;
 
-  private String dataMapName;
-
   private String[] locations;
 
-  private String dataMapFactoryClass;
+  private DataMapSchema dataMapSchema;
 
   public String getTablePath() {
     return tablePath;
@@ -55,20 +54,12 @@ public abstract class DataMapDistributable extends InputSplit
     this.segmentId = segmentId;
   }
 
-  public String getDataMapName() {
-    return dataMapName;
-  }
-
-  public void setDataMapName(String dataMapName) {
-    this.dataMapName = dataMapName;
-  }
-
-  public String getDataMapFactoryClass() {
-    return dataMapFactoryClass;
+  public DataMapSchema getDataMapSchema() {
+    return dataMapSchema;
   }
 
-  public void setDataMapFactoryClass(String dataMapFactoryClass) {
-    this.dataMapFactoryClass = dataMapFactoryClass;
+  public void setDataMapSchema(DataMapSchema dataMapSchema) {
+    this.dataMapSchema = dataMapSchema;
   }
 
   public void setLocations(String[] locations) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 8d80b4d..ca5da1e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -22,14 +22,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.carbondata.common.exceptions.MetadataProcessException;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
 import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
@@ -55,48 +59,76 @@ public final class DataMapStoreManager {
 
   }
 
-  public List<TableDataMap> getAllDataMap(AbsoluteTableIdentifier identifier) {
-    return allDataMaps.get(identifier.getCarbonTableIdentifier().getTableUniqueName());
+  /**
+   * It gives all datamaps of type @mapType except the default datamap.
+   *
+   */
+  public List<TableDataMap> getAllDataMap(CarbonTable carbonTable, DataMapType mapType) {
+    List<TableDataMap> dataMaps = new ArrayList<>();
+    List<TableDataMap> tableDataMaps = getAllDataMap(carbonTable);
+    if (tableDataMaps != null) {
+      for (TableDataMap dataMap : tableDataMaps) {
+        if (mapType == dataMap.getDataMapFactory().getDataMapType()) {
+          dataMaps.add(dataMap);
+        }
+      }
+    }
+    return dataMaps;
   }
 
-  // TODO its a temporary method till chooser is implemented
-  public TableDataMap chooseDataMap(AbsoluteTableIdentifier identifier) {
-    List<TableDataMap> tableDataMaps = getAllDataMap(identifier);
-    if (tableDataMaps != null && tableDataMaps.size() > 0) {
-      for (TableDataMap dataMap: tableDataMaps) {
-        if (!dataMap.getDataMapName().equalsIgnoreCase(BlockletDataMap.NAME)) {
-          return dataMap;
+  /**
+   * It gives all datamaps except the default datamap.
+   *
+   * @return
+   */
+  public List<TableDataMap> getAllDataMap(CarbonTable carbonTable) {
+    List<DataMapSchema> dataMapSchemaList = carbonTable.getTableInfo().getDataMapSchemaList();
+    List<TableDataMap> dataMaps = new ArrayList<>();
+    if (dataMapSchemaList != null) {
+      for (DataMapSchema dataMapSchema : dataMapSchemaList) {
+        if (!dataMapSchema.getClassName()
+            .equalsIgnoreCase(CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA)) {
+          dataMaps.add(getDataMap(carbonTable.getAbsoluteTableIdentifier(), dataMapSchema));
         }
       }
-      return tableDataMaps.get(0);
-    } else {
-      return getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class.getName());
     }
+    return dataMaps;
   }
 
   /**
-   * Get the datamap for reading data.
+   * It gives the default datamap of the table. Default datamap of any table is BlockletDataMap
    *
-   * @param dataMapName
-   * @param factoryClass
+   * @param identifier
    * @return
    */
-  public TableDataMap getDataMap(AbsoluteTableIdentifier identifier,
-      String dataMapName, String factoryClass) {
+  public TableDataMap getDefaultDataMap(AbsoluteTableIdentifier identifier) {
+    return getDataMap(identifier, BlockletDataMapFactory.DATA_MAP_SCHEMA);
+  }
+
+  /**
+   * Get the datamap for reading data.
+   */
+  public TableDataMap getDataMap(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) {
     String table = identifier.getCarbonTableIdentifier().getTableUniqueName();
     List<TableDataMap> tableDataMaps = allDataMaps.get(table);
-    TableDataMap dataMap;
-    if (tableDataMaps == null) {
+    TableDataMap dataMap = null;
+    if (tableDataMaps != null) {
+      dataMap = getTableDataMap(dataMapSchema.getDataMapName(), tableDataMaps);
+    }
+    if (dataMap == null) {
       synchronized (table.intern()) {
         tableDataMaps = allDataMaps.get(table);
-        if (tableDataMaps == null) {
-          dataMap = createAndRegisterDataMap(identifier, factoryClass, dataMapName);
-        } else {
-          dataMap = getTableDataMap(dataMapName, tableDataMaps);
+        if (tableDataMaps != null) {
+          dataMap = getTableDataMap(dataMapSchema.getDataMapName(), tableDataMaps);
+        }
+        if (dataMap == null) {
+          try {
+            dataMap = createAndRegisterDataMap(identifier, dataMapSchema);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
         }
       }
-    } else {
-      dataMap = getTableDataMap(dataMapName, tableDataMaps);
     }
 
     if (dataMap == null) {
@@ -110,7 +142,8 @@ public final class DataMapStoreManager {
    * The datamap is created using datamap name, datamap factory class and table identifier.
    */
   public TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier,
-      String factoryClassName, String dataMapName) {
+      DataMapSchema dataMapSchema)
+      throws MalformedDataMapCommandException {
     String table = identifier.getCarbonTableIdentifier().getTableUniqueName();
     // Just update the segmentRefreshMap with the table if not added.
     getTableSegmentRefresher(identifier);
@@ -118,16 +151,20 @@ public final class DataMapStoreManager {
     if (tableDataMaps == null) {
       tableDataMaps = new ArrayList<>();
     }
+    String dataMapName = dataMapSchema.getDataMapName();
     TableDataMap dataMap = getTableDataMap(dataMapName, tableDataMaps);
-    if (dataMap != null && dataMap.getDataMapName().equalsIgnoreCase(dataMapName)) {
-      throw new RuntimeException("Already datamap exists in that path with type " + dataMapName);
+    if (dataMap != null && dataMap.getDataMapSchema().getDataMapName()
+        .equalsIgnoreCase(dataMapName)) {
+      throw new MalformedDataMapCommandException("Already datamap exists in that path with type " +
+          dataMapName);
     }
 
     try {
+      // try to create datamap by reflection to test whether it is a valid DataMapFactory class
       Class<? extends DataMapFactory> factoryClass =
-          (Class<? extends DataMapFactory>) Class.forName(factoryClassName);
+          (Class<? extends DataMapFactory>) Class.forName(dataMapSchema.getClassName());
       DataMapFactory dataMapFactory = factoryClass.newInstance();
-      dataMapFactory.init(identifier, dataMapName);
+      dataMapFactory.init(identifier, dataMapSchema);
       BlockletDetailsFetcher blockletDetailsFetcher;
       SegmentPropertiesFetcher segmentPropertiesFetcher = null;
       if (dataMapFactory instanceof BlockletDetailsFetcher) {
@@ -136,11 +173,14 @@ public final class DataMapStoreManager {
         blockletDetailsFetcher = getBlockletDetailsFetcher(identifier);
       }
       segmentPropertiesFetcher = (SegmentPropertiesFetcher) blockletDetailsFetcher;
-      dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory, blockletDetailsFetcher,
+      dataMap = new TableDataMap(identifier, dataMapSchema, dataMapFactory, blockletDetailsFetcher,
           segmentPropertiesFetcher);
-    } catch (Exception e) {
-      LOGGER.error(e);
-      throw new RuntimeException(e);
+    } catch (ClassNotFoundException e) {
+      throw new MalformedDataMapCommandException("DataMap class '" +
+          dataMapSchema.getClassName() + "' not found");
+    } catch (Throwable e) {
+      throw new MetadataProcessException(
+          "failed to create DataMap instance for '" + dataMapSchema.getClassName() + "'", e);
     }
     tableDataMaps.add(dataMap);
     allDataMaps.put(table, tableDataMaps);
@@ -150,8 +190,7 @@ public final class DataMapStoreManager {
   private TableDataMap getTableDataMap(String dataMapName, List<TableDataMap> tableDataMaps) {
     TableDataMap dataMap = null;
     for (TableDataMap tableDataMap : tableDataMaps) {
-      if (tableDataMap.getDataMapName().equals(dataMapName) || (!tableDataMap.getDataMapName()
-          .equals(""))) {
+      if (tableDataMap.getDataMapSchema().getDataMapName().equals(dataMapName)) {
         dataMap = tableDataMap;
         break;
       }
@@ -160,16 +199,30 @@ public final class DataMapStoreManager {
   }
 
   /**
+   * Clear the invalid segments from all the datamaps of the table
+   * @param carbonTable
+   * @param segments
+   */
+  public void clearInvalidSegments(CarbonTable carbonTable, List<String> segments) {
+    getDefaultDataMap(carbonTable.getAbsoluteTableIdentifier()).clear(segments);
+    List<TableDataMap> allDataMap = getAllDataMap(carbonTable);
+    for (TableDataMap dataMap: allDataMap) {
+      dataMap.clear(segments);
+    }
+
+  }
+
+  /**
    * Clear the datamap/datamaps of a table from memory
+   *
    * @param identifier Table identifier
    */
   public void clearDataMaps(AbsoluteTableIdentifier identifier) {
     String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
-    List<TableDataMap> tableDataMaps =
-        allDataMaps.get(tableUniqueName);
+    List<TableDataMap> tableDataMaps = allDataMaps.get(tableUniqueName);
     segmentRefreshMap.remove(identifier.uniqueName());
     if (tableDataMaps != null) {
-      for (TableDataMap tableDataMap: tableDataMaps) {
+      for (TableDataMap tableDataMap : tableDataMaps) {
         if (tableDataMap != null) {
           tableDataMap.clear();
           break;
@@ -181,6 +234,7 @@ public final class DataMapStoreManager {
 
   /**
    * Clear the datamap/datamaps of a table from memory
+   *
    * @param identifier Table identifier
    */
   public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
@@ -188,8 +242,9 @@ public final class DataMapStoreManager {
         allDataMaps.get(identifier.getCarbonTableIdentifier().getTableUniqueName());
     if (tableDataMaps != null) {
       int i = 0;
-      for (TableDataMap tableDataMap: tableDataMaps) {
-        if (tableDataMap != null && dataMapName.equalsIgnoreCase(tableDataMap.getDataMapName())) {
+      for (TableDataMap tableDataMap : tableDataMaps) {
+        if (tableDataMap != null && dataMapName
+            .equalsIgnoreCase(tableDataMap.getDataMapSchema().getDataMapName())) {
           tableDataMap.clear();
           tableDataMaps.remove(i);
           break;
@@ -201,17 +256,18 @@ public final class DataMapStoreManager {
 
   /**
    * Get the blocklet datamap factory to get the detail information of blocklets
+   *
    * @param identifier
    * @return
    */
   private BlockletDetailsFetcher getBlockletDetailsFetcher(AbsoluteTableIdentifier identifier) {
-    TableDataMap blockletMap =
-        getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class.getName());
+    TableDataMap blockletMap = getDataMap(identifier, BlockletDataMapFactory.DATA_MAP_SCHEMA);
     return (BlockletDetailsFetcher) blockletMap.getDataMapFactory();
   }
 
   /**
    * Returns the singleton instance
+   *
    * @return
    */
   public static DataMapStoreManager getInstance() {
@@ -271,7 +327,7 @@ public final class DataMapStoreManager {
     }
 
     public void refreshSegments(List<String> segmentIds) {
-      for (String segmentId: segmentIds) {
+      for (String segmentId : segmentIds) {
         manualSegmentRefresh.put(segmentId, true);
       }
     }
@@ -286,5 +342,4 @@ public final class DataMapStoreManager {
     }
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 42fc702..d4a8a22 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.FineGrainBlocklet;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.events.Event;
 import org.apache.carbondata.events.OperationContext;
@@ -45,7 +46,7 @@ public final class TableDataMap extends OperationEventListener {
 
   private AbsoluteTableIdentifier identifier;
 
-  private String dataMapName;
+  private DataMapSchema dataMapSchema;
 
   private DataMapFactory dataMapFactory;
 
@@ -56,11 +57,11 @@ public final class TableDataMap extends OperationEventListener {
   /**
    * It is called to initialize and load the required table datamap metadata.
    */
-  public TableDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
+  public TableDataMap(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema,
       DataMapFactory dataMapFactory, BlockletDetailsFetcher blockletDetailsFetcher,
       SegmentPropertiesFetcher segmentPropertiesFetcher) {
     this.identifier = identifier;
-    this.dataMapName = dataMapName;
+    this.dataMapSchema = dataMapSchema;
     this.dataMapFactory = dataMapFactory;
     this.blockletDetailsFetcher = blockletDetailsFetcher;
     this.segmentPropertiesFetcher = segmentPropertiesFetcher;
@@ -115,10 +116,9 @@ public final class TableDataMap extends OperationEventListener {
     for (String segmentsId : segmentIds) {
       List<DataMapDistributable> list = dataMapFactory.toDistributable(segmentsId);
       for (DataMapDistributable distributable: list) {
-        distributable.setDataMapName(dataMapName);
+        distributable.setDataMapSchema(dataMapSchema);
         distributable.setSegmentId(segmentsId);
         distributable.setTablePath(identifier.getTablePath());
-        distributable.setDataMapFactoryClass(dataMapFactory.getClass().getName());
       }
       distributables.addAll(list);
     }
@@ -147,7 +147,8 @@ public final class TableDataMap extends OperationEventListener {
     }
     BlockletSerializer serializer = new BlockletSerializer();
     String writePath =
-        identifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + dataMapName;
+        identifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + dataMapSchema
+            .getDataMapName();
     if (dataMapFactory.getDataMapType() == DataMapType.FG) {
       FileFactory.mkdirs(writePath, FileFactory.getFileType(writePath));
     }
@@ -182,13 +183,9 @@ public final class TableDataMap extends OperationEventListener {
   public void clear() {
     dataMapFactory.clear();
   }
-  /**
-   * Get the unique name of datamap
-   *
-   * @return
-   */
-  public String getDataMapName() {
-    return dataMapName;
+
+  public DataMapSchema getDataMapSchema() {
+    return dataMapSchema;
   }
 
   public DataMapFactory getDataMapFactory() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index e900f8a..0aebe9b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -23,6 +23,7 @@ import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.DataMapType;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.events.Event;
 
 /**
@@ -33,7 +34,7 @@ public interface DataMapFactory<T extends DataMap> {
   /**
    * Initialization of Datamap factory with the identifier and datamap name
    */
-  void init(AbsoluteTableIdentifier identifier, String dataMapName);
+  void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema);
 
   /**
    * Return a new write for this datamap

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
new file mode 100644
index 0000000..458772f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.expr;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+/**
+ * And expression for datamaps
+ */
+public class AndDataMapExprWrapper implements DataMapExprWrapper {
+
+  private DataMapExprWrapper left;
+
+  private DataMapExprWrapper right;
+
+  private FilterResolverIntf resolverIntf;
+
+  public AndDataMapExprWrapper(DataMapExprWrapper left, DataMapExprWrapper right,
+      FilterResolverIntf resolverIntf) {
+    this.left = left;
+    this.right = right;
+    this.resolverIntf = resolverIntf;
+  }
+
+  @Override public List<ExtendedBlocklet> prune(List<String> segments,
+      List<String> partitionsToPrune) throws IOException {
+    List<ExtendedBlocklet> leftPrune = left.prune(segments, partitionsToPrune);
+    List<ExtendedBlocklet> rightPrune = right.prune(segments, partitionsToPrune);
+    List<ExtendedBlocklet> andBlocklets = new ArrayList<>();
+    for (ExtendedBlocklet blocklet : leftPrune) {
+      if (rightPrune.contains(blocklet)) {
+        andBlocklets.add(blocklet);
+      }
+    }
+    return andBlocklets;
+  }
+
+  @Override public List<ExtendedBlocklet> pruneBlocklets(List<ExtendedBlocklet> blocklets)
+      throws IOException {
+    List<ExtendedBlocklet> leftPrune = left.pruneBlocklets(blocklets);
+    List<ExtendedBlocklet> rightPrune = right.pruneBlocklets(blocklets);
+    List<ExtendedBlocklet> andBlocklets = new ArrayList<>();
+    for (ExtendedBlocklet blocklet : leftPrune) {
+      if (rightPrune.contains(blocklet)) {
+        andBlocklets.add(blocklet);
+      }
+    }
+    return andBlocklets;
+  }
+
+  @Override public FilterResolverIntf getFilterResolverIntf() {
+    return resolverIntf;
+  }
+
+  @Override public FilterResolverIntf getFilterResolverIntf(String uniqueId) {
+    FilterResolverIntf leftExp = left.getFilterResolverIntf(uniqueId);
+    FilterResolverIntf rightExp = right.getFilterResolverIntf(uniqueId);
+    if (leftExp != null) {
+      return leftExp;
+    } else if (rightExp != null) {
+      return rightExp;
+    }
+    return null;
+  }
+
+  @Override public List<DataMapDistributableWrapper> toDistributable(List<String> segments)
+      throws IOException {
+    List<DataMapDistributableWrapper> wrappers = new ArrayList<>();
+    wrappers.addAll(left.toDistributable(segments));
+    wrappers.addAll(right.toDistributable(segments));
+    return wrappers;
+  }
+
+  @Override public DataMapType getDataMapType() {
+    return left.getDataMapType();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapDistributableWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapDistributableWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapDistributableWrapper.java
new file mode 100644
index 0000000..9075032
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapDistributableWrapper.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.expr;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+public class DataMapDistributableWrapper extends InputSplit implements Serializable {
+
+  private String uniqueId;
+
+  private DataMapDistributable distributable;
+
+  public DataMapDistributableWrapper(String uniqueId, DataMapDistributable distributable) {
+    this.uniqueId = uniqueId;
+    this.distributable = distributable;
+  }
+
+  public String getUniqueId() {
+    return uniqueId;
+  }
+
+  public DataMapDistributable getDistributable() {
+    return distributable;
+  }
+
+  public void setDistributable(DataMapDistributable distributable) {
+    this.distributable = distributable;
+  }
+
+  @Override public long getLength() throws IOException, InterruptedException {
+    return distributable.getLength();
+  }
+
+  @Override public String[] getLocations() throws IOException, InterruptedException {
+    return distributable.getLocations();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
new file mode 100644
index 0000000..36c2472
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.expr;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+/**
+ * It is the wrapper around datamap and related filter expression. By using it user can apply
+ * datamaps in expression style.
+ */
+public interface DataMapExprWrapper extends Serializable {
+
+  /**
+   * It get the blocklets from each leaf node datamap and apply expressions on the blocklets
+   * using list of segments, it is used in case on non distributable datamap.
+   */
+  List<ExtendedBlocklet> prune(List<String> segments, List<String> partitionsToPrune)
+      throws IOException;
+
+  /**
+   * It is used in case on distributable datamap. First using job it gets all blockets from all
+   * related datamaps. These blocklets are passed to this method to apply expression.
+   * @param blocklets
+   * @return
+   * @throws IOException
+   */
+  List<ExtendedBlocklet> pruneBlocklets(List<ExtendedBlocklet> blocklets) throws IOException;
+
+  /**
+   * Get the underlying filter expression.
+   * @return
+   */
+  FilterResolverIntf getFilterResolverIntf();
+
+  /**
+   * Convert to distributable objects for executing job.
+   * @param segments
+   * @return
+   * @throws IOException
+   */
+  List<DataMapDistributableWrapper> toDistributable(List<String> segments) throws IOException;
+
+  /**
+   * Each leaf node is identified by uniqueid, so if user wants the underlying filter expression for
+   * any leaf node then this method can be used.
+   * @param uniqueId
+   * @return
+   */
+  FilterResolverIntf getFilterResolverIntf(String uniqueId);
+
+  /**
+   * Get the datamap type.
+   * @return
+   */
+  DataMapType getDataMapType();
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
new file mode 100644
index 0000000..a66d31b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.expr;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.datamap.TableDataMap;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+public class DataMapExprWrapperImpl implements DataMapExprWrapper {
+
+  private static final long serialVersionUID = -6240385328696074171L;
+
+  private transient TableDataMap dataMap;
+
+  private FilterResolverIntf expression;
+
+  private String uniqueId;
+
+  public DataMapExprWrapperImpl(TableDataMap dataMap, FilterResolverIntf expression) {
+    this.dataMap = dataMap;
+    this.expression = expression;
+    this.uniqueId = UUID.randomUUID().toString();
+  }
+
+  @Override public List<ExtendedBlocklet> prune(List<String> segments,
+      List<String> partitionsToPrune) throws IOException {
+    return dataMap.prune(segments, expression, partitionsToPrune);
+  }
+
+  @Override public List<ExtendedBlocklet> pruneBlocklets(List<ExtendedBlocklet> blocklets)
+      throws IOException {
+    List<ExtendedBlocklet> blockletList = new ArrayList<>();
+    for (ExtendedBlocklet blocklet: blocklets) {
+      if (blocklet.getDataMapUniqueId().equals(uniqueId)) {
+        blockletList.add(blocklet);
+      }
+    }
+    return blockletList;
+  }
+
+  @Override public FilterResolverIntf getFilterResolverIntf() {
+    return expression;
+  }
+
+  @Override public FilterResolverIntf getFilterResolverIntf(String uniqueId) {
+    if (this.uniqueId.equals(uniqueId)) {
+      return expression;
+    }
+    return null;
+  }
+
+  @Override public List<DataMapDistributableWrapper> toDistributable(List<String> segments)
+      throws IOException {
+    List<DataMapDistributable> dataMapDistributables = dataMap.toDistributable(segments);
+    List<DataMapDistributableWrapper> wrappers = new ArrayList<>();
+    for (DataMapDistributable distributable : dataMapDistributables) {
+      wrappers.add(new DataMapDistributableWrapper(uniqueId, distributable));
+    }
+    return wrappers;
+  }
+
+  @Override public DataMapType getDataMapType() {
+    return dataMap.getDataMapFactory().getDataMapType();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
new file mode 100644
index 0000000..1d90f0d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap.dev.expr;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+/**
+ * Or expression for datamaps
+ */
+public class OrDataMapExprWrapper implements DataMapExprWrapper {
+
+  private DataMapExprWrapper left;
+
+  private DataMapExprWrapper right;
+
+  private FilterResolverIntf resolverIntf;
+
+  public OrDataMapExprWrapper(DataMapExprWrapper left, DataMapExprWrapper right,
+      FilterResolverIntf resolverIntf) {
+    this.left = left;
+    this.right = right;
+    this.resolverIntf = resolverIntf;
+  }
+
+  @Override public List<ExtendedBlocklet> prune(List<String> segments,
+      List<String> partitionsToPrune) throws IOException {
+    List<ExtendedBlocklet> leftPrune = left.prune(segments, partitionsToPrune);
+    List<ExtendedBlocklet> rightPrune = right.prune(segments, partitionsToPrune);
+    Set<ExtendedBlocklet> andBlocklets = new HashSet<>();
+    andBlocklets.addAll(leftPrune);
+    andBlocklets.addAll(rightPrune);
+    return new ArrayList<>(andBlocklets);
+  }
+
+  @Override public List<ExtendedBlocklet> pruneBlocklets(List<ExtendedBlocklet> blocklets)
+      throws IOException {
+    List<ExtendedBlocklet> leftPrune = left.pruneBlocklets(blocklets);
+    List<ExtendedBlocklet> rightPrune = right.pruneBlocklets(blocklets);
+    Set<ExtendedBlocklet> andBlocklets = new HashSet<>();
+    andBlocklets.addAll(leftPrune);
+    andBlocklets.addAll(rightPrune);
+    return new ArrayList<>(andBlocklets);
+  }
+
+  @Override public List<DataMapDistributableWrapper> toDistributable(List<String> segments)
+      throws IOException {
+    List<DataMapDistributableWrapper> wrappers = new ArrayList<>();
+    wrappers.addAll(left.toDistributable(segments));
+    wrappers.addAll(right.toDistributable(segments));
+    return wrappers;
+  }
+
+  @Override public FilterResolverIntf getFilterResolverIntf() {
+    return resolverIntf;
+  }
+
+  @Override public FilterResolverIntf getFilterResolverIntf(String uniqueId) {
+    FilterResolverIntf leftExp = left.getFilterResolverIntf(uniqueId);
+    FilterResolverIntf rightExp = right.getFilterResolverIntf(uniqueId);
+    if (leftExp != null) {
+      return leftExp;
+    } else if (rightExp != null) {
+      return rightExp;
+    }
+    return null;
+  }
+
+
+  @Override public DataMapType getDataMapType() {
+    return left.getDataMapType();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
index eb36c8d..93da81e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.DecimalType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.Writable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
@@ -38,7 +39,14 @@ public class TableSpec {
   // number of simple dimensions
   private int numSimpleDimensions;
 
-  public TableSpec(List<CarbonDimension> dimensions, List<CarbonMeasure> measures) {
+  private CarbonTable carbonTable;
+
+  public TableSpec(CarbonTable carbonTable) {
+    this.carbonTable = carbonTable;
+    List<CarbonDimension> dimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
+    List<CarbonMeasure> measures =
+        carbonTable.getMeasureByTableName(carbonTable.getTableName());
     // first calculate total number of columnar field considering column group and complex column
     numSimpleDimensions = 0;
     for (CarbonDimension dimension : dimensions) {
@@ -112,6 +120,10 @@ public class TableSpec {
     return measureSpec.length;
   }
 
+  public CarbonTable getCarbonTable() {
+    return carbonTable;
+  }
+
   public static class ColumnSpec implements Writable {
     // field name of this column
     private String fieldName;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
index c731e07..052d269 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
@@ -58,4 +58,24 @@ public class Blocklet implements Writable,Serializable {
     blockId = in.readUTF();
     blockletId = in.readUTF();
   }
+
+  @Override public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    Blocklet blocklet = (Blocklet) o;
+
+    if (blockId != null ? !blockId.equals(blocklet.blockId) : blocklet.blockId != null) {
+      return false;
+    }
+    return blockletId != null ?
+        blockletId.equals(blocklet.blockletId) :
+        blocklet.blockletId == null;
+  }
+
+  @Override public int hashCode() {
+    int result = blockId != null ? blockId.hashCode() : 0;
+    result = 31 * result + (blockletId != null ? blockletId.hashCode() : 0);
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index 58a9344..d2af5cb 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -29,13 +29,12 @@ public class ExtendedBlocklet extends Blocklet {
 
   private String[] location;
 
-  private String path;
-
   private String dataMapWriterPath;
 
+  private String dataMapUniqueId;
+
   public ExtendedBlocklet(String path, String blockletId) {
     super(path, blockletId);
-    this.path = path;
   }
 
   public BlockletDetailInfo getDetailInfo() {
@@ -67,7 +66,7 @@ public class ExtendedBlocklet extends Blocklet {
   }
 
   public String getPath() {
-    return path;
+    return getBlockId();
   }
 
   public String getDataMapWriterPath() {
@@ -77,4 +76,30 @@ public class ExtendedBlocklet extends Blocklet {
   public void setDataMapWriterPath(String dataMapWriterPath) {
     this.dataMapWriterPath = dataMapWriterPath;
   }
+
+  public String getDataMapUniqueId() {
+    return dataMapUniqueId;
+  }
+
+  public void setDataMapUniqueId(String dataMapUniqueId) {
+    this.dataMapUniqueId = dataMapUniqueId;
+  }
+
+  @Override public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) {
+      return false;
+    }
+
+    ExtendedBlocklet that = (ExtendedBlocklet) o;
+
+    return segmentId != null ? segmentId.equals(that.segmentId) : that.segmentId == null;
+  }
+
+  @Override public int hashCode() {
+    int result = super.hashCode();
+    result = 31 * result + (segmentId != null ? segmentId.hashCode() : 0);
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
index 266120e..229e5bf 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
@@ -117,4 +117,12 @@ public class FineGrainBlocklet extends Blocklet implements Serializable {
       pages.add(page);
     }
   }
+
+  @Override public boolean equals(Object o) {
+    return super.equals(o);
+  }
+
+  @Override public int hashCode() {
+    return super.hashCode();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 99a47ff..aceb372 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -78,8 +78,6 @@ public class BlockletDataMap extends AbstractCoarseGrainDataMap implements Cache
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(BlockletDataMap.class.getName());
 
-  public static final String NAME = "clustered.btree.blocklet";
-
   private static int KEY_INDEX = 0;
 
   private static int MIN_VALUES_INDEX = 1;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index f6b8165..664242b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -40,6 +40,7 @@ import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
@@ -55,6 +56,11 @@ public class BlockletDataMapFactory extends AbstractCoarseGrainDataMapFactory
     implements BlockletDetailsFetcher,
     SegmentPropertiesFetcher {
 
+  private static final String NAME = "clustered.btree.blocklet";
+
+  public static final DataMapSchema DATA_MAP_SCHEMA =
+      new DataMapSchema(NAME, BlockletDataMapFactory.class.getName());
+
   private AbsoluteTableIdentifier identifier;
 
   // segmentId -> list of index file
@@ -63,7 +69,7 @@ public class BlockletDataMapFactory extends AbstractCoarseGrainDataMapFactory
   private Cache<TableBlockIndexUniqueIdentifier, AbstractCoarseGrainDataMap> cache;
 
   @Override
-  public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
+  public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) {
     this.identifier = identifier;
     cache = CacheProvider.getInstance()
         .createCache(CacheType.DRIVER_BLOCKLET_DATAMAP);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/StartsWithExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/StartsWithExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/StartsWithExpression.java
new file mode 100644
index 0000000..18c7374
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/StartsWithExpression.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.expression.conditional;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.ExpressionResult;
+import org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+
+public class StartsWithExpression extends BinaryConditionalExpression {
+  private static final long serialVersionUID = -5319109756575539219L;
+
+  public StartsWithExpression(Expression left, Expression right) {
+    super(left, right);
+  }
+
+  @Override public ExpressionResult evaluate(RowIntf value)
+      throws FilterUnsupportedException, FilterIllegalMemberException {
+    ExpressionResult exprLeftRes = left.evaluate(value);
+    ExpressionResult exprRightRes = right.evaluate(value);
+    ExpressionResult val1 = exprLeftRes;
+    if (exprLeftRes.isNull() || exprRightRes.isNull()) {
+      exprLeftRes.set(DataTypes.BOOLEAN, false);
+      return exprLeftRes;
+    }
+    if (exprLeftRes.getDataType() != exprRightRes.getDataType()) {
+      if (exprLeftRes.getDataType().getPrecedenceOrder() < exprRightRes.getDataType()
+          .getPrecedenceOrder()) {
+        val1 = exprRightRes;
+      }
+
+    }
+    boolean result = false;
+    DataType dataType = val1.getDataType();
+    if (dataType == DataTypes.STRING) {
+      result = exprLeftRes.getString().startsWith(exprRightRes.getString());
+    } else {
+      throw new FilterUnsupportedException(
+          "DataType: " + val1.getDataType() + " not supported for the filter expression");
+    }
+    val1.set(DataTypes.BOOLEAN, result);
+    return val1;
+  }
+
+  @Override public ExpressionType getFilterExpressionType() {
+    return ExpressionType.STARTSWITH;
+  }
+
+  @Override public String getString() {
+    return "StartsWith(" + left.getString() + ',' + right.getString() + ')';
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
index 72ca1a4..b96603a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
@@ -46,7 +46,9 @@ import org.apache.carbondata.core.scan.expression.conditional.InExpression;
 import org.apache.carbondata.core.scan.expression.conditional.LessThanEqualToExpression;
 import org.apache.carbondata.core.scan.expression.conditional.LessThanExpression;
 import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
+import org.apache.carbondata.core.scan.expression.conditional.StartsWithExpression;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
 import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
@@ -369,7 +371,23 @@ public class FilterExpressionProcessor implements FilterProcessor {
       case LESSTHAN_EQUALTO:
         return getFilterResolverBasedOnExpressionType(ExpressionType.EQUALS, true, expressionTree,
             tableIdentifier, expressionTree);
-
+      case STARTSWITH:
+        assert (expressionTree instanceof StartsWithExpression);
+        currentExpression = (StartsWithExpression) expressionTree;
+        Expression re = currentExpression.getRight();
+        assert (re instanceof LiteralExpression);
+        LiteralExpression literal = (LiteralExpression) re;
+        String value = literal.getLiteralExpValue().toString();
+        Expression left = new GreaterThanEqualToExpression(currentExpression.getLeft(), literal);
+        String maxValueLimit = value.substring(0, value.length() - 1) + (char) (
+            ((int) value.charAt(value.length() - 1)) + 1);
+        Expression right = new LessThanExpression(currentExpression.getLeft(),
+            new LiteralExpression(maxValueLimit, literal.getLiteralExpDataType()));
+        currentExpression = new AndExpression(left, right);
+        return new LogicalFilterResolverImpl(
+            createFilterResolverTree(currentExpression.getLeft(), tableIdentifier),
+            createFilterResolverTree(currentExpression.getRight(), tableIdentifier),
+            currentExpression);
       case NOT_EQUALS:
       case NOT_IN:
         return getFilterResolverBasedOnExpressionType(ExpressionType.NOT_EQUALS, false,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
index a3f9199..831acc8 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
@@ -39,6 +39,8 @@ public enum ExpressionType {
   LITERAL,
   RANGE,
   FALSE,
-  TRUE
-
+  TRUE,
+  STARTSWITH,
+  ENDSWITH,
+  CONTAINSWITH
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 2e73aef..439df0d 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -360,9 +360,8 @@ public class SegmentStatusManager {
         LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
 
         if (!FileFactory.isFileExist(dataLoadLocation, FileFactory.getFileType(dataLoadLocation))) {
-          // log error.
-          LOG.error("Error message: " + "Load metadata file is not present.");
-          invalidLoadTimestamps.add(loadDate);
+          // Table status file is not present, maybe table is empty, ignore this operation
+          LOG.warn("Trying to update table metadata file which is not present.");
           return invalidLoadTimestamps;
         }
         // read existing metadata details in load metadata.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
index 5203cb3..925731a 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataM
 import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMapFactory;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 import org.apache.carbondata.events.Event;
 
@@ -41,7 +42,7 @@ public class MinMaxDataMapFactory extends AbstractCoarseGrainDataMapFactory {
 
   private AbsoluteTableIdentifier identifier;
 
-  @Override public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
+  @Override public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) {
     this.identifier = identifier;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index ad5a1c9..04902f9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -33,14 +33,15 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapChooser;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.DataMapType;
 import org.apache.carbondata.core.datamap.TableDataMap;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
@@ -338,9 +339,6 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     if (null == carbonTable) {
       throw new IOException("Missing/Corrupt schema file for table.");
     }
-    TableDataMap blockletMap =
-        DataMapStoreManager.getInstance().getDataMap(identifier, BlockletDataMap.NAME,
-            BlockletDataMapFactory.class.getName());
     List<String> invalidSegments = new ArrayList<>();
     List<UpdateVO> invalidTimestampsList = new ArrayList<>();
     List<String> streamSegments = null;
@@ -368,7 +366,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
         invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
       }
       if (invalidSegments.size() > 0) {
-        blockletMap.clear(invalidSegments);
+        DataMapStoreManager.getInstance()
+            .clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()), invalidSegments);
       }
     }
 
@@ -392,7 +391,11 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
         toBeCleanedSegments.add(segment);
       }
     }
-    blockletMap.clear(toBeCleanedSegments);
+    if (toBeCleanedSegments.size() > 0) {
+      DataMapStoreManager.getInstance()
+          .clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()),
+              toBeCleanedSegments);
+    }
 
     // process and resolve the expression
     Expression filter = getFilterPredicates(job.getConfiguration());
@@ -720,19 +723,21 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
             CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
-    TableDataMap blockletMap =
-        DataMapStoreManager.getInstance().chooseDataMap(absoluteTableIdentifier);
+    DataMapExprWrapper dataMapExprWrapper =
+        DataMapChooser.get().choose(getOrCreateCarbonTable(job.getConfiguration()), resolver);
     DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
     List<String> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
     List<ExtendedBlocklet> prunedBlocklets;
-    if (distributedCG || blockletMap.getDataMapFactory().getDataMapType() == DataMapType.FG) {
+    if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapType.FG) {
       DistributableDataMapFormat datamapDstr =
-          new DistributableDataMapFormat(absoluteTableIdentifier, blockletMap.getDataMapName(),
+          new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper,
               segmentIds, partitionsToPrune,
               BlockletDataMapFactory.class.getName());
       prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
+      // Apply expression on the blocklets.
+      prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
     } else {
-      prunedBlocklets = blockletMap.prune(segmentIds, resolver, partitionsToPrune);
+      prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune);
     }
 
     List<org.apache.carbondata.hadoop.CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
@@ -897,8 +902,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
    */
   public BlockMappingVO getBlockRowCount(Job job, AbsoluteTableIdentifier identifier,
       List<String> partitions) throws IOException {
-    TableDataMap blockletMap = DataMapStoreManager.getInstance()
-        .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class.getName());
+    TableDataMap blockletMap = DataMapStoreManager.getInstance().getDefaultDataMap(identifier);
     SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
     SegmentStatusManager.ValidAndInvalidSegmentsInfo allSegments =
         new SegmentStatusManager(identifier).getValidAndInvalidSegments();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
index fad2336..64936aa 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
@@ -29,6 +29,6 @@ import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 public interface DataMapJob extends Serializable {
 
   List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
-      FilterResolverIntf resolverIntf);
+      FilterResolverIntf filter);
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e616162c/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
index 96eec6f..596f013 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
@@ -22,9 +22,10 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.TableDataMap;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -47,7 +48,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
 
   private AbsoluteTableIdentifier identifier;
 
-  private String dataMapName;
+  private DataMapExprWrapper dataMapExprWrapper;
 
   private List<String> validSegments;
 
@@ -55,10 +56,11 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
 
   private List<String> partitions;
 
-  public DistributableDataMapFormat(AbsoluteTableIdentifier identifier,
-      String dataMapName, List<String> validSegments, List<String> partitions, String className) {
+  DistributableDataMapFormat(AbsoluteTableIdentifier identifier,
+      DataMapExprWrapper dataMapExprWrapper, List<String> validSegments, List<String> partitions,
+      String className) {
     this.identifier = identifier;
-    this.dataMapName = dataMapName;
+    this.dataMapExprWrapper = dataMapExprWrapper;
     this.validSegments = validSegments;
     this.className = className;
     this.partitions = partitions;
@@ -83,9 +85,8 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
 
   @Override
   public List<InputSplit> getSplits(JobContext job) throws IOException {
-    TableDataMap dataMap =
-        DataMapStoreManager.getInstance().getDataMap(identifier, dataMapName, className);
-    List<DataMapDistributable> distributables = dataMap.toDistributable(validSegments);
+    List<DataMapDistributableWrapper> distributables =
+        dataMapExprWrapper.toDistributable(validSegments);
     List<InputSplit> inputSplits = new ArrayList<>(distributables.size());
     inputSplits.addAll(distributables);
     return inputSplits;
@@ -101,13 +102,16 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
       @Override
       public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
           throws IOException, InterruptedException {
-        DataMapDistributable distributable = (DataMapDistributable)inputSplit;
+        DataMapDistributableWrapper distributable = (DataMapDistributableWrapper) inputSplit;
         TableDataMap dataMap = DataMapStoreManager.getInstance()
-            .getDataMap(identifier, distributable.getDataMapName(),
-                distributable.getDataMapFactoryClass());
-        blockletIterator = dataMap.prune(
-            distributable, getFilterExp(taskAttemptContext.getConfiguration()), partitions)
-            .iterator();
+            .getDataMap(identifier, distributable.getDistributable().getDataMapSchema());
+        List<ExtendedBlocklet> blocklets = dataMap.prune(
+            distributable.getDistributable(),
+            dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions);
+        for (ExtendedBlocklet blocklet: blocklets) {
+          blocklet.setDataMapUniqueId(distributable.getUniqueId());
+        }
+        blockletIterator = blocklets.iterator();
       }
 
       @Override