You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/04/10 14:47:59 UTC

[carbondata] branch master updated: [CARBONDATA-3724] Secondary Index enable on partition Table

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 24d2fdf  [CARBONDATA-3724] Secondary Index enable on partition Table
24d2fdf is described below

commit 24d2fdfc2b518c114f70d6fabcf56becc4af7245
Author: maheshrajus <ma...@huawei.com>
AuthorDate: Tue Feb 25 21:30:53 2020 +0530

    [CARBONDATA-3724] Secondary Index enable on partition Table
    
    Why is this PR needed?
    Currently, secondary index is not supported on partition tables.
    Secondary index should be supported on non-partition columns instead of being blocked for the entire partition table.
    
    What changes were proposed in this PR?
    Support secondary index on the non-partition columns of a partition table.
    
    This closes #3639
---
 .../core/constants/CarbonCommonConstants.java      |   5 +
 .../apache/carbondata/core/datamap/Segment.java    |   4 +
 .../block/SegmentPropertiesAndSchemaHolder.java    |   4 +
 .../core/indexstore/blockletindex/BlockIndex.java  |  10 +-
 .../carbondata/core/mutate/CarbonUpdateUtil.java   |  26 +-
 .../core/mutate/data/BlockMappingVO.java           |  12 +
 .../scan/executor/impl/AbstractQueryExecutor.java  |   2 +-
 .../core/scan/result/BlockletScannedResult.java    |  12 +-
 .../scan/scanner/impl/BlockletFilterScanner.java   |  10 +-
 .../scan/scanner/impl/BlockletFullScanner.java     |   6 +-
 .../statusmanager/SegmentUpdateStatusManager.java  |  12 +-
 .../apache/carbondata/core/util/CarbonUtil.java    |  29 +-
 .../blockletindex/TestBlockletIndex.java           |  13 +
 .../hadoop/api/CarbonOutputCommitter.java          |   5 +-
 .../hadoop/api/CarbonTableInputFormat.java         |  17 +-
 .../hadoop/util/CarbonInputFormatUtil.java         |  13 +
 .../TestAlterTableColumnRenameWithIndex.scala      |  20 +-
 .../TestBroadCastSIFilterPushJoinWithUDF.scala     |  48 +--
 .../secondaryindex/TestCreateIndexTable.scala      |   6 -
 .../TestIndexModelForORFilterPushDown.scala        |  21 +-
 .../secondaryindex/TestSIWithPartition.scala       | 363 +++++++++++++++++++++
 .../secondaryindex/TestSIWithSecondryIndex.scala   |  21 +-
 .../secondaryindex/TestSecondaryIndexUtils.scala   |  38 +++
 .../spark/rdd/CarbonDeltaRowScanRDD.scala          |   4 +-
 .../command/mutation/DeleteExecution.scala         |  61 ++--
 .../secondaryindex/command/SICreationCommand.scala |  17 +-
 .../AlterTableCompactionPostEventListener.scala    |   8 +-
 .../rdd/CarbonSecondaryIndexRDD.scala              |  21 +-
 .../secondaryindex/rdd/SecondaryIndexCreator.scala |   6 +-
 .../testsuite/iud/DeleteCarbonTableTestCase.scala  |  94 +++---
 30 files changed, 703 insertions(+), 205 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 41ac51a..2692377 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2102,6 +2102,11 @@ public final class CarbonCommonConstants {
   // As due to SnappyCompressor.MAX_BYTE_TO_COMPRESS is 1.75 GB
   public static final int TABLE_PAGE_SIZE_MAX_INMB = 1755;
 
+  /**
+   * Current segment file
+   */
+  public static final String CURRENT_SEGMENTFILE = "current.segmentfile";
+
   //////////////////////////////////////////////////////////////////////////////////////////
   // Unused constants and parameters start here
   //////////////////////////////////////////////////////////////////////////////////////////
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 9849df2..fee7117 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -130,6 +130,10 @@ public class Segment implements Serializable, Writable {
     this.options = options;
   }
 
+  public void setSegmentFileName(String segmentFileName) {
+    this.segmentFileName = segmentFileName;
+  }
+
   /**
    *
    * @param segmentNo
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
index 1aa675d..e0c8c6e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
@@ -302,6 +302,10 @@ public class SegmentPropertiesAndSchemaHolder {
       this.columnsInTable = columnsInTable;
     }
 
+    public CarbonTable getCarbonTable() {
+      return this.carbonTable;
+    }
+
     public void initSegmentProperties() {
       segmentProperties = new SegmentProperties(columnsInTable);
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
index 9198b6b..50d91f6 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
@@ -789,9 +789,13 @@ public class BlockIndex extends CoarseGrainIndex
       byte[][] minValue, boolean[] minMaxFlag, String filePath, int blockletId) {
     BitSet bitSet = null;
     if (filterExecuter instanceof ImplicitColumnFilterExecutor) {
-      String uniqueBlockPath = !isPartitionTable ?
-                filePath.substring(filePath.lastIndexOf("/Part") + 1) :
-                filePath;
+      String uniqueBlockPath;
+      if (segmentPropertiesWrapper.getCarbonTable().isHivePartitionTable()) {
+        uniqueBlockPath = filePath
+            .substring(segmentPropertiesWrapper.getCarbonTable().getTablePath().length() + 1);
+      } else {
+        uniqueBlockPath = filePath.substring(filePath.lastIndexOf("/Part") + 1);
+      }
       // this case will come in case of old store where index file does not contain the
       // blocklet information
       if (blockletId != -1) {
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index a1d1e18..c942a29 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -61,6 +61,14 @@ public class CarbonUpdateUtil {
   /**
    * returns required filed from tuple id
    *
+   */
+  public static String getRequiredFieldFromTID(String Tid, int index) {
+    return Tid.split(CarbonCommonConstants.FILE_SEPARATOR)[index];
+  }
+
+  /**
+   * returns required filed from tuple id
+   *
    * @param Tid
    * @param tid
    * @return
@@ -74,7 +82,10 @@ public class CarbonUpdateUtil {
    * @param Tid
    * @return
    */
-  public static String getSegmentWithBlockFromTID(String Tid) {
+  public static String getSegmentWithBlockFromTID(String Tid, boolean isPartitionTable) {
+    if (isPartitionTable) {
+      return getRequiredFieldFromTID(Tid, TupleIdEnum.SEGMENT_ID);
+    }
     return getRequiredFieldFromTID(Tid, TupleIdEnum.SEGMENT_ID)
         + CarbonCommonConstants.FILE_SEPARATOR + getRequiredFieldFromTID(Tid, TupleIdEnum.BLOCK_ID);
   }
@@ -916,14 +927,15 @@ public class CarbonUpdateUtil {
    * @param blockName
    * @return
    */
-  public static String getSegmentBlockNameKey(String segID, String blockName) {
-
+  public static String getSegmentBlockNameKey(String segID, String blockName,
+      boolean isPartitionTable) {
     String blockNameWithOutPart = blockName
-            .substring(blockName.indexOf(CarbonCommonConstants.HYPHEN) + 1,
-                    blockName.lastIndexOf(CarbonTablePath.getCarbonDataExtension()));
-
+        .substring(blockName.indexOf(CarbonCommonConstants.HYPHEN) + 1,
+            blockName.lastIndexOf(CarbonTablePath.getCarbonDataExtension()));
+    if (isPartitionTable) {
+      return blockNameWithOutPart;
+    }
     return segID + CarbonCommonConstants.FILE_SEPARATOR + blockNameWithOutPart;
-
   }
 
   /**
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockMappingVO.java b/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockMappingVO.java
index 847ebeb..9f1c713 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockMappingVO.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockMappingVO.java
@@ -30,6 +30,10 @@ public class BlockMappingVO {
 
   private Map<String, RowCountDetailsVO> completeBlockRowDetailVO;
 
+  // This map helps in finding the segment id from the block path.
+  // key is 'blockpath' and value is 'segmentId'
+  private Map<String, String> blockToSegmentMapping;
+
   public void setCompleteBlockRowDetailVO(Map<String, RowCountDetailsVO> completeBlockRowDetailVO) {
     this.completeBlockRowDetailVO = completeBlockRowDetailVO;
   }
@@ -51,4 +55,12 @@ public class BlockMappingVO {
     this.blockRowCountMapping = blockRowCountMapping;
     this.segmentNumberOfBlockMapping = segmentNumberOfBlockMapping;
   }
+
+  public void setBlockToSegmentMapping(Map<String, String> blockToSegmentMapping) {
+    this.blockToSegmentMapping = blockToSegmentMapping;
+  }
+
+  public Map<String, String> getBlockToSegmentMapping() {
+    return blockToSegmentMapping;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index dcb2c0f..b651034 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -432,7 +432,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     String blockId = CarbonUtil
         .getBlockId(queryModel.getAbsoluteTableIdentifier(), filePath, segment.getSegmentNo(),
             queryModel.getTable().getTableInfo().isTransactionalTable(),
-            isStandardTable);
+            isStandardTable, queryModel.getTable().isHivePartitionTable());
     if (!isStandardTable) {
       blockExecutionInfo.setBlockId(CarbonTablePath.getShortBlockIdForPartitionTable(blockId));
     } else {
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
index 5f333ae..a3e921c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
@@ -32,9 +32,7 @@ import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.mutate.DeleteDeltaVo;
-import org.apache.carbondata.core.mutate.TupleIdEnum;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
@@ -572,17 +570,17 @@ public abstract class BlockletScannedResult {
    * Set blocklet id, which looks like
    * "Part0/Segment_0/part-0-0_batchno0-0-1517155583332.carbondata/0"
    */
-  public void setBlockletId(String blockletId) {
-    this.blockletId = blockletId;
-    blockletNumber = CarbonUpdateUtil.getRequiredFieldFromTID(blockletId, TupleIdEnum.BLOCKLET_ID);
+  public void setBlockletId(String blockletId, String blockletNumber) {
+    this.blockletId = blockletId + CarbonCommonConstants.FILE_SEPARATOR + blockletNumber;
+    this.blockletNumber = blockletNumber;
     // if deleted recors map is present for this block
     // then get the first page deleted vo
     if (null != deletedRecordMap) {
       String key;
       if (pageIdFiltered != null) {
-        key = blockletNumber + '_' + pageIdFiltered[pageCounter];
+        key = this.blockletNumber + '_' + pageIdFiltered[pageCounter];
       } else {
-        key = blockletNumber + '_' + pageCounter;
+        key = this.blockletNumber + '_' + pageCounter;
       }
       currentDeleteDeltaVo = deletedRecordMap.get(key);
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
index 347b9ce..cb87af4 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
@@ -202,9 +202,8 @@ public class BlockletFilterScanner extends BlockletFullScanner {
 
     BlockletScannedResult scannedResult =
         new FilterQueryScannedResult(blockExecutionInfo, queryStatisticsModel);
-    scannedResult.setBlockletId(
-        blockExecutionInfo.getBlockIdString() + CarbonCommonConstants.FILE_SEPARATOR +
-            rawBlockletColumnChunks.getDataBlock().blockletIndex());
+    scannedResult.setBlockletId(blockExecutionInfo.getBlockIdString(),
+        String.valueOf(rawBlockletColumnChunks.getDataBlock().blockletIndex()));
     // valid scanned blocklet
     QueryStatistic validScannedBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
         .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
@@ -452,9 +451,8 @@ public class BlockletFilterScanner extends BlockletFullScanner {
     scannedResult.setPageFilteredRowCount(numberOfRows);
     scannedResult.setPageIdFiltered(pageFilteredPages);
     scannedResult.setLazyBlockletLoader(lazyBlocklet);
-    scannedResult.setBlockletId(
-        blockExecutionInfo.getBlockIdString() + CarbonCommonConstants.FILE_SEPARATOR
-            + rawBlockletColumnChunks.getDataBlock().blockletIndex());
+    scannedResult.setBlockletId(blockExecutionInfo.getBlockIdString(),
+        String.valueOf(rawBlockletColumnChunks.getDataBlock().blockletIndex()));
     // adding statistics for carbon scan time
     QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
         .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME);
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
index 62f883d..0485e9f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
@@ -19,7 +19,6 @@ package org.apache.carbondata.core.scan.scanner.impl;
 
 import java.io.IOException;
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.DataRefNode;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
@@ -83,9 +82,8 @@ public class BlockletFullScanner implements BlockletScanner {
         .get(QueryStatisticsConstants.TOTAL_PAGE_SCANNED);
     totalPagesScanned.addCountStatistic(QueryStatisticsConstants.TOTAL_PAGE_SCANNED,
         totalPagesScanned.getCount() + rawBlockletColumnChunks.getDataBlock().numberOfPages());
-    String blockletId = blockExecutionInfo.getBlockIdString() + CarbonCommonConstants.FILE_SEPARATOR
-        + rawBlockletColumnChunks.getDataBlock().blockletIndex();
-    scannedResult.setBlockletId(blockletId);
+    scannedResult.setBlockletId(blockExecutionInfo.getBlockIdString(),
+        String.valueOf(rawBlockletColumnChunks.getDataBlock().blockletIndex()));
     DimensionRawColumnChunk[] dimensionRawColumnChunks =
         rawBlockletColumnChunks.getDimensionRawColumnChunks();
     DimensionColumnPage[][] dimensionColumnDataChunks =
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index bb17d53..6327781 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -60,6 +60,7 @@ public class SegmentUpdateStatusManager {
   private LoadMetadataDetails[] segmentDetails;
   private SegmentUpdateDetails[] updateDetails;
   private Map<String, SegmentUpdateDetails> blockAndDetailsMap;
+  private boolean isPartitionTable;
   /**
    * It contains the mapping of segment path and corresponding delete delta file paths,
    * avoiding listing these files for every query
@@ -79,6 +80,7 @@ public class SegmentUpdateStatusManager {
   public SegmentUpdateStatusManager(CarbonTable table,
       LoadMetadataDetails[] segmentDetails, String updateVersion) {
     this.identifier = table.getAbsoluteTableIdentifier();
+    this.isPartitionTable = table.isHivePartitionTable();
     // current it is used only for read function scenarios, as file update always requires to work
     // on latest file status.
     this.segmentDetails = segmentDetails;
@@ -102,6 +104,7 @@ public class SegmentUpdateStatusManager {
       segmentDetails = SegmentStatusManager.readLoadMetadata(
           CarbonTablePath.getMetadataPath(identifier.getTablePath()));
     }
+    this.isPartitionTable = table.isHivePartitionTable();
     if (segmentDetails.length != 0) {
       updateDetails = readLoadMetadata();
     } else {
@@ -140,14 +143,11 @@ public class SegmentUpdateStatusManager {
   private void populateMap() {
     blockAndDetailsMap = new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     for (SegmentUpdateDetails blockDetails : updateDetails) {
-
       String blockIdentifier = CarbonUpdateUtil
-          .getSegmentBlockNameKey(blockDetails.getSegmentName(), blockDetails.getActualBlockName());
-
+          .getSegmentBlockNameKey(blockDetails.getSegmentName(), blockDetails.getActualBlockName(),
+              isPartitionTable);
       blockAndDetailsMap.put(blockIdentifier, blockDetails);
-
     }
-
   }
 
   /**
@@ -159,7 +159,7 @@ public class SegmentUpdateStatusManager {
   private SegmentUpdateDetails getDetailsForABlock(String segID, String actualBlockName) {
 
     String blockIdentifier = CarbonUpdateUtil
-        .getSegmentBlockNameKey(segID, actualBlockName);
+        .getSegmentBlockNameKey(segID, actualBlockName, isPartitionTable);
 
     return blockAndDetailsMap.get(blockIdentifier);
 
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 7917ddd..18900ea 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2772,11 +2772,24 @@ public final class CarbonUtil {
    * @param identifier
    * @param filePath
    * @param segmentId
+   * @param isTransactionalTable
    * @param isStandardTable
    * @return
    */
   public static String getBlockId(AbsoluteTableIdentifier identifier, String filePath,
       String segmentId, boolean isTransactionalTable, boolean isStandardTable) {
+    return getBlockId(identifier, filePath, segmentId, isTransactionalTable, isStandardTable,
+        false);
+  }
+
+  /**
+   * Generate the blockid as per the block path
+   *
+   * @return
+   */
+  public static String getBlockId(AbsoluteTableIdentifier identifier, String filePath,
+      String segmentId, boolean isTransactionalTable, boolean isStandardTable,
+      boolean isPartitionTable) {
     String blockId;
     String blockName = filePath.substring(filePath.lastIndexOf("/") + 1, filePath.length());
     String tablePath = identifier.getTablePath();
@@ -2795,10 +2808,18 @@ public final class CarbonUtil {
         } else {
           partitionDir = "";
         }
-        // Replace / with # on partition director to support multi level partitioning. And access
-        // them all as a single entity.
-        blockId = partitionDir.replace("/", "#") + CarbonCommonConstants.FILE_SEPARATOR
-            + segmentId + CarbonCommonConstants.FILE_SEPARATOR + blockName;
+        if (isPartitionTable) {
+          blockId =
+              partitionDir.replace(CarbonCommonConstants.FILE_SEPARATOR, "#")
+                  + CarbonCommonConstants.FILE_SEPARATOR + blockName;
+        } else {
+          // Replace / with # in the partition directory to support multi-level partitioning,
+          // and access them all as a single entity.
+          blockId =
+              partitionDir.replace(CarbonCommonConstants.FILE_SEPARATOR, "#")
+                  + CarbonCommonConstants.FILE_SEPARATOR + segmentId
+                  + CarbonCommonConstants.FILE_SEPARATOR + blockName;
+        }
       }
     } else {
       blockId = filePath.substring(0, filePath.length() - blockName.length()).replace("/", "#")
diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndex.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndex.java
index 8a998ef..ac13a5d 100644
--- a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndex.java
+++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndex.java
@@ -18,9 +18,12 @@
 package org.apache.carbondata.core.indexstore.blockletindex;
 
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.BitSet;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension;
 import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import org.apache.carbondata.core.scan.filter.executer.ImplicitIncludeFilterExecutorImpl;
@@ -59,6 +62,16 @@ public class TestBlockletIndex {
     };
 
     BlockIndex blockletDataMap = new BlockletIndex();
+
+    new MockUp<CarbonTable>() {
+      @Mock public boolean isHivePartitionTable() {
+        return false;
+      }
+    };
+
+    blockletDataMap.setSegmentPropertiesWrapper(
+        new SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper(new CarbonTable(),
+            new ArrayList<>()));
     Method method = BlockIndex.class
         .getDeclaredMethod("addBlockBasedOnMinMaxValue", FilterExecuter.class, byte[][].class,
             byte[][].class, boolean[].class, String.class, int.class);
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 98dbb51..010adeb 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -175,7 +175,7 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
     if (segmentSize > 0 || overwriteSet) {
       if (operationContext != null) {
         operationContext
-            .setProperty("current.segmentfile", newMetaEntry.getSegmentFile());
+            .setProperty(CarbonCommonConstants.CURRENT_SEGMENTFILE, newMetaEntry.getSegmentFile());
         LoadEvents.LoadTablePreStatusUpdateEvent event =
             new LoadEvents.LoadTablePreStatusUpdateEvent(carbonTable.getCarbonTableIdentifier(),
                 loadModel);
@@ -298,7 +298,8 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
       CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false, uuid);
     }
     if (operationContext != null) {
-      operationContext.setProperty("current.segmentfile", newMetaEntry.getSegmentFile());
+      operationContext
+          .setProperty(CarbonCommonConstants.CURRENT_SEGMENTFILE, newMetaEntry.getSegmentFile());
     }
     commitJobFinal(context, loadModel, operationContext, carbonTable, uniqueId);
   }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 8fab3a9..0a22e25 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -61,6 +61,7 @@ import org.apache.carbondata.core.stream.StreamFile;
 import org.apache.carbondata.core.stream.StreamPruner;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 
 import org.apache.hadoop.fs.BlockLocation;
@@ -188,7 +189,11 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
 
     List<Segment> segmentToAccess =
         getFilteredSegment(job, validAndInProgressSegments, false, readCommittedScope);
-
+    String segmentFileName = job.getConfiguration().get(CarbonCommonConstants.CURRENT_SEGMENTFILE);
+    if (segmentFileName != null) {
+      // each segment has only one segment file (set via "current.segmentfile")
+      segmentToAccess.get(0).setSegmentFileName(segmentFileName + CarbonTablePath.SEGMENT_EXT);
+    }
     // process and resolve the expression
     IndexFilter indexFilter = getFilterPredicates(job.getConfiguration());
 
@@ -442,6 +447,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
                 readCommittedScope);
     Map<String, Long> blockRowCountMapping = new HashMap<>();
     Map<String, Long> segmentAndBlockCountMapping = new HashMap<>();
+    Map<String, String> blockToSegmentMapping = new HashMap<>();
 
     // TODO: currently only batch segment is supported, add support for streaming table
     List<Segment> filteredSegment =
@@ -511,7 +517,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
 
         long rowCount = eachBlocklet.getValue();
 
-        String key = CarbonUpdateUtil.getSegmentBlockNameKey(segmentId, blockName);
+        String key = CarbonUpdateUtil
+            .getSegmentBlockNameKey(segmentId, blockName, table.isHivePartitionTable());
 
         // if block is invalid then don't add the count
         SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key);
@@ -526,6 +533,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
             }
             segmentAndBlockCountMapping.put(segmentId, count + 1);
           }
+          blockToSegmentMapping.put(key, segmentId);
           blockCount += rowCount;
           blockRowCountMapping.put(key, blockCount);
         }
@@ -542,7 +550,10 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
       }
       blockRowCountMapping.put(CarbonCommonConstantsInternal.ROW_COUNT, totalRowCount);
     }
-    return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
+    BlockMappingVO blockMappingVO =
+        new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
+    blockMappingVO.setBlockToSegmentMapping(blockToSegmentMapping);
+    return blockMappingVO;
   }
 
   public ReadCommittedScope getReadCommitted(JobContext job, AbsoluteTableIdentifier identifier)
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index de01509..1ea9c99 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.IndexUtil;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 
 import org.apache.hadoop.conf.Configuration;
@@ -45,6 +46,18 @@ public class CarbonInputFormatUtil {
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(CarbonProperties.class.getName());
 
+  public static <V> CarbonFileInputFormat<V> createCarbonFileInputFormat(
+      AbsoluteTableIdentifier identifier, Job job) throws IOException {
+    CarbonFileInputFormat<V> carbonInputFormat = new CarbonFileInputFormat<V>();
+    CarbonTableInputFormat.setDatabaseName(job.getConfiguration(),
+        identifier.getCarbonTableIdentifier().getDatabaseName());
+    CarbonTableInputFormat
+        .setTableName(job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
+    FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
+    setDataMapJobIfConfigured(job.getConfiguration());
+    return carbonInputFormat;
+  }
+
   public static <V> CarbonTableInputFormat<V> createCarbonInputFormat(
       AbsoluteTableIdentifier identifier,
       Job job) throws IOException {
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestAlterTableColumnRenameWithIndex.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestAlterTableColumnRenameWithIndex.scala
index f6b5211..78fade3 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestAlterTableColumnRenameWithIndex.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestAlterTableColumnRenameWithIndex.scala
@@ -16,10 +16,10 @@
  */
 package org.apache.carbondata.spark.testsuite.secondaryindex
 
+import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils
+.isFilterPushedDownToSI;
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.spark.exception.ProcessMetaDataException
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -94,20 +94,4 @@ class TestAlterTableColumnRenameWithIndex extends QueryTest with BeforeAndAfterA
   private def createTable(): Unit = {
     sql("create table si_rename (a string,b int, c string, d string) STORED AS carbondata")
   }
-
-  /**
-    * Method to check whether the filter is push down to SI table or not
-    *
-    * @param sparkPlan
-    * @return
-    */
-  private def isFilterPushedDownToSI(sparkPlan: SparkPlan): Boolean = {
-    var isValidPlan = false
-    sparkPlan.transform {
-      case broadCastSIFilterPushDown: BroadCastSIFilterPushJoin =>
-        isValidPlan = true
-        broadCastSIFilterPushDown
-    }
-    isValidPlan
-  }
 }
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestBroadCastSIFilterPushJoinWithUDF.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestBroadCastSIFilterPushJoinWithUDF.scala
index dc03a3d..fbe351e 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestBroadCastSIFilterPushJoinWithUDF.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestBroadCastSIFilterPushJoinWithUDF.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.carbondata.spark.testsuite.secondaryindex
 
+import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils
+.isFilterPushedDownToSI;
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.util.SparkUtil
@@ -62,7 +64,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter
     // approx_count_distinct udf
     carbonQuery = sql("select approx_count_distinct(empname), approx_count_distinct(deptname) from udfValidation where empname = 'pramod' or deptname = 'network'")
     hiveQuery = sql("select approx_count_distinct(empname), approx_count_distinct(deptname) from udfHive where empname = 'pramod' or deptname = 'network'")
-    if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -74,7 +76,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter
     // collect_list udf
     carbonQuery = sql("select collect_list(empname) from udfValidation where empname = 'pramod' or deptname = 'network' or designation='TL'")
     hiveQuery = sql("select collect_list(empname) from udfHive where empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -86,7 +88,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter
     // collect_set udf
     carbonQuery = sql("select collect_set(deptname) from udfValidation where empname = 'pramod' or deptname = 'network' or designation='TL'")
     hiveQuery = sql("select collect_set(deptname) from udfHive where empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -98,7 +100,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter
     // corr udf
     carbonQuery = sql("select corr(deptno, empno) from udfValidation where empname = 'pramod' or deptname = 'network' or designation='TL'")
     hiveQuery = sql("select corr(deptno, empno) from udfHive where empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -110,7 +112,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter
     // covar_pop udf
     carbonQuery = sql("select covar_pop(deptno, empno) from udfValidation where empname = 'pramod' or deptname = 'network' or designation='TL'")
     hiveQuery = sql("select covar_pop(deptno, empno) from udfHive where empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -122,7 +124,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter
     // covar_samp udf
     carbonQuery = sql("select covar_samp(deptno, empno) from udfValidation where empname = 'pramod' or deptname = 'network' or designation='TL'")
     hiveQuery = sql("select covar_samp(deptno, empno) from udfHive where empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -134,7 +136,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter
     // grouping udf
     carbonQuery = sql("select grouping(designation), grouping(deptname) from udfValidation where empname = 'pramod' or deptname = 'network' or designation='TL' group by designation, deptname with ROLLUP")
     hiveQuery = sql("select grouping(designation), grouping(deptname) from udfHive where empname = 'pramod' or deptname = 'network' or designation='TL' group by designation, deptname with ROLLUP")
-    if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -146,7 +148,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter
     // mean udf
     carbonQuery = sql("select mean(deptno), mean(empno) from udfValidation where empname = 'pramod' or deptname = 'network' or designation='TL'")
     hiveQuery = sql("select mean(deptno), mean(empno) from udfHive where empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -158,7 +160,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter
     // skewness udf
     carbonQuery = sql("select skewness(deptno), skewness(empno) from udfValidation where empname = 'pramod' or deptname = 'network' or designation='TL'")
     hiveQuery = sql("select skewness(deptno), skewness(empno) from udfHive where empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -170,7 +172,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter
     // stddev udf
     carbonQuery = sql("select stddev(deptno), stddev(empno) from udfValidation where empname = 'pramod' or deptname = 'network' or designation='TL'")
     hiveQuery = sql("select stddev(deptno), stddev(empno) from udfHive where empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -182,7 +184,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter
     // stddev_pop udf
     carbonQuery = sql("select stddev_pop(deptno), stddev_pop(empno) from udfValidation where empname = 'pramod' or deptname = 'network' or designation='TL'")
     hiveQuery = sql("select stddev_pop(deptno), stddev_pop(empno) from udfHive where empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -194,7 +196,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter
     // stddev_samp udf
     carbonQuery = sql("select stddev_samp(deptno), stddev_samp(empno) from udfValidation where empname = 'pramod' or deptname = 'network' or designation='TL'")
     hiveQuery = sql("select stddev_samp(deptno), stddev_samp(empno) from udfHive where empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -206,7 +208,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter
     // var_pop udf
     carbonQuery = sql("select var_pop(deptno), var_pop(empno) from udfValidation where empname = 'pramod' or deptname = 'network' or designation='TL'")
     hiveQuery = sql("select var_pop(deptno), var_pop(empno) from udfHive where empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -218,7 +220,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter
     // var_samp udf
     carbonQuery = sql("select var_samp(deptno), var_samp(empno) from udfValidation where empname = 'pramod' or deptname = 'network' or designation='TL'")
     hiveQuery = sql("select var_samp(deptno), var_samp(empno) from udfHive where empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -230,7 +232,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter
     // variance udf
     carbonQuery = sql("select variance(deptno), variance(empno) from udfValidation where empname = 'pramod' or deptname = 'network' or designation='TL'")
     hiveQuery = sql("select variance(deptno), variance(empno) from udfHive where empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -242,7 +244,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter
     // COALESCE, CONV and SUBSTRING udf
     carbonQuery = sql("select COALESCE(CONV(substring(empname, 3, 2), 16, 10), ''), COALESCE(CONV(substring(deptname, 3, 2), 16, 10), '') from udfValidation where empname = 'pramod' or deptname = 'network' or designation='TL'")
     hiveQuery = sql("select COALESCE(CONV(substring(empname, 3, 2), 16, 10), ''), COALESCE(CONV(substring(deptname, 3, 2), 16, 10), '') from udfHive where empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -275,7 +277,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter
           "COALESCE(CONV(substring(empname, 3, 2), 16, 10), ''), COALESCE(CONV(substring(deptname, 3," +
           " 2), 16, 10), '') from udfHive where empname = 'pramod' or deptname = 'network' or " +
           "designation='TL' group by designation, deptname, empname with ROLLUP")
-        if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
+        if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
           assert(true)
         } else {
           assert(false)
@@ -310,7 +312,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter
           "COALESCE(CONV(substring(empname, 3, 2), 16, 10), '') as c25, COALESCE(CONV(substring(deptname, 3," +
           " 2), 16, 10), '') as c26 from udfHive where empname = 'pramod' or deptname = 'network' or " +
           "designation='TL' group by designation, deptname, empname with ROLLUP")
-        if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
+        if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
           assert(true)
         } else {
           assert(false)
@@ -345,7 +347,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter
           "COALESCE(CONV(substring(empname, 3, 2), 16, 10), ''), COALESCE(CONV(substring(deptname, 3," +
           " 2), 16, 10), '') from udfHive where empname = 'pramod' or deptname = 'network' or " +
           "designation='TL' group by designation, deptname, empname with ROLLUP")
-        if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
+        if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
           assert(true)
         } else {
           assert(false)
@@ -380,7 +382,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter
           "COALESCE(CONV(substring(empname, 3, 2), 16, 10), '') as c26, COALESCE(CONV(substring(deptname, 3," +
           " 2), 16, 10), '') as c27 from udfHive where empname = 'pramod' or deptname = 'network' or " +
           "designation='TL' group by designation, deptname, empname with ROLLUP")
-        if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
+        if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
           assert(true)
         } else {
           assert(false)
@@ -393,7 +395,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter
   test("test udf on filter - concat") {
     carbonQuery = sql("select concat_ws(deptname)from udfValidation where concat_ws(deptname) IS NOT NULL or concat_ws(deptname) is null")
     hiveQuery = sql("select concat_ws(deptname)from udfHive where concat_ws(deptname) IS NOT NULL or concat_ws(deptname) is null")
-    if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -404,7 +406,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter
   test("test udf on filter - find_in_set") {
     carbonQuery = sql("select find_in_set(deptname,'o')from udfValidation where find_in_set(deptname,'o') =0 or find_in_set(deptname,'a') is null")
     hiveQuery = sql("select find_in_set(deptname,'o')from udfHive where find_in_set(deptname,'o') =0 or find_in_set(deptname,'a') is null")
-    if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -415,7 +417,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter
   test("test udf on filter - agg") {
     carbonQuery = sql("select max(length(deptname)),min(length(designation)),avg(length(empname)),count(length(empname)),sum(length(deptname)),variance(length(designation)) from udfValidation where length(empname)=6 or length(empname) is NULL")
     hiveQuery = sql("select max(length(deptname)),min(length(designation)),avg(length(empname)),count(length(empname)),sum(length(deptname)),variance(length(designation)) from udfHive where length(empname)=6 or length(empname) is NULL")
-    if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexTable.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexTable.scala
index 442a9a9..db82152 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexTable.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexTable.scala
@@ -445,12 +445,6 @@ class TestCreateIndexTable extends QueryTest with BeforeAndAfterAll {
     }
   }
 
-  test("test blocking secondary Index on Partition table") {
-    intercept[RuntimeException] {
-      sql("""create index part_index on table part_si(c3) AS 'carbondata'""").show()
-    }
-  }
-
   object CarbonMetastore {
     import org.apache.carbondata.core.reader.ThriftReader
 
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelForORFilterPushDown.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelForORFilterPushDown.scala
index b5e6adb..3b021b0 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelForORFilterPushDown.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelForORFilterPushDown.scala
@@ -17,9 +17,9 @@
 package org.apache.carbondata.spark.testsuite.secondaryindex
 
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils
+.isFilterPushedDownToSI;
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -234,21 +234,4 @@ class TestIndexModelForORFilterPushDown extends QueryTest with BeforeAndAfterAll
   override def afterAll: Unit = {
     dropTables
   }
-
-  /**
-   * Method to check whether the filter is push down to SI table or not
-   *
-   * @param sparkPlan
-   * @return
-   */
-  def isFilterPushedDownToSI(sparkPlan: SparkPlan): Boolean = {
-    var isValidPlan = false
-    sparkPlan.transform {
-      case broadCastSIFilterPushDown: BroadCastSIFilterPushJoin =>
-        isValidPlan = true
-        broadCastSIFilterPushDown
-    }
-    isValidPlan
-  }
-
 }
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
new file mode 100644
index 0000000..3581211
--- /dev/null
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.secondaryindex
+
+import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils
+.isFilterPushedDownToSI;
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, Ignore}
+
+class TestSIWithPartition extends QueryTest with BeforeAndAfterAll {
+
+  /**
+   * Creates the partitioned table uniqdata1 (partition column ACTIVE_EMUI_VERSION)
+   * and runs the same LOAD four times into the static partition
+   * ACTIVE_EMUI_VERSION='abc'.
+   *
+   * The count assertions in the delete/update tests expect 4 matching rows, which
+   * presumably corresponds to one row per load -- verify against data_2000.csv.
+   */
+  override protected def beforeAll(): Unit = {
+    sql("drop table if exists uniqdata1")
+    sql(
+      "CREATE TABLE uniqdata1 (CUST_ID INT,CUST_NAME STRING,DOB timestamp,DOJ timestamp," +
+      "BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 DECIMAL(30, 10)," +
+      "DECIMAL_COLUMN2 DECIMAL(36, 10),Double_COLUMN1 double, Double_COLUMN2 double," +
+      "INTEGER_COLUMN1 int) PARTITIONED BY(ACTIVE_EMUI_VERSION string) STORED AS carbondata " +
+      "TBLPROPERTIES('TABLE_BLOCKSIZE'='256 MB')")
+
+    // One statement text, executed repeatedly instead of being copy-pasted four times.
+    val loadStatement =
+      s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data_2000.csv' INTO " +
+      "TABLE uniqdata1 partition(ACTIVE_EMUI_VERSION='abc') OPTIONS('DELIMITER'=',', " +
+      "'BAD_RECORDS_LOGGER_ENABLE'='FALSE', 'BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='CUST_ID," +
+      "CUST_NAME,ACTIVE_EMUI_VERSION,DOB,DOJ,BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1," +
+      "DECIMAL_COLUMN2,Double_COLUMN1,Double_COLUMN2,INTEGER_COLUMN1')"
+    // Number of identical loads; the compaction tests presumably need several
+    // loads to have something to merge -- TODO confirm.
+    val loadCount = 4
+    (1 to loadCount).foreach(_ => sql(loadStatement))
+  }
+
+  test("Testing SI on partition column") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    // An index over the partition column itself is expected to be rejected.
+    intercept[UnsupportedOperationException](
+      sql("create index indextable1 on table uniqdata1 (ACTIVE_EMUI_VERSION) AS 'carbondata'"))
+  }
+
+  test("Testing SI without partition column") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 'carbondata'")
+    // Baseline rows: the predicate is wrapped in ni(...), which presumably keeps the
+    // query off the index -- TODO confirm against the ni UDF.
+    val expectedRows =
+      sql("select * from uniqdata1 where ni(CUST_NAME='CUST_NAME_00108')")
+        .collect().toSeq
+
+    checkAnswer(sql("select * from uniqdata1 where CUST_NAME='CUST_NAME_00108'"),
+      expectedRows)
+
+    // CUST_NAME is an index column, so the filter must be pushed down to the SI.
+    val plan = sql("select * from uniqdata1 where CUST_NAME='CUST_NAME_00108'")
+      .queryExecution
+      .sparkPlan
+    assert(isFilterPushedDownToSI(plan),
+      "filter on indexed column CUST_NAME was not pushed down to the SI")
+  }
+
+  test("Testing SI with partition column[where clause]") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 'carbondata'")
+    // Baseline rows, with the whole predicate wrapped in ni(...).
+    val expectedRows =
+      sql(
+        "select * from uniqdata1 where ni(CUST_NAME='CUST_NAME_00108' and ACTIVE_EMUI_VERSION = " +
+        "'abc')")
+        .collect().toSeq
+
+    checkAnswer(sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' and ACTIVE_EMUI_VERSION = 'abc'"),
+      expectedRows)
+
+    // Indexed column ANDed with the partition column: pushdown to the SI is
+    // still expected.
+    val plan = sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' and ACTIVE_EMUI_VERSION = 'abc'")
+      .queryExecution
+      .sparkPlan
+    assert(isFilterPushedDownToSI(plan),
+      "filter on CUST_NAME AND partition column was not pushed down to the SI")
+  }
+
+  test("Testing SI on partition table with OR condition") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 'carbondata'")
+    // Baseline rows, with the whole predicate wrapped in ni(...).
+    val expectedRows =
+      sql(
+        "select * from uniqdata1 where ni(CUST_NAME='CUST_NAME_00108' OR ACTIVE_EMUI_VERSION = " +
+        "'abc')")
+        .collect().toSeq
+
+    checkAnswer(sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' OR ACTIVE_EMUI_VERSION = 'abc'"),
+      expectedRows)
+
+    // Indexed column ORed with the partition column (which is not part of the
+    // index): the test expects that the filter is NOT pushed down to the SI.
+    val plan = sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' OR ACTIVE_EMUI_VERSION = 'abc'")
+      .queryExecution
+      .sparkPlan
+    assert(!isFilterPushedDownToSI(plan),
+      "filter with OR on the partition column was unexpectedly pushed down to the SI")
+  }
+
+  test("Testing SI on partition table with combination of OR OR") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 'carbondata'")
+    // Baseline rows, with the whole predicate wrapped in ni(...).
+    val expectedRows =
+      sql(
+        "select * from uniqdata1 where ni(CUST_NAME='CUST_NAME_00108' OR CUST_ID='9000' OR " +
+        "ACTIVE_EMUI_VERSION = " +
+        "'abc')")
+        .collect().toSeq
+
+    checkAnswer(sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' OR CUST_ID='9000' OR " +
+      "ACTIVE_EMUI_VERSION = 'abc'"),
+      expectedRows)
+
+    // The predicate ORs in columns that are not part of the index (CUST_ID,
+    // ACTIVE_EMUI_VERSION): the test expects NO pushdown to the SI.
+    val plan = sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' OR CUST_ID='9000' OR " +
+      "ACTIVE_EMUI_VERSION = 'abc'")
+      .queryExecution
+      .sparkPlan
+    assert(!isFilterPushedDownToSI(plan),
+      "OR/OR filter including non-index columns was unexpectedly pushed down to the SI")
+  }
+
+  test("Testing SI on partition table with combination of OR AND") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 'carbondata'")
+    // Baseline rows, with the whole predicate wrapped in ni(...).
+    val expectedRows =
+      sql(
+        "select * from uniqdata1 where ni(CUST_NAME='CUST_NAME_00108' OR CUST_ID='9000' AND " +
+        "ACTIVE_EMUI_VERSION = " +
+        "'abc')")
+        .collect().toSeq
+
+    checkAnswer(sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' OR CUST_ID='9000' AND " +
+      "ACTIVE_EMUI_VERSION = 'abc'"),
+      expectedRows)
+
+    // The predicate ORs in columns that are not part of the index (CUST_ID,
+    // ACTIVE_EMUI_VERSION): the test expects NO pushdown to the SI.
+    val plan = sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' OR CUST_ID='9000' AND " +
+      "ACTIVE_EMUI_VERSION = 'abc'")
+      .queryExecution
+      .sparkPlan
+    assert(!isFilterPushedDownToSI(plan),
+      "OR/AND filter including non-index columns was unexpectedly pushed down to the SI")
+  }
+
+  test("Testing SI on partition table with combination of AND OR") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 'carbondata'")
+    // Baseline rows, with the whole predicate wrapped in ni(...).
+    val expectedRows =
+      sql(
+        "select * from uniqdata1 where ni(CUST_NAME='CUST_NAME_00108' AND CUST_ID='9000' OR " +
+        "ACTIVE_EMUI_VERSION = " +
+        "'abc')")
+        .collect().toSeq
+
+    checkAnswer(sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' AND CUST_ID='9000' OR " +
+      "ACTIVE_EMUI_VERSION = 'abc'"),
+      expectedRows)
+
+    // The predicate ORs in a column that is not part of the index
+    // (ACTIVE_EMUI_VERSION): the test expects NO pushdown to the SI.
+    val plan = sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' AND CUST_ID='9000' OR " +
+      "ACTIVE_EMUI_VERSION = 'abc'")
+      .queryExecution
+      .sparkPlan
+    assert(!isFilterPushedDownToSI(plan),
+      "AND/OR filter including non-index columns was unexpectedly pushed down to the SI")
+  }
+
+  test("Testing SI on partition table with combination of AND AND") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 'carbondata'")
+    // Baseline rows, with the whole predicate wrapped in ni(...).
+    val expectedRows =
+      sql(
+        "select * from uniqdata1 where ni(CUST_NAME='CUST_NAME_00108' AND CUST_ID='9000' AND " +
+        "ACTIVE_EMUI_VERSION = " +
+        "'abc')")
+        .collect().toSeq
+
+    checkAnswer(sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' AND CUST_ID='9000' AND " +
+      "ACTIVE_EMUI_VERSION = 'abc'"),
+      expectedRows)
+
+    // Every predicate is ANDed and CUST_NAME is an index column: pushdown to
+    // the SI is expected.
+    val plan = sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' AND CUST_ID='9000' AND " +
+      "ACTIVE_EMUI_VERSION = 'abc'")
+      .queryExecution
+      .sparkPlan
+    assert(isFilterPushedDownToSI(plan),
+      "AND/AND filter containing indexed column CUST_NAME was not pushed down to the SI")
+  }
+
+  test("Testing SI on partition table with major compaction") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 'carbondata'")
+    // Snapshot of the query result before compaction (no ni(...) is used here).
+    val rowsBeforeCompaction =
+      sql(
+        "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' and ACTIVE_EMUI_VERSION = " +
+        "'abc'")
+        .collect().toSeq
+
+    sql("alter table uniqdata1 compact 'major'")
+
+    checkAnswer(sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' and ACTIVE_EMUI_VERSION = 'abc'"),
+      rowsBeforeCompaction)
+
+    // Compaction must change neither the result (checked above) nor the fact
+    // that the filter is pushed down to the SI.
+    val plan = sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' and ACTIVE_EMUI_VERSION = 'abc'")
+      .queryExecution
+      .sparkPlan
+    assert(isFilterPushedDownToSI(plan),
+      "filter was not pushed down to the SI after major compaction")
+  }
+
+  test("Testing SI on partition table with minor compaction") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 'carbondata'")
+
+    // Snapshot of the query result before compaction (no ni(...) is used here).
+    val rowsBeforeCompaction =
+      sql(
+        "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' and ACTIVE_EMUI_VERSION = " +
+        "'abc'")
+        .collect().toSeq
+
+    sql("alter table uniqdata1 compact 'minor'")
+
+    checkAnswer(sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' and ACTIVE_EMUI_VERSION = 'abc'"),
+      rowsBeforeCompaction)
+
+    // Compaction must change neither the result (checked above) nor the fact
+    // that the filter is pushed down to the SI.
+    val plan = sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' and ACTIVE_EMUI_VERSION = 'abc'")
+      .queryExecution
+      .sparkPlan
+    assert(isFilterPushedDownToSI(plan),
+      "filter was not pushed down to the SI after minor compaction")
+  }
+
+  test("Testing SI on partition table with delete") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 'carbondata'")
+
+    // Before the delete the filter is expected to match 4 rows.
+    checkAnswer(sql(
+      "select count(*) from uniqdata1 where CUST_NAME='CUST_NAME_00108' and ACTIVE_EMUI_VERSION =" +
+      " 'abc'"),
+      Seq(Row(4)))
+
+    sql("delete from uniqdata1 where CUST_NAME='CUST_NAME_00108'").show()
+
+    // After the delete none of those rows may remain.
+    checkAnswer(sql(
+      "select count(*) from uniqdata1 where CUST_NAME='CUST_NAME_00108' and ACTIVE_EMUI_VERSION =" +
+      " 'abc'"),
+      Seq(Row(0)))
+
+    // The filter must still be pushed down to the SI after the delete.
+    val plan = sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' and ACTIVE_EMUI_VERSION = 'abc'")
+      .queryExecution
+      .sparkPlan
+    assert(isFilterPushedDownToSI(plan),
+      "filter was not pushed down to the SI after delete")
+  }
+
+  test("Testing SI on partition table with update") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 'carbondata'")
+
+    checkAnswer(sql(
+      "select count(*) from uniqdata1 where CUST_ID='9000' and ACTIVE_EMUI_VERSION = 'abc'"),
+      Seq(Row(4)))
+    // The update on the table carrying an SI is expected to be rejected.
+    intercept[RuntimeException](
+      sql("update uniqdata1 d set (d.CUST_ID) = ('8000')  where d.CUST_ID = '9000'").show())
+  }
+
+  test("Testing SI on partition table with rename") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 'carbondata'")
+
+    // NOTE(review): the delete test removes these rows if run earlier; result may be empty.
+    val rowsBeforeRename =
+      sql(
+        "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' and ACTIVE_EMUI_VERSION = " +
+        "'abc'")
+        .collect().toSeq
+
+    sql("alter table uniqdata1 change CUST_NAME test string")
+
+    checkAnswer(sql(
+      "select * from uniqdata1 where test='CUST_NAME_00108' and ACTIVE_EMUI_VERSION = 'abc'"),
+      rowsBeforeRename)
+
+    // After renaming CUST_NAME to test, a filter on the new column name must
+    // still be pushed down to the SI.
+    val plan = sql(
+      "select * from uniqdata1 where test='CUST_NAME_00108' and ACTIVE_EMUI_VERSION = 'abc'")
+      .queryExecution
+      .sparkPlan
+    assert(isFilterPushedDownToSI(plan),
+      "filter on renamed index column was not pushed down to the SI")
+  }
+
+  override protected def afterAll(): Unit = { // suite-level cleanup of what beforeAll/tests created
+    sql("drop index if exists indextable1 on uniqdata1") // index first, then its table
+    sql("drop table if exists uniqdata1")
+  }
+}
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
index 06aa8a6..8f2c8fb 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
@@ -19,6 +19,8 @@ package org.apache.carbondata.spark.testsuite.secondaryindex
 import scala.collection.JavaConverters._
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils
+.isFilterPushedDownToSI;
 import org.apache.spark.sql.{CarbonEnv, Row}
 import org.scalatest.BeforeAndAfterAll
 
@@ -26,8 +28,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
 import org.apache.spark.sql.test.util.QueryTest
 
 class TestSIWithSecondryIndex extends QueryTest with BeforeAndAfterAll {
@@ -230,21 +230,4 @@ class TestSIWithSecondryIndex extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists uniqdata")
     sql("drop table if exists uniqdataTable")
   }
-
-  /**
-    * Method to check whether the filter is push down to SI table or not
-    *
-    * @param sparkPlan
-    * @return
-    */
-  private def isFilterPushedDownToSI(sparkPlan: SparkPlan): Boolean = {
-    var isValidPlan = false
-    sparkPlan.transform {
-      case broadCastSIFilterPushDown: BroadCastSIFilterPushJoin =>
-        isValidPlan = true
-        broadCastSIFilterPushDown
-    }
-    isValidPlan
-  }
-
 }
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSecondaryIndexUtils.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSecondaryIndexUtils.scala
new file mode 100644
index 0000000..76ff3ae
--- /dev/null
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSecondaryIndexUtils.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.secondaryindex
+
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
+
+object TestSecondaryIndexUtils {
+  /**
+   * Checks whether the filter of a query is pushed down to the SI table, i.e. whether
+   * the given physical plan contains a [[BroadCastSIFilterPushJoin]] node.
+   * The plan is only inspected; it is never rewritten.
+   *
+   * @param sparkPlan physical plan of the query to inspect
+   * @return true if a BroadCastSIFilterPushJoin node is found anywhere in the plan
+   */
+  def isFilterPushedDownToSI(sparkPlan: SparkPlan): Boolean = {
+    // `find` is a read-only search, so no mutable flag or plan transformation is needed
+    sparkPlan.find {
+      case _: BroadCastSIFilterPushJoin => true
+      case _ => false
+    }.isDefined
+  }
+}
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala
index 2ef954f..53b0a5a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala
@@ -74,7 +74,9 @@ class CarbonDeltaRowScanRDD[T: ClassTag](
       val partition = p.asInstanceOf[CarbonSparkPartition]
       val splits = partition.multiBlockSplit.getAllSplits.asScala.filter { s =>
         updateStatusManager.getDetailsForABlock(
-          CarbonUpdateUtil.getSegmentBlockNameKey(s.getSegmentId, s.getBlockPath)) != null
+          CarbonUpdateUtil.getSegmentBlockNameKey(s.getSegmentId,
+            s.getBlockPath,
+            table.isHivePartitionTable)) != null
       }.asJava
       new CarbonSparkPartition(partition.rddId, partition.index,
         new CarbonMultiBlockSplit(splits, partition.multiBlockSplit.getLocations))
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 608fdbe..35a1509 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -131,14 +131,16 @@ object DeleteExecution {
       case Some(id) =>
         deleteRdd.map { row =>
           val tupleId: String = row.getString(id)
-          val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId)
+          val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId,
+            carbonTable.isHivePartitionTable)
           (key, row)
         }.groupByKey()
       case _ =>
         deleteRdd.map { row =>
           val tupleId: String = row
             .getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
-          val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId)
+          val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId,
+            carbonTable.isHivePartitionTable)
           (key, row)
         }.groupByKey()
     }
@@ -159,7 +161,6 @@ object DeleteExecution {
     val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(carbonTable)
     CarbonUpdateUtil
       .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr)
-
     val metadataDetails = SegmentStatusManager.readTableStatusFile(
       CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
     val isStandardTable = CarbonUtil.isStandardCarbonTable(carbonTable)
@@ -171,6 +172,7 @@ object DeleteExecution {
     val conf = SparkSQLUtil
       .broadCastHadoopConf(sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf())
     val rdd = rowContRdd.join(keyRdd)
+    val blockDetails = blockMappingVO.getBlockToSegmentMapping
     res = rdd.mapPartitionsWithIndex(
       (index: Int, records: Iterator[((String), (RowCountDetailsVO, Iterable[Row]))]) =>
         Iterator[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]] {
@@ -185,7 +187,9 @@ object DeleteExecution {
                        timestamp,
                        rowCountDetailsVO,
                        isStandardTable,
-                       metadataDetails)
+                       metadataDetails
+                         .find(_.getLoadName.equalsIgnoreCase(blockDetails.get(key)))
+                         .get, carbonTable.isHivePartitionTable)
           }
           result
         }).collect()
@@ -196,18 +200,20 @@ object DeleteExecution {
         timestamp: String,
         rowCountDetailsVO: RowCountDetailsVO,
         isStandardTable: Boolean,
-        loads: Array[LoadMetadataDetails]
+        load: LoadMetadataDetails, isPartitionTable: Boolean
     ): Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))] = {
 
       val result = new DeleteDelataResultImpl()
       var deleteStatus = SegmentStatus.LOAD_FAILURE
       val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
       // here key = segment/blockName
-      val blockName = CarbonUpdateUtil
-        .getBlockName(
-          CarbonTablePath.addDataPartPrefix(key.split(CarbonCommonConstants.FILE_SEPARATOR)(1)))
-      val segmentId = key.split(CarbonCommonConstants.FILE_SEPARATOR)(0)
-      val load = loads.find(l => l.getLoadName.equalsIgnoreCase(segmentId)).get
+      val blockName = if (isPartitionTable) {
+        CarbonUpdateUtil.getBlockName(CarbonTablePath.addDataPartPrefix(key))
+      } else {
+        CarbonUpdateUtil
+          .getBlockName(
+            CarbonTablePath.addDataPartPrefix(key.split(CarbonCommonConstants.FILE_SEPARATOR)(1)))
+      }
       val deleteDeltaBlockDetails: DeleteDeltaBlockDetails = new DeleteDeltaBlockDetails(blockName)
       val resultIter =
         new Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))] {
@@ -219,11 +225,19 @@ object DeleteExecution {
             val oneRow = iter.next
             TID = oneRow
               .get(oneRow.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).toString
-            val offset = CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.OFFSET)
-            val blockletId = CarbonUpdateUtil
-              .getRequiredFieldFromTID(TID, TupleIdEnum.BLOCKLET_ID)
-            val pageId = Integer.parseInt(CarbonUpdateUtil
-              .getRequiredFieldFromTID(TID, TupleIdEnum.PAGE_ID))
+            val (offset, blockletId, pageId) = if (isPartitionTable) {
+              (CarbonUpdateUtil.getRequiredFieldFromTID(TID,
+                TupleIdEnum.OFFSET.getTupleIdIndex - 1),
+                CarbonUpdateUtil.getRequiredFieldFromTID(TID,
+                  TupleIdEnum.BLOCKLET_ID.getTupleIdIndex - 1),
+                Integer.parseInt(CarbonUpdateUtil.getRequiredFieldFromTID(TID,
+                  TupleIdEnum.PAGE_ID.getTupleIdIndex - 1)))
+            } else {
+              (CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.OFFSET),
+                CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.BLOCKLET_ID),
+                Integer.parseInt(CarbonUpdateUtil.getRequiredFieldFromTID(TID,
+                  TupleIdEnum.PAGE_ID)))
+            }
             val IsValidOffset = deleteDeltaBlockDetails.addBlocklet(blockletId, offset, pageId)
             // stop delete operation
             if(!IsValidOffset) {
@@ -240,9 +254,18 @@ object DeleteExecution {
             } else {
               CarbonUpdateUtil.getTableBlockPath(TID, tablePath, isStandardTable)
             }
-          val completeBlockName = CarbonTablePath
-            .addDataPartPrefix(CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.BLOCK_ID) +
-                               CarbonCommonConstants.FACT_FILE_EXT)
+          val completeBlockName = if (isPartitionTable) {
+            CarbonTablePath
+              .addDataPartPrefix(
+                CarbonUpdateUtil.getRequiredFieldFromTID(TID,
+                  TupleIdEnum.BLOCK_ID.getTupleIdIndex - 1) +
+                CarbonCommonConstants.FACT_FILE_EXT)
+          } else {
+            CarbonTablePath
+              .addDataPartPrefix(
+                CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.BLOCK_ID) +
+                CarbonCommonConstants.FACT_FILE_EXT)
+          }
           val deleteDeletaPath = CarbonUpdateUtil
             .getDeleteDeltaFilePath(blockPath, blockName, timestamp)
           val carbonDeleteWriter = new CarbonDeleteDeltaWriterImpl(deleteDeletaPath)
@@ -251,7 +274,7 @@ object DeleteExecution {
 
           segmentUpdateDetails.setBlockName(blockName)
           segmentUpdateDetails.setActualBlockName(completeBlockName)
-          segmentUpdateDetails.setSegmentName(segmentId)
+          segmentUpdateDetails.setSegmentName(load.getLoadName)
           segmentUpdateDetails.setDeleteDeltaEndTimestamp(timestamp)
           segmentUpdateDetails.setDeleteDeltaStartTimestamp(timestamp)
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
index 7411145..21c32f5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
@@ -121,13 +121,20 @@ private[sql] case class CreateIndexTableCommand(
       }
 
       if (carbonTable.isHivePartitionTable) {
-        throw new ErrorMessage(
-          s"Parent Table  ${ carbonTable.getDatabaseName }." +
-          s"${ carbonTable.getTableName }" +
-          s" is Partition Table and Secondary index on Partition table is not supported ")
+        val isPartitionColumn = indexModel.columnNames.exists {
+          siColumns => carbonTable.getTableInfo
+            .getFactTable
+            .getPartitionInfo
+            .getColumnSchemaList
+            .asScala
+            .exists(_.getColumnName.equalsIgnoreCase(siColumns))
+        }
+        if (isPartitionColumn) {
+          throw new UnsupportedOperationException(
+            "Secondary Index cannot be created on a partition column.")
+        }
       }
 
-
       locks = acquireLockForSecondaryIndexCreation(carbonTable.getAbsoluteTableIdentifier)
       if (locks.isEmpty) {
         throw new ErrorMessage(
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableCompactionPostEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableCompactionPostEventListener.scala
index b04752e..bfaa864 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableCompactionPostEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableCompactionPostEventListener.scala
@@ -109,8 +109,12 @@ class AlterTableCompactionPostEventListener extends OperationEventListener with
           val loadName = mergedLoadName
             .substring(mergedLoadName.indexOf(CarbonCommonConstants.LOAD_FOLDER) +
                        CarbonCommonConstants.LOAD_FOLDER.length)
-          val mergeLoadStartTime = CarbonUpdateUtil.readCurrentTime()
-
+          val factTimestamp = carbonLoadModel.getFactTimeStamp
+          val mergeLoadStartTime = if (factTimestamp == 0) {
+            CarbonUpdateUtil.readCurrentTime()
+          } else {
+            factTimestamp
+          }
           val segmentIdToLoadStartTimeMapping: scala.collection.mutable.Map[String, java.lang
           .Long] = scala.collection.mutable.Map((loadName, mergeLoadStartTime))
           Compactor.createSecondaryIndexAfterCompaction(sQLContext,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSecondaryIndexRDD.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSecondaryIndexRDD.scala
index 678d409..f122913 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSecondaryIndexRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSecondaryIndexRDD.scala
@@ -60,7 +60,8 @@ class CarbonSecondaryIndexRDD[K, V](
     segmentId: String,
     confExecutorsTemp: String,
     indexCarbonTable: CarbonTable,
-    forceAccessSegment: Boolean = false)
+    forceAccessSegment: Boolean = false,
+    isCompactionCall: Boolean = false)
   extends CarbonRDD[(K, V)](ss, Nil) {
 
   private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
@@ -190,24 +191,25 @@ class CarbonSecondaryIndexRDD[K, V](
     val startTime = System.currentTimeMillis()
     val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
       carbonStoreLocation, databaseName, factTableName, tableId)
-    val updateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
-      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable)
     val jobConf: JobConf = new JobConf(hadoopConf)
     SparkHadoopUtil.get.addCredentials(jobConf)
     val job: Job = new Job(jobConf)
-    val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
+
+    if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
+      // set the configuration for current segment file("current.segment") as
+      // same as carbon output committer
+      job.getConfiguration.set(CarbonCommonConstants.CURRENT_SEGMENTFILE,
+        segmentId + CarbonCommonConstants.UNDERSCORE + carbonLoadModel.getFactTimeStamp)
+    }
+    val format =
+      CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
     // initialise query_id for job
     job.getConfiguration.set("query.id", queryId)
     var defaultParallelism = sparkContext.defaultParallelism
     val result = new java.util.ArrayList[Partition](defaultParallelism)
     var partitionNo = 0
-    var columnSize = 0
     var noOfBlocks = 0
 
-    // mapping of the node and block list.
-    var nodeBlockMapping: java.util.Map[String, java.util.List[Distributable]] = new
-        java.util.HashMap[String, java.util.List[Distributable]]
-
     val taskInfoList = new java.util.ArrayList[Distributable]
     var carbonInputSplits = mutable.Seq[CarbonInputSplit]()
 
@@ -227,7 +229,6 @@ class CarbonSecondaryIndexRDD[K, V](
     if (!splits.isEmpty) {
       splitsOfLastSegment = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).toList.asJava
 
-
       carbonInputSplits ++= splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
 
       carbonInputSplits.foreach(splits => {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index 8e7a4da..bd1edae 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -164,13 +164,12 @@ object SecondaryIndexCreator {
               new SecondaryIndexCreationResultImpl,
               carbonLoadModel,
               secondaryIndexModel.secondaryIndex,
-              segId, execInstance, indexCarbonTable, forceAccessSegment).collect()
-            val segmentFileName =
+              segId, execInstance, indexCarbonTable, forceAccessSegment, isCompactionCall).collect()
               SegmentFileStore
                 .writeSegmentFile(indexCarbonTable,
                   segId,
                   String.valueOf(carbonLoadModel.getFactTimeStamp))
-            segmentToLoadStartTimeMap.put(segId, String.valueOf(carbonLoadModel.getFactTimeStamp))
+            segmentToLoadStartTimeMap.put(segId, carbonLoadModel.getFactTimeStamp.toString)
             if (secondaryIndexCreationStatus.length > 0) {
               eachSegmentSecondaryIndexCreationStatus = secondaryIndexCreationStatus
             }
@@ -360,6 +359,7 @@ object SecondaryIndexCreator {
     copyObj.setColumnCompressor(
       CarbonInternalScalaUtil.getCompressorForIndexTable(
         indexTable, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable))
+    copyObj.setFactTimeStamp(carbonLoadModel.getFactTimeStamp)
     copyObj
   }
 
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index b437dd4..95ec141 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -40,17 +40,20 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop database  if exists iud_db cascade")
     sql("create database  iud_db")
 
-    sql("""create table iud_db.source2 (c11 string,c22 int,c33 string,c55 string, c66 int) STORED AS carbondata""")
+    sql(
+      """create table iud_db.source2 (c11 string,c22 int,c33 string,c55 string, c66 int) STORED
+        |AS carbondata""".stripMargin)
     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/source2.csv' INTO table iud_db.source2""")
     sql("use iud_db")
   }
+
   test("delete data from carbon table with alias [where clause ]") {
     sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) STORED AS carbondata""")
     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud_db.dest""")
     sql("""delete from iud_db.dest d where d.c1 = 'a'""").show
     checkAnswer(
       sql("""select c2 from iud_db.dest"""),
-      Seq(Row(2), Row(3),Row(4), Row(5))
+      Seq(Row(2), Row(3), Row(4), Row(5))
     )
   }
   test("delete data from  carbon table[where clause ]") {
@@ -70,7 +73,7 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("""delete from dest where c1 IN ('d', 'e')""").show
     checkAnswer(
       sql("""select c1 from dest"""),
-      Seq(Row("a"), Row("b"),Row("c"))
+      Seq(Row("a"), Row("b"), Row("c"))
     )
   }
 
@@ -119,18 +122,22 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("partition delete data from carbon table with alias [where clause ]") {
     sql("drop table if exists iud_db.dest")
-    sql("""create table iud_db.dest (c1 string,c2 int,c5 string) PARTITIONED BY(c3 string) STORED AS carbondata""")
+    sql(
+      """create table iud_db.dest (c1 string,c2 int,c5 string) PARTITIONED BY(c3 string) STORED AS
+        |carbondata""".stripMargin)
     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud_db.dest""")
     sql("""delete from iud_db.dest d where d.c1 = 'a'""").show
     checkAnswer(
       sql("""select c2 from iud_db.dest"""),
-      Seq(Row(2), Row(3),Row(4), Row(5))
+      Seq(Row(2), Row(3), Row(4), Row(5))
     )
   }
 
   test("partition delete data from  carbon table[where clause ]") {
     sql("""drop table if exists iud_db.dest""")
-    sql("""create table iud_db.dest (c1 string,c2 int,c5 string) PARTITIONED BY(c3 string) STORED AS carbondata""")
+    sql(
+      """create table iud_db.dest (c1 string,c2 int,c5 string) PARTITIONED BY(c3 string) STORED
+        |AS carbondata""".stripMargin)
     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud_db.dest""")
     sql("""delete from iud_db.dest where c2 = 2""").show
     checkAnswer(
@@ -143,7 +150,7 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS carbon2")
     import sqlContext.implicits._
     val df = sqlContext.sparkContext.parallelize(1 to 2000000)
-      .map(x => (x+"a", "b", x))
+      .map(x => (x + "a", "b", x))
       .toDF("c1", "c2", "c3")
     df.write
       .format("carbondata")
@@ -196,39 +203,44 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     val metaPath = carbonTable.getMetadataPath
     val files = FileFactory.getCarbonFile(metaPath)
     val result = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore.getClass
-    if(result.getCanonicalName.contains("CarbonFileMetastore")) {
+    if (result.getCanonicalName.contains("CarbonFileMetastore")) {
       assert(files.listFiles(new CarbonFileFilter {
         override def accept(file: CarbonFile): Boolean = !file.isDirectory
       }).length == 2)
     }
-    else
+    else {
       assert(files.listFiles().length == 2)
-
+    }
     sql("drop table update_status_files")
   }
 
   test("tuple-id for partition table ") {
     sql("drop table if exists iud_db.dest_tuple_part")
     sql(
-      """create table iud_db.dest_tuple_part (c1 string,c2 int,c5 string) PARTITIONED BY(c3 string) STORED AS carbondata""".stripMargin)
+      """create table iud_db.dest_tuple_part (c1 string,c2 int,c5 string) PARTITIONED BY(c3
+        |string) STORED AS carbondata"""
+        .stripMargin)
     sql(
-      s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud_db.dest_tuple_part""".stripMargin)
+      s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud_db.dest_tuple_part"""
+        .stripMargin)
     sql("drop table if exists iud_db.dest_tuple")
     sql(
-      """create table iud_db.dest_tuple (c1 string,c2 int,c5 string,c3 string) STORED AS carbondata""".stripMargin)
+      """create table iud_db.dest_tuple (c1 string,c2 int,c5 string,c3 string) STORED AS
+        |carbondata"""
+        .stripMargin)
     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud_db.dest_tuple""")
 
     val dataframe_part = sql("select getTupleId() as tupleId from iud_db.dest_tuple_part").collect()
     val listOfTupleId_part = dataframe_part.map(df => df.get(0).toString).sorted
-    assert(listOfTupleId_part(0).startsWith("c3=aa/0/0-100100000100001_batchno0-0-0-") &&
+    assert(listOfTupleId_part(0).startsWith("c3=aa/0-100100000100001_batchno0-0-0-") &&
            listOfTupleId_part(0).endsWith("/0/0/0"))
-    assert(listOfTupleId_part(1).startsWith("c3=bb/0/0-100100000100002_batchno0-0-0-") &&
+    assert(listOfTupleId_part(1).startsWith("c3=bb/0-100100000100002_batchno0-0-0-") &&
            listOfTupleId_part(1).endsWith("/0/0/0"))
-    assert(listOfTupleId_part(2).startsWith("c3=cc/0/0-100100000100003_batchno0-0-0-") &&
+    assert(listOfTupleId_part(2).startsWith("c3=cc/0-100100000100003_batchno0-0-0-") &&
            listOfTupleId_part(2).endsWith("/0/0/0"))
-    assert(listOfTupleId_part(3).startsWith("c3=dd/0/0-100100000100004_batchno0-0-0-") &&
+    assert(listOfTupleId_part(3).startsWith("c3=dd/0-100100000100004_batchno0-0-0-") &&
            listOfTupleId_part(3).endsWith("/0/0/0"))
-    assert(listOfTupleId_part(4).startsWith("c3=ee/0/0-100100000100005_batchno0-0-0-") &&
+    assert(listOfTupleId_part(4).startsWith("c3=ee/0-100100000100005_batchno0-0-0-") &&
            listOfTupleId_part(4).endsWith("/0/0/0"))
 
     val dataframe = sql("select getTupleId() as tupleId from iud_db.dest_tuple")
@@ -311,12 +323,15 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
   test("test delete on table with decimal column") {
     sql("drop table if exists decimal_table")
     sql(
-      s"""create table decimal_table(smallIntField smallInt,intField int,bigIntField bigint,floatField float,
-          doubleField double,decimalField decimal(25, 4),timestampField timestamp,dateField date,stringField string,
+      s"""create table decimal_table(smallIntField smallInt,intField int,bigIntField bigint,
+          floatField float,
+          doubleField double,decimalField decimal(25, 4),timestampField timestamp,dateField date,
+          stringField string,
           varcharField varchar(10),charField char(10))stored as carbondata
       """.stripMargin)
     sql(s"load data local inpath '$resourcesPath/decimalData.csv' into table decimal_table")
-    val frame = sql("select decimalfield from decimal_table where smallIntField = -1 or smallIntField = 3")
+    val frame = sql(
+      "select decimalfield from decimal_table where smallIntField = -1 or smallIntField = 3")
     sql(s"delete from decimal_table where smallIntField = 2")
     checkAnswer(frame, Seq(
       Row(-1.1234),
@@ -328,30 +343,35 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
   test("[CARBONDATA-3491] Return updated/deleted rows count when execute update/delete sql") {
     sql("drop table if exists test_return_row_count")
 
-    sql("create table test_return_row_count (a string, b string, c string) STORED AS carbondata").show()
+    sql("create table test_return_row_count (a string, b string, c string) STORED AS carbondata")
+      .show()
     sql("insert into test_return_row_count select 'aaa','bbb','ccc'").show()
     sql("insert into test_return_row_count select 'bbb','bbb','ccc'").show()
     sql("insert into test_return_row_count select 'ccc','bbb','ccc'").show()
     sql("insert into test_return_row_count select 'ccc','bbb','ccc'").show()
 
     checkAnswer(sql("delete from test_return_row_count where a = 'aaa'"),
-        Seq(Row(1))
+      Seq(Row(1))
     )
     checkAnswer(sql("select * from test_return_row_count"),
-        Seq(Row("bbb", "bbb", "ccc"), Row("ccc", "bbb", "ccc"), Row("ccc", "bbb", "ccc"))
+      Seq(Row("bbb", "bbb", "ccc"), Row("ccc", "bbb", "ccc"), Row("ccc", "bbb", "ccc"))
     )
 
     sql("drop table if exists test_return_row_count").show()
   }
 
-  test("[CARBONDATA-3561] Fix incorrect results after execute delete/update operation if there are null values") {
+  test(
+    "[CARBONDATA-3561] Fix incorrect results after execute delete/update operation if there are " +
+    "null values")
+  {
     CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "true")
+      .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true")
     val tableName = "fix_incorrect_results_for_iud"
-    sql(s"drop table if exists ${tableName}")
+    sql(s"drop table if exists ${ tableName }")
 
-    sql(s"create table ${tableName} (a string, b string, c string) STORED AS carbondata").show()
-    sql(s"""insert into table ${tableName}
+    sql(s"create table ${ tableName } (a string, b string, c string) STORED AS carbondata").show()
+    sql(
+      s"""insert into table ${ tableName }
               select '1','1','2017' union all
               select '2','2','2017' union all
               select '3','3','2017' union all
@@ -363,17 +383,17 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
               select '9',null,'2017' union all
               select '10',null,'2017'""").show()
 
-    checkAnswer(sql(s"select count(1) from ${tableName} where b is null"), Seq(Row(4)))
+    checkAnswer(sql(s"select count(1) from ${ tableName } where b is null"), Seq(Row(4)))
 
-    checkAnswer(sql(s"delete from ${tableName} where b ='4'"), Seq(Row(1)))
-    checkAnswer(sql(s"delete from ${tableName} where a ='9'"), Seq(Row(1)))
-    checkAnswer(sql(s"update ${tableName} set (b) = ('10') where a = '10'"), Seq(Row(1)))
+    checkAnswer(sql(s"delete from ${ tableName } where b ='4'"), Seq(Row(1)))
+    checkAnswer(sql(s"delete from ${ tableName } where a ='9'"), Seq(Row(1)))
+    checkAnswer(sql(s"update ${ tableName } set (b) = ('10') where a = '10'"), Seq(Row(1)))
 
-    checkAnswer(sql(s"select count(1) from ${tableName} where b is null"), Seq(Row(2)))
-    checkAnswer(sql(s"select * from ${tableName} where a = '1'"), Seq(Row("1", "1", "2017")))
-    checkAnswer(sql(s"select * from ${tableName} where a = '10'"), Seq(Row("10", "10", "2017")))
+    checkAnswer(sql(s"select count(1) from ${ tableName } where b is null"), Seq(Row(2)))
+    checkAnswer(sql(s"select * from ${ tableName } where a = '1'"), Seq(Row("1", "1", "2017")))
+    checkAnswer(sql(s"select * from ${ tableName } where a = '10'"), Seq(Row("10", "10", "2017")))
 
-    sql(s"drop table if exists ${tableName}").show()
+    sql(s"drop table if exists ${ tableName }").show()
   }
 
   override def afterAll {