Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2022/04/09 03:30:21 UTC

[GitHub] [accumulo] keith-turner commented on a diff in pull request #2609: Implement a write thread limit

keith-turner commented on code in PR #2609:
URL: https://github.com/apache/accumulo/pull/2609#discussion_r846567403


##########
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java:
##########
@@ -394,6 +398,20 @@ private static long jitter() {
         * TabletServer.TIME_BETWEEN_LOCATOR_CACHE_CLEARS);
   }
 
+  public Semaphore getSemaphore() {

Review Comment:
   Was wondering how to make this cheaper when the semaphore is not needed. One way would be to return an Optional.
   
   ```suggestion
     public Optional<Semaphore> getSemaphore() {
   ```



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java:
##########
@@ -722,6 +732,8 @@ public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent,
         }
       }
     } finally {
+      if (semaphoreCopy != null)
+        semaphoreCopy.release();

Review Comment:
   ```suggestion
         if (semaphoreCopy.isPresent())
           semaphoreCopy.get().release();
   ```



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java:
##########
@@ -686,15 +685,26 @@ private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
 
   @Override
   public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent,
-      List<TMutation> tmutations) {
+      List<TMutation> tmutations) throws TException {
     UpdateSession us = (UpdateSession) server.sessionManager.reserveSession(updateID);
     if (us == null) {
       return;
     }
 
+    Semaphore semaphoreCopy = null;

Review Comment:
   ```suggestion
       Optional<Semaphore> semaphoreCopy = Optional.empty();
   ```



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java:
##########
@@ -394,6 +398,20 @@ private static long jitter() {
         * TabletServer.TIME_BETWEEN_LOCATOR_CACHE_CLEARS);
   }
 
+  public Semaphore getSemaphore() {
+    int writeThreads =
+        getServerConfig().getConfiguration().getCount(Property.TSERV_MAX_WRITETHREADS);
+    if (writeThreads == 0)
+      writeThreads = Integer.MAX_VALUE;
+    if (sem == null || maxThreadPermits != writeThreads) {

Review Comment:
   `sem` and `maxThreadPermits` are read outside the synchronized block, so they should probably be volatile. The checks should also be repeated inside the synchronized block, because multiple threads could evaluate the `if` as true and then queue up on the lock, each recreating the semaphore in turn.
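
A minimal sketch of the double-checked pattern described in this comment, not the PR's actual code. The class name `WriteLimiter` and the `configuredThreads` parameter (standing in for the value read from `Property.TSERV_MAX_WRITETHREADS`) are hypothetical; only `sem`, `maxThreadPermits`, and the zero-means-`Integer.MAX_VALUE` rule come from the diff.

```java
import java.util.concurrent.Semaphore;

// Hypothetical holder class for illustration only.
class WriteLimiter {
  // volatile so the unsynchronized first check sees the latest published values
  private volatile Semaphore sem;
  private volatile int maxThreadPermits = 0;

  // configuredThreads is an assumed stand-in for the configuration lookup
  Semaphore getSemaphore(int configuredThreads) {
    int writeThreads = configuredThreads == 0 ? Integer.MAX_VALUE : configuredThreads;
    // First check, without the lock: the common case takes this fast path
    if (sem == null || maxThreadPermits != writeThreads) {
      synchronized (this) {
        // Second check, with the lock: another thread may already have
        // rebuilt the semaphore while this one was waiting
        if (sem == null || maxThreadPermits != writeThreads) {
          sem = new Semaphore(writeThreads);
          // Written after sem, so a reader that sees the new count also sees the new semaphore
          maxThreadPermits = writeThreads;
        }
      }
    }
    return sem;
  }
}
```

With the second check in place, threads that lose the race reuse the semaphore built by the winner instead of replacing it.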



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java:
##########
@@ -686,15 +685,26 @@ private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
 
   @Override
   public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent,
-      List<TMutation> tmutations) {
+      List<TMutation> tmutations) throws TException {
     UpdateSession us = (UpdateSession) server.sessionManager.reserveSession(updateID);
     if (us == null) {
       return;
     }
 
+    Semaphore semaphoreCopy = null;
     boolean reserved = true;
+
     try {
       KeyExtent keyExtent = KeyExtent.fromThrift(tkeyExtent);
+
+      if (TabletType.type(keyExtent) == TabletType.USER) {
+        semaphoreCopy = server.getSemaphore();
+        if (!semaphoreCopy.tryAcquire()) {

Review Comment:
   ```suggestion
           if (semaphoreCopy.isPresent() && !semaphoreCopy.get().tryAcquire()) {
   ```



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java:
##########
@@ -240,6 +241,9 @@ public TabletServerMinCMetrics getMinCMetrics() {
   private final WalStateManager walMarker;
   private final ServerContext context;
 
+  private int maxThreadPermits = 0;
+  private Semaphore sem;

Review Comment:
   If using Optional, could make this instance variable an Optional so that we can avoid having to recreate the Optional object on every call.
   
   ```suggestion
     private Optional<Semaphore> sem;
   ```
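
Taken together, the Optional suggestions in this review could look roughly like the sketch below. It is an illustration under assumptions, not the PR's code: `LimitedServer`, its constructor, and the `Runnable`-based `applyUpdates` are hypothetical, and treating a limit of 0 as an empty Optional is an assumed reading of "cheaper when the semaphore was not needed".

```java
import java.util.Optional;
import java.util.concurrent.Semaphore;

// Hypothetical stand-in for the tablet server; for illustration only.
class LimitedServer {
  // Cached Optional: built once, so getSemaphore() allocates nothing per call
  private final Optional<Semaphore> sem;

  LimitedServer(int maxWriteThreads) {
    // Assumption: 0 means "no limit", modelled here as an empty Optional
    this.sem = maxWriteThreads == 0
        ? Optional.empty()
        : Optional.of(new Semaphore(maxWriteThreads));
  }

  Optional<Semaphore> getSemaphore() {
    return sem;
  }

  // Returns true if the work ran, false if it was rejected for lack of a permit
  boolean applyUpdates(Runnable work) {
    Optional<Semaphore> semaphoreCopy = getSemaphore();
    if (semaphoreCopy.isPresent() && !semaphoreCopy.get().tryAcquire()) {
      return false;
    }
    try {
      work.run();
      return true;
    } finally {
      // Reached only after a successful acquire (or when no limit is set)
      if (semaphoreCopy.isPresent())
        semaphoreCopy.get().release();
    }
  }
}
```

Acquiring before the `try` keeps the `finally` from releasing a permit that was never obtained; whether the PR needs the same arrangement depends on code not shown in these hunks.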


