You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/06/15 11:50:42 UTC

[37/42] carbondata git commit: Adding the Pages support in the Delete Method.

Adding page support to the delete method.

Correcting the size of the vector batch so that it excludes the filtered rows.

Changing the page ID from string to integer.


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/bbf5dc18
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/bbf5dc18
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/bbf5dc18

Branch: refs/heads/branch-1.1
Commit: bbf5dc1815e34921b52e9d15c6552e04dcd114d6
Parents: 2c83e02
Author: ravikiran23 <ra...@gmail.com>
Authored: Fri Jun 2 20:31:57 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Jun 15 13:26:50 2017 +0530

----------------------------------------------------------------------
 .../BlockletLevelDeleteDeltaDataCache.java      | 28 +++++++++++++-------
 .../core/mutate/DeleteDeltaBlockDetails.java    |  4 +--
 .../core/mutate/DeleteDeltaBlockletDetails.java | 11 ++++++--
 .../carbondata/core/mutate/TupleIdEnum.java     |  3 ++-
 .../data/BlockletDeleteDeltaCacheLoader.java    | 11 +++++---
 .../reader/CarbonDeleteFilesDataReader.java     | 25 ++++++++++-------
 .../impl/DictionaryBasedResultCollector.java    |  3 ++-
 .../DictionaryBasedVectorResultCollector.java   |  7 +++--
 .../collector/impl/RawBasedResultCollector.java |  3 ++-
 ...structureBasedDictionaryResultCollector.java |  3 ++-
 .../RestructureBasedRawResultCollector.java     |  3 ++-
 .../RestructureBasedVectorResultCollector.java  |  3 +++
 .../core/scan/result/AbstractScannedResult.java | 13 ++++++---
 .../SegmentUpdateStatusManager.java             |  2 +-
 .../sql/execution/command/IUDCommands.scala     |  4 ++-
 .../sql/execution/command/IUDCommands.scala     |  4 ++-
 16 files changed, 84 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/cache/update/BlockletLevelDeleteDeltaDataCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/update/BlockletLevelDeleteDeltaDataCache.java b/core/src/main/java/org/apache/carbondata/core/cache/update/BlockletLevelDeleteDeltaDataCache.java
index 5d2e8ce..abad924 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/update/BlockletLevelDeleteDeltaDataCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/update/BlockletLevelDeleteDeltaDataCache.java
@@ -17,26 +17,36 @@
 
 package org.apache.carbondata.core.cache.update;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
 import org.roaringbitmap.RoaringBitmap;
 
 /**
  * This class maintains delete delta data cache of each blocklet along with the block timestamp
  */
 public class BlockletLevelDeleteDeltaDataCache {
-  private RoaringBitmap deleteDelataDataCache;
+  private Map<Integer, RoaringBitmap> deleteDelataDataCache =
+      new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
   private String timeStamp;
 
-  public BlockletLevelDeleteDeltaDataCache(int[] deleteDeltaFileData, String timeStamp) {
-    deleteDelataDataCache = RoaringBitmap.bitmapOf(deleteDeltaFileData);
+  public BlockletLevelDeleteDeltaDataCache(Map<Integer, Integer[]> deleteDeltaFileData,
+      String timeStamp) {
+    for (Map.Entry<Integer, Integer[]> entry : deleteDeltaFileData.entrySet()) {
+      int[] dest = new int[entry.getValue().length];
+      int i = 0;
+      for (Integer val : entry.getValue()) {
+        dest[i++] = val.intValue();
+      }
+      deleteDelataDataCache.put(entry.getKey(), RoaringBitmap.bitmapOf(dest));
+    }
     this.timeStamp = timeStamp;
   }
 
-  public boolean contains(int key) {
-    return deleteDelataDataCache.contains(key);
-  }
-
-  public int getSize() {
-    return deleteDelataDataCache.getCardinality();
+  public boolean contains(int key, Integer pageId) {
+    return deleteDelataDataCache.get(pageId).contains(key);
   }
 
   public String getCacheTimeStamp() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockDetails.java b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockDetails.java
index c4e9ea2..0f66d7e 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockDetails.java
@@ -80,8 +80,8 @@ public class DeleteDeltaBlockDetails implements Serializable {
     }
   }
 
-  public boolean addBlocklet(String blockletId, String offset) throws Exception {
-    DeleteDeltaBlockletDetails blocklet = new DeleteDeltaBlockletDetails(blockletId);
+  public boolean addBlocklet(String blockletId, String offset, Integer pageId) throws Exception {
+    DeleteDeltaBlockletDetails blocklet = new DeleteDeltaBlockletDetails(blockletId, pageId);
     try {
       blocklet.addDeletedRow(CarbonUpdateUtil.getIntegerValue(offset));
       return addBlockletDetails(blocklet);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java
index 5418211..7df5f22 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java
@@ -31,6 +31,8 @@ public class DeleteDeltaBlockletDetails implements Serializable {
 
   private static final long serialVersionUID = 1206104914911491724L;
   private String id;
+  private Integer pageId;
+
   private Set<Integer> deletedRows;
 
   /**
@@ -39,9 +41,10 @@ public class DeleteDeltaBlockletDetails implements Serializable {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(DeleteDeltaBlockletDetails.class.getName());
 
-  public DeleteDeltaBlockletDetails(String id) {
+  public DeleteDeltaBlockletDetails(String id, Integer pageId) {
     this.id = id;
     deletedRows = new TreeSet<Integer>();
+    this.pageId = pageId;
   }
 
   public boolean addDeletedRows(Set<Integer> rows) {
@@ -60,6 +63,10 @@ public class DeleteDeltaBlockletDetails implements Serializable {
     this.id = id;
   }
 
+  public Integer getPageId() {
+    return pageId;
+  }
+
   public Set<Integer> getDeletedRows() {
     return deletedRows;
   }
@@ -73,7 +80,7 @@ public class DeleteDeltaBlockletDetails implements Serializable {
     }
 
     DeleteDeltaBlockletDetails that = (DeleteDeltaBlockletDetails) obj;
-    return id.equals(that.id);
+    return id.equals(that.id) && pageId == that.pageId;
   }
 
   @Override public int hashCode() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/mutate/TupleIdEnum.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/TupleIdEnum.java b/core/src/main/java/org/apache/carbondata/core/mutate/TupleIdEnum.java
index 0c1318c..e8c60b3 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/TupleIdEnum.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/TupleIdEnum.java
@@ -24,7 +24,8 @@ public enum TupleIdEnum {
   SEGMENT_ID(1),
   BLOCK_ID(2),
   BLOCKLET_ID(3),
-  OFFSET(4);
+  PAGE_ID(4),
+  OFFSET(5);
 
   private int index;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockletDeleteDeltaCacheLoader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockletDeleteDeltaCacheLoader.java b/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockletDeleteDeltaCacheLoader.java
index 6665c5b..309e486 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockletDeleteDeltaCacheLoader.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockletDeleteDeltaCacheLoader.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.mutate.data;
 
+import java.util.Map;
+
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
@@ -35,8 +37,8 @@ public class BlockletDeleteDeltaCacheLoader implements DeleteDeltaCacheLoaderInt
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(BlockletDeleteDeltaCacheLoader.class.getName());
 
-  public BlockletDeleteDeltaCacheLoader(String blockletID,
-       DataRefNode blockletNode, AbsoluteTableIdentifier absoluteIdentifier) {
+  public BlockletDeleteDeltaCacheLoader(String blockletID, DataRefNode blockletNode,
+      AbsoluteTableIdentifier absoluteIdentifier) {
     this.blockletID = blockletID;
     this.blockletNode = blockletNode;
     this.absoluteIdentifier = absoluteIdentifier;
@@ -49,11 +51,12 @@ public class BlockletDeleteDeltaCacheLoader implements DeleteDeltaCacheLoaderInt
   public void loadDeleteDeltaFileDataToCache() {
     SegmentUpdateStatusManager segmentUpdateStatusManager =
         new SegmentUpdateStatusManager(absoluteIdentifier);
-    int[] deleteDeltaFileData = null;
+    Map<Integer, Integer[]> deleteDeltaFileData = null;
     BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache = null;
     if (null == blockletNode.getDeleteDeltaDataCache()) {
       try {
-        deleteDeltaFileData = segmentUpdateStatusManager.getDeleteDeltaDataFromAllFiles(blockletID);
+        deleteDeltaFileData =
+            segmentUpdateStatusManager.getDeleteDeltaDataFromAllFiles(blockletID);
         deleteDeltaDataCache = new BlockletLevelDeleteDeltaDataCache(deleteDeltaFileData,
             segmentUpdateStatusManager.getTimestampForRefreshCache(blockletID, null));
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
index 89219e1..e689566 100644
--- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
@@ -19,9 +19,10 @@ package org.apache.carbondata.core.reader;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -36,8 +37,6 @@ import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails;
 import org.apache.carbondata.core.mutate.DeleteDeltaBlockletDetails;
 import org.apache.carbondata.core.util.CarbonProperties;
 
-import org.apache.commons.lang.ArrayUtils;
-
 
 /**
  * This class perform the functionality of reading multiple delete delta files
@@ -80,8 +79,8 @@ public class CarbonDeleteFilesDataReader {
    * @return
    * @throws Exception
    */
-  public int[] getDeleteDataFromAllFiles(List<String> deltaFiles, String blockletId)
-      throws Exception {
+  public Map<Integer, Integer[]> getDeleteDataFromAllFiles(List<String> deltaFiles,
+      String blockletId) throws Exception {
 
     List<Future<DeleteDeltaBlockDetails>> taskSubmitList = new ArrayList<>();
     ExecutorService executorService = Executors.newFixedThreadPool(thread_pool_size);
@@ -101,20 +100,26 @@ public class CarbonDeleteFilesDataReader {
       LOGGER.error("Error while reading the delete delta files : " + e.getMessage());
     }
 
-    Set<Integer> result = new TreeSet<Integer>();
+    Map<Integer, Integer[]> pageIdDeleteRowsMap =
+        new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     for (int i = 0; i < taskSubmitList.size(); i++) {
       try {
         List<DeleteDeltaBlockletDetails> blockletDetails =
             taskSubmitList.get(i).get().getBlockletDetails();
-        result.addAll(
-            blockletDetails.get(blockletDetails.indexOf(new DeleteDeltaBlockletDetails(blockletId)))
-                .getDeletedRows());
+        for (DeleteDeltaBlockletDetails eachBlockletDetails : blockletDetails) {
+          Integer pageId = eachBlockletDetails.getPageId();
+          Set<Integer> rows = blockletDetails
+              .get(blockletDetails.indexOf(new DeleteDeltaBlockletDetails(blockletId, pageId)))
+              .getDeletedRows();
+          pageIdDeleteRowsMap.put(pageId, rows.toArray(new Integer[rows.size()]));
+        }
+
       } catch (Throwable e) {
         LOGGER.error(e.getMessage());
         throw new Exception(e.getMessage());
       }
     }
-    return ArrayUtils.toPrimitive(result.toArray(new Integer[result.size()]));
+    return pageIdDeleteRowsMap;
 
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
index b784f94..d4d16d0 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
@@ -109,7 +109,7 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
         scannedResult.incrementCounter();
       }
       if (null != deleteDeltaDataCache && deleteDeltaDataCache
-          .contains(scannedResult.getCurrentRowId())) {
+          .contains(scannedResult.getCurrentRowId(), scannedResult.getCurrentPageCounter())) {
         continue;
       }
       fillMeasureData(scannedResult, row);
@@ -128,6 +128,7 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
             .equals(queryDimensions[i].getDimension().getColName())) {
           row[order[i]] = DataTypeUtil.getDataBasedOnDataType(
               scannedResult.getBlockletId() + CarbonCommonConstants.FILE_SEPARATOR + scannedResult
+                  .getCurrentPageCounter() + CarbonCommonConstants.FILE_SEPARATOR + scannedResult
                   .getCurrentRowId(), DataType.STRING);
         } else {
           row[order[i]] =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
index 7a8fe06..3203934 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
@@ -144,9 +144,10 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
         return;
       }
       fillColumnVectorDetails(columnarBatch, rowCounter, requiredRows);
-      scannedResult.markFilteredRows(
-          columnarBatch, rowCounter, requiredRows, columnarBatch.getRowCounter());
+      int filteredRows = scannedResult
+          .markFilteredRows(columnarBatch, rowCounter, requiredRows, columnarBatch.getRowCounter());
       scanAndFillResult(scannedResult, columnarBatch, rowCounter, availableRows, requiredRows);
+      columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows - filteredRows);
     }
   }
 
@@ -164,8 +165,6 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
       // Or set the row counter.
       scannedResult.setRowCounter(rowCounter + requiredRows);
     }
-    columnarBatch.setActualSize(
-        columnarBatch.getActualSize() + requiredRows - columnarBatch.getRowsFilteredCount());
     columnarBatch.setRowCounter(columnarBatch.getRowCounter() + requiredRows);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
index 0af4957..478dc8c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
@@ -61,7 +61,8 @@ public class RawBasedResultCollector extends AbstractScannedResultCollector {
     while (scannedResult.hasNext() && rowCounter < batchSize) {
       scanResultAndGetData(scannedResult);
       if (null != deleteDeltaDataCache && deleteDeltaDataCache
-          .contains(scannedResult.getCurrentRowId())) {
+          .contains(scannedResult.getCurrentRowId(),
+              scannedResult.getCurrentPageCounter())) {
         continue;
       }
       prepareRow(scannedResult, listBasedResult, queryMeasures);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
index 71045ff..4fa1494 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
@@ -81,7 +81,8 @@ public class RestructureBasedDictionaryResultCollector extends DictionaryBasedRe
         scannedResult.incrementCounter();
       }
       if (null != deleteDeltaDataCache && deleteDeltaDataCache
-          .contains(scannedResult.getCurrentRowId())) {
+          .contains(scannedResult.getCurrentRowId(),
+              scannedResult.getCurrentPageCounter())) {
         continue;
       }
       fillMeasureData(scannedResult, row);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
index aa5802d..2de74fa 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
@@ -159,7 +159,8 @@ public class RestructureBasedRawResultCollector extends RawBasedResultCollector
     while (scannedResult.hasNext() && rowCounter < batchSize) {
       scanResultAndGetData(scannedResult);
       if (null != deleteDeltaDataCache && deleteDeltaDataCache
-          .contains(scannedResult.getCurrentRowId())) {
+          .contains(scannedResult.getCurrentRowId(),
+              scannedResult.getCurrentPageCounter())) {
         continue;
       }
       // re-fill dictionary and no dictionary key arrays for the newly added columns

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
index 3df4541..6f45c47 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
@@ -109,11 +109,14 @@ public class RestructureBasedVectorResultCollector extends DictionaryBasedVector
         return;
       }
       fillColumnVectorDetails(columnarBatch, rowCounter, requiredRows);
+      int filteredRows = scannedResult
+          .markFilteredRows(columnarBatch, rowCounter, requiredRows, columnarBatch.getRowCounter());
       // fill default values for non existing dimensions and measures
       fillDataForNonExistingDimensions();
       fillDataForNonExistingMeasures();
       // fill existing dimensions and measures data
       scanAndFillResult(scannedResult, columnarBatch, rowCounter, availableRows, requiredRows);
+      columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows - filteredRows);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
index a1074ea..1dda1aa 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
@@ -284,8 +284,10 @@ public abstract class AbstractScannedResult {
         String data = getBlockletId();
         if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID
             .equals(columnVectorInfo.dimension.getColumnName())) {
-          data = data + CarbonCommonConstants.FILE_SEPARATOR +
-              (rowMapping == null ? j : rowMapping[pageCounter][j]);
+          data = data + CarbonCommonConstants.FILE_SEPARATOR + pageCounter
+              + CarbonCommonConstants.FILE_SEPARATOR + (rowMapping == null ?
+              j :
+              rowMapping[pageCounter][j]);
         }
         vector.putBytes(vectorOffset++, offset, data.length(), data.getBytes());
       }
@@ -648,17 +650,20 @@ public abstract class AbstractScannedResult {
    * @param size
    * @param vectorOffset
    */
-  public void markFilteredRows(CarbonColumnarBatch columnarBatch, int startRow, int size,
+  public int markFilteredRows(CarbonColumnarBatch columnarBatch, int startRow, int size,
       int vectorOffset) {
+    int rowsFiltered = 0;
     if (blockletDeleteDeltaCache != null) {
       int len = startRow + size;
       for (int i = startRow; i < len; i++) {
         int rowId = rowMapping != null ? rowMapping[pageCounter][i] : i;
-        if (blockletDeleteDeltaCache.contains(rowId)) {
+        if (blockletDeleteDeltaCache.contains(rowId, pageCounter)) {
           columnarBatch.markFiltered(vectorOffset);
+          rowsFiltered++;
         }
         vectorOffset++;
       }
     }
+    return rowsFiltered;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index c822935..6fab563 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -254,7 +254,7 @@ public class SegmentUpdateStatusManager {
    * @return
    * @throws Exception
    */
-  public int[] getDeleteDeltaDataFromAllFiles(String tupleId) throws Exception {
+  public Map<Integer, Integer[]> getDeleteDeltaDataFromAllFiles(String tupleId) throws Exception {
     List<String> deltaFiles = getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT);
     CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader();
     String blockletId = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCKLET_ID);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
index a439c30..a292cde 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -689,7 +689,9 @@ object deleteExecution {
             val offset = CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.OFFSET)
             val blockletId = CarbonUpdateUtil
               .getRequiredFieldFromTID(TID, TupleIdEnum.BLOCKLET_ID)
-            val IsValidOffset = deleteDeltaBlockDetails.addBlocklet(blockletId, offset)
+            val pageId = Integer.parseInt(CarbonUpdateUtil
+              .getRequiredFieldFromTID(TID, TupleIdEnum.PAGE_ID))
+            val IsValidOffset = deleteDeltaBlockDetails.addBlocklet(blockletId, offset, pageId)
             // stop delete operation
             if(!IsValidOffset) {
               executorErrors.failureCauses = FailureCauses.MULTIPLE_INPUT_ROWS_MATCHING

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bbf5dc18/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
index 01395ff..0894f23 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -704,7 +704,9 @@ object deleteExecution {
             val offset = CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.OFFSET)
             val blockletId = CarbonUpdateUtil
               .getRequiredFieldFromTID(TID, TupleIdEnum.BLOCKLET_ID)
-            val IsValidOffset = deleteDeltaBlockDetails.addBlocklet(blockletId, offset)
+            val pageId = Integer.parseInt(CarbonUpdateUtil
+              .getRequiredFieldFromTID(TID, TupleIdEnum.PAGE_ID))
+            val IsValidOffset = deleteDeltaBlockDetails.addBlocklet(blockletId, offset, pageId)
             // stop delete operation
             if(!IsValidOffset) {
               executorErrors.failureCauses = FailureCauses.MULTIPLE_INPUT_ROWS_MATCHING