You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this text copy.)
Posted to commits@hbase.apache.org by en...@apache.org on 2015/06/14 20:22:17 UTC
hbase git commit: HBASE-13877 Interrupt to flush from TableFlushProcedure causes dataloss in ITBLL
Repository: hbase
Updated Branches:
refs/heads/master 682b8ab8a -> c6dd3f965
HBASE-13877 Interrupt to flush from TableFlushProcedure causes dataloss in ITBLL
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c6dd3f96
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c6dd3f96
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c6dd3f96
Branch: refs/heads/master
Commit: c6dd3f965ba00722a7b8418f61180e694b846610
Parents: 682b8ab
Author: Enis Soztutar <en...@apache.org>
Authored: Sun Jun 14 11:22:07 2015 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Sun Jun 14 11:22:07 2015 -0700
----------------------------------------------------------------------
.../test/IntegrationTestBigLinkedList.java | 14 ++++++--
.../procedure/flush/FlushTableSubprocedure.java | 1 -
.../RegionServerFlushTableProcedureManager.java | 22 +++++++++----
.../hadoop/hbase/regionserver/HRegion.java | 34 +++++++++++++++++++-
.../hbase/regionserver/RSRpcServices.java | 6 ++++
.../hadoop/hbase/regionserver/Region.java | 20 ++++++------
.../hbase/regionserver/RegionMergeRequest.java | 5 +++
.../hadoop/hbase/regionserver/SplitRequest.java | 7 +++-
.../snapshot/RegionServerSnapshotManager.java | 18 ++++++++---
.../hadoop/hbase/regionserver/TestHRegion.java | 1 +
.../hbase/regionserver/wal/TestWALReplay.java | 2 ++
11 files changed, 104 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6dd3f96/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
index 5cfd944..065f984 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
@@ -219,7 +219,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster
- private static final int MISSING_ROWS_TO_LOG = 2; // YARN complains when too many counters
+ private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters
private static final int WIDTH_DEFAULT = 1000000;
private static final int WRAP_DEFAULT = 25;
@@ -665,6 +665,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
*/
public static class WALMapperSearcher extends WALMapper {
private SortedSet<byte []> keysToFind;
+ private AtomicInteger rows = new AtomicInteger(0);
@Override
public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
@@ -686,8 +687,15 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
boolean b = this.keysToFind.contains(row);
if (b) {
String keyStr = Bytes.toStringBinary(row);
- LOG.info("Found cell=" + cell);
- context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
+ try {
+ LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
+ } catch (IOException|InterruptedException e) {
+ LOG.warn(e);
+ }
+ if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
+ context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
+ }
+ context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
}
return b;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6dd3f96/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
index 8d64f2a..5723919 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.procedure.ProcedureMember;
import org.apache.hadoop.hbase.procedure.Subprocedure;
import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool;
import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.Region.Operation;
/**
* This flush region implementation uses the distributed procedure framework to flush
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6dd3f96/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
index 7664dee..a441a6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
@@ -33,7 +33,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
@@ -157,7 +159,7 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
FLUSH_REQUEST_WAKE_MILLIS_DEFAULT);
FlushTableSubprocedurePool taskManager =
- new FlushTableSubprocedurePool(rss.getServerName().toString(), conf);
+ new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss);
return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis,
timeoutMillis, involvedRegions, table, taskManager);
}
@@ -195,13 +197,15 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
* failures.
*/
static class FlushTableSubprocedurePool {
+ private final Abortable abortable;
private final ExecutorCompletionService<Void> taskPool;
private final ThreadPoolExecutor executor;
private volatile boolean stopped;
private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
private final String name;
- FlushTableSubprocedurePool(String name, Configuration conf) {
+ FlushTableSubprocedurePool(String name, Configuration conf, Abortable abortable) {
+ this.abortable = abortable;
// configure the executor service
long keepAlive = conf.getLong(
RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_KEY,
@@ -259,9 +263,13 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
}
// we are stopped so we can just exit.
} catch (ExecutionException e) {
- if (e.getCause() instanceof ForeignException) {
+ Throwable cause = e.getCause();
+ if (cause instanceof ForeignException) {
LOG.warn("Rethrowing ForeignException from FlushSubprocedurePool", e);
throw (ForeignException)e.getCause();
+ } else if (cause instanceof DroppedSnapshotException) {
+ // we have to abort the region server according to contract of flush
+ abortable.abort("Received DroppedSnapshotException, aborting", cause);
}
LOG.warn("Got Exception in FlushSubprocedurePool", e);
throw new ForeignException(name, e.getCause());
@@ -272,7 +280,8 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
}
/**
- * This attempts to cancel out all pending and in progress tasks (interruptions issues)
+ * This attempts to cancel out all pending and in progress tasks. Does not interrupt the running
+ * tasks itself. An ongoing HRegion.flush() should not be interrupted (see HBASE-13877).
* @throws InterruptedException
*/
void cancelTasks() throws InterruptedException {
@@ -289,13 +298,14 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
}
/**
- * Abruptly shutdown the thread pool. Call when exiting a region server.
+ * Gracefully shutdown the thread pool. An ongoing HRegion.flush() should not be
+ * interrupted (see HBASE-13877)
*/
void stop() {
if (this.stopped) return;
this.stopped = true;
- this.executor.shutdownNow();
+ this.executor.shutdown();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6dd3f96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 262ad6a..89807b0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -475,6 +475,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_NO_COMPACTION_NEEDED.
* @return true if the memstores were flushed, else false.
*/
+ @Override
public boolean isFlushSucceeded() {
return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
.FLUSHED_COMPACTION_NEEDED;
@@ -484,6 +485,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED.
* @return True if the flush requested a compaction, else false (doesn't even mean it flushed).
*/
+ @Override
public boolean isCompactionNeeded() {
return result == Result.FLUSHED_COMPACTION_NEEDED;
}
@@ -1272,6 +1274,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* vector if already closed and null if judged that it should not close.
*
* @throws IOException e
+ * @throws DroppedSnapshotException Thrown when replay of wal is required
+ * because a Snapshot was not properly persisted. The region is put in closing mode, and the
+ * caller MUST abort after this.
*/
public Map<byte[], List<StoreFile>> close() throws IOException {
return close(false);
@@ -1309,6 +1314,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* we are not to close at this time or we are already closed.
*
* @throws IOException e
+ * @throws DroppedSnapshotException Thrown when replay of wal is required
+ * because a Snapshot was not properly persisted. The region is put in closing mode, and the
+ * caller MUST abort after this.
*/
public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
// Only allow one thread to close at a time. Serialize them so dual
@@ -1327,6 +1335,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
+ /**
+ * Exposed for some very specific unit tests.
+ */
+ @VisibleForTesting
+ public void setClosing(boolean closing) {
+ this.closing.set(closing);
+ }
+
private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
throws IOException {
if (isClosed()) {
@@ -1826,7 +1842,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*
* @throws IOException general io exceptions
* @throws DroppedSnapshotException Thrown when replay of wal is required
- * because a Snapshot was not properly persisted.
+ * because a Snapshot was not properly persisted. The region is put in closing mode, and the
+ * caller MUST abort after this.
*/
public FlushResult flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker)
throws IOException {
@@ -2337,6 +2354,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Bytes.toStringBinary(getRegionInfo().getRegionName()));
dse.initCause(t);
status.abort("Flush failed: " + StringUtils.stringifyException(t));
+
+ // Callers for flushcache() should catch DroppedSnapshotException and abort the region server.
+ // However, since we may have the region read lock, we cannot call close(true) here since
+ // we cannot promote to a write lock. Instead we are setting closing so that all other region
+ // operations except for close will be rejected.
+ this.closing.set(true);
+
+ if (rsServices != null) {
+ // This is a safeguard against the case where the caller fails to explicitly handle aborting
+ rsServices.abort("Replay of WAL required. Forcing server shutdown", dse);
+ }
+
throw dse;
}
@@ -6407,6 +6436,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return results;
}
+ @Override
public void mutateRow(RowMutations rm) throws IOException {
// Don't need nonces here - RowMutations only supports puts and deletes
mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
@@ -6433,6 +6463,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* <code>rowsToLock</code> is sorted in order to avoid deadlocks.
* @throws IOException
*/
+ @Override
public void mutateRowsWithLocks(Collection<Mutation> mutations,
Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
@@ -7436,6 +7467,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/** @return the coprocessor host */
+ @Override
public RegionCoprocessorHost getCoprocessorHost() {
return coprocessorHost;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6dd3f96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index cedaa7c..5fd285a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1329,6 +1329,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible);
return MergeRegionsResponse.newBuilder().build();
+ } catch (DroppedSnapshotException ex) {
+ regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
+ throw new ServiceException(ex);
} catch (IOException ie) {
throw new ServiceException(ie);
}
@@ -1741,6 +1744,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
((HRegion)region).forceSplit(splitPoint);
regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit());
return SplitRegionResponse.newBuilder().build();
+ } catch (DroppedSnapshotException ex) {
+ regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
+ throw new ServiceException(ex);
} catch (IOException ie) {
throw new ServiceException(ie);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6dd3f96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 5c500b4..20a034e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -147,7 +147,7 @@ public interface Region extends ConfigurationObserver {
*/
long getOldestHfileTs(boolean majorCompactioOnly) throws IOException;
- /**
+ /**
* @return map of column family names to max sequence id that was read from storage when this
* region was opened
*/
@@ -168,7 +168,7 @@ public interface Region extends ConfigurationObserver {
///////////////////////////////////////////////////////////////////////////
// Metrics
-
+
/** @return read requests count for this region */
long getReadRequestsCount();
@@ -192,7 +192,7 @@ public interface Region extends ConfigurationObserver {
/** @return the number of mutations processed bypassing the WAL */
long getNumMutationsWithoutWAL();
-
+
/** @return the size of data processed bypassing the WAL, in bytes */
long getDataInMemoryWithoutWAL();
@@ -227,7 +227,7 @@ public interface Region extends ConfigurationObserver {
/**
* This method needs to be called before any public call that reads or
- * modifies data.
+ * modifies data.
* Acquires a read lock and checks if the region is closing or closed.
* <p>{@link #closeRegionOperation} MUST then always be called after
* the operation has completed, whether it succeeded or failed.
@@ -237,7 +237,7 @@ public interface Region extends ConfigurationObserver {
/**
* This method needs to be called before any public call that reads or
- * modifies data.
+ * modifies data.
* Acquires a read lock and checks if the region is closing or closed.
* <p>{@link #closeRegionOperation} MUST then always be called after
* the operation has completed, whether it succeeded or failed.
@@ -427,7 +427,7 @@ public interface Region extends ConfigurationObserver {
/**
* Perform atomic mutations within the region.
- *
+ *
* @param mutations The list of mutations to perform.
* <code>mutations</code> can contain operations for multiple rows.
* Caller has to ensure that all rows are contained in this region.
@@ -623,13 +623,13 @@ public interface Region extends ConfigurationObserver {
CANNOT_FLUSH_MEMSTORE_EMPTY,
CANNOT_FLUSH
}
-
+
/** @return the detailed result code */
Result getResult();
/** @return true if the memstores were flushed, else false */
boolean isFlushSucceeded();
-
+
/** @return True if the flush requested a compaction, else false */
boolean isCompactionNeeded();
}
@@ -653,6 +653,8 @@ public interface Region extends ConfigurationObserver {
*
* @throws IOException general io exceptions
* because a snapshot was not properly persisted.
+ * @throws DroppedSnapshotException Thrown when abort is required. The caller MUST catch this
+ * exception and MUST abort. Any further operation to the region may cause data loss.
*/
FlushResult flush(boolean force) throws IOException;
@@ -660,7 +662,7 @@ public interface Region extends ConfigurationObserver {
* Synchronously compact all stores in the region.
* <p>This operation could block for a long time, so don't call it from a
* time-sensitive thread.
- * <p>Note that no locks are taken to prevent possible conflicts between
+ * <p>Note that no locks are taken to prevent possible conflicts between
* compaction and splitting activities. The regionserver does not normally compact
* and split in parallel. However by calling this method you may introduce
* unexpected and unhandled concurrency. Don't do this unless you know what
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6dd3f96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
index bc2867e..4292751 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
@@ -93,6 +94,10 @@ class RegionMergeRequest implements Runnable {
+ (this.server.isStopping() ? " stopping" : " stopped"), e);
return;
}
+ if (e instanceof DroppedSnapshotException) {
+ server.abort("Replay of WAL required. Forcing server shutdown", e);
+ return;
+ }
try {
LOG.warn("Running rollback/cleanup of failed merge of "
+ region_a +" and "+ region_b + "; " + e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6dd3f96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
index ea47e2d..c40f9b0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -91,6 +92,10 @@ class SplitRequest implements Runnable {
+ (this.server.isStopping() ? " stopping" : " stopped"), e);
return;
}
+ if (e instanceof DroppedSnapshotException) {
+ server.abort("Replay of WAL required. Forcing server shutdown", e);
+ return;
+ }
try {
LOG.info("Running rollback/cleanup of failed split of " +
parent.getRegionInfo().getRegionNameAsString() + "; " + e.getMessage(), e);
@@ -148,7 +153,7 @@ class SplitRequest implements Runnable {
try {
this.tableLock.release();
} catch (IOException ex) {
- LOG.error("Could not release the table lock (something is really wrong). "
+ LOG.error("Could not release the table lock (something is really wrong). "
+ "Aborting this server to avoid holding the lock forever.");
this.server.abort("Abort; we got an error when releasing the table lock "
+ "on " + parent.getRegionInfo().getRegionNameAsString());
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6dd3f96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
index 021c16f..f04feb1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
@@ -35,7 +35,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@@ -184,7 +186,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
switch (snapshot.getType()) {
case FLUSH:
SnapshotSubprocedurePool taskManager =
- new SnapshotSubprocedurePool(rss.getServerName().toString(), conf);
+ new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss);
return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
timeoutMillis, involvedRegions, snapshot, taskManager);
case SKIPFLUSH:
@@ -196,7 +198,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
* To minimized the code change, class name is not changed.
*/
SnapshotSubprocedurePool taskManager2 =
- new SnapshotSubprocedurePool(rss.getServerName().toString(), conf);
+ new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss);
return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
timeoutMillis, involvedRegions, snapshot, taskManager2);
@@ -265,13 +267,15 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
* operations such as compactions and replication sinks.
*/
static class SnapshotSubprocedurePool {
+ private final Abortable abortable;
private final ExecutorCompletionService<Void> taskPool;
private final ThreadPoolExecutor executor;
private volatile boolean stopped;
private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
private final String name;
- SnapshotSubprocedurePool(String name, Configuration conf) {
+ SnapshotSubprocedurePool(String name, Configuration conf, Abortable abortable) {
+ this.abortable = abortable;
// configure the executor service
long keepAlive = conf.getLong(
RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_KEY,
@@ -331,9 +335,13 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
}
// we are stopped so we can just exit.
} catch (ExecutionException e) {
- if (e.getCause() instanceof ForeignException) {
+ Throwable cause = e.getCause();
+ if (cause instanceof ForeignException) {
LOG.warn("Rethrowing ForeignException from SnapshotSubprocedurePool", e);
throw (ForeignException)e.getCause();
+ } else if (cause instanceof DroppedSnapshotException) {
+ // we have to abort the region server according to contract of flush
+ abortable.abort("Received DroppedSnapshotException, aborting", cause);
}
LOG.warn("Got Exception in SnapshotSubprocedurePool", e);
throw new ForeignException(name, e.getCause());
@@ -371,7 +379,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
if (this.stopped) return;
this.stopped = true;
- this.executor.shutdownNow();
+ this.executor.shutdown();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6dd3f96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 8ee2212..6d1859c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -412,6 +412,7 @@ public class TestHRegion {
Assert.fail("Didn't bubble up IOE!");
} catch (DroppedSnapshotException dse) {
// What we are expecting
+ region.closing.set(false); // this is needed for the rest of the test to work
}
// Make it so all writes succeed from here on out
ffs.fault.set(false);
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6dd3f96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index 64e81fa..c943d12 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -714,6 +714,8 @@ public class TestWALReplay {
+ t.getMessage());
// simulated to abort server
Mockito.doReturn(true).when(rsServices).isAborted();
+ region.setClosing(false); // region normally does not accept writes after
+ // DroppedSnapshotException. We mock around it for this test.
}
// writing more data
int moreRow = 10;