Posted to commits@hbase.apache.org by mb...@apache.org on 2012/09/19 03:51:46 UTC

svn commit: r1387429 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/wal/

Author: mbautin
Date: Wed Sep 19 01:51:45 2012
New Revision: 1387429

URL: http://svn.apache.org/viewvc?rev=1387429&view=rev
Log:
[HBASE-6059] Fixing WAL Replay bug

Author: lipi

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1387429&r1=1387428&r2=1387429&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Sep 19 01:51:45 2012
@@ -54,7 +54,6 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.Date;
-import java.util.Calendar;
 import java.text.SimpleDateFormat;
 
 import org.apache.commons.logging.Log;
@@ -70,7 +69,6 @@ import org.apache.hadoop.hbase.HColumnDe
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
@@ -563,22 +561,19 @@ public class HRegion implements HeapSize
       cleanupTmpDir();
 
       // Load in all the HStores.
-      // Get minimum of the maxSeqId across all the store.
       //
       // Context: During replay we want to ensure that we do not lose any data. So, we
       // have to be conservative in how we replay logs. For each store, we calculate
-      // the maxSeqId up to which the store was flushed. But, since different stores
-      // could have a different maxSeqId, we choose the
-      // minimum across all the stores.
-      // This could potentially result in duplication of data for stores that are ahead
-      // of others. ColumnTrackers in the ScanQueryMatchers do the de-duplication, so we
-      // do not have to worry.
-      // TODO: If there is a store that was never flushed in a long time, we could replay
-      // a lot of data. Currently, this is not a problem because we flush all the stores at
-      // the same time. If we move to per-cf flushing, we might want to revisit this and send
-      // in a vector of maxSeqIds instead of sending in a single number, which has to be the
-      // min across all the max.
-      long minSeqId = -1;
+      // the maximum seqId up to which the store was flushed, and we skip any edit
+      // whose sequence id is less than or equal to that store's maxSeqId.
+      //
+      // We cannot simply take the minimum of the per-store maxSeqIds, because
+      // replaying edits that some stores have already flushed (and possibly
+      // compacted away) can resurrect deleted data. See HBASE-6059.
+
+      Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
+            Bytes.BYTES_COMPARATOR);
+
       long maxSeqId = -1;
       // initialized to -1 so that we pick up MemstoreTS from column families
       long maxMemstoreTS = -1;
@@ -609,10 +604,10 @@ public class HRegion implements HeapSize
 
             this.stores.put(store.getColumnFamilyName().getBytes(), store);
             // Do not include bulk loaded files when determining seqIdForReplay
-            long storeSeqIdForReplay = store.getMaxSequenceId(false);
-            if (minSeqId == -1 || storeSeqIdForReplay < minSeqId) {
-              minSeqId = storeSeqIdForReplay;
-            }
+            long storeSeqIdForReplay = store.getMaxSequenceId(false);
+            maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
+                storeSeqIdForReplay);
+
             // Include bulk loaded files when determining seqIdForAssignment
             long storeSeqIdForAssignment = store.getMaxSequenceId(true);
             if (maxSeqId == -1 || storeSeqIdForAssignment > maxSeqId) {
@@ -634,7 +629,7 @@ public class HRegion implements HeapSize
       mvcc.initialize(maxMemstoreTS + 1);
       // Recover any edits if available.
       maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
-          this.regiondir, minSeqId, reporter, status));
+          this.regiondir, maxSeqIdInStores, reporter, status));
 
       // Get rid of any splits or merges that were lost in-progress.  Clean out
       // these directories here on open.  We may be opening a region that was
@@ -1540,13 +1535,6 @@ public class HRegion implements HeapSize
         this.getRegionInfo().isMetaRegion());
     }
 
-    // Update the last flushed sequence id for region
-    if (this.regionServer != null) {
-      this.regionServer.getServerInfo().setFlushedSequenceIdForRegion(
-          getRegionName(),
-          completeSequenceId);
-    }
-
     // C. Finally notify anyone waiting on memstore to clear:
     // e.g. checkResources().
     synchronized (this) {
@@ -2516,19 +2504,21 @@ public class HRegion implements HeapSize
    * make sense in a this single region context only -- until we online.
    *
    * @param regiondir
-   * @param minSeqId Any edit found in split editlogs needs to be in excess of
-   * this minSeqId to be applied, else its skipped.
+   * @param maxSeqIdInStores An edit found in the split edit logs is applied only
+   * if its sequence id exceeds the maxSeqId of its store; otherwise it is skipped.
    * @param reporter
    * @param status
-   * @return the sequence id of the last edit added to this region out of the
-   * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
+   * @return the maximum sequence id of the edits replayed, or -1 if nothing
+   * was added from the edit logs.
    * @throws UnsupportedEncodingException
    * @throws IOException
    */
   protected long replayRecoveredEditsIfAny(final Path regiondir,
-      final long minSeqId, final Progressable reporter, MonitoredTask status)
+      final Map<byte[], Long> maxSeqIdInStores, final Progressable reporter, MonitoredTask status)
   throws UnsupportedEncodingException, IOException {
-    long seqid = minSeqId;
+
+    long seqid = -1;
+
     NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
     if (files == null || files.isEmpty()) return seqid;
     for (Path edits: files) {
@@ -2536,9 +2526,11 @@ public class HRegion implements HeapSize
         LOG.warn("Null or non-existent edits file: " + edits);
         continue;
       }
-      if (isZeroLengthThenDelete(this.fs, edits)) continue;
+      if (isZeroLengthThenDelete(this.fs, edits)) {
+        continue;
+      }
       try {
-        seqid = replayRecoveredEdits(edits, seqid, reporter);
+        seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
       } catch (IOException e) {
         boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
         if (skipErrors) {
@@ -2550,10 +2542,12 @@ public class HRegion implements HeapSize
         }
       }
     }
-    if (seqid > minSeqId) {
-      // Then we added some edits to memory. Flush and cleanup split edit files.
+
+    if (seqid > -1) {
+      // We added some edits to memory, so flush them.
       internalFlushcache(null, seqid, status);
     }
+
     // Now delete the content of recovered edits.  We're done w/ them.
     for (Path file: files) {
       if (!this.fs.delete(file, false)) {
@@ -2567,112 +2561,117 @@ public class HRegion implements HeapSize
 
   /*
    * @param edits File of recovered edits.
-   * @param minSeqId Minimum sequenceid found in a store file.  Edits in log
-   * must be larger than this to be replayed.
+   * @param maxSeqIdInStores Maximum sequence id found in each store.  An edit in
+   * the log is replayed for a store only if its sequence id is larger than this.
    * @param reporter
-   * @return the sequence id of the last edit added to this region out of the
-   * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
+   * @return the sequence id of the max edit log entry seen in the HLog or -1
+   * if no edits have been replayed
    * @throws IOException
    */
   private long replayRecoveredEdits(final Path edits,
-      final long minSeqId, final Progressable reporter)
+      final Map<byte[], Long> maxSeqIdInStores, final Progressable reporter)
     throws IOException {
-    String msg = "Replaying edits from " + edits + "; minSeqId=" + minSeqId;
+    String msg = "Replaying edits from " + edits;
     LOG.info(msg);
     MonitoredTask status = TaskMonitor.get().createStatus(msg);
     status.setStatus("Opening logs");
     HLog.Reader reader = HLog.getReader(this.fs, edits, conf);
     try {
-    long currentEditSeqId = minSeqId;
-    long firstSeqIdInLog = -1;
-    long skippedEdits = 0;
-    long editsCount = 0;
-    HLog.Entry entry;
-    Store store = null;
+      long currentEditSeqId = -1;
+      long firstSeqIdInLog = -1;
+      long skippedEdits = 0;
+      long editsCount = 0;
+      HLog.Entry entry;
+      Store store = null;
 
-    try {
-      // How many edits to apply before we send a progress report.
-      int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
-      while ((entry = reader.next()) != null) {
-        HLogKey key = entry.getKey();
-        WALEdit val = entry.getEdit();
-        if (firstSeqIdInLog == -1) {
-          firstSeqIdInLog = key.getLogSeqNum();
-        }
-        // Now, figure if we should skip this edit.
-        if (key.getLogSeqNum() <= currentEditSeqId) {
-          skippedEdits++;
-          continue;
-        }
-        currentEditSeqId = key.getLogSeqNum();
-        boolean flush = false;
-        for (KeyValue kv: val.getKeyValues()) {
-          // Check this edit is for me. Also, guard against writing the special
-          // METACOLUMN info such as HBASE::CACHEFLUSH entries
-          if (kv.matchingFamily(HLog.METAFAMILY) ||
-              !Bytes.equals(key.getRegionName(), this.regionInfo.getRegionName())) {
-            skippedEdits++;
-            continue;
+      try {
+        // How many edits to apply before we send a progress report.
+        int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
+        while ((entry = reader.next()) != null) {
+          HLogKey key = entry.getKey();
+          WALEdit val = entry.getEdit();
+          if (firstSeqIdInLog == -1) {
+            firstSeqIdInLog = key.getLogSeqNum();
           }
-          // Figure which store the edit is meant for.
-          if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
-            store = this.stores.get(kv.getFamily());
+          currentEditSeqId = key.getLogSeqNum();
+          boolean flush = false;
+          for (KeyValue kv: val.getKeyValues()) {
+            // Check this edit is for me. Also, guard against writing the special
+            // METACOLUMN info such as HBASE::CACHEFLUSH entries
+            if (kv.matchingFamily(HLog.METAFAMILY) ||
+                !Bytes.equals(key.getRegionName(), this.regionInfo.getRegionName())) {
+              skippedEdits++;
+              continue;
+            }
+            // Figure which store the edit is meant for.
+            if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
+              store = this.stores.get(kv.getFamily());
+            }
+            if (store == null) {
+              // This should never happen.  Perhaps schema was changed between
+              // crash and redeploy?
+              LOG.warn("No family for " + kv);
+              skippedEdits++;
+              continue;
+            }
+            // Now, figure if we should skip this edit.
+            if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
+                .getName())) {
+              skippedEdits++;
+              continue;
+            }
+            // Once we are over the limit, restoreEdit will keep returning true to
+            // flush -- but don't flush until we've played all the kvs that make up
+            // the WALEdit.
+            flush = restoreEdit(store, kv);
+            editsCount++;
           }
-          if (store == null) {
-            // This should never happen.  Perhaps schema was changed between
-            // crash and redeploy?
-            LOG.warn("No family for " + kv);
-            skippedEdits++;
-            continue;
+          if (flush) internalFlushcache(null, currentEditSeqId, status);
+
+          // Every 'interval' edits, tell the reporter we're making progress.
+          // Have seen 60k edits taking 3 minutes to complete.
+          if (reporter != null && (editsCount % interval) == 0) {
+            status.setStatus("Replaying edits..." +
+                " skipped=" + skippedEdits +
+                " edits=" + editsCount);
+            reporter.progress();
           }
-          // Once we are over the limit, restoreEdit will keep returning true to
-          // flush -- but don't flush until we've played all the kvs that make up
-          // the WALEdit.
-          flush = restoreEdit(store, kv);
-          editsCount++;
-        }
-        if (flush) internalFlushcache(null, currentEditSeqId, status);
-
-        // Every 'interval' edits, tell the reporter we're making progress.
-        // Have seen 60k edits taking 3minutes to complete.
-        if (reporter != null && (editsCount % interval) == 0) {
-          status.setStatus("Replaying edits..." +
-              " skipped=" + skippedEdits +
-              " edits=" + editsCount);
-          reporter.progress();
-        }
-      }
-    } catch (EOFException eof) {
-      Path p = HLog.moveAsideBadEditsFile(fs, edits);
-      msg = "Encountered EOF. Most likely due to Master failure during " +
-          "log spliting, so we have this data in another edit.  " +
-          "Continuing, but renaming " + edits + " as " + p;
-      LOG.warn(msg, eof);
-      status.setStatus(msg);
-    } catch (IOException ioe) {
-      // If the IOE resulted from bad file format,
-      // then this problem is idempotent and retrying won't help
-      if (ioe.getCause() instanceof ParseException) {
+        }
+      } catch (EOFException eof) {
         Path p = HLog.moveAsideBadEditsFile(fs, edits);
-        msg = "File corruption encountered!  " +
+        msg = "Encountered EOF. Most likely due to Master failure during " +
+            "log spliting, so we have this data in another edit.  " +
             "Continuing, but renaming " + edits + " as " + p;
-        LOG.warn(msg, ioe);
+        LOG.warn(msg, eof);
         status.setStatus(msg);
+      } catch (IOException ioe) {
+        // If the IOE resulted from bad file format,
+        // then this problem is idempotent and retrying won't help.
+        if (ioe.getCause() instanceof ParseException) {
+          Path p = HLog.moveAsideBadEditsFile(fs, edits);
+          msg = "File corruption encountered!  " +
+              "Continuing, but renaming " + edits + " as " + p;
+          LOG.warn(msg, ioe);
+          status.setStatus(msg);
+        } else {
+          // other IO errors may be transient (bad network connection,
+          // checksum exception on one datanode, etc).  throw & retry
+          status.abort(StringUtils.stringifyException(ioe));
+          throw ioe;
+        }
+      }
+      msg = "Applied " + editsCount + ", skipped " + skippedEdits +
+          ", firstSeqIdInLog=" + firstSeqIdInLog +
+          ", maxSeqIdInLog=" + currentEditSeqId;
+      status.markComplete(msg);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(msg);
+      }
+      if (editsCount == 0) {
+        return -1;
       } else {
-        // other IO errors may be transient (bad network connection,
-        // checksum exception on one datanode, etc).  throw & retry
-        status.abort(StringUtils.stringifyException(ioe));
-        throw ioe;
+        return currentEditSeqId;
       }
-    }
-    msg = "Applied " + editsCount + ", skipped " + skippedEdits +
-        ", firstSeqIdInLog=" + firstSeqIdInLog +
-        ", maxSeqIdInLog=" + currentEditSeqId;
-    status.markComplete(msg);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(msg);
-    }
-    return currentEditSeqId;
     } finally {
       reader.close();
       status.cleanup();

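To make the change above concrete, here is a minimal, self-contained sketch of the per-store skip rule that replaces the old single minSeqId. It is an illustration only, assuming hypothetical Edit and replay() names and plain String column-family keys in place of HBase's byte[] keys, HLog.Entry and Store types; the rule itself matches the patch: an edit is skipped when its sequence id is at or below the flushed maxSeqId of the store it targets, and the maximum replayed sequence id (or -1 if nothing was applied) is what drives the flush afterwards.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Simplified stand-in for the per-store skip rule used during WAL replay.
    // Edit and replay() are hypothetical; they are not HBase classes.
    public class PerStoreReplaySketch {

      /** One WAL edit touching a single column family. */
      static final class Edit {
        final long seqId;
        final String family;
        Edit(long seqId, String family) { this.seqId = seqId; this.family = family; }
      }

      /**
       * Replays edits, skipping any edit whose sequence id is at or below the
       * flushed maxSeqId of the store (column family) it belongs to.
       * @return the maximum sequence id replayed, or -1 if nothing was applied
       */
      static long replay(List<Edit> edits, Map<String, Long> maxSeqIdInStores) {
        long maxReplayedSeqId = -1;
        for (Edit e : edits) {
          Long flushedUpTo = maxSeqIdInStores.get(e.family);
          if (flushedUpTo != null && e.seqId <= flushedUpTo) {
            continue; // this store already persisted the edit in its last flush
          }
          // ... apply the edit to the store's memstore here ...
          maxReplayedSeqId = Math.max(maxReplayedSeqId, e.seqId);
        }
        return maxReplayedSeqId;
      }

      public static void main(String[] args) {
        Map<String, Long> maxSeqIdInStores = new HashMap<String, Long>();
        maxSeqIdInStores.put("cf1", 10L); // cf1 was flushed up to sequence id 10
        maxSeqIdInStores.put("cf2", 4L);  // cf2 was flushed up to sequence id 4
        List<Edit> edits = Arrays.asList(
            new Edit(5, "cf1"),    // skipped: cf1 already has this edit on disk
            new Edit(5, "cf2"),    // replayed: cf2 was only flushed up to 4
            new Edit(12, "cf1"));  // replayed
        System.out.println(replay(edits, maxSeqIdInStores)); // prints 12
      }
    }

In the patch itself the map is keyed by the column family's byte[] name (hence the TreeMap with Bytes.BYTES_COMPARATOR), and the skip decision is made per KeyValue inside replayRecoveredEdits once the target Store has been resolved.
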
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1387429&r1=1387428&r2=1387429&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Wed Sep 19 01:51:45 2012
@@ -1159,10 +1159,13 @@ public class Store extends SchemaConfigu
         boolean hasMore;
         do {
           hasMore = scanner.next(kvs, 1);
+          // Create the writer even if there are no KVs (an empty store file is
+          // fine), because we need to record the max sequence id for the store
+          // file; see HBASE-6059.
+          if (writer == null) {
+            writer = createWriterInTmp(maxKeyCount, compression, true);
+          }
           if (!kvs.isEmpty()) {
-            if (writer == null) {
-              writer = createWriterInTmp(maxKeyCount, compression, true);
-            }
             // output to writer:
             for (KeyValue kv : kvs) {
               if (kv.getMemstoreTS() <= smallestReadPoint) {

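The Store.java change above is the other half of the fix: the writer is now created even when nothing survives the scan, so the resulting store file, possibly empty, still records the store's maximum sequence id. The following is a standalone sketch of that shape, assuming hypothetical SketchWriter and compact() names rather than the real createWriterInTmp/StoreFile writer machinery used above.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;

    // Sketch of "create the writer even if nothing survives" (hypothetical names).
    public class EmptyStoreFileSketch {

      /** Stand-in for a store-file writer that records maxSeqId metadata on close. */
      static final class SketchWriter {
        final List<String> kvs = new ArrayList<String>();
        long maxSeqId = -1;
        void append(String kv) { kvs.add(kv); }
        void close(long maxSeqId) { this.maxSeqId = maxSeqId; }
      }

      /**
       * Writes the surviving KVs into a new file. The writer is created before we
       * know whether anything survives, so even an empty output file carries the
       * store's max sequence id, which is what replay relies on (HBASE-6059).
       */
      static SketchWriter compact(Iterator<String> survivors, long maxSeqId) {
        SketchWriter writer = null;
        boolean hasMore;
        do {
          String kv = survivors.hasNext() ? survivors.next() : null;
          hasMore = survivors.hasNext();
          // Create the writer even if there is no kv to write.
          if (writer == null) {
            writer = new SketchWriter();
          }
          if (kv != null) {
            writer.append(kv);
          }
        } while (hasMore);
        writer.close(maxSeqId);
        return writer;
      }

      public static void main(String[] args) {
        // Every cell was deleted and compacted away, yet the metadata survives.
        Iterator<String> nothing = Collections.<String>emptyList().iterator();
        SketchWriter w = compact(nothing, 42L);
        System.out.println(w.kvs.size() + " kvs, maxSeqId=" + w.maxSeqId); // 0 kvs, maxSeqId=42
      }
    }

Without that empty file, a re-opened region would have no record that, for example, a Delete had already been flushed and then removed by a major compaction, and replaying the WAL could bring the deleted row back; that is exactly the scenario the new TestWALReplay case below exercises.
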
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java?rev=1387429&r1=1387428&r2=1387429&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java Wed Sep 19 01:51:45 2012
@@ -19,8 +19,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HRegionInfo;

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1387429&r1=1387428&r2=1387429&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Wed Sep 19 01:51:45 2012
@@ -38,16 +38,24 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.ResultScanner;
+
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -75,7 +83,7 @@ public class TestWALReplay {
     // The below config supported by 0.20-append and CDH3b2
     conf.setInt("dfs.client.block.recovery.retries", 2);
     conf.setInt("hbase.regionserver.flushlogentries", 1);
-    TEST_UTIL.startMiniDFSCluster(3);
+    TEST_UTIL.startMiniCluster(3);
     TEST_UTIL.setNameNodeNameSystemLeasePeriod(100, 10000);
     Path hbaseRootDir =
       TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
@@ -85,7 +93,7 @@ public class TestWALReplay {
 
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniDFSCluster();
+    TEST_UTIL.shutdownMiniCluster();
   }
 
   @Before
@@ -476,6 +484,97 @@ public class TestWALReplay {
     }
   }
 
+  /**
+   * Tests that WAL replay is done per column family: a region with two CFs is
+   * moved, a row is deleted, the region is flushed and major-compacted, the
+   * region is moved back, and the hosting server is aborted. After replay the
+   * deleted row must stay deleted (HBASE-6059).
+   * @throws Exception
+   */
+  @Test
+  public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception {
+    final byte[] tableName = Bytes
+        .toBytes("testReplayEditsAfterRegionMovedWithMultiCF");
+    byte[] family1 = Bytes.toBytes("cf1");
+    byte[] family2 = Bytes.toBytes("cf2");
+    byte[] qualifier = Bytes.toBytes("q");
+    byte[] value = Bytes.toBytes("testV");
+    byte[][] families = { family1, family2 };
+    TEST_UTIL.createTable(tableName, families);
+    HTable htable = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    Put put = new Put(Bytes.toBytes("r1"));
+    put.add(family1, qualifier, value);
+    htable.put(put);
+    ResultScanner resultScanner = htable.getScanner(new Scan());
+    int count = 0;
+    while (resultScanner.next() != null) {
+      count++;
+    }
+    resultScanner.close();
+    assertEquals(1, count);
+
+    MiniHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster();
+    List<HRegion> regions = hbaseCluster.getRegions(tableName);
+    assertEquals(1, regions.size());
+
+    // move region to another regionserver
+    HRegion regionToMove = regions.get(0);
+    int originServerNum = hbaseCluster
+        .getServerWith(regionToMove.getRegionName());
+    assertTrue("Please start more than 1 regionserver", hbaseCluster
+        .getRegionServerThreads().size() > 1);
+    int destServerNum = 0;
+    while (destServerNum == originServerNum) {
+      destServerNum++;
+    }
+    HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum);
+    HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum);
+    // move region to destination regionserver
+    moveRegionAndWait(regionToMove, destServer);
+
+    // delete the row
+    Delete del = new Delete(Bytes.toBytes("r1"));
+    htable.delete(del);
+    resultScanner = htable.getScanner(new Scan());
+    count = 0;
+    while (resultScanner.next() != null) {
+      count++;
+    }
+    resultScanner.close();
+    assertEquals(0, count);
+
+    // flush the region and run a major compaction
+    destServer.getOnlineRegion(regionToMove.getRegionName()).flushcache();
+    // request a major compaction of every store, then compact
+    for (Store store : destServer.getOnlineRegion(regionToMove.getRegionName())
+        .getStores().values()) {
+      store.triggerMajorCompaction();
+    }
+    destServer.getOnlineRegion(regionToMove.getRegionName()).compactStores();
+
+    // move region to origin regionserver
+    moveRegionAndWait(regionToMove, originServer);
+    // abort the origin regionserver
+    originServer.abort("testing");
+
+    // The delete must survive the replay: the row should still be absent.
+    Result result = htable.get(new Get(Bytes.toBytes("r1")));
+    assertTrue("Row is deleted, but we got " + result,
+        result == null || result.isEmpty());
+  }
+
+  private void moveRegionAndWait(HRegion regionToMove, HRegionServer destServer)
+      throws InterruptedException, MasterNotRunningException,
+       IOException {
+    TEST_UTIL.getHBaseAdmin().moveRegion(
+        regionToMove.getRegionName(),
+        destServer.getServerInfo().getHostnamePort());
+    while (destServer.getOnlineRegion(regionToMove.getRegionName()) == null) {
+      // Wait for the move to complete.
+      Thread.sleep(10);
+    }
+  }
+
   private void addWALEdits (final byte [] tableName, final HRegionInfo hri,
       final byte [] rowName, final byte [] family,
       final int count, EnvironmentEdge ee, final HLog wal)