Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2022/04/09 03:30:21 UTC
[GitHub] [accumulo] keith-turner commented on a diff in pull request #2609: Implement a write thread limit
keith-turner commented on code in PR #2609:
URL: https://github.com/apache/accumulo/pull/2609#discussion_r846567403
##########
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java:
##########
@@ -394,6 +398,20 @@ private static long jitter() {
* TabletServer.TIME_BETWEEN_LOCATOR_CACHE_CLEARS);
}
+ public Semaphore getSemaphore() {
Review Comment:
Was wondering how to make this cheaper when the semaphore is not needed. One way would be to return an Optional.
```suggestion
public Optional<Semaphore> getSemaphore() {
```
##########
server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java:
##########
@@ -722,6 +732,8 @@ public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent,
}
}
} finally {
+ if (semaphoreCopy != null)
+ semaphoreCopy.release();
Review Comment:
```suggestion
if (semaphoreCopy.isPresent())
semaphoreCopy.get().release();
```
##########
server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java:
##########
@@ -686,15 +685,26 @@ private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
@Override
public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent,
- List<TMutation> tmutations) {
+ List<TMutation> tmutations) throws TException {
UpdateSession us = (UpdateSession) server.sessionManager.reserveSession(updateID);
if (us == null) {
return;
}
+ Semaphore semaphoreCopy = null;
Review Comment:
```suggestion
Optional<Semaphore> semaphoreCopy = Optional.empty();
```
##########
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java:
##########
@@ -394,6 +398,20 @@ private static long jitter() {
* TabletServer.TIME_BETWEEN_LOCATOR_CACHE_CLEARS);
}
+ public Semaphore getSemaphore() {
+ int writeThreads =
+ getServerConfig().getConfiguration().getCount(Property.TSERV_MAX_WRITETHREADS);
+ if (writeThreads == 0)
+ writeThreads = Integer.MAX_VALUE;
+ if (sem == null || maxThreadPermits != writeThreads) {
Review Comment:
sem and maxThreadPermits are read outside the sync block, so they should probably be volatile. Also, the checks should be done again inside the sync block, because multiple threads could execute the if, get a true value, and then block on the sync block; each of them would then replace the semaphore in turn.
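A minimal sketch of the volatile-plus-recheck pattern described above, assuming a hypothetical `WriteLimiter` class that stands in for TabletServer and takes the configured thread count as a parameter instead of reading `Property.TSERV_MAX_WRITETHREADS`. It is an illustration of the idea, not the code in this PR.

```java
import java.util.concurrent.Semaphore;

// Hypothetical stand-in for TabletServer; only the locking pattern matters here.
class WriteLimiter {
  // Both fields are read outside the synchronized block, so they are volatile.
  private volatile int maxThreadPermits = 0;
  private volatile Semaphore sem;

  // configuredThreads is an assumed parameter replacing the config lookup.
  Semaphore getSemaphore(int configuredThreads) {
    int writeThreads = configuredThreads == 0 ? Integer.MAX_VALUE : configuredThreads;
    // First check without the lock keeps the common path cheap.
    if (sem == null || maxThreadPermits != writeThreads) {
      synchronized (this) {
        // Second check: another thread may have already replaced the semaphore
        // while this thread was waiting for the lock.
        if (sem == null || maxThreadPermits != writeThreads) {
          sem = new Semaphore(writeThreads);
          maxThreadPermits = writeThreads;
        }
      }
    }
    return sem;
  }
}
```

Without the second check, every thread that passed the first check would create its own semaphore once it got the lock, and permits acquired from an earlier instance would no longer count against the limit.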
##########
server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java:
##########
@@ -686,15 +685,26 @@ private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
@Override
public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent,
- List<TMutation> tmutations) {
+ List<TMutation> tmutations) throws TException {
UpdateSession us = (UpdateSession) server.sessionManager.reserveSession(updateID);
if (us == null) {
return;
}
+ Semaphore semaphoreCopy = null;
boolean reserved = true;
+
try {
KeyExtent keyExtent = KeyExtent.fromThrift(tkeyExtent);
+
+ if (TabletType.type(keyExtent) == TabletType.USER) {
+ semaphoreCopy = server.getSemaphore();
+ if (!semaphoreCopy.tryAcquire()) {
Review Comment:
```suggestion
if (semaphoreCopy.isPresent() && !semaphoreCopy.get().tryAcquire()) {
```
##########
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java:
##########
@@ -240,6 +241,9 @@ public TabletServerMinCMetrics getMinCMetrics() {
private final WalStateManager walMarker;
private final ServerContext context;
+ private int maxThreadPermits = 0;
+ private Semaphore sem;
Review Comment:
If using Optional, could make this instance var an Optional so that we can avoid having to recreate the Optional object.
```suggestion
private Optional<Semaphore> sem;
```
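Putting the Optional suggestions together, a rough sketch might look like the following. `OptionalLimiter` and `applyUpdate` are hypothetical names, and treating a configured value of 0 as "no semaphore" is an assumption about how the empty case would be used, not something this PR settles.

```java
import java.util.Optional;
import java.util.concurrent.Semaphore;

// Hypothetical stand-in for the tablet server and its update handler.
class OptionalLimiter {
  // Built once and reused, so callers never allocate a new Optional.
  private final Optional<Semaphore> sem;

  // Assumption: 0 means no limit, so no semaphore is created at all.
  OptionalLimiter(int maxWriteThreads) {
    this.sem = maxWriteThreads == 0
        ? Optional.empty()
        : Optional.of(new Semaphore(maxWriteThreads));
  }

  Optional<Semaphore> getSemaphore() {
    return sem;
  }

  // Returns false when the write thread limit has been reached.
  boolean applyUpdate(Runnable update) {
    Optional<Semaphore> semaphoreCopy = getSemaphore();
    if (semaphoreCopy.isPresent() && !semaphoreCopy.get().tryAcquire()) {
      return false;
    }
    try {
      update.run();
      return true;
    } finally {
      // Reached only after a successful acquire or when no limit is set.
      semaphoreCopy.ifPresent(Semaphore::release);
    }
  }
}
```

Acquiring before the try block means the finally clause never releases a permit that was not taken.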