You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by se...@apache.org on 2013/04/25 19:49:11 UTC

svn commit: r1475871 - in /hbase/branches/0.95/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/wal/

Author: sershe
Date: Thu Apr 25 17:49:11 2013
New Revision: 1475871

URL: http://svn.apache.org/r1475871
Log:
HBASE-8024 Make Store flush algorithm pluggable

Added:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java
Modified:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java?rev=1475871&r1=1475870&r2=1475871&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java Thu Apr 25 17:49:11 2013
@@ -37,13 +37,17 @@ import org.apache.hadoop.hbase.regionser
  */
 @InterfaceAudience.Private
 public class DefaultStoreEngine extends StoreEngine<
-    RatioBasedCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> {
+  DefaultStoreFlusher, RatioBasedCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> {
 
+  public static final String DEFAULT_STORE_FLUSHER_CLASS_KEY =
+      "hbase.hstore.defaultengine.storeflusher.class";
   public static final String DEFAULT_COMPACTOR_CLASS_KEY =
       "hbase.hstore.defaultengine.compactor.class";
   public static final String DEFAULT_COMPACTION_POLICY_CLASS_KEY =
       "hbase.hstore.defaultengine.compactionpolicy.class";
 
+  private static final Class<? extends DefaultStoreFlusher>
+    DEFAULT_STORE_FLUSHER_CLASS = DefaultStoreFlusher.class;
   private static final Class<? extends DefaultCompactor>
     DEFAULT_COMPACTOR_CLASS = DefaultCompactor.class;
   private static final Class<? extends RatioBasedCompactionPolicy>
@@ -69,8 +73,17 @@ public class DefaultStoreEngine extends 
     } catch (Exception e) {
       throw new IOException("Unable to load configured compaction policy '" + className + "'", e);
     }
+    className = conf.get(
+        DEFAULT_STORE_FLUSHER_CLASS_KEY, DEFAULT_STORE_FLUSHER_CLASS.getName());
+    try {
+      storeFlusher = ReflectionUtils.instantiateWithCustomCtor(className,
+          new Class[] { Configuration.class, Store.class }, new Object[] { conf, store });
+    } catch (Exception e) {
+      throw new IOException("Unable to load configured store flusher '" + className + "'", e);
+    }
   }
 
+
   @Override
   public CompactionContext createCompaction() {
     return new DefaultCompactionContext();

Added: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java?rev=1475871&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java (added)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java Thu Apr 25 17:49:11 2013
@@ -0,0 +1,93 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Default implementation of StoreFlusher.
+ */
+public class DefaultStoreFlusher extends StoreFlusher {
+  private static final Log LOG = LogFactory.getLog(DefaultStoreFlusher.class);
+  private final Object flushLock = new Object();
+
+  public DefaultStoreFlusher(Configuration conf, Store store) {
+    super(conf, store);
+  }
+
+  @Override
+  public List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushId,
+      TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize,
+      MonitoredTask status) throws IOException {
+    ArrayList<Path> result = new ArrayList<Path>();
+    if (snapshot.size() == 0) return result; // don't flush if there are no entries
+
+    // Use a store scanner to find which rows to flush.
+    long smallestReadPoint = store.getSmallestReadPoint();
+    KeyValueScanner memstoreScanner =
+        new CollectionBackedScanner(snapshot, store.getComparator());
+    InternalScanner scanner = preCreateCoprocScanner(memstoreScanner);
+    if (scanner == null) {
+      scanner = createStoreScanner(smallestReadPoint, memstoreScanner);
+    }
+    scanner = postCreateCoprocScanner(scanner);
+    if (scanner == null) {
+      return result; // NULL scanner returned from coprocessor hooks means skip normal processing
+    }
+
+    StoreFile.Writer writer;
+    long flushed = 0;
+    try {
+      // TODO:  We can fail in the below block before we complete adding this flush to
+      //        list of store files.  Add cleanup of anything put on filesystem if we fail.
+      synchronized (flushLock) {
+        status.setStatus("Flushing " + store + ": creating writer");
+        // Write the map out to the disk
+        writer = store.createWriterInTmp(
+            snapshot.size(), store.getFamily().getCompression(), false, true);
+        writer.setTimeRangeTracker(snapshotTimeRangeTracker);
+        try {
+          flushed = performFlush(scanner, writer, smallestReadPoint);
+        } finally {
+          finalizeWriter(writer, cacheFlushId, status);
+        }
+      }
+    } finally {
+      flushedSize.set(flushed);
+      scanner.close();
+    }
+    LOG.info("Flushed, sequenceid=" + cacheFlushId +", memsize="
+        + StringUtils.humanReadableInt(flushed) +", into tmp file " + writer.getPath());
+    result.add(writer.getPath());
+    return result;
+  }
+}

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1475871&r1=1475870&r2=1475871&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Apr 25 17:49:11 2013
@@ -1457,7 +1457,7 @@ public class HRegion implements HeapSize
     this.updatesLock.writeLock().lock();
     long flushsize = this.memstoreSize.get();
     status.setStatus("Preparing to flush by snapshotting stores");
-    List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
+    List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
     long flushSeqId = -1L;
     try {
       // Record the mvcc for all transactions in progress.
@@ -1477,12 +1477,12 @@ public class HRegion implements HeapSize
       }
 
       for (Store s : stores.values()) {
-        storeFlushers.add(s.getStoreFlusher(flushSeqId));
+        storeFlushCtxs.add(s.createFlushContext(flushSeqId));
       }
 
       // prepare flush (take a snapshot)
-      for (StoreFlusher flusher : storeFlushers) {
-        flusher.prepare();
+      for (StoreFlushContext flush : storeFlushCtxs) {
+        flush.prepare();
       }
     } finally {
       this.updatesLock.writeLock().unlock();
@@ -1519,19 +1519,19 @@ public class HRegion implements HeapSize
       // just-made new flush store file. The new flushed file is still in the
       // tmp directory.
 
-      for (StoreFlusher flusher : storeFlushers) {
-        flusher.flushCache(status);
+      for (StoreFlushContext flush : storeFlushCtxs) {
+        flush.flushCache(status);
       }
 
       // Switch snapshot (in memstore) -> new hfile (thus causing
       // all the store scanners to reset/reseek).
-      for (StoreFlusher flusher : storeFlushers) {
-        boolean needsCompaction = flusher.commit(status);
+      for (StoreFlushContext flush : storeFlushCtxs) {
+        boolean needsCompaction = flush.commit(status);
         if (needsCompaction) {
           compactionRequested = true;
         }
       }
-      storeFlushers.clear();
+      storeFlushCtxs.clear();
 
       // Set down the memstore size by amount of flush.
       this.addAndGetGlobalMemstoreSize(-flushsize);

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1475871&r1=1475870&r2=1475871&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Thu Apr 25 17:49:11 2013
@@ -123,7 +123,6 @@ public class HStore implements Store {
   static int closeCheckInterval = 0;
   private volatile long storeSize = 0L;
   private volatile long totalUncompressedBytes = 0L;
-  private final Object flushLock = new Object();
   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   private final boolean verifyBulkLoads;
 
@@ -145,14 +144,14 @@ public class HStore implements Store {
   // Comparing KeyValues
   private final KeyValue.KVComparator comparator;
 
-  final StoreEngine<?, ?, ?> storeEngine;
+  final StoreEngine<?, ?, ?, ?> storeEngine;
 
   private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
   private final OffPeakHours offPeakHours;
 
   private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
-  private static int flush_retries_number;
-  private static int pauseTime;
+  private int flushRetriesNumber;
+  private int pauseTime;
 
   private long blockingFileCount;
 
@@ -222,17 +221,13 @@ public class HStore implements Store {
     this.checksumType = getChecksumType(conf);
     // initilize bytes per checksum
     this.bytesPerChecksum = getBytesPerChecksum(conf);
-    // Create a compaction manager.
-    if (HStore.flush_retries_number == 0) {
-      HStore.flush_retries_number = conf.getInt(
-          "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
-      HStore.pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE,
-          HConstants.DEFAULT_HBASE_SERVER_PAUSE);
-      if (HStore.flush_retries_number <= 0) {
-        throw new IllegalArgumentException(
-            "hbase.hstore.flush.retries.number must be > 0, not "
-                + HStore.flush_retries_number);
-      }
+    flushRetriesNumber = conf.getInt(
+        "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
+    pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
+    if (flushRetriesNumber <= 0) {
+      throw new IllegalArgumentException(
+          "hbase.hstore.flush.retries.number must be > 0, not "
+              + flushRetriesNumber);
     }
   }
 
@@ -644,10 +639,10 @@ public class HStore implements Store {
    * @param snapshotTimeRangeTracker
    * @param flushedSize The number of bytes flushed
    * @param status
-   * @return Path The path name of the tmp file to which the store was flushed
+   * @return The path names of the tmp files to which the store was flushed
    * @throws IOException
    */
-  protected Path flushCache(final long logCacheFlushId,
+  protected List<Path> flushCache(final long logCacheFlushId,
       SortedSet<KeyValue> snapshot,
       TimeRangeTracker snapshotTimeRangeTracker,
       AtomicLong flushedSize,
@@ -657,20 +652,21 @@ public class HStore implements Store {
     // 'snapshot', the next time flush comes around.
     // Retry after catching exception when flushing, otherwise server will abort
     // itself
+    StoreFlusher flusher = storeEngine.getStoreFlusher();
     IOException lastException = null;
-    for (int i = 0; i < HStore.flush_retries_number; i++) {
+    for (int i = 0; i < flushRetriesNumber; i++) {
       try {
-        Path pathName = internalFlushCache(snapshot, logCacheFlushId,
-            snapshotTimeRangeTracker, flushedSize, status);
+        List<Path> pathNames = flusher.flushSnapshot(
+            snapshot, logCacheFlushId, snapshotTimeRangeTracker, flushedSize, status);
+        Path lastPathName = null;
         try {
-          // Path name is null if there is no entry to flush
-          if (pathName != null) {
+          for (Path pathName : pathNames) {
+            lastPathName = pathName;
             validateStoreFile(pathName);
           }
-          return pathName;
+          return pathNames;
         } catch (Exception e) {
-          LOG.warn("Failed validating store file " + pathName
-              + ", retring num=" + i, e);
+          LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
           if (e instanceof IOException) {
             lastException = (IOException) e;
           } else {
@@ -695,109 +691,6 @@ public class HStore implements Store {
   }
 
   /*
-   * @param cache
-   * @param logCacheFlushId
-   * @param snapshotTimeRangeTracker
-   * @param flushedSize The number of bytes flushed
-   * @return Path The path name of the tmp file to which the store was flushed
-   * @throws IOException
-   */
-  private Path internalFlushCache(final SortedSet<KeyValue> set,
-      final long logCacheFlushId,
-      TimeRangeTracker snapshotTimeRangeTracker,
-      AtomicLong flushedSize,
-      MonitoredTask status)
-      throws IOException {
-    StoreFile.Writer writer;
-    // Find the smallest read point across all the Scanners.
-    long smallestReadPoint = region.getSmallestReadPoint();
-    long flushed = 0;
-    Path pathName;
-    // Don't flush if there are no entries.
-    if (set.size() == 0) {
-      return null;
-    }
-    // Use a store scanner to find which rows to flush.
-    // Note that we need to retain deletes, hence
-    // treat this as a minor compaction.
-    InternalScanner scanner = null;
-    KeyValueScanner memstoreScanner = new CollectionBackedScanner(set, this.comparator);
-    if (this.getCoprocessorHost() != null) {
-      scanner = this.getCoprocessorHost().preFlushScannerOpen(this, memstoreScanner);
-    }
-    if (scanner == null) {
-      Scan scan = new Scan();
-      scan.setMaxVersions(scanInfo.getMaxVersions());
-      scanner = new StoreScanner(this, scanInfo, scan,
-          Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
-          smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
-    }
-    if (this.getCoprocessorHost() != null) {
-      InternalScanner cpScanner =
-        this.getCoprocessorHost().preFlush(this, scanner);
-      // NULL scanner returned from coprocessor hooks means skip normal processing
-      if (cpScanner == null) {
-        return null;
-      }
-      scanner = cpScanner;
-    }
-    try {
-      int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
-      // TODO:  We can fail in the below block before we complete adding this
-      // flush to list of store files.  Add cleanup of anything put on filesystem
-      // if we fail.
-      synchronized (flushLock) {
-        status.setStatus("Flushing " + this + ": creating writer");
-        // A. Write the map out to the disk
-        writer = createWriterInTmp(set.size());
-        writer.setTimeRangeTracker(snapshotTimeRangeTracker);
-        pathName = writer.getPath();
-        try {
-          List<KeyValue> kvs = new ArrayList<KeyValue>();
-          boolean hasMore;
-          do {
-            hasMore = scanner.next(kvs, compactionKVMax);
-            if (!kvs.isEmpty()) {
-              for (KeyValue kv : kvs) {
-                // If we know that this KV is going to be included always, then let us
-                // set its memstoreTS to 0. This will help us save space when writing to
-                // disk.
-                if (kv.getMemstoreTS() <= smallestReadPoint) {
-                  // let us not change the original KV. It could be in the memstore
-                  // changing its memstoreTS could affect other threads/scanners.
-                  kv = kv.shallowCopy();
-                  kv.setMemstoreTS(0);
-                }
-                writer.append(kv);
-                flushed += this.memstore.heapSizeChange(kv, true);
-              }
-              kvs.clear();
-            }
-          } while (hasMore);
-        } finally {
-          // Write out the log sequence number that corresponds to this output
-          // hfile. Also write current time in metadata as minFlushTime.
-          // The hfile is current up to and including logCacheFlushId.
-          status.setStatus("Flushing " + this + ": appending metadata");
-          writer.appendMetadata(logCacheFlushId, false);
-          status.setStatus("Flushing " + this + ": closing flushed file");
-          writer.close();
-        }
-      }
-    } finally {
-      flushedSize.set(flushed);
-      scanner.close();
-    }
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Flushed " +
-               ", sequenceid=" + logCacheFlushId +
-               ", memsize=" + StringUtils.humanReadableInt(flushed) +
-               ", into tmp file " + pathName);
-    }
-    return pathName;
-  }
-
-  /*
    * @param path The pathname of the tmp file into which the store was flushed
    * @param logCacheFlushId
    * @return StoreFile created.
@@ -871,17 +764,18 @@ public class HStore implements Store {
 
   /*
    * Change storeFiles adding into place the Reader produced by this new flush.
-   * @param sf
-   * @param set That was used to make the passed file <code>p</code>.
+   * @param sfs Store files
+   * @param set That was used to make the passed files.
    * @throws IOException
    * @return Whether compaction is required.
    */
-  private boolean updateStorefiles(final StoreFile sf,
-                                   final SortedSet<KeyValue> set)
-  throws IOException {
+  private boolean updateStorefiles(
+      final List<StoreFile> sfs, final SortedSet<KeyValue> set) throws IOException {
     this.lock.writeLock().lock();
     try {
-      this.storeEngine.getStoreFileManager().insertNewFile(sf);
+      for (StoreFile sf : sfs) {
+        this.storeEngine.getStoreFileManager().insertNewFile(sf);
+      }
       this.memstore.clearSnapshot(set);
     } finally {
       // We need the lock, as long as we are updating the storeFiles
@@ -1746,22 +1640,20 @@ public class HStore implements Store {
     }
   }
 
-  public StoreFlusher getStoreFlusher(long cacheFlushId) {
+  public StoreFlushContext createFlushContext(long cacheFlushId) {
     return new StoreFlusherImpl(cacheFlushId);
   }
 
-  private class StoreFlusherImpl implements StoreFlusher {
+  private class StoreFlusherImpl implements StoreFlushContext {
 
-    private long cacheFlushId;
+    private long cacheFlushSeqNum;
     private SortedSet<KeyValue> snapshot;
-    private StoreFile storeFile;
-    private Path storeFilePath;
+    private List<Path> tempFiles;
     private TimeRangeTracker snapshotTimeRangeTracker;
-    private AtomicLong flushedSize;
+    private final AtomicLong flushedSize = new AtomicLong();
 
-    private StoreFlusherImpl(long cacheFlushId) {
-      this.cacheFlushId = cacheFlushId;
-      this.flushedSize = new AtomicLong();
+    private StoreFlusherImpl(long cacheFlushSeqNum) {
+      this.cacheFlushSeqNum = cacheFlushSeqNum;
     }
 
     @Override
@@ -1773,24 +1665,43 @@ public class HStore implements Store {
 
     @Override
     public void flushCache(MonitoredTask status) throws IOException {
-      storeFilePath = HStore.this.flushCache(
-        cacheFlushId, snapshot, snapshotTimeRangeTracker, flushedSize, status);
+      tempFiles = HStore.this.flushCache(
+        cacheFlushSeqNum, snapshot, snapshotTimeRangeTracker, flushedSize, status);
     }
 
     @Override
     public boolean commit(MonitoredTask status) throws IOException {
-      if (storeFilePath == null) {
+      if (this.tempFiles == null || this.tempFiles.isEmpty()) {
         return false;
       }
-      storeFile = HStore.this.commitFile(storeFilePath, cacheFlushId,
-                               snapshotTimeRangeTracker, flushedSize, status);
-      if (HStore.this.getCoprocessorHost() != null) {
-        HStore.this.getCoprocessorHost().postFlush(HStore.this, storeFile);
+      List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
+      for (Path storeFilePath : tempFiles) {
+        try {
+          storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum,
+              snapshotTimeRangeTracker, flushedSize, status));
+        } catch (IOException ex) {
+          LOG.error("Failed to commit store file " + storeFilePath, ex);
+          // Try to delete the files we have committed before.
+          for (StoreFile sf : storeFiles) {
+            Path pathToDelete = sf.getPath();
+            try {
+              sf.deleteReader();
+            } catch (IOException deleteEx) {
+              LOG.fatal("Failed to delete store file we committed, halting " + pathToDelete, ex);
+              Runtime.getRuntime().halt(1);
+            }
+          }
+          throw new IOException("Failed to commit the flush", ex);
+        }
       }
 
-      // Add new file to store files.  Clear snapshot too while we have
-      // the Store write lock.
-      return HStore.this.updateStorefiles(storeFile, snapshot);
+      if (HStore.this.getCoprocessorHost() != null) {
+        for (StoreFile sf : storeFiles) {
+          HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
+        }
+      }
+      // Add new file to store files.  Clear snapshot too while we have the Store write lock.
+      return HStore.this.updateStorefiles(storeFiles, snapshot);
     }
   }
 
@@ -1806,8 +1717,8 @@ public class HStore implements Store {
   }
 
   public static final long FIXED_OVERHEAD =
-      ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
-              + (2 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
+      ClassSize.align(ClassSize.OBJECT + (15 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
+              + (4 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
       + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1475871&r1=1475870&r2=1475871&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Thu Apr 25 17:49:11 2013
@@ -958,7 +958,7 @@ public class MemStore implements HeapSiz
    * @param notpresent True if the kv was NOT present in the set.
    * @return Size
    */
-  long heapSizeChange(final KeyValue kv, final boolean notpresent) {
+  static long heapSizeChange(final KeyValue kv, final boolean notpresent) {
     return notpresent ?
         ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()):
         0;

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1475871&r1=1475870&r2=1475871&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Apr 25 17:49:11 2013
@@ -183,7 +183,7 @@ public interface Store extends HeapSize,
 
   public int getCompactPriority();
 
-  public StoreFlusher getStoreFlusher(long cacheFlushId);
+  public StoreFlushContext createFlushContext(long cacheFlushId);
 
   // Split oriented methods
 
@@ -320,4 +320,4 @@ public interface Store extends HeapSize,
    * @return Whether this store has too many store files.
    */
   public boolean hasTooManyStoreFiles();
-}
\ No newline at end of file
+}

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java?rev=1475871&r1=1475870&r2=1475871&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java Thu Apr 25 17:49:11 2013
@@ -35,8 +35,9 @@ import org.apache.hadoop.hbase.util.Refl
  * they are tied together and replaced together via StoreEngine-s.
  */
 @InterfaceAudience.Private
-public abstract class StoreEngine<
-  CP extends CompactionPolicy, C extends Compactor, SFM extends StoreFileManager> {
+public abstract class StoreEngine<SF extends StoreFlusher,
+    CP extends CompactionPolicy, C extends Compactor, SFM extends StoreFileManager> {
+  protected SF storeFlusher;
   protected CP compactionPolicy;
   protected C compactor;
   protected SFM storeFileManager;
@@ -47,7 +48,7 @@ public abstract class StoreEngine<
    */
   public static final String STORE_ENGINE_CLASS_KEY = "hbase.hstore.engine.class";
 
-  private static final Class<? extends StoreEngine<?, ?, ?>>
+  private static final Class<? extends StoreEngine<?, ?, ?, ?>>
     DEFAULT_STORE_ENGINE_CLASS = DefaultStoreEngine.class;
 
   /**
@@ -72,6 +73,13 @@ public abstract class StoreEngine<
   }
 
   /**
+   * @return Store flusher to use.
+   */
+  public StoreFlusher getStoreFlusher() {
+    return this.storeFlusher;
+  }
+
+  /**
    * Creates an instance of a compaction context specific to this engine.
    * Doesn't actually select or start a compaction. See CompactionContext class comment.
    * @return New CompactionContext object.
@@ -86,9 +94,11 @@ public abstract class StoreEngine<
 
   private void createComponentsOnce(
       Configuration conf, Store store, KVComparator kvComparator) throws IOException {
-    assert compactor == null && compactionPolicy == null && storeFileManager == null;
+    assert compactor == null && compactionPolicy == null
+        && storeFileManager == null && storeFlusher == null;
     createComponents(conf, store, kvComparator);
-    assert compactor != null && compactionPolicy != null && storeFileManager != null;
+    assert compactor != null && compactionPolicy != null
+        && storeFileManager != null && storeFlusher != null;
   }
 
   /**
@@ -99,11 +109,11 @@ public abstract class StoreEngine<
    * @param kvComparator KVComparator for storeFileManager.
    * @return StoreEngine to use.
    */
-  public static StoreEngine<?, ?, ?> create(
+  public static StoreEngine<?, ?, ?, ?> create(
       Store store, Configuration conf, KVComparator kvComparator) throws IOException {
     String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName());
     try {
-      StoreEngine<?,?,?> se = ReflectionUtils.instantiateWithCustomCtor(
+      StoreEngine<?,?,?,?> se = ReflectionUtils.instantiateWithCustomCtor(
           className, new Class[] { }, new Object[] { });
       se.createComponentsOnce(conf, store, kvComparator);
       return se;

Added: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java?rev=1475871&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java (added)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java Thu Apr 25 17:49:11 2013
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+
+/**
+ * A package-private interface for store flushing.
+ * A store flush context carries the state required to prepare/flush/commit the store's cache.
+ */
+@InterfaceAudience.Private
+interface StoreFlushContext {
+
+  /**
+   * Prepare for a store flush (create snapshot)
+   *
+   * Requires pausing writes.
+   *
+   * A very short operation.
+   */
+  void prepare();
+
+  /**
+   * Flush the cache (create the new store file)
+   *
+   * A lengthy operation which doesn't require locking out any function
+   * of the store.
+   *
+   * @throws IOException in case the flush fails
+   */
+  void flushCache(MonitoredTask status) throws IOException;
+
+  /**
+   * Commit the flush - add the store file to the store and clear the
+   * memstore snapshot.
+   *
+   * Requires pausing scans.
+   *
+   * A very short operation
+   *
+   * @return
+   * @throws IOException
+   */
+  boolean commit(MonitoredTask status) throws IOException;
+}

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java?rev=1475871&r1=1475870&r2=1475871&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java Thu Apr 25 17:49:11 2013
@@ -19,47 +19,125 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
 
 /**
- * A package protected interface for a store flushing.
- * A store flusher carries the state required to prepare/flush/commit the
- * store's cache.
+ * Store flusher base class. Turns a snapshot of memstore into a set of store files (usually one).
+ * Custom implementation can be provided.
  */
 @InterfaceAudience.Private
-interface StoreFlusher {
+abstract class StoreFlusher {
+  protected Configuration conf;
+  protected Store store;
+
+  public StoreFlusher(Configuration conf, Store store) {
+    this.conf = conf;
+    this.store = store;
+  }
 
   /**
-   * Prepare for a store flush (create snapshot)
-   *
-   * Requires pausing writes.
-   *
-   * A very short operation.
+   * Turns a snapshot of memstore into a set of store files.
+   * @param snapshot Memstore snapshot.
+   * @param cacheFlushSeqNum Log cache flush sequence number.
+   * @param snapshotTimeRangeTracker Time range tracker from the memstore
+   *                                 pertaining to the snapshot.
+   * @param flushedSize Out parameter for the size of the KVs flushed.
+   * @param status Task that represents the flush operation and may be updated with status.
+   * @return List of files written. Can be empty; must not be null.
    */
-  void prepare();
+  public abstract List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushSeqNum,
+      TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize, MonitoredTask status)
+      throws IOException;
+
+  protected void finalizeWriter(StoreFile.Writer writer, long cacheFlushSeqNum,
+      MonitoredTask status) throws IOException {
+    // Write out the log sequence number that corresponds to this output
+    // hfile. Also write current time in metadata as minFlushTime.
+    // The hfile is current up to and including cacheFlushSeqNum.
+    status.setStatus("Flushing " + store + ": appending metadata");
+    writer.appendMetadata(cacheFlushSeqNum, false);
+    status.setStatus("Flushing " + store + ": closing flushed file");
+    writer.close();
+  }
+
+  /** Calls coprocessor to create a flush scanner based on memstore scanner */
+  protected InternalScanner preCreateCoprocScanner(
+      KeyValueScanner memstoreScanner) throws IOException {
+    if (store.getCoprocessorHost() != null) {
+      return store.getCoprocessorHost().preFlushScannerOpen(store, memstoreScanner);
+    }
+    return null;
+  }
+
+  /** Creates the default flush scanner based on memstore scanner */
+  protected InternalScanner createStoreScanner(long smallestReadPoint,
+      KeyValueScanner memstoreScanner) throws IOException {
+    Scan scan = new Scan();
+    scan.setMaxVersions(store.getScanInfo().getMaxVersions());
+    return new StoreScanner(store, store.getScanInfo(), scan,
+        Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
+        smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
+  }
 
   /**
-   * Flush the cache (create the new store file)
-   *
-   * A length operation which doesn't require locking out any function
-   * of the store.
-   *
-   * @throws IOException in case the flush fails
+   * Calls coprocessor to create a scanner based on default flush scanner
+   * @return new or default scanner; if null, flush should not proceed.
    */
-  void flushCache(MonitoredTask status) throws IOException;
+  protected  InternalScanner postCreateCoprocScanner(InternalScanner scanner)
+      throws IOException {
+    if (store.getCoprocessorHost() != null) {
+      return store.getCoprocessorHost().preFlush(store, scanner);
+    }
+    return scanner;
+  }
 
   /**
-   * Commit the flush - add the store file to the store and clear the
-   * memstore snapshot.
-   *
-   * Requires pausing scans.
-   *
-   * A very short operation
-   *
-   * @return
-   * @throws IOException
-   */
-  boolean commit(MonitoredTask status) throws IOException;
+   * Performs memstore flush, writing data from scanner into sink.
+   * @param scanner Scanner to get data from.
+   * @param sink Sink to write data to. Could be StoreFile.Writer.
+   * @param smallestReadPoint Smallest read point used for the flush.
+   * @return Bytes flushed.
+   */
+  protected long performFlush(InternalScanner scanner,
+      Compactor.CellSink sink, long smallestReadPoint) throws IOException {
+    int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+    boolean hasMore;
+    long flushed = 0;
+    do {
+      hasMore = scanner.next(kvs, compactionKVMax);
+      if (!kvs.isEmpty()) {
+        for (KeyValue kv : kvs) {
+          // If we know that this KV is going to be included always, then let us
+          // set its memstoreTS to 0. This will help us save space when writing to
+          // disk.
+          if (kv.getMemstoreTS() <= smallestReadPoint) {
+            // let us not change the original KV. It could be in the memstore
+            // changing its memstoreTS could affect other threads/scanners.
+            kv = kv.shallowCopy();
+            kv.setMemstoreTS(0);
+          }
+          sink.append(kv);
+          flushed += MemStore.heapSizeChange(kv, true);
+        }
+        kvs.clear();
+      }
+    } while (hasMore);
+    return flushed;
+  }
 }

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java?rev=1475871&r1=1475870&r2=1475871&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java Thu Apr 25 17:49:11 2013
@@ -32,6 +32,12 @@ import org.mockito.Mockito;
 
 @Category(SmallTests.class)
 public class TestDefaultStoreEngine {
+  public static class DummyStoreFlusher extends DefaultStoreFlusher {
+    public DummyStoreFlusher(Configuration conf, Store store) {
+      super(conf, store);
+    }
+  }
+
   public static class DummyCompactor extends DefaultCompactor {
     public DummyCompactor(Configuration conf, Store store) {
       super(conf, store);
@@ -45,15 +51,18 @@ public class TestDefaultStoreEngine {
   }
 
   @Test
-  public void testCustomPolicyAndCompactor() throws Exception {
+  public void testCustomParts() throws Exception {
     Configuration conf = HBaseConfiguration.create();
     conf.set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY, DummyCompactor.class.getName());
     conf.set(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
         DummyCompactionPolicy.class.getName());
+    conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
+        DummyStoreFlusher.class.getName());
     Store mockStore = Mockito.mock(Store.class);
-    StoreEngine<?, ?, ?> se = StoreEngine.create(mockStore, conf, new KVComparator());
+    StoreEngine<?, ?, ?, ?> se = StoreEngine.create(mockStore, conf, new KVComparator());
     Assert.assertTrue(se instanceof DefaultStoreEngine);
     Assert.assertTrue(se.getCompactionPolicy() instanceof DummyCompactionPolicy);
+    Assert.assertTrue(se.getStoreFlusher() instanceof DummyStoreFlusher);
     Assert.assertTrue(se.getCompactor() instanceof DummyCompactor);
   }
 }

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=1475871&r1=1475870&r2=1475871&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Thu Apr 25 17:49:11 2013
@@ -732,10 +732,10 @@ public class TestStore extends TestCase 
 
 
   private static void flushStore(HStore store, long id) throws IOException {
-    StoreFlusher storeFlusher = store.getStoreFlusher(id);
-    storeFlusher.prepare();
-    storeFlusher.flushCache(Mockito.mock(MonitoredTask.class));
-    storeFlusher.commit(Mockito.mock(MonitoredTask.class));
+    StoreFlushContext storeFlushCtx = store.createFlushContext(id);
+    storeFlushCtx.prepare();
+    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
+    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
   }
 
 

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1475871&r1=1475870&r2=1475871&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Thu Apr 25 17:49:11 2013
@@ -61,10 +61,11 @@ import org.apache.hadoop.hbase.exception
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.Store;
@@ -548,6 +549,29 @@ public class TestWALReplay {
     assertEquals(result.size(), result1b.size());
   }
 
+
+  // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush.
+  // Only throws exception if throwExceptionWhenFlushing is set true.
+  public static class CustomStoreFlusher extends DefaultStoreFlusher {
+    // Switch between throw and not throw exception in flush
+    static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
+
+    public CustomStoreFlusher(Configuration conf, Store store) {
+      super(conf, store);
+    }
+    @Override
+    public List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushId,
+        TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize, MonitoredTask status)
+            throws IOException {
+      if (throwExceptionWhenFlushing.get()) {
+        throw new IOException("Simulated exception by tests");
+      }
+      return super.flushSnapshot(snapshot, cacheFlushId, snapshotTimeRangeTracker,
+          flushedSize, status);
+    }
+
+  };
+
   /**
    * Test that we could recover the data correctly after aborting flush. In the
    * test, first we abort flush after writing some data, then writing more data
@@ -568,28 +592,12 @@ public class TestWALReplay {
     // of the families during the load of edits so its seqid is not same as
     // others to test we do right thing when different seqids.
     HLog wal = createWAL(this.conf);
-    final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
     RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
     Mockito.doReturn(false).when(rsServices).isAborted();
-    HRegion region = new HRegion(basedir, wal, this.fs, this.conf, hri, htd,
-        rsServices) {
-      @Override
-      protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
-        return new HStore(this, family, conf) {
-          @Override
-          protected Path flushCache(final long logCacheFlushId,
-              SortedSet<KeyValue> snapshot,
-              TimeRangeTracker snapshotTimeRangeTracker,
-              AtomicLong flushedSize, MonitoredTask status) throws IOException {
-            if (throwExceptionWhenFlushing.get()) {
-              throw new IOException("Simulated exception by tests");
-            }
-            return super.flushCache(logCacheFlushId, snapshot,
-                snapshotTimeRangeTracker, flushedSize, status);
-          }
-        };
-      }
-    };
+    Configuration customConf = new Configuration(this.conf);
+    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
+        CustomStoreFlusher.class.getName());
+    HRegion region = new HRegion(basedir, wal, this.fs, customConf, hri, htd, rsServices);
     long seqid = region.initialize();
     // HRegionServer usually does this. It knows the largest seqid across all
     // regions.
@@ -610,7 +618,7 @@ public class TestWALReplay {
     assertEquals(writtenRowCount, getScannedCount(scanner));
 
     // Let us flush the region
-    throwExceptionWhenFlushing.set(true);
+    CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
     try {
       region.flushcache();
       fail("Injected exception hasn't been thrown");
@@ -630,7 +638,7 @@ public class TestWALReplay {
     }
     writtenRowCount += moreRow;
     // call flush again
-    throwExceptionWhenFlushing.set(false);
+    CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
     try {
       region.flushcache();
     } catch (IOException t) {