You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@accumulo.apache.org by mm...@apache.org on 2020/11/02 15:14:22 UTC
[accumulo] branch main updated: Refactor TabletGroupWatcher (#1761)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 3f47da1 Refactor TabletGroupWatcher (#1761)
3f47da1 is described below
commit 3f47da1777f5d059d42d7706574ee3fd9882006b
Author: Mike Miller <mm...@apache.org>
AuthorDate: Mon Nov 2 10:14:14 2020 -0500
Refactor TabletGroupWatcher (#1761)
* Create TabletLists to hold the many different data structures being
tracked in the run method of TabletGroupWatcher
* Create methods for some of the logic in the switch case
* Pass TabletLists to flushChanges() and break it into smaller methods
* Rename TabletLocationState.getServer() to getLocation(), since location is
a more meaningful name than server
* Move markDeadServerLogsAsClosed from Master into TabletGroupWatcher and make it private
---
.../server/master/state/TabletLocationState.java | 2 +-
.../master/state/TabletLocationStateTest.java | 8 +-
.../java/org/apache/accumulo/master/Master.java | 13 -
.../apache/accumulo/master/TabletGroupWatcher.java | 279 ++++++++++++---------
4 files changed, 171 insertions(+), 131 deletions(-)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
index c3a96aa..c2111ef 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
@@ -87,7 +87,7 @@ public class TabletLocationState {
return extent + "@(" + future + "," + current + "," + last + ")" + (chopped ? " chopped" : "");
}
- public TServerInstance getServer() {
+ public TServerInstance getLocation() {
TServerInstance result = null;
if (current != null) {
result = current;
diff --git a/server/base/src/test/java/org/apache/accumulo/server/master/state/TabletLocationStateTest.java b/server/base/src/test/java/org/apache/accumulo/server/master/state/TabletLocationStateTest.java
index 9b9d148..b31d138 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/master/state/TabletLocationStateTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/master/state/TabletLocationStateTest.java
@@ -104,25 +104,25 @@ public class TabletLocationStateTest {
@Test
public void testGetServer_Current() throws Exception {
tls = new TabletLocationState(keyExtent, null, current, last, null, walogs, true);
- assertSame(current, tls.getServer());
+ assertSame(current, tls.getLocation());
}
@Test
public void testGetServer_Future() throws Exception {
tls = new TabletLocationState(keyExtent, future, null, last, null, walogs, true);
- assertSame(future, tls.getServer());
+ assertSame(future, tls.getLocation());
}
@Test
public void testGetServer_Last() throws Exception {
tls = new TabletLocationState(keyExtent, null, null, last, null, walogs, true);
- assertSame(last, tls.getServer());
+ assertSame(last, tls.getLocation());
}
@Test
public void testGetServer_None() throws Exception {
tls = new TabletLocationState(keyExtent, null, null, null, null, walogs, true);
- assertNull(tls.getServer());
+ assertNull(tls.getLocation());
}
@Test
diff --git a/server/manager/src/main/java/org/apache/accumulo/master/Master.java b/server/manager/src/main/java/org/apache/accumulo/master/Master.java
index bacc583..45449f7 100644
--- a/server/manager/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/manager/src/main/java/org/apache/accumulo/master/Master.java
@@ -100,8 +100,6 @@ import org.apache.accumulo.server.HighlyAvailableService;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.ServerOpts;
import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.log.WalStateManager;
-import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
import org.apache.accumulo.server.master.LiveTServerSet;
import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer;
@@ -135,7 +133,6 @@ import org.apache.accumulo.server.util.TableInfoUtil;
import org.apache.accumulo.server.util.time.SimpleTimer;
import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
import org.apache.accumulo.start.classloader.vfs.ContextManager;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.thrift.TException;
@@ -1675,16 +1672,6 @@ public class Master extends AbstractServer
}
}
- public void markDeadServerLogsAsClosed(Map<TServerInstance,List<Path>> logsForDeadServers)
- throws WalMarkerException {
- WalStateManager mgr = new WalStateManager(getContext());
- for (Entry<TServerInstance,List<Path>> server : logsForDeadServers.entrySet()) {
- for (Path path : server.getValue()) {
- mgr.closeWal(server.getKey(), path);
- }
- }
- }
-
public void updateBulkImportStatus(String directory, BulkImportState state) {
bulkImportStatus.updateBulkImportStatus(Collections.singletonList(directory), state);
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
index 5647554..d12eb67 100644
--- a/server/manager/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
+++ b/server/manager/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@ -134,6 +134,37 @@ abstract class TabletGroupWatcher extends Daemon {
return candidates.equals(lastScanServers);
}
+ /**
+ * Collection of data structures used to track Tablet assignments
+ */
+ private static class TabletLists {
+ private final List<Assignment> assignments = new ArrayList<>();
+ private final List<Assignment> assigned = new ArrayList<>();
+ private final List<TabletLocationState> assignedToDeadServers = new ArrayList<>();
+ private final List<TabletLocationState> suspendedToGoneServers = new ArrayList<>();
+ private final Map<KeyExtent,TServerInstance> unassigned = new HashMap<>();
+ private final Map<TServerInstance,List<Path>> logsForDeadServers = new TreeMap<>();
+ // read only lists of tablet servers
+ private final SortedMap<TServerInstance,TabletServerStatus> currentTServers;
+ private final SortedMap<TServerInstance,TabletServerStatus> destinations;
+
+ public TabletLists(Master m, SortedMap<TServerInstance,TabletServerStatus> curTServers) {
+ var destinationsMod = new TreeMap<>(curTServers);
+ // Don't move tablets to servers that are shutting down
+ destinationsMod.keySet().removeAll(m.serversToShutdown);
+ this.destinations = Collections.unmodifiableSortedMap(destinationsMod);
+ this.currentTServers = Collections.unmodifiableSortedMap(curTServers);
+ }
+
+ public void reset() {
+ assignments.clear();
+ assigned.clear();
+ assignedToDeadServers.clear();
+ suspendedToGoneServers.clear();
+ unassigned.clear();
+ }
+ }
+
@Override
public void run() {
Thread.currentThread().setName("Watching " + store.name());
@@ -172,16 +203,7 @@ abstract class TabletGroupWatcher extends Daemon {
continue;
}
- // Don't move tablets to servers that are shutting down
- SortedMap<TServerInstance,TabletServerStatus> destinations = new TreeMap<>(currentTServers);
- destinations.keySet().removeAll(master.serversToShutdown);
-
- List<Assignment> assignments = new ArrayList<>();
- List<Assignment> assigned = new ArrayList<>();
- List<TabletLocationState> assignedToDeadServers = new ArrayList<>();
- List<TabletLocationState> suspendedToGoneServers = new ArrayList<>();
- Map<KeyExtent,TServerInstance> unassigned = new HashMap<>();
- Map<TServerInstance,List<Path>> logsForDeadServers = new TreeMap<>();
+ TabletLists tLists = new TabletLists(master, currentTServers);
MasterState masterState = master.getMasterState();
int[] counts = new int[TabletState.values().length];
@@ -200,15 +222,10 @@ abstract class TabletGroupWatcher extends Daemon {
continue;
// Don't overwhelm the tablet servers with work
- if (unassigned.size() + unloaded
+ if (tLists.unassigned.size() + unloaded
> Master.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
- flushChanges(destinations, assignments, assigned, assignedToDeadServers,
- logsForDeadServers, suspendedToGoneServers, unassigned);
- assignments.clear();
- assigned.clear();
- assignedToDeadServers.clear();
- suspendedToGoneServers.clear();
- unassigned.clear();
+ flushChanges(tLists, wals);
+ tLists.reset();
unloaded = 0;
eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
}
@@ -220,7 +237,7 @@ abstract class TabletGroupWatcher extends Daemon {
return mStats != null ? mStats : new MergeStats(new MergeInfo());
});
TabletGoalState goal = master.getGoalState(tls, mergeStats.getMergeInfo());
- TServerInstance server = tls.getServer();
+ TServerInstance location = tls.getLocation();
TabletState state = tls.getState(currentTServers.keySet());
TabletLogger.missassigned(tls.extent, goal.toString(), state.toString(), tls.future,
@@ -252,90 +269,45 @@ abstract class TabletGroupWatcher extends Daemon {
}
switch (state) {
case HOSTED:
- if (server.equals(master.migrations.get(tls.extent)))
+ if (location.equals(master.migrations.get(tls.extent)))
master.migrations.remove(tls.extent);
break;
case ASSIGNED_TO_DEAD_SERVER:
- assignedToDeadServers.add(tls);
- if (server.equals(master.migrations.get(tls.extent)))
- master.migrations.remove(tls.extent);
- TServerInstance tserver = tls.futureOrCurrent();
- if (!logsForDeadServers.containsKey(tserver)) {
- logsForDeadServers.put(tserver, wals.getWalsInUse(tserver));
- }
+ hostDeadTablet(tLists, tls, location, wals);
break;
case SUSPENDED:
- if (master.getSteadyTime() - tls.suspend.suspensionTime
- < tableConf.getTimeInMillis(Property.TABLE_SUSPEND_DURATION)) {
- // Tablet is suspended. See if its tablet server is back.
- TServerInstance returnInstance = null;
- Iterator<TServerInstance> find = destinations
- .tailMap(new TServerInstance(tls.suspend.server, " ")).keySet().iterator();
- if (find.hasNext()) {
- TServerInstance found = find.next();
- if (found.getLocation().equals(tls.suspend.server)) {
- returnInstance = found;
- }
- }
-
- // Old tablet server is back. Return this tablet to its previous owner.
- if (returnInstance != null) {
- assignments.add(new Assignment(tls.extent, returnInstance));
- }
- // else - tablet server not back. Don't ask for a new assignment right now.
-
- } else {
- // Treat as unassigned, ask for a new assignment.
- unassigned.put(tls.extent, server);
- }
+ hostSuspendedTablet(tLists, tls, location, tableConf);
break;
case UNASSIGNED:
- // maybe it's a finishing migration
- TServerInstance dest = master.migrations.get(tls.extent);
- if (dest != null) {
- // if destination is still good, assign it
- if (destinations.containsKey(dest)) {
- assignments.add(new Assignment(tls.extent, dest));
- } else {
- // get rid of this migration
- master.migrations.remove(tls.extent);
- unassigned.put(tls.extent, server);
- }
- } else {
- unassigned.put(tls.extent, server);
- }
+ hostUnassignedTablet(tLists, tls.extent, location);
break;
case ASSIGNED:
// Send another reminder
- assigned.add(new Assignment(tls.extent, tls.future));
+ tLists.assigned.add(new Assignment(tls.extent, tls.future));
break;
}
} else {
switch (state) {
case SUSPENDED:
// Request a move to UNASSIGNED, so as to allow balancing to continue.
- suspendedToGoneServers.add(tls);
- cancelOfflineTableMigrations(tls);
+ tLists.suspendedToGoneServers.add(tls);
+ cancelOfflineTableMigrations(tls.extent);
break;
case UNASSIGNED:
- cancelOfflineTableMigrations(tls);
+ cancelOfflineTableMigrations(tls.extent);
break;
case ASSIGNED_TO_DEAD_SERVER:
- assignedToDeadServers.add(tls);
- if (!logsForDeadServers.containsKey(tls.futureOrCurrent())) {
- logsForDeadServers.put(tls.futureOrCurrent(),
- wals.getWalsInUse(tls.futureOrCurrent()));
- }
+ unassignDeadTablet(tLists, tls, wals);
break;
case HOSTED:
- TServerConnection client = master.tserverSet.getConnection(server);
+ TServerConnection client = master.tserverSet.getConnection(location);
if (client != null) {
client.unloadTablet(master.masterLock, tls.extent, goal.howUnload(),
master.getSteadyTime());
unloaded++;
totalUnloaded++;
} else {
- Master.log.warn("Could not connect to server {}", server);
+ Master.log.warn("Could not connect to server {}", location);
}
break;
case ASSIGNED:
@@ -345,8 +317,7 @@ abstract class TabletGroupWatcher extends Daemon {
counts[state.ordinal()]++;
}
- flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers,
- suspendedToGoneServers, unassigned);
+ flushChanges(tLists, wals);
// provide stats after flushing changes to avoid race conditions w/ delete table
stats.end(masterState);
@@ -397,11 +368,76 @@ abstract class TabletGroupWatcher extends Daemon {
}
}
- private void cancelOfflineTableMigrations(TabletLocationState tls) {
- TServerInstance dest = master.migrations.get(tls.extent);
- TableState tableState = master.getTableManager().getTableState(tls.extent.tableId());
- if (dest != null && tableState == TableState.OFFLINE) {
+ private void unassignDeadTablet(TabletLists tLists, TabletLocationState tls, WalStateManager wals)
+ throws WalMarkerException {
+ tLists.assignedToDeadServers.add(tls);
+ if (!tLists.logsForDeadServers.containsKey(tls.futureOrCurrent())) {
+ tLists.logsForDeadServers.put(tls.futureOrCurrent(),
+ wals.getWalsInUse(tls.futureOrCurrent()));
+ }
+ }
+
+ private void hostUnassignedTablet(TabletLists tLists, KeyExtent tablet,
+ TServerInstance location) {
+ // maybe it's a finishing migration
+ TServerInstance dest = master.migrations.get(tablet);
+ if (dest != null) {
+ // if destination is still good, assign it
+ if (tLists.destinations.containsKey(dest)) {
+ tLists.assignments.add(new Assignment(tablet, dest));
+ } else {
+ // get rid of this migration
+ master.migrations.remove(tablet);
+ tLists.unassigned.put(tablet, location);
+ }
+ } else {
+ tLists.unassigned.put(tablet, location);
+ }
+ }
+
+ private void hostSuspendedTablet(TabletLists tLists, TabletLocationState tls,
+ TServerInstance location, TableConfiguration tableConf) {
+ if (master.getSteadyTime() - tls.suspend.suspensionTime
+ < tableConf.getTimeInMillis(Property.TABLE_SUSPEND_DURATION)) {
+ // Tablet is suspended. See if its tablet server is back.
+ TServerInstance returnInstance = null;
+ Iterator<TServerInstance> find = tLists.destinations
+ .tailMap(new TServerInstance(tls.suspend.server, " ")).keySet().iterator();
+ if (find.hasNext()) {
+ TServerInstance found = find.next();
+ if (found.getLocation().equals(tls.suspend.server)) {
+ returnInstance = found;
+ }
+ }
+
+ // Old tablet server is back. Return this tablet to its previous owner.
+ if (returnInstance != null) {
+ tLists.assignments.add(new Assignment(tls.extent, returnInstance));
+ }
+ // else - tablet server not back. Don't ask for a new assignment right now.
+
+ } else {
+ // Treat as unassigned, ask for a new assignment.
+ tLists.unassigned.put(tls.extent, location);
+ }
+ }
+
+ private void hostDeadTablet(TabletLists tLists, TabletLocationState tls, TServerInstance location,
+ WalStateManager wals) throws WalMarkerException {
+ tLists.assignedToDeadServers.add(tls);
+ if (location.equals(master.migrations.get(tls.extent)))
master.migrations.remove(tls.extent);
+ TServerInstance tserver = tls.futureOrCurrent();
+ if (!tLists.logsForDeadServers.containsKey(tserver)) {
+ tLists.logsForDeadServers.put(tserver, wals.getWalsInUse(tserver));
+ }
+ }
+
+ private void cancelOfflineTableMigrations(KeyExtent extent) {
+ TServerInstance dest = master.migrations.get(extent);
+ TableState tableState = master.getTableManager().getTableState(extent.tableId());
+ if (dest != null && tableState == TableState.OFFLINE) {
+ master.migrations.remove(extent);
}
}
@@ -811,50 +847,50 @@ abstract class TabletGroupWatcher extends Daemon {
}
}
- private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> currentTServers,
- List<Assignment> assignments, List<Assignment> assigned,
- List<TabletLocationState> assignedToDeadServers,
- Map<TServerInstance,List<Path>> logsForDeadServers,
- List<TabletLocationState> suspendedToGoneServers, Map<KeyExtent,TServerInstance> unassigned)
- throws DistributedStoreException, TException, WalMarkerException {
- boolean tabletsSuspendable = canSuspendTablets();
- if (!assignedToDeadServers.isEmpty()) {
- int maxServersToShow = min(assignedToDeadServers.size(), 100);
- Master.log.debug("{} assigned to dead servers: {}...", assignedToDeadServers.size(),
- assignedToDeadServers.subList(0, maxServersToShow));
- Master.log.debug("logs for dead servers: {}", logsForDeadServers);
- if (tabletsSuspendable) {
- store.suspend(assignedToDeadServers, logsForDeadServers, master.getSteadyTime());
+ private void handleDeadTablets(TabletLists tLists, WalStateManager wals)
+ throws WalMarkerException, DistributedStoreException {
+ var deadTablets = tLists.assignedToDeadServers;
+ var deadLogs = tLists.logsForDeadServers;
+
+ if (!deadTablets.isEmpty()) {
+ int maxServersToShow = min(deadTablets.size(), 100);
+ Master.log.debug("{} assigned to dead servers: {}...", deadTablets.size(),
+ deadTablets.subList(0, maxServersToShow));
+ Master.log.debug("logs for dead servers: {}", deadLogs);
+ if (canSuspendTablets()) {
+ store.suspend(deadTablets, deadLogs, master.getSteadyTime());
} else {
- store.unassign(assignedToDeadServers, logsForDeadServers);
+ store.unassign(deadTablets, deadLogs);
}
- master.markDeadServerLogsAsClosed(logsForDeadServers);
+ markDeadServerLogsAsClosed(wals, deadLogs);
master.nextEvent.event(
"Marked %d tablets as suspended because they don't have current servers",
- assignedToDeadServers.size());
+ deadTablets.size());
}
- if (!suspendedToGoneServers.isEmpty()) {
- int maxServersToShow = min(assignedToDeadServers.size(), 100);
- Master.log.debug(assignedToDeadServers.size() + " suspended to gone servers: "
- + assignedToDeadServers.subList(0, maxServersToShow) + "...");
- store.unsuspend(suspendedToGoneServers);
+ if (!tLists.suspendedToGoneServers.isEmpty()) {
+ int maxServersToShow = min(tLists.suspendedToGoneServers.size(), 100);
+ Master.log.debug("{} suspended to gone servers: {}...", tLists.suspendedToGoneServers.size(),
+ tLists.suspendedToGoneServers.subList(0, maxServersToShow));
+ store.unsuspend(tLists.suspendedToGoneServers);
}
+ }
- if (!currentTServers.isEmpty()) {
+ private void getAssignmentsFromBalancer(TabletLists tLists,
+ Map<KeyExtent,TServerInstance> unassigned) {
+ if (!tLists.currentTServers.isEmpty()) {
Map<KeyExtent,TServerInstance> assignedOut = new HashMap<>();
- master.tabletBalancer.getAssignments(Collections.unmodifiableSortedMap(currentTServers),
- Collections.unmodifiableMap(unassigned), assignedOut);
+ master.tabletBalancer.getAssignments(tLists.currentTServers, unassigned, assignedOut);
for (Entry<KeyExtent,TServerInstance> assignment : assignedOut.entrySet()) {
if (unassigned.containsKey(assignment.getKey())) {
if (assignment.getValue() != null) {
- if (!currentTServers.containsKey(assignment.getValue())) {
+ if (!tLists.currentTServers.containsKey(assignment.getValue())) {
Master.log.warn(
"balancer assigned {} to a tablet server that is not current {} ignoring",
assignment.getKey(), assignment.getValue());
continue;
}
- assignments.add(new Assignment(assignment.getKey(), assignment.getValue()));
+ tLists.assignments.add(new Assignment(assignment.getKey(), assignment.getValue()));
}
} else {
Master.log.warn(
@@ -866,13 +902,22 @@ abstract class TabletGroupWatcher extends Daemon {
if (!unassigned.isEmpty() && assignedOut.isEmpty())
Master.log.warn("Load balancer failed to assign any tablets");
}
+ }
+
+ private void flushChanges(TabletLists tLists, WalStateManager wals)
+ throws DistributedStoreException, TException, WalMarkerException {
+ var unassigned = Collections.unmodifiableMap(tLists.unassigned);
- if (!assignments.isEmpty()) {
- Master.log.info(String.format("Assigning %d tablets", assignments.size()));
- store.setFutureLocations(assignments);
+ handleDeadTablets(tLists, wals);
+
+ getAssignmentsFromBalancer(tLists, unassigned);
+
+ if (!tLists.assignments.isEmpty()) {
+ Master.log.info(String.format("Assigning %d tablets", tLists.assignments.size()));
+ store.setFutureLocations(tLists.assignments);
}
- assignments.addAll(assigned);
- for (Assignment a : assignments) {
+ tLists.assignments.addAll(tLists.assigned);
+ for (Assignment a : tLists.assignments) {
TServerConnection client = master.tserverSet.getConnection(a.server);
if (client != null) {
client.assignTablet(master.masterLock, a.tablet);
@@ -883,4 +928,12 @@ abstract class TabletGroupWatcher extends Daemon {
}
}
+ private static void markDeadServerLogsAsClosed(WalStateManager mgr,
+ Map<TServerInstance,List<Path>> logsForDeadServers) throws WalMarkerException {
+ for (Entry<TServerInstance,List<Path>> server : logsForDeadServers.entrySet()) {
+ for (Path path : server.getValue()) {
+ mgr.closeWal(server.getKey(), path);
+ }
+ }
+ }
}