Posted to commits@hbase.apache.org by st...@apache.org on 2015/06/08 22:46:40 UTC

hbase git commit: HBASE-13811 Splitting WALs, we are filtering out too many edits -> DATALOSS

Repository: hbase
Updated Branches:
  refs/heads/branch-1.1 978c503d9 -> 5dd4c9832


HBASE-13811 Splitting WALs, we are filtering out too many edits -> DATALOSS


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5dd4c983
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5dd4c983
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5dd4c983

Branch: refs/heads/branch-1.1
Commit: 5dd4c9832d0cc70a2121a6250060a2b5cf7d4312
Parents: 978c503
Author: stack <st...@apache.org>
Authored: Mon Jun 8 13:46:16 2015 -0700
Committer: stack <st...@apache.org>
Committed: Mon Jun 8 13:46:16 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      |  28 ++--
 .../hbase/regionserver/HRegionServer.java       |  19 +--
 .../hadoop/hbase/regionserver/wal/FSHLog.java   |  42 ++++--
 .../hadoop/hbase/wal/DisabledWALProvider.java   |  11 +-
 .../java/org/apache/hadoop/hbase/wal/WAL.java   |  46 +++---
 .../regionserver/TestSplitWalDataLoss.java      | 149 +++++++++++++++++++
 6 files changed, 233 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5dd4c983/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 0d5306e..0e83dd3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -217,13 +217,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   final AtomicBoolean closing = new AtomicBoolean(false);
 
   /**
-   * The max sequence id of flushed data on this region.  Used doing some rough calculations on
-   * whether time to flush or not.
+   * The max sequence id of flushed data on this region. There is no edit in memory that is
+   * less than this sequence id.
    */
   private volatile long maxFlushedSeqId = HConstants.NO_SEQNUM;
 
   /**
-   * Record the sequence id of last flush operation.
+   * Record the sequence id of last flush operation. Can be in advance of
+   * {@link #maxFlushedSeqId} when flushing a single column family. In this case,
+   * {@link #maxFlushedSeqId} will be older than the oldest edit in memory.
    */
   private volatile long lastFlushOpSeqId = HConstants.NO_SEQNUM;
   /**
@@ -1619,7 +1621,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           .setSequenceId(
             oldestUnflushedSeqId < 0 ? lastFlushOpSeqIdLocal : oldestUnflushedSeqId - 1).build());
     }
-    return regionLoadBldr.setCompleteSequenceId(this.maxFlushedSeqId);
+    return regionLoadBldr.setCompleteSequenceId(getMaxFlushedSeqId());
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -2132,21 +2134,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       try {
         w = mvcc.beginMemstoreInsert();
         if (wal != null) {
-          if (!wal.startCacheFlush(encodedRegionName, flushedFamilyNames)) {
-            // This should never happen.
-            String msg = "Flush will not be started for ["
-                + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
+          Long earliestUnflushedSequenceIdForTheRegion =
+              wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
+          if (earliestUnflushedSequenceIdForTheRegion == null) {
+            // This should never happen. This is how startCacheFlush signals flush cannot proceed.
+            String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing.";
             status.setStatus(msg);
             return new PrepareFlushResult(
               new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false),
               myseqid);
           }
           flushOpSeqId = getNextSequenceId(wal);
-          long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName);
-          // no oldestUnflushedSeqId means we flushed all stores.
-          // or the unflushed stores are all empty.
-          flushedSeqId = (oldestUnflushedSeqId == HConstants.NO_SEQNUM) ? flushOpSeqId
-              : oldestUnflushedSeqId - 1;
+          // Back up one from the oldest sequence id in the memstore to get the last 'flushed' edit
+          flushedSeqId =
+            earliestUnflushedSequenceIdForTheRegion.longValue() == HConstants.NO_SEQNUM ?
+              flushOpSeqId : earliestUnflushedSequenceIdForTheRegion.longValue() - 1;
         } else {
           // use the provided sequence Id as WAL is not being used for this flush.
           flushedSeqId = flushOpSeqId = myseqid;
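
The HRegion hunk above is the consumer side of the new startCacheFlush contract: instead of a
boolean, the WAL hands back the oldest sequence id still unflushed for the region (or null when
the WAL is closing), and the region derives its flushedSeqId from that value. A minimal sketch of
the arithmetic, with NO_SEQNUM and the names below as placeholders rather than the real HBase API:

    // Sketch only: mirrors the flushedSeqId computation in the hunk above.
    class FlushedSeqIdMath {
      static final long NO_SEQNUM = -1L; // placeholder for HConstants.NO_SEQNUM

      static long computeFlushedSeqId(Long earliestUnflushedSeqId, long flushOpSeqId) {
        if (earliestUnflushedSeqId == null) {
          // WAL is closing; the real code returns a CANNOT_FLUSH result instead.
          throw new IllegalStateException("flush cannot proceed; WAL is closing");
        }
        // NO_SEQNUM: all in-memory edits belong to the stores being flushed, so the
        // flush operation's own sequence id marks the boundary. Otherwise back up
        // one from the oldest edit that will remain in the memstore.
        return earliestUnflushedSeqId.longValue() == NO_SEQNUM
            ? flushOpSeqId
            : earliestUnflushedSeqId.longValue() - 1;
      }
    }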

http://git-wip-us.apache.org/repos/asf/hbase/blob/5dd4c983/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 8f4059e..3664ffb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2835,22 +2835,15 @@ public class HRegionServer extends HasThread implements
     Region toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
 
     if (destination != null) {
-      try {
-        WAL wal = getWAL(r.getRegionInfo());
-        long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes());
+      long closeSeqNum = r.getMaxFlushedSeqId();
+      if (closeSeqNum == HConstants.NO_SEQNUM) {
+        // No flush has been recorded for this region; use the sequence number from region open.
+        closeSeqNum = r.getOpenSeqNum();
         if (closeSeqNum == HConstants.NO_SEQNUM) {
-          // No edits in WAL for this region; get the sequence number when the region was opened.
-          closeSeqNum = r.getOpenSeqNum();
-          if (closeSeqNum == HConstants.NO_SEQNUM) {
-            closeSeqNum = 0;
-          }
+          closeSeqNum = 0;
         }
-        addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
-      } catch (IOException exception) {
-        LOG.error("Could not retrieve WAL information for region " + r.getRegionInfo() +
-            "; not adding to moved regions.");
-        LOG.debug("Exception details for failure to get wal", exception);
       }
+      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
     }
     this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
     return toReturn != null;
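
In HRegionServer, the close sequence number for a moved region no longer comes from a WAL lookup
(the old path needed a try/catch around the WAL call) but from the region's own flush accounting,
falling back to the open sequence number and finally to 0. A condensed sketch of that fallback
chain; MovedRegion is a stand-in interface for illustration, not the HBase Region type:

    class CloseSeqNum {
      static final long NO_SEQNUM = -1L; // placeholder for HConstants.NO_SEQNUM

      interface MovedRegion {
        long getMaxFlushedSeqId();
        long getOpenSeqNum();
      }

      static long closeSeqNumFor(MovedRegion r) {
        long seq = r.getMaxFlushedSeqId();
        if (seq == NO_SEQNUM) {
          // Region has never flushed; fall back to the sequence id recorded at open.
          seq = r.getOpenSeqNum();
        }
        return seq == NO_SEQNUM ? 0 : seq;
      }
    }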

http://git-wip-us.apache.org/repos/asf/hbase/blob/5dd4c983/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 7c170b0..76e93a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -28,6 +28,7 @@ import java.lang.reflect.Method;
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -850,7 +851,12 @@ public class FSHLog implements WAL {
     // Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock.
     synchronized (regionSequenceIdLock) {
       for (Map.Entry<byte[], Long> e: regionsSequenceNums.entrySet()) {
-        long unFlushedVal = getEarliestMemstoreSeqNum(e.getKey());
+        ConcurrentMap<byte[], Long> m =
+            this.oldestUnflushedStoreSequenceIds.get(e.getKey());
+        if (m == null) {
+          continue;
+        }
+        long unFlushedVal = Collections.min(m.values());
         if (unFlushedVal != HConstants.NO_SEQNUM && unFlushedVal <= e.getValue()) {
           if (regionsToFlush == null)
             regionsToFlush = new ArrayList<byte[]>();
@@ -1631,14 +1637,15 @@ public class FSHLog implements WAL {
   }
 
   @Override
-  public boolean startCacheFlush(final byte[] encodedRegionName,
+  public Long startCacheFlush(final byte[] encodedRegionName,
       Set<byte[]> flushedFamilyNames) {
     Map<byte[], Long> oldStoreSeqNum = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
     if (!closeBarrier.beginOp()) {
       LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) +
         " - because the server is closing.");
-      return false;
+      return null;
     }
+    long oldestUnflushedSequenceId = HConstants.NO_SEQNUM;
     synchronized (regionSequenceIdLock) {
       ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
           oldestUnflushedStoreSequenceIds.get(encodedRegionName);
@@ -1661,6 +1668,9 @@ public class FSHLog implements WAL {
           // Do not worry about data racing, we held write lock of region when calling
           // startCacheFlush, so no one can add value to the map we removed.
           oldestUnflushedStoreSequenceIds.remove(encodedRegionName);
+        } else {
+          oldestUnflushedSequenceId =
+              Collections.min(oldestUnflushedStoreSequenceIdsOfRegion.values());
         }
       }
     }
@@ -1673,7 +1683,7 @@ public class FSHLog implements WAL {
       LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
         + Bytes.toString(encodedRegionName) + "]");
     }
-    return true;
+    return oldestUnflushedSequenceId;
   }
 
   @Override
@@ -1759,7 +1769,6 @@ public class FSHLog implements WAL {
     WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
   }
 
-
   @Override
   public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
     ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
@@ -1771,14 +1780,23 @@ public class FSHLog implements WAL {
   @Override
   public long getEarliestMemstoreSeqNum(byte[] encodedRegionName,
       byte[] familyName) {
-    ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
-        this.oldestUnflushedStoreSequenceIds.get(encodedRegionName);
-    if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
-      Long result = oldestUnflushedStoreSequenceIdsOfRegion.get(familyName);
-      return result != null ? result.longValue() : HConstants.NO_SEQNUM;
-    } else {
-      return HConstants.NO_SEQNUM;
+    synchronized (regionSequenceIdLock) {
+      Map<byte[], Long> m = this.lowestFlushingStoreSequenceIds.get(encodedRegionName);
+      if (m != null) {
+        Long earliest = m.get(familyName);
+        if (earliest != null) {
+          return earliest;
+        }
+      }
+      m = this.oldestUnflushedStoreSequenceIds.get(encodedRegionName);
+      if (m != null) {
+        Long earliest = m.get(familyName);
+        if (earliest != null) {
+          return earliest;
+        }
+      }
     }
+    return HConstants.NO_SEQNUM;
   }
 
   /**
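
The FSHLog changes keep the sequence-id accounting per column family: the oldest unflushed id for
a region is the minimum over its families' oldest ids, and the per-family lookup now consults the
snapshot of families currently being flushed before the live map. A self-contained sketch of that
bookkeeping shape, using plain JDK types and illustrative names rather than FSHLog's internals:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Sketch of two-level accounting: region -> (family -> oldest unflushed sequence id).
    class OldestUnflushedTracker {
      static final long NO_SEQNUM = -1L; // placeholder for HConstants.NO_SEQNUM
      // Families whose edits are still live in the memstore.
      private final ConcurrentMap<String, ConcurrentMap<String, Long>> live =
          new ConcurrentHashMap<>();
      // Snapshot of families whose flush is currently in progress.
      private final ConcurrentMap<String, ConcurrentMap<String, Long>> flushing =
          new ConcurrentHashMap<>();

      // Region's oldest edit still in memory: the minimum over its families.
      long oldestForRegion(String region) {
        ConcurrentMap<String, Long> m = live.get(region);
        return (m == null || m.isEmpty()) ? NO_SEQNUM : Collections.min(m.values());
      }

      // Per-family lookup checks the flushing snapshot first, then the live map.
      long oldestForFamily(String region, String family) {
        for (ConcurrentMap<String, ConcurrentMap<String, Long>> maps : Arrays.asList(flushing, live)) {
          ConcurrentMap<String, Long> m = maps.get(region);
          Long v = (m == null) ? null : m.get(family);
          if (v != null) {
            return v;
          }
        }
        return NO_SEQNUM;
      }
    }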

http://git-wip-us.apache.org/repos/asf/hbase/blob/5dd4c983/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 7254ad1..56d17a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -26,21 +26,19 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.util.FSUtils;
-
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 // imports for things that haven't moved from regionserver.wal yet.
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.FSUtils;
 
 /**
  * No-op implementation of {@link WALProvider} used when the WAL is disabled.
@@ -187,8 +185,9 @@ class DisabledWALProvider implements WALProvider {
     }
 
     @Override
-    public boolean startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
-      return !(closed.get());
+    public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
+      if (closed.get()) return null;
+      return HConstants.NO_SEQNUM;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/5dd4c983/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 5a2b08d..473bba9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -25,12 +25,12 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 // imports we use from yet-to-be-moved regionsever.wal
 import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
@@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides
  * APIs for WAL users (such as RegionServer) to use the WAL (do append, sync, etc).
@@ -140,20 +142,23 @@ public interface WAL {
   void sync(long txid) throws IOException;
 
   /**
-   * WAL keeps track of the sequence numbers that were not yet flushed from memstores
-   * in order to be able to do cleanup. This method tells WAL that some region is about
-   * to flush memstore.
-   *
-   * <p>We stash the oldest seqNum for the region, and let the the next edit inserted in this
-   * region be recorded in {@link #append(HTableDescriptor, HRegionInfo, WALKey, WALEdit,
-   * AtomicLong, boolean, List)} as new oldest seqnum.
-   * In case of flush being aborted, we put the stashed value back; in case of flush succeeding,
-   * the seqNum of that first edit after start becomes the valid oldest seqNum for this region.
+   * WAL keeps track of the sequence numbers that have not yet been flushed from memstores
+   * in order to do the accounting that determines which WAL files can be let go. This method tells WAL
+   * that some region is about to flush. The flush can be the whole region or for a column family
+   * of the region only.
    *
-   * @return true if the flush can proceed, false in case wal is closing (ususally, when server is
-   * closing) and flush couldn't be started.
+   * <p>Currently, it is expected that the update lock is held for the region; i.e. no
+   * concurrent appends while we set up cache flush.
+   * @param flushedFamilyNames Families to flush. May be a subset of all families in the region.
+   * @return Returns {@code null} if the WAL is closing and the flush cannot be started. Returns
+   * {@link HConstants#NO_SEQNUM} if we are flushing the whole region, OR if we are flushing only
+   * a subset of families but the families not being flushed have no edits (in other words, this
+   * is effectively the same as a flush of the whole region even though we were passed a subset
+   * of its families). Otherwise, returns the sequence id of the oldest/lowest outstanding edit.
+   * @see #completeCacheFlush(byte[])
+   * @see #abortCacheFlush(byte[])
    */
-  boolean startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames);
+  Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames);
 
   /**
    * Complete the cache flush.
@@ -175,11 +180,16 @@ public interface WAL {
   WALCoprocessorHost getCoprocessorHost();
 
 
-  /** Gets the earliest sequence number in the memstore for this particular region.
-   * This can serve as best-effort "recent" WAL number for this region.
+  /**
+   * Gets the earliest unflushed sequence id in the memstore for the region.
+   *
    * @param encodedRegionName The region to get the number for.
-   * @return The number if present, HConstants.NO_SEQNUM if absent.
+   * @return The earliest/lowest/oldest sequence id if present, HConstants.NO_SEQNUM if absent.
+   * @deprecated Since version 1.1.1. Will be removed because it is not used and it exposes
+   * subtle internal workings. Use {@link #getEarliestMemstoreSeqNum(byte[], byte[])} instead.
    */
+  @VisibleForTesting
+  @Deprecated
   long getEarliestMemstoreSeqNum(byte[] encodedRegionName);
 
   /**
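
The rewritten interface javadoc spells out what the tracking is for: the WAL keeps the oldest
unflushed sequence ids so it can tell which of its files no longer cover any edit that is still
only in memory. A rough sketch of that test, under the assumption (not stated in the patch) that
each WAL file records the highest sequence id it contains; the types and names are illustrative:

    import java.util.Map;

    class WalFileAccounting {
      static final long NO_SEQNUM = -1L; // placeholder for HConstants.NO_SEQNUM

      // An old WAL file can be let go once every family's oldest unflushed edit
      // is newer than anything the file contains.
      static boolean canLetGo(long highestSeqIdInFile, Map<String, Long> oldestUnflushedByFamily) {
        for (Long oldest : oldestUnflushedByFamily.values()) {
          if (oldest != NO_SEQNUM && oldest <= highestSeqIdInFile) {
            return false; // the file still covers an edit that only lives in memory
          }
        }
        return true;
      }
    }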

http://git-wip-us.apache.org/repos/asf/hbase/blob/5dd4c983/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
new file mode 100644
index 0000000..be201ea
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
+import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mortbay.log.Log;
+
+/**
+ * Testcase for https://issues.apache.org/jira/browse/HBASE-13811
+ */
+@Category({ MediumTests.class })
+public class TestSplitWalDataLoss {
+
+  private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
+
+  private NamespaceDescriptor namespace = NamespaceDescriptor.create(getClass().getSimpleName())
+      .build();
+
+  private TableName tableName = TableName.valueOf(namespace.getName(), "dataloss");
+
+  private byte[] family = Bytes.toBytes("f");
+
+  private byte[] qualifier = Bytes.toBytes("q");
+
+  @Before
+  public void setUp() throws Exception {
+    testUtil.getConfiguration().setInt("hbase.regionserver.msginterval", 30000);
+    testUtil.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
+    testUtil.startMiniCluster(2);
+    HBaseAdmin admin = testUtil.getHBaseAdmin();
+    admin.createNamespace(namespace);
+    admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(family)));
+    testUtil.waitTableAvailable(tableName);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    testUtil.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws IOException, InterruptedException {
+    final HRegionServer rs = testUtil.getRSForFirstRegionInTable(tableName);
+    final HRegion region = (HRegion) rs.getOnlineRegions(tableName).get(0);
+    HRegion spiedRegion = spy(region);
+    final MutableBoolean flushed = new MutableBoolean(false);
+    final MutableBoolean reported = new MutableBoolean(false);
+    doAnswer(new Answer<FlushResult>() {
+      @Override
+      public FlushResult answer(InvocationOnMock invocation) throws Throwable {
+        synchronized (flushed) {
+          flushed.setValue(true);
+          flushed.notifyAll();
+        }
+        synchronized (reported) {
+          while (!reported.booleanValue()) {
+            reported.wait();
+          }
+        }
+        rs.getWAL(region.getRegionInfo()).abortCacheFlush(
+          region.getRegionInfo().getEncodedNameAsBytes());
+        throw new DroppedSnapshotException("testcase");
+      }
+    }).when(spiedRegion).internalFlushCacheAndCommit(Matchers.<WAL> any(),
+      Matchers.<MonitoredTask> any(), Matchers.<PrepareFlushResult> any(),
+      Matchers.<Collection<Store>> any());
+    rs.onlineRegions.put(rs.onlineRegions.keySet().iterator().next(), spiedRegion);
+    Connection conn = testUtil.getConnection();
+
+    try (Table table = conn.getTable(tableName)) {
+      table.put(new Put(Bytes.toBytes("row0")).addColumn(family, qualifier, Bytes.toBytes("val0")));
+    }
+    long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family);
+    Log.info("CHANGE OLDEST " + oldestSeqIdOfStore);
+    assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM);
+    rs.cacheFlusher.requestFlush(spiedRegion, false);
+    synchronized (flushed) {
+      while (!flushed.booleanValue()) {
+        flushed.wait();
+      }
+    }
+    try (Table table = conn.getTable(tableName)) {
+      table.put(new Put(Bytes.toBytes("row1")).addColumn(family, qualifier, Bytes.toBytes("val1")));
+    }
+    long now = EnvironmentEdgeManager.currentTime();
+    rs.tryRegionServerReport(now - 500, now);
+    synchronized (reported) {
+      reported.setValue(true);
+      reported.notifyAll();
+    }
+    while (testUtil.getRSForFirstRegionInTable(tableName) == rs) {
+      Thread.sleep(100);
+    }
+    try (Table table = conn.getTable(tableName)) {
+      Result result = table.get(new Get(Bytes.toBytes("row0")));
+      assertArrayEquals(Bytes.toBytes("val0"), result.getValue(family, qualifier));
+    }
+  }
+}
\ No newline at end of file
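
TestSplitWalDataLoss parks the flush thread between "snapshot prepared" and "region server
reported", so the master records a stale flushed-sequence-id while the unflushed edit for row0 is
still only in memory; the DroppedSnapshotException then makes the region server go down, the
region moves, and the final Get checks that row0 survived WAL splitting. The coordination is two
hand-rolled latches built from MutableBoolean plus wait()/notifyAll(); the same handshake written
with CountDownLatch, shown here only for comparison and not part of the patch:

    import java.util.concurrent.CountDownLatch;

    // Two-phase handshake equivalent to the MutableBoolean flags in the test.
    class FlushReportHandshake {
      final CountDownLatch flushed = new CountDownLatch(1);
      final CountDownLatch reported = new CountDownLatch(1);

      // Called from the (spied) flush thread when the flush reaches its commit phase.
      void onFlushPrepared() throws InterruptedException {
        flushed.countDown();   // announce that the flush has started
        reported.await();      // park until the stale report has been sent
      }

      // Called from the test thread before sending the region server report.
      void awaitFlushStarted() throws InterruptedException {
        flushed.await();
      }

      // Called from the test thread after tryRegionServerReport.
      void onReportSent() {
        reported.countDown();
      }
    }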