You are viewing a plain-text version of this content; the hyperlink to its canonical version did not survive the plain-text conversion.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2020/01/02 20:37:01 UTC
[accumulo] 01/02: Merge branch '1.9'
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 431e168ff28b99e8201b68158c381a71db7d6762
Merge: ad10737 828a321
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu Jan 2 15:14:47 2020 -0500
Merge branch '1.9'
.../java/org/apache/accumulo/master/Master.java | 4 ++-
.../accumulo/master/recovery/RecoveryManager.java | 33 ++++++++++++++++++++--
2 files changed, 34 insertions(+), 3 deletions(-)
diff --cc server/master/src/main/java/org/apache/accumulo/master/Master.java
index 40faf68,7332998..1041387
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@@ -159,36 -168,40 +159,38 @@@ import edu.umd.cs.findbugs.annotations.
/**
* The Master is responsible for assigning and balancing tablets to tablet servers.
- *
+ * <p>
* The master will also coordinate log recoveries and reports general status.
*/
-public class Master extends AccumuloServerContext
- implements LiveTServerSet.Listener, TableObserver, CurrentState {
+public class Master extends AbstractServer
+ implements LiveTServerSet.Listener, TableObserver, CurrentState, HighlyAvailableService {
- final static Logger log = LoggerFactory.getLogger(Master.class);
+ static final Logger log = LoggerFactory.getLogger(Master.class);
- final static int ONE_SECOND = 1000;
- final static long TIME_TO_WAIT_BETWEEN_SCANS = 60 * ONE_SECOND;
+ static final int ONE_SECOND = 1000;
+ static final long TIME_TO_WAIT_BETWEEN_SCANS = 60 * ONE_SECOND;
+ // made this less than TIME_TO_WAIT_BETWEEN_SCANS, so that the cache is cleared between cycles
- final static long TIME_TO_CACHE_RECOVERY_WAL_EXISTENCE = (3 * TIME_TO_WAIT_BETWEEN_SCANS) / 4;
- final private static long TIME_BETWEEN_MIGRATION_CLEANUPS = 5 * 60 * ONE_SECOND;
- final static long WAIT_BETWEEN_ERRORS = ONE_SECOND;
- final private static long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND;
- final private static int MAX_CLEANUP_WAIT_TIME = ONE_SECOND;
- final private static int TIME_TO_WAIT_BETWEEN_LOCK_CHECKS = ONE_SECOND;
- final static int MAX_TSERVER_WORK_CHUNK = 5000;
- final private static int MAX_BAD_STATUS_COUNT = 3;
++ static final long TIME_TO_CACHE_RECOVERY_WAL_EXISTENCE = (3 * TIME_TO_WAIT_BETWEEN_SCANS) / 4;
+ private static final long TIME_BETWEEN_MIGRATION_CLEANUPS = 5 * 60 * ONE_SECOND;
+ static final long WAIT_BETWEEN_ERRORS = ONE_SECOND;
+ private static final long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND;
+ private static final int MAX_CLEANUP_WAIT_TIME = ONE_SECOND;
+ private static final int TIME_TO_WAIT_BETWEEN_LOCK_CHECKS = ONE_SECOND;
+ static final int MAX_TSERVER_WORK_CHUNK = 5000;
+ private static final int MAX_BAD_STATUS_COUNT = 3;
final VolumeManager fs;
- final private String hostname;
- final private Object balancedNotifier = new Object();
+ private final Object balancedNotifier = new Object();
final LiveTServerSet tserverSet;
- final private List<TabletGroupWatcher> watchers = new ArrayList<>();
+ private final List<TabletGroupWatcher> watchers = new ArrayList<>();
final SecurityOperation security;
- final Map<TServerInstance,AtomicInteger> badServers = Collections
- .synchronizedMap(new DefaultMap<TServerInstance,AtomicInteger>(new AtomicInteger()));
- final Set<TServerInstance> serversToShutdown =
- Collections.synchronizedSet(new HashSet<TServerInstance>());
+ final Map<TServerInstance,AtomicInteger> badServers =
+ Collections.synchronizedMap(new HashMap<TServerInstance,AtomicInteger>());
+ final Set<TServerInstance> serversToShutdown = Collections.synchronizedSet(new HashSet<>());
final SortedMap<KeyExtent,TServerInstance> migrations =
- Collections.synchronizedSortedMap(new TreeMap<KeyExtent,TServerInstance>());
+ Collections.synchronizedSortedMap(new TreeMap<>());
final EventCoordinator nextEvent = new EventCoordinator();
- final private Object mergeLock = new Object();
+ private final Object mergeLock = new Object();
private ReplicationDriver replicationWorkDriver;
private WorkDriver replicationWorkAssigner;
RecoveryManager recoveryManager = null;
@@@ -981,49 -1239,14 +983,49 @@@
return info;
}
- public void run() throws IOException, InterruptedException, KeeperException {
- final String zroot = ZooUtil.getRoot(getInstance());
+ @Override
+ public void run() {
+ final ServerContext context = getContext();
+ final String zroot = getZooKeeperRoot();
+
+ // ACCUMULO-4424 Put up the Thrift servers before getting the lock as a sign of process health
+ // when a hot-standby
+ //
+ // Start the Master's Client service
+ clientHandler = new MasterClientServiceHandler(this);
+ // Ensure that calls before the master gets the lock fail
+ Iface haProxy = HighlyAvailableServiceWrapper.service(clientHandler, this);
+ Iface rpcProxy = TraceUtil.wrapService(haProxy);
+ final Processor<Iface> processor;
+ if (context.getThriftServerType() == ThriftServerType.SASL) {
+ Iface tcredsProxy = TCredentialsUpdatingWrapper.service(rpcProxy, clientHandler.getClass(),
+ getConfiguration());
+ processor = new Processor<>(tcredsProxy);
+ } else {
+ processor = new Processor<>(rpcProxy);
+ }
+ ServerAddress sa;
+ try {
+ sa = TServerUtils.startServer(getMetricsSystem(), context, getHostname(),
+ Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler", null,
+ Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK,
+ Property.GENERAL_MAX_MESSAGE_SIZE);
+ } catch (UnknownHostException e) {
+ throw new IllegalStateException("Unable to start server on host " + getHostname(), e);
+ }
+ clientService = sa.server;
+ log.info("Started Master client service at {}", sa.address);
- getMasterLock(zroot + Constants.ZMASTER_LOCK);
+ // block until we can obtain the ZK lock for the master
+ try {
+ getMasterLock(zroot + Constants.ZMASTER_LOCK);
+ } catch (KeeperException | InterruptedException e) {
+ throw new IllegalStateException("Exception getting master lock", e);
+ }
- recoveryManager = new RecoveryManager(this);
+ recoveryManager = new RecoveryManager(this, TIME_TO_CACHE_RECOVERY_WAL_EXISTENCE);
- TableManager.getInstance().addObserver(this);
+ context.getTableManager().addObserver(this);
StatusThread statusThread = new StatusThread();
statusThread.start();
diff --cc server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
index e5fef4e,bfc98aa..f380778
--- a/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
@@@ -63,14 -69,24 +70,23 @@@ public class RecoveryManager
private Master master;
private ZooCache zooCache;
- public RecoveryManager(Master master) {
+ public RecoveryManager(Master master, long timeToCacheExistsInMillis) {
this.master = master;
+ existenceCache =
+ CacheBuilder.newBuilder().expireAfterWrite(timeToCacheExistsInMillis, TimeUnit.MILLISECONDS)
+ .maximumWeight(10_000_000).weigher(new Weigher<Path,Boolean>() {
+ @Override
+ public int weigh(Path path, Boolean exist) {
+ return path.toString().length();
+ }
+ }).build();
+
executor = Executors.newScheduledThreadPool(4, new NamingThreadFactory("Walog sort starter "));
- zooCache = new ZooCache();
+ zooCache = new ZooCache(master.getContext().getZooReaderWriter(), null);
try {
- AccumuloConfiguration aconf = master.getConfiguration();
List<String> workIDs =
- new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY,
- aconf).getWorkQueued();
+ new DistributedWorkQueue(master.getZooKeeperRoot() + Constants.ZRECOVERY,
+ master.getConfiguration()).getWorkQueued();
sortsQueued.addAll(workIDs);
} catch (Exception e) {
log.warn("{}", e.getMessage(), e);
@@@ -128,10 -144,23 +144,23 @@@
sortsQueued.add(sortId);
}
- final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + sortId;
- log.info("Created zookeeper entry " + path + " with data " + work);
+ final String path = master.getZooKeeperRoot() + Constants.ZRECOVERY + "/" + sortId;
+ log.info("Created zookeeper entry {} with data {}", path, work);
}
+ private boolean exists(final Path path) throws IOException {
+ try {
+ return existenceCache.get(path, new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ return master.getFileSystem().exists(path);
+ }
+ });
+ } catch (ExecutionException e) {
+ throw new IOException(e);
+ }
+ }
+
public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs)
throws IOException {
boolean recoveryNeeded = false;