You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2013/12/30 20:10:04 UTC

[13/15] git commit: Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT

Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT

Conflicts:
	server/src/main/java/org/apache/accumulo/server/master/Master.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5e247981
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5e247981
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5e247981

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 5e247981d13b7e2616ea05c05a2a44eeefc0d897
Parents: 063c88d 20cc9f4
Author: Eric Newton <er...@gmail.com>
Authored: Mon Dec 30 14:08:18 2013 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Mon Dec 30 14:08:18 2013 -0500

----------------------------------------------------------------------
 .../java/org/apache/accumulo/master/Master.java |  1 +
 .../functional/BalanceAfterCommsFailureIT.java  | 90 ++++++++++++++++++++
 2 files changed, 91 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/5e247981/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/Master.java
index 60ec46c,0000000..304366c
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@@ -1,1215 -1,0 +1,1216 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.master;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedMap;
 +import java.util.TreeMap;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.impl.Namespaces;
 +import org.apache.accumulo.core.client.impl.Tables;
 +import org.apache.accumulo.core.client.impl.ThriftTransportPool;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.master.thrift.MasterClientService.Iface;
 +import org.apache.accumulo.core.master.thrift.MasterClientService.Processor;
 +import org.apache.accumulo.core.master.thrift.MasterGoalState;
 +import org.apache.accumulo.core.master.thrift.MasterState;
 +import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.Credentials;
 +import org.apache.accumulo.core.security.NamespacePermission;
 +import org.apache.accumulo.core.security.SecurityUtil;
 +import org.apache.accumulo.core.security.TablePermission;
 +import org.apache.accumulo.core.util.Daemon;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.AgeOffStore;
 +import org.apache.accumulo.fate.Fate;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 +import org.apache.accumulo.master.recovery.RecoveryManager;
 +import org.apache.accumulo.master.state.TableCounts;
 +import org.apache.accumulo.server.Accumulo;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.ServerOpts;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.fs.VolumeManager.FileType;
 +import org.apache.accumulo.server.fs.VolumeManagerImpl;
 +import org.apache.accumulo.server.init.Initialize;
 +import org.apache.accumulo.server.master.LiveTServerSet;
 +import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 +import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer;
 +import org.apache.accumulo.server.master.balancer.TabletBalancer;
 +import org.apache.accumulo.server.master.state.CurrentState;
 +import org.apache.accumulo.server.master.state.DeadServerList;
 +import org.apache.accumulo.server.master.state.MergeInfo;
 +import org.apache.accumulo.server.master.state.MergeState;
 +import org.apache.accumulo.server.master.state.MetaDataStateStore;
 +import org.apache.accumulo.server.master.state.RootTabletStateStore;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.server.master.state.TabletLocationState;
 +import org.apache.accumulo.server.master.state.TabletMigration;
 +import org.apache.accumulo.server.master.state.TabletState;
 +import org.apache.accumulo.server.master.state.ZooStore;
 +import org.apache.accumulo.server.master.state.ZooTabletStateStore;
 +import org.apache.accumulo.server.security.AuditedSecurityOperation;
 +import org.apache.accumulo.server.security.SecurityOperation;
 +import org.apache.accumulo.server.security.SystemCredentials;
 +import org.apache.accumulo.server.security.handler.ZKPermHandler;
 +import org.apache.accumulo.server.tables.TableManager;
 +import org.apache.accumulo.server.tables.TableObserver;
 +import org.apache.accumulo.server.util.DefaultMap;
 +import org.apache.accumulo.server.util.Halt;
 +import org.apache.accumulo.server.util.MetadataTableUtil;
 +import org.apache.accumulo.server.util.TServerUtils;
 +import org.apache.accumulo.server.util.TServerUtils.ServerAddress;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.accumulo.server.zookeeper.ZooLock;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.DataInputBuffer;
 +import org.apache.hadoop.io.DataOutputBuffer;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.server.TServer;
 +import org.apache.thrift.transport.TTransportException;
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.WatchedEvent;
 +import org.apache.zookeeper.Watcher;
 +import org.apache.zookeeper.data.Stat;
 +
 +/**
 + * The Master is responsible for assigning and balancing tablets to tablet servers.
 + * 
 + * The master will also coordinate log recoveries and reports general status.
 + */
 +public class Master implements LiveTServerSet.Listener, TableObserver, CurrentState {
 +
 +  final static Logger log = Logger.getLogger(Master.class);
 +
 +  final static int ONE_SECOND = 1000;
 +  final private static Text METADATA_TABLE_ID = new Text(MetadataTable.ID);
 +  final private static Text ROOT_TABLE_ID = new Text(RootTable.ID);
 +  final static long TIME_TO_WAIT_BETWEEN_SCANS = 60 * ONE_SECOND;
 +  final private static long TIME_BETWEEN_MIGRATION_CLEANUPS = 5 * 60 * ONE_SECOND;
 +  final static long WAIT_BETWEEN_ERRORS = ONE_SECOND;
 +  final private static long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND;
 +  final private static int MAX_CLEANUP_WAIT_TIME = ONE_SECOND;
 +  final private static int TIME_TO_WAIT_BETWEEN_LOCK_CHECKS = ONE_SECOND;
 +  final static int MAX_TSERVER_WORK_CHUNK = 5000;
 +  final private static int MAX_BAD_STATUS_COUNT = 3;
 +
 +  final VolumeManager fs;
 +  final private Instance instance;
 +  final private String hostname;
 +  final LiveTServerSet tserverSet;
 +  final private List<TabletGroupWatcher> watchers = new ArrayList<TabletGroupWatcher>();
 +  final SecurityOperation security;
 +  final Map<TServerInstance,AtomicInteger> badServers = Collections.synchronizedMap(new DefaultMap<TServerInstance,AtomicInteger>(new AtomicInteger()));
 +  final Set<TServerInstance> serversToShutdown = Collections.synchronizedSet(new HashSet<TServerInstance>());
 +  final SortedMap<KeyExtent,TServerInstance> migrations = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,TServerInstance>());
 +  final EventCoordinator nextEvent = new EventCoordinator();
 +  final private Object mergeLock = new Object();
 +  RecoveryManager recoveryManager = null;
 +
 +  ZooLock masterLock = null;
 +  private TServer clientService = null;
 +  TabletBalancer tabletBalancer;
 +
 +  private MasterState state = MasterState.INITIAL;
 +
 +  Fate<Master> fate;
 +
 +  volatile SortedMap<TServerInstance,TabletServerStatus> tserverStatus = Collections.unmodifiableSortedMap(new TreeMap<TServerInstance,TabletServerStatus>());
 +
 +  synchronized MasterState getMasterState() {
 +    return state;
 +  }
 +
 +  public boolean stillMaster() {
 +    return getMasterState() != MasterState.STOP;
 +  }
 +
 +  static final boolean X = true;
 +  static final boolean _ = false;
 +  // @formatter:off
 +  static final boolean transitionOK[][] = {
 +      //                              INITIAL HAVE_LOCK SAFE_MODE NORMAL UNLOAD_META UNLOAD_ROOT STOP
 +      /* INITIAL */                   {X,     X,        _,        _,      _,         _,          X},
 +      /* HAVE_LOCK */                 {_,     X,        X,        X,      _,         _,          X},
 +      /* SAFE_MODE */                 {_,     _,        X,        X,      X,         _,          X},
 +      /* NORMAL */                    {_,     _,        X,        X,      X,         _,          X},
 +      /* UNLOAD_METADATA_TABLETS */   {_,     _,        X,        X,      X,         X,          X},
 +      /* UNLOAD_ROOT_TABLET */        {_,     _,        _,        X,      X,         X,          X},
 +      /* STOP */                      {_,     _,        _,        _,      _,         X,          X}};
 +  //@formatter:on
 +  synchronized void setMasterState(MasterState newState) {
 +    if (state.equals(newState))
 +      return;
 +    if (!transitionOK[state.ordinal()][newState.ordinal()]) {
 +      log.error("Programmer error: master should not transition from " + state + " to " + newState);
 +    }
 +    MasterState oldState = state;
 +    state = newState;
 +    nextEvent.event("State changed from %s to %s", oldState, newState);
 +    if (newState == MasterState.STOP) {
 +      // Give the server a little time before shutdown so the client
 +      // thread requesting the stop can return
 +      SimpleTimer.getInstance().schedule(new Runnable() {
 +        @Override
 +        public void run() {
 +          // This frees the main thread and will cause the master to exit
 +          clientService.stop();
 +          Master.this.nextEvent.event("stopped event loop");
 +        }
 +
 +      }, 100l, 1000l);
 +    }
 +
 +    if (oldState != newState && (newState == MasterState.HAVE_LOCK)) {
 +      upgradeZookeeper();
 +    }
 +
 +    if (oldState != newState && (newState == MasterState.NORMAL)) {
 +      upgradeMetadata();
 +    }
 +  }
 +
 +  private void moveRootTabletToRootTable(IZooReaderWriter zoo) throws Exception {
 +    String dirZPath = ZooUtil.getRoot(instance) + RootTable.ZROOT_TABLET_PATH;
 +
 +    if (!zoo.exists(dirZPath)) {
 +      Path oldPath = fs.getFullPath(FileType.TABLE, "/!0/root_tablet");
 +      if (fs.exists(oldPath)) {
 +        String newPath = fs.choose(ServerConstants.getTablesDirs()) + "/" + RootTable.ID;
 +        fs.mkdirs(new Path(newPath));
 +        if (!fs.rename(oldPath, new Path(newPath))) {
 +          throw new IOException("Failed to move root tablet from " + oldPath + " to " + newPath);
 +        }
 +
 +        log.info("Upgrade renamed " + oldPath + " to " + newPath);
 +      }
 +
 +      Path location = null;
 +
 +      for (String basePath : ServerConstants.getTablesDirs()) {
 +        Path path = new Path(basePath + "/" + RootTable.ID + RootTable.ROOT_TABLET_LOCATION);
 +        if (fs.exists(path)) {
 +          if (location != null) {
 +            throw new IllegalStateException("Root table at multiple locations " + location + " " + path);
 +          }
 +
 +          location = path;
 +        }
 +      }
 +
 +      if (location == null)
 +        throw new IllegalStateException("Failed to find root tablet");
 +
 +      log.info("Upgrade setting root table location in zookeeper " + location);
 +      zoo.putPersistentData(dirZPath, location.toString().getBytes(), NodeExistsPolicy.FAIL);
 +    }
 +  }
 +
 +  private void upgradeZookeeper() {
 +    if (Accumulo.getAccumuloPersistentVersion(fs) == ServerConstants.PREV_DATA_VERSION) {
 +      try {
 +        log.info("Upgrading zookeeper");
 +
 +        IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 +
 +        // create initial namespaces
 +        String namespaces = ZooUtil.getRoot(instance) + Constants.ZNAMESPACES;
 +        zoo.putPersistentData(namespaces, new byte[0], NodeExistsPolicy.SKIP);
 +        if (!Namespaces.exists(instance, Namespaces.ACCUMULO_NAMESPACE_ID))
 +          TableManager.prepareNewNamespaceState(instance.getInstanceID(), Namespaces.ACCUMULO_NAMESPACE_ID, Namespaces.ACCUMULO_NAMESPACE,
 +              NodeExistsPolicy.SKIP);
 +        if (!Namespaces.exists(instance, Namespaces.DEFAULT_NAMESPACE_ID))
 +          TableManager.prepareNewNamespaceState(instance.getInstanceID(), Namespaces.DEFAULT_NAMESPACE_ID, Namespaces.DEFAULT_NAMESPACE, NodeExistsPolicy.SKIP);
 +
 +        // create root table
 +        if (!Tables.exists(instance, RootTable.ID)) {
 +          TableManager.prepareNewTableState(instance.getInstanceID(), RootTable.ID, Namespaces.ACCUMULO_NAMESPACE_ID, RootTable.NAME, TableState.ONLINE,
 +              NodeExistsPolicy.SKIP);
 +          Initialize.initMetadataConfig(RootTable.ID);
 +          // ensure root user can flush root table
 +          security.grantTablePermission(SystemCredentials.get().toThrift(instance), security.getRootUsername(), RootTable.ID, TablePermission.ALTER_TABLE);
 +        }
 +
 +        // put existing tables in the correct namespaces
 +        String tables = ZooUtil.getRoot(instance) + Constants.ZTABLES;
 +        for (Entry<String,String> table : Tables.getIdToNameMap(instance).entrySet()) {
 +          String targetNamespace = (MetadataTable.ID.equals(table.getKey()) || RootTable.ID.equals(table.getKey())) ? Namespaces.ACCUMULO_NAMESPACE_ID
 +              : Namespaces.DEFAULT_NAMESPACE_ID;
 +          zoo.putPersistentData(tables + "/" + table.getKey() + Constants.ZTABLE_NAMESPACE, targetNamespace.getBytes(Constants.UTF8), NodeExistsPolicy.SKIP);
 +        }
 +
 +        // rename metadata table
 +        zoo.putPersistentData(tables + "/" + MetadataTable.ID + Constants.ZTABLE_NAME, Tables.qualify(MetadataTable.NAME).getSecond().getBytes(Constants.UTF8),
 +            NodeExistsPolicy.OVERWRITE);
 +
 +        moveRootTabletToRootTable(zoo);
 +
 +        // add system namespace permissions to existing users
 +        ZKPermHandler perm = new ZKPermHandler();
 +        perm.initialize(instance.getInstanceID(), true);
 +        String users = ZooUtil.getRoot(instance) + "/users";
 +        for (String user : zoo.getChildren(users)) {
 +          zoo.putPersistentData(users + "/" + user + "/Namespaces", new byte[0], NodeExistsPolicy.SKIP);
 +          perm.grantNamespacePermission(user, Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.READ);
 +        }
 +        perm.grantNamespacePermission("root", Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.ALTER_TABLE);
 +
 +      } catch (Exception ex) {
 +        log.fatal("Error performing upgrade", ex);
 +        System.exit(1);
 +      }
 +    }
 +  }
 +
 +  private final AtomicBoolean upgradeMetadataRunning = new AtomicBoolean(false);
 +
 +  private final ServerConfiguration serverConfig;
 +
 +  private void upgradeMetadata() {
 +    if (Accumulo.getAccumuloPersistentVersion(fs) == ServerConstants.PREV_DATA_VERSION) {
 +      if (upgradeMetadataRunning.compareAndSet(false, true)) {
 +        Runnable upgradeTask = new Runnable() {
 +          @Override
 +          public void run() {
 +            try {
 +              MetadataTableUtil.moveMetaDeleteMarkers(instance, SystemCredentials.get());
 +              Accumulo.updateAccumuloVersion(fs);
 +
 +              log.info("Upgrade complete");
 +
 +            } catch (Exception ex) {
 +              log.fatal("Error performing upgrade", ex);
 +              System.exit(1);
 +            }
 +
 +          }
 +        };
 +
 +        // need to run this in a separate thread because a lock is held that prevents metadata tablets from being assigned and this task writes to the
 +        // metadata table
 +        new Thread(upgradeTask).start();
 +      }
 +    }
 +  }
 +
 +  private int assignedOrHosted(Text tableId) {
 +    int result = 0;
 +    for (TabletGroupWatcher watcher : watchers) {
 +      TableCounts count = watcher.getStats(tableId);
 +      result += count.hosted() + count.assigned();
 +    }
 +    return result;
 +  }
 +
 +  private int totalAssignedOrHosted() {
 +    int result = 0;
 +    for (TabletGroupWatcher watcher : watchers) {
 +      for (TableCounts counts : watcher.getStats().values()) {
 +        result += counts.assigned() + counts.hosted();
 +      }
 +    }
 +    return result;
 +  }
 +
 +  private int nonMetaDataTabletsAssignedOrHosted() {
 +    return totalAssignedOrHosted() - assignedOrHosted(new Text(MetadataTable.ID)) - assignedOrHosted(new Text(RootTable.ID));
 +  }
 +
 +  private int notHosted() {
 +    int result = 0;
 +    for (TabletGroupWatcher watcher : watchers) {
 +      for (TableCounts counts : watcher.getStats().values()) {
 +        result += counts.assigned() + counts.assignedToDeadServers();
 +      }
 +    }
 +    return result;
 +  }
 +
 +  // The number of unassigned tablets that should be assigned: displayed on the monitor page
 +  int displayUnassigned() {
 +    int result = 0;
 +    switch (getMasterState()) {
 +      case NORMAL:
 +        // Count offline tablets for online tables
 +        for (TabletGroupWatcher watcher : watchers) {
 +          TableManager manager = TableManager.getInstance();
 +          for (Entry<Text,TableCounts> entry : watcher.getStats().entrySet()) {
 +            Text tableId = entry.getKey();
 +            TableCounts counts = entry.getValue();
 +            TableState tableState = manager.getTableState(tableId.toString());
 +            if (tableState != null && tableState.equals(TableState.ONLINE)) {
 +              result += counts.unassigned() + counts.assignedToDeadServers() + counts.assigned();
 +            }
 +          }
 +        }
 +        break;
 +      case SAFE_MODE:
 +        // Count offline tablets for the metadata table
 +        for (TabletGroupWatcher watcher : watchers) {
 +          result += watcher.getStats(METADATA_TABLE_ID).unassigned();
 +        }
 +        break;
 +      case UNLOAD_METADATA_TABLETS:
 +      case UNLOAD_ROOT_TABLET:
 +        for (TabletGroupWatcher watcher : watchers) {
 +          result += watcher.getStats(METADATA_TABLE_ID).unassigned();
 +        }
 +        break;
 +      default:
 +        break;
 +    }
 +    return result;
 +  }
 +
 +  public void mustBeOnline(final String tableId) throws ThriftTableOperationException {
 +    Tables.clearCache(instance);
 +    if (!Tables.getTableState(instance, tableId).equals(TableState.ONLINE))
 +      throw new ThriftTableOperationException(tableId, null, TableOperation.MERGE, TableOperationExceptionType.OFFLINE, "table is not online");
 +  }
 +
 +  public Connector getConnector() throws AccumuloException, AccumuloSecurityException {
 +    return instance.getConnector(SystemCredentials.get().getPrincipal(), SystemCredentials.get().getToken());
 +  }
 +
 +  private Master(ServerConfiguration config, VolumeManager fs, String hostname) throws IOException {
 +    this.serverConfig = config;
 +    this.instance = config.getInstance();
 +    this.fs = fs;
 +    this.hostname = hostname;
 +
 +    AccumuloConfiguration aconf = serverConfig.getConfiguration();
 +
 +    log.info("Version " + Constants.VERSION);
 +    log.info("Instance " + instance.getInstanceID());
 +    ThriftTransportPool.getInstance().setIdleTime(aconf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
 +    security = AuditedSecurityOperation.getInstance();
 +    tserverSet = new LiveTServerSet(instance, config.getConfiguration(), this);
 +    this.tabletBalancer = aconf.instantiateClassProperty(Property.MASTER_TABLET_BALANCER, TabletBalancer.class, new DefaultLoadBalancer());
 +    this.tabletBalancer.init(serverConfig);
 +  }
 +
 +  public TServerConnection getConnection(TServerInstance server) {
 +    return tserverSet.getConnection(server);
 +  }
 +
 +  public MergeInfo getMergeInfo(KeyExtent tablet) {
 +    return getMergeInfo(tablet.getTableId());
 +  }
 +
 +  public MergeInfo getMergeInfo(Text tableId) {
 +    synchronized (mergeLock) {
 +      try {
 +        String path = ZooUtil.getRoot(instance.getInstanceID()) + Constants.ZTABLES + "/" + tableId.toString() + "/merge";
 +        if (!ZooReaderWriter.getInstance().exists(path))
 +          return new MergeInfo();
 +        byte[] data = ZooReaderWriter.getInstance().getData(path, new Stat());
 +        DataInputBuffer in = new DataInputBuffer();
 +        in.reset(data, data.length);
 +        MergeInfo info = new MergeInfo();
 +        info.readFields(in);
 +        return info;
 +      } catch (KeeperException.NoNodeException ex) {
 +        log.info("Error reading merge state, it probably just finished");
 +        return new MergeInfo();
 +      } catch (Exception ex) {
 +        log.warn("Unexpected error reading merge state", ex);
 +        return new MergeInfo();
 +      }
 +    }
 +  }
 +
 +  public void setMergeState(MergeInfo info, MergeState state) throws IOException, KeeperException, InterruptedException {
 +    synchronized (mergeLock) {
 +      String path = ZooUtil.getRoot(instance.getInstanceID()) + Constants.ZTABLES + "/" + info.getExtent().getTableId().toString() + "/merge";
 +      info.setState(state);
 +      if (state.equals(MergeState.NONE)) {
 +        ZooReaderWriter.getInstance().recursiveDelete(path, NodeMissingPolicy.SKIP);
 +      } else {
 +        DataOutputBuffer out = new DataOutputBuffer();
 +        try {
 +          info.write(out);
 +        } catch (IOException ex) {
 +          throw new RuntimeException("Unlikely", ex);
 +        }
 +        ZooReaderWriter.getInstance().putPersistentData(path, out.getData(),
 +            state.equals(MergeState.STARTED) ? ZooUtil.NodeExistsPolicy.FAIL : ZooUtil.NodeExistsPolicy.OVERWRITE);
 +      }
 +      mergeLock.notifyAll();
 +    }
 +    nextEvent.event("Merge state of %s set to %s", info.getExtent(), state);
 +  }
 +
 +  public void clearMergeState(Text tableId) throws IOException, KeeperException, InterruptedException {
 +    synchronized (mergeLock) {
 +      String path = ZooUtil.getRoot(instance.getInstanceID()) + Constants.ZTABLES + "/" + tableId.toString() + "/merge";
 +      ZooReaderWriter.getInstance().recursiveDelete(path, NodeMissingPolicy.SKIP);
 +      mergeLock.notifyAll();
 +    }
 +    nextEvent.event("Merge state of %s cleared", tableId);
 +  }
 +
 +  void setMasterGoalState(MasterGoalState state) {
 +    try {
 +      ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMASTER_GOAL_STATE, state.name().getBytes(),
 +          NodeExistsPolicy.OVERWRITE);
 +    } catch (Exception ex) {
 +      log.error("Unable to set master goal state in zookeeper");
 +    }
 +  }
 +
 +  MasterGoalState getMasterGoalState() {
 +    while (true)
 +      try {
 +        byte[] data = ZooReaderWriter.getInstance().getData(ZooUtil.getRoot(instance) + Constants.ZMASTER_GOAL_STATE, null);
 +        return MasterGoalState.valueOf(new String(data));
 +      } catch (Exception e) {
 +        log.error("Problem getting real goal state: " + e);
 +        UtilWaitThread.sleep(1000);
 +      }
 +  }
 +
 +  public boolean hasCycled(long time) {
 +    for (TabletGroupWatcher watcher : watchers) {
 +      if (watcher.stats.lastScanFinished() < time)
 +        return false;
 +    }
 +
 +    return true;
 +  }
 +
 +  public void clearMigrations(String tableId) {
 +    synchronized (migrations) {
 +      Iterator<KeyExtent> iterator = migrations.keySet().iterator();
 +      while (iterator.hasNext()) {
 +        KeyExtent extent = iterator.next();
 +        if (extent.getTableId().toString().equals(tableId)) {
 +          iterator.remove();
 +        }
 +      }
 +    }
 +  }
 +
 +  static enum TabletGoalState {
 +    HOSTED, UNASSIGNED, DELETED
 +  };
 +
 +  TabletGoalState getSystemGoalState(TabletLocationState tls) {
 +    switch (getMasterState()) {
 +      case NORMAL:
 +        return TabletGoalState.HOSTED;
 +      case HAVE_LOCK: // fall-through intended
 +      case INITIAL: // fall-through intended
 +      case SAFE_MODE:
 +        if (tls.extent.isMeta())
 +          return TabletGoalState.HOSTED;
 +        return TabletGoalState.UNASSIGNED;
 +      case UNLOAD_METADATA_TABLETS:
 +        if (tls.extent.isRootTablet())
 +          return TabletGoalState.HOSTED;
 +        return TabletGoalState.UNASSIGNED;
 +      case UNLOAD_ROOT_TABLET:
 +        return TabletGoalState.UNASSIGNED;
 +      case STOP:
 +        return TabletGoalState.UNASSIGNED;
 +      default:
 +        throw new IllegalStateException("Unknown Master State");
 +    }
 +  }
 +
 +  TabletGoalState getTableGoalState(KeyExtent extent) {
 +    TableState tableState = TableManager.getInstance().getTableState(extent.getTableId().toString());
 +    if (tableState == null)
 +      return TabletGoalState.DELETED;
 +    switch (tableState) {
 +      case DELETING:
 +        return TabletGoalState.DELETED;
 +      case OFFLINE:
 +      case NEW:
 +        return TabletGoalState.UNASSIGNED;
 +      default:
 +        return TabletGoalState.HOSTED;
 +    }
 +  }
 +
 +  TabletGoalState getGoalState(TabletLocationState tls, MergeInfo mergeInfo) {
 +    KeyExtent extent = tls.extent;
 +    // Shutting down?
 +    TabletGoalState state = getSystemGoalState(tls);
 +    if (state == TabletGoalState.HOSTED) {
 +      if (tls.current != null && serversToShutdown.contains(tls.current)) {
 +        return TabletGoalState.UNASSIGNED;
 +      }
 +      // Handle merge transitions
 +      if (mergeInfo.getExtent() != null) {
 +        log.debug("mergeInfo overlaps: " + extent + " " + mergeInfo.overlaps(extent));
 +        if (mergeInfo.overlaps(extent)) {
 +          switch (mergeInfo.getState()) {
 +            case NONE:
 +            case COMPLETE:
 +              break;
 +            case STARTED:
 +            case SPLITTING:
 +              return TabletGoalState.HOSTED;
 +            case WAITING_FOR_CHOPPED:
 +              if (tls.getState(onlineTabletServers()).equals(TabletState.HOSTED)) {
 +                if (tls.chopped)
 +                  return TabletGoalState.UNASSIGNED;
 +              } else {
 +                if (tls.chopped && tls.walogs.isEmpty())
 +                  return TabletGoalState.UNASSIGNED;
 +              }
 +
 +              return TabletGoalState.HOSTED;
 +            case WAITING_FOR_OFFLINE:
 +            case MERGING:
 +              return TabletGoalState.UNASSIGNED;
 +          }
 +        }
 +      }
 +
 +      // taking table offline?
 +      state = getTableGoalState(extent);
 +      if (state == TabletGoalState.HOSTED) {
 +        // Maybe this tablet needs to be migrated
 +        TServerInstance dest = migrations.get(extent);
 +        if (dest != null && tls.current != null && !dest.equals(tls.current)) {
 +          return TabletGoalState.UNASSIGNED;
 +        }
 +      }
 +    }
 +    return state;
 +  }
 +
 +  private class MigrationCleanupThread extends Daemon {
 +
 +    @Override
 +    public void run() {
 +      setName("Migration Cleanup Thread");
 +      while (stillMaster()) {
 +        if (!migrations.isEmpty()) {
 +          try {
 +            cleanupMutations();
 +          } catch (Exception ex) {
 +            log.error("Error cleaning up migrations", ex);
 +          }
 +        }
 +        UtilWaitThread.sleep(TIME_BETWEEN_MIGRATION_CLEANUPS);
 +      }
 +    }
 +
 +    // If a migrating tablet splits, and the tablet dies before sending the
 +    // master a message, the migration will refer to a non-existing tablet,
 +    // so it can never complete. Periodically scan the metadata table and
 +    // remove any migrating tablets that no longer exist.
 +    private void cleanupMutations() throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +      Connector connector = getConnector();
 +      Scanner scanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +      TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
 +      Set<KeyExtent> found = new HashSet<KeyExtent>();
 +      for (Entry<Key,Value> entry : scanner) {
 +        KeyExtent extent = new KeyExtent(entry.getKey().getRow(), entry.getValue());
 +        if (migrations.containsKey(extent)) {
 +          found.add(extent);
 +        }
 +      }
 +      migrations.keySet().retainAll(found);
 +    }
 +  }
 +
 +  private class StatusThread extends Daemon {
 +
 +    @Override
 +    public void run() {
 +      setName("Status Thread");
 +      EventCoordinator.Listener eventListener = nextEvent.getListener();
 +      while (stillMaster()) {
 +        long wait = DEFAULT_WAIT_FOR_WATCHER;
 +        try {
 +          switch (getMasterGoalState()) {
 +            case NORMAL:
 +              setMasterState(MasterState.NORMAL);
 +              break;
 +            case SAFE_MODE:
 +              if (getMasterState() == MasterState.NORMAL) {
 +                setMasterState(MasterState.SAFE_MODE);
 +              }
 +              if (getMasterState() == MasterState.HAVE_LOCK) {
 +                setMasterState(MasterState.SAFE_MODE);
 +              }
 +              break;
 +            case CLEAN_STOP:
 +              switch (getMasterState()) {
 +                case NORMAL:
 +                  setMasterState(MasterState.SAFE_MODE);
 +                  break;
 +                case SAFE_MODE: {
 +                  int count = nonMetaDataTabletsAssignedOrHosted();
 +                  log.debug(String.format("There are %d non-metadata tablets assigned or hosted", count));
 +                  if (count == 0)
 +                    setMasterState(MasterState.UNLOAD_METADATA_TABLETS);
 +                }
 +                  break;
 +                case UNLOAD_METADATA_TABLETS: {
 +                  int count = assignedOrHosted(METADATA_TABLE_ID);
 +                  log.debug(String.format("There are %d metadata tablets assigned or hosted", count));
 +                  if (count == 0)
 +                    setMasterState(MasterState.UNLOAD_ROOT_TABLET);
 +                }
 +                  break;
 +                case UNLOAD_ROOT_TABLET: {
 +                  int count = assignedOrHosted(METADATA_TABLE_ID);
 +                  if (count > 0) {
 +                    log.debug(String.format("%d metadata tablets online", count));
 +                    setMasterState(MasterState.UNLOAD_ROOT_TABLET);
 +                  }
 +                  int root_count = assignedOrHosted(ROOT_TABLE_ID);
 +                  if (root_count > 0)
 +                    log.debug("The root tablet is still assigned or hosted");
 +                  if (count + root_count == 0) {
 +                    Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
 +                    log.debug("stopping " + currentServers.size() + " tablet servers");
 +                    for (TServerInstance server : currentServers) {
 +                      try {
 +                        serversToShutdown.add(server);
 +                        tserverSet.getConnection(server).fastHalt(masterLock);
 +                      } catch (TException e) {
 +                        // its probably down, and we don't care
 +                      } finally {
 +                        tserverSet.remove(server);
 +                      }
 +                    }
 +                    if (currentServers.size() == 0)
 +                      setMasterState(MasterState.STOP);
 +                  }
 +                }
 +                  break;
 +                default:
 +                  break;
 +              }
 +          }
 +          wait = updateStatus();
 +          eventListener.waitForEvents(wait);
 +        } catch (Throwable t) {
 +          log.error("Error balancing tablets", t);
 +          UtilWaitThread.sleep(WAIT_BETWEEN_ERRORS);
 +        }
 +      }
 +    }
 +
 +    private long updateStatus() throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +      tserverStatus = Collections.synchronizedSortedMap(gatherTableInformation());
 +      checkForHeldServer(tserverStatus);
 +
 +      if (!badServers.isEmpty()) {
 +        log.debug("not balancing because the balance information is out-of-date " + badServers.keySet());
 +      } else if (notHosted() > 0) {
 +        log.debug("not balancing because there are unhosted tablets");
 +      } else if (getMasterGoalState() == MasterGoalState.CLEAN_STOP) {
 +        log.debug("not balancing because the master is attempting to stop cleanly");
 +      } else if (!serversToShutdown.isEmpty()) {
 +        log.debug("not balancing while shutting down servers " + serversToShutdown);
 +      } else {
 +        return balanceTablets();
 +      }
 +      return DEFAULT_WAIT_FOR_WATCHER;
 +    }
 +
 +    private void checkForHeldServer(SortedMap<TServerInstance,TabletServerStatus> tserverStatus) {
 +      TServerInstance instance = null;
 +      int crazyHoldTime = 0;
 +      int someHoldTime = 0;
 +      final long maxWait = getSystemConfiguration().getTimeInMillis(Property.TSERV_HOLD_TIME_SUICIDE);
 +      for (Entry<TServerInstance,TabletServerStatus> entry : tserverStatus.entrySet()) {
 +        if (entry.getValue().getHoldTime() > 0) {
 +          someHoldTime++;
 +          if (entry.getValue().getHoldTime() > maxWait) {
 +            instance = entry.getKey();
 +            crazyHoldTime++;
 +          }
 +        }
 +      }
 +      if (crazyHoldTime == 1 && someHoldTime == 1 && tserverStatus.size() > 1) {
 +        log.warn("Tablet server " + instance + " exceeded maximum hold time: attempting to kill it");
 +        try {
 +          TServerConnection connection = tserverSet.getConnection(instance);
 +          if (connection != null)
 +            connection.fastHalt(masterLock);
 +        } catch (TException e) {
 +          log.error(e, e);
 +        }
 +        tserverSet.remove(instance);
 +      }
 +    }
 +
 +    private long balanceTablets() {
 +      List<TabletMigration> migrationsOut = new ArrayList<TabletMigration>();
 +      Set<KeyExtent> migrationsCopy = new HashSet<KeyExtent>();
 +      synchronized (migrations) {
 +        migrationsCopy.addAll(migrations.keySet());
 +      }
 +      long wait = tabletBalancer.balance(Collections.unmodifiableSortedMap(tserverStatus), Collections.unmodifiableSet(migrationsCopy), migrationsOut);
 +
 +      for (TabletMigration m : TabletBalancer.checkMigrationSanity(tserverStatus.keySet(), migrationsOut)) {
 +        if (migrations.containsKey(m.tablet)) {
 +          log.warn("balancer requested migration more than once, skipping " + m);
 +          continue;
 +        }
 +        migrations.put(m.tablet, m.newServer);
 +        log.debug("migration " + m);
 +      }
 +      if (migrationsOut.size() > 0) {
 +        nextEvent.event("Migrating %d more tablets, %d total", migrationsOut.size(), migrations.size());
 +      }
 +      return wait;
 +    }
 +
 +  }
 +
 +  private SortedMap<TServerInstance,TabletServerStatus> gatherTableInformation() {
 +    long start = System.currentTimeMillis();
 +    SortedMap<TServerInstance,TabletServerStatus> result = new TreeMap<TServerInstance,TabletServerStatus>();
 +    Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
 +    for (TServerInstance server : currentServers) {
 +      try {
 +        Thread t = Thread.currentThread();
 +        String oldName = t.getName();
 +        try {
 +          t.setName("Getting status from " + server);
 +          TServerConnection connection = tserverSet.getConnection(server);
 +          if (connection == null)
 +            throw new IOException("No connection to " + server);
 +          TabletServerStatus status = connection.getTableMap(false);
 +          result.put(server, status);
 +        } finally {
 +          t.setName(oldName);
 +        }
 +      } catch (Exception ex) {
 +        log.error("unable to get tablet server status " + server + " " + ex.toString());
 +        log.debug("unable to get tablet server status " + server, ex);
 +        if (badServers.get(server).incrementAndGet() > MAX_BAD_STATUS_COUNT) {
 +          log.warn("attempting to stop " + server);
 +          try {
 +            TServerConnection connection = tserverSet.getConnection(server);
 +            if (connection != null)
 +              connection.halt(masterLock);
 +          } catch (TTransportException e) {
 +            // ignore: it's probably down
 +          } catch (Exception e) {
 +            log.info("error talking to troublesome tablet server ", e);
 +          }
 +          badServers.remove(server);
 +          tserverSet.remove(server);
 +        }
 +      }
 +    }
 +    synchronized (badServers) {
 +      badServers.keySet().retainAll(currentServers);
++      badServers.keySet().removeAll(result.keySet());
 +    }
 +    log.debug(String.format("Finished gathering information from %d servers in %.2f seconds", result.size(), (System.currentTimeMillis() - start) / 1000.));
 +    return result;
 +  }
 +
 +  public void run() throws IOException, InterruptedException, KeeperException {
 +    final String zroot = ZooUtil.getRoot(instance);
 +
 +    getMasterLock(zroot + Constants.ZMASTER_LOCK);
 +
 +    recoveryManager = new RecoveryManager(this);
 +
 +    TableManager.getInstance().addObserver(this);
 +
 +    StatusThread statusThread = new StatusThread();
 +    statusThread.start();
 +
 +    MigrationCleanupThread migrationCleanupThread = new MigrationCleanupThread();
 +    migrationCleanupThread.start();
 +
 +    tserverSet.startListeningForTabletServerChanges();
 +
 +    try {
 +      final AgeOffStore<Master> store = new AgeOffStore<Master>(new org.apache.accumulo.fate.ZooStore<Master>(ZooUtil.getRoot(instance) + Constants.ZFATE,
 +          ZooReaderWriter.getRetryingInstance()), 1000 * 60 * 60 * 8);
 +
 +      int threads = this.getConfiguration().getConfiguration().getCount(Property.MASTER_FATE_THREADPOOL_SIZE);
 +
 +      fate = new Fate<Master>(this, store, threads);
 +
 +      SimpleTimer.getInstance().schedule(new Runnable() {
 +
 +        @Override
 +        public void run() {
 +          store.ageOff();
 +        }
 +      }, 63000, 63000);
 +    } catch (KeeperException e) {
 +      throw new IOException(e);
 +    } catch (InterruptedException e) {
 +      throw new IOException(e);
 +    }
 +
 +    ZooReaderWriter.getInstance().getChildren(zroot + Constants.ZRECOVERY, new Watcher() {
 +      @Override
 +      public void process(WatchedEvent event) {
 +        nextEvent.event("Noticed recovery changes", event.getType());
 +        try {
 +          // watcher only fires once, add it back
 +          ZooReaderWriter.getInstance().getChildren(zroot + Constants.ZRECOVERY, this);
 +        } catch (Exception e) {
 +          log.error("Failed to add log recovery watcher back", e);
 +        }
 +      }
 +    });
 +
 +    Credentials systemCreds = SystemCredentials.get();
 +    watchers.add(new TabletGroupWatcher(this, new MetaDataStateStore(instance, systemCreds, this), null));
 +    watchers.add(new TabletGroupWatcher(this, new RootTabletStateStore(instance, systemCreds, this), watchers.get(0)));
 +    watchers.add(new TabletGroupWatcher(this, new ZooTabletStateStore(new ZooStore(zroot)), watchers.get(1)));
 +    for (TabletGroupWatcher watcher : watchers) {
 +      watcher.start();
 +    }
 +
 +    Processor<Iface> processor = new Processor<Iface>(TraceWrap.service(new MasterClientServiceHandler(this)));
 +    ServerAddress sa = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_CLIENTPORT, processor, "Master",
 +        "Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
 +    clientService = sa.server;
 +    String address = sa.address.toString();
 +    log.info("Setting master lock data to " + address);
 +    masterLock.replaceLockData(address.getBytes());
 +
 +    while (!clientService.isServing()) {
 +      UtilWaitThread.sleep(100);
 +    }
 +    while (clientService.isServing()) {
 +      UtilWaitThread.sleep(500);
 +    }
 +    log.info("Shutting down fate.");
 +    fate.shutdown();
 +
 +    final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME;
 +    statusThread.join(remaining(deadline));
 +
 +    // quit, even if the tablet servers somehow jam up and the watchers
 +    // don't stop
 +    for (TabletGroupWatcher watcher : watchers) {
 +      watcher.join(remaining(deadline));
 +    }
 +    log.info("exiting");
 +  }
 +
 +  private long remaining(long deadline) {
 +    return Math.max(1, deadline - System.currentTimeMillis());
 +  }
 +
 +  public ZooLock getMasterLock() {
 +    return masterLock;
 +  }
 +
 +  private static class MasterLockWatcher implements ZooLock.AsyncLockWatcher {
 +
 +    boolean acquiredLock = false;
 +    boolean failedToAcquireLock = false;
 +
 +    @Override
 +    public void lostLock(LockLossReason reason) {
 +      Halt.halt("Master lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
 +    }
 +
 +    @Override
 +    public void unableToMonitorLockNode(final Throwable e) {
 +      Halt.halt(-1, new Runnable() {
 +        @Override
 +        public void run() {
 +          log.fatal("No longer able to monitor master lock node", e);
 +        }
 +      });
 +
 +    }
 +
 +    @Override
 +    public synchronized void acquiredLock() {
 +
 +      if (acquiredLock || failedToAcquireLock) {
 +        Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1);
 +      }
 +
 +      acquiredLock = true;
 +      notifyAll();
 +    }
 +
 +    @Override
 +    public synchronized void failedToAcquireLock(Exception e) {
 +      log.warn("Failed to get master lock " + e);
 +
 +      if (acquiredLock) {
 +        Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + failedToAcquireLock, -1);
 +      }
 +
 +      failedToAcquireLock = true;
 +      notifyAll();
 +    }
 +
 +    public synchronized void waitForChange() {
 +      while (!acquiredLock && !failedToAcquireLock) {
 +        try {
 +          wait();
 +        } catch (InterruptedException e) {}
 +      }
 +    }
 +  }
 +
 +  private void getMasterLock(final String zMasterLoc) throws KeeperException, InterruptedException {
 +    log.info("trying to get master lock");
 +
 +    final String masterClientAddress = hostname + ":" + getSystemConfiguration().getPort(Property.MASTER_CLIENTPORT);
 +
 +    while (true) {
 +
 +      MasterLockWatcher masterLockWatcher = new MasterLockWatcher();
 +      masterLock = new ZooLock(zMasterLoc);
 +      masterLock.lockAsync(masterLockWatcher, masterClientAddress.getBytes());
 +
 +      masterLockWatcher.waitForChange();
 +
 +      if (masterLockWatcher.acquiredLock) {
 +        break;
 +      }
 +
 +      if (!masterLockWatcher.failedToAcquireLock) {
 +        throw new IllegalStateException("master lock in unknown state");
 +      }
 +
 +      masterLock.tryToCancelAsyncLockOrUnlock();
 +
 +      UtilWaitThread.sleep(TIME_TO_WAIT_BETWEEN_LOCK_CHECKS);
 +    }
 +
 +    setMasterState(MasterState.HAVE_LOCK);
 +  }
 +
 +  public static void main(String[] args) throws Exception {
 +    try {
 +      SecurityUtil.serverLogin();
 +
 +      VolumeManager fs = VolumeManagerImpl.get();
 +      ServerOpts opts = new ServerOpts();
 +      opts.parseArgs("master", args);
 +      String hostname = opts.getAddress();
 +      Instance instance = HdfsZooInstance.getInstance();
 +      ServerConfiguration conf = new ServerConfiguration(instance);
 +      Accumulo.init(fs, conf, "master");
 +      Master master = new Master(conf, fs, hostname);
 +      Accumulo.enableTracing(hostname, "master");
 +      master.run();
 +    } catch (Exception ex) {
 +      log.error("Unexpected exception, exiting", ex);
 +      System.exit(1);
 +    }
 +  }
 +
 +  @Override
 +  public void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added) {
 +    DeadServerList obit = new DeadServerList(ZooUtil.getRoot(instance) + Constants.ZDEADTSERVERS);
 +    if (added.size() > 0) {
 +      log.info("New servers: " + added);
 +      for (TServerInstance up : added)
 +        obit.delete(up.hostPort());
 +    }
 +    for (TServerInstance dead : deleted) {
 +      String cause = "unexpected failure";
 +      if (serversToShutdown.contains(dead))
 +        cause = "clean shutdown"; // maybe an incorrect assumption
 +      if (!getMasterGoalState().equals(MasterGoalState.CLEAN_STOP))
 +        obit.post(dead.hostPort(), cause);
 +    }
 +
 +    Set<TServerInstance> unexpected = new HashSet<TServerInstance>(deleted);
 +    unexpected.removeAll(this.serversToShutdown);
 +    if (unexpected.size() > 0) {
 +      if (stillMaster() && !getMasterGoalState().equals(MasterGoalState.CLEAN_STOP)) {
 +        log.warn("Lost servers " + unexpected);
 +      }
 +    }
 +    serversToShutdown.removeAll(deleted);
 +    badServers.keySet().removeAll(deleted);
 +    // clear out any bad server with the same host/port as a new server
 +    synchronized (badServers) {
 +      cleanListByHostAndPort(badServers.keySet(), deleted, added);
 +    }
 +    synchronized (serversToShutdown) {
 +      cleanListByHostAndPort(serversToShutdown, deleted, added);
 +    }
 +
 +    synchronized (migrations) {
 +      Iterator<Entry<KeyExtent,TServerInstance>> iter = migrations.entrySet().iterator();
 +      while (iter.hasNext()) {
 +        Entry<KeyExtent,TServerInstance> entry = iter.next();
 +        if (deleted.contains(entry.getValue())) {
 +          log.info("Canceling migration of " + entry.getKey() + " to " + entry.getValue());
 +          iter.remove();
 +        }
 +      }
 +    }
 +    nextEvent.event("There are now %d tablet servers", current.size());
 +  }
 +
 +  private static void cleanListByHostAndPort(Collection<TServerInstance> badServers, Set<TServerInstance> deleted, Set<TServerInstance> added) {
 +    Iterator<TServerInstance> badIter = badServers.iterator();
 +    while (badIter.hasNext()) {
 +      TServerInstance bad = badIter.next();
 +      for (TServerInstance add : added) {
 +        if (bad.hostPort().equals(add.hostPort())) {
 +          badIter.remove();
 +          break;
 +        }
 +      }
 +      for (TServerInstance del : deleted) {
 +        if (bad.hostPort().equals(del.hostPort())) {
 +          badIter.remove();
 +          break;
 +        }
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public void stateChanged(String tableId, TableState state) {
 +    nextEvent.event("Table state in zookeeper changed for %s to %s", tableId, state);
 +  }
 +
 +  @Override
 +  public void initialize(Map<String,TableState> tableIdToStateMap) {}
 +
 +  @Override
 +  public void sessionExpired() {}
 +
 +  @Override
 +  public Set<String> onlineTables() {
 +    Set<String> result = new HashSet<String>();
 +    if (getMasterState() != MasterState.NORMAL) {
 +      if (getMasterState() != MasterState.UNLOAD_METADATA_TABLETS)
 +        result.add(MetadataTable.ID);
 +      if (getMasterState() != MasterState.UNLOAD_ROOT_TABLET)
 +        result.add(RootTable.ID);
 +      return result;
 +    }
 +    TableManager manager = TableManager.getInstance();
 +
 +    for (String tableId : Tables.getIdToNameMap(instance).keySet()) {
 +      TableState state = manager.getTableState(tableId);
 +      if (state != null) {
 +        if (state == TableState.ONLINE)
 +          result.add(tableId);
 +      }
 +    }
 +    return result;
 +  }
 +
 +  @Override
 +  public Set<TServerInstance> onlineTabletServers() {
 +    return tserverSet.getCurrentServers();
 +  }
 +
 +  @Override
 +  public Collection<MergeInfo> merges() {
 +    List<MergeInfo> result = new ArrayList<MergeInfo>();
 +    for (String tableId : Tables.getIdToNameMap(instance).keySet()) {
 +      result.add(getMergeInfo(new Text(tableId)));
 +    }
 +    return result;
 +  }
 +
 +  // recovers state from the persistent transaction to shutdown a server
 +  public void shutdownTServer(TServerInstance server) {
 +    nextEvent.event("Tablet Server shutdown requested for %s", server);
 +    serversToShutdown.add(server);
 +  }
 +
 +  public EventCoordinator getEventCoordinator() {
 +    return nextEvent;
 +  }
 +
 +  public Instance getInstance() {
 +    return this.instance;
 +  }
 +
 +  public AccumuloConfiguration getSystemConfiguration() {
 +    return serverConfig.getConfiguration();
 +  }
 +
 +  public ServerConfiguration getConfiguration() {
 +    return serverConfig;
 +  }
 +
 +  public VolumeManager getFileSystem() {
 +    return this.fs;
 +  }
 +
 +  public void assignedTablet(KeyExtent extent) {
 +    if (extent.isMeta()) {
 +      if (getMasterState().equals(MasterState.UNLOAD_ROOT_TABLET)) {
 +        setMasterState(MasterState.UNLOAD_METADATA_TABLETS);
 +      }
 +    }
 +    if (extent.isRootTablet()) {
 +      // probably too late, but try anyhow
 +      if (getMasterState().equals(MasterState.STOP)) {
 +        setMasterState(MasterState.UNLOAD_ROOT_TABLET);
 +      }
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5e247981/test/src/test/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java
index 0000000,0000000..bf2f6b6
new file mode 100644
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java
@@@ -1,0 -1,0 +1,90 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.test.functional;
++
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertTrue;
++
++import java.util.ArrayList;
++import java.util.Collections;
++import java.util.List;
++import java.util.SortedSet;
++import java.util.TreeSet;
++
++import org.apache.accumulo.core.client.Connector;
++import org.apache.accumulo.core.client.impl.MasterClient;
++import org.apache.accumulo.core.client.security.tokens.PasswordToken;
++import org.apache.accumulo.core.conf.Property;
++import org.apache.accumulo.core.master.thrift.MasterClientService;
++import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
++import org.apache.accumulo.core.master.thrift.TableInfo;
++import org.apache.accumulo.core.master.thrift.TabletServerStatus;
++import org.apache.accumulo.core.security.Credentials;
++import org.apache.accumulo.fate.util.UtilWaitThread;
++import org.apache.accumulo.minicluster.MiniAccumuloConfig;
++import org.apache.accumulo.trace.instrument.Tracer;
++import org.apache.hadoop.io.Text;
++import org.junit.Test;
++
++public class BalanceAfterCommsFailureIT extends ConfigurableMacIT {
++  
++  @Override
++  public void configure(MiniAccumuloConfig cfg) {
++    cfg.setSiteConfig(Collections.singletonMap(Property.GENERAL_RPC_TIMEOUT.getKey(), "2s"));
++  }
++  
++  @Test(timeout = 2 * 60 * 1000)
++  public void test() throws Exception {
++    Connector c = this.getConnector();
++    c.tableOperations().create("test");
++    assertEquals(0, Runtime.getRuntime().exec(new String[]{"pkill", "-SIGSTOP", "-f", "TabletServer"}).waitFor());
++    UtilWaitThread.sleep(20 * 1000);
++    assertEquals(0, Runtime.getRuntime().exec(new String[]{"pkill", "-SIGCONT", "-f", "TabletServer"}).waitFor());
++    SortedSet<Text> splits = new TreeSet<Text>();
++    for (String split : "a b c d e f g h i j k l m n o p q r s t u v w x y z".split(" ")) {
++      splits.add(new Text(split));
++    }
++    c.tableOperations().addSplits("test", splits);
++    UtilWaitThread.sleep(10 * 1000);
++    checkBalance(c);
++  }
++
++  private void checkBalance(Connector c) throws Exception {
++    Credentials creds = new Credentials("root", new PasswordToken(ROOT_PASSWORD));
++    
++    MasterClientService.Iface client = null;
++    MasterMonitorInfo stats = null;
++    try {
++      client = MasterClient.getConnectionWithRetry(c.getInstance());
++      stats = client.getMasterStats(Tracer.traceInfo(), creds.toThrift(c.getInstance()));
++    } finally {
++      if (client != null)
++        MasterClient.close(client);
++    }
++    List<Integer> counts = new ArrayList<Integer>();
++    for (TabletServerStatus server : stats.tServerInfo) {
++      int count = 0;
++      for (TableInfo table : server.tableMap.values()) {
++        count += table.onlineTablets;
++      }
++      counts.add(count);
++    }
++    assertTrue(counts.size() > 1);
++    for (int i = 1; i < counts.size(); i++)
++      assertTrue(Math.abs(counts.get(0) - counts.get(i)) <= counts.size());
++  }
++}