You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2015/06/08 22:46:40 UTC
hbase git commit: HBASE-13811 Splitting WALs,
we are filtering out too many edits -> DATALOSS
Repository: hbase
Updated Branches:
refs/heads/branch-1.1 978c503d9 -> 5dd4c9832
HBASE-13811 Splitting WALs, we are filtering out too many edits -> DATALOSS
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5dd4c983
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5dd4c983
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5dd4c983
Branch: refs/heads/branch-1.1
Commit: 5dd4c9832d0cc70a2121a6250060a2b5cf7d4312
Parents: 978c503
Author: stack <st...@apache.org>
Authored: Mon Jun 8 13:46:16 2015 -0700
Committer: stack <st...@apache.org>
Committed: Mon Jun 8 13:46:16 2015 -0700
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/HRegion.java | 28 ++--
.../hbase/regionserver/HRegionServer.java | 19 +--
.../hadoop/hbase/regionserver/wal/FSHLog.java | 42 ++++--
.../hadoop/hbase/wal/DisabledWALProvider.java | 11 +-
.../java/org/apache/hadoop/hbase/wal/WAL.java | 46 +++---
.../regionserver/TestSplitWalDataLoss.java | 149 +++++++++++++++++++
6 files changed, 233 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/5dd4c983/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 0d5306e..0e83dd3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -217,13 +217,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
final AtomicBoolean closing = new AtomicBoolean(false);
/**
- * The max sequence id of flushed data on this region. Used doing some rough calculations on
- * whether time to flush or not.
+ * The max sequence id of flushed data on this region. There is no edit in memory that is
+ * less than this sequence id.
*/
private volatile long maxFlushedSeqId = HConstants.NO_SEQNUM;
/**
- * Record the sequence id of last flush operation.
+ * Record the sequence id of last flush operation. Can be in advance of
+ * {@link #maxFlushedSeqId} when flushing a single column family. In this case,
+ * {@link #maxFlushedSeqId} will be older than the oldest edit in memory.
*/
private volatile long lastFlushOpSeqId = HConstants.NO_SEQNUM;
/**
@@ -1619,7 +1621,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
.setSequenceId(
oldestUnflushedSeqId < 0 ? lastFlushOpSeqIdLocal : oldestUnflushedSeqId - 1).build());
}
- return regionLoadBldr.setCompleteSequenceId(this.maxFlushedSeqId);
+ return regionLoadBldr.setCompleteSequenceId(getMaxFlushedSeqId());
}
//////////////////////////////////////////////////////////////////////////////
@@ -2132,21 +2134,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
w = mvcc.beginMemstoreInsert();
if (wal != null) {
- if (!wal.startCacheFlush(encodedRegionName, flushedFamilyNames)) {
- // This should never happen.
- String msg = "Flush will not be started for ["
- + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
+ Long earliestUnflushedSequenceIdForTheRegion =
+ wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
+ if (earliestUnflushedSequenceIdForTheRegion == null) {
+ // This should never happen. This is how startCacheFlush signals flush cannot proceed.
+ String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing.";
status.setStatus(msg);
return new PrepareFlushResult(
new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false),
myseqid);
}
flushOpSeqId = getNextSequenceId(wal);
- long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName);
- // no oldestUnflushedSeqId means we flushed all stores.
- // or the unflushed stores are all empty.
- flushedSeqId = (oldestUnflushedSeqId == HConstants.NO_SEQNUM) ? flushOpSeqId
- : oldestUnflushedSeqId - 1;
+ // Back up 1: subtract 1 from the oldest sequence id in memstore to get the last 'flushed' edit
+ flushedSeqId =
+ earliestUnflushedSequenceIdForTheRegion.longValue() == HConstants.NO_SEQNUM?
+ flushOpSeqId: earliestUnflushedSequenceIdForTheRegion.longValue() - 1;
} else {
// use the provided sequence Id as WAL is not being used for this flush.
flushedSeqId = flushOpSeqId = myseqid;
http://git-wip-us.apache.org/repos/asf/hbase/blob/5dd4c983/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 8f4059e..3664ffb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2835,22 +2835,15 @@ public class HRegionServer extends HasThread implements
Region toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
if (destination != null) {
- try {
- WAL wal = getWAL(r.getRegionInfo());
- long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes());
+ long closeSeqNum = r.getMaxFlushedSeqId();
+ if (closeSeqNum == HConstants.NO_SEQNUM) {
+ // No edits in WAL for this region; get the sequence number when the region was opened.
+ closeSeqNum = r.getOpenSeqNum();
if (closeSeqNum == HConstants.NO_SEQNUM) {
- // No edits in WAL for this region; get the sequence number when the region was opened.
- closeSeqNum = r.getOpenSeqNum();
- if (closeSeqNum == HConstants.NO_SEQNUM) {
- closeSeqNum = 0;
- }
+ closeSeqNum = 0;
}
- addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
- } catch (IOException exception) {
- LOG.error("Could not retrieve WAL information for region " + r.getRegionInfo() +
- "; not adding to moved regions.");
- LOG.debug("Exception details for failure to get wal", exception);
}
+ addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
}
this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
return toReturn != null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/5dd4c983/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 7c170b0..76e93a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -28,6 +28,7 @@ import java.lang.reflect.Method;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -850,7 +851,12 @@ public class FSHLog implements WAL {
// Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock.
synchronized (regionSequenceIdLock) {
for (Map.Entry<byte[], Long> e: regionsSequenceNums.entrySet()) {
- long unFlushedVal = getEarliestMemstoreSeqNum(e.getKey());
+ ConcurrentMap<byte[], Long> m =
+ this.oldestUnflushedStoreSequenceIds.get(e.getKey());
+ if (m == null) {
+ continue;
+ }
+ long unFlushedVal = Collections.min(m.values());
if (unFlushedVal != HConstants.NO_SEQNUM && unFlushedVal <= e.getValue()) {
if (regionsToFlush == null)
regionsToFlush = new ArrayList<byte[]>();
@@ -1631,14 +1637,15 @@ public class FSHLog implements WAL {
}
@Override
- public boolean startCacheFlush(final byte[] encodedRegionName,
+ public Long startCacheFlush(final byte[] encodedRegionName,
Set<byte[]> flushedFamilyNames) {
Map<byte[], Long> oldStoreSeqNum = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
if (!closeBarrier.beginOp()) {
LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) +
" - because the server is closing.");
- return false;
+ return null;
}
+ long oldestUnflushedSequenceId = HConstants.NO_SEQNUM;
synchronized (regionSequenceIdLock) {
ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
oldestUnflushedStoreSequenceIds.get(encodedRegionName);
@@ -1661,6 +1668,9 @@ public class FSHLog implements WAL {
// Do not worry about data racing, we held write lock of region when calling
// startCacheFlush, so no one can add value to the map we removed.
oldestUnflushedStoreSequenceIds.remove(encodedRegionName);
+ } else {
+ oldestUnflushedSequenceId =
+ Collections.min(oldestUnflushedStoreSequenceIdsOfRegion.values());
}
}
}
@@ -1673,7 +1683,7 @@ public class FSHLog implements WAL {
LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
+ Bytes.toString(encodedRegionName) + "]");
}
- return true;
+ return oldestUnflushedSequenceId;
}
@Override
@@ -1759,7 +1769,6 @@ public class FSHLog implements WAL {
WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
}
-
@Override
public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
@@ -1771,14 +1780,23 @@ public class FSHLog implements WAL {
@Override
public long getEarliestMemstoreSeqNum(byte[] encodedRegionName,
byte[] familyName) {
- ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
- this.oldestUnflushedStoreSequenceIds.get(encodedRegionName);
- if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
- Long result = oldestUnflushedStoreSequenceIdsOfRegion.get(familyName);
- return result != null ? result.longValue() : HConstants.NO_SEQNUM;
- } else {
- return HConstants.NO_SEQNUM;
+ synchronized (regionSequenceIdLock) {
+ Map<byte[], Long> m = this.lowestFlushingStoreSequenceIds.get(encodedRegionName);
+ if (m != null) {
+ Long earlist = m.get(familyName);
+ if (earlist != null) {
+ return earlist;
+ }
+ }
+ m = this.oldestUnflushedStoreSequenceIds.get(encodedRegionName);
+ if (m != null) {
+ Long earlist = m.get(familyName);
+ if (earlist != null) {
+ return earlist;
+ }
+ }
}
+ return HConstants.NO_SEQNUM;
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/5dd4c983/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 7254ad1..56d17a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -26,21 +26,19 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.util.FSUtils;
-
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
// imports for things that haven't moved from regionserver.wal yet.
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.FSUtils;
/**
* No-op implementation of {@link WALProvider} used when the WAL is disabled.
@@ -187,8 +185,9 @@ class DisabledWALProvider implements WALProvider {
}
@Override
- public boolean startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
- return !(closed.get());
+ public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
+ if (closed.get()) return null;
+ return HConstants.NO_SEQNUM;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/5dd4c983/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 5a2b08d..473bba9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -25,12 +25,12 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
-
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
// imports we use from yet-to-be-moved regionsever.wal
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
@@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides
* APIs for WAL users (such as RegionServer) to use the WAL (do append, sync, etc).
@@ -140,20 +142,23 @@ public interface WAL {
void sync(long txid) throws IOException;
/**
- * WAL keeps track of the sequence numbers that were not yet flushed from memstores
- * in order to be able to do cleanup. This method tells WAL that some region is about
- * to flush memstore.
- *
- * <p>We stash the oldest seqNum for the region, and let the the next edit inserted in this
- * region be recorded in {@link #append(HTableDescriptor, HRegionInfo, WALKey, WALEdit,
- * AtomicLong, boolean, List)} as new oldest seqnum.
- * In case of flush being aborted, we put the stashed value back; in case of flush succeeding,
- * the seqNum of that first edit after start becomes the valid oldest seqNum for this region.
+ * WAL keeps track of the sequence numbers that are as yet not flushed in memstores
+ * in order to be able to do accounting to figure which WALs can be let go. This method tells WAL
+ * that some region is about to flush. The flush can be the whole region or for a column family
+ * of the region only.
*
- * @return true if the flush can proceed, false in case wal is closing (ususally, when server is
- * closing) and flush couldn't be started.
+ * <p>Currently, it is expected that the update lock is held for the region; i.e. no
+ * concurrent appends while we set up cache flush.
+ * @param flushedFamilyNames Families to flush. May be a subset of all families in the region.
+ * @return Returns {@link HConstants#NO_SEQNUM} if we are flushing the whole region OR if
+ * we are flushing a subset of all families but there are no edits in those families not
+ * being flushed; in other words, this is effectively same as a flush of all of the region
+ * though we were passed a subset of families. Otherwise, it returns the sequence id of the
+ * oldest/lowest outstanding edit. Returns null if the flush cannot proceed (WAL closing).
+ * @see #completeCacheFlush(byte[])
+ * @see #abortCacheFlush(byte[])
*/
- boolean startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames);
+ Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames);
/**
* Complete the cache flush.
@@ -175,11 +180,16 @@ public interface WAL {
WALCoprocessorHost getCoprocessorHost();
- /** Gets the earliest sequence number in the memstore for this particular region.
- * This can serve as best-effort "recent" WAL number for this region.
+ /**
+ * Gets the earliest unflushed sequence id in the memstore for the region.
+
* @param encodedRegionName The region to get the number for.
- * @return The number if present, HConstants.NO_SEQNUM if absent.
+ * @return The earliest/lowest/oldest sequence id if present, HConstants.NO_SEQNUM if absent.
+ * @deprecated Since version 1.1.1. Removing because not used and exposes subtle internal
+ * workings. Use {@link #getEarliestMemstoreSeqNum(byte[], byte[])}
*/
+ @VisibleForTesting
+ @Deprecated
long getEarliestMemstoreSeqNum(byte[] encodedRegionName);
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/5dd4c983/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
new file mode 100644
index 0000000..be201ea
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
+import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mortbay.log.Log;
+
+/**
+ * Testcase for https://issues.apache.org/jira/browse/HBASE-13811
+ */
+@Category({ MediumTests.class })
+public class TestSplitWalDataLoss {
+
+ private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
+
+ private NamespaceDescriptor namespace = NamespaceDescriptor.create(getClass().getSimpleName())
+ .build();
+
+ private TableName tableName = TableName.valueOf(namespace.getName(), "dataloss");
+
+ private byte[] family = Bytes.toBytes("f");
+
+ private byte[] qualifier = Bytes.toBytes("q");
+
+ @Before
+ public void setUp() throws Exception {
+ testUtil.getConfiguration().setInt("hbase.regionserver.msginterval", 30000);
+ testUtil.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
+ testUtil.startMiniCluster(2);
+ HBaseAdmin admin = testUtil.getHBaseAdmin();
+ admin.createNamespace(namespace);
+ admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(family)));
+ testUtil.waitTableAvailable(tableName);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ testUtil.shutdownMiniCluster();
+ }
+
+ @Test
+ public void test() throws IOException, InterruptedException {
+ final HRegionServer rs = testUtil.getRSForFirstRegionInTable(tableName);
+ final HRegion region = (HRegion) rs.getOnlineRegions(tableName).get(0);
+ HRegion spiedRegion = spy(region);
+ final MutableBoolean flushed = new MutableBoolean(false);
+ final MutableBoolean reported = new MutableBoolean(false);
+ doAnswer(new Answer<FlushResult>() {
+ @Override
+ public FlushResult answer(InvocationOnMock invocation) throws Throwable {
+ synchronized (flushed) {
+ flushed.setValue(true);
+ flushed.notifyAll();
+ }
+ synchronized (reported) {
+ while (!reported.booleanValue()) {
+ reported.wait();
+ }
+ }
+ rs.getWAL(region.getRegionInfo()).abortCacheFlush(
+ region.getRegionInfo().getEncodedNameAsBytes());
+ throw new DroppedSnapshotException("testcase");
+ }
+ }).when(spiedRegion).internalFlushCacheAndCommit(Matchers.<WAL> any(),
+ Matchers.<MonitoredTask> any(), Matchers.<PrepareFlushResult> any(),
+ Matchers.<Collection<Store>> any());
+ rs.onlineRegions.put(rs.onlineRegions.keySet().iterator().next(), spiedRegion);
+ Connection conn = testUtil.getConnection();
+
+ try (Table table = conn.getTable(tableName)) {
+ table.put(new Put(Bytes.toBytes("row0")).addColumn(family, qualifier, Bytes.toBytes("val0")));
+ }
+ long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family);
+ Log.info("CHANGE OLDEST " + oldestSeqIdOfStore);
+ assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM);
+ rs.cacheFlusher.requestFlush(spiedRegion, false);
+ synchronized (flushed) {
+ while (!flushed.booleanValue()) {
+ flushed.wait();
+ }
+ }
+ try (Table table = conn.getTable(tableName)) {
+ table.put(new Put(Bytes.toBytes("row1")).addColumn(family, qualifier, Bytes.toBytes("val1")));
+ }
+ long now = EnvironmentEdgeManager.currentTime();
+ rs.tryRegionServerReport(now - 500, now);
+ synchronized (reported) {
+ reported.setValue(true);
+ reported.notifyAll();
+ }
+ while (testUtil.getRSForFirstRegionInTable(tableName) == rs) {
+ Thread.sleep(100);
+ }
+ try (Table table = conn.getTable(tableName)) {
+ Result result = table.get(new Get(Bytes.toBytes("row0")));
+ assertArrayEquals(Bytes.toBytes("val0"), result.getValue(family, qualifier));
+ }
+ }
+}
\ No newline at end of file