Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2021/09/30 20:35:43 UTC

[GitHub] [accumulo] milleruntime commented on pull request #2296: Fix TransactionRunner, revert changes made in earlier commit

milleruntime commented on pull request #2296:
URL: https://github.com/apache/accumulo/pull/2296#issuecomment-931645728


   Since this is difficult to review, I checked out your branch and did a diff with what was in Fate.java before your changes:
   ```diff
   diff --git a/core/src/main/java/org/apache/accumulo/fate/Fate.java b/core/src/main/java/org/apache/accumulo/fate/Fate.java
   index cf02c56cf6..cd53546b74 100644
   --- a/core/src/main/java/org/apache/accumulo/fate/Fate.java
   +++ b/core/src/main/java/org/apache/accumulo/fate/Fate.java
   @@ -21,6 +21,9 @@ package org.apache.accumulo.fate;
    import java.io.IOException;
    import java.util.EnumSet;
    import java.util.concurrent.ExecutorService;
   +import java.util.concurrent.RejectedExecutionException;
   +import java.util.concurrent.ScheduledThreadPoolExecutor;
   +import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.function.Function;
   @@ -50,6 +53,7 @@ public class Fate<T> {
    
      private final TStore<T> store;
      private final T environment;
   +  private ScheduledThreadPoolExecutor fatePoolWatcher;
      private ExecutorService executor;
    
      private static final EnumSet<TStatus> FINISHED_STATES =
   @@ -113,7 +117,6 @@ public class Fate<T> {
                store.unreserve(tid, deferTime);
              }
            }
   -
          }
        }
    
   @@ -226,11 +229,33 @@ public class Fate<T> {
       * Launches the specified number of worker threads.
       */
      public void startTransactionRunners(AccumuloConfiguration conf) {
   -    int numThreads = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE);
   -    executor = ThreadPools.createExecutorService(conf, Property.MANAGER_FATE_THREADPOOL_SIZE);
   -    for (int i = 0; i < numThreads; i++) {
   -      executor.execute(new TransactionRunner());
   -    }
   +    final ThreadPoolExecutor pool = (ThreadPoolExecutor) ThreadPools.createExecutorService(conf,
   +        Property.MANAGER_FATE_THREADPOOL_SIZE);
   +    fatePoolWatcher = ThreadPools.createGeneralScheduledExecutorService(conf);
   +    fatePoolWatcher.schedule(() -> {
   +      // resize the pool if the property changed
   +      ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE);
   +      // If the pool grew, then ensure that there is a TransactionRunner for each thread
   +      int needed = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE) - pool.getQueue().size();
   +      if (needed > 0) {
   +        for (int i = 0; i < needed; i++) {
   +          try {
   +            pool.execute(new TransactionRunner());
   +          } catch (RejectedExecutionException e) {
   +            // RejectedExecutionException could be shutting down
   +            if (pool.isShutdown()) {
   +              // The exception is expected in this case, no need to spam the logs.
   +              log.trace("Error adding transaction runner to FaTE executor pool.", e);
   +            } else {
   +              // This is bad, FaTE may no longer work!
   +              log.error("Error adding transaction runner to FaTE executor pool.", e);
   +            }
   +            break;
   +          }
   +        }
   +      }
   +    }, 3, TimeUnit.SECONDS);
   +    executor = pool;
      }
    
      // get a transaction id back to the requester before doing any work
   @@ -325,6 +350,7 @@ public class Fate<T> {
       */
      public void shutdown() {
        keepRunning.set(false);
   +    fatePoolWatcher.shutdown();
        executor.shutdown();
      }
   ```
   
   I assume this is a follow-up to your fix in #2278, but that is hard to tell without a description or a link to the earlier commit. If so, I think this revert is OK. Do these changes look correct to you?
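
To make the question easier to reason about outside of Accumulo, here is a minimal stand-alone sketch of the "resize the pool, then top up the runners" pattern from `startTransactionRunners`, using only JDK classes. Everything in it is an assumption for illustration: `RunnerTopUp`, `resize`, `topUp` and `liveRunners` are hypothetical names, `resize` only stands in for `ThreadPools.resizePool` (whose implementation is not shown in the diff), and the blocking `Runnable` stands in for `TransactionRunner`. One deliberate difference: the sketch counts live runners itself, whereas the diff subtracts `pool.getQueue().size()`, and the pool's queue only holds tasks that are waiting for a thread, not tasks that are already running.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the top-up logic in the diff; this is not Accumulo code.
class RunnerTopUp {
  // Runners that have been submitted and have not finished yet.
  private final AtomicInteger live = new AtomicInteger();
  private final ThreadPoolExecutor pool;

  RunnerTopUp(ThreadPoolExecutor pool) {
    this.pool = pool;
  }

  int liveRunners() {
    return live.get();
  }

  // Sets core and max size together. The order matters because the JDK rejects
  // any state where the core size would exceed the maximum size.
  static void resize(ThreadPoolExecutor pool, int size) {
    if (size > pool.getMaximumPoolSize()) {
      pool.setMaximumPoolSize(size);
      pool.setCorePoolSize(size);
    } else {
      pool.setCorePoolSize(size);
      pool.setMaximumPoolSize(size);
    }
  }

  // Submits copies of `work` until `target` runners are live.
  // Returns how many runners were actually added.
  int topUp(int target, Runnable work) {
    int added = 0;
    while (live.get() < target) {
      live.incrementAndGet();
      try {
        pool.execute(() -> {
          try {
            work.run();
          } finally {
            live.decrementAndGet();
          }
        });
        added++;
      } catch (RejectedExecutionException e) {
        // Expected once the pool has been shut down; undo the count and stop.
        live.decrementAndGet();
        break;
      }
    }
    return added;
  }

  public static void main(String[] args) {
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    RunnerTopUp runners = new RunnerTopUp(pool);
    CountDownLatch stop = new CountDownLatch(1);
    // Blocks until released, like a long-lived worker loop.
    Runnable runner = () -> {
      try {
        stop.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };

    System.out.println(runners.topUp(2, runner)); // initial fill: 2
    resize(pool, 4);
    System.out.println(runners.topUp(4, runner)); // pool grew: 2 more
    stop.countDown();
    pool.shutdown();
    System.out.println(runners.topUp(4, runner)); // rejected after shutdown: 0
  }
}
```

If a check like this is meant to repeat, it would need something like `scheduleWithFixedDelay`; the JDK's `schedule(Runnable, long, TimeUnit)`, which the diff calls, runs the task once after the delay.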

