You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2020/01/31 06:13:21 UTC

[hbase] branch branch-2 updated: HBASE-23350 Make compaction files cacheonWrite configurable based on threshold

This is an automated email from the ASF dual-hosted git repository.

ramkrishna pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new bf924cc  HBASE-23350 Make compaction files cacheonWrite configurable based on threshold
bf924cc is described below

commit bf924ccdaa8b71c319da63ce91e614cd167cd7bd
Author: abhinaba.sarkar <ab...@gmail.com>
AuthorDate: Fri Jan 31 11:35:11 2020 +0530

    HBASE-23350 Make compaction files cacheonWrite configurable based on threshold
    
    Signed-off-by: ramkrish86 <ra...@apache.org>
---
 .../apache/hadoop/hbase/io/hfile/CacheConfig.java  | 38 ++++++++++-
 .../apache/hadoop/hbase/regionserver/HStore.java   | 26 ++++++--
 .../hbase/regionserver/compactions/Compactor.java  | 11 +++-
 .../hadoop/hbase/client/TestFromClientSide.java    |  8 ++-
 .../hadoop/hbase/io/hfile/TestCacheOnWrite.java    | 75 +++++++++++++++++++---
 .../compactions/TestDateTieredCompactor.java       |  2 +
 .../compactions/TestStripeCompactionPolicy.java    |  3 +
 .../compactions/TestStripeCompactor.java           |  2 +
 8 files changed, 143 insertions(+), 22 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 2558e1e..dfce791 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -87,6 +87,13 @@ public class CacheConfig {
   public static final String CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY =
       "hbase.rs.cachecompactedblocksonwrite";
 
+  /**
+   * Configuration key to determine total size in bytes of compacted files beyond which we do not
+   * cache blocks on compaction
+   */
+  public static final String CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY =
+      "hbase.rs.cachecompactedblocksonwrite.threshold";
+
   public static final String DROP_BEHIND_CACHE_COMPACTION_KEY =
       "hbase.hfile.drop.behind.compaction";
 
@@ -101,6 +108,7 @@ public class CacheConfig {
   public static final boolean DEFAULT_PREFETCH_ON_OPEN = false;
   public static final boolean DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE = false;
   public static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;
+  public static final long DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD = Long.MAX_VALUE;
 
   /**
    * Whether blocks should be cached on read (default is on if there is a
@@ -136,6 +144,11 @@ public class CacheConfig {
    */
   private final boolean cacheCompactedDataOnWrite;
 
+  /**
+   * Total size in bytes of compacted files beyond which we do not cache blocks on compaction
+   */
+  private long cacheCompactedDataOnWriteThreshold;
+
   private final boolean dropBehindCompaction;
 
   // Local reference to the block cache
@@ -188,6 +201,7 @@ public class CacheConfig {
         (family == null ? false : family.isPrefetchBlocksOnOpen());
     this.cacheCompactedDataOnWrite = conf.getBoolean(CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
       DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE);
+    this.cacheCompactedDataOnWriteThreshold = getCacheCompactedBlocksOnWriteThreshold(conf);
     this.blockCache = blockCache;
     this.byteBuffAllocator = byteBuffAllocator;
     LOG.info("Created cacheConfig: " + this + (family == null ? "" : " for family " + family) +
@@ -208,6 +222,7 @@ public class CacheConfig {
     this.cacheDataCompressed = cacheConf.cacheDataCompressed;
     this.prefetchOnOpen = cacheConf.prefetchOnOpen;
     this.cacheCompactedDataOnWrite = cacheConf.cacheCompactedDataOnWrite;
+    this.cacheCompactedDataOnWriteThreshold = cacheConf.cacheCompactedDataOnWriteThreshold;
     this.dropBehindCompaction = cacheConf.dropBehindCompaction;
     this.blockCache = cacheConf.blockCache;
     this.byteBuffAllocator = cacheConf.byteBuffAllocator;
@@ -275,7 +290,6 @@ public class CacheConfig {
     this.cacheDataOnWrite = cacheDataOnWrite;
   }
 
-
   /**
    * Enable cache on write including:
    * cacheDataOnWrite
@@ -288,7 +302,6 @@ public class CacheConfig {
     this.cacheBloomsOnWrite = true;
   }
 
-
   /**
    * @return true if index blocks should be written to the cache when an HFile
    *         is written, false if not
@@ -357,6 +370,12 @@ public class CacheConfig {
   }
 
   /**
+   * @return total size in bytes of compacted files beyond which blocks are not cached on write
+   */
+  public long getCacheCompactedBlocksOnWriteThreshold() {
+    return this.cacheCompactedDataOnWriteThreshold;
+  }
+  /**
    * Return true if we may find this type of block in block cache.
    * <p>
    * TODO: today {@code family.isBlockCacheEnabled()} only means {@code cacheDataOnRead}, so here we
@@ -412,6 +431,21 @@ public class CacheConfig {
     return this.byteBuffAllocator;
   }
 
+  private long getCacheCompactedBlocksOnWriteThreshold(Configuration conf) {
+    long cacheCompactedBlocksOnWriteThreshold = conf
+      .getLong(CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
+        DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD);
+
+    if (cacheCompactedBlocksOnWriteThreshold < 0) {
+      LOG.warn(
+        "cacheCompactedBlocksOnWriteThreshold value : {} is less than 0, resetting it to: {}",
+        cacheCompactedBlocksOnWriteThreshold, DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD);
+      cacheCompactedBlocksOnWriteThreshold = DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD;
+    }
+
+    return cacheCompactedBlocksOnWriteThreshold;
+  }
+
   @Override
   public String toString() {
     return "cacheDataOnRead=" + shouldCacheDataOnRead() + ", cacheDataOnWrite="
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index a0ad3e8..167bb3e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1110,6 +1110,13 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     return sf;
   }
 
+  public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
+    boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
+    boolean shouldDropBehind) throws IOException {
+    return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
+      includesTag, shouldDropBehind, -1);
+  }
+
   /**
    * @param compression Compression algorithm to use
    * @param isCompaction whether we are creating a new file in a compaction
@@ -1121,17 +1128,19 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
   // compaction
   public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
       boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
-      boolean shouldDropBehind) throws IOException {
-    final CacheConfig writerCacheConf;
+      boolean shouldDropBehind, long totalCompactedFilesSize) throws IOException {
+    // creating new cache config for each new writer
+    final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
     if (isCompaction) {
       // Don't cache data on write on compactions, unless specifically configured to do so
-      writerCacheConf = new CacheConfig(cacheConf);
+      // Cache only when total file size remains lower than configured threshold
       final boolean cacheCompactedBlocksOnWrite =
         cacheConf.shouldCacheCompactedBlocksOnWrite();
       // if data blocks are to be cached on write
       // during compaction, we should forcefully
       // cache index and bloom blocks as well
-      if (cacheCompactedBlocksOnWrite) {
+      if (cacheCompactedBlocksOnWrite && totalCompactedFilesSize <= cacheConf
+        .getCacheCompactedBlocksOnWriteThreshold()) {
         writerCacheConf.enableCacheOnWrite();
         if (!cacheOnWriteLogged) {
           LOG.info("For Store {} , cacheCompactedBlocksOnWrite is true, hence enabled " +
@@ -1141,9 +1150,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
         }
       } else {
         writerCacheConf.setCacheDataOnWrite(false);
+        if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
+          // checking condition once again for logging
+          LOG.debug(
+            "For Store {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted "
+              + "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
+            getColumnFamilyName(), totalCompactedFilesSize,
+            cacheConf.getCacheCompactedBlocksOnWriteThreshold());
+        }
       }
     } else {
-      writerCacheConf = cacheConf;
       final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
       if (shouldCacheDataOnWrite) {
         writerCacheConf.enableCacheOnWrite();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 63bf130..10fac55 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -130,6 +130,8 @@ public abstract class Compactor<T extends CellSink> {
     public int maxTagsLength = 0;
     /** Min SeqId to keep during a major compaction **/
     public long minSeqIdToKeep = 0;
+    /** Total size of the compacted files **/
+    private long totalCompactedFilesSize = 0;
   }
 
   /**
@@ -166,6 +168,10 @@ public abstract class Compactor<T extends CellSink> {
       fd.maxKeyCount += keyCount;
       // calculate the latest MVCC readpoint in any of the involved store files
       Map<byte[], byte[]> fileInfo = r.loadFileInfo();
+
+      // calculate the total size of the compacted files
+      fd.totalCompactedFilesSize += r.length();
+
       byte[] tmp = null;
       // Get and set the real MVCCReadpoint for bulk loaded files, which is the
       // SeqId number.
@@ -260,8 +266,9 @@ public abstract class Compactor<T extends CellSink> {
       throws IOException {
     // When all MVCC readpoints are 0, don't write them.
     // See HBASE-8166, HBASE-12600, and HBASE-13389.
-    return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
-    fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, shouldDropBehind);
+    return store
+      .createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, fd.maxMVCCReadpoint > 0,
+        fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize);
   }
 
   private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 622bc39..5cb9ffd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -5424,8 +5424,9 @@ public class TestFromClientSide {
         System.out.println("Flushing cache");
         region.flush(true);
 
-        // + 1 for Index Block
-        assertEquals(++expectedBlockCount + 1, cache.getBlockCount());
+        // + 1 for Index Block, +1 for data block
+        expectedBlockCount += 2;
+        assertEquals(expectedBlockCount, cache.getBlockCount());
         assertEquals(expectedBlockHits, cache.getStats().getHitCount());
         assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
         // compact, net minus two blocks, two hits, no misses
@@ -5436,7 +5437,8 @@ public class TestFromClientSide {
         store.closeAndArchiveCompactedFiles();
         waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
         assertEquals(1, store.getStorefilesCount());
-        expectedBlockCount -= 2; // evicted two blocks, cached none
+        // evicted two data blocks and two index blocks and compaction does not cache new blocks
+        expectedBlockCount = 0;
         assertEquals(expectedBlockCount, cache.getBlockCount());
         expectedBlockHits += 2;
         assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
index 609ff9d..119d26c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
@@ -127,6 +127,10 @@ public class TestCacheOnWrite {
     BlockType.DATA
   );
 
+  // All test cases are supposed to generate files for compaction within this range
+  private static final long CACHE_COMPACTION_LOW_THRESHOLD = 10L;
+  private static final long CACHE_COMPACTION_HIGH_THRESHOLD = 1 * 1024 * 1024 * 1024L;
+
   /** The number of valid key types possible in a store file */
   private static final int NUM_VALID_KEY_TYPES =
       KeyValue.Type.values().length - 2;
@@ -424,15 +428,31 @@ public class TestCacheOnWrite {
   }
 
   private void testCachingDataBlocksDuringCompactionInternals(boolean useTags,
-      boolean cacheBlocksOnCompaction) throws IOException, InterruptedException {
+    boolean cacheBlocksOnCompaction, long cacheBlocksOnCompactionThreshold)
+    throws IOException, InterruptedException {
     // create a localConf
-    boolean localValue = conf.getBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
-      false);
+    boolean localValue = conf.getBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, false);
+    long localCacheCompactedBlocksThreshold = conf
+      .getLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
+        CacheConfig.DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD);
+    boolean localCacheBloomBlocksValue = conf
+      .getBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
+        CacheConfig.DEFAULT_CACHE_BLOOMS_ON_WRITE);
+    boolean localCacheIndexBlocksValue = conf
+      .getBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
+        CacheConfig.DEFAULT_CACHE_INDEXES_ON_WRITE);
+
     try {
       // Set the conf if testing caching compacted blocks on write
       conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
         cacheBlocksOnCompaction);
 
+      // set size threshold if testing compaction size threshold
+      if (cacheBlocksOnCompactionThreshold > 0) {
+        conf.setLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
+          cacheBlocksOnCompactionThreshold);
+      }
+
       // TODO: need to change this test if we add a cache size threshold for
       // compactions, or if we implement some other kind of intelligent logic for
       // deciding what blocks to cache-on-write on compaction.
@@ -467,7 +487,9 @@ public class TestCacheOnWrite {
                     HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
                 p.add(kv);
               } else {
-                p.addColumn(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
+                KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
+                  ts++, Bytes.toBytes(valueStr));
+                p.add(kv);
               }
             }
           }
@@ -507,11 +529,33 @@ public class TestCacheOnWrite {
         "\ncacheBlocksOnCompaction: "
         + cacheBlocksOnCompaction + "\n";
 
-      assertEquals(assertErrorMessage, cacheOnCompactAndNonBucketCache, dataBlockCached);
+      if (cacheOnCompactAndNonBucketCache && cacheBlocksOnCompactionThreshold > 0) {
+        if (cacheBlocksOnCompactionThreshold == CACHE_COMPACTION_HIGH_THRESHOLD) {
+          assertTrue(assertErrorMessage, dataBlockCached);
+          assertTrue(assertErrorMessage, bloomBlockCached);
+          assertTrue(assertErrorMessage, indexBlockCached);
+        } else {
+          assertFalse(assertErrorMessage, dataBlockCached);
+
+          if (localCacheBloomBlocksValue) {
+            assertTrue(assertErrorMessage, bloomBlockCached);
+          } else {
+            assertFalse(assertErrorMessage, bloomBlockCached);
+          }
 
-      if (cacheOnCompactAndNonBucketCache) {
-        assertTrue(assertErrorMessage, bloomBlockCached);
-        assertTrue(assertErrorMessage, indexBlockCached);
+          if (localCacheIndexBlocksValue) {
+            assertTrue(assertErrorMessage, indexBlockCached);
+          } else {
+            assertFalse(assertErrorMessage, indexBlockCached);
+          }
+        }
+      } else {
+        assertEquals(assertErrorMessage, cacheOnCompactAndNonBucketCache, dataBlockCached);
+
+        if (cacheOnCompactAndNonBucketCache) {
+          assertTrue(assertErrorMessage, bloomBlockCached);
+          assertTrue(assertErrorMessage, indexBlockCached);
+        }
       }
 
 
@@ -519,6 +563,10 @@ public class TestCacheOnWrite {
     } finally {
       // reset back
       conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, localValue);
+      conf.setLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
+        localCacheCompactedBlocksThreshold);
+      conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, localCacheBloomBlocksValue);
+      conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, localCacheIndexBlocksValue);
     }
   }
 
@@ -530,8 +578,15 @@ public class TestCacheOnWrite {
 
   @Test
   public void testCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
-    testCachingDataBlocksDuringCompactionInternals(false, false);
-    testCachingDataBlocksDuringCompactionInternals(true, true);
+    testCachingDataBlocksDuringCompactionInternals(false, false, -1);
+    testCachingDataBlocksDuringCompactionInternals(true, true, -1);
+  }
+
+  @Test
+  public void testCachingDataBlocksThresholdDuringCompaction()
+    throws IOException, InterruptedException {
+    testCachingDataBlocksDuringCompactionInternals(false, true, CACHE_COMPACTION_HIGH_THRESHOLD);
+    testCachingDataBlocksDuringCompactionInternals(false, true, CACHE_COMPACTION_LOW_THRESHOLD);
   }
 
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
index 698dc81..812ee4b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
@@ -108,6 +108,8 @@ public class TestDateTieredCompactor {
     when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
     when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
       anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
+    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
+      anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);
     when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
     OptionalLong maxSequenceId = StoreUtils.getMaxSequenceIdInList(storefiles);
     when(store.getMaxSequenceId()).thenReturn(maxSequenceId);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index aee3dc6..3a64c84 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -785,6 +785,9 @@ public class TestStripeCompactionPolicy {
     when(
       store.createWriterInTmp(anyLong(), any(), anyBoolean(),
         anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
+    when(
+      store.createWriterInTmp(anyLong(), any(), anyBoolean(),
+        anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);
 
     Configuration conf = HBaseConfiguration.create();
     conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
index 8b5df72..0221274 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
@@ -208,6 +208,8 @@ public class TestStripeCompactor {
     when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
     when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
       anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
+    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
+      anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);
     when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
 
     return new StripeCompactor(conf, store) {