You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2016/06/20 07:33:37 UTC
[1/2] hbase git commit: Is this ok... removing synchronizations?
Repository: hbase
Updated Branches:
refs/heads/branch-1 1ac2e384b -> d34e65327
Is this ok... removing synchronizations?
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/86242e1f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/86242e1f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/86242e1f
Branch: refs/heads/branch-1
Commit: 86242e1f55da7df6a2119389897d11356e6bbc2a
Parents: 1ac2e38
Author: stack <st...@apache.org>
Authored: Mon Jun 20 08:23:35 2016 +0100
Committer: stack <st...@apache.org>
Committed: Mon Jun 20 08:23:49 2016 +0100
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/HRegion.java | 24 +++++++++++---------
.../regionserver/ReversedRegionScannerImpl.java | 2 +-
2 files changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/86242e1f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index ec0a042..4bdd825 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -324,7 +324,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
- private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
+ // Map of outstanding region scanners to their mvcc read point. Used figuring what is oldest
+ // outstanding read point
+ private final Map<RegionScanner, Long> scannerReadPoints =
+ new ConcurrentHashMap<RegionScanner, Long>();
/**
* The sequence ID that was encountered when this region was opened.
@@ -682,7 +685,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.rsServices = rsServices;
this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
setHTableSpecificConf();
- this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
this.busyWaitDuration = conf.getLong(
"hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
@@ -5631,9 +5633,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* If the joined heap data gathering is interrupted due to scan limits, this will
* contain the row for which we are populating the values.*/
protected Cell joinedContinuationRow = null;
- private boolean filterClosed = false;
+ private volatile boolean closed = false;
- protected final int isScan;
+ protected final int scan;
protected final byte[] stopRow;
protected final HRegion region;
@@ -5671,7 +5673,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// If we are doing a get, we want to be [startRow,endRow] normally
// it is [startRow,endRow) and if startRow=endRow we get nothing.
- this.isScan = scan.isGetScan() ? -1 : 0;
+ this.scan = scan.isGetScan() ? -1 : 0;
// synchronize on scannerReadPoints so that nobody calculates
// getSmallestReadPoint, before scannerReadPoints is updated.
@@ -5751,9 +5753,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
@Override
- public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext)
+ public boolean next(List<Cell> outResults, ScannerContext scannerContext)
throws IOException {
- if (this.filterClosed) {
+ if (this.closed) {
throw new UnknownScannerException("Scanner was closed (timed out?) " +
"after we renewed it. Could be caused by a very slow scanner " +
"or a lengthy garbage collection");
@@ -6187,11 +6189,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return currentRow == null ||
(stopRow != null &&
comparator.compareRows(stopRow, 0, stopRow.length,
- currentRow, offset, length) <= isScan);
+ currentRow, offset, length) <= scan);
}
@Override
- public synchronized void close() {
+ public void close() {
+ this.closed = true;
if (storeHeap != null) {
storeHeap.close();
storeHeap = null;
@@ -6200,11 +6203,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
joinedHeap.close();
joinedHeap = null;
}
- // no need to synchronize here.
scannerReadPoints.remove(this);
- this.filterClosed = true;
}
+ @VisibleForTesting
KeyValueHeap getStoreHeapForTesting() {
return storeHeap;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/86242e1f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
index 63c1ca6..77aaf37 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
@@ -59,7 +59,7 @@ class ReversedRegionScannerImpl extends RegionScannerImpl {
protected boolean isStopRow(byte[] currentRow, int offset, short length) {
return currentRow == null
|| (super.stopRow != null && region.getComparator().compareRows(
- stopRow, 0, stopRow.length, currentRow, offset, length) >= super.isScan);
+ stopRow, 0, stopRow.length, currentRow, offset, length) >= super.scan);
}
@Override
[2/2] hbase git commit: HBASE-16023 Fastpath for the FIFO
rpcscheduler AMENDMENT
Posted by st...@apache.org.
HBASE-16023 Fastpath for the FIFO rpcscheduler AMENDMENT
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d34e6532
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d34e6532
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d34e6532
Branch: refs/heads/branch-1
Commit: d34e65327e77338d24ace159b0756836fd91406d
Parents: 86242e1
Author: stack <st...@apache.org>
Authored: Mon Jun 20 08:28:39 2016 +0100
Committer: stack <st...@apache.org>
Committed: Mon Jun 20 08:28:43 2016 +0100
----------------------------------------------------------------------
.../hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/d34e6532/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java
index 1951dd0..1a362bc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java
@@ -74,7 +74,7 @@ public class FifoWithFastPathBalancedQueueRpcExecutor extends BalancedQueueRpcEx
// if an empty queue of CallRunners so we are available for direct handoff when one comes in.
final Deque<FastPathHandler> fastPathHandlerStack;
// Semaphore to coordinate loading of fastpathed loadedTask and our running it.
- private Semaphore semaphore = new Semaphore(1);
+ private Semaphore semaphore = new Semaphore(0);
// The task we get when fast-pathing.
private CallRunner loadedCallRunner;
@@ -82,7 +82,6 @@ public class FifoWithFastPathBalancedQueueRpcExecutor extends BalancedQueueRpcEx
final Deque<FastPathHandler> fastPathHandlerStack) {
super(name, handlerFailureThreshhold, q);
this.fastPathHandlerStack = fastPathHandlerStack;
- this.semaphore.drainPermits();
}
protected CallRunner getCallRunner() throws InterruptedException {
@@ -95,6 +94,7 @@ public class FifoWithFastPathBalancedQueueRpcExecutor extends BalancedQueueRpcEx
this.fastPathHandlerStack.push(this);
this.semaphore.acquire();
cr = this.loadedCallRunner;
+ this.loadedCallRunner = null;
} else {
// No fastpath available. Block until a task comes available.
cr = super.getCallRunner();
@@ -113,4 +113,4 @@ public class FifoWithFastPathBalancedQueueRpcExecutor extends BalancedQueueRpcEx
return true;
}
}
-}
\ No newline at end of file
+}