Posted to commits@hbase.apache.org by li...@apache.org on 2014/03/14 19:08:58 UTC

svn commit: r1577636 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/

Author: liyin
Date: Fri Mar 14 18:08:58 2014
New Revision: 1577636

URL: http://svn.apache.org/r1577636
Log:
[HBASE-10730] Optimize Delete Column on per StoreFile basis.

Author: everyoung

Summary:
1. Add a deleteColumn check at the KeyValueHeap.generalizedSeek level (per-scanner decision sketched below).
2. Small fix to createLastOnRow (see the note ahead of the KeyValue.java diff).
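
The gist of item 1, as a minimal, hypothetical sketch (ColumnScanner, mayHaveDeleteColumn,
seekTo and the String keys are stand-ins invented for illustration, not the real
KeyValueScanner/KeyValue API; the actual logic is in the new
KeyValueHeap.seekNextColWithCheck / seekExactKVWithCheck below): a scanner whose store file
may still hold a DeleteColumn marker must seek conservatively to the first key of the column
so the marker is not skipped, while every other scanner can jump straight to the cell at the
queried timestamp, and only those jumps are counted as optimized seeks.

  import java.util.List;

  /** Hypothetical, simplified stand-ins; the real patch works on KeyValueScanner/KeyValue. */
  interface ColumnScanner {
    /** True if this scanner's store file may hold a DeleteColumn marker for the column. */
    boolean mayHaveDeleteColumn(String column);
    /** Position the scanner at or after the given key. */
    void seekTo(String key);
  }

  final class DeleteColumnSeekSketch {
    /**
     * Per-scanner seek decision, in the spirit of the new seekNextColWithCheck:
     * a possible DeleteColumn marker forces the conservative seek (the marker
     * must not be skipped); otherwise the scanner jumps straight to the cell
     * at the queried timestamp.
     */
    static int seekAll(List<ColumnScanner> scanners, String column,
                       String firstKeyOnColumn, String exactKeyAtQueriedTs) {
      int optimizedSeeks = 0;
      for (ColumnScanner scanner : scanners) {
        if (scanner.mayHaveDeleteColumn(column)) {
          scanner.seekTo(firstKeyOnColumn);      // conservative path
        } else {
          scanner.seekTo(exactKeyAtQueriedTs);   // optimized path
          optimizedSeeks++;                      // mirrors HRegionServer.numOptimizedSeeks
        }
      }
      return optimizedSeeks;
    }
  }

Previously a single store file with a possible delete marker forced every scanner onto the
conservative path; deciding per scanner (i.e. per StoreFile) is what the commit title refers to.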

Test Plan:
Added one new unit test (verification pattern sketched below).

MR test passed.
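
The new test's check boils down to: reset the region server's optimized-seek counter, run a
Get across the three Bloom-type column families, and assert how many scanners took the direct
seek. A compressed, self-contained sketch of that pattern (OPTIMIZED_SEEKS and doGet are
stand-ins for HRegionServer.numOptimizedSeeks and the test's getData/getDataInRange helpers):

  import java.util.concurrent.atomic.AtomicInteger;

  final class OptimizedSeekAssertSketch {
    // Stand-in for HRegionServer.numOptimizedSeeks.
    static final AtomicInteger OPTIMIZED_SEEKS = new AtomicInteger();

    static void assertOptimizedSeeks(Runnable doGet, int expected) {
      OPTIMIZED_SEEKS.set(0);   // reset before the query, exactly as the test does
      doGet.run();              // the Get spans one column family per Bloom type
      int actual = OPTIMIZED_SEEKS.get();
      if (actual != expected) {
        throw new AssertionError("expected " + expected + " optimized seeks, got " + actual);
      }
    }
  }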

Reviewers: aaiyer, adela

Reviewed By: aaiyer

CC: hbase-dev@, fan, arice

Differential Revision: https://phabricator.fb.com/D1183910

Task ID: 2672883

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksRead.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=1577636&r1=1577635&r2=1577636&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java Fri Mar 14 18:08:58 2014
@@ -1679,7 +1679,7 @@ public class KeyValue implements Writabl
    * @return Last possible KeyValue on passed <code>row</code>
    */
   public static KeyValue createLastOnRow(final byte[] row) {
-    return new KeyValue(row, null, null, HConstants.LATEST_TIMESTAMP, Type.Minimum);
+    return new KeyValue(row, null, null, HConstants.OLDEST_TIMESTAMP, Type.Minimum);
   }
 
   /**

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java?rev=1577636&r1=1577635&r2=1577636&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java Fri Mar 14 18:08:58 2014
@@ -29,6 +29,8 @@ import java.util.PriorityQueue;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.KeyValueContext;
+import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Implements a heap merge across any number of KeyValueScanners.
@@ -311,10 +313,10 @@ public class KeyValueHeap extends NonLaz
    * @param useBloom whether to optimize seeks using Bloom filters
    */
   private boolean generalizedSeek(boolean isLazy, KeyValue seekKey,
-      boolean forward, boolean useBloom) throws IOException {
+                                           boolean forward, boolean useBloom) throws IOException {
     if (!isLazy && useBloom) {
       throw new IllegalArgumentException("Multi-column Bloom filter " +
-          "optimization requires a lazy seek");
+                                           "optimization requires a lazy seek");
     }
 
     if (current == null) {
@@ -343,7 +345,7 @@ public class KeyValueHeap extends NonLaz
         seekResult = scanner.requestSeek(seekKey, forward, useBloom);
       } else {
         seekResult = NonLazyKeyValueScanner.doRealSeek(
-            scanner, seekKey, forward);
+          scanner, seekKey, forward);
       }
 
       if (!seekResult) {
@@ -357,6 +359,200 @@ public class KeyValueHeap extends NonLaz
     return false;
   }
 
+
+  /**
+   * Reseek to the exact KV or to the next column, with a deleteColumn Bloom filter check.
+   */
+  public boolean reseekExactKVOrNextCol(Pair<KeyValue, KeyValue> seekKeyPair, ScanQueryMatcher.MatchCode qcode)
+      throws IOException {
+    switch (qcode) {
+      case SEEK_NEXT_COL:
+        return seekNextColWithCheck(
+          false,    // This is not a lazy seek
+          seekKeyPair,
+          true,     // forward (true because this is reseek)
+          false    // Not using Bloom filters
+        );
+      case SEEK_TO_EXACT_KV:
+        return seekExactKVWithCheck(
+          false,    // This is not a lazy seek
+          seekKeyPair.getFirst(),
+          true,     // forward (true because this is reseek)
+          false    // Not using Bloom filters
+        );
+      default:
+        throw new RuntimeException("UNEXPECTED");
+    }
+  }
+
+  /**
+   * requestSeek to the exact KV or to the next column, with a deleteColumn Bloom filter check.
+   */
+  public boolean requestSeekExactKVOrNextCol(Pair<KeyValue, KeyValue> keyPair, boolean forward,
+      boolean useBloom, ScanQueryMatcher.MatchCode qcode) throws IOException {
+    switch (qcode) {
+      case SEEK_NEXT_COL:
+        return seekNextColWithCheck(true, keyPair, forward, useBloom);
+      case SEEK_TO_EXACT_KV:
+        return seekExactKVWithCheck(true, keyPair.getFirst(), forward, useBloom);
+      default:
+        throw new RuntimeException("UNEXPECTED");
+    }
+  }
+
+  /**
+   * seekNextCol with a deleteColumn Bloom filter check:
+   * (i)    Scanners in the KVH that do not contain a delete marker seek past the second key
+   *        in the key pair, which is the KV at the seeking timestamp, or
+   *        the lastKeyOnRow of the current column if there is no next column;
+   * (ii)   Scanners that contain a delete marker move forward to the first key in the pair,
+   *        if the scanner is behind it.
+   *        The first key in the pair is either the firstKeyOnRow of the next column,
+   *        or the lastKeyOnRow of the current column if there is no next column.
+   * (iii)  The "current" scanner at the end of the function call is
+   *        guaranteed to have done a real seek.
+   *
+   * @param isLazy whether we are trying to seek to exactly the given row/col.
+   *          Enables Bloom filter and most-recent-file-first optimizations for
+   *          multi-column get/scan queries.
+   * @param seekKeyPair key pair to seek to: if a scanner may contain a deleteColumn, seek the first key;
+   *                    otherwise, seek the second key in the pair.
+   * @param forward whether to seek forward (also known as reseek)
+   * @param useBloom whether to optimize seeks using Bloom filters
+   */
+  private boolean seekNextColWithCheck(boolean isLazy, Pair<KeyValue, KeyValue> seekKeyPair,
+      boolean forward, boolean useBloom) throws IOException {
+    if (!isLazy && useBloom) {
+      throw new IllegalArgumentException("Multi-column Bloom filter " +
+          "optimization requires a lazy seek");
+    }
+
+    if (current == null) {
+      return false;
+    }
+    heap.add(current);
+    current = null;
+
+    // if it has deleteColumn, use first Key
+    KeyValue firstKey = seekKeyPair.getFirst();
+    // if it doesn't have deleteColumn, use second Key
+    KeyValue seekKey = seekKeyPair.getSecond();
+
+    List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
+    allScanners.addAll(this.heap);
+    boolean hasOptimized = false;
+    // clear the heap
+    this.heap.clear();
+
+    // process each scanner to go to either firstKey or seekKey
+    for (KeyValueScanner kvScanner : allScanners) {
+      KeyValue realSeekKey = seekKey;
+      KeyValue topKey = kvScanner.peek();
+      if (comparator.getComparator().compare(seekKey, topKey) <= 0) {
+        // already passed the seekKey
+        // add back to heap and continue
+        this.heap.add(kvScanner);
+        continue;
+      }
+
+      if (kvScanner.passesDeleteColumnCheck(seekKey)) {
+        // has deleteColumn, seek firstKey
+        realSeekKey = firstKey;
+      } else {
+        hasOptimized = true;
+      }
+
+      // seek realSeekKey
+      boolean seekResult;
+      if (isLazy) {
+        seekResult = kvScanner.requestSeek(realSeekKey, forward, useBloom);
+      } else {
+        seekResult = NonLazyKeyValueScanner.doRealSeek(
+          kvScanner, realSeekKey, forward);
+      }
+
+      if (!seekResult) {
+        kvScanner.close();
+      } else {
+        this.heap.add(kvScanner);
+      }
+    }
+    // update metric
+    if (hasOptimized) {
+      HRegionServer.numOptimizedSeeks.incrementAndGet();
+    }
+    // done
+    current = pollRealKV();
+    return current != null;
+  }
+
+
+  /**
+   * seekExactKV with a deleteColumn Bloom filter check:
+   * If there is a deleteColumn record in the top scanner, do a next();
+   * else seek past seekKey.
+   *
+   * @param isLazy whether we are trying to seek to exactly the given row/col.
+   *          Enables Bloom filter and most-recent-file-first optimizations for
+   *          multi-column get/scan queries.
+   * @param seekKey key to seek to
+   * @param forward whether to seek forward (also known as reseek)
+   * @param useBloom whether to optimize seeks using Bloom filters
+   */
+  private boolean seekExactKVWithCheck(boolean isLazy, KeyValue seekKey,
+     boolean forward, boolean useBloom) throws IOException {
+    if (!isLazy && useBloom) {
+      throw new IllegalArgumentException("Multi-column Bloom filter " +
+        "optimization requires a lazy seek");
+    }
+
+    if (current == null) {
+      return false;
+    }
+    heap.add(current);
+    current = null;
+
+    KeyValueScanner scanner;
+    while ((scanner = heap.poll()) != null) {
+      KeyValue topKey = scanner.peek();
+      if (comparator.getComparator().compare(seekKey, topKey) <= 0) {
+        // Top KeyValue is at-or-after Seek KeyValue. We only know that all
+        // scanners are at or after seekKey (because fake keys of
+        // "lazily-seeked" scanners are not greater than their real next keys),
+        // but we still need to enforce our invariant that the top scanner has
+        // done a real seek. This way StoreScanner and RegionScanner do not
+        // have to worry about fake keys.
+
+        HRegionServer.numOptimizedSeeks.incrementAndGet();
+
+        heap.add(scanner);
+        current = pollRealKV();
+        return current != null;
+      }
+
+      if (scanner.passesDeleteColumnCheck(seekKey)) {
+        current = scanner;
+        return this.next() != null;
+      } else {
+        boolean seekResult;
+        if (isLazy) {
+          seekResult = scanner.requestSeek(seekKey, forward, useBloom);
+        } else {
+          seekResult = NonLazyKeyValueScanner.doRealSeek(
+            scanner, seekKey, forward);
+        }
+
+        if (!seekResult) {
+          scanner.close();
+        } else {
+          heap.add(scanner);
+        }
+      }
+    }
+    // Heap is returning empty, scanner is done
+    return false;
+  }
+
   /**
    * Fetches the top sub-scanner from the priority queue, ensuring that a real
    * seek has been done on it. Works by fetching the top sub-scanner, and if it

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=1577636&r1=1577635&r2=1577636&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Fri Mar 14 18:08:58 2014
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.filter.Fi
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.regionserver.DeleteTracker.DeleteResult;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * A query matcher that is specifically designed for the scan case.
@@ -267,18 +268,7 @@ public class ScanQueryMatcher {
 
     int timestampComparison = tr.compare(timestamp);
     if (timestampComparison >= 1) {
-      // we have to have the delete column bloom enabled to be able to seek to
-      // the exact kv - (if we don't know if there are any deletes we are unable
-      // to do that
       if (isDeleteColumnUsageEnabled) {
-        KeyValue queriedKV = kv.createFirstOnRowColTS(Math.max(tr.getMax() - 1,
-            tr.getMin()));
-        for (KeyValueScanner kvScanner : allScanners) {
-          if (kvScanner.passesDeleteColumnCheck(queriedKV)) {
-            return MatchCode.SKIP;
-          }
-        }
-        HRegionServer.numOptimizedSeeks.incrementAndGet();
         return MatchCode.SEEK_TO_EXACT_KV;
       } else {
         return MatchCode.SKIP;
@@ -400,45 +390,31 @@ public class ScanQueryMatcher {
   }
 
   /**
-   * If there is no next column, we return a KV which is the last possible key
-   * for the current row. Otherwise, we proceed to the next kv (the first kv on
-   * the next column). if there is no delete column blooom filter enabled, we
-   * return the next kv on the column. If the delete column bloom filter is
-   * enabled, we first check whether the scanners contain delete information
-   * about the next kv. If there is no delete, we proceed to the searched kv
-   * with highest timestamp.
+   * If there is no next column, we return a pair of KVs, both of which are the last
+   * possible key for the current row.
+   * Otherwise, the first key in the pair is the firstOnRow of the next column and
+   * the second is the exact key at the seeking timestamp.
    *
    * @param kv - current keyvalue
-   * @param allScanners - all scanners including the scanners from memstore and
-   * storefiles
-   * @param deleteColBloomFilterEnabled - is the delete column bloom filter
-   * ebnabled
-   * @return - kv in the next column
+   * @return - a pair of kvs to search for next column
    */
-  public KeyValue getKeyForNextColumn(KeyValue kv, List<KeyValueScanner> allScanners, boolean deleteColBloomFilterEnabled) {
+  public Pair<KeyValue, KeyValue> getKeyPairForNextColumn(KeyValue kv) {
     ColumnCount nextColumn = columns.getColumnHint();
     if (nextColumn == null) {
-      return KeyValue.createLastOnRow(
+      KeyValue key = KeyValue.createLastOnRow(
           kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(),
           kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(),
           kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
+      return new Pair<KeyValue, KeyValue>(key, key);
     } else {
-      KeyValue nextKV = KeyValue.createFirstOnRow(kv.getBuffer(),
-          kv.getRowOffset(), kv.getRowLength(), kv.getBuffer(),
-          kv.getFamilyOffset(), kv.getFamilyLength(), nextColumn.getBuffer(),
-          nextColumn.getOffset(), nextColumn.getLength());
-      if (!deleteColBloomFilterEnabled) {
-        return nextKV;
-      }
-      for (KeyValueScanner kvScanner : allScanners) {
-        if (kvScanner.passesDeleteColumnCheck(nextKV))
-          return nextKV;
-      }
-      HRegionServer.numOptimizedSeeks.incrementAndGet();
-      return KeyValue.createFirstOnRow(nextKV.getBuffer(), nextKV.getRowOffset(), nextKV.getRowLength(),
-          nextKV.getBuffer(), nextKV.getFamilyOffset(), nextKV.getFamilyLength(),
-          nextColumn.getBuffer(), nextColumn.getOffset(), nextColumn.getLength(),
-          Math.max(tr.getMax() -1, tr.getMin()));
+      KeyValue firstKey = KeyValue.createFirstOnRow(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(),
+                            kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(),
+                            nextColumn.getBuffer(), nextColumn.getOffset(), nextColumn.getLength());
+      KeyValue exactKey = KeyValue.createFirstOnRow(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(),
+                            kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(),
+                            nextColumn.getBuffer(), nextColumn.getOffset(), nextColumn.getLength(),
+                            Math.max(tr.getMax() - 1, tr.getMin()));
+      return new Pair<KeyValue, KeyValue>(firstKey, exactKey);
     }
   }
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1577636&r1=1577635&r2=1577636&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Fri Mar 14 18:08:58 2014
@@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.util.Inje
 import org.apache.hadoop.hbase.util.InjectionHandler;
 import org.apache.hadoop.hbase.regionserver.kvaggregator.KeyValueAggregator;
 import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
-
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Scanner scans both the memstore and the HStore. Coalesce KeyValue stream
@@ -437,7 +437,8 @@ public class StoreScanner extends NonLaz
           case SEEK_TO_EXACT_KV:
             KeyValue queriedKV = kv.createFirstOnRowColTS(Math.max(
                 matcher.tr.getMax() - 1, matcher.tr.getMin()));
-            reseek(queriedKV);
+            reseekExactKVOrNextCol(new Pair<KeyValue, KeyValue>(queriedKV, queriedKV),
+                                   MatchCode.SEEK_TO_EXACT_KV);
             break;
           case SEEK_TO_EFFECTIVE_TS:
             reseek(matcher.getKeyForEffectiveTSOnRow(kv));
@@ -486,12 +487,17 @@ public class StoreScanner extends NonLaz
               }
               reseek(matcher.getKeyForNextRow(kv));
             } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
-            // we check this since some tests have store = null
-              if (this.store == null) {
-                reseek(matcher.getKeyForNextColumn(kv, null, false));
+              if (isUsingDeleteColBloom) {
+                // we check this since some tests have store = null
+                if (this.store == null) {
+                  Pair<KeyValue, KeyValue> keyPair = matcher.getKeyPairForNextColumn(kv);
+                  keyPair.setSecond(keyPair.getFirst());
+                  reseekExactKVOrNextCol(keyPair, MatchCode.SEEK_NEXT_COL);
+                } else {
+                  reseekExactKVOrNextCol(matcher.getKeyPairForNextColumn(kv), MatchCode.SEEK_NEXT_COL);
+                }
               } else {
-                reseek(matcher.getKeyForNextColumn(kv,
-                    this.heap.getActiveScanners(), isUsingDeleteColBloom));
+                reseek(matcher.getKeyPairForNextColumn(kv).getFirst());
               }
             } else {
               this.heap.next();
@@ -529,11 +535,17 @@ public class StoreScanner extends NonLaz
             break;
 
           case SEEK_NEXT_COL:
-          // we check this since some tests have store = null
-            if (this.store == null) {
-              reseek(matcher.getKeyForNextColumn(kv, null, false));
+            if (isUsingDeleteColBloom) {
+              // we check this since some tests have store = null
+              if (this.store == null) {
+                Pair<KeyValue, KeyValue> keyPair = matcher.getKeyPairForNextColumn(kv);
+                keyPair.setSecond(keyPair.getFirst());
+                reseekExactKVOrNextCol(keyPair, MatchCode.SEEK_NEXT_COL);
+              } else {
+                reseekExactKVOrNextCol(matcher.getKeyPairForNextColumn(kv), MatchCode.SEEK_NEXT_COL);
+              }
             } else {
-              reseek(matcher.getKeyForNextColumn(kv, this.heap.getActiveScanners(),isUsingDeleteColBloom));
+              reseek(matcher.getKeyPairForNextColumn(kv).getFirst());
             }
             break;
 
@@ -702,6 +714,21 @@ public class StoreScanner extends NonLaz
     }
   }
 
+  /**
+   * Reseek to the exact KV or to the next column, with a deleteColumn Bloom filter check.
+   * It is called when the matcher returns either SEEK_TO_EXACT_KV or SEEK_NEXT_COL.
+   * The heap checks the deleteColumn Bloom filter on each scanner and seeks accordingly.
+   */
+  public synchronized boolean reseekExactKVOrNextCol(Pair<KeyValue, KeyValue> kvPair, MatchCode qcode) throws IOException {
+    //Heap cannot be null, because this is only called from next() which
+    //guarantees that heap will never be null before this call.
+    if (explicitColumnQuery && lazySeekEnabledGlobally) {
+      return heap.requestSeekExactKVOrNextCol(kvPair, true, useRowColBloom, qcode);
+    } else {
+      return heap.reseekExactKVOrNextCol(kvPair, qcode);
+    }
+  }
+
   @Override
   public long getSequenceID() {
     return 0;

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksRead.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksRead.java?rev=1577636&r1=1577635&r2=1577636&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksRead.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksRead.java Fri Mar 14 18:08:58 2014
@@ -148,6 +148,44 @@ public class TestBlocksRead extends HBas
     return kvs;
   }
 
+  private KeyValue[] getDataInRange(String family, String row, List<String> columns,
+                             long minTimestamp, long maxTimestamp, int expBlocks)
+      throws IOException {
+    return getDataInRange(family, row, columns, minTimestamp, maxTimestamp,
+                          expBlocks, expBlocks, expBlocks);
+  }
+
+  private KeyValue[] getDataInRange(String family, String row, List<String> columns,
+                             long minTimestamp, long maxTimestamp,
+                             int expBlocksRowCol, int expBlocksRow, int expBlocksNone)
+    throws IOException {
+    int[] expBlocks = new int[] { expBlocksRowCol, expBlocksRow, expBlocksNone };
+    KeyValue[] kvs = null;
+
+    for (int i = 0; i < BLOOM_TYPE.length; i++) {
+      BloomType bloomType = BLOOM_TYPE[i];
+      byte[] cf = Bytes.toBytes(family + "_" + bloomType);
+      long blocksStart = getBlkAccessCount(cf);
+      Get get = new Get(Bytes.toBytes(row));
+      get.setMaxVersions();
+
+      for (String column : columns) {
+        get.addColumn(cf, Bytes.toBytes(column));
+        get.setTimeRange(minTimestamp, maxTimestamp);
+      }
+
+      kvs = region.get(get, null).raw();
+      long blocksEnd = getBlkAccessCount(cf);
+      if (expBlocks[i] != -1) {
+        assertEquals("Blocks Read Check for Bloom: " + bloomType, expBlocks[i],
+                     blocksEnd - blocksStart);
+      }
+      LOG.info("Blocks Read for Bloom: " + bloomType + " = "
+                           + (blocksEnd - blocksStart) + "Expected = " + expBlocks[i]);
+    }
+    return kvs;
+  }
+
   private KeyValue[] getData(String family, String row, List<String> columns,
       int expBlocksRowCol, int expBlocksRow, int expBlocksNone)
       throws IOException {
@@ -202,7 +240,7 @@ public class TestBlocksRead extends HBas
       long version) throws IOException {
     Delete del = new Delete(Bytes.toBytes(row));
     for (int i=0; i<BLOOM_TYPE.length; i++) {
-      del.deleteColumn(Bytes.toBytes(family + BLOOM_TYPE[i]),
+      del.deleteColumn(Bytes.toBytes(family + "_" + BLOOM_TYPE[i]),
           Bytes.toBytes(qualifier), version);
     }
     region.delete(del, null, true);
@@ -218,6 +256,16 @@ public class TestBlocksRead extends HBas
     region.delete(del, null, true);
   }
 
+  public void deleteColumnsUntil(String family, String qualifier, String row, long version)
+    throws IOException {
+    Delete del = new Delete(Bytes.toBytes(row));
+    for (int i=0; i<BLOOM_TYPE.length; i++) {
+      del.deleteColumns(Bytes.toBytes(family + "_" + BLOOM_TYPE[i]),
+                        Bytes.toBytes(qualifier), version);
+    }
+    region.delete(del, null, true);
+  }
+
   private static void verifyData(KeyValue kv, String expectedRow,
       String expectedCol, long expectedVersion) {
     assertEquals("RowCheck", expectedRow, Bytes.toString(kv.getRow()));
@@ -565,6 +613,116 @@ public class TestBlocksRead extends HBas
     assertEquals(6, numSeeks);
   }
 
+  @Test
+  public void testDeleteColBloomFilterWithDeletesWithMultipleFlushCache() throws IOException{
+    byte[] TABLE = Bytes.toBytes("testDeleteColBloomFilterWithDeletes");
+    String FAMILY = "cf1";
+    KeyValue kvs[];
+    HBaseConfiguration conf = getConf();
+    conf.setBoolean("io.storefile.delete.column.bloom.enabled", true);
+    initHRegion(TABLE, getName(), conf, FAMILY, true);
+    if (!conf.getBoolean(BloomFilterFactory.IO_STOREFILE_DELETECOLUMN_BLOOM_ENABLED, false)) {
+      System.out.println("ignoring this test since the delete bloom filter is not enabled...");
+      return;
+    }
+
+    // SF1: w/o deleteColumn, but with Delete
+    for (int i = 1; i < 8; i++) {
+      for (int j = 1; j < 6; j++) {
+        putData(FAMILY, "row", "col"+i, j);
+      }
+    }
+
+    deleteColumn(FAMILY, "col2", "row", 3);
+    deleteColumn(FAMILY, "col5", "row", 3);
+    deleteColumn(FAMILY, "col7", "row", 3);
+    region.flushcache();
+
+    // SF2: w/o deleteColumn
+    for (int i = 1; i < 8; i++) {
+      for (int j = 1; j < 6; j++) {
+        putData(FAMILY, "row", "col"+i, j);
+      }
+    }
+    region.flushcache();
+
+    // SF3: w/o deleteColumn
+    for (int i = 1; i < 8; i++) {
+      for (int j = 1; j < 6; j++) {
+        putData(FAMILY, "row", "col"+i, j);
+      }
+    }
+    region.flushcache();
+
+    // SF4: w deleteColumn
+    for (int i = 1; i < 8; i++) {
+      for (int j = 1; j < 6; j++) {
+        putData(FAMILY, "row", "col"+i, j);
+      }
+    }
+    deleteColumn(FAMILY, "col3", "row");
+    deleteColumnsUntil(FAMILY, "col6", "row", 4);
+    deleteColumn(FAMILY, "col8", "row");
+    region.flushcache();
+
+    // memStore: w/o deleteColumn
+    for (int i = 1; i < 8; i++) {
+      for (int j = 1; j < 6; j++) {
+        putData(FAMILY, "row", "col"+i, j);
+      }
+    }
+
+    /**
+     * Only the seeks for the KVs for which we don't have any deletes should be
+     * optimized; since each get runs against 3 column families (one per Bloom
+     * type), the expected number of optimized seeks is a multiple of 3.
+     **/
+    int numSeeks;
+
+    HRegionServer.numOptimizedSeeks.set(0);
+    // No deletes or deleteColumn
+    kvs = getData(FAMILY, "row",  Arrays.asList("col1"), 4, 8);
+    assertTrue(kvs.length == 1);
+    numSeeks = HRegionServer.numOptimizedSeeks.get();
+    assertEquals(3, numSeeks);
+
+    HRegionServer.numOptimizedSeeks.set(0);
+    // w delete and no deleteColumn
+    kvs = getData(FAMILY, "row",  Arrays.asList("col2"), 3, 12);
+    assertTrue(kvs.length == 0);
+    numSeeks = HRegionServer.numOptimizedSeeks.get();
+    assertEquals(3, numSeeks);
+
+    HRegionServer.numOptimizedSeeks.set(0);
+    // w deleteColumn of the whole column
+    kvs = getData(FAMILY, "row",  Arrays.asList("col3"), 4, 9);
+    assertTrue(kvs.length == 0);
+    numSeeks = HRegionServer.numOptimizedSeeks.get();
+    assertEquals(3, numSeeks);
+
+    HRegionServer.numOptimizedSeeks.set(0);
+    // w deleteColumn to the specified version
+    kvs = getData(FAMILY, "row",  Arrays.asList("col6"), 2, 10);
+    assertTrue(kvs.length == 0);
+    numSeeks = HRegionServer.numOptimizedSeeks.get();
+    assertEquals(3, numSeeks);
+
+    HRegionServer.numOptimizedSeeks.set(0);
+    // w/o deleteColumn within a time range.
+    kvs = getDataInRange(FAMILY, "row",  Arrays.asList("col1"), 1, 3, 24);
+    assertTrue(kvs.length == 2);
+    numSeeks = HRegionServer.numOptimizedSeeks.get();
+    assertEquals(6, numSeeks);
+
+    HRegionServer.numOptimizedSeeks.set(0);
+    // w deleteColumn within a time range.
+    kvs = getDataInRange(FAMILY, "row",  Arrays.asList("col6"), 1, 3, 10);
+    assertTrue(kvs.length == 0);
+    numSeeks = HRegionServer.numOptimizedSeeks.get();
+    assertEquals(3, numSeeks);
+  }
+
+
   /**
    * This test will make a number of puts, and then do flush, then do another
    * series of puts. Then we will test the number of blocks read while doing get