You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2017/05/25 11:41:38 UTC

hbase git commit: HBASE-18099 FlushSnapshotSubprocedure should wait for concurrent Region#flush() to finish

Repository: hbase
Updated Branches:
  refs/heads/master a3c5a7448 -> 3e426b2f8


HBASE-18099 FlushSnapshotSubprocedure should wait for concurrent Region#flush() to finish


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3e426b2f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3e426b2f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3e426b2f

Branch: refs/heads/master
Commit: 3e426b2f851f40dbc4e8d53c607cddebdb8a73e0
Parents: a3c5a74
Author: tedyu <yu...@gmail.com>
Authored: Thu May 25 04:41:29 2017 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Thu May 25 04:41:29 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      | 31 ++++++++++++++++++++
 .../hadoop/hbase/regionserver/Region.java       |  3 ++
 .../snapshot/FlushSnapshotSubprocedure.java     |  8 ++++-
 3 files changed, 41 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3e426b2f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a4a7537..f58729d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1757,6 +1757,37 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  @Override
+  public void waitForFlushes() {
+    synchronized (writestate) {
+      if (this.writestate.readOnly) {
+        // we should not wait for replayed flushed if we are read only (for example in case the
+        // region is a secondary replica).
+        return;
+      }
+      if (!writestate.flushing) return;
+      long start = System.currentTimeMillis();
+      boolean interrupted = false;
+      try {
+        while (writestate.flushing) {
+          LOG.debug("waiting for cache flush to complete for region " + this);
+          try {
+            writestate.wait();
+          } catch (InterruptedException iex) {
+            // essentially ignore and propagate the interrupt back up
+            LOG.warn("Interrupted while waiting");
+            interrupted = true;
+          }
+        }
+      } finally {
+        if (interrupted) {
+          Thread.currentThread().interrupt();
+        }
+      }
+      long duration = System.currentTimeMillis() - start;
+      LOG.debug("Waited " + duration + " ms for flush to complete");
+    }
+  }
   protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
       final String threadNamePrefix) {
     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());

http://git-wip-us.apache.org/repos/asf/hbase/blob/3e426b2f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 295b825..5ff5e52 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -745,4 +745,7 @@ public interface Region extends ConfigurationObserver {
   /** Wait for all current flushes and compactions of the region to complete */
   void waitForFlushesAndCompactions();
 
+  /** Wait for all current flushes of the region to complete
+   */
+  void waitForFlushes();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3e426b2f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
index 9c42e4d..22df895 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.procedure.ProcedureMember;
 import org.apache.hadoop.hbase.procedure.Subprocedure;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
 import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
@@ -95,7 +96,12 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
           LOG.debug("take snapshot without flush memstore first");
         } else {
           LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
-          region.flush(true);
+          FlushResult res = region.flush(true);
+          if (res.getResult() == FlushResult.Result.CANNOT_FLUSH) {
+            // CANNOT_FLUSH may mean that a flush is already on-going
+            // we need to wait for that flush to complete
+            region.waitForFlushes();
+          }
         }
         ((HRegion)region).addRegionToSnapshot(snapshot, monitor);
         if (snapshotSkipFlush) {