You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2015/06/14 20:22:17 UTC

hbase git commit: HBASE-13877 Interrupt to flush from TableFlushProcedure causes dataloss in ITBLL

Repository: hbase
Updated Branches:
  refs/heads/master 682b8ab8a -> c6dd3f965


HBASE-13877 Interrupt to flush from TableFlushProcedure causes dataloss in ITBLL


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c6dd3f96
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c6dd3f96
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c6dd3f96

Branch: refs/heads/master
Commit: c6dd3f965ba00722a7b8418f61180e694b846610
Parents: 682b8ab
Author: Enis Soztutar <en...@apache.org>
Authored: Sun Jun 14 11:22:07 2015 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Sun Jun 14 11:22:07 2015 -0700

----------------------------------------------------------------------
 .../test/IntegrationTestBigLinkedList.java      | 14 ++++++--
 .../procedure/flush/FlushTableSubprocedure.java |  1 -
 .../RegionServerFlushTableProcedureManager.java | 22 +++++++++----
 .../hadoop/hbase/regionserver/HRegion.java      | 34 +++++++++++++++++++-
 .../hbase/regionserver/RSRpcServices.java       |  6 ++++
 .../hadoop/hbase/regionserver/Region.java       | 20 ++++++------
 .../hbase/regionserver/RegionMergeRequest.java  |  5 +++
 .../hadoop/hbase/regionserver/SplitRequest.java |  7 +++-
 .../snapshot/RegionServerSnapshotManager.java   | 18 ++++++++---
 .../hadoop/hbase/regionserver/TestHRegion.java  |  1 +
 .../hbase/regionserver/wal/TestWALReplay.java   |  2 ++
 11 files changed, 104 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c6dd3f96/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
index 5cfd944..065f984 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
@@ -219,7 +219,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 
   protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster
 
-  private static final int MISSING_ROWS_TO_LOG = 2; // YARN complains when too many counters
+  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters
 
   private static final int WIDTH_DEFAULT = 1000000;
   private static final int WRAP_DEFAULT = 25;
@@ -665,6 +665,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
        */
       public static class WALMapperSearcher extends WALMapper {
         private SortedSet<byte []> keysToFind;
+        private AtomicInteger rows = new AtomicInteger(0);
 
         @Override
         public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
@@ -686,8 +687,15 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
           boolean b = this.keysToFind.contains(row);
           if (b) {
             String keyStr = Bytes.toStringBinary(row);
-            LOG.info("Found cell=" + cell);
-            context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
+            try {
+              LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
+            } catch (IOException|InterruptedException e) {
+              LOG.warn(e);
+            }
+            if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
+              context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
+            }
+            context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
           }
           return b;
         }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6dd3f96/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
index 8d64f2a..5723919 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.procedure.ProcedureMember;
 import org.apache.hadoop.hbase.procedure.Subprocedure;
 import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool;
 import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.Region.Operation;
 
 /**
  * This flush region implementation uses the distributed procedure framework to flush

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6dd3f96/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
index 7664dee..a441a6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
@@ -33,7 +33,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
@@ -157,7 +159,7 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
         FLUSH_REQUEST_WAKE_MILLIS_DEFAULT);
 
     FlushTableSubprocedurePool taskManager =
-        new FlushTableSubprocedurePool(rss.getServerName().toString(), conf);
+        new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss);
     return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis,
       timeoutMillis, involvedRegions, table, taskManager);
   }
@@ -195,13 +197,15 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
    * failures.
    */
   static class FlushTableSubprocedurePool {
+    private final Abortable abortable;
     private final ExecutorCompletionService<Void> taskPool;
     private final ThreadPoolExecutor executor;
     private volatile boolean stopped;
     private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
     private final String name;
 
-    FlushTableSubprocedurePool(String name, Configuration conf) {
+    FlushTableSubprocedurePool(String name, Configuration conf, Abortable abortable) {
+      this.abortable = abortable;
       // configure the executor service
       long keepAlive = conf.getLong(
         RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_KEY,
@@ -259,9 +263,13 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
         }
         // we are stopped so we can just exit.
       } catch (ExecutionException e) {
-        if (e.getCause() instanceof ForeignException) {
+        Throwable cause = e.getCause();
+        if (cause instanceof ForeignException) {
           LOG.warn("Rethrowing ForeignException from FlushSubprocedurePool", e);
           throw (ForeignException)e.getCause();
+        } else if (cause instanceof DroppedSnapshotException) {
+          // we have to abort the region server according to contract of flush
+          abortable.abort("Received DroppedSnapshotException, aborting", cause);
         }
         LOG.warn("Got Exception in FlushSubprocedurePool", e);
         throw new ForeignException(name, e.getCause());
@@ -272,7 +280,8 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
     }
 
     /**
-     * This attempts to cancel out all pending and in progress tasks (interruptions issues)
+     * This attempts to cancel out all pending and in progress tasks. Does not interrupt the running
+     * tasks itself. An ongoing HRegion.flush() should not be interrupted (see HBASE-13877).
      * @throws InterruptedException
      */
     void cancelTasks() throws InterruptedException {
@@ -289,13 +298,14 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
     }
 
     /**
-     * Abruptly shutdown the thread pool.  Call when exiting a region server.
+     * Gracefully shutdown the thread pool. An ongoing HRegion.flush() should not be
+     * interrupted (see HBASE-13877)
      */
     void stop() {
       if (this.stopped) return;
 
       this.stopped = true;
-      this.executor.shutdownNow();
+      this.executor.shutdown();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6dd3f96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 262ad6a..89807b0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -475,6 +475,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     * FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
      * @return true if the memstores were flushed, else false.
      */
+    @Override
     public boolean isFlushSucceeded() {
       return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
           .FLUSHED_COMPACTION_NEEDED;
@@ -484,6 +485,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
      * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED.
      * @return True if the flush requested a compaction, else false (doesn't even mean it flushed).
      */
+    @Override
     public boolean isCompactionNeeded() {
       return result == Result.FLUSHED_COMPACTION_NEEDED;
     }
@@ -1272,6 +1274,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * vector if already closed and null if judged that it should not close.
    *
    * @throws IOException e
+   * @throws DroppedSnapshotException Thrown when replay of wal is required
+   * because a Snapshot was not properly persisted. The region is put in closing mode, and the
+   * caller MUST abort after this.
    */
   public Map<byte[], List<StoreFile>> close() throws IOException {
     return close(false);
@@ -1309,6 +1314,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * we are not to close at this time or we are already closed.
    *
    * @throws IOException e
+   * @throws DroppedSnapshotException Thrown when replay of wal is required
+   * because a Snapshot was not properly persisted. The region is put in closing mode, and the
+   * caller MUST abort after this.
    */
   public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
     // Only allow one thread to close at a time. Serialize them so dual
@@ -1327,6 +1335,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  /**
+   * Exposed for some very specific unit tests.
+   */
+  @VisibleForTesting
+  public void setClosing(boolean closing) {
+    this.closing.set(closing);
+  }
+
   private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
       throws IOException {
     if (isClosed()) {
@@ -1826,7 +1842,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    *
    * @throws IOException general io exceptions
    * @throws DroppedSnapshotException Thrown when replay of wal is required
-   * because a Snapshot was not properly persisted.
+   * because a Snapshot was not properly persisted. The region is put in closing mode, and the
+   * caller MUST abort after this.
    */
   public FlushResult flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker)
       throws IOException {
@@ -2337,6 +2354,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           Bytes.toStringBinary(getRegionInfo().getRegionName()));
       dse.initCause(t);
       status.abort("Flush failed: " + StringUtils.stringifyException(t));
+
+      // Callers for flushcache() should catch DroppedSnapshotException and abort the region server.
+      // However, since we may have the region read lock, we cannot call close(true) here since
+      // we cannot promote to a write lock. Instead we are setting closing so that all other region
+      // operations except for close will be rejected.
+      this.closing.set(true);
+
+      if (rsServices != null) {
+        // This is a safeguard against the case where the caller fails to explicitly handle aborting
+        rsServices.abort("Replay of WAL required. Forcing server shutdown", dse);
+      }
+
       throw dse;
     }
 
@@ -6407,6 +6436,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return results;
   }
 
+  @Override
   public void mutateRow(RowMutations rm) throws IOException {
     // Don't need nonces here - RowMutations only supports puts and deletes
     mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
@@ -6433,6 +6463,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * <code>rowsToLock</code> is sorted in order to avoid deadlocks.
    * @throws IOException
    */
+  @Override
   public void mutateRowsWithLocks(Collection<Mutation> mutations,
       Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
     MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
@@ -7436,6 +7467,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
 
   /** @return the coprocessor host */
+  @Override
   public RegionCoprocessorHost getCoprocessorHost() {
     return coprocessorHost;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6dd3f96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index cedaa7c..5fd285a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1329,6 +1329,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       }
       regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible);
       return MergeRegionsResponse.newBuilder().build();
+    } catch (DroppedSnapshotException ex) {
+      regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
+      throw new ServiceException(ex);
     } catch (IOException ie) {
       throw new ServiceException(ie);
     }
@@ -1741,6 +1744,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       ((HRegion)region).forceSplit(splitPoint);
       regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit());
       return SplitRegionResponse.newBuilder().build();
+    } catch (DroppedSnapshotException ex) {
+      regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
+      throw new ServiceException(ex);
     } catch (IOException ie) {
       throw new ServiceException(ie);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6dd3f96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 5c500b4..20a034e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -147,7 +147,7 @@ public interface Region extends ConfigurationObserver {
    */
   long getOldestHfileTs(boolean majorCompactioOnly) throws IOException;
 
-  /** 
+  /**
    * @return map of column family names to max sequence id that was read from storage when this
    * region was opened
    */
@@ -168,7 +168,7 @@ public interface Region extends ConfigurationObserver {
 
   ///////////////////////////////////////////////////////////////////////////
   // Metrics
-  
+
   /** @return read requests count for this region */
   long getReadRequestsCount();
 
@@ -192,7 +192,7 @@ public interface Region extends ConfigurationObserver {
 
   /** @return the number of mutations processed bypassing the WAL */
   long getNumMutationsWithoutWAL();
-  
+
   /** @return the size of data processed bypassing the WAL, in bytes */
   long getDataInMemoryWithoutWAL();
 
@@ -227,7 +227,7 @@ public interface Region extends ConfigurationObserver {
 
   /**
    * This method needs to be called before any public call that reads or
-   * modifies data. 
+   * modifies data.
    * Acquires a read lock and checks if the region is closing or closed.
    * <p>{@link #closeRegionOperation} MUST then always be called after
    * the operation has completed, whether it succeeded or failed.
@@ -237,7 +237,7 @@ public interface Region extends ConfigurationObserver {
 
   /**
    * This method needs to be called before any public call that reads or
-   * modifies data. 
+   * modifies data.
    * Acquires a read lock and checks if the region is closing or closed.
    * <p>{@link #closeRegionOperation} MUST then always be called after
    * the operation has completed, whether it succeeded or failed.
@@ -427,7 +427,7 @@ public interface Region extends ConfigurationObserver {
 
   /**
    * Perform atomic mutations within the region.
-   * 
+   *
    * @param mutations The list of mutations to perform.
    * <code>mutations</code> can contain operations for multiple rows.
    * Caller has to ensure that all rows are contained in this region.
@@ -623,13 +623,13 @@ public interface Region extends ConfigurationObserver {
       CANNOT_FLUSH_MEMSTORE_EMPTY,
       CANNOT_FLUSH
     }
-    
+
     /** @return the detailed result code */
     Result getResult();
 
     /** @return true if the memstores were flushed, else false */
     boolean isFlushSucceeded();
-    
+
     /** @return True if the flush requested a compaction, else false */
     boolean isCompactionNeeded();
   }
@@ -653,6 +653,8 @@ public interface Region extends ConfigurationObserver {
    *
    * @throws IOException general io exceptions
    * because a snapshot was not properly persisted.
+   * @throws DroppedSnapshotException Thrown when abort is required. The caller MUST catch this
+   * exception and MUST abort. Any further operation to the region may cause data loss.
    */
   FlushResult flush(boolean force) throws IOException;
 
@@ -660,7 +662,7 @@ public interface Region extends ConfigurationObserver {
    * Synchronously compact all stores in the region.
    * <p>This operation could block for a long time, so don't call it from a
    * time-sensitive thread.
-   * <p>Note that no locks are taken to prevent possible conflicts between 
+   * <p>Note that no locks are taken to prevent possible conflicts between
    * compaction and splitting activities. The regionserver does not normally compact
    * and split in parallel. However by calling this method you may introduce
    * unexpected and unhandled concurrency. Don't do this unless you know what

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6dd3f96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
index bc2867e..4292751 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.ipc.RemoteException;
@@ -93,6 +94,10 @@ class RegionMergeRequest implements Runnable {
                   + (this.server.isStopping() ? " stopping" : " stopped"), e);
           return;
         }
+        if (e instanceof DroppedSnapshotException) {
+          server.abort("Replay of WAL required. Forcing server shutdown", e);
+          return;
+        }
         try {
           LOG.warn("Running rollback/cleanup of failed merge of "
                   + region_a +" and "+ region_b + "; " + e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6dd3f96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
index ea47e2d..c40f9b0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -91,6 +92,10 @@ class SplitRequest implements Runnable {
                   + (this.server.isStopping() ? " stopping" : " stopped"), e);
           return;
         }
+        if (e instanceof DroppedSnapshotException) {
+          server.abort("Replay of WAL required. Forcing server shutdown", e);
+          return;
+        }
         try {
           LOG.info("Running rollback/cleanup of failed split of " +
             parent.getRegionInfo().getRegionNameAsString() + "; " + e.getMessage(), e);
@@ -148,7 +153,7 @@ class SplitRequest implements Runnable {
       try {
         this.tableLock.release();
       } catch (IOException ex) {
-        LOG.error("Could not release the table lock (something is really wrong). " 
+        LOG.error("Could not release the table lock (something is really wrong). "
            + "Aborting this server to avoid holding the lock forever.");
         this.server.abort("Abort; we got an error when releasing the table lock "
                          + "on " + parent.getRegionInfo().getRegionNameAsString());

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6dd3f96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
index 021c16f..f04feb1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
@@ -35,7 +35,9 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@@ -184,7 +186,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
     switch (snapshot.getType()) {
     case FLUSH:
       SnapshotSubprocedurePool taskManager =
-        new SnapshotSubprocedurePool(rss.getServerName().toString(), conf);
+        new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss);
       return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
           timeoutMillis, involvedRegions, snapshot, taskManager);
     case SKIPFLUSH:
@@ -196,7 +198,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
         * To minimize the code change, class name is not changed.
          */
         SnapshotSubprocedurePool taskManager2 =
-            new SnapshotSubprocedurePool(rss.getServerName().toString(), conf);
+            new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss);
         return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
             timeoutMillis, involvedRegions, snapshot, taskManager2);
 
@@ -265,13 +267,15 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
    * operations such as compactions and replication  sinks.
    */
   static class SnapshotSubprocedurePool {
+    private final Abortable abortable;
     private final ExecutorCompletionService<Void> taskPool;
     private final ThreadPoolExecutor executor;
     private volatile boolean stopped;
     private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
     private final String name;
 
-    SnapshotSubprocedurePool(String name, Configuration conf) {
+    SnapshotSubprocedurePool(String name, Configuration conf, Abortable abortable) {
+      this.abortable = abortable;
       // configure the executor service
       long keepAlive = conf.getLong(
         RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_KEY,
@@ -331,9 +335,13 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
         }
         // we are stopped so we can just exit.
       } catch (ExecutionException e) {
-        if (e.getCause() instanceof ForeignException) {
+        Throwable cause = e.getCause();
+        if (cause instanceof ForeignException) {
           LOG.warn("Rethrowing ForeignException from SnapshotSubprocedurePool", e);
           throw (ForeignException)e.getCause();
+        } else if (cause instanceof DroppedSnapshotException) {
+          // we have to abort the region server according to contract of flush
+          abortable.abort("Received DroppedSnapshotException, aborting", cause);
         }
         LOG.warn("Got Exception in SnapshotSubprocedurePool", e);
         throw new ForeignException(name, e.getCause());
@@ -371,7 +379,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
       if (this.stopped) return;
 
       this.stopped = true;
-      this.executor.shutdownNow();
+      this.executor.shutdown();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6dd3f96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 8ee2212..6d1859c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -412,6 +412,7 @@ public class TestHRegion {
             Assert.fail("Didn't bubble up IOE!");
           } catch (DroppedSnapshotException dse) {
             // What we are expecting
+            region.closing.set(false); // this is needed for the rest of the test to work
           }
           // Make it so all writes succeed from here on out
           ffs.fault.set(false);

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6dd3f96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index 64e81fa..c943d12 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -714,6 +714,8 @@ public class TestWALReplay {
           + t.getMessage());
       // simulated to abort server
       Mockito.doReturn(true).when(rsServices).isAborted();
+      region.setClosing(false); // region normally does not accept writes after
+      // DroppedSnapshotException. We mock around it for this test.
     }
     // writing more data
     int moreRow = 10;