You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/07/17 17:59:05 UTC

[72/94] [abbrv] hbase git commit: HBASE-18358 Backport HBASE-18099 'FlushSnapshotSubprocedure should wait for concurrent Region#flush() to finish'

HBASE-18358 Backport HBASE-18099 'FlushSnapshotSubprocedure should wait for concurrent Region#flush() to finish'


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c0f743e4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c0f743e4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c0f743e4

Branch: refs/heads/HBASE-14070.HLC
Commit: c0f743e44f3e9ec8095d983215555bf8558a6ec1
Parents: cc4301c
Author: tedyu <yu...@gmail.com>
Authored: Tue Jul 11 17:26:22 2017 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Tue Jul 11 17:26:22 2017 -0700

----------------------------------------------------------------------
 .../snapshot/FlushSnapshotSubprocedure.java     | 30 ++++++++++++++++----
 1 file changed, 25 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c0f743e4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
index 22df895..b30d622 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver.snapshot;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.Callable;
 
@@ -24,6 +25,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
 import org.apache.hadoop.hbase.procedure.ProcedureMember;
@@ -52,6 +54,9 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
   private final SnapshotSubprocedurePool taskManager;
   private boolean snapshotSkipFlush = false;
 
+  // the maximum number of attempts we flush
+  final static int MAX_RETRIES = 3;
+
   public FlushSnapshotSubprocedure(ProcedureMember member,
       ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
       List<Region> regions, SnapshotDescription snapshot,
@@ -96,11 +101,26 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
           LOG.debug("take snapshot without flush memstore first");
         } else {
           LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
-          FlushResult res = region.flush(true);
-          if (res.getResult() == FlushResult.Result.CANNOT_FLUSH) {
-            // CANNOT_FLUSH may mean that a flush is already on-going
-            // we need to wait for that flush to complete
-            region.waitForFlushes();
+          boolean succeeded = false;
+          long readPt = region.getReadpoint(IsolationLevel.READ_COMMITTED);
+          for (int i = 0; i < MAX_RETRIES; i++) {
+            FlushResult res = region.flush(true);
+            if (res.getResult() == FlushResult.Result.CANNOT_FLUSH) {
+              // CANNOT_FLUSH may mean that a flush is already on-going
+              // we need to wait for that flush to complete
+              region.waitForFlushes();
+              if (region.getMaxFlushedSeqId() >= readPt) {
+                // writes at the start of the snapshot have been persisted
+                succeeded = true;
+                break;
+              }
+            } else {
+              succeeded = true;
+              break;
+            }
+          }
+          if (!succeeded) {
+            throw new IOException("Unable to complete flush after " + MAX_RETRIES + " attempts");
           }
         }
         ((HRegion)region).addRegionToSnapshot(snapshot, monitor);