You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bu...@apache.org on 2014/08/03 09:36:06 UTC
[3/9] git commit: ACCUMULO-2694 Fix handling of tablet migrations for
offline tables.
ACCUMULO-2694 Fix handling of tablet migrations for offline tables.
* Adds a functional test that fails due to not rebalancing
* Fix master to clear migrations when it learns that a table has gone offline
* Update master to periodically clean up migrations for offline tables
* Fix balancers to make sure they log if they can't balance.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6bbe1216
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6bbe1216
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6bbe1216
Branch: refs/heads/master
Commit: 6bbe12165d81067651cc2d8c1c545fb31580d1a3
Parents: f848178
Author: Sean Busbey <bu...@cloudera.com>
Authored: Fri Apr 18 16:44:54 2014 -0500
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Fri Aug 1 02:46:55 2014 -0500
----------------------------------------------------------------------
server/pom.xml | 4 +
.../apache/accumulo/server/master/Master.java | 34 +++++--
.../master/balancer/ChaoticLoadBalancer.java | 10 +-
.../master/balancer/DefaultLoadBalancer.java | 12 ++-
.../server/master/balancer/TabletBalancer.java | 91 ++++++++++++++++++
test/system/auto/stress/migrations.py | 98 ++++++++++++++++++++
6 files changed, 240 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bbe1216/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index bd61fe6..fb526af 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -34,6 +34,10 @@
<artifactId>gson</artifactId>
</dependency>
<dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bbe1216/server/src/main/java/org/apache/accumulo/server/master/Master.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/Master.java b/server/src/main/java/org/apache/accumulo/server/master/Master.java
index a2ad2e6..12f8fed 100644
--- a/server/src/main/java/org/apache/accumulo/server/master/Master.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/Master.java
@@ -1924,7 +1924,8 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
while (stillMaster()) {
if (!migrations.isEmpty()) {
try {
- cleanupMutations();
+ cleanupOfflineMigrations();
+ cleanupNonexistentMigrations(getConnector());
} catch (Exception ex) {
log.error("Error cleaning up migrations", ex);
}
@@ -1933,12 +1934,13 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
}
}
- // If a migrating tablet splits, and the tablet dies before sending the
- // master a message, the migration will refer to a non-existing tablet,
- // so it can never complete. Periodically scan the metadata table and
- // remove any migrating tablets that no longer exist.
- private void cleanupMutations() throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
- Connector connector = getConnector();
+ /**
+ * If a migrating tablet splits, and the tablet dies before sending the
+ * master a message, the migration will refer to a non-existing tablet,
+ * so it can never complete. Periodically scan the metadata table and
+ * remove any migrating tablets that no longer exist.
+ */
+ private void cleanupNonexistentMigrations(final Connector connector) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
Scanner scanner = connector.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner);
Set<KeyExtent> found = new HashSet<KeyExtent>();
@@ -1950,6 +1952,21 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
}
migrations.keySet().retainAll(found);
}
+
+ /**
+ * If migrating a tablet for a table that is offline, the migration
+ * can never succeed because no tablet server will load the tablet.
+ * Check for offline tables and remove their migrations.
+ */
+ private void cleanupOfflineMigrations() {
+ TableManager manager = TableManager.getInstance();
+ for (String tableId : Tables.getIdToNameMap(instance).keySet()) {
+ TableState state = manager.getTableState(tableId);
+ if (TableState.OFFLINE == state) {
+ clearMigrations(tableId);
+ }
+ }
+ }
}
private class StatusThread extends Daemon {
@@ -2418,6 +2435,9 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
@Override
public void stateChanged(String tableId, TableState state) {
nextEvent.event("Table state in zookeeper changed for %s to %s", tableId, state);
+ if (TableState.OFFLINE == state) {
+ clearMigrations(tableId);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bbe1216/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java b/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
index e14008a..92eca13 100644
--- a/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
@@ -33,6 +33,7 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.master.state.TabletMigration;
+import org.apache.log4j.Logger;
import org.apache.thrift.TException;
/**
@@ -40,6 +41,7 @@ import org.apache.thrift.TException;
* designed for performance, do not use on production systems. I'm calling it the LokiLoadBalancer.
*/
public class ChaoticLoadBalancer extends TabletBalancer {
+ private static final Logger log = Logger.getLogger(ChaoticLoadBalancer.class);
Random r = new Random();
@Override
@@ -75,6 +77,8 @@ public class ChaoticLoadBalancer extends TabletBalancer {
}
}
+ protected final OutstandingMigrations outstandingMigrations = new OutstandingMigrations(log);
+
/**
* Will balance randomly, maintaining distribution
*/
@@ -83,8 +87,12 @@ public class ChaoticLoadBalancer extends TabletBalancer {
Map<TServerInstance,Long> numTablets = new HashMap<TServerInstance,Long>();
List<TServerInstance> underCapacityTServer = new ArrayList<TServerInstance>();
- if (!migrations.isEmpty())
+ if (!migrations.isEmpty()) {
+ outstandingMigrations.migrations = migrations;
+ constraintNotMet(outstandingMigrations);
return 100;
+ }
+ resetBalancerErrors();
boolean moveMetadata = r.nextInt(4) == 0;
long totalTablets = 0;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bbe1216/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java b/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
index 1fcab46..9a970e7 100644
--- a/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
@@ -302,16 +302,26 @@ public class DefaultLoadBalancer extends TabletBalancer {
assignments.put(entry.getKey(), getAssignment(current, entry.getKey(), entry.getValue()));
}
}
-
+
+ private static final NoTservers NO_SERVERS = new NoTservers(log);
+
+ protected final OutstandingMigrations outstandingMigrations = new OutstandingMigrations(log);
+
@Override
public long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) {
// do we have any servers?
if (current.size() > 0) {
// Don't migrate if we have migrations in progress
if (migrations.size() == 0) {
+ resetBalancerErrors();
if (getMigrations(current, migrationsOut))
return 1 * 1000;
+ } else {
+ outstandingMigrations.migrations = migrations;
+ constraintNotMet(outstandingMigrations);
}
+ } else {
+ constraintNotMet(NO_SERVERS);
}
return 5 * 1000;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bbe1216/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java b/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
index 69387d3..16c5dbc 100644
--- a/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
@@ -17,11 +17,14 @@
package org.apache.accumulo.server.master.balancer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
+import com.google.common.collect.Iterables;
+
import org.apache.accumulo.trace.instrument.Tracer;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.data.KeyExtent;
@@ -67,6 +70,11 @@ public abstract class TabletBalancer {
/**
* Ask the balancer if any migrations are necessary.
+ *
+ * If the balancer is going to self-abort due to some environmental constraint (e.g. it requires some minimum number of tservers, or a maximum number
+ * of outstanding migrations), it should issue a log message to alert operators. The message should be at WARN normally and at ERROR if the balancer knows that the
+ * problem cannot self correct. It should not issue these messages more than once a minute. Subclasses can use the convenience methods of {@link #constraintNotMet(BalancerProblem)} and
+ * {@link #resetBalancerErrors()} to accomplish this logging.
*
* @param current
* The current table-summary state of all the online tablet servers. Read-only.
@@ -79,6 +87,89 @@ public abstract class TabletBalancer {
* This method will not be called when there are unassigned tablets.
*/
public abstract long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut);
+
+ private static final long ONE_SECOND = 1000l;
+ private boolean stuck = false;
+ private long stuckNotificationTime = -1l;
+
+ protected static final long TIME_BETWEEN_BALANCER_WARNINGS = 60 * ONE_SECOND;
+
+ /**
+ * A deferred call that descendant TabletBalancers use to log why they can't continue.
+ * The call is deferred so that TabletBalancer can limit how often messages happen.
+ *
+ * Implementations should be reused as much as possible.
+ *
+ * Be sure to pass in a properly scoped Logger instance so that messages indicate
+ * what part of the system is having trouble.
+ */
+ protected static abstract class BalancerProblem implements Runnable {
+ protected final Logger balancerLog;
+ public BalancerProblem(Logger logger) {
+ balancerLog = logger;
+ }
+ }
+
+ /**
+ * If a TabletBalancer requires active tservers, it should use this problem to indicate when there are none.
+ * NoTservers is safe to share with anyone who uses the same Logger. TabletBalancers should have a single
+ * static instance.
+ */
+ protected static class NoTservers extends BalancerProblem {
+ public NoTservers(Logger logger) {
+ super(logger);
+ }
+
+ @Override
+ public void run() {
+ balancerLog.warn("Not balancing because we don't have any tservers");
+ }
+ }
+
+ /**
+ * If a TabletBalancer only balances when there are no outstanding migrations, it should use this problem
+ * to indicate when they exist.
+ *
+ * Iff a TabletBalancer makes use of the migrations member to provide samples, then OutstandingMigrations
+ * is not thread safe.
+ */
+ protected static class OutstandingMigrations extends BalancerProblem {
+ public Set<KeyExtent> migrations = Collections.<KeyExtent>emptySet();
+
+ public OutstandingMigrations(Logger logger) {
+ super(logger);
+ }
+
+ @Override
+ public void run() {
+ balancerLog.warn("Not balancing due to " + migrations.size() + " outstanding migrations.");
+ /* TODO ACCUMULO-2938 redact key extents in this output to avoid leaking protected information. */
+ balancerLog.debug("Sample up to 10 outstanding migrations: " + Iterables.limit(migrations, 10));
+ }
+ }
+
+ /**
+ * Warn that a Balancer can't work because of some external restriction.
+ * Will not call the provided logging handler more often than TIME_BETWEEN_BALANCER_WARNINGS
+ */
+ protected void constraintNotMet(BalancerProblem cause) {
+ if (!stuck) {
+ stuck = true;
+ stuckNotificationTime = System.currentTimeMillis();
+ } else {
+ if ((System.currentTimeMillis() - stuckNotificationTime) > TIME_BETWEEN_BALANCER_WARNINGS) {
+ cause.run();
+ stuckNotificationTime = System.currentTimeMillis();
+ }
+ }
+ }
+
+ /**
+ * Resets logging about problems meeting an external constraint on balancing.
+ */
+ protected void resetBalancerErrors() {
+ stuck = false;
+ }
/**
* Fetch the tablets for the given table by asking the tablet server. Useful if your balance strategy needs details at the tablet level to decide what tablets
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bbe1216/test/system/auto/stress/migrations.py
----------------------------------------------------------------------
diff --git a/test/system/auto/stress/migrations.py b/test/system/auto/stress/migrations.py
index d07d7a8..9c94b88 100755
--- a/test/system/auto/stress/migrations.py
+++ b/test/system/auto/stress/migrations.py
@@ -76,7 +76,105 @@ class ChaoticBalancerIntegrity(SunnyDayTest):
self.shell(self.masterHost(), 'flush -t test_ingest')
self.waitForStop(self.verify(self.masterHost(), self.options.rows), 60)
+# Check for ACCUMULO-2694
+class BalanceInPresenceOfOfflineTable(SunnyDayTest):
+ """Start a new table, create many splits, and offline before they can rebalance. Then try to have a different table balance"""
+
+ order = 98
+
+ settings = TestUtilsMixin.settings.copy()
+ settings.update({
+ 'tserver.memory.maps.max':'10K',
+ 'tserver.compaction.major.delay': 0,
+ })
+ tableSettings = SunnyDayTest.tableSettings.copy()
+ tableSettings['test_ingest'] = {
+ 'table.split.threshold': '10K',
+ }
+ def setUp(self):
+ # ensure we have two servers
+ if len(self.options.hosts) == 1:
+ self.options.hosts.append('localhost')
+ self.options.hosts = self.options.hosts[:2]
+
+ TestUtilsMixin.setUp(self);
+
+ # create a table with 200 splits
+ import tempfile
+ fileno, filename = tempfile.mkstemp()
+ fp = os.fdopen(fileno, "wb")
+ try:
+ for i in range(200):
+ fp.write("%08x\n" % (i * 1000))
+ finally:
+ fp.close()
+ self.createTable('unused', filename)
+ out,err,code = self.shell(self.masterHost(), 'offline -t unused\n')
+ self.processResult(out,err,code);
+
+ # create an empty table
+ self.createTable('test_ingest')
+
+ def runTest(self):
+
+ # start test ingestion
+ log.info("Starting Test Ingester")
+ self.ingester = self.ingest(self.masterHost(),
+ 200000,
+ size=self.options.size)
+ self.waitForStop(self.ingester, self.timeout_factor * 120)
+ self.shell(self.masterHost(), 'flush -t test_ingest\n')
+ self.waitForStop(self.verify(self.masterHost(), self.options.rows), 60)
+
+ # let the server split tablets and move them around
+ # Keep retrying until the wait period for migration cleanup has passed
+ # which is hard coded to 5 minutes. :/
+ startTime = time.clock()
+ currentWait = 10
+ balancingWorked = False
+ while ((time.clock() - startTime) < 5*60+15):
+ self.sleep(currentWait)
+ # If we end up needing to sleep again, back off.
+ currentWait *= 2
+
+ # fetch the list of tablets from each server
+ h = self.runOn(self.masterHost(),
+ [self.accumulo_sh(),
+ 'org.apache.accumulo.test.GetMasterStats'])
+ out, err = h.communicate()
+ servers = {}
+ server = None
+ # if balanced based on ingest, the table that split due to ingest
+ # will be split evenly on both servers, not just one
+ table = ''
+ tableId = self.getTableId('test_ingest');
+ for line in out.split('\n'):
+ if line.find(' Name: ') == 0:
+ server = line[7:]
+ servers.setdefault(server, 0)
+ if line.find('Table: ') >= 0:
+ table = line.split(' ')[-1]
+ if line.find(' Tablets: ') == 0:
+ if table == tableId:
+ servers[server] += int(line.split()[-1])
+ log.info("Tablet counts " + repr(servers))
+
+ # we have two servers
+ if len(servers.values()) == 2:
+ servers = servers.values()
+ # a server has more than 10 splits
+ if servers[0] > 10:
+ # the ratio is roughly even
+ ratio = min(servers) / float(max(servers))
+ if ratio > 0.5:
+ balancingWorked = True
+ break
+ log.debug("tablets not balanced, sleeping for %d seconds" % currentWait)
+
+ self.assert_(balancingWorked)
+
def suite():
result = unittest.TestSuite()
result.addTest(ChaoticBalancerIntegrity())
+ result.addTest(BalanceInPresenceOfOfflineTable())
return result