You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bu...@apache.org on 2014/08/03 09:36:06 UTC

[3/9] git commit: ACCUMULO-2694 Fix handling of tablet migrations for offline tables.

ACCUMULO-2694 Fix handling of tablet migrations for offline tables.

* Adds a functional test that fails due to not rebalancing
* Fix master to clear migrations when it learns that a table has gone offline
* Update master to periodically clean up migrations for offline tables
* Fix balancers to make sure they log if they can't balance.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6bbe1216
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6bbe1216
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6bbe1216

Branch: refs/heads/master
Commit: 6bbe12165d81067651cc2d8c1c545fb31580d1a3
Parents: f848178
Author: Sean Busbey <bu...@cloudera.com>
Authored: Fri Apr 18 16:44:54 2014 -0500
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Fri Aug 1 02:46:55 2014 -0500

----------------------------------------------------------------------
 server/pom.xml                                  |  4 +
 .../apache/accumulo/server/master/Master.java   | 34 +++++--
 .../master/balancer/ChaoticLoadBalancer.java    | 10 +-
 .../master/balancer/DefaultLoadBalancer.java    | 12 ++-
 .../server/master/balancer/TabletBalancer.java  | 91 ++++++++++++++++++
 test/system/auto/stress/migrations.py           | 98 ++++++++++++++++++++
 6 files changed, 240 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bbe1216/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index bd61fe6..fb526af 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -34,6 +34,10 @@
       <artifactId>gson</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
       <groupId>jline</groupId>
       <artifactId>jline</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bbe1216/server/src/main/java/org/apache/accumulo/server/master/Master.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/Master.java b/server/src/main/java/org/apache/accumulo/server/master/Master.java
index a2ad2e6..12f8fed 100644
--- a/server/src/main/java/org/apache/accumulo/server/master/Master.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/Master.java
@@ -1924,7 +1924,8 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
       while (stillMaster()) {
         if (!migrations.isEmpty()) {
           try {
-            cleanupMutations();
+            cleanupOfflineMigrations();
+            cleanupNonexistentMigrations(getConnector());
           } catch (Exception ex) {
             log.error("Error cleaning up migrations", ex);
           }
@@ -1933,12 +1934,13 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
       }
     }
     
-    // If a migrating tablet splits, and the tablet dies before sending the
-    // master a message, the migration will refer to a non-existing tablet,
-    // so it can never complete. Periodically scan the metadata table and
-    // remove any migrating tablets that no longer exist.
-    private void cleanupMutations() throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
-      Connector connector = getConnector();
+    /**
+     * If a migrating tablet splits, and the tablet dies before sending the
+     * master a message, the migration will refer to a non-existing tablet,
+     * so it can never complete. Periodically scan the metadata table and
+     * remove any migrating tablets that no longer exist.
+     */
+    private void cleanupNonexistentMigrations(final Connector connector) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
       Scanner scanner = connector.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
       Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner);
       Set<KeyExtent> found = new HashSet<KeyExtent>();
@@ -1950,6 +1952,21 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
       }
       migrations.keySet().retainAll(found);
     }
+
+    /**
+     * If migrating a tablet for a table that is offline, the migration
+     * can never succeed because no tablet server will load the tablet.
+     * Check for offline tables and remove their migrations.
+     */
+    private void cleanupOfflineMigrations() {
+      TableManager manager = TableManager.getInstance();
+      for (String tableId : Tables.getIdToNameMap(instance).keySet()) {
+        TableState state = manager.getTableState(tableId);
+        if (TableState.OFFLINE == state) {
+          clearMigrations(tableId);
+        }
+      }
+    }
   }
   
   private class StatusThread extends Daemon {
@@ -2418,6 +2435,9 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
   @Override
   public void stateChanged(String tableId, TableState state) {
     nextEvent.event("Table state in zookeeper changed for %s to %s", tableId, state);
+    if (TableState.OFFLINE == state) {
+      clearMigrations(tableId);
+    }
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bbe1216/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java b/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
index e14008a..92eca13 100644
--- a/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
@@ -33,6 +33,7 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.master.state.TabletMigration;
+import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 
 /**
@@ -40,6 +41,7 @@ import org.apache.thrift.TException;
  * designed for performance, do not use on production systems. I'm calling it the LokiLoadBalancer.
  */
 public class ChaoticLoadBalancer extends TabletBalancer {
+  private static final Logger log = Logger.getLogger(ChaoticLoadBalancer.class);
   Random r = new Random();
   
   @Override
@@ -75,6 +77,8 @@ public class ChaoticLoadBalancer extends TabletBalancer {
     }
   }
   
+  protected final OutstandingMigrations outstandingMigrations = new OutstandingMigrations(log);
+
   /**
    * Will balance randomly, maintaining distribution
    */
@@ -83,8 +87,12 @@ public class ChaoticLoadBalancer extends TabletBalancer {
     Map<TServerInstance,Long> numTablets = new HashMap<TServerInstance,Long>();
     List<TServerInstance> underCapacityTServer = new ArrayList<TServerInstance>();
 
-    if (!migrations.isEmpty())
+    if (!migrations.isEmpty()) {
+      outstandingMigrations.migrations = migrations;
+      constraintNotMet(outstandingMigrations);
       return 100;
+    }
+    resetBalancerErrors();
 
     boolean moveMetadata = r.nextInt(4) == 0;
     long totalTablets = 0;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bbe1216/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java b/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
index 1fcab46..9a970e7 100644
--- a/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
@@ -302,16 +302,26 @@ public class DefaultLoadBalancer extends TabletBalancer {
       assignments.put(entry.getKey(), getAssignment(current, entry.getKey(), entry.getValue()));
     }
   }
-  
+
+  private static final NoTservers NO_SERVERS = new NoTservers(log);
+
+  protected final OutstandingMigrations outstandingMigrations = new OutstandingMigrations(log);
+
   @Override
   public long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) {
     // do we have any servers?
     if (current.size() > 0) {
       // Don't migrate if we have migrations in progress
       if (migrations.size() == 0) {
+        resetBalancerErrors();
         if (getMigrations(current, migrationsOut))
           return 1 * 1000;
+      } else {
+        outstandingMigrations.migrations = migrations;
+        constraintNotMet(outstandingMigrations);
       }
+    } else {
+      constraintNotMet(NO_SERVERS);
     }
     return 5 * 1000;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bbe1216/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java b/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
index 69387d3..16c5dbc 100644
--- a/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
@@ -17,11 +17,14 @@
 package org.apache.accumulo.server.master.balancer;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 
+import com.google.common.collect.Iterables;
+
 import org.apache.accumulo.trace.instrument.Tracer;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.data.KeyExtent;
@@ -67,6 +70,11 @@ public abstract class TabletBalancer {
   
   /**
    * Ask the balancer if any migrations are necessary.
+   *
+   * If the balancer is going to self-abort due to some environmental constraint (e.g. it requires some minimum number of tservers, or a maximum number
+   * of outstanding migrations), it should issue a log message to alert operators. The message should be at WARN normally and at ERROR if the balancer knows that the
+   * problem cannot self correct. It should not issue these messages more than once a minute. Subclasses can use the convenience methods of {@link #constraintNotMet(BalancerProblem)} and
+   * {@link #resetBalancerErrors()} to accomplish this logging.
    * 
    * @param current
    *          The current table-summary state of all the online tablet servers. Read-only.
@@ -79,6 +87,89 @@ public abstract class TabletBalancer {
    *         This method will not be called when there are unassigned tablets.
    */
   public abstract long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut);
+
+  private static final long ONE_SECOND = 1000l;
+  private boolean stuck = false;
+  private long stuckNotificationTime = -1l;
+
+  protected static final long TIME_BETWEEN_BALANCER_WARNINGS = 60 * ONE_SECOND;
+
+  /**
+   * A deferred call descendent TabletBalancers use to log why they can't continue.
+   * The call is deferred so that TabletBalancer can limit how often messages happen.
+   *
+   * Implementations should be reused as much as possible.
+   *
+   * Be sure to pass in a properly scoped Logger instance so that messages indicate
+   * what part of the system is having trouble.
+   */
+  protected static abstract class BalancerProblem implements Runnable {
+    protected final Logger balancerLog;
+    public BalancerProblem(Logger logger) {
+      balancerLog = logger;
+    }
+  }
+
+  /**
+   * If a TabletBalancer requires active tservers, it should use this problem to indicate when there are none.
+   * NoTservers is safe to share with anyone who uses the same Logger. TabletBalancers should have a single
+   * static instance.
+   */
+  protected static class NoTservers extends BalancerProblem {
+    public NoTservers(Logger logger) {
+      super(logger);
+    }
+
+    @Override
+    public void run() {
+      balancerLog.warn("Not balancing because we don't have any tservers");
+    }
+  }
+
+  /**
+   * If a TabletBalancer only balances when there are no outstanding migrations, it should use this problem
+   * to indicate when they exist.
+   *
+   * Iff a TabletBalancer makes use of the migrations member to provide samples, then OutstandingMigrations
+   * is not thread safe.
+   */
+  protected static class OutstandingMigrations extends BalancerProblem {
+    public Set<KeyExtent> migrations = Collections.<KeyExtent>emptySet();
+
+    public OutstandingMigrations(Logger logger) {
+      super(logger);
+    }
+
+    @Override
+    public void run() {
+      balancerLog.warn("Not balancing due to " + migrations.size() + " outstanding migrations.");
+      /* TODO ACCUMULO-2938 redact key extents in this output to avoid leaking protected information. */
+      balancerLog.debug("Sample up to 10 outstanding migrations: " + Iterables.limit(migrations, 10));
+    }
+  }
+
+  /**
+   * Warn that a Balancer can't work because of some external restriction.
+   * Will not call the provided logging handler more often than TIME_BETWEEN_BALANCER_WARNINGS.
+   */
+  protected void constraintNotMet(BalancerProblem cause) {
+    if (!stuck) {
+      stuck = true;
+      stuckNotificationTime = System.currentTimeMillis();
+    } else {
+      if ((System.currentTimeMillis() - stuckNotificationTime) > TIME_BETWEEN_BALANCER_WARNINGS) {
+        cause.run();
+        stuckNotificationTime = System.currentTimeMillis();
+      }
+    }
+  }
+
+  /**
+   * Resets logging about problems meeting an external constraint on balancing.
+   */
+  protected void resetBalancerErrors() {
+    stuck = false;
+  }
   
   /**
    * Fetch the tablets for the given table by asking the tablet server. Useful if your balance strategy needs details at the tablet level to decide what tablets

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bbe1216/test/system/auto/stress/migrations.py
----------------------------------------------------------------------
diff --git a/test/system/auto/stress/migrations.py b/test/system/auto/stress/migrations.py
index d07d7a8..9c94b88 100755
--- a/test/system/auto/stress/migrations.py
+++ b/test/system/auto/stress/migrations.py
@@ -76,7 +76,105 @@ class ChaoticBalancerIntegrity(SunnyDayTest):
         self.shell(self.masterHost(), 'flush -t test_ingest')
         self.waitForStop(self.verify(self.masterHost(), self.options.rows), 60)
 
+# Check for ACCUMULO-2694
+class BalanceInPresenceOfOfflineTable(SunnyDayTest):
+    """Start a new table, create many splits, and offline before they can rebalance. Then try to have a different table balance"""
+
+    order = 98
+
+    settings = TestUtilsMixin.settings.copy()
+    settings.update({
+        'tserver.memory.maps.max':'10K',
+        'tserver.compaction.major.delay': 0,
+        })
+    tableSettings = SunnyDayTest.tableSettings.copy()
+    tableSettings['test_ingest'] = {
+        'table.split.threshold': '10K',
+        }
+    def setUp(self):
+        # ensure we have two servers
+        if len(self.options.hosts) == 1:
+            self.options.hosts.append('localhost')
+        self.options.hosts = self.options.hosts[:2]
+
+        TestUtilsMixin.setUp(self);
+
+        # create a table with 200 splits
+        import tempfile
+        fileno, filename = tempfile.mkstemp()
+        fp = os.fdopen(fileno, "wb")
+        try:
+            for i in range(200):
+                fp.write("%08x\n" % (i * 1000))
+        finally:
+            fp.close()
+        self.createTable('unused', filename)
+        out,err,code = self.shell(self.masterHost(), 'offline -t unused\n')
+        self.processResult(out,err,code);
+
+        # create an empty table
+        self.createTable('test_ingest')
+
+    def runTest(self):
+
+        # start test ingestion
+        log.info("Starting Test Ingester")
+        self.ingester = self.ingest(self.masterHost(),
+                                    200000,
+                                    size=self.options.size)
+        self.waitForStop(self.ingester, self.timeout_factor * 120)
+        self.shell(self.masterHost(), 'flush -t test_ingest\n')
+        self.waitForStop(self.verify(self.masterHost(), self.options.rows), 60)
+
+        # let the server split tablets and move them around
+        # Keep retrying until the wait period for migration cleanup has passed
+        # which is hard coded to 5 minutes. :/
+        startTime = time.clock()
+        currentWait = 10
+        balancingWorked = False
+        while ((time.clock() - startTime) < 5*60+15):
+            self.sleep(currentWait)
+            # If we end up needing to sleep again, back off.
+            currentWait *= 2
+
+            # fetch the list of tablets from each server
+            h = self.runOn(self.masterHost(),
+                           [self.accumulo_sh(),
+                            'org.apache.accumulo.test.GetMasterStats'])
+            out, err = h.communicate()
+            servers = {}
+            server = None
+            # if balanced based on ingest, the table that split due to ingest
+            # will be split evenly on both servers, not just one
+            table = ''
+            tableId = self.getTableId('test_ingest');
+            for line in out.split('\n'):
+                if line.find(' Name: ') == 0:
+                    server = line[7:]
+                    servers.setdefault(server, 0)
+                if line.find('Table: ') >= 0:
+                    table = line.split(' ')[-1]
+                if line.find('    Tablets: ') == 0:
+                    if table == tableId:
+                       servers[server] += int(line.split()[-1])
+            log.info("Tablet counts " + repr(servers))
+
+            # we have two servers
+            if len(servers.values()) == 2:
+              servers = servers.values()
+              # a server has more than 10 splits
+              if servers[0] > 10:
+                # the ratio is roughly even
+                ratio = min(servers) / float(max(servers))
+                if ratio > 0.5:
+                  balancingWorked = True
+                  break
+            log.debug("tablets not balanced, sleeping for %d seconds" % currentWait)
+
+        self.assert_(balancingWorked)
+
 def suite():
     result = unittest.TestSuite()
     result.addTest(ChaoticBalancerIntegrity())
+    result.addTest(BalanceInPresenceOfOfflineTable())
     return result