You are viewing a plain text version of this content. The link to the canonical version was not preserved in this text copy.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2020/01/16 17:32:20 UTC

[accumulo] branch master updated (012d291 -> 66f296e)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git.


    from 012d291  Fix predicate for TabletFile in Gatherer
     add e01a86a  fixes #1456 by adding rate limiter to shutdown queue (#1457)
     new bf3d3d4  Merge branch 'accumulo-1456'
     new 66f296e  Merge branch 'master' of github.com:apache/accumulo

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/accumulo/master/Master.java    | 30 +++++++++++++++-------
 1 file changed, 21 insertions(+), 9 deletions(-)


[accumulo] 02/02: Merge branch 'master' of github.com:apache/accumulo

Posted by kt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 66f296e6b89d1ceb8a8a527e70dbe0e63ea84cb3
Merge: bf3d3d4 012d291
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu Jan 16 12:28:46 2020 -0500

    Merge branch 'master' of github.com:apache/accumulo

 .../main/java/org/apache/accumulo/core/summary/Gatherer.java   | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)


[accumulo] 01/02: Merge branch 'accumulo-1456'

Posted by kt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit bf3d3d4c3cc12c4c1d016d7003dffc185a7dc057
Merge: 5007382 e01a86a
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu Jan 16 12:28:27 2020 -0500

    Merge branch 'accumulo-1456'

 .../java/org/apache/accumulo/master/Master.java    | 30 +++++++++++++++-------
 1 file changed, 21 insertions(+), 9 deletions(-)

diff --cc server/master/src/main/java/org/apache/accumulo/master/Master.java
index 1041387,e3cd379..2ebc3cc
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@@ -153,44 -162,48 +153,46 @@@ import org.apache.zookeeper.data.Stat
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import com.google.common.base.Optional;
  import com.google.common.collect.ImmutableSortedMap;
 -import com.google.common.collect.Iterables;
+ import com.google.common.util.concurrent.RateLimiter;
  
 +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 +
  /**
   * The Master is responsible for assigning and balancing tablets to tablet servers.
 - *
 + * <p>
   * The master will also coordinate log recoveries and reports general status.
   */
 -public class Master extends AccumuloServerContext
 -    implements LiveTServerSet.Listener, TableObserver, CurrentState {
 +public class Master extends AbstractServer
 +    implements LiveTServerSet.Listener, TableObserver, CurrentState, HighlyAvailableService {
  
 -  final static Logger log = LoggerFactory.getLogger(Master.class);
 +  static final Logger log = LoggerFactory.getLogger(Master.class);
  
 -  final static int ONE_SECOND = 1000;
 -  final static long TIME_TO_WAIT_BETWEEN_SCANS = 60 * ONE_SECOND;
 +  static final int ONE_SECOND = 1000;
 +  static final long TIME_TO_WAIT_BETWEEN_SCANS = 60 * ONE_SECOND;
    // made this less than TIME_TO_WAIT_BETWEEN_SCANS, so that the cache is cleared between cycles
 -  final static long TIME_TO_CACHE_RECOVERY_WAL_EXISTENCE = (3 * TIME_TO_WAIT_BETWEEN_SCANS) / 4;
 -  final private static long TIME_BETWEEN_MIGRATION_CLEANUPS = 5 * 60 * ONE_SECOND;
 -  final static long WAIT_BETWEEN_ERRORS = ONE_SECOND;
 -  final private static long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND;
 -  final private static int MAX_CLEANUP_WAIT_TIME = ONE_SECOND;
 -  final private static int TIME_TO_WAIT_BETWEEN_LOCK_CHECKS = ONE_SECOND;
 -  final static int MAX_TSERVER_WORK_CHUNK = 5000;
 -  final private static int MAX_BAD_STATUS_COUNT = 3;
 -  final private static double MAX_SHUTDOWNS_PER_SEC = 10D / 60D;
 +  static final long TIME_TO_CACHE_RECOVERY_WAL_EXISTENCE = (3 * TIME_TO_WAIT_BETWEEN_SCANS) / 4;
 +  private static final long TIME_BETWEEN_MIGRATION_CLEANUPS = 5 * 60 * ONE_SECOND;
 +  static final long WAIT_BETWEEN_ERRORS = ONE_SECOND;
 +  private static final long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND;
 +  private static final int MAX_CLEANUP_WAIT_TIME = ONE_SECOND;
 +  private static final int TIME_TO_WAIT_BETWEEN_LOCK_CHECKS = ONE_SECOND;
 +  static final int MAX_TSERVER_WORK_CHUNK = 5000;
 +  private static final int MAX_BAD_STATUS_COUNT = 3;
++  private static final double MAX_SHUTDOWNS_PER_SEC = 10D / 60D;
  
    final VolumeManager fs;
 -  final private String hostname;
 -  final private Object balancedNotifier = new Object();
 +  private final Object balancedNotifier = new Object();
    final LiveTServerSet tserverSet;
 -  final private List<TabletGroupWatcher> watchers = new ArrayList<>();
 +  private final List<TabletGroupWatcher> watchers = new ArrayList<>();
    final SecurityOperation security;
 -  final Map<TServerInstance,AtomicInteger> badServers = Collections
 -      .synchronizedMap(new DefaultMap<TServerInstance,AtomicInteger>(new AtomicInteger()));
 -  final Set<TServerInstance> serversToShutdown =
 -      Collections.synchronizedSet(new HashSet<TServerInstance>());
 +  final Map<TServerInstance,AtomicInteger> badServers =
 +      Collections.synchronizedMap(new HashMap<TServerInstance,AtomicInteger>());
 +  final Set<TServerInstance> serversToShutdown = Collections.synchronizedSet(new HashSet<>());
    final SortedMap<KeyExtent,TServerInstance> migrations =
 -      Collections.synchronizedSortedMap(new TreeMap<KeyExtent,TServerInstance>());
 +      Collections.synchronizedSortedMap(new TreeMap<>());
    final EventCoordinator nextEvent = new EventCoordinator();
 -  final private Object mergeLock = new Object();
 +  private final Object mergeLock = new Object();
    private ReplicationDriver replicationWorkDriver;
    private WorkDriver replicationWorkAssigner;
    RecoveryManager recoveryManager = null;
@@@ -918,45 -1183,47 +921,53 @@@
          // unresponsive tservers.
          sleepUninterruptibly(Math.max(1, rpcTimeout / 120_000), TimeUnit.MILLISECONDS);
        }
 -      tp.submit(new Runnable() {
 -        @Override
 -        public void run() {
 +      tp.submit(() -> {
 +        try {
 +          Thread t = Thread.currentThread();
 +          String oldName = t.getName();
            try {
 -            Thread t = Thread.currentThread();
 -            String oldName = t.getName();
 -            try {
 -              t.setName("Getting status from " + server);
 -              TServerConnection connection = tserverSet.getConnection(server);
 -              if (connection == null)
 -                throw new IOException("No connection to " + server);
 -              TabletServerStatus status = connection.getTableMap(false);
 -              result.put(server, status);
 -            } finally {
 -              t.setName(oldName);
 +            String message = "Getting status from " + server;
 +            t.setName(message);
 +            long startForServer = System.currentTimeMillis();
 +            log.trace(message);
 +            TServerConnection connection1 = tserverSet.getConnection(server);
 +            if (connection1 == null) {
 +              throw new IOException("No connection to " + server);
              }
 -          } catch (Exception ex) {
 -            log.error("unable to get tablet server status " + server + " " + ex.toString());
 -            log.debug("unable to get tablet server status " + server, ex);
 -            // Attempt to shutdown server only if able to acquire. If unable, this tablet server
 -            // will be removed from the badServers set below and status will be reattempted again
 -            // MAX_BAD_STATUS_COUNT times
 -            if (badServers.get(server).incrementAndGet() > MAX_BAD_STATUS_COUNT) {
 -              if (shutdownServerRateLimiter.tryAcquire()) {
 -                log.warn("attempting to stop " + server);
 -                try {
 -                  TServerConnection connection = tserverSet.getConnection(server);
 -                  if (connection != null) {
 -                    connection.halt(masterLock);
 -                  }
 -                } catch (TTransportException e) {
 -                  // ignore: it's probably down
 -                } catch (Exception e) {
 -                  log.info("error talking to troublesome tablet server ", e);
 +            TabletServerStatus status = connection1.getTableMap(false);
 +            result.put(server, status);
 +
 +            long duration = System.currentTimeMillis() - startForServer;
 +            log.trace("Got status from {} in {} ms", server, duration);
 +
 +          } finally {
 +            t.setName(oldName);
 +          }
 +        } catch (Exception ex) {
 +          log.error("unable to get tablet server status {} {}", server, ex.toString());
 +          log.debug("unable to get tablet server status {}", server, ex);
++          // Attempt to shutdown server only if able to acquire. If unable, this tablet server
++          // will be removed from the badServers set below and status will be reattempted again
++          // MAX_BAD_STATUS_COUNT times
 +          if (badServers.computeIfAbsent(server, k -> new AtomicInteger(0)).incrementAndGet()
 +              > MAX_BAD_STATUS_COUNT) {
-             log.warn("attempting to stop {}", server);
-             try {
-               TServerConnection connection2 = tserverSet.getConnection(server);
-               if (connection2 != null) {
-                 connection2.halt(masterLock);
++            if (shutdownServerRateLimiter.tryAcquire()) {
++              log.warn("attempting to stop {}", server);
++              try {
++                TServerConnection connection2 = tserverSet.getConnection(server);
++                if (connection2 != null) {
++                  connection2.halt(masterLock);
+                 }
 -              } else {
 -                log.warn("Unable to shutdown {} as over the shutdown limit of {} per minute",
 -                    server, MAX_SHUTDOWNS_PER_SEC * 60);
++              } catch (TTransportException e1) {
++                // ignore: it's probably down
++              } catch (Exception e2) {
++                log.info("error talking to troublesome tablet server", e2);
                }
-             } catch (TTransportException e1) {
-               // ignore: it's probably down
-             } catch (Exception e2) {
-               log.info("error talking to troublesome tablet server", e2);
 -              badServers.remove(server);
++            } else {
++              log.warn("Unable to shutdown {} as over the shutdown limit of {} per minute", server,
++                  MAX_SHUTDOWNS_PER_SEC * 60);
              }
 +            badServers.remove(server);
            }
          }
        });
@@@ -1035,32 -1267,22 +1046,33 @@@
  
      tserverSet.startListeningForTabletServerChanges();
  
 -    ZooReaderWriter zReaderWriter = ZooReaderWriter.getInstance();
 +    try {
 +      blockForTservers();
 +    } catch (InterruptedException ex) {
 +      Thread.currentThread().interrupt();
 +    }
  
 -    zReaderWriter.getChildren(zroot + Constants.ZRECOVERY, new Watcher() {
 -      @Override
 -      public void process(WatchedEvent event) {
 -        nextEvent.event("Noticed recovery changes", event.getType());
 -        try {
 -          // watcher only fires once, add it back
 -          ZooReaderWriter.getInstance().getChildren(zroot + Constants.ZRECOVERY, this);
 -        } catch (Exception e) {
 -          log.error("Failed to add log recovery watcher back", e);
 +    ZooReaderWriter zReaderWriter = context.getZooReaderWriter();
 +
 +    try {
 +      zReaderWriter.getChildren(zroot + Constants.ZRECOVERY, new Watcher() {
 +        @Override
 +        public void process(WatchedEvent event) {
 +          nextEvent.event("Noticed recovery changes %s", event.getType());
 +          try {
 +            // watcher only fires once, add it back
 +            zReaderWriter.getChildren(zroot + Constants.ZRECOVERY, this);
 +          } catch (Exception e) {
 +            log.error("Failed to add log recovery watcher back", e);
 +          }
          }
 -      }
 -    });
 +      });
 +    } catch (KeeperException | InterruptedException e) {
 +      throw new IllegalStateException("Unable to read " + zroot + Constants.ZRECOVERY, e);
 +    }
 +
 +    watchers.add(new TabletGroupWatcher(this, new MetaDataStateStore(context, this), null) {
+ 
 -    watchers.add(new TabletGroupWatcher(this, new MetaDataStateStore(this, this), null) {
        @Override
        boolean canSuspendTablets() {
          // Always allow user data tablets to enter suspended state.