You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2020/01/02 20:37:01 UTC

[accumulo] 01/02: Merge branch '1.9'

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 431e168ff28b99e8201b68158c381a71db7d6762
Merge: ad10737 828a321
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu Jan 2 15:14:47 2020 -0500

    Merge branch '1.9'

 .../java/org/apache/accumulo/master/Master.java    |  4 ++-
 .../accumulo/master/recovery/RecoveryManager.java  | 33 ++++++++++++++++++++--
 2 files changed, 34 insertions(+), 3 deletions(-)

diff --cc server/master/src/main/java/org/apache/accumulo/master/Master.java
index 40faf68,7332998..1041387
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@@ -159,36 -168,40 +159,38 @@@ import edu.umd.cs.findbugs.annotations.
  
  /**
   * The Master is responsible for assigning and balancing tablets to tablet servers.
 - *
 + * <p>
   * The master will also coordinate log recoveries and reports general status.
   */
 -public class Master extends AccumuloServerContext
 -    implements LiveTServerSet.Listener, TableObserver, CurrentState {
 +public class Master extends AbstractServer
 +    implements LiveTServerSet.Listener, TableObserver, CurrentState, HighlyAvailableService {
  
 -  final static Logger log = LoggerFactory.getLogger(Master.class);
 +  static final Logger log = LoggerFactory.getLogger(Master.class);
  
 -  final static int ONE_SECOND = 1000;
 -  final static long TIME_TO_WAIT_BETWEEN_SCANS = 60 * ONE_SECOND;
 +  static final int ONE_SECOND = 1000;
 +  static final long TIME_TO_WAIT_BETWEEN_SCANS = 60 * ONE_SECOND;
+   // made this less than TIME_TO_WAIT_BETWEEN_SCANS, so that the cache is cleared between cycles
 -  final static long TIME_TO_CACHE_RECOVERY_WAL_EXISTENCE = (3 * TIME_TO_WAIT_BETWEEN_SCANS) / 4;
 -  final private static long TIME_BETWEEN_MIGRATION_CLEANUPS = 5 * 60 * ONE_SECOND;
 -  final static long WAIT_BETWEEN_ERRORS = ONE_SECOND;
 -  final private static long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND;
 -  final private static int MAX_CLEANUP_WAIT_TIME = ONE_SECOND;
 -  final private static int TIME_TO_WAIT_BETWEEN_LOCK_CHECKS = ONE_SECOND;
 -  final static int MAX_TSERVER_WORK_CHUNK = 5000;
 -  final private static int MAX_BAD_STATUS_COUNT = 3;
++  static final long TIME_TO_CACHE_RECOVERY_WAL_EXISTENCE = (3 * TIME_TO_WAIT_BETWEEN_SCANS) / 4;
 +  private static final long TIME_BETWEEN_MIGRATION_CLEANUPS = 5 * 60 * ONE_SECOND;
 +  static final long WAIT_BETWEEN_ERRORS = ONE_SECOND;
 +  private static final long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND;
 +  private static final int MAX_CLEANUP_WAIT_TIME = ONE_SECOND;
 +  private static final int TIME_TO_WAIT_BETWEEN_LOCK_CHECKS = ONE_SECOND;
 +  static final int MAX_TSERVER_WORK_CHUNK = 5000;
 +  private static final int MAX_BAD_STATUS_COUNT = 3;
  
    final VolumeManager fs;
 -  final private String hostname;
 -  final private Object balancedNotifier = new Object();
 +  private final Object balancedNotifier = new Object();
    final LiveTServerSet tserverSet;
 -  final private List<TabletGroupWatcher> watchers = new ArrayList<>();
 +  private final List<TabletGroupWatcher> watchers = new ArrayList<>();
    final SecurityOperation security;
 -  final Map<TServerInstance,AtomicInteger> badServers = Collections
 -      .synchronizedMap(new DefaultMap<TServerInstance,AtomicInteger>(new AtomicInteger()));
 -  final Set<TServerInstance> serversToShutdown =
 -      Collections.synchronizedSet(new HashSet<TServerInstance>());
 +  final Map<TServerInstance,AtomicInteger> badServers =
 +      Collections.synchronizedMap(new HashMap<TServerInstance,AtomicInteger>());
 +  final Set<TServerInstance> serversToShutdown = Collections.synchronizedSet(new HashSet<>());
    final SortedMap<KeyExtent,TServerInstance> migrations =
 -      Collections.synchronizedSortedMap(new TreeMap<KeyExtent,TServerInstance>());
 +      Collections.synchronizedSortedMap(new TreeMap<>());
    final EventCoordinator nextEvent = new EventCoordinator();
 -  final private Object mergeLock = new Object();
 +  private final Object mergeLock = new Object();
    private ReplicationDriver replicationWorkDriver;
    private WorkDriver replicationWorkAssigner;
    RecoveryManager recoveryManager = null;
@@@ -981,49 -1239,14 +983,49 @@@
      return info;
    }
  
 -  public void run() throws IOException, InterruptedException, KeeperException {
 -    final String zroot = ZooUtil.getRoot(getInstance());
 +  @Override
 +  public void run() {
 +    final ServerContext context = getContext();
 +    final String zroot = getZooKeeperRoot();
 +
 +    // ACCUMULO-4424 Put up the Thrift servers before getting the lock as a sign of process health
 +    // when a hot-standby
 +    //
 +    // Start the Master's Client service
 +    clientHandler = new MasterClientServiceHandler(this);
 +    // Ensure that calls before the master gets the lock fail
 +    Iface haProxy = HighlyAvailableServiceWrapper.service(clientHandler, this);
 +    Iface rpcProxy = TraceUtil.wrapService(haProxy);
 +    final Processor<Iface> processor;
 +    if (context.getThriftServerType() == ThriftServerType.SASL) {
 +      Iface tcredsProxy = TCredentialsUpdatingWrapper.service(rpcProxy, clientHandler.getClass(),
 +          getConfiguration());
 +      processor = new Processor<>(tcredsProxy);
 +    } else {
 +      processor = new Processor<>(rpcProxy);
 +    }
 +    ServerAddress sa;
 +    try {
 +      sa = TServerUtils.startServer(getMetricsSystem(), context, getHostname(),
 +          Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler", null,
 +          Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK,
 +          Property.GENERAL_MAX_MESSAGE_SIZE);
 +    } catch (UnknownHostException e) {
 +      throw new IllegalStateException("Unable to start server on host " + getHostname(), e);
 +    }
 +    clientService = sa.server;
 +    log.info("Started Master client service at {}", sa.address);
  
 -    getMasterLock(zroot + Constants.ZMASTER_LOCK);
 +    // block until we can obtain the ZK lock for the master
 +    try {
 +      getMasterLock(zroot + Constants.ZMASTER_LOCK);
 +    } catch (KeeperException | InterruptedException e) {
 +      throw new IllegalStateException("Exception getting master lock", e);
 +    }
  
-     recoveryManager = new RecoveryManager(this);
+     recoveryManager = new RecoveryManager(this, TIME_TO_CACHE_RECOVERY_WAL_EXISTENCE);
  
 -    TableManager.getInstance().addObserver(this);
 +    context.getTableManager().addObserver(this);
  
      StatusThread statusThread = new StatusThread();
      statusThread.start();
diff --cc server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
index e5fef4e,bfc98aa..f380778
--- a/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
@@@ -63,14 -69,24 +70,23 @@@ public class RecoveryManager 
    private Master master;
    private ZooCache zooCache;
  
-   public RecoveryManager(Master master) {
+   public RecoveryManager(Master master, long timeToCacheExistsInMillis) {
      this.master = master;
+     existenceCache =
+         CacheBuilder.newBuilder().expireAfterWrite(timeToCacheExistsInMillis, TimeUnit.MILLISECONDS)
+             .maximumWeight(10_000_000).weigher(new Weigher<Path,Boolean>() {
+               @Override
+               public int weigh(Path path, Boolean exist) {
+                 return path.toString().length();
+               }
+             }).build();
+ 
      executor = Executors.newScheduledThreadPool(4, new NamingThreadFactory("Walog sort starter "));
 -    zooCache = new ZooCache();
 +    zooCache = new ZooCache(master.getContext().getZooReaderWriter(), null);
      try {
 -      AccumuloConfiguration aconf = master.getConfiguration();
        List<String> workIDs =
 -          new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY,
 -              aconf).getWorkQueued();
 +          new DistributedWorkQueue(master.getZooKeeperRoot() + Constants.ZRECOVERY,
 +              master.getConfiguration()).getWorkQueued();
        sortsQueued.addAll(workIDs);
      } catch (Exception e) {
        log.warn("{}", e.getMessage(), e);
@@@ -128,10 -144,23 +144,23 @@@
        sortsQueued.add(sortId);
      }
  
 -    final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + sortId;
 -    log.info("Created zookeeper entry " + path + " with data " + work);
 +    final String path = master.getZooKeeperRoot() + Constants.ZRECOVERY + "/" + sortId;
 +    log.info("Created zookeeper entry {} with data {}", path, work);
    }
  
+   private boolean exists(final Path path) throws IOException {
+     try {
+       return existenceCache.get(path, new Callable<Boolean>() {
+         @Override
+         public Boolean call() throws Exception {
+           return master.getFileSystem().exists(path);
+         }
+       });
+     } catch (ExecutionException e) {
+       throw new IOException(e);
+     }
+   }
+ 
    public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs)
        throws IOException {
      boolean recoveryNeeded = false;