You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in this plain-text rendering.)
Posted to commits@accumulo.apache.org by mm...@apache.org on 2020/11/02 15:14:22 UTC

[accumulo] branch main updated: Refactor TabletGroupWatcher (#1761)

This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 3f47da1  Refactor TabletGroupWatcher (#1761)
3f47da1 is described below

commit 3f47da1777f5d059d42d7706574ee3fd9882006b
Author: Mike Miller <mm...@apache.org>
AuthorDate: Mon Nov 2 10:14:14 2020 -0500

    Refactor TabletGroupWatcher (#1761)
    
    * Create TabletLists to hold the many data structures tracked in the
    run method of TabletGroupWatcher
    * Extract methods for some of the logic in the switch statement
    * Pass TabletLists to flushChanges and split flushChanges into methods
    * Rename TabletLocationState.getServer() to getLocation(), since location
    is a more meaningful name than server
    * Move markDeadServerLogsAsClosed from Master into TabletGroupWatcher and
    make it private
---
 .../server/master/state/TabletLocationState.java   |   2 +-
 .../master/state/TabletLocationStateTest.java      |   8 +-
 .../java/org/apache/accumulo/master/Master.java    |  13 -
 .../apache/accumulo/master/TabletGroupWatcher.java | 279 ++++++++++++---------
 4 files changed, 171 insertions(+), 131 deletions(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
index c3a96aa..c2111ef 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
@@ -87,7 +87,7 @@ public class TabletLocationState {
     return extent + "@(" + future + "," + current + "," + last + ")" + (chopped ? " chopped" : "");
   }
 
-  public TServerInstance getServer() {
+  public TServerInstance getLocation() {
     TServerInstance result = null;
     if (current != null) {
       result = current;
diff --git a/server/base/src/test/java/org/apache/accumulo/server/master/state/TabletLocationStateTest.java b/server/base/src/test/java/org/apache/accumulo/server/master/state/TabletLocationStateTest.java
index 9b9d148..b31d138 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/master/state/TabletLocationStateTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/master/state/TabletLocationStateTest.java
@@ -104,25 +104,25 @@ public class TabletLocationStateTest {
   @Test
-  public void testGetServer_Current() throws Exception {
+  public void testGetLocation_Current() throws Exception {
     tls = new TabletLocationState(keyExtent, null, current, last, null, walogs, true);
-    assertSame(current, tls.getServer());
+    assertSame(current, tls.getLocation());
   }
 
   @Test
-  public void testGetServer_Future() throws Exception {
+  public void testGetLocation_Future() throws Exception {
     tls = new TabletLocationState(keyExtent, future, null, last, null, walogs, true);
-    assertSame(future, tls.getServer());
+    assertSame(future, tls.getLocation());
   }
 
   @Test
-  public void testGetServer_Last() throws Exception {
+  public void testGetLocation_Last() throws Exception {
     tls = new TabletLocationState(keyExtent, null, null, last, null, walogs, true);
-    assertSame(last, tls.getServer());
+    assertSame(last, tls.getLocation());
   }
 
   @Test
-  public void testGetServer_None() throws Exception {
+  public void testGetLocation_None() throws Exception {
     tls = new TabletLocationState(keyExtent, null, null, null, null, walogs, true);
-    assertNull(tls.getServer());
+    assertNull(tls.getLocation());
   }
 
   @Test
diff --git a/server/manager/src/main/java/org/apache/accumulo/master/Master.java b/server/manager/src/main/java/org/apache/accumulo/master/Master.java
index bacc583..45449f7 100644
--- a/server/manager/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/manager/src/main/java/org/apache/accumulo/master/Master.java
@@ -100,8 +100,6 @@ import org.apache.accumulo.server.HighlyAvailableService;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.ServerOpts;
 import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.log.WalStateManager;
-import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
 import org.apache.accumulo.server.master.LiveTServerSet;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer;
@@ -135,7 +133,6 @@ import org.apache.accumulo.server.util.TableInfoUtil;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
 import org.apache.accumulo.start.classloader.vfs.ContextManager;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.thrift.TException;
@@ -1675,16 +1672,6 @@ public class Master extends AbstractServer
     }
   }
 
-  public void markDeadServerLogsAsClosed(Map<TServerInstance,List<Path>> logsForDeadServers)
-      throws WalMarkerException {
-    WalStateManager mgr = new WalStateManager(getContext());
-    for (Entry<TServerInstance,List<Path>> server : logsForDeadServers.entrySet()) {
-      for (Path path : server.getValue()) {
-        mgr.closeWal(server.getKey(), path);
-      }
-    }
-  }
-
   public void updateBulkImportStatus(String directory, BulkImportState state) {
     bulkImportStatus.updateBulkImportStatus(Collections.singletonList(directory), state);
   }
diff --git a/server/manager/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
index 5647554..d12eb67 100644
--- a/server/manager/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
+++ b/server/manager/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@ -134,6 +134,37 @@ abstract class TabletGroupWatcher extends Daemon {
     return candidates.equals(lastScanServers);
   }
 
+  /**
+   * Collections the run method uses to accumulate tablet assignment changes between flushes.
+   */
+  private static class TabletLists {
+    private final List<Assignment> assignments = new ArrayList<>();
+    private final List<Assignment> assigned = new ArrayList<>();
+    private final List<TabletLocationState> assignedToDeadServers = new ArrayList<>();
+    private final List<TabletLocationState> suspendedToGoneServers = new ArrayList<>();
+    private final Map<KeyExtent,TServerInstance> unassigned = new HashMap<>();
+    private final Map<TServerInstance,List<Path>> logsForDeadServers = new TreeMap<>();
+    // read-only; currentTServers is a view of the caller's map (not a copy)
+    private final SortedMap<TServerInstance,TabletServerStatus> currentTServers;
+    private final SortedMap<TServerInstance,TabletServerStatus> destinations;
+
+    public TabletLists(Master m, SortedMap<TServerInstance,TabletServerStatus> curTServers) {
+      var destinationsMod = new TreeMap<>(curTServers);
+      // Don't move tablets to servers that are shutting down
+      destinationsMod.keySet().removeAll(m.serversToShutdown);
+      this.destinations = Collections.unmodifiableSortedMap(destinationsMod);
+      this.currentTServers = Collections.unmodifiableSortedMap(curTServers);
+    }
+
+    public void reset() { // NOTE(review): keeps logsForDeadServers; confirm intended
+      assignments.clear();
+      assigned.clear();
+      assignedToDeadServers.clear();
+      suspendedToGoneServers.clear();
+      unassigned.clear();
+    }
+  }
+
   @Override
   public void run() {
     Thread.currentThread().setName("Watching " + store.name());
@@ -172,16 +203,7 @@ abstract class TabletGroupWatcher extends Daemon {
           continue;
         }
 
-        // Don't move tablets to servers that are shutting down
-        SortedMap<TServerInstance,TabletServerStatus> destinations = new TreeMap<>(currentTServers);
-        destinations.keySet().removeAll(master.serversToShutdown);
-
-        List<Assignment> assignments = new ArrayList<>();
-        List<Assignment> assigned = new ArrayList<>();
-        List<TabletLocationState> assignedToDeadServers = new ArrayList<>();
-        List<TabletLocationState> suspendedToGoneServers = new ArrayList<>();
-        Map<KeyExtent,TServerInstance> unassigned = new HashMap<>();
-        Map<TServerInstance,List<Path>> logsForDeadServers = new TreeMap<>();
+        TabletLists tLists = new TabletLists(master, currentTServers);
 
         MasterState masterState = master.getMasterState();
         int[] counts = new int[TabletState.values().length];
@@ -200,15 +222,10 @@ abstract class TabletGroupWatcher extends Daemon {
             continue;
 
           // Don't overwhelm the tablet servers with work
-          if (unassigned.size() + unloaded
+          if (tLists.unassigned.size() + unloaded
               > Master.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
-            flushChanges(destinations, assignments, assigned, assignedToDeadServers,
-                logsForDeadServers, suspendedToGoneServers, unassigned);
-            assignments.clear();
-            assigned.clear();
-            assignedToDeadServers.clear();
-            suspendedToGoneServers.clear();
-            unassigned.clear();
+            flushChanges(tLists, wals);
+            tLists.reset();
             unloaded = 0;
             eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
           }
@@ -220,7 +237,7 @@ abstract class TabletGroupWatcher extends Daemon {
             return mStats != null ? mStats : new MergeStats(new MergeInfo());
           });
           TabletGoalState goal = master.getGoalState(tls, mergeStats.getMergeInfo());
-          TServerInstance server = tls.getServer();
+          TServerInstance location = tls.getLocation();
           TabletState state = tls.getState(currentTServers.keySet());
 
           TabletLogger.missassigned(tls.extent, goal.toString(), state.toString(), tls.future,
@@ -252,90 +269,45 @@ abstract class TabletGroupWatcher extends Daemon {
             }
             switch (state) {
               case HOSTED:
-                if (server.equals(master.migrations.get(tls.extent)))
+                if (location.equals(master.migrations.get(tls.extent)))
                   master.migrations.remove(tls.extent);
                 break;
               case ASSIGNED_TO_DEAD_SERVER:
-                assignedToDeadServers.add(tls);
-                if (server.equals(master.migrations.get(tls.extent)))
-                  master.migrations.remove(tls.extent);
-                TServerInstance tserver = tls.futureOrCurrent();
-                if (!logsForDeadServers.containsKey(tserver)) {
-                  logsForDeadServers.put(tserver, wals.getWalsInUse(tserver));
-                }
+                hostDeadTablet(tLists, tls, location, wals);
                 break;
               case SUSPENDED:
-                if (master.getSteadyTime() - tls.suspend.suspensionTime
-                    < tableConf.getTimeInMillis(Property.TABLE_SUSPEND_DURATION)) {
-                  // Tablet is suspended. See if its tablet server is back.
-                  TServerInstance returnInstance = null;
-                  Iterator<TServerInstance> find = destinations
-                      .tailMap(new TServerInstance(tls.suspend.server, " ")).keySet().iterator();
-                  if (find.hasNext()) {
-                    TServerInstance found = find.next();
-                    if (found.getLocation().equals(tls.suspend.server)) {
-                      returnInstance = found;
-                    }
-                  }
-
-                  // Old tablet server is back. Return this tablet to its previous owner.
-                  if (returnInstance != null) {
-                    assignments.add(new Assignment(tls.extent, returnInstance));
-                  }
-                  // else - tablet server not back. Don't ask for a new assignment right now.
-
-                } else {
-                  // Treat as unassigned, ask for a new assignment.
-                  unassigned.put(tls.extent, server);
-                }
+                hostSuspendedTablet(tLists, tls, location, tableConf);
                 break;
               case UNASSIGNED:
-                // maybe it's a finishing migration
-                TServerInstance dest = master.migrations.get(tls.extent);
-                if (dest != null) {
-                  // if destination is still good, assign it
-                  if (destinations.containsKey(dest)) {
-                    assignments.add(new Assignment(tls.extent, dest));
-                  } else {
-                    // get rid of this migration
-                    master.migrations.remove(tls.extent);
-                    unassigned.put(tls.extent, server);
-                  }
-                } else {
-                  unassigned.put(tls.extent, server);
-                }
+                hostUnassignedTablet(tLists, tls.extent, location);
                 break;
               case ASSIGNED:
                 // Send another reminder
-                assigned.add(new Assignment(tls.extent, tls.future));
+                tLists.assigned.add(new Assignment(tls.extent, tls.future));
                 break;
             }
           } else {
             switch (state) {
               case SUSPENDED:
                 // Request a move to UNASSIGNED, so as to allow balancing to continue.
-                suspendedToGoneServers.add(tls);
-                cancelOfflineTableMigrations(tls);
+                tLists.suspendedToGoneServers.add(tls);
+                cancelOfflineTableMigrations(tls.extent);
                 break;
               case UNASSIGNED:
-                cancelOfflineTableMigrations(tls);
+                cancelOfflineTableMigrations(tls.extent);
                 break;
               case ASSIGNED_TO_DEAD_SERVER:
-                assignedToDeadServers.add(tls);
-                if (!logsForDeadServers.containsKey(tls.futureOrCurrent())) {
-                  logsForDeadServers.put(tls.futureOrCurrent(),
-                      wals.getWalsInUse(tls.futureOrCurrent()));
-                }
+                unassignDeadTablet(tLists, tls, wals);
                 break;
               case HOSTED:
-                TServerConnection client = master.tserverSet.getConnection(server);
+                TServerConnection client = master.tserverSet.getConnection(location);
                 if (client != null) {
                   client.unloadTablet(master.masterLock, tls.extent, goal.howUnload(),
                       master.getSteadyTime());
                   unloaded++;
                   totalUnloaded++;
                 } else {
-                  Master.log.warn("Could not connect to server {}", server);
+                  Master.log.warn("Could not connect to server {}", location);
                 }
                 break;
               case ASSIGNED:
@@ -345,8 +317,7 @@ abstract class TabletGroupWatcher extends Daemon {
           counts[state.ordinal()]++;
         }
 
-        flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers,
-            suspendedToGoneServers, unassigned);
+        flushChanges(tLists, wals);
 
         // provide stats after flushing changes to avoid race conditions w/ delete table
         stats.end(masterState);
@@ -397,11 +368,76 @@ abstract class TabletGroupWatcher extends Daemon {
     }
   }
 
-  private void cancelOfflineTableMigrations(TabletLocationState tls) {
-    TServerInstance dest = master.migrations.get(tls.extent);
-    TableState tableState = master.getTableManager().getTableState(tls.extent.tableId());
-    if (dest != null && tableState == TableState.OFFLINE) {
+  private void unassignDeadTablet(TabletLists tLists, TabletLocationState tls, WalStateManager wals)
+      throws WalMarkerException { // record tablet; fetch its server's WALs once
+    tLists.assignedToDeadServers.add(tls);
+    if (!tLists.logsForDeadServers.containsKey(tls.futureOrCurrent())) {
+      tLists.logsForDeadServers.put(tls.futureOrCurrent(),
+          wals.getWalsInUse(tls.futureOrCurrent()));
+    }
+  }
+
+  private void hostUnassignedTablet(TabletLists tLists, KeyExtent tablet,
+      TServerInstance location) {
+    // maybe it's a finishing migration; if so, try to honor its destination
+    TServerInstance dest = master.migrations.get(tablet);
+    if (dest != null) {
+      // if destination is still good, assign it
+      if (tLists.destinations.containsKey(dest)) {
+        tLists.assignments.add(new Assignment(tablet, dest));
+      } else {
+        // destination is not a current, non-shutting-down server; drop the migration
+        master.migrations.remove(tablet);
+        tLists.unassigned.put(tablet, location);
+      }
+    } else {
+      tLists.unassigned.put(tablet, location);
+    }
+  }
+
+  private void hostSuspendedTablet(TabletLists tLists, TabletLocationState tls,
+      TServerInstance location, TableConfiguration tableConf) {
+    if (master.getSteadyTime() - tls.suspend.suspensionTime
+        < tableConf.getTimeInMillis(Property.TABLE_SUSPEND_DURATION)) {
+      // Still within TABLE_SUSPEND_DURATION: see if its old tablet server is back.
+      TServerInstance returnInstance = null;
+      Iterator<TServerInstance> find = tLists.destinations
+          .tailMap(new TServerInstance(tls.suspend.server, " ")).keySet().iterator();
+      if (find.hasNext()) { // NOTE(review): assumes " " sorts before any real session id
+        TServerInstance found = find.next();
+        if (found.getLocation().equals(tls.suspend.server)) {
+          returnInstance = found;
+        }
+      }
+
+      // Old tablet server is back. Return this tablet to its previous owner.
+      if (returnInstance != null) {
+        tLists.assignments.add(new Assignment(tls.extent, returnInstance));
+      }
+      // else - tablet server not back. Don't ask for a new assignment right now.
+
+    } else {
+      // Treat as unassigned, ask for a new assignment.
+      tLists.unassigned.put(tls.extent, location);
+    }
+  }
+
+  private void hostDeadTablet(TabletLists tLists, TabletLocationState tls, TServerInstance location,
+      WalStateManager wals) throws WalMarkerException { // also drops migration to dead server
+    tLists.assignedToDeadServers.add(tls);
+    if (location.equals(master.migrations.get(tls.extent)))
       master.migrations.remove(tls.extent);
+    TServerInstance tserver = tls.futureOrCurrent();
+    if (!tLists.logsForDeadServers.containsKey(tserver)) {
+      tLists.logsForDeadServers.put(tserver, wals.getWalsInUse(tserver));
+    }
+  }
+
+  private void cancelOfflineTableMigrations(KeyExtent extent) {
+    TServerInstance dest = master.migrations.get(extent);
+    TableState tableState = master.getTableManager().getTableState(extent.tableId());
+    if (dest != null && tableState == TableState.OFFLINE) {
+      master.migrations.remove(extent);
     }
   }
 
@@ -811,50 +847,50 @@ abstract class TabletGroupWatcher extends Daemon {
     }
   }
 
-  private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> currentTServers,
-      List<Assignment> assignments, List<Assignment> assigned,
-      List<TabletLocationState> assignedToDeadServers,
-      Map<TServerInstance,List<Path>> logsForDeadServers,
-      List<TabletLocationState> suspendedToGoneServers, Map<KeyExtent,TServerInstance> unassigned)
-      throws DistributedStoreException, TException, WalMarkerException {
-    boolean tabletsSuspendable = canSuspendTablets();
-    if (!assignedToDeadServers.isEmpty()) {
-      int maxServersToShow = min(assignedToDeadServers.size(), 100);
-      Master.log.debug("{} assigned to dead servers: {}...", assignedToDeadServers.size(),
-          assignedToDeadServers.subList(0, maxServersToShow));
-      Master.log.debug("logs for dead servers: {}", logsForDeadServers);
-      if (tabletsSuspendable) {
-        store.suspend(assignedToDeadServers, logsForDeadServers, master.getSteadyTime());
+  private void handleDeadTablets(TabletLists tLists, WalStateManager wals)
+      throws WalMarkerException, DistributedStoreException {
+    var deadTablets = tLists.assignedToDeadServers;
+    var deadLogs = tLists.logsForDeadServers;
+
+    if (!deadTablets.isEmpty()) { // suspend or unassign, then mark dead servers' WALs closed
+      int maxServersToShow = min(deadTablets.size(), 100);
+      Master.log.debug("{} assigned to dead servers: {}...", deadTablets.size(),
+          deadTablets.subList(0, maxServersToShow));
+      Master.log.debug("logs for dead servers: {}", deadLogs);
+      if (canSuspendTablets()) {
+        store.suspend(deadTablets, deadLogs, master.getSteadyTime());
       } else {
-        store.unassign(assignedToDeadServers, logsForDeadServers);
+        store.unassign(deadTablets, deadLogs);
       }
-      master.markDeadServerLogsAsClosed(logsForDeadServers);
+      markDeadServerLogsAsClosed(wals, deadLogs);
       master.nextEvent.event(
           "Marked %d tablets as suspended because they don't have current servers",
-          assignedToDeadServers.size());
+          deadTablets.size());
     }
-    if (!suspendedToGoneServers.isEmpty()) {
-      int maxServersToShow = min(assignedToDeadServers.size(), 100);
-      Master.log.debug(assignedToDeadServers.size() + " suspended to gone servers: "
-          + assignedToDeadServers.subList(0, maxServersToShow) + "...");
-      store.unsuspend(suspendedToGoneServers);
+    if (!tLists.suspendedToGoneServers.isEmpty()) { // request a move to UNASSIGNED
+      int maxServersToShow = min(tLists.suspendedToGoneServers.size(), 100);
+      Master.log.debug("{} suspended to gone servers: {}...", tLists.suspendedToGoneServers.size(),
+          tLists.suspendedToGoneServers.subList(0, maxServersToShow));
+      store.unsuspend(tLists.suspendedToGoneServers);
     }
+  }
 
-    if (!currentTServers.isEmpty()) {
+  private void getAssignmentsFromBalancer(TabletLists tLists,
+      Map<KeyExtent,TServerInstance> unassigned) {
+    if (!tLists.currentTServers.isEmpty()) {
       Map<KeyExtent,TServerInstance> assignedOut = new HashMap<>();
-      master.tabletBalancer.getAssignments(Collections.unmodifiableSortedMap(currentTServers),
-          Collections.unmodifiableMap(unassigned), assignedOut);
+      master.tabletBalancer.getAssignments(tLists.currentTServers, unassigned, assignedOut);
       for (Entry<KeyExtent,TServerInstance> assignment : assignedOut.entrySet()) {
         if (unassigned.containsKey(assignment.getKey())) {
           if (assignment.getValue() != null) {
-            if (!currentTServers.containsKey(assignment.getValue())) {
+            if (!tLists.currentTServers.containsKey(assignment.getValue())) {
               Master.log.warn(
                   "balancer assigned {} to a tablet server that is not current {} ignoring",
                   assignment.getKey(), assignment.getValue());
               continue;
             }
 
-            assignments.add(new Assignment(assignment.getKey(), assignment.getValue()));
+            tLists.assignments.add(new Assignment(assignment.getKey(), assignment.getValue()));
           }
         } else {
           Master.log.warn(
@@ -866,13 +902,22 @@ abstract class TabletGroupWatcher extends Daemon {
       if (!unassigned.isEmpty() && assignedOut.isEmpty())
         Master.log.warn("Load balancer failed to assign any tablets");
     }
+  }
+
+  private void flushChanges(TabletLists tLists, WalStateManager wals)
+      throws DistributedStoreException, TException, WalMarkerException {
+    var unassigned = Collections.unmodifiableMap(tLists.unassigned);
 
-    if (!assignments.isEmpty()) {
-      Master.log.info(String.format("Assigning %d tablets", assignments.size()));
-      store.setFutureLocations(assignments);
+    handleDeadTablets(tLists, wals);
+
+    getAssignmentsFromBalancer(tLists, unassigned);
+
+    if (!tLists.assignments.isEmpty()) {
+      Master.log.info(String.format("Assigning %d tablets", tLists.assignments.size()));
+      store.setFutureLocations(tLists.assignments);
     }
-    assignments.addAll(assigned);
-    for (Assignment a : assignments) {
+    tLists.assignments.addAll(tLists.assigned);
+    for (Assignment a : tLists.assignments) {
       TServerConnection client = master.tserverSet.getConnection(a.server);
       if (client != null) {
         client.assignTablet(master.masterLock, a.tablet);
@@ -883,4 +928,12 @@ abstract class TabletGroupWatcher extends Daemon {
     }
   }
 
+  private static void markDeadServerLogsAsClosed(WalStateManager mgr,
+      Map<TServerInstance,List<Path>> logsForDeadServers) throws WalMarkerException {
+    for (Entry<TServerInstance,List<Path>> server : logsForDeadServers.entrySet()) {
+      for (Path path : server.getValue()) { // WALs collected via getWalsInUse
+        mgr.closeWal(server.getKey(), path); // a WalMarkerException aborts remaining closes
+      }
+    }
+  }
 }