Posted to commits@carbondata.apache.org by ku...@apache.org on 2021/04/22 07:09:19 UTC

[carbondata] branch master updated: [CARBONDATA-4158]Add Secondary Index as a coarse-grain index and use secondary indexes for Presto queries

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 09ad509  [CARBONDATA-4158]Add Secondary Index as a coarse-grain index and use secondary indexes for Presto queries
09ad509 is described below

commit 09ad509d67fbe752892c7180fb8a10cd62fef465
Author: Venu Reddy <k....@gmail.com>
AuthorDate: Wed Mar 10 00:23:07 2021 +0530

    [CARBONDATA-4158]Add Secondary Index as a coarse-grain index and use secondary indexes for Presto queries
    
    Why is this PR needed?
    At present, secondary indexes are leveraged for query pruning via Spark plan modification.
    This approach is tightly coupled with Spark because the plan modification is specific to
    the Spark engine. To use secondary indexes for Presto or Hive queries, it is not feasible
    to modify the query plans as desired with the current approach. Thus the need arises for
    an engine-agnostic way to use secondary indexes in query pruning.
    
    What changes were proposed in this PR?
    1. Add Secondary Index as a coarse-grain index.
    2. Add a new insegment() UDF to support querying within particular segments (see the
    usage sketch below).
    3. Control the use of Secondary Index as a coarse-grain index in pruning with the
    property 'carbon.coarse.grain.secondary.index'.
    4. Use the Index Server driver for Secondary Index pruning.
    5. Use Secondary Indexes with Presto queries.
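
    A minimal usage sketch for the two pieces above (hedged: mydb.sales is a hypothetical
    table, and the exact insegment() argument format is an assumption from this description):

        // assuming an existing SparkSession named sparkSession
        CarbonProperties.getInstance()
            .addProperty("carbon.coarse.grain.secondary.index", "true");
        // the new insegment() UDF restricts the query to the given segments
        sparkSession.sql(
            "SELECT * FROM mydb.sales WHERE city = 'shanghai' AND insegment('1,2')");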
    
    This closes #4110
---
 README.md                                          |   1 +
 .../core/constants/CarbonCommonConstants.java      |  28 ++++
 .../carbondata/core/index/AbstractIndexJob.java    |  10 ++
 .../apache/carbondata/core/index/IndexChooser.java |  17 +-
 .../apache/carbondata/core/index/IndexFilter.java  |   2 +-
 .../carbondata/core/index/IndexInputFormat.java    |  14 +-
 .../org/apache/carbondata/core/index/IndexJob.java |   2 +
 .../carbondata/core/index/IndexStoreManager.java   |  12 +-
 .../apache/carbondata/core/index/IndexUtil.java    |  12 +-
 .../secondaryindex}/CarbonCostBasedOptimizer.java  |  50 +++++-
 .../carbondata/core/metadata/index/IndexType.java  |   4 +-
 .../metadata/schema/indextable/IndexTableInfo.java |  13 +-
 .../core/scan/expression/ExpressionResult.java     |  24 +--
 .../core/scan/expression/LiteralExpression.java    |  17 +-
 .../conditional/NotEqualsExpression.java           |   2 +-
 .../carbondata/core/util/CarbonProperties.java     |  28 ++++
 .../apache/carbondata/core/util/CarbonUtil.java    |  34 ++++
 .../apache/carbondata/core/util/SessionParams.java |   6 +-
 docs/index/secondary-index-guide.md                |  29 ++++
 .../carbondata/hadoop/api/CarbonInputFormat.java   |  46 +++++-
 .../org/apache/carbon/flink/TestCarbonWriter.scala |   2 +-
 .../presto/CarbondataPageSourceProvider.java       |   4 +
 .../carbondata/presto/CarbondataSplitManager.java  |   3 +
 .../carbondata/presto/impl/CarbonTableReader.java  |  56 +++++++
 .../presto/CarbondataPageSourceProvider.java       |   4 +
 .../carbondata/presto/CarbondataSplitManager.java  |   3 +
 .../carbondata/presto/impl/CarbonTableReader.java  |  56 +++++++
 .../carbondata/index/secondary/SecondaryIndex.java | 125 +++++++++++++++
 .../index/secondary/SecondaryIndexFactory.java     | 172 +++++++++++++++++++++
 .../index/secondary/SecondaryIndexModel.java       | 103 ++++++++++++
 .../indexserver/DistributedRDDUtils.scala          |  37 ++++-
 .../indexserver/DistributedShowCacheRDD.scala      |   2 +-
 .../carbondata/indexserver/IndexServer.scala       |  23 ++-
 .../carbondata/spark/rdd/CarbonScanRDD.scala       |  13 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala     |   3 +
 .../command/index/CarbonCreateIndexCommand.scala   |   4 +-
 .../command/index/CarbonRefreshIndexCommand.scala  |   4 +-
 .../execution/strategy/CarbonDataSourceScan.scala  |   6 +-
 .../execution/strategy/CarbonSourceStrategy.scala  |  13 +-
 .../execution/command/CarbonHiveCommands.scala     |   3 +-
 .../apache/spark/sql/index/CarbonIndexUtil.scala   |  92 ++++++++++-
 .../secondaryindex/command/SICreationCommand.scala |   8 +
 .../jobs/StringProjectionQueryJob.scala            |  34 ++--
 .../spark/sql/secondaryindex/load/Compactor.scala  |   6 +
 .../optimizer/CarbonSITransformationRule.scala     |  13 +-
 .../optimizer/CarbonSecondaryIndexOptimizer.scala  |   1 +
 .../secondaryindex/rdd/SecondaryIndexCreator.scala |   8 +
 .../index/bloom/BloomCoarseGrainIndexSuite.scala   |   4 +-
 .../complexType/TestArrayContainsPushDown.scala    |  12 +-
 .../processing/index/IndexWriterListener.java      |   6 +-
 50 files changed, 1067 insertions(+), 104 deletions(-)

diff --git a/README.md b/README.md
index f706e1c..4f47546 100644
--- a/README.md
+++ b/README.md
@@ -76,6 +76,7 @@ Some features are marked as experimental because the syntax/implementation might
 2. Accelerating performance using MV on parquet/orc.
 3. Merge API for Spark DataFrame.
 4. Hive write for non-transactional table.
+5. Secondary Index as a Coarse Grain Index in query processing.
 
 ##  Integration
 * [Hive](https://github.com/apache/carbondata/blob/master/docs/hive-guide.md)
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 41a51b8..9840540 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2317,6 +2317,34 @@ public final class CarbonCommonConstants {
   public static final String CARBON_ENABLE_INDEX_SERVER = "carbon.enable.index.server";
 
   /**
+   * This property is used to support Secondary Index as a Coarse Grain Index.
+   * 1. The default value of this property is false for the Spark session. By default, Spark
+   * queries continue to use secondary indexes in query pruning via the Spark query plan rewrite.
+   * If users want to use a secondary index as a Coarse Grain Index in Spark query pruning, they
+   * need to explicitly configure this property to true. Setting this configuration to true also
+   * avoids the query plan rewrite.
+   * 2. The default value of this property is true for Presto. By default, Presto queries use
+   * secondary indexes as a Coarse Grain Index in query pruning. If users do not wish to use
+   * secondary indexes in query pruning, they need to explicitly configure this property to false.
+   *
+   * The property is supported at both session level and carbon level. It can be configured in
+   * two variants as shown below:
+   * 1. Global - carbon.coarse.grain.secondary.index
+   * 2. For a particular table - carbon.coarse.grain.secondary.index.<dbname>.<tablename>
+   * When specified along with a database name and table name, the property enables/disables
+   * Secondary Index as a Coarse Grain Index for queries on that particular table, and takes
+   * precedence over the global property.
+   */
+  @CarbonProperty(dynamicConfigurable = true)
+  public static final String CARBON_COARSE_GRAIN_SECONDARY_INDEX =
+      "carbon.coarse.grain.secondary.index";
+
+  /**
+   * The default value false applies only to the Spark session
+   */
+  public static final String CARBON_COARSE_GRAIN_SECONDARY_INDEX_DEFAULT = "false";
+
+  /**
    * Configured property to enable/disable prepriming in index server
    */
   public static final String CARBON_INDEXSEVER_ENABLE_PREPRIMING =
diff --git a/core/src/main/java/org/apache/carbondata/core/index/AbstractIndexJob.java b/core/src/main/java/org/apache/carbondata/core/index/AbstractIndexJob.java
index 44f2df6..17c5c3a 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/AbstractIndexJob.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/AbstractIndexJob.java
@@ -37,8 +37,18 @@ public abstract class AbstractIndexJob implements IndexJob {
   }
 
   @Override
+  public Object[] execute(String sql) {
+    return new Object[0];
+  }
+
+  @Override
   public List<ExtendedBlocklet> execute(IndexInputFormat indexInputFormat,
       Configuration configuration) {
     return null;
   }
+
+  @Override
+  public Long executeCountJob(IndexInputFormat indexInputFormat, Configuration configuration) {
+    return 0L;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/index/IndexChooser.java b/core/src/main/java/org/apache/carbondata/core/index/IndexChooser.java
index 3cc9af5..3b411b0 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexChooser.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexChooser.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.index.dev.expr.IndexExprWrapper;
 import org.apache.carbondata.core.index.dev.expr.IndexExprWrapperImpl;
 import org.apache.carbondata.core.index.dev.expr.OrIndexExprWrapper;
 import org.apache.carbondata.core.index.status.IndexStatus;
+import org.apache.carbondata.core.metadata.index.IndexType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
@@ -65,7 +66,8 @@ public class IndexChooser {
   private List<TableIndex> cgIndexes;
   private List<TableIndex> fgIndexes;
 
-  public IndexChooser(CarbonTable carbonTable) throws IOException {
+  public IndexChooser(CarbonTable carbonTable, boolean isIncludeSecondaryIndexes)
+      throws IOException {
     this.carbonTable = carbonTable;
     // read all indexes for this table and populate CG and FG index list
     List<TableIndex> visibleIndexes = carbonTable.getAllVisibleIndexes();
@@ -76,7 +78,11 @@ public class IndexChooser {
           != null && visibleIndex.getIndexSchema().getProperties()
           .get(CarbonCommonConstants.INDEX_STATUS).equalsIgnoreCase(IndexStatus.ENABLED.name())) {
         IndexLevel level = visibleIndex.getIndexFactory().getIndexLevel();
+        String provider = visibleIndex.getIndexSchema().getProviderName();
         if (level == IndexLevel.CG) {
+          if (!isIncludeSecondaryIndexes && provider.equals(IndexType.SI.getIndexProviderName())) {
+            continue;
+          }
           cgIndexes.add(visibleIndex);
         } else {
           fgIndexes.add(visibleIndex);
@@ -331,6 +337,15 @@ public class IndexChooser {
 
     @Override
     public int compareTo(IndexTuple o) {
+      boolean isSecondaryIndex =
+          index.getIndexSchema().getProviderName().equals(IndexType.SI.getIndexProviderName());
+      boolean isOtherSecondaryIndex =
+          o.index.getIndexSchema().getProviderName().equals(IndexType.SI.getIndexProviderName());
+      if (isSecondaryIndex && !isOtherSecondaryIndex) {
+        return -1;
+      } else if (!isSecondaryIndex && isOtherSecondaryIndex) {
+        return 1;
+      }
       return order - o.order;
     }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/index/IndexFilter.java b/core/src/main/java/org/apache/carbondata/core/index/IndexFilter.java
index c3fd782..1579a5e 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexFilter.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexFilter.java
@@ -134,7 +134,7 @@ public class IndexFilter implements Serializable {
     }
   }
 
-  private Set<String> extractColumnExpressions(Expression expression) {
+  public static Set<String> extractColumnExpressions(Expression expression) {
     Set<String> columnExpressionList = new HashSet<>();
     for (Expression expressions: expression.getChildren()) {
       if (expressions != null && expressions.getChildren() != null
diff --git a/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java b/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java
index 072dbbc..1069e85 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java
@@ -96,6 +96,8 @@ public class IndexInputFormat extends FileInputFormat<Void, ExtendedBlocklet>
   // Whether AsyncCall to the Index Server(true in the case of pre-priming)
   private boolean isAsyncCall;
 
+  private boolean isSIPruningEnabled;
+
   IndexInputFormat() {
 
   }
@@ -258,6 +260,7 @@ public class IndexInputFormat extends FileInputFormat<Void, ExtendedBlocklet>
     out.writeBoolean(isWriteToFile);
     out.writeBoolean(isCountStarJob);
     out.writeBoolean(isAsyncCall);
+    out.writeBoolean(isSIPruningEnabled);
   }
 
   @Override
@@ -305,6 +308,7 @@ public class IndexInputFormat extends FileInputFormat<Void, ExtendedBlocklet>
     this.isWriteToFile = in.readBoolean();
     this.isCountStarJob = in.readBoolean();
     this.isAsyncCall = in.readBoolean();
+    this.isSIPruningEnabled = in.readBoolean();
   }
 
   private void initReadCommittedScope() throws IOException {
@@ -342,6 +346,14 @@ public class IndexInputFormat extends FileInputFormat<Void, ExtendedBlocklet>
     return isJobToClearIndexes;
   }
 
+  public boolean isSIPruningEnabled() {
+    return isSIPruningEnabled;
+  }
+
+  public void setSIPruningEnabled(boolean SIPruningEnabled) {
+    isSIPruningEnabled = SIPruningEnabled;
+  }
+
   public String getTaskGroupId() {
     return taskGroupId;
   }
@@ -428,7 +440,7 @@ public class IndexInputFormat extends FileInputFormat<Void, ExtendedBlocklet>
 
   public void createIndexChooser() throws IOException {
     if (null != filterResolverIntf) {
-      this.indexChooser = new IndexChooser(table);
+      this.indexChooser = new IndexChooser(table, isSIPruningEnabled);
     }
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/index/IndexJob.java b/core/src/main/java/org/apache/carbondata/core/index/IndexJob.java
index 61cfde0..7b0b913 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexJob.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexJob.java
@@ -35,6 +35,8 @@ public interface IndexJob extends Serializable {
 
   void execute(CarbonTable carbonTable, FileInputFormat<Void, BlockletIndexWrapper> format);
 
+  Object[] execute(String sql);
+
   List<ExtendedBlocklet> execute(IndexInputFormat indexInputFormat, Configuration configuration);
 
   Long executeCountJob(IndexInputFormat indexInputFormat, Configuration configuration);
diff --git a/core/src/main/java/org/apache/carbondata/core/index/IndexStoreManager.java b/core/src/main/java/org/apache/carbondata/core/index/IndexStoreManager.java
index 8d4be8f..321a774 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexStoreManager.java
@@ -37,7 +37,6 @@ import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.index.IndexType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.IndexSchema;
 import org.apache.carbondata.core.mutate.UpdateVO;
@@ -95,13 +94,10 @@ public final class IndexStoreManager {
         .getIndexesMap().entrySet()) {
       for (Map.Entry<String, Map<String, String>> indexEntry : providerEntry.getValue()
           .entrySet()) {
-        if (!indexEntry.getValue().get(CarbonCommonConstants.INDEX_PROVIDER)
-            .equalsIgnoreCase(IndexType.SI.getIndexProviderName())) {
-          IndexSchema indexSchema = new IndexSchema(indexEntry.getKey(),
-              indexEntry.getValue().get(CarbonCommonConstants.INDEX_PROVIDER));
-          indexSchema.setProperties(indexEntry.getValue());
-          indexes.add(getIndex(carbonTable, indexSchema));
-        }
+        IndexSchema indexSchema = new IndexSchema(indexEntry.getKey(),
+            indexEntry.getValue().get(CarbonCommonConstants.INDEX_PROVIDER));
+        indexSchema.setProperties(indexEntry.getValue());
+        indexes.add(getIndex(carbonTable, indexSchema));
       }
     }
     return indexes;
diff --git a/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java b/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
index 87d2a40..e1b090d 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
@@ -221,6 +221,12 @@ public class IndexUtil {
     }
   }
 
+  public static Object[] getPositionReferences(String sql) {
+    IndexJob indexJob = (IndexJob) createIndexJob(
+        "org.apache.spark.sql.secondaryindex.jobs.StringProjectionQueryJob");
+    return indexJob.execute(sql);
+  }
+
   private static FileInputFormat createIndexJob(CarbonTable carbonTable,
       IndexExprWrapper indexExprWrapper, List<Segment> validSegments, String clsName) {
     try {
@@ -296,8 +302,7 @@ public class IndexUtil {
       List<Segment> validSegments, List<Segment> invalidSegments, IndexLevel level,
       List<String> segmentsToBeRefreshed, Configuration configuration) {
     return executeIndexJob(carbonTable, resolver, indexJob, partitionsToPrune, validSegments,
-        invalidSegments, level, false, segmentsToBeRefreshed, false,
-        configuration);
+        invalidSegments, level, false, segmentsToBeRefreshed, false, false, configuration);
   }
 
   /**
@@ -309,7 +314,7 @@ public class IndexUtil {
       FilterResolverIntf resolver, IndexJob indexJob, List<PartitionSpec> partitionsToPrune,
       List<Segment> validSegments, List<Segment> invalidSegments, IndexLevel level,
       Boolean isFallbackJob, List<String> segmentsToBeRefreshed, boolean isCountJob,
-      Configuration configuration) {
+      boolean isSIPruningEnabled, Configuration configuration) {
     List<String> invalidSegmentNo = new ArrayList<>();
     for (Segment segment : invalidSegments) {
       invalidSegmentNo.add(segment.getSegmentNo());
@@ -322,6 +327,7 @@ public class IndexUtil {
       indexInputFormat.setCountStarJob();
       indexInputFormat.setIsWriteToFile(false);
     }
+    indexInputFormat.setSIPruningEnabled(isSIPruningEnabled);
     return indexJob.execute(indexInputFormat, configuration);
   }
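
The new getPositionReferences() hook executes a plain SQL string through the reflectively
loaded StringProjectionQueryJob and returns the projected values. A hedged sketch of the kind
of call the secondary-index pruning path can make (the SI table name and filter are
hypothetical; positionReference is the implicit position column of an SI table):

```java
import org.apache.carbondata.core.index.IndexUtil;

// hypothetical: project position references from an SI table for a filter value
Object[] refs = IndexUtil.getPositionReferences(
    "SELECT positionReference FROM mydb.si_on_city WHERE city = 'shanghai'");
```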
 
diff --git a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/optimizer/CarbonCostBasedOptimizer.java b/core/src/main/java/org/apache/carbondata/core/index/secondaryindex/CarbonCostBasedOptimizer.java
similarity index 63%
rename from integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/optimizer/CarbonCostBasedOptimizer.java
rename to core/src/main/java/org/apache/carbondata/core/index/secondaryindex/CarbonCostBasedOptimizer.java
index ca63f39..52d095e 100644
--- a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/optimizer/CarbonCostBasedOptimizer.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/secondaryindex/CarbonCostBasedOptimizer.java
@@ -15,15 +15,63 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.secondaryindex.optimizer;
+package org.apache.carbondata.core.index.secondaryindex;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.index.IndexType;
+import org.apache.carbondata.core.metadata.schema.indextable.IndexMetadata;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+
+import org.apache.log4j.Logger;
+
 public class CarbonCostBasedOptimizer {
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonCostBasedOptimizer.class.getName());
+  private static Map<String, List<String>> getSecondaryIndexes(TableInfo tableInfo) {
+    Map<String, List<String>> indexes = new HashMap<>();
+    String indexMeta =
+        tableInfo.getFactTable().getTableProperties().get(tableInfo.getFactTable().getTableId());
+    IndexMetadata indexMetadata = null;
+    if (null != indexMeta) {
+      try {
+        indexMetadata = IndexMetadata.deserialize(indexMeta);
+      } catch (IOException e) {
+        LOGGER.error("Error deserializing index metadata", e);
+      }
+    }
+    if (indexMetadata != null) {
+      String provider = IndexType.SI.getIndexProviderName();
+      if (!indexMetadata.isIndexTable() && (null != indexMetadata.getIndexesMap().get(provider))) {
+        for (Map.Entry<String, Map<String, String>> entry : indexMetadata.getIndexesMap()
+            .get(provider).entrySet()) {
+          Map<String, String> indexProperties = entry.getValue();
+          indexes.put(entry.getKey(),
+              Arrays.asList(indexProperties.get(CarbonCommonConstants.INDEX_COLUMNS).split(",")));
+        }
+      }
+    }
+    return indexes;
+  }
+
+  public static List<String> identifyRequiredTables(Set<String> filterAttributes,
+      TableInfo tableInfo) {
+    Map<String, List<String>> indexTableInfos = getSecondaryIndexes(tableInfo);
+    if (indexTableInfos.isEmpty()) {
+      return new ArrayList<>();
+    }
+    return identifyRequiredTables(filterAttributes, indexTableInfos);
+  }
+
   public static List<String> identifyRequiredTables(Set<String> filterAttributes,
       Map<String, List<String>> indexTableInfos) {
     List<String> matchedIndexTables = new ArrayList<>();
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/index/IndexType.java b/core/src/main/java/org/apache/carbondata/core/metadata/index/IndexType.java
index d0c4e8f..87c0518 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/index/IndexType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/index/IndexType.java
@@ -20,7 +20,7 @@ package org.apache.carbondata.core.metadata.index;
 public enum IndexType {
   LUCENE("org.apache.carbondata.index.lucene.LuceneFineGrainIndexFactory", "lucene"),
   BLOOMFILTER("org.apache.carbondata.index.bloom.BloomCoarseGrainIndexFactory", "bloomfilter"),
-  SI("", "si");
+  SI("org.apache.carbondata.index.secondary.SecondaryIndexFactory", "si");
 
 
   /**
@@ -57,6 +57,8 @@ public enum IndexType {
       return LUCENE;
     } else if (BLOOMFILTER.isEqual(indexProviderName)) {
       return BLOOMFILTER;
+    } else if (SI.isEqual(indexProviderName)) {
+      return SI;
     } else {
       throw new UnsupportedOperationException("Unknown index provider" + indexProviderName);
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/indextable/IndexTableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/indextable/IndexTableInfo.java
index 44d7239..22b03a1 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/indextable/IndexTableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/indextable/IndexTableInfo.java
@@ -157,15 +157,18 @@ public class IndexTableInfo implements Serializable {
     return toGson(indexTableInfos);
   }
 
-  public static String enableIndex(String oldIndexIno, String indexName) {
-    IndexTableInfo[] indexTableInfos = fromGson(oldIndexIno);
+  public static void setIndexStatus(IndexTableInfo[] indexTableInfos, String indexName,
+      IndexStatus status) {
     for (IndexTableInfo indexTableInfo : indexTableInfos) {
       if (indexTableInfo.tableName.equalsIgnoreCase(indexName)) {
-        Map<String, String> oldIndexProperties = indexTableInfo.indexProperties;
-        oldIndexProperties.put(CarbonCommonConstants.INDEX_STATUS, IndexStatus.ENABLED.name());
-        indexTableInfo.setIndexProperties(oldIndexProperties);
+        indexTableInfo.indexProperties.put(CarbonCommonConstants.INDEX_STATUS, status.name());
       }
     }
+  }
+
+  public static String setIndexStatus(String oldIndexInfo, String indexName, IndexStatus status) {
+    IndexTableInfo[] indexTableInfos = fromGson(oldIndexInfo);
+    setIndexStatus(indexTableInfos, indexName, status);
     return toGson(indexTableInfos);
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/ExpressionResult.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/ExpressionResult.java
index f091ec5..0396536 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/ExpressionResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/ExpressionResult.java
@@ -24,10 +24,8 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
-import java.util.TimeZone;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException;
@@ -172,25 +170,9 @@ public class ExpressionResult implements Comparable<ExpressionResult> {
     try {
       DataType dataType = this.getDataType();
       if (dataType == DataTypes.DATE || dataType == DataTypes.TIMESTAMP) {
-        String format = CarbonUtil.getFormatFromProperty(this.getDataType());
-        SimpleDateFormat parser = new SimpleDateFormat(format);
-        if (this.getDataType() == DataTypes.DATE) {
-          parser.setTimeZone(TimeZone.getTimeZone("GMT"));
-        }
-        if (value instanceof Timestamp) {
-          return parser.format((Timestamp) value);
-        } else if (value instanceof java.sql.Date) {
-          return parser.format((java.sql.Date) value);
-        } else if (value instanceof Long) {
-          if (isLiteral) {
-            return parser.format(new Timestamp((long) value / 1000));
-          }
-          return parser.format(new Timestamp((long) value));
-        } else if (value instanceof Integer) {
-          long date = ((int) value) * DateDirectDictionaryGenerator.MILLIS_PER_DAY;
-          return parser.format(new java.sql.Date(date));
-        }
-        return value.toString();
+        return CarbonUtil
+            .getFormattedDateOrTimestamp(CarbonUtil.getFormatFromProperty(dataType), dataType,
+                value, isLiteral);
       } else {
         return value.toString();
       }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/LiteralExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/LiteralExpression.java
index 7b5700a..f5f74c5 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/LiteralExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/LiteralExpression.java
@@ -17,9 +17,12 @@
 
 package org.apache.carbondata.core.scan.expression;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.util.CarbonUtil;
 
 public class LiteralExpression extends LeafExpression {
 
@@ -58,7 +61,19 @@ public class LiteralExpression extends LeafExpression {
 
   @Override
   public String getStatement() {
-    return value == null ? null : value.toString();
+    boolean quoteString = false;
+    Object val = value;
+    if (val != null) {
+      if (dataType == DataTypes.STRING || val instanceof String) {
+        quoteString = true;
+      } else if (dataType == DataTypes.TIMESTAMP || dataType == DataTypes.DATE) {
+        val = CarbonUtil.getFormattedDateOrTimestamp(dataType == DataTypes.TIMESTAMP ?
+            CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT :
+            CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT, dataType, value, true);
+        quoteString = true;
+      }
+    }
+    return val == null ? null : quoteString ? "'" + val.toString() + "'" : val.toString();
   }
 
   /**
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
index d0baed9..5f68d25 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
@@ -102,6 +102,6 @@ public class NotEqualsExpression extends BinaryConditionalExpression {
 
   @Override
   public String getStatement() {
-    return left.getStatement() + " <> " + right.getStatement();
+    return left.getStatement() + (isNotNull ? " is not " : " <> ") + right.getStatement();
   }
 }
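
With the literal quoting above and the is-not-null rendering here, getStatement() can turn a
filter tree back into executable SQL text, the raw material for the SI pruning queries built
elsewhere in this change. A small sketch, assuming the existing expression constructors:

```java
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.LiteralExpression;
import org.apache.carbondata.core.scan.expression.conditional.NotEqualsExpression;

// string (and formatted date/time) literals now come back quoted
new LiteralExpression("shanghai", DataTypes.STRING).getStatement();  // -> 'shanghai'
// an is-not-null check renders as SQL instead of "intfield <> null"
ColumnExpression column = new ColumnExpression("intfield", DataTypes.INT);
new NotEqualsExpression(column, new LiteralExpression(null, DataTypes.INT), true)
    .getStatement();  // -> intfield is not null
```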
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 2491a1f..975d41a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -2188,6 +2188,34 @@ public final class CarbonProperties {
   }
 
   /**
+   * Check whether coarse grain secondary index is enabled or not. If the property is not
+   * configured, the default value
+   * {@link CarbonCommonConstants#CARBON_COARSE_GRAIN_SECONDARY_INDEX_DEFAULT} is returned
+   */
+  public boolean isCoarseGrainSecondaryIndex(String dbName, String tableName) {
+    return isCoarseGrainSecondaryIndex(dbName, tableName,
+        CarbonCommonConstants.CARBON_COARSE_GRAIN_SECONDARY_INDEX_DEFAULT);
+  }
+
+  /**
+   * Check whether coarse grain secondary index is enabled or not. If the property is not
+   * configured, the given default value is returned
+   */
+  public boolean isCoarseGrainSecondaryIndex(String dbName, String tableName, String defaultValue) {
+    String configuredValue = getProperty(
+        CarbonCommonConstants.CARBON_COARSE_GRAIN_SECONDARY_INDEX + "." + dbName + "." + tableName);
+    if (configuredValue == null) {
+      configuredValue =
+          getProperty(CarbonCommonConstants.CARBON_COARSE_GRAIN_SECONDARY_INDEX, defaultValue);
+    }
+    boolean isCoarseGrainSecondaryIndex = Boolean.parseBoolean(configuredValue);
+    if (isCoarseGrainSecondaryIndex) {
+      LOGGER.info("Coarse grain secondary index is enabled for " + dbName + "." + tableName);
+    }
+    return isCoarseGrainSecondaryIndex;
+  }
+
+  /**
    * for test to print current configuration
    */
   @Override
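
The lookup order of the two keys is easiest to see in a short sketch (mydb, sales and orders
are hypothetical names):

```java
import org.apache.carbondata.core.util.CarbonProperties;

CarbonProperties props = CarbonProperties.getInstance();
props.addProperty("carbon.coarse.grain.secondary.index", "false");           // global value
props.addProperty("carbon.coarse.grain.secondary.index.mydb.sales", "true"); // per-table override
props.isCoarseGrainSecondaryIndex("mydb", "sales");   // true: the table-level key wins
props.isCoarseGrainSecondaryIndex("mydb", "orders");  // false: falls back to the global key
```

When neither key is configured, the two-argument overload falls back to the compile-time
default of false, while the Presto reader (further below) calls the three-argument overload
with "true", which is how Presto gets the opposite default.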
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 422ce0f..a5e6a12 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -34,6 +34,8 @@ import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.security.PrivilegedExceptionAction;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -44,6 +46,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
 import java.util.UUID;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -67,6 +70,7 @@ import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.index.Segment;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
 import org.apache.carbondata.core.localdictionary.generator.ColumnLocalDictionaryGenerator;
 import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
 import org.apache.carbondata.core.locks.ICarbonLock;
@@ -1627,6 +1631,36 @@ public final class CarbonUtil {
   }
 
   /**
+   * Get the formatted date/timestamp string
+   * @param format expected output format
+   * @param dataType data type (date or timestamp)
+   * @param value value to format
+   * @param isLiteral whether the value is a literal (literal timestamp values are divided by
+   *                  1000 before formatting)
+   * @return formatted date or timestamp string
+   */
+  public static String getFormattedDateOrTimestamp(String format, DataType dataType, Object value,
+      boolean isLiteral) {
+    SimpleDateFormat parser = new SimpleDateFormat(format);
+    if (dataType == DataTypes.DATE) {
+      parser.setTimeZone(TimeZone.getTimeZone("GMT"));
+    }
+    if (value instanceof Timestamp) {
+      return parser.format((Timestamp) value);
+    } else if (value instanceof java.sql.Date) {
+      return parser.format((java.sql.Date) value);
+    } else if (value instanceof Long) {
+      if (isLiteral) {
+        return parser.format(new Timestamp((long) value / 1000));
+      }
+      return parser.format(new Timestamp((long) value));
+    } else if (value instanceof Integer) {
+      long date = ((int) value) * DateDirectDictionaryGenerator.MILLIS_PER_DAY;
+      return parser.format(new java.sql.Date(date));
+    }
+    return value.toString();
+  }
+
+  /**
    * Below method will be used to convert byte data to surrogate key based
    * column value size
    *
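
A quick check of the extracted helper (the DATE branch is deterministic because it formats in
GMT; epoch day 18696 is 2021-03-10):

```java
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.CarbonUtil;

// Integer values are days since the Unix epoch, formatted in GMT
CarbonUtil.getFormattedDateOrTimestamp("yyyy-MM-dd", DataTypes.DATE, 18696, false);
// -> "2021-03-10"
// Long literal values are divided by 1000 before formatting, in the JVM's default time zone
CarbonUtil.getFormattedDateOrTimestamp("yyyy-MM-dd HH:mm:ss", DataTypes.TIMESTAMP,
    1615316587000000L, true);
```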
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index b62d83b..10b413a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -143,6 +143,7 @@ public class SessionParams implements Serializable, Cloneable {
       case CARBON_ENABLE_INDEX_SERVER:
       case CARBON_QUERY_STAGE_INPUT:
       case CARBON_ENABLE_MV:
+      case CARBON_COARSE_GRAIN_SECONDARY_INDEX:
         isValid = CarbonUtil.validateBoolean(value);
         if (!isValid) {
           throw new InvalidConfigurationException("Invalid value " + value + " for key " + key);
@@ -216,7 +217,10 @@ public class SessionParams implements Serializable, Cloneable {
           }
         } else if (key.startsWith(CarbonCommonConstants.CARBON_INDEX_VISIBLE)) {
           isValid = true;
-        } else if (key.startsWith(CarbonCommonConstants.CARBON_LOAD_INDEXES_PARALLEL)) {
+        } else if (key.startsWith(CarbonCommonConstants.CARBON_LOAD_INDEXES_PARALLEL) || (
+            key.startsWith(CARBON_COARSE_GRAIN_SECONDARY_INDEX) && key.split("\\.").length == 7)) {
+          // validate the value when the property key carries a database name and table name,
+          // e.g. carbon.coarse.grain.secondary.index.<dbname>.<tablename>
           isValid = CarbonUtil.validateBoolean(value);
           if (!isValid) {
             throw new InvalidConfigurationException("Invalid value " + value + " for key " + key);
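
The length check works because the per-table key always splits into exactly seven
dot-separated tokens, five from the base property plus the database and table names:

```java
String key = "carbon.coarse.grain.secondary.index.mydb.sales"; // hypothetical <dbname>.<tablename>
assert key.split("\\.").length == 7;
```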
diff --git a/docs/index/secondary-index-guide.md b/docs/index/secondary-index-guide.md
index e115260..e0c811e 100644
--- a/docs/index/secondary-index-guide.md
+++ b/docs/index/secondary-index-guide.md
@@ -217,3 +217,32 @@ Note: This command is not supported with other concurrent operations.
 ## Complex DataType support on SI
 Currently, only complex Array types are supported for creating secondary indexes. Nested Array
 support and other complex types support will be supported in the future.
+
+## Secondary Index as a Coarse Grain Index in query processing (Experimental)
+Secondary Indexes are used in main table query pruning by rewriting the Spark plan during the
+course of query execution. Secondary Indexes cannot be used in query pruning this way when the
+query is fired from an engine other than Spark (e.g., Presto or Hive). To address this issue,
+Secondary Index is also implemented as a Coarse Grain Index, similar to Bloom. Coarse Grain Index
+pruning happens right after the default pruning of the table being queried. A new property is
+introduced at session level and carbon level to support Secondary Index as a Coarse Grain Index.
+It can be configured in two variants as shown below:
+1. Configure globally - ```carbon.coarse.grain.secondary.index```
+2. Configure for a particular table - ```carbon.coarse.grain.secondary.index.<dbname>.<tablename>```
+
+When specified along with a database name and table name, the property enables/disables Secondary
+Index as a Coarse Grain Index for query pruning on that particular table. It takes precedence
+over the global configuration.
+
+The default value of the property is ```false``` for the Spark session. By default, Spark
+queries continue to use Secondary Indexes in query pruning via the Spark query plan rewrite. If
+users want to use Secondary Index as a Coarse Grain Index in Spark query pruning, they need to
+explicitly configure the property to ```true```. Setting the configuration to ```true``` also
+avoids the query plan rewrite. Since the feature is newly introduced, users who already rely on
+Secondary Indexes with Spark queries should avoid this property unless their existing queries
+work with Coarse Grain Secondary Index and the performance improvement is evident.
+
+The default value of this property is ```true``` for Presto. By default, Presto queries use
+Secondary Index as a Coarse Grain Index in query pruning. If users do not wish to use Secondary
+Indexes in query pruning, they need to explicitly configure this property to ```false```.
+
+Note: This feature is not supported with Secondary Index tables created on older versions.
\ No newline at end of file
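
At session level, the property is toggled through a SQL SET command, which the SessionParams
change above validates (a sketch assuming a SparkSession named spark and hypothetical names):

```java
spark.sql("SET carbon.coarse.grain.secondary.index = true");
spark.sql("SET carbon.coarse.grain.secondary.index.mydb.sales = false");
```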
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index dda0f20..ffbf357 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.index.Segment;
 import org.apache.carbondata.core.index.TableIndex;
 import org.apache.carbondata.core.index.dev.expr.IndexExprWrapper;
 import org.apache.carbondata.core.index.dev.expr.IndexWrapperSimpleInfo;
+import org.apache.carbondata.core.index.secondaryindex.CarbonCostBasedOptimizer;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -52,6 +53,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.profiler.ExplainCollector;
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
+import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.QueryModel;
@@ -116,6 +118,8 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   private static final String PARTITIONS_TO_PRUNE =
       "mapreduce.input.carboninputformat.partitions.to.prune";
   private static final String FG_INDEX_PRUNING = "mapreduce.input.carboninputformat.fgindex";
+  private static final String SECONDARY_INDEX_PRUNING =
+      "mapreduce.input.carboninputformat.secondaryindex.pruning";
   private static final String READ_COMMITTED_SCOPE =
       "mapreduce.input.carboninputformat.read.committed.scope";
   private static final String READ_ONLY_DELTA = "readDeltaOnly";
@@ -271,6 +275,26 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return (enable == null) || enable.equalsIgnoreCase("true");
   }
 
+  public static void checkAndSetSecondaryIndexPruning(TableInfo tableInfo, Expression expression,
+      Configuration configuration) {
+    if (expression != null) {
+      List<String> matchingIndexTables = CarbonCostBasedOptimizer
+          .identifyRequiredTables(IndexFilter.extractColumnExpressions(expression), tableInfo);
+      if (!matchingIndexTables.isEmpty()) {
+        configuration.set(SECONDARY_INDEX_PRUNING, "true");
+      }
+    }
+  }
+
+  public static void setSecondaryIndexPruning(Configuration configuration, boolean status) {
+    configuration.set(SECONDARY_INDEX_PRUNING, Boolean.toString(status));
+  }
+
+  public static boolean isSecondaryIndexPruningEnabled(Configuration configuration) {
+    String enable = configuration.get(SECONDARY_INDEX_PRUNING);
+    return enable != null && enable.equalsIgnoreCase("true");
+  }
+
   /**
    * Set list of segments to access
    */
@@ -425,6 +449,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       FilterResolverIntf filterResolverIntf, List<PartitionSpec> partitionNames,
       List<Segment> validSegments, List<Segment> invalidSegments,
       List<String> segmentsToBeRefreshed, boolean isCountJob, Configuration configuration) {
+    boolean isSIPruningEnabled = isSecondaryIndexPruningEnabled(configuration);
     try {
       IndexJob indexJob = (IndexJob) IndexUtil.createIndexJob(IndexUtil.DISTRIBUTED_JOB_NAME);
       if (indexJob == null) {
@@ -432,7 +457,8 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       }
       return IndexUtil
           .executeIndexJob(table, filterResolverIntf, indexJob, partitionNames, validSegments,
-              invalidSegments, null, false, segmentsToBeRefreshed, isCountJob, configuration);
+              invalidSegments, null, false, segmentsToBeRefreshed, isCountJob, isSIPruningEnabled,
+              configuration);
     } catch (Exception e) {
       // Check if fallback is disabled for testing purposes then directly throw exception.
       if (CarbonProperties.getInstance().isFallBackDisabled()) {
@@ -443,7 +469,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       return IndexUtil
           .executeIndexJob(table, filterResolverIntf, IndexUtil.getEmbeddedJob(), partitionNames,
               validSegments, invalidSegments, null, true, segmentsToBeRefreshed, isCountJob,
-              configuration);
+              isSIPruningEnabled, configuration);
     }
   }
 
@@ -510,7 +536,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
    * Prune the blocklets using the filter expression with available index.
    * First pruned with default blocklet index, then pruned with CG and FG index
    */
-  private List<ExtendedBlocklet> getPrunedBlocklets(JobContext job, CarbonTable carbonTable,
+  public List<ExtendedBlocklet> getPrunedBlocklets(JobContext job, CarbonTable carbonTable,
       IndexFilter filter, List<Segment> validSegments, List<Segment> invalidSegments,
       List<String> segmentsToBeRefreshed) throws IOException {
     ExplainCollector.addPruningInfo(carbonTable.getTableName());
@@ -529,7 +555,9 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     LOG.info("Started block pruning ...");
     boolean isDistributedPruningEnabled = CarbonProperties.getInstance()
         .isDistributedPruningEnabled(carbonTable.getDatabaseName(), carbonTable.getTableName());
-    if (isDistributedPruningEnabled) {
+    boolean isIndexServerContext =
+        job.getConfiguration().get("isIndexServerContext", "false").equals("true");
+    if (isDistributedPruningEnabled && !isIndexServerContext) {
       try {
         prunedBlocklets =
             getDistributedSplit(carbonTable, filter.getResolver(), partitionsToPrune, validSegments,
@@ -558,7 +586,8 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
         return prunedBlocklets;
       }
 
-      IndexChooser chooser = new IndexChooser(getOrCreateCarbonTable(job.getConfiguration()));
+      IndexChooser chooser = new IndexChooser(getOrCreateCarbonTable(job.getConfiguration()),
+          isSecondaryIndexPruningEnabled(job.getConfiguration()));
 
       // Get the available CG indexes and prune further.
       IndexExprWrapper cgIndexExprWrapper = chooser.chooseCGIndex(filter.getResolver());
@@ -585,6 +614,13 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
         // If isCGPruneFallback = true, it means that CG index pruning failed,
         // hence no need to do intersect and simply pass the prunedBlocklets from default index
         if (!isCGPruneFallback) {
+          if (isIndexServerContext) {
+            // For all blocklets, initialize the detail info so that it can be
+            // serialized to the driver
+            for (ExtendedBlocklet blocklet : cgPrunedBlocklets) {
+              blocklet.getDetailInfo();
+              blocklet.setCgIndexPresent(true);
+            }
+          }
           // since indexes prune in segment scope,
           // the result needs to intersect with the previous pruned result
           prunedBlocklets =
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
index 4ea6896..8c99a94 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
@@ -214,7 +214,7 @@ class TestCarbonWriter extends QueryTest with BeforeAndAfterAll{
         """
           |Table Scan on test_flink
           | - total: 1 blocks, 1 blocklets
-          | - filter: (intfield <> null and intfield = 99)
+          | - filter: (intfield is not null and intfield = 99)
           | - pruned by Main Index
           |    - skipped: 0 blocks, 0 blocklets
           | - pruned by CG Index
diff --git a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
index 1f646d9..f0e65a0 100644
--- a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
+++ b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.presto;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static java.util.Objects.requireNonNull;
@@ -82,6 +83,9 @@ public class CarbondataPageSourceProvider extends HivePageSourceProvider {
         new HdfsEnvironment.HdfsContext(session, carbonSplit.getDatabase(), carbonSplit.getTable()),
         new Path(carbonSplit.getSchema().getProperty("tablePath")));
     configuration = carbonTableReader.updateS3Properties(configuration);
+    for (Map.Entry<Object, Object> entry : carbonSplit.getSchema().entrySet()) {
+      configuration.set(entry.getKey().toString(), entry.getValue().toString());
+    }
     CarbonTable carbonTable = getCarbonTable(carbonSplit, configuration);
     boolean isDirectVectorFill = carbonTableReader.config.getPushRowFilter() == null ||
         carbonTableReader.config.getPushRowFilter().equalsIgnoreCase("false");
diff --git a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataSplitManager.java b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataSplitManager.java
index 3dccde5..09f0b3f 100755
--- a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataSplitManager.java
+++ b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataSplitManager.java
@@ -146,6 +146,9 @@ public class CarbondataSplitManager extends HiveSplitManager {
         new HdfsEnvironment.HdfsContext(session, schemaTableName.getSchemaName(),
             schemaTableName.getTableName()), new Path(location));
     configuration = carbonTableReader.updateS3Properties(configuration);
+    for (Map.Entry<String, String> entry : table.getStorage().getSerdeParameters().entrySet()) {
+      configuration.set(entry.getKey(), entry.getValue());
+    }
     // set the hadoop configuration to thread local, so that FileFactory can use it.
     ThreadLocalSessionInfo.setConfigurationToCurrentThread(configuration);
     CarbonTableCacheModel cache =
diff --git a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 67b4656..9e833ec 100755
--- a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.presto.impl;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -35,12 +36,16 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.index.IndexFilter;
 import org.apache.carbondata.core.index.IndexStoreManager;
+import org.apache.carbondata.core.index.status.IndexStatus;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.index.IndexType;
+import org.apache.carbondata.core.metadata.schema.indextable.IndexMetadata;
+import org.apache.carbondata.core.metadata.schema.indextable.IndexTableInfo;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -223,6 +228,7 @@ public class CarbonTableReader {
         CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
         CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
             .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
+        refreshIndexInfo(carbonTable, config);
         cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
         // cache the table
         carbonCache.get().put(table, cache);
@@ -234,6 +240,51 @@ public class CarbonTableReader {
     }
   }
 
+  private void refreshIndexInfo(CarbonTable carbonTable, Configuration config) {
+    Map<String, Map<String, Map<String, String>>> indexTableMap = new ConcurrentHashMap<>();
+    String indexInfo = config.get("indexInfo", IndexTableInfo.toGson(new IndexTableInfo[0]));
+    String parentTableName = config.get("parentTableName", "");
+    String parentTableId = config.get("parentTableId", "");
+    String parentTablePath = config.get("parentTablePath", "");
+    boolean isIndexTable = Boolean.parseBoolean(config.get("isIndexTable", "false"));
+    IndexTableInfo[] indexTableInfos = IndexTableInfo.fromGson(indexInfo);
+    for (IndexTableInfo indexTableInfo : indexTableInfos) {
+      Map<String, String> indexProperties = indexTableInfo.getIndexProperties();
+      String indexProvider;
+      if (indexProperties != null) {
+        indexProvider = indexProperties.get(CarbonCommonConstants.INDEX_PROVIDER);
+      } else {
+        // If the SI table was created before the change in CARBONDATA-3765, the
+        // indexProperties variable will not be present. On a direct upgrade of the SI store,
+        // indexProperties will be null; in that case, create indexProperties from indexCols.
+        // For details, refer to
+        // {@link org.apache.spark.sql.secondaryindex.hive.CarbonInternalMetastore#refreshIndexInfo}
+        indexProperties = new HashMap<>();
+        indexProperties.put(CarbonCommonConstants.INDEX_COLUMNS,
+            String.join(",", indexTableInfo.getIndexCols()));
+        indexProvider = IndexType.SI.getIndexProviderName();
+        indexProperties.put(CarbonCommonConstants.INDEX_PROVIDER, indexProvider);
+        indexProperties.put(CarbonCommonConstants.INDEX_STATUS, IndexStatus.DISABLED.name());
+      }
+      if (indexTableMap.get(indexProvider) == null) {
+        Map<String, Map<String, String>> indexTableInfoMap = new HashMap<>();
+        indexTableInfoMap.put(indexTableInfo.getTableName(), indexProperties);
+        indexTableMap.put(indexProvider, indexTableInfoMap);
+      } else {
+        indexTableMap.get(indexProvider).put(indexTableInfo.getTableName(), indexProperties);
+      }
+    }
+    IndexMetadata indexMetadata =
+        new IndexMetadata(indexTableMap, parentTableName, isIndexTable, parentTablePath,
+            parentTableId);
+    try {
+      carbonTable.getTableInfo().getFactTable().getTableProperties()
+          .put(carbonTable.getCarbonTableIdentifier().getTableId(), indexMetadata.serialize());
+    } catch (IOException e) {
+      LOGGER.error("Error serializing index metadata", e);
+    }
+  }
+
   private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
     CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
     if (cache != null && cache.isValid()) {
@@ -270,6 +321,11 @@ public class CarbonTableReader {
     config.set("query.id", queryId);
     CarbonInputFormat.setTransactionalTable(config, carbonTable.isTransactionalTable());
     CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
+    if (CarbonProperties.getInstance().isCoarseGrainSecondaryIndex(tableInfo.getDatabaseName(),
+        tableInfo.getFactTable().getTableName(), "true")) {
+      CarbonInputFormat
+          .checkAndSetSecondaryIndexPruning(carbonTable.getTableInfo(), filters, config);
+    }
 
     JobConf jobConf = new JobConf(config);
     try {
diff --git a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
index eaf0b92..6abe4c3 100644
--- a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
+++ b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.presto;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static java.util.Objects.requireNonNull;
@@ -84,6 +85,9 @@ public class CarbondataPageSourceProvider extends HivePageSourceProvider {
         new HdfsEnvironment.HdfsContext(session, carbonSplit.getDatabase(), carbonSplit.getTable()),
         new Path(carbonSplit.getSchema().getProperty("tablePath")));
     configuration = carbonTableReader.updateS3Properties(configuration);
+    for (Map.Entry<Object, Object> entry : carbonSplit.getSchema().entrySet()) {
+      configuration.set(entry.getKey().toString(), entry.getValue().toString());
+    }
     CarbonTable carbonTable = getCarbonTable(carbonSplit, configuration);
     boolean isDirectVectorFill = carbonTableReader.config.getPushRowFilter() == null ||
         carbonTableReader.config.getPushRowFilter().equalsIgnoreCase("false");
diff --git a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataSplitManager.java b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataSplitManager.java
index 633ad61..0d6812c 100755
--- a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataSplitManager.java
+++ b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataSplitManager.java
@@ -152,6 +152,9 @@ public class CarbondataSplitManager extends HiveSplitManager {
         new HdfsEnvironment.HdfsContext(session, schemaTableName.getSchemaName(),
             schemaTableName.getTableName()), new Path(location));
     configuration = carbonTableReader.updateS3Properties(configuration);
+    for (Map.Entry<String, String> entry : table.getStorage().getSerdeParameters().entrySet()) {
+      configuration.set(entry.getKey(), entry.getValue());
+    }
     // set the hadoop configuration to thread local, so that FileFactory can use it.
     ThreadLocalSessionInfo.setConfigurationToCurrentThread(configuration);
     CarbonTableCacheModel cache =
diff --git a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 5a6bcc5..1946406 100755
--- a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.presto.impl;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -35,12 +36,16 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.index.IndexFilter;
 import org.apache.carbondata.core.index.IndexStoreManager;
+import org.apache.carbondata.core.index.status.IndexStatus;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.index.IndexType;
+import org.apache.carbondata.core.metadata.schema.indextable.IndexMetadata;
+import org.apache.carbondata.core.metadata.schema.indextable.IndexTableInfo;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.reader.ThriftReader;
@@ -218,6 +223,7 @@ public class CarbonTableReader {
         CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
         CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
             .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
+        refreshIndexInfo(carbonTable, config);
         cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
         // cache the table
         carbonCache.get().put(table, cache);
@@ -229,6 +235,51 @@ public class CarbonTableReader {
     }
   }
 
+  private void refreshIndexInfo(CarbonTable carbonTable, Configuration config) {
+    Map<String, Map<String, Map<String, String>>> indexTableMap = new ConcurrentHashMap<>();
+    String indexInfo = config.get("indexInfo", IndexTableInfo.toGson(new IndexTableInfo[0]));
+    String parentTableName = config.get("parentTableName", "");
+    String parentTableId = config.get("parentTableId", "");
+    String parentTablePath = config.get("parentTablePath", "");
+    boolean isIndexTable = Boolean.parseBoolean(config.get("isIndexTable", "false"));
+    IndexTableInfo[] indexTableInfos = IndexTableInfo.fromGson(indexInfo);
+    for (IndexTableInfo indexTableInfo : indexTableInfos) {
+      Map<String, String> indexProperties = indexTableInfo.getIndexProperties();
+      String indexProvider;
+      if (indexProperties != null) {
+        indexProvider = indexProperties.get(CarbonCommonConstants.INDEX_PROVIDER);
+      } else {
+        // If the SI table was created before the change CARBONDATA-3765, the
+        // indexProperties variable will not be present. On a direct upgrade of such
+        // an SI store, indexProperties is null; in that case, build indexProperties
+        // from indexCols. For details, refer
+        // {@link org.apache.spark.sql.secondaryindex.hive.CarbonInternalMetastore#refreshIndexInfo}
+        indexProperties = new HashMap<>();
+        indexProperties.put(CarbonCommonConstants.INDEX_COLUMNS,
+            String.join(",", indexTableInfo.getIndexCols()));
+        indexProvider = IndexType.SI.getIndexProviderName();
+        indexProperties.put(CarbonCommonConstants.INDEX_PROVIDER, indexProvider);
+        indexProperties.put(CarbonCommonConstants.INDEX_STATUS, IndexStatus.DISABLED.name());
+      }
+      if (indexTableMap.get(indexProvider) == null) {
+        Map<String, Map<String, String>> indexTableInfoMap = new HashMap<>();
+        indexTableInfoMap.put(indexTableInfo.getTableName(), indexProperties);
+        indexTableMap.put(indexProvider, indexTableInfoMap);
+      } else {
+        indexTableMap.get(indexProvider).put(indexTableInfo.getTableName(), indexProperties);
+      }
+    }
+    IndexMetadata indexMetadata =
+        new IndexMetadata(indexTableMap, parentTableName, isIndexTable, parentTablePath,
+            parentTableId);
+    try {
+      carbonTable.getTableInfo().getFactTable().getTableProperties()
+          .put(carbonTable.getCarbonTableIdentifier().getTableId(), indexMetadata.serialize());
+    } catch (IOException e) {
+      LOGGER.error("Error serializing index metadata", e);
+    }
+  }
+
   private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
     CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
     if (cache != null && cache.isValid()) {
@@ -265,6 +316,11 @@ public class CarbonTableReader {
     config.set("query.id", queryId);
     CarbonInputFormat.setTransactionalTable(config, carbonTable.isTransactionalTable());
     CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
+    if (CarbonProperties.getInstance().isCoarseGrainSecondaryIndex(tableInfo.getDatabaseName(),
+        tableInfo.getFactTable().getTableName(), "true")) {
+      CarbonInputFormat
+          .checkAndSetSecondaryIndexPruning(carbonTable.getTableInfo(), filters, config);
+    }
 
     JobConf jobConf = new JobConf(config);
     try {
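
A note on the flow above: refreshIndexInfo rebuilds the secondary index metadata from
the 'indexInfo' serde parameter (which CarbondataSplitManager copies into the Hadoop
Configuration) and parks it, serialized, on the fact table's properties keyed by the
table id. A minimal sketch of reading it back, assuming an IndexMetadata#deserialize
counterpart to the serialize() call used above:

    // Sketch only: deserialize is assumed to mirror IndexMetadata#serialize.
    val serialized = carbonTable.getTableInfo.getFactTable.getTableProperties
      .get(carbonTable.getCarbonTableIdentifier.getTableId)
    val indexMetadata = IndexMetadata.deserialize(serialized)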
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/index/secondary/SecondaryIndex.java b/integration/spark/src/main/scala/org/apache/carbondata/index/secondary/SecondaryIndex.java
new file mode 100644
index 0000000..aec65fe
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/carbondata/index/secondary/SecondaryIndex.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.index.secondary;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.index.IndexUtil;
+import org.apache.carbondata.core.index.dev.IndexModel;
+import org.apache.carbondata.core.index.dev.cgindex.CoarseGrainIndex;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecutor;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.index.secondary.SecondaryIndexModel.PositionReferenceInfo;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Secondary Index to prune at blocklet level.
+ */
+public class SecondaryIndex extends CoarseGrainIndex {
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(SecondaryIndex.class.getName());
+  private String indexName;
+  private String currentSegmentId;
+  private List<String> validSegmentIds;
+  private PositionReferenceInfo positionReferenceInfo;
+
+  @Override
+  public void init(IndexModel indexModel) {
+    assert (indexModel instanceof SecondaryIndexModel);
+    SecondaryIndexModel model = (SecondaryIndexModel) indexModel;
+    indexName = model.getIndexName();
+    currentSegmentId = model.getCurrentSegmentId();
+    validSegmentIds = model.getValidSegmentIds();
+    positionReferenceInfo = model.getPositionReferenceInfo();
+  }
+
+  private Set<String> getPositionReferences(String databaseName, String indexName,
+      Expression expression) {
+    /* If the position references have not been obtained yet (i.e., prune is happening
+    for the first valid segment), fetch them from the given index table with the given
+    filter from all the valid segments at once, and store them as a map of segment id
+    to the set of position references in that segment. On subsequent prunes for other
+    segments, return the position references for the respective segment directly from
+    the map */
+    if (!positionReferenceInfo.isFetched()) {
+      Object[] rows = IndexUtil.getPositionReferences(String
+          .format("select distinct positionReference from %s.%s where insegment('%s') and %s",
+              databaseName, indexName, String.join(",", validSegmentIds),
+              expression.getStatement()));
+      for (Object row : rows) {
+        String positionReference = (String) row;
+        int blockletPathIndex = positionReference.indexOf("/");
+        String blockletPath = positionReference.substring(blockletPathIndex + 1);
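+        // The segment id lies between the last two '-' separators of the blocklet
+        // path; bucket this position reference under that segment.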
+        int segEndIndex = blockletPath.lastIndexOf(CarbonCommonConstants.DASH);
+        int segStartIndex = blockletPath.lastIndexOf(CarbonCommonConstants.DASH, segEndIndex - 1);
+        Set<String> blockletPaths = positionReferenceInfo.getSegmentToPosReferences()
+            .computeIfAbsent(blockletPath.substring(segStartIndex + 1, segEndIndex),
+                k -> new HashSet<>());
+        blockletPaths.add(blockletPath);
+      }
+      positionReferenceInfo.setFetched(true);
+    }
+    Set<String> blockletPaths =
+        positionReferenceInfo.getSegmentToPosReferences().get(currentSegmentId);
+    return blockletPaths != null ? blockletPaths : new HashSet<>();
+  }
+
+  @Override
+  public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
+      FilterExecutor filterExecutor, CarbonTable carbonTable) {
+    Set<String> blockletPaths = getPositionReferences(carbonTable.getDatabaseName(), indexName,
+        filterExp.getFilterExpression());
+    List<Blocklet> blocklets = new ArrayList<>();
+    for (String blockletPath : blockletPaths) {
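+      // Drop the prefix up to the first '-' and swap '_' for the batch prefix so the
+      // remainder splits into block path and blocklet number at the last '/'.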
+      blockletPath = blockletPath.substring(blockletPath.indexOf(CarbonCommonConstants.DASH) + 1)
+          .replace(CarbonCommonConstants.UNDERSCORE, CarbonTablePath.BATCH_PREFIX);
+      int blockletIndex = blockletPath.lastIndexOf("/");
+      blocklets.add(new Blocklet(blockletPath.substring(0, blockletIndex),
+          blockletPath.substring(blockletIndex + 1)));
+    }
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(String
+          .format("Secondary Index pruned blocklet count for segment %s is %d ", currentSegmentId,
+              blocklets.size()));
+    }
+    return blocklets;
+  }
+
+  @Override
+  public boolean isScanRequired(FilterResolverIntf filterExp) {
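+    // Always let pruning proceed; the position-reference lookup decides per
+    // segment which blocklets actually survive.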
+    return true;
+  }
+
+  @Override
+  public void clear() {
+  }
+
+  @Override
+  public void finish() {
+  }
+}
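
To make the pruning concrete: the single projection query built in
getPositionReferences (format taken verbatim from the String.format above) runs once
across all valid segments. A sketch with hypothetical database, index table, segment
list, and filter:

    // Hypothetical values: db1/idx1, valid segments 0,1,2, filter c1 = 'abc'.
    val sql = String.format(
      "select distinct positionReference from %s.%s where insegment('%s') and %s",
      "db1", "idx1", "0,1,2", "c1 = 'abc'")
    // => select distinct positionReference from db1.idx1
    //    where insegment('0,1,2') and c1 = 'abc'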
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/index/secondary/SecondaryIndexFactory.java b/integration/spark/src/main/scala/org/apache/carbondata/index/secondary/SecondaryIndexFactory.java
new file mode 100644
index 0000000..33a88c3
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/carbondata/index/secondary/SecondaryIndexFactory.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.index.secondary;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.exceptions.sql.MalformedIndexCommandException;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.features.TableOperation;
+import org.apache.carbondata.core.index.IndexFilter;
+import org.apache.carbondata.core.index.IndexInputSplit;
+import org.apache.carbondata.core.index.IndexMeta;
+import org.apache.carbondata.core.index.Segment;
+import org.apache.carbondata.core.index.dev.IndexBuilder;
+import org.apache.carbondata.core.index.dev.IndexWriter;
+import org.apache.carbondata.core.index.dev.cgindex.CoarseGrainIndex;
+import org.apache.carbondata.core.index.dev.cgindex.CoarseGrainIndexFactory;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.IndexSchema;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.events.Event;
+import org.apache.carbondata.index.secondary.SecondaryIndexModel.PositionReferenceInfo;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Index Factory for Secondary Index.
+ */
+@InterfaceAudience.Internal
+public class SecondaryIndexFactory extends CoarseGrainIndexFactory {
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(SecondaryIndexFactory.class.getName());
+  private IndexMeta indexMeta;
+
+  public SecondaryIndexFactory(CarbonTable carbonTable, IndexSchema indexSchema)
+      throws MalformedIndexCommandException {
+    super(carbonTable, indexSchema);
+    List<ExpressionType> operations = new ArrayList<>(Arrays.asList(ExpressionType.values()));
+    indexMeta = new IndexMeta(indexSchema.getIndexName(),
+        carbonTable.getIndexedColumns(indexSchema.getIndexColumns()), operations);
+    LOGGER.info("Created Secondary Index Factory instance for " + indexSchema.getIndexName());
+  }
+
+  @Override
+  public IndexWriter createWriter(Segment segment, String shardName,
+      SegmentProperties segmentProperties) throws IOException {
+    throw new UnsupportedOperationException("Not supported for Secondary Index");
+  }
+
+  @Override
+  public IndexBuilder createBuilder(Segment segment, String shardName,
+      SegmentProperties segmentProperties) throws IOException {
+    throw new UnsupportedOperationException("Not supported for Secondary Index");
+  }
+
+  @Override
+  public IndexMeta getMeta() {
+    return indexMeta;
+  }
+
+  private Map<Segment, List<CoarseGrainIndex>> getIndexes(List<Segment> segments,
+      PositionReferenceInfo positionReferenceInfo) throws IOException {
+    Map<Segment, List<CoarseGrainIndex>> indexes = new HashMap<>();
+    List<String> allSegmentIds =
+        segments.stream().map(Segment::getSegmentNo).collect(Collectors.toList());
+    for (Segment segment : segments) {
+      indexes.put(segment, this.getIndexes(segment, allSegmentIds, positionReferenceInfo));
+    }
+    return indexes;
+  }
+
+  private List<CoarseGrainIndex> getIndexes(Segment segment, List<String> allSegmentIds,
+      PositionReferenceInfo positionReferenceInfo) throws IOException {
+    List<CoarseGrainIndex> indexes = new ArrayList<>();
+    SecondaryIndex secondaryIndex = new SecondaryIndex();
+    secondaryIndex.init(
+        new SecondaryIndexModel(getIndexSchema().getIndexName(), segment.getSegmentNo(),
+            allSegmentIds, positionReferenceInfo, segment.getConfiguration()));
+    indexes.add(secondaryIndex);
+    return indexes;
+  }
+
+  @Override
+  public Map<Segment, List<CoarseGrainIndex>> getIndexes(List<Segment> segments, IndexFilter filter)
+      throws IOException {
+    return getIndexes(segments, new PositionReferenceInfo());
+  }
+
+  @Override
+  public Map<Segment, List<CoarseGrainIndex>> getIndexes(List<Segment> segments,
+      Set<Path> partitionLocations, IndexFilter filter) throws IOException {
+    return getIndexes(segments, new PositionReferenceInfo());
+  }
+
+  @Override
+  public List<CoarseGrainIndex> getIndexes(Segment segment) throws IOException {
+    List<String> allSegmentIds = new ArrayList<>();
+    allSegmentIds.add(segment.getSegmentNo());
+    return getIndexes(segment, allSegmentIds, new PositionReferenceInfo());
+  }
+
+  @Override
+  public List<CoarseGrainIndex> getIndexes(Segment segment, Set<Path> partitionLocations)
+      throws IOException {
+    return getIndexes(segment);
+  }
+
+  @Override
+  public List<CoarseGrainIndex> getIndexes(IndexInputSplit distributable) throws IOException {
+    throw new UnsupportedOperationException("Not supported for Secondary Index");
+  }
+
+  @Override
+  public List<IndexInputSplit> toDistributable(Segment segment) {
+    throw new UnsupportedOperationException("Not supported for Secondary Index");
+  }
+
+  @Override
+  public void fireEvent(Event event) {
+  }
+
+  @Override
+  public void clear(String segment) {
+  }
+
+  @Override
+  public synchronized void clear() {
+  }
+
+  @Override
+  public void deleteIndexData(Segment segment) throws IOException {
+  }
+
+  @Override
+  public void deleteIndexData() {
+  }
+
+  @Override
+  public boolean willBecomeStale(TableOperation operation) {
+    return false;
+  }
+
+  @Override
+  public String getCacheSize() {
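+    // SI as a coarse-grain index keeps nothing in the index cache; pruning is
+    // served by a SQL query on the SI table, so report zero usage.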
+    return "0:0";
+  }
+}
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/index/secondary/SecondaryIndexModel.java b/integration/spark/src/main/scala/org/apache/carbondata/index/secondary/SecondaryIndexModel.java
new file mode 100644
index 0000000..b2d776f
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/carbondata/index/secondary/SecondaryIndexModel.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.index.secondary;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.index.dev.IndexModel;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Secondary Index Model. It is used to initialize the Secondary Index coarse-grain index.
+ */
+public class SecondaryIndexModel extends IndexModel {
+  private final String indexName; // Secondary Index name
+  private final String currentSegmentId; // Segment Id to prune
+  private final List<String> validSegmentIds; // Valid segment Ids for Secondary Index pruning
+  private final PositionReferenceInfo positionReferenceInfo; // Position reference information
+
+  public SecondaryIndexModel(String indexName, String currentSegmentId,
+      List<String> validSegmentIds, PositionReferenceInfo positionReferenceInfo,
+      Configuration configuration) {
+    super(null, configuration);
+    this.indexName = indexName;
+    this.currentSegmentId = currentSegmentId;
+    this.validSegmentIds = validSegmentIds;
+    this.positionReferenceInfo = positionReferenceInfo;
+  }
+
+  public String getIndexName() {
+    return indexName;
+  }
+
+  public String getCurrentSegmentId() {
+    return currentSegmentId;
+  }
+
+  public List<String> getValidSegmentIds() {
+    return validSegmentIds;
+  }
+
+  public PositionReferenceInfo getPositionReferenceInfo() {
+    return positionReferenceInfo;
+  }
+
+  /**
+   * Position reference information. One instance is shared across all the
+   * {@link SecondaryIndex} instances that take part in pruning for a particular query
+   * with a given index filter. This ensures that a single SQL query fetches the
+   * position references from all valid segments of the secondary index table for that
+   * filter and populates them in a map. The first secondary index segment prune in the
+   * query runs the SQL query and stores the results in the map; subsequent segment
+   * prunes in the same query skip the per-segment SQL and return the position
+   * references for their segment directly from the map.
+   */
+  public static class PositionReferenceInfo {
+    /**
+     * Indicates whether the position references have been fetched. Initially false. It
+     * is set to true during the first secondary index segment prune, after the SQL
+     * query has fetched the position references from the valid segments (passed in the
+     * {@link SecondaryIndexModel}) of the secondary index table. The fetched position
+     * references populate the {@link #segmentToPosReferences} map.
+     */
+    private boolean fetched;
+    /**
+     * Map of segment id to the set of position references within that segment. The
+     * first secondary index segment prune populates this map; subsequent segment
+     * prunes return the position references for their segment directly from it,
+     * without issuing further SQL queries.
+     */
+    private final Map<String, Set<String>> segmentToPosReferences = new HashMap<>();
+
+    public boolean isFetched() {
+      return fetched;
+    }
+
+    public void setFetched(boolean fetched) {
+      this.fetched = fetched;
+    }
+
+    public Map<String, Set<String>> getSegmentToPosReferences() {
+      return segmentToPosReferences;
+    }
+  }
+}
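
A small sketch of the caching contract described above (segment ids and the position
reference value are made up):

    val info = new SecondaryIndexModel.PositionReferenceInfo()
    val refs = new java.util.HashSet[String]()
    refs.add("part-0/0-0_0-0") // hypothetical position reference
    // First prune fetches for all valid segments at once and buckets by segment id:
    info.getSegmentToPosReferences.put("0", refs)
    info.setFetched(true)
    // Later prunes consult the map; a missing key means no matching blocklets:
    val hits = info.getSegmentToPosReferences.get("1") // null => prune to empty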
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
index c3a84fa..5eb1fd4 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
@@ -26,15 +26,18 @@ import org.apache.spark.Partition
 import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.index.{IndexInputSplit, IndexStoreManager, Segment}
+import org.apache.carbondata.core.index.{IndexFilter, IndexInputFormat, IndexInputSplit, IndexStoreManager, Segment}
 import org.apache.carbondata.core.index.dev.expr.IndexInputSplitWrapper
+import org.apache.carbondata.core.indexstore.ExtendedBlockletWrapper
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.readcommitter.{LatestFilesReadCommittedScope, TableStatusReadCommittedScope}
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{IndexServerLoadEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.spark.util.CarbonSparkUtil
 
 object DistributedRDDUtils {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -394,4 +397,36 @@ object DistributedRDDUtils {
       }
     }
   }
+
+  def pruneOnDriver(request: IndexInputFormat): ExtendedBlockletWrapper = {
+    val job = CarbonSparkUtil.createHadoopJob()
+    val conf = job.getConfiguration
+    val tableInfo = request.getCarbonTable.getTableInfo
+    val identifier = request.getCarbonTable.getAbsoluteTableIdentifier
+    val indexFilter = new IndexFilter(request.getCarbonTable,
+      request.getFilterResolverIntf.getFilterExpression)
+    CarbonInputFormat.setTableInfo(conf, tableInfo)
+    CarbonInputFormat.setFilterPredicates(conf, indexFilter)
+    CarbonInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
+    CarbonInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
+    CarbonInputFormat.setPartitionsToPrune(conf, request.getPartitions)
+    CarbonInputFormat.setTransactionalTable(conf, tableInfo.isTransactionalTable)
+    CarbonInputFormat.setTablePath(conf,
+      identifier.appendWithLocalPrefix(identifier.getTablePath))
+    CarbonInputFormat.setQuerySegment(conf, identifier)
+    CarbonInputFormat.setColumnProjection(conf, Array("positionId"))
+    CarbonInputFormat.setReadCommittedScope(conf, request.getReadCommittedScope)
+    CarbonInputFormat.setSegmentsToAccess(conf, request.getValidSegments)
+    CarbonInputFormat.setValidateSegmentsToAccess(conf, false)
+    CarbonInputFormat.setSecondaryIndexPruning(conf, request.isSIPruningEnabled)
+    conf.set("isIndexServerContext", "true")
+    val blocklets = new CarbonTableInputFormat[Object].getPrunedBlocklets(job,
+      request.getCarbonTable,
+      indexFilter,
+      request.getValidSegments,
+      new java.util.ArrayList(),
+      new java.util.ArrayList())
+    new ExtendedBlockletWrapper(blocklets, request.getCarbonTable.getTablePath, request.getQueryId,
+      request.isWriteToFile, request.isCountStarJob)
+  }
 }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala
index 72eac2b..a81ae80 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala
@@ -72,7 +72,7 @@ class DistributedShowCacheRDD(@transient private val ss: SparkSession,
                 .getCarbonTable
                 .getTableUniqueName
             } else {
-              index.getIndexSchema.getRelationIdentifier.getDatabaseName + "_" + index
+              index.getTable.getAbsoluteTableIdentifier.getDatabaseName + "_" + index
                 .getIndexSchema.getIndexName
             }
             if (executorCache) {
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 089b857..6db718a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.index.IndexInputFormat
+import org.apache.carbondata.core.index.{IndexInputFormat, IndexStoreManager}
 import org.apache.carbondata.core.indexstore.{ExtendedBlockletWrapperContainer, SegmentWrapperContainer}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
@@ -177,15 +177,22 @@ object IndexServer extends ServerInterface {
         DistributedRDDUtils
           .invalidateSegmentMapping(request.getCarbonTable.getTableUniqueName,
             request.getInvalidSegments.asScala)
+        IndexStoreManager.getInstance()
+          .clearInvalidSegments(request.getCarbonTable, request.getInvalidSegments)
       }
-      val splits = new DistributedPruneRDD(sparkSession, request).collect()
-      if (!request.isFallbackJob) {
-        DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
-      }
-      if (request.isJobToClearIndexes) {
-        DistributedRDDUtils.invalidateTableMapping(request.getCarbonTable.getTableUniqueName)
+      if (request.isSIPruningEnabled) {
+        new ExtendedBlockletWrapperContainer(Array(DistributedRDDUtils.pruneOnDriver(request)),
+          request.isFallbackJob)
+      } else {
+        val splits = new DistributedPruneRDD(sparkSession, request).collect()
+        if (!request.isFallbackJob) {
+          DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
+        }
+        if (request.isJobToClearIndexes) {
+          DistributedRDDUtils.invalidateTableMapping(request.getCarbonTable.getTableUniqueName)
+        }
+        new ExtendedBlockletWrapperContainer(splits.map(_._2), request.isFallbackJob)
       }
-      new ExtendedBlockletWrapperContainer(splits.map(_._2), request.isFallbackJob)
     }
   }
 
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 6929169..6852968 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -83,7 +83,8 @@ class CarbonScanRDD[T: ClassTag](
     @transient val partitionNames: Seq[PartitionSpec],
     val dataTypeConverterClz: Class[_ <: DataTypeConverter] = classOf[SparkDataTypeConverterImpl],
     val readSupportClz: Class[_ <: CarbonReadSupport[_]] = SparkReadSupport.readSupportClass,
-    @transient var splits: java.util.List[InputSplit] = null)
+    @transient var splits: java.util.List[InputSplit] = null,
+    val segmentIds: Option[String] = None)
   extends CarbonRDDWithTableInfo[T](spark, Nil, serializedTableInfo) {
 
   private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
@@ -128,6 +129,16 @@ class CarbonScanRDD[T: ClassTag](
       } else {
         prepareInputFormatForDriver(job.getConfiguration)
       }
+      if (segmentIds.isDefined) {
+        CarbonInputFormat.setQuerySegment(job.getConfiguration, segmentIds.get)
+      }
+      if (indexFilter != null && CarbonProperties.getInstance()
+        .isCoarseGrainSecondaryIndex(tableInfo.getDatabaseName,
+          tableInfo.getFactTable.getTableName)) {
+        CarbonInputFormat.checkAndSetSecondaryIndexPruning(tableInfo,
+          indexFilter.getExpression,
+          job.getConfiguration)
+      }
       // initialise query_id for job
       job.getConfiguration.set("query.id", queryId)
       if (!StringUtils.isEmpty(currentSegmentFileName)) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 071c841..bb6e2ef 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -95,6 +95,9 @@ class CarbonEnv {
     // TODO: move it to proper place, it should be registered by indexSchema implementation
     sparkSession.udf.register("text_match", new TextMatchUDF)
     sparkSession.udf.register("text_match_with_limit", new TextMatchMaxDocUDF)
+    sparkSession.udf.register("insegment", new (String => Boolean) with Serializable {
+      override def apply(v1: String): Boolean = true
+    })
 
     // register udf for spatial index filters of querying
     GeoUdfRegister.registerQueryFilterUdf(sparkSession)
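
With the UDF registered, querying within chosen segments is a plain filter. A hedged
usage sketch (spark being the SparkSession; table name and segment ids hypothetical):

    // Restrict the scan to segments 1 and 3; at execution time the predicate is a
    // no-op, and the segment list is pushed down by CarbonSourceStrategy.
    spark.sql("select c1, c2 from db1.maintable " +
      "where insegment('1,3') and c1 = 'abc'").show()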
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/CarbonCreateIndexCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/CarbonCreateIndexCommand.scala
index 362b90a..19eeb51 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/CarbonCreateIndexCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/CarbonCreateIndexCommand.scala
@@ -225,7 +225,9 @@ case class CarbonCreateIndexCommand(
               new IndexTableInfo(parentTable.getDatabaseName, indexModel.indexName,
                 indexSchema.getProperties),
               false)
-            val enabledIndexInfo = IndexTableInfo.enableIndex(indexInfo, indexModel.indexName)
+            val enabledIndexInfo = IndexTableInfo.setIndexStatus(indexInfo,
+              indexModel.indexName,
+              IndexStatus.ENABLED)
 
             // set index information in parent table. Create it if it is null.
             val parentIndexMetadata = if (
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/CarbonRefreshIndexCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/CarbonRefreshIndexCommand.scala
index 84dea2f..fba5695 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/CarbonRefreshIndexCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/CarbonRefreshIndexCommand.scala
@@ -115,7 +115,9 @@ case class CarbonRefreshIndexCommand(
         LOGGER.info(s"Acquired the metadata lock for table " +
                     s"${ parentTable.getDatabaseName}.${ parentTable.getTableName }")
         val oldIndexInfo = parentTable.getIndexInfo
-        val updatedIndexInfo = IndexTableInfo.enableIndex(oldIndexInfo, indexName)
+        val updatedIndexInfo = IndexTableInfo.setIndexStatus(oldIndexInfo,
+          indexName,
+          IndexStatus.ENABLED)
 
         // set index information in parent table
         val parentIndexMetadata = parentTable.getIndexMetadata
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
index b181659..d3f8d87 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
@@ -54,7 +54,8 @@ case class CarbonDataSourceScan(
     @transient pushedDownFilters: Seq[Expression],
     directScanSupport: Boolean,
     @transient extraRDD: Option[(RDD[InternalRow], Boolean)] = None,
-    tableIdentifier: Option[TableIdentifier] = None)
+    tableIdentifier: Option[TableIdentifier] = None,
+    segmentIds: Option[String] = None)
   extends DataSourceScanExec with ColumnarBatchScan {
 
   override lazy val supportsBatch: Boolean = {
@@ -129,7 +130,8 @@ case class CarbonDataSourceScan(
       relation.carbonTable.getTableInfo.serialize(),
       relation.carbonTable.getTableInfo,
       new CarbonInputMetrics,
-      selectedPartitions)
+      selectedPartitions,
+      segmentIds = segmentIds)
     carbonRdd.setVectorReaderSupport(supportsBatch)
     carbonRdd.setDirectScanSupport(supportsBatch && directScanSupport)
     extraRDD.map(_._1.union(carbonRdd)).getOrElse(carbonRdd)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
index 1e7f0bf..be8b0e3 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
@@ -151,6 +151,15 @@ private[sql] object CarbonSourceStrategy extends SparkStrategy {
       }
       (tuple4._1, tuple4._2, tuple4._3)
     }
+    val inSegmentUDF = allPredicates.filter(e =>
+      e.isInstanceOf[ScalaUDF] &&
+      e.asInstanceOf[ScalaUDF].udfName.exists(_.equalsIgnoreCase("insegment")))
+    val segmentIds: Option[String] = if (inSegmentUDF.nonEmpty) {
+      val inSegment = inSegmentUDF.head.asInstanceOf[ScalaUDF]
+      Option(inSegment.children.head.asInstanceOf[Literal].value.toString)
+    } else {
+      None
+    }
     // scan
     val scan = CarbonDataSourceScan(
       table,
@@ -162,7 +171,9 @@ private[sql] object CarbonSourceStrategy extends SparkStrategy {
       pushedFilters,
       directScanSupport,
       extraRDD,
-      Some(TableIdentifier(table.identifier.getTableName, Option(table.identifier.getDatabaseName)))
+      Some(TableIdentifier(table.identifier.getTableName,
+        Option(table.identifier.getDatabaseName))),
+      segmentIds
     )
     // filter
     val filterOption = if (directScanSupport && scan.supportsBatch) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index 23f74d3..e819b86 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -130,7 +130,8 @@ object CarbonSetCommand {
       if (keySplits.length == 6 || keySplits.length == 4) {
         sessionParams.addProperty(key.toString, value)
       }
-    } else if (key.equalsIgnoreCase(CarbonCommonConstants.CARBON_REORDER_FILTER)) {
+    } else if (key.equalsIgnoreCase(CarbonCommonConstants.CARBON_REORDER_FILTER) ||
+               key.startsWith(CarbonCommonConstants.CARBON_COARSE_GRAIN_SECONDARY_INDEX)) {
       sessionParams.addProperty(key, value)
     } else if (isCarbonProperty) {
       sessionParams.addProperty(key, value)
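
The startsWith check makes the coarse-grain SI switch settable per session. A sketch
of the expected keys (only the base key is named in this commit; the db.table-suffixed
form is an assumption based on the prefix match above):

    // Session-wide default:
    spark.sql("set carbon.coarse.grain.secondary.index = true")
    // Assumed per-table override keyed by database and table name:
    spark.sql("set carbon.coarse.grain.secondary.index.db1.maintable = true")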
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
index 094be92..506316a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.ListBuffer
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.hive.{CarbonHiveIndexMetadataUtil, CarbonRelation}
 import org.apache.spark.sql.secondaryindex.command.{IndexModel, SecondaryIndexModel}
 import org.apache.spark.sql.secondaryindex.hive.CarbonInternalMetastore
 import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil
@@ -38,7 +38,9 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.index.status.IndexStatus
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.index.IndexType
 import org.apache.carbondata.core.metadata.schema.indextable.IndexMetadata
 import org.apache.carbondata.core.metadata.schema.indextable.IndexTableInfo
@@ -396,6 +398,88 @@ object CarbonIndexUtil {
     }
   }
 
+  def updateIndexStatusInBatch(carbonTable: CarbonTable,
+      indexTables: List[CarbonTable],
+      indexType: IndexType,
+      status: IndexStatus,
+      sparkSession: SparkSession): Unit = {
+    val dbName = carbonTable.getDatabaseName
+    val tableName = carbonTable.getTableName
+    val metadataLock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+      LockUsage.METADATA_LOCK)
+    try {
+      if (metadataLock.lockWithRetries()) {
+        CarbonMetadata.getInstance.removeTable(dbName, tableName)
+        val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+        val indexTableInfos = IndexTableInfo.fromGson(table.getIndexInfo)
+        val indexMetadata = table.getIndexMetadata
+        indexTables.foreach { indexTable =>
+          IndexTableInfo.setIndexStatus(indexTableInfos, indexTable.getTableName, status)
+          indexMetadata.updateIndexStatus(indexType.getIndexProviderName,
+            indexTable.getTableName,
+            status.name())
+        }
+        table.getTableInfo
+          .getFactTable
+          .getTableProperties
+          .put(table.getCarbonTableIdentifier.getTableId, indexMetadata.serialize)
+        sparkSession.sql(s"""ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES ('indexInfo' = '${
+          IndexTableInfo.toGson(indexTableInfos)
+        }')""".stripMargin).collect()
+        CarbonHiveIndexMetadataUtil.refreshTable(dbName, tableName, sparkSession)
+        CarbonMetadata.getInstance.removeTable(dbName, tableName)
+        CarbonMetadata.getInstance.loadTableMetadata(table.getTableInfo)
+      }
+    } finally {
+      metadataLock.unlock()
+    }
+  }
+
+  def updateIndexStatus(carbonTable: CarbonTable,
+      indexName: String,
+      indexType: IndexType,
+      status: IndexStatus,
+      needLock: Boolean = true,
+      sparkSession: SparkSession): Unit = {
+    val dbName = carbonTable.getDatabaseName
+    val tableName = carbonTable.getTableName
+    val locks: java.util.List[ICarbonLock] = new java.util.ArrayList[ICarbonLock]
+    try {
+      if (needLock) {
+        locks.add(CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+          LockUsage.METADATA_LOCK,
+          "Failed to acquire metadata lock for table: %s".format(carbonTable
+            .getAbsoluteTableIdentifier)))
+      }
+      CarbonMetadata.getInstance.removeTable(dbName, tableName)
+      val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+      val indexInfo = IndexTableInfo.setIndexStatus(table.getIndexInfo, indexName, status)
+      val indexMetadata = table.getIndexMetadata
+      indexMetadata.updateIndexStatus(indexType.getIndexProviderName, indexName, status.name())
+      table.getTableInfo
+        .getFactTable
+        .getTableProperties
+        .put(table.getCarbonTableIdentifier.getTableId, indexMetadata.serialize)
+      sparkSession.sql(
+        s"""ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES ('indexInfo' =
+           |'$indexInfo')"""
+          .stripMargin).collect()
+      CarbonHiveIndexMetadataUtil.refreshTable(dbName, tableName, sparkSession)
+      CarbonMetadata.getInstance.removeTable(dbName, tableName)
+      CarbonMetadata.getInstance.loadTableMetadata(table.getTableInfo)
+    } catch {
+      case e: Exception =>
+        LOGGER.error("Failed to update index status for %s".format(indexName))
+    } finally {
+      AlterTableUtil.releaseLocks(locks.asScala.toList)
+    }
+  }
+
   def processSIRepair(indexTableName: String, carbonTable: CarbonTable,
       carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata,
       secondaryIndexProvider: String, repairLimit: Int,
@@ -572,6 +656,12 @@ object CarbonIndexUtil {
               sparkSession.sql(
                 s"""ALTER TABLE ${ carbonLoadModel.getDatabaseName }.$indexTableName SET
                    |SERDEPROPERTIES ('isSITableEnabled' = 'true')""".stripMargin).collect()
+              CarbonIndexUtil.updateIndexStatus(carbonTable,
+                indexTableName,
+                IndexType.SI,
+                IndexStatus.ENABLED,
+                true,
+                sparkSession)
             }
           } catch {
             case ex: Exception =>
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
index d3aebd9..974c821 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
@@ -39,6 +39,7 @@ import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandExcepti
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.status.IndexStatus
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
@@ -294,6 +295,7 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
       indexProperties.put(CarbonCommonConstants.INDEX_COLUMNS, indexTableCols.asScala.mkString(","))
       indexProperties.put(CarbonCommonConstants.INDEX_PROVIDER,
         IndexType.SI.getIndexProviderName)
+      indexProperties.put(CarbonCommonConstants.INDEX_STATUS, IndexStatus.DISABLED.name())
       val indexInfo = IndexTableUtil.checkAndAddIndexTable(
         oldIndexInfo,
         new IndexTableInfo(
@@ -444,6 +446,12 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
         sparkSession.sql(
           s"""ALTER TABLE $databaseName.$indexTableName SET
              |SERDEPROPERTIES ('isSITableEnabled' = 'true')""".stripMargin).collect()
+        CarbonIndexUtil.updateIndexStatus(carbonTable,
+          indexModel.indexName,
+          IndexType.SI,
+          IndexStatus.ENABLED,
+          false,
+          sparkSession)
       }
       val createTablePostExecutionEvent: CreateTablePostExecutionEvent =
         CreateTablePostExecutionEvent(sparkSession, tableIdentifier)
diff --git a/core/src/main/java/org/apache/carbondata/core/index/AbstractIndexJob.java b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/jobs/StringProjectionQueryJob.scala
similarity index 52%
copy from core/src/main/java/org/apache/carbondata/core/index/AbstractIndexJob.java
copy to integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/jobs/StringProjectionQueryJob.scala
index 44f2df6..6a463da 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/AbstractIndexJob.java
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/jobs/StringProjectionQueryJob.scala
@@ -15,30 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.carbondata.core.index;
+package org.apache.spark.sql.secondaryindex.jobs
 
-import java.util.List;
+import org.apache.spark.sql.util.SparkSQLUtil
 
-import org.apache.carbondata.core.indexstore.BlockletIndexWrapper;
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.carbondata.core.index.AbstractIndexJob
 
 /**
- * abstract class for index job
+ * Spark job to run the given SQL and return the values of its string projection column.
+ * Note: expects the SQL to project a single string column.
  */
-public abstract class AbstractIndexJob implements IndexJob {
-
-  @Override
-  public void execute(CarbonTable carbonTable,
-      FileInputFormat<Void, BlockletIndexWrapper> format) {
-  }
-
-  @Override
-  public List<ExtendedBlocklet> execute(IndexInputFormat indexInputFormat,
-      Configuration configuration) {
-    return null;
+class StringProjectionQueryJob extends AbstractIndexJob {
+  override def execute(sql: String): Array[Object] = {
+    SparkSQLUtil
+      .getSparkSession
+      .sql(sql)
+      .rdd
+      .map(row => row.get(0).asInstanceOf[String])
+      .collect
+      .asInstanceOf[Array[Object]]
   }
 }
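
A hedged sketch of how this job is exercised for SI coarse-grain pruning (the SQL
mirrors SecondaryIndex.getPositionReferences; direct instantiation here is purely
illustrative, as the job is normally invoked through IndexUtil):

    val rows: Array[Object] = new StringProjectionQueryJob().execute(
      "select distinct positionReference from db1.idx1 " +
        "where insegment('0,1') and c1 = 'abc'")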
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
index 3e61713..cfb79b2 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.secondaryindex.util.SecondaryIndexUtil
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.status.IndexStatus
 import org.apache.carbondata.core.locks.ICarbonLock
 import org.apache.carbondata.core.metadata.index.IndexType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -166,6 +167,11 @@ object Compactor {
                  | SET SERDEPROPERTIES ('isSITableEnabled' = 'false')
                """.stripMargin).collect()
           }
+          CarbonIndexUtil.updateIndexStatusInBatch(carbonMainTable,
+            siCompactionIndexList,
+            IndexType.SI,
+            IndexStatus.DISABLED,
+            sparkSession)
           throw ex
       } finally {
         // once compaction is success, release the segment locks
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala
index 78c14b4..3ffaf3b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala
@@ -29,6 +29,7 @@ import org.apache.spark.util.SparkUtil
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.util.CarbonSparkUtil
 
 /**
@@ -47,11 +48,13 @@ class CarbonSITransformationRule(sparkSession: SparkSession)
     plan.collect {
       case l: LogicalRelation if (!hasSecondaryIndexTable &&
                                   l.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) =>
-        hasSecondaryIndexTable = l.relation
-                       .asInstanceOf[CarbonDatasourceHadoopRelation]
-                       .carbonTable
-                       .getIndexTableNames(IndexType.SI.getIndexProviderName).size() > 0
-
+        val carbonTable = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+        hasSecondaryIndexTable = !CarbonProperties.getInstance()
+          .isCoarseGrainSecondaryIndex(carbonTable.getDatabaseName, carbonTable.getTableName) &&
+          carbonTable.getIndexTableNames(IndexType.SI.getIndexProviderName).size() > 0
     }
     if (hasSecondaryIndexTable && checkIfRuleNeedToBeApplied(plan)) {
       secondaryIndexOptimizer.transformFilterToJoin(plan, isProjectionNeeded(plan))
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
index 23d17d2..f908f54 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.secondaryindex.optimizer.NodeType.NodeType
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.secondaryindex.CarbonCostBasedOptimizer
 import org.apache.carbondata.core.util.CarbonProperties
 
 class SIFilterPushDownOperation(nodeType: NodeType)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index 9ab18fd..e493d47 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -38,8 +38,10 @@ import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.status.IndexStatus
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, SegmentFileStore}
+import org.apache.carbondata.core.metadata.index.IndexType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
@@ -487,6 +489,12 @@ object SecondaryIndexCreator {
               .getDatabaseName
           }.${ secondaryIndexModel.secondaryIndex.indexName } SET
              |SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin).collect()
+        CarbonIndexUtil.updateIndexStatus(secondaryIndexModel.carbonTable,
+          secondaryIndexModel.secondaryIndex.indexName,
+          IndexType.SI,
+          IndexStatus.DISABLED,
+          true,
+          secondaryIndexModel.sqlContext.sparkSession)
       }
       // close the executor service
       if (null != executorService) {
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexSuite.scala b/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexSuite.scala
index 1b58ed3..0c83281 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexSuite.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexSuite.scala
@@ -733,7 +733,7 @@ class BloomCoarseGrainIndexSuite extends QueryTest with BeforeAndAfterAll with B
       """
         |Table Scan on carbon_bloom
         | - total: 3 blocks, 3 blocklets
-        | - filter: (num1 <> null and num1 = 1)
+        | - filter: (num1 is not null and num1 = 1)
         | - pruned by Main Index
         |    - skipped: 1 blocks, 1 blocklets
         | - pruned by CG Index
@@ -752,7 +752,7 @@ class BloomCoarseGrainIndexSuite extends QueryTest with BeforeAndAfterAll with B
       """
         |Table Scan on carbon_bloom
         | - total: 3 blocks, 3 blocklets
-        | - filter: (dictstring <> null and dictstring = S21)
+        | - filter: (dictstring is not null and dictstring = 'S21')
         | - pruned by Main Index
         |    - skipped: 1 blocks, 1 blocklets
         | - pruned by CG Index
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestArrayContainsPushDown.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestArrayContainsPushDown.scala
index 4bd8a86..710de07 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestArrayContainsPushDown.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestArrayContainsPushDown.scala
@@ -48,11 +48,11 @@ class TestArrayContainsPushDown extends QueryTest with BeforeAndAfterAll {
 
     checkExistence(sql(" explain select * from complex1 where array_contains(arr,'sd')"),
       true,
-      "PushedFilters: [arr = sd]")
+      "PushedFilters: [arr = 'sd']")
 
     checkExistence(sql(" explain select count(*) from complex1 where array_contains(arr,'sd')"),
       true,
-      "PushedFilters: [arr = sd]")
+      "PushedFilters: [arr = 'sd']")
 
     checkAnswer(sql(" select * from complex1 where array_contains(arr,'sd')"),
       Seq(Row(mutable.WrappedArray.make(Array("sd", "df", "gh"))),
@@ -263,13 +263,13 @@ class TestArrayContainsPushDown extends QueryTest with BeforeAndAfterAll {
       sql("explain select * from complex1 " +
           "where array_contains(arr,cast('2018-01-01 00:00:00' as timestamp))"),
       true,
-      "PushedFilters: [arr = 1514793600000000]")
+      "PushedFilters: [arr = '2018-01-01 00:00:00']")
 
     checkExistence(
       sql("explain select count(*) from complex1 " +
           "where array_contains(arr,cast('2018-01-01 00:00:00' as timestamp))"),
       true,
-      "PushedFilters: [arr = 1514793600000000]")
+      "PushedFilters: [arr = '2018-01-01 00:00:00']")
 
     checkAnswer(
       sql("select * from complex1 " +
@@ -295,13 +295,13 @@ class TestArrayContainsPushDown extends QueryTest with BeforeAndAfterAll {
     checkExistence(
       sql("explain select * from complex1 where array_contains(arr,cast('2018-01-01' as date))"),
       true,
-      "PushedFilters: [arr = 17532]")
+      "PushedFilters: [arr = '2018-01-01']")
 
     checkExistence(
       sql("explain select count(*) from complex1 " +
           "where array_contains(arr,cast('2018-01-01' as date))"),
       true,
-      "PushedFilters: [arr = 17532]")
+      "PushedFilters: [arr = '2018-01-01']")
 
     checkAnswer(sql(" select * from complex1 where array_contains(arr,cast('2018-01-01' as date))"),
       Seq(Row(mutable.WrappedArray.make(
diff --git a/processing/src/main/java/org/apache/carbondata/processing/index/IndexWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/index/IndexWriterListener.java
index 7f24757..01c2f10 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/index/IndexWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/index/IndexWriterListener.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.index.TableIndex;
 import org.apache.carbondata.core.index.dev.IndexFactory;
 import org.apache.carbondata.core.index.dev.IndexWriter;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.index.IndexType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.processing.store.TablePage;
@@ -73,9 +74,10 @@ public class IndexWriterListener {
     }
     tblIdentifier = carbonTable.getCarbonTableIdentifier();
     for (TableIndex tableIndex : tableIndices) {
-      // register it only if it is not lazy index, for lazy index, user
+      // register it only if it is neither a lazy index nor a secondary index. For a lazy index, user
       // will rebuild the index manually
-      if (!tableIndex.getIndexSchema().isLazy()) {
+      if (!tableIndex.getIndexSchema().isLazy() && !tableIndex.getIndexSchema().getProviderName()
+          .equals(IndexType.SI.getIndexProviderName())) {
         IndexFactory factory = tableIndex.getIndexFactory();
         register(factory, segmentId, taskNo, segmentProperties);
       }