You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2016/06/20 07:33:37 UTC

[1/2] hbase git commit: Is this ok... removing synchronizations?

Repository: hbase
Updated Branches:
  refs/heads/branch-1 1ac2e384b -> d34e65327


Is this ok... removing synchronizations?


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/86242e1f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/86242e1f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/86242e1f

Branch: refs/heads/branch-1
Commit: 86242e1f55da7df6a2119389897d11356e6bbc2a
Parents: 1ac2e38
Author: stack <st...@apache.org>
Authored: Mon Jun 20 08:23:35 2016 +0100
Committer: stack <st...@apache.org>
Committed: Mon Jun 20 08:23:49 2016 +0100

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      | 24 +++++++++++---------
 .../regionserver/ReversedRegionScannerImpl.java |  2 +-
 2 files changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/86242e1f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index ec0a042..4bdd825 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -324,7 +324,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
   final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
 
-  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
+  // Map of outstanding region scanners to their mvcc read point. Used figuring what is oldest
+  // outstanding read point
+  private final Map<RegionScanner, Long> scannerReadPoints =
+      new ConcurrentHashMap<RegionScanner, Long>();
 
   /**
    * The sequence ID that was encountered when this region was opened.
@@ -682,7 +685,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     this.rsServices = rsServices;
     this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
     setHTableSpecificConf();
-    this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
 
     this.busyWaitDuration = conf.getLong(
       "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
@@ -5631,9 +5633,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
      * If the joined heap data gathering is interrupted due to scan limits, this will
      * contain the row for which we are populating the values.*/
     protected Cell joinedContinuationRow = null;
-    private boolean filterClosed = false;
+    private volatile boolean closed = false;
 
-    protected final int isScan;
+    protected final int scan;
     protected final byte[] stopRow;
     protected final HRegion region;
 
@@ -5671,7 +5673,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
       // If we are doing a get, we want to be [startRow,endRow] normally
       // it is [startRow,endRow) and if startRow=endRow we get nothing.
-      this.isScan = scan.isGetScan() ? -1 : 0;
+      this.scan = scan.isGetScan() ? -1 : 0;
 
       // synchronize on scannerReadPoints so that nobody calculates
       // getSmallestReadPoint, before scannerReadPoints is updated.
@@ -5751,9 +5753,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
 
     @Override
-    public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext)
+    public boolean next(List<Cell> outResults, ScannerContext scannerContext)
     throws IOException {
-      if (this.filterClosed) {
+      if (this.closed) {
         throw new UnknownScannerException("Scanner was closed (timed out?) " +
             "after we renewed it. Could be caused by a very slow scanner " +
             "or a lengthy garbage collection");
@@ -6187,11 +6189,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return currentRow == null ||
           (stopRow != null &&
           comparator.compareRows(stopRow, 0, stopRow.length,
-            currentRow, offset, length) <= isScan);
+            currentRow, offset, length) <= scan);
     }
 
     @Override
-    public synchronized void close() {
+    public void close() {
+      this.closed = true;
       if (storeHeap != null) {
         storeHeap.close();
         storeHeap = null;
@@ -6200,11 +6203,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         joinedHeap.close();
         joinedHeap = null;
       }
-      // no need to synchronize here.
       scannerReadPoints.remove(this);
-      this.filterClosed = true;
     }
 
+    @VisibleForTesting
     KeyValueHeap getStoreHeapForTesting() {
       return storeHeap;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/86242e1f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
index 63c1ca6..77aaf37 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
@@ -59,7 +59,7 @@ class ReversedRegionScannerImpl extends RegionScannerImpl {
   protected boolean isStopRow(byte[] currentRow, int offset, short length) {
     return currentRow == null
         || (super.stopRow != null && region.getComparator().compareRows(
-            stopRow, 0, stopRow.length, currentRow, offset, length) >= super.isScan);
+            stopRow, 0, stopRow.length, currentRow, offset, length) >= super.scan);
   }
 
   @Override


[2/2] hbase git commit: HBASE-16023 Fastpath for the FIFO rpcscheduler AMENDMENT

Posted by st...@apache.org.
HBASE-16023 Fastpath for the FIFO rpcscheduler AMENDMENT


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d34e6532
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d34e6532
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d34e6532

Branch: refs/heads/branch-1
Commit: d34e65327e77338d24ace159b0756836fd91406d
Parents: 86242e1
Author: stack <st...@apache.org>
Authored: Mon Jun 20 08:28:39 2016 +0100
Committer: stack <st...@apache.org>
Committed: Mon Jun 20 08:28:43 2016 +0100

----------------------------------------------------------------------
 .../hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java    | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d34e6532/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java
index 1951dd0..1a362bc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java
@@ -74,7 +74,7 @@ public class FifoWithFastPathBalancedQueueRpcExecutor extends BalancedQueueRpcEx
     // if an empty queue of CallRunners so we are available for direct handoff when one comes in.
     final Deque<FastPathHandler> fastPathHandlerStack;
     // Semaphore to coordinate loading of fastpathed loadedTask and our running it.
-    private Semaphore semaphore = new Semaphore(1);
+    private Semaphore semaphore = new Semaphore(0);
     // The task we get when fast-pathing.
     private CallRunner loadedCallRunner;
 
@@ -82,7 +82,6 @@ public class FifoWithFastPathBalancedQueueRpcExecutor extends BalancedQueueRpcEx
         final Deque<FastPathHandler> fastPathHandlerStack) {
       super(name, handlerFailureThreshhold, q);
       this.fastPathHandlerStack = fastPathHandlerStack;
-      this.semaphore.drainPermits();
     }
 
     protected CallRunner getCallRunner() throws InterruptedException {
@@ -95,6 +94,7 @@ public class FifoWithFastPathBalancedQueueRpcExecutor extends BalancedQueueRpcEx
           this.fastPathHandlerStack.push(this);
           this.semaphore.acquire();
           cr = this.loadedCallRunner;
+          this.loadedCallRunner = null;
         } else {
           // No fastpath available. Block until a task comes available.
           cr = super.getCallRunner();
@@ -113,4 +113,4 @@ public class FifoWithFastPathBalancedQueueRpcExecutor extends BalancedQueueRpcEx
       return true;
     }
   }
-}
\ No newline at end of file
+}