You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@hbase.apache.org by sy...@apache.org on 2017/03/10 22:10:13 UTC

[33/50] [abbrv] hbase git commit: HBASE-17712 Remove/Simplify the logic of RegionScannerImpl.handleFileNotFound

HBASE-17712 Remove/Simplify the logic of RegionScannerImpl.handleFileNotFound


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/58c76192
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/58c76192
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/58c76192

Branch: refs/heads/hbase-12439
Commit: 58c76192bdbf1f4863c1c87d165c2e3b9674d4ad
Parents: c42a066
Author: zhangduo <zh...@apache.org>
Authored: Mon Mar 6 21:00:50 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Mar 8 14:39:29 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      | 185 +++++++------------
 .../hbase/regionserver/HRegionServer.java       |   5 +
 .../regionserver/RegionServerServices.java      |  10 +
 .../hbase/regionserver/RegionUnassigner.java    |  68 +++++++
 .../hbase/regionserver/wal/AsyncFSWAL.java      |  12 +-
 .../hadoop/hbase/MockRegionServerServices.java  |   4 +
 .../hadoop/hbase/master/MockRegionServer.java   |   4 +
 .../TestCompactionInDeadRegionServer.java       | 164 ++++++++++++++++
 .../TestCorruptedRegionStoreFile.java           |  28 ++-
 9 files changed, 357 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/58c76192/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index be01220..29773c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -202,6 +202,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public static final String HREGION_MVCC_PRE_ASSIGN = "hbase.hregion.mvcc.preassign";
   public static final boolean DEFAULT_HREGION_MVCC_PRE_ASSIGN = true;
 
+  public static final String HREGION_UNASSIGN_FOR_FNFE = "hbase.hregion.unassign.for.fnfe";
+  public static final boolean DEFAULT_HREGION_UNASSIGN_FOR_FNFE = true;
+
   /**
    * This is the global default value for durability. All tables/mutations not
    * defining a durability or using USE_DEFAULT will default to this value.
@@ -238,9 +241,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   protected volatile long lastReplayedOpenRegionSeqId = -1L;
   protected volatile long lastReplayedCompactionSeqId = -1L;
 
-  // collects Map(s) of Store to sequence Id when handleFileNotFound() is involved
-  protected List<Map> storeSeqIds = new ArrayList<>();
-
   //////////////////////////////////////////////////////////////////////////////
   // Members
   //////////////////////////////////////////////////////////////////////////////
@@ -642,6 +642,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private final NavigableMap<byte[], Integer> replicationScope = new TreeMap<>(
       Bytes.BYTES_COMPARATOR);
 
+  // whether to unassign region if we hit FNFE
+  private final RegionUnassigner regionUnassigner;
   /**
    * HRegion constructor. This constructor should only be used for testing and
    * extensions.  Instances of HRegion should be instantiated with the
@@ -800,6 +802,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           false :
           conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
               HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
+    boolean unassignForFNFE =
+        conf.getBoolean(HREGION_UNASSIGN_FOR_FNFE, DEFAULT_HREGION_UNASSIGN_FOR_FNFE);
+    if (unassignForFNFE) {
+      this.regionUnassigner = new RegionUnassigner(rsServices, fs.getRegionInfo());
+    } else {
+      this.regionUnassigner = null;
+    }
   }
 
   void setHTableSpecificConf() {
@@ -1536,7 +1545,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       status.setStatus("Running coprocessor pre-close hooks");
       this.coprocessorHost.preClose(abort);
     }
-
     status.setStatus("Disabling compacts and flushes for region");
     boolean canFlush = true;
     synchronized (writestate) {
@@ -1697,7 +1705,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
       // stop the Compacted hfile discharger
       if (this.compactedFileDischarger != null) this.compactedFileDischarger.cancel(true);
-
       status.markComplete("Closed");
       LOG.info("Closed " + this);
       return result;
@@ -5070,15 +5077,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
+      justification = "Notify is about post replay. Intentional")
   @Override
   public boolean refreshStoreFiles() throws IOException {
-    return refreshStoreFiles(false);
-  }
-
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
-      justification="Notify is about post replay. Intentional")
-  protected boolean refreshStoreFiles(boolean force) throws IOException {
-    if (!force && ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
+    if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
       return false; // if primary nothing to do
     }
 
@@ -5110,7 +5113,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
           // see whether we can drop the memstore or the snapshot
           if (storeSeqId > maxSeqIdBefore) {
-
             if (writestate.flushing) {
               // only drop memstore snapshots if they are smaller than last flush for the store
               if (this.prepareFlushResult.flushOpSeqId <= storeSeqId) {
@@ -5150,17 +5152,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         }
       }
       if (!map.isEmpty()) {
-        if (!force) {
-          for (Map.Entry<Store, Long> entry : map.entrySet()) {
-            // Drop the memstore contents if they are now smaller than the latest seen flushed file
-            totalFreedDataSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey())
-                .getDataSize();
-          }
-        } else {
-          synchronized (storeSeqIds) {
-            // don't try to acquire write lock of updatesLock now
-            storeSeqIds.add(map);
-          }
+        for (Map.Entry<Store, Long> entry : map.entrySet()) {
+          // Drop the memstore contents if they are now smaller than the latest seen flushed file
+          totalFreedDataSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey())
+              .getDataSize();
         }
       }
       // C. Finally notify anyone waiting on memstore to clear:
@@ -5844,12 +5839,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       try {
         for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
           Store store = stores.get(entry.getKey());
-          KeyValueScanner scanner;
-          try {
-            scanner = store.getScanner(scan, entry.getValue(), this.readPt);
-          } catch (FileNotFoundException e) {
-            throw handleFileNotFound(e);
-          }
+          KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
           instantiatedScanners.add(scanner);
           if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
               || this.filter.isFamilyEssential(entry.getKey())) {
@@ -5873,8 +5863,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
     }
 
+    private void handleFileNotFound(Throwable fnfe) {
+      // Lost store file(s): ask for the region to be unassigned (when enabled) so it gets
+      // reopened with a fresh store file list. See HBASE-17712 for more details.
+      LOG.warn("A store file got lost", fnfe);
+      if (regionUnassigner != null) {
+        regionUnassigner.unassign();
+      }
+    }
+
     private IOException handleException(List<KeyValueScanner> instantiatedScanners,
         Throwable t) {
+      if (t instanceof FileNotFoundException) {
+        handleFileNotFound(t);
+      }
       // remove scaner read point before throw the exception
       scannerReadPoints.remove(this);
       if (storeHeap != null) {
@@ -5957,14 +5959,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         throw new UnknownScannerException("Scanner was closed");
       }
       boolean moreValues = false;
-      if (outResults.isEmpty()) {
-        // Usually outResults is empty. This is true when next is called
-        // to handle scan or get operation.
-        moreValues = nextInternal(outResults, scannerContext);
-      } else {
-        List<Cell> tmpList = new ArrayList<>();
-        moreValues = nextInternal(tmpList, scannerContext);
-        outResults.addAll(tmpList);
+      try {
+        if (outResults.isEmpty()) {
+          // Usually outResults is empty. This is true when next is called
+          // to handle scan or get operation.
+          moreValues = nextInternal(outResults, scannerContext);
+        } else {
+          List<Cell> tmpList = new ArrayList<Cell>();
+          moreValues = nextInternal(tmpList, scannerContext);
+          outResults.addAll(tmpList);
+        }
+      } catch (FileNotFoundException e) {
+        handleFileNotFound(e);
+        throw e;
       }
 
       // If the size limit was reached it means a partial Result is being returned. Returning a
@@ -6014,33 +6021,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       boolean tmpKeepProgress = scannerContext.getKeepProgress();
       // Scanning between column families and thus the scope is between cells
       LimitScope limitScope = LimitScope.BETWEEN_CELLS;
-      try {
-        do {
-          // We want to maintain any progress that is made towards the limits while scanning across
-          // different column families. To do this, we toggle the keep progress flag on during calls
-          // to the StoreScanner to ensure that any progress made thus far is not wiped away.
-          scannerContext.setKeepProgress(true);
-          heap.next(results, scannerContext);
-          scannerContext.setKeepProgress(tmpKeepProgress);
-
-          nextKv = heap.peek();
-          moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
-          if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);
-          if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
-            return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
-          } else if (scannerContext.checkSizeLimit(limitScope)) {
-            ScannerContext.NextState state =
-              moreCellsInRow? NextState.SIZE_LIMIT_REACHED_MID_ROW: NextState.SIZE_LIMIT_REACHED;
-            return scannerContext.setScannerState(state).hasMoreValues();
-          } else if (scannerContext.checkTimeLimit(limitScope)) {
-            ScannerContext.NextState state =
-              moreCellsInRow? NextState.TIME_LIMIT_REACHED_MID_ROW: NextState.TIME_LIMIT_REACHED;
-            return scannerContext.setScannerState(state).hasMoreValues();
-          }
-        } while (moreCellsInRow);
-      } catch (FileNotFoundException e) {
-        throw handleFileNotFound(e);
-      }
+      do {
+        // We want to maintain any progress that is made towards the limits while scanning across
+        // different column families. To do this, we toggle the keep progress flag on during calls
+        // to the StoreScanner to ensure that any progress made thus far is not wiped away.
+        scannerContext.setKeepProgress(true);
+        heap.next(results, scannerContext);
+        scannerContext.setKeepProgress(tmpKeepProgress);
+
+        nextKv = heap.peek();
+        moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
+        if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);
+        if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
+          return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
+        } else if (scannerContext.checkSizeLimit(limitScope)) {
+          ScannerContext.NextState state =
+              moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
+          return scannerContext.setScannerState(state).hasMoreValues();
+        } else if (scannerContext.checkTimeLimit(limitScope)) {
+          ScannerContext.NextState state =
+              moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
+          return scannerContext.setScannerState(state).hasMoreValues();
+        }
+      } while (moreCellsInRow);
       return nextKv != null;
     }
 
@@ -6389,35 +6392,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           result = this.joinedHeap.requestSeek(kv, true, true) || result;
         }
       } catch (FileNotFoundException e) {
-        throw handleFileNotFound(e);
+        handleFileNotFound(e);
+        throw e;
       } finally {
         closeRegionOperation();
       }
       return result;
     }
 
-    private IOException handleFileNotFound(FileNotFoundException fnfe) throws IOException {
-      // tries to refresh the store files, otherwise shutdown the RS.
-      // TODO: add support for abort() of a single region and trigger reassignment.
-      try {
-        region.refreshStoreFiles(true);
-        return new IOException("unable to read store file");
-      } catch (IOException e) {
-        String msg = "a store file got lost: " + fnfe.getMessage();
-        LOG.error("unable to refresh store files", e);
-        abortRegionServer(msg);
-        return new NotServingRegionException(
-          getRegionInfo().getRegionNameAsString() + " is closing");
-      }
-    }
-
-    private void abortRegionServer(String msg) throws IOException {
-      if (rsServices instanceof HRegionServer) {
-        ((HRegionServer)rsServices).abort(msg);
-      }
-      throw new UnsupportedOperationException("not able to abort RS after: " + msg);
-    }
-
     @Override
     public void shipped() throws IOException {
       if (storeHeap != null) {
@@ -7220,29 +7202,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
   }
 
-  // dropMemstoreContentsForSeqId() would acquire write lock of updatesLock
-  // We perform this operation outside of the read lock of updatesLock to avoid dead lock
-  // See HBASE-16304
-  @SuppressWarnings("unchecked")
-  private void dropMemstoreContents() throws IOException {
-    MemstoreSize totalFreedSize = new MemstoreSize();
-    while (!storeSeqIds.isEmpty()) {
-      Map<Store, Long> map = null;
-      synchronized (storeSeqIds) {
-        if (storeSeqIds.isEmpty()) break;
-        map = storeSeqIds.remove(storeSeqIds.size()-1);
-      }
-      for (Map.Entry<Store, Long> entry : map.entrySet()) {
-        // Drop the memstore contents if they are now smaller than the latest seen flushed file
-        totalFreedSize
-            .incMemstoreSize(dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey()));
-      }
-    }
-    if (totalFreedSize.getDataSize() > 0) {
-      LOG.debug("Freed " + totalFreedSize.getDataSize() + " bytes from memstore");
-    }
-  }
-
   @Override
   public Result increment(Increment mutation, long nonceGroup, long nonce)
   throws IOException {
@@ -7306,10 +7265,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         writeEntry = null;
       } finally {
         this.updatesLock.readLock().unlock();
-        // For increment/append, a region scanner for doing a get operation could throw
-        // FileNotFoundException. So we call dropMemstoreContents() in finally block
-        // after releasing read lock
-        dropMemstoreContents();
       }
       // If results is null, then client asked that we not return the calculated results.
       return results != null && returnResults? Result.create(results): Result.EMPTY_RESULT;

http://git-wip-us.apache.org/repos/asf/hbase/blob/58c76192/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index cbf6561..be4cca0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -3651,4 +3651,12 @@ public class HRegionServer extends HasThread implements
     return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator())
       .regionLock(regionInfos, description, abort);
   }
+
+  @Override
+  public void unassign(byte[] regionName) throws IOException {
+    // Admin is Closeable: close it so each call does not leak an Admin instance.
+    try (org.apache.hadoop.hbase.client.Admin admin = clusterConnection.getAdmin()) {
+      admin.unassign(regionName, false);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58c76192/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index c92124c..3382263 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -273,4 +273,14 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
    */
   EntityLock regionLock(List<HRegionInfo> regionInfos, String description,
       Abortable abort) throws IOException;
+
+  /**
+   * Unassigns the given region from this regionserver so that it gets assigned again, to a random
+   * server that may still be this one. Use it when a region's state must be reset, e.g. to reload
+   * the store file list after hitting a {@link java.io.FileNotFoundException}.
+   * <p>
+   * See HBASE-17712 for more details.
+   * @param regionName name of the region to unassign
+   */
+  void unassign(byte[] regionName) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58c76192/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java
new file mode 100644
index 0000000..b347b4b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Asks for a region to be unassigned from a background thread, e.g. after a read hit a
+ * FileNotFoundException because a store file was lost. See HBASE-17712.
+ * <p>
+ * At most one request per instance is outstanding at a time: calls made while one is running
+ * are ignored, and once it finishes (successfully or not) a later call may issue a new one.
+ */
+@InterfaceAudience.Private
+class RegionUnassigner {
+
+  private static final Log LOG = LogFactory.getLog(RegionUnassigner.class);
+
+  private final RegionServerServices rsServices;
+
+  private final HRegionInfo regionInfo;
+
+  // Guarded by "this": true while a background unassign thread is outstanding.
+  private boolean unassigning = false;
+
+  RegionUnassigner(RegionServerServices rsServices, HRegionInfo regionInfo) {
+    this.rsServices = rsServices;
+    this.regionInfo = regionInfo;
+  }
+
+  /**
+   * Starts a thread that calls {@link RegionServerServices#unassign(byte[])} for this region,
+   * unless one is already running. Failures are logged, not propagated to the caller.
+   */
+  synchronized void unassign() {
+    if (unassigning) {
+      return;
+    }
+    if (rsServices == null) {
+      // Without region server services there is nobody to ask; avoid an NPE in the thread.
+      LOG.warn("No RegionServerServices, can not unassign " + regionInfo.getRegionNameAsString());
+      return;
+    }
+    Thread unassignThread = new Thread("Unassign-" + regionInfo) {
+
+      @Override
+      public void run() {
+        LOG.info("Unassign " + regionInfo.getRegionNameAsString());
+        try {
+          rsServices.unassign(regionInfo.getRegionName());
+        } catch (IOException e) {
+          LOG.warn("Failed to unassign " + regionInfo.getRegionNameAsString(), e);
+        } finally {
+          synchronized (RegionUnassigner.this) {
+            unassigning = false;
+          }
+        }
+      }
+    };
+    unassignThread.start();
+    // Mark as in flight only once start() succeeded, so a failure to create the thread does not
+    // leave us stuck. Setting it after start() is safe: the thread can only clear the flag after
+    // acquiring this monitor, which we still hold.
+    unassigning = true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/58c76192/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index c3e96cf..69ca1c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -615,7 +615,15 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
             break;
           }
         } else {
-          throw e.unwrapRemoteException();
+          IOException ioe = e.unwrapRemoteException();
+          // this usually means master already think we are dead so let's fail all the pending
+          // syncs. The shutdown process of RS will wait for all regions to be closed before calling
+          // WAL.close so if we do not wake up the thread blocked by sync here it will cause dead
+          // lock.
+          if (e.getMessage().contains("Parent directory doesn't exist:")) {
+            syncFutures.forEach(f -> f.done(f.getTxid(), ioe));
+          }
+          throw ioe;
         }
       } catch (NameNodeException e) {
         throw e;
@@ -696,6 +704,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     this.writer.close();
     this.writer = null;
     closeExecutor.shutdown();
+    IOException error = new IOException("WAL has been closed");
+    syncFutures.forEach(f -> f.done(f.getTxid(), error));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/58c76192/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 7740e66..81b3489 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -356,4 +356,8 @@ public class MockRegionServerServices implements RegionServerServices {
   public SecureBulkLoadManager getSecureBulkLoadManager() {
     return null;
   }
+
+  @Override
+  public void unassign(byte[] regionName) throws IOException {
+  } // Intentionally a no-op: this mock never unassigns regions.
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58c76192/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index a5fe952..b653e3f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -716,4 +716,8 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
   public SecureBulkLoadManager getSecureBulkLoadManager() {
     return null;
   }
+
+  @Override
+  public void unassign(byte[] regionName) throws IOException {
+  } // Intentionally a no-op: this mock never unassigns regions.
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58c76192/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java
new file mode 100644
index 0000000..ae81a4b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.YouAreDeadException;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * This testcase is used to ensure that the compaction marker will fail a compaction if the RS is
+ * already dead. It can not eliminate FNFE when scanning but it does reduce the possibility a lot.
+ */
+@RunWith(Parameterized.class)
+@Category({ RegionServerTests.class, LargeTests.class })
+public class TestCompactionInDeadRegionServer {
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("test");
+
+  private static final byte[] CF = Bytes.toBytes("cf");
+
+  private static final byte[] CQ = Bytes.toBytes("cq");
+
+  // A region server that keeps running even after the master has declared it dead.
+  public static final class IgnoreYouAreDeadRS extends HRegionServer {
+
+    public IgnoreYouAreDeadRS(Configuration conf) throws IOException, InterruptedException {
+      super(conf);
+    }
+
+    public IgnoreYouAreDeadRS(Configuration conf, CoordinatedStateManager csm) throws IOException {
+      super(conf, csm);
+    }
+
+    @Override
+    protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
+        throws IOException {
+      try {
+        super.tryRegionServerReport(reportStartTime, reportEndTime);
+      } catch (YouAreDeadException e) {
+        // ignore, do not abort
+      }
+    }
+  }
+
+  @Parameter
+  public Class<? extends WALProvider> walProvider;
+
+  @Parameters(name = "{index}: wal={0}")
+  public static List<Object[]> params() {
+    return Arrays.asList(new Object[] { FSHLogProvider.class },
+      new Object[] { AsyncFSWALProvider.class });
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, walProvider, WALProvider.class);
+    UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, 2000);
+    UTIL.getConfiguration().setClass(HConstants.REGION_SERVER_IMPL, IgnoreYouAreDeadRS.class,
+      HRegionServer.class);
+    UTIL.startMiniCluster(2);
+    // Flush twice so the region has two store files for test() to compact.
+    try (Table table = UTIL.createTable(TABLE_NAME, CF)) {
+      for (int i = 0; i < 10; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+      UTIL.getAdmin().flush(TABLE_NAME);
+      for (int i = 10; i < 20; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+      UTIL.getAdmin().flush(TABLE_NAME);
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Deletes the region server's znode so the master moves its region to the other server, then
+   * checks that compacting the now-stale region fails and that all rows remain readable.
+   */
+  @Test
+  public void test() throws Exception {
+    HRegionServer rsToSuspend = UTIL.getRSForFirstRegionInTable(TABLE_NAME);
+    HRegion region = (HRegion) rsToSuspend.getOnlineRegions(TABLE_NAME).get(0);
+    ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
+    watcher.getRecoverableZooKeeper().delete(
+      ZKUtil.joinZNode(watcher.getZNodePaths().rsZNode, rsToSuspend.getServerName().toString()),
+      -1);
+    UTIL.waitFor(60000, 1000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        for (RegionServerThread thread : UTIL.getHBaseCluster().getRegionServerThreads()) {
+          HRegionServer rs = thread.getRegionServer();
+          if (rs != rsToSuspend) {
+            return !rs.getOnlineRegions(TABLE_NAME).isEmpty();
+          }
+        }
+        return false;
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "The region for " + TABLE_NAME + " is still on " + rsToSuspend.getServerName();
+      }
+    });
+    try {
+      region.compact(true);
+      fail("Should fail as our wal file has already been closed, " +
+          "and walDir has also been renamed");
+    } catch (Exception e) {
+      // expected
+    }
+    try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
+      // should not hit FNFE
+      for (int i = 0; i < 20; i++) {
+        assertEquals(i, Bytes.toInt(table.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/58c76192/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
index cec5fc7..4264863 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
@@ -18,12 +18,14 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -133,6 +135,23 @@ public class TestCorruptedRegionStoreFile {
     }
   }
 
+  private void removeStoreFile(FileSystem fs, Path tmpStoreFilePath) throws Exception {
+    try (FSDataInputStream input = fs.open(storeFiles.get(0))) {
+      fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+      LOG.info("Move file to local");
+      evictHFileCache(storeFiles.get(0));
+      // Poll the already-open stream until it fails with FNFE: all DN replicas are then gone.
+      for (;;) {
+        try {
+          input.read(0, new byte[1], 0, 1);
+        } catch (FileNotFoundException e) {
+          break;
+        }
+        Thread.sleep(1000);
+      }
+    }
+  }
+
   @Test(timeout=180000)
   public void testLosingFileDuringScan() throws Exception {
     assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));
@@ -148,9 +167,7 @@ public class TestCorruptedRegionStoreFile {
       public void beforeScanNext(Table table) throws Exception {
         // move the path away (now the region is corrupted)
         if (hasFile) {
-          fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
-          LOG.info("Move file to local");
-          evictHFileCache(storeFiles.get(0));
+          removeStoreFile(fs, tmpStoreFilePath);
           hasFile = false;
         }
       }
@@ -174,9 +191,7 @@ public class TestCorruptedRegionStoreFile {
       public void beforeScan(Table table, Scan scan) throws Exception {
         // move the path away (now the region is corrupted)
         if (hasFile) {
-          fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
-          LOG.info("Move file to local");
-          evictHFileCache(storeFiles.get(0));
+          removeStoreFile(fs, tmpStoreFilePath);
           hasFile = false;
         }
       }
@@ -201,7 +216,6 @@ public class TestCorruptedRegionStoreFile {
       HRegionServer rs = rst.getRegionServer();
       rs.getCacheConfig().getBlockCache().evictBlocksByHfileName(hfile.getName());
     }
-    Thread.sleep(6000);
   }
 
   private int fullScanAndCount(final TableName tableName) throws Exception {