You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2013/12/30 20:10:01 UTC

[10/15] Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT

http://git-wip-us.apache.org/repos/asf/accumulo/blob/20cc9f4e/server/src/main/java/org/apache/accumulo/server/master/Master.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/master/Master.java
index 5cb928b,0000000..100a351
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/master/Master.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/Master.java
@@@ -1,2400 -1,0 +1,2401 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.master;
 +
 +import static java.lang.Math.min;
 +
 +import java.io.IOException;
 +import java.net.InetSocketAddress;
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedMap;
 +import java.util.TreeMap;
 +import java.util.TreeSet;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.IsolatedScanner;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.RowIterator;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.admin.TableOperationsImpl;
 +import org.apache.accumulo.core.client.impl.Tables;
 +import org.apache.accumulo.core.client.impl.ThriftTransportPool;
 +import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.PartialKey;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.data.thrift.TKeyExtent;
 +import org.apache.accumulo.core.file.FileUtil;
 +import org.apache.accumulo.core.iterators.IteratorUtil;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.master.thrift.MasterClientService;
 +import org.apache.accumulo.core.master.thrift.MasterClientService.Iface;
 +import org.apache.accumulo.core.master.thrift.MasterClientService.Processor;
 +import org.apache.accumulo.core.master.thrift.MasterGoalState;
 +import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
 +import org.apache.accumulo.core.master.thrift.MasterState;
 +import org.apache.accumulo.core.master.thrift.TableInfo;
 +import org.apache.accumulo.core.master.thrift.TabletLoadState;
 +import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 +import org.apache.accumulo.core.master.thrift.TabletSplit;
 +import org.apache.accumulo.core.security.SecurityUtil;
 +import org.apache.accumulo.core.security.thrift.TCredentials;
 +import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
 +import org.apache.accumulo.core.util.ByteBufferUtil;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.core.util.Daemon;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.AgeOffStore;
 +import org.apache.accumulo.fate.Fate;
 +import org.apache.accumulo.fate.TStore.TStatus;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
 +import org.apache.accumulo.fate.zookeeper.ZooReaderWriter.Mutator;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 +import org.apache.accumulo.server.Accumulo;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 +import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer;
 +import org.apache.accumulo.server.master.balancer.TabletBalancer;
 +import org.apache.accumulo.server.master.recovery.RecoveryManager;
 +import org.apache.accumulo.server.master.state.Assignment;
 +import org.apache.accumulo.server.master.state.CurrentState;
 +import org.apache.accumulo.server.master.state.DeadServerList;
 +import org.apache.accumulo.server.master.state.DistributedStoreException;
 +import org.apache.accumulo.server.master.state.MergeInfo;
 +import org.apache.accumulo.server.master.state.MergeState;
 +import org.apache.accumulo.server.master.state.MergeStats;
 +import org.apache.accumulo.server.master.state.MetaDataStateStore;
 +import org.apache.accumulo.server.master.state.RootTabletStateStore;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.server.master.state.TableCounts;
 +import org.apache.accumulo.server.master.state.TableStats;
 +import org.apache.accumulo.server.master.state.TabletLocationState;
 +import org.apache.accumulo.server.master.state.TabletMigration;
 +import org.apache.accumulo.server.master.state.TabletServerState;
 +import org.apache.accumulo.server.master.state.TabletState;
 +import org.apache.accumulo.server.master.state.TabletStateStore;
 +import org.apache.accumulo.server.master.state.ZooStore;
 +import org.apache.accumulo.server.master.state.ZooTabletStateStore;
 +import org.apache.accumulo.server.master.state.tables.TableManager;
 +import org.apache.accumulo.server.master.state.tables.TableObserver;
 +import org.apache.accumulo.server.master.tableOps.BulkImport;
 +import org.apache.accumulo.server.master.tableOps.CancelCompactions;
 +import org.apache.accumulo.server.master.tableOps.ChangeTableState;
 +import org.apache.accumulo.server.master.tableOps.CloneTable;
 +import org.apache.accumulo.server.master.tableOps.CompactRange;
 +import org.apache.accumulo.server.master.tableOps.CreateTable;
 +import org.apache.accumulo.server.master.tableOps.DeleteTable;
 +import org.apache.accumulo.server.master.tableOps.ExportTable;
 +import org.apache.accumulo.server.master.tableOps.ImportTable;
 +import org.apache.accumulo.server.master.tableOps.RenameTable;
 +import org.apache.accumulo.server.master.tableOps.TableRangeOp;
 +import org.apache.accumulo.server.master.tableOps.TraceRepo;
 +import org.apache.accumulo.server.master.tserverOps.ShutdownTServer;
 +import org.apache.accumulo.server.monitor.Monitor;
 +import org.apache.accumulo.server.security.AuditedSecurityOperation;
 +import org.apache.accumulo.server.security.SecurityConstants;
 +import org.apache.accumulo.server.security.SecurityOperation;
 +import org.apache.accumulo.server.tabletserver.TabletTime;
 +import org.apache.accumulo.server.trace.TraceFileSystem;
 +import org.apache.accumulo.server.util.AddressUtil;
 +import org.apache.accumulo.server.util.DefaultMap;
 +import org.apache.accumulo.server.util.Halt;
 +import org.apache.accumulo.server.util.MetadataTable;
 +import org.apache.accumulo.server.util.SystemPropUtil;
 +import org.apache.accumulo.server.util.TServerUtils;
 +import org.apache.accumulo.server.util.TServerUtils.ServerPort;
 +import org.apache.accumulo.server.util.TablePropUtil;
 +import org.apache.accumulo.server.util.TabletIterator.TabletDeletedException;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.accumulo.server.zookeeper.ZooLock;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
 +import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
 +import org.apache.accumulo.trace.thrift.TInfo;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.io.DataInputBuffer;
 +import org.apache.hadoop.io.DataOutputBuffer;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.server.TServer;
 +import org.apache.thrift.transport.TTransportException;
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.KeeperException.NoNodeException;
 +import org.apache.zookeeper.WatchedEvent;
 +import org.apache.zookeeper.Watcher;
 +import org.apache.zookeeper.data.Stat;
 +
 +/**
 + * The Master is responsible for assigning and balancing tablets to tablet servers.
 + * 
 + * The master will also coordinate log recoveries and reports general status.
 + */
 +public class Master implements LiveTServerSet.Listener, TableObserver, CurrentState {
 +  
 +  final private static Logger log = Logger.getLogger(Master.class);
 +  
 +  final private static int ONE_SECOND = 1000;
 +  final private static Text METADATA_TABLE_ID = new Text(Constants.METADATA_TABLE_ID);
 +  final private static long TIME_TO_WAIT_BETWEEN_SCANS = 60 * ONE_SECOND;
 +  final private static long TIME_BETWEEN_MIGRATION_CLEANUPS = 5 * 60 * ONE_SECOND;
 +  final private static long WAIT_BETWEEN_ERRORS = ONE_SECOND;
 +  final private static long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND;
 +  final private static int MAX_CLEANUP_WAIT_TIME = 1000;
 +  final private static int TIME_TO_WAIT_BETWEEN_LOCK_CHECKS = 1000;
 +  final private static int MAX_TSERVER_WORK_CHUNK = 5000;
 +  final private static int MAX_BAD_STATUS_COUNT = 3;
 +  
 +  final private FileSystem fs;
 +  final private Instance instance;
 +  final private String hostname;
 +  final private LiveTServerSet tserverSet;
 +  final private List<TabletGroupWatcher> watchers = new ArrayList<TabletGroupWatcher>();
 +  final private SecurityOperation security;
 +  final private Map<TServerInstance,AtomicInteger> badServers = Collections.synchronizedMap(new DefaultMap<TServerInstance,AtomicInteger>(new AtomicInteger()));
 +  final private Set<TServerInstance> serversToShutdown = Collections.synchronizedSet(new HashSet<TServerInstance>());
 +  final private SortedMap<KeyExtent,TServerInstance> migrations = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,TServerInstance>());
 +  final private EventCoordinator nextEvent = new EventCoordinator();
 +  final private Object mergeLock = new Object();
 +  private RecoveryManager recoveryManager = null;
 +  
 +  private ZooLock masterLock = null;
 +  private TServer clientService = null;
 +  private TabletBalancer tabletBalancer;
 +  
 +  private MasterState state = MasterState.INITIAL;
 +  
 +  private Fate<Master> fate;
 +  
 +  volatile private SortedMap<TServerInstance,TabletServerStatus> tserverStatus = Collections
 +      .unmodifiableSortedMap(new TreeMap<TServerInstance,TabletServerStatus>());
 +  
 +  private final Set<String> recoveriesInProgress = Collections.synchronizedSet(new HashSet<String>());
 +  
 +  synchronized private MasterState getMasterState() {
 +    return state;
 +  }
 +  
 +  public boolean stillMaster() {
 +    return getMasterState() != MasterState.STOP;
 +  }
 +  
 +  static final boolean X = true;
 +  static final boolean _ = false;
 +  // @formatter:off
 +  static final boolean transitionOK[][] = {
 +      //                              INITIAL HAVE_LOCK SAFE_MODE NORMAL UNLOAD_META UNLOAD_ROOT STOP
 +      /* INITIAL */                   {X,     X,        _,        _,      _,         _,          X},
 +      /* HAVE_LOCK */                 {_,     X,        X,        X,      _,         _,          X},
 +      /* SAFE_MODE */                 {_,     _,        X,        X,      X,         _,          X},
 +      /* NORMAL */                    {_,     _,        X,        X,      X,         _,          X},
 +      /* UNLOAD_METADATA_TABLETS */   {_,     _,        X,        X,      X,         X,          X},
 +      /* UNLOAD_ROOT_TABLET */        {_,     _,        _,        X,      _,         X,          X},
 +      /* STOP */                      {_,     _,        _,        _,      _,         _,          X}};
 +  //@formatter:on
 +  synchronized private void setMasterState(MasterState newState) {
 +    if (state.equals(newState))
 +      return;
 +    if (!transitionOK[state.ordinal()][newState.ordinal()]) {
 +      log.error("Programmer error: master should not transition from " + state + " to " + newState);
 +    }
 +    MasterState oldState = state;
 +    state = newState;
 +    nextEvent.event("State changed from %s to %s", oldState, newState);
 +    if (newState == MasterState.STOP) {
 +      // Give the server a little time before shutdown so the client
 +      // thread requesting the stop can return
 +      SimpleTimer.getInstance().schedule(new Runnable() {
 +        @Override
 +        public void run() {
 +          // This frees the main thread and will cause the master to exit
 +          clientService.stop();
 +          Master.this.nextEvent.event("stopped event loop");
 +        }
 +        
 +      }, 100l, 1000l);
 +    }
 +    
 +    if (oldState != newState && (newState == MasterState.HAVE_LOCK)) {
 +      upgradeZookeeper();
 +    }
 +    
 +    if (oldState != newState && (newState == MasterState.NORMAL)) {
 +      upgradeMetadata();
 +    }
 +  }
 +  
 +  private void upgradeZookeeper() {
 +    if (Accumulo.getAccumuloPersistentVersion(fs) == Constants.PREV_DATA_VERSION) {
 +      try {
 +        log.info("Upgrading zookeeper");
 +        
 +        IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 +        
 +        zoo.recursiveDelete(ZooUtil.getRoot(instance) + "/loggers", NodeMissingPolicy.SKIP);
 +        zoo.recursiveDelete(ZooUtil.getRoot(instance) + "/dead/loggers", NodeMissingPolicy.SKIP);
 +
 +        zoo.putPersistentData(ZooUtil.getRoot(instance) + Constants.ZRECOVERY, new byte[] {'0'}, NodeExistsPolicy.SKIP);
 +        
 +        for (String id : Tables.getIdToNameMap(instance).keySet()) {
 +          
 +          zoo.putPersistentData(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + id + Constants.ZTABLE_COMPACT_CANCEL_ID, "0".getBytes(),
 +              NodeExistsPolicy.SKIP);
 +        }
 +      } catch (Exception ex) {
 +        log.fatal("Error performing upgrade", ex);
 +        System.exit(1);
 +      }
 +    }
 +  }
 +  
 +  private final AtomicBoolean upgradeMetadataRunning = new AtomicBoolean(false);
 +  
 +  private final ServerConfiguration serverConfig;
 +  
 +  private void upgradeMetadata() {
 +    if (Accumulo.getAccumuloPersistentVersion(fs) == Constants.PREV_DATA_VERSION) {
 +      if (upgradeMetadataRunning.compareAndSet(false, true)) {
 +        Runnable upgradeTask = new Runnable() {
 +          @Override
 +          public void run() {
 +            try {
 +              MetadataTable.moveMetaDeleteMarkers(instance, SecurityConstants.getSystemCredentials());
 +              Accumulo.updateAccumuloVersion(fs);
 +              
 +              log.info("Upgrade complete");
 +              
 +            } catch (Exception ex) {
 +              log.fatal("Error performing upgrade", ex);
 +              System.exit(1);
 +            }
 +            
 +          }
 +        };
 +        
 +        // need to run this in a separate thread because a lock is held that prevents !METADATA tablets from being assigned and this task writes to the
 +        // !METADATA table
 +        new Thread(upgradeTask).start();
 +      }
 +    }
 +  }
 +  
 +  private int assignedOrHosted(Text tableId) {
 +    int result = 0;
 +    for (TabletGroupWatcher watcher : watchers) {
 +      TableCounts count = watcher.getStats(tableId);
 +      result += count.hosted() + count.assigned();
 +    }
 +    return result;
 +  }
 +  
 +  private int totalAssignedOrHosted() {
 +    int result = 0;
 +    for (TabletGroupWatcher watcher : watchers) {
 +      for (TableCounts counts : watcher.getStats().values()) {
 +        result += counts.assigned() + counts.hosted();
 +      }
 +    }
 +    return result;
 +  }
 +  
 +  private int nonMetaDataTabletsAssignedOrHosted() {
 +    return totalAssignedOrHosted() - assignedOrHosted(new Text(Constants.METADATA_TABLE_ID));
 +  }
 +  
 +  private int notHosted() {
 +    int result = 0;
 +    for (TabletGroupWatcher watcher : watchers) {
 +      for (TableCounts counts : watcher.getStats().values()) {
 +        result += counts.assigned() + counts.assignedToDeadServers();
 +      }
 +    }
 +    return result;
 +  }
 +  
 +  // The number of unassigned tablets that should be assigned: displayed on the monitor page
 +  private int displayUnassigned() {
 +    int result = 0;
 +    Text meta = new Text(Constants.METADATA_TABLE_ID);
 +    switch (getMasterState()) {
 +      case NORMAL:
 +        // Count offline tablets for online tables
 +        for (TabletGroupWatcher watcher : watchers) {
 +          TableManager manager = TableManager.getInstance();
 +          for (Entry<Text,TableCounts> entry : watcher.getStats().entrySet()) {
 +            Text tableId = entry.getKey();
 +            TableCounts counts = entry.getValue();
 +            TableState tableState = manager.getTableState(tableId.toString());
 +            if (tableState != null && tableState.equals(TableState.ONLINE)) {
 +              result += counts.unassigned() + counts.assignedToDeadServers() + counts.assigned();
 +            }
 +          }
 +        }
 +        break;
 +      case SAFE_MODE:
 +        // Count offline tablets for the METADATA table
 +        for (TabletGroupWatcher watcher : watchers) {
 +          result += watcher.getStats(meta).unassigned();
 +        }
 +        break;
 +      case UNLOAD_METADATA_TABLETS:
 +      case UNLOAD_ROOT_TABLET:
 +        for (TabletGroupWatcher watcher : watchers) {
 +          result += watcher.getStats(meta).unassigned();
 +        }
 +        break;
 +      default:
 +        break;
 +    }
 +    return result;
 +  }
 +  
 +  private void checkNotMetadataTable(String tableName, TableOperation operation) throws ThriftTableOperationException {
 +    if (tableName.compareTo(Constants.METADATA_TABLE_NAME) == 0) {
 +      String why = "Table names cannot be == " + Constants.METADATA_TABLE_NAME;
 +      log.warn(why);
 +      throw new ThriftTableOperationException(null, tableName, operation, TableOperationExceptionType.OTHER, why);
 +    }
 +  }
 +  
 +  private void checkTableName(String tableName, TableOperation operation) throws ThriftTableOperationException {
 +    if (!tableName.matches(Constants.VALID_TABLE_NAME_REGEX)) {
 +      String why = "Table names must only contain word characters (letters, digits, and underscores): " + tableName;
 +      log.warn(why);
 +      throw new ThriftTableOperationException(null, tableName, operation, TableOperationExceptionType.OTHER, why);
 +    }
 +    if (Tables.getNameToIdMap(HdfsZooInstance.getInstance()).containsKey(tableName)) {
 +      String why = "Table name already exists: " + tableName;
 +      throw new ThriftTableOperationException(null, tableName, operation, TableOperationExceptionType.EXISTS, why);
 +    }
 +    
 +  }
 +  
 +  public void mustBeOnline(final String tableId) throws ThriftTableOperationException {
 +    Tables.clearCache(instance);
 +    if (!Tables.getTableState(instance, tableId).equals(TableState.ONLINE))
 +      throw new ThriftTableOperationException(tableId, null, TableOperation.MERGE, TableOperationExceptionType.OFFLINE, "table is not online");
 +  }
 +  
 +  public Connector getConnector() throws AccumuloException, AccumuloSecurityException {
 +    return instance.getConnector(SecurityConstants.SYSTEM_PRINCIPAL, SecurityConstants.getSystemToken());
 +  }
 +  
 +  private void waitAround(EventCoordinator.Listener listener) {
 +    listener.waitForEvents(ONE_SECOND);
 +  }
 +  
 +  // TODO: maybe move this to Property? We do this in TabletServer, Master, TableLoadBalancer, etc. - ACCUMULO-1295
 +  public static <T> T createInstanceFromPropertyName(AccumuloConfiguration conf, Property property, Class<T> base, T defaultInstance) {
 +    String clazzName = conf.get(property);
 +    T instance = null;
 +    
 +    try {
 +      Class<? extends T> clazz = AccumuloVFSClassLoader.loadClass(clazzName, base);
 +      instance = clazz.newInstance();
 +      log.info("Loaded class : " + clazzName);
 +    } catch (Exception e) {
 +      log.warn("Failed to load class ", e);
 +    }
 +    
 +    if (instance == null) {
 +      log.info("Using " + defaultInstance.getClass().getName());
 +      instance = defaultInstance;
 +    }
 +    return instance;
 +  }
 +  
 +  public Master(ServerConfiguration config, FileSystem fs, String hostname) throws IOException {
 +    this.serverConfig = config;
 +    this.instance = config.getInstance();
 +    this.fs = TraceFileSystem.wrap(fs);
 +    this.hostname = hostname;
 +    
 +    AccumuloConfiguration aconf = serverConfig.getConfiguration();
 +    
 +    log.info("Version " + Constants.VERSION);
 +    log.info("Instance " + instance.getInstanceID());
 +    ThriftTransportPool.getInstance().setIdleTime(aconf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
 +    security = AuditedSecurityOperation.getInstance();
 +    tserverSet = new LiveTServerSet(instance, config.getConfiguration(), this);
 +    this.tabletBalancer = createInstanceFromPropertyName(aconf, Property.MASTER_TABLET_BALANCER, TabletBalancer.class, new DefaultLoadBalancer());
 +    this.tabletBalancer.init(serverConfig);
 +  }
 +  
 +  public TServerConnection getConnection(TServerInstance server) {
 +    try {
 +      return tserverSet.getConnection(server);
 +    } catch (TException ex) {
 +      return null;
 +    }
 +  }
 +  
 +  private class MasterClientServiceHandler implements MasterClientService.Iface {
 +    
 +    protected String checkTableId(String tableName, TableOperation operation) throws ThriftTableOperationException {
 +      final String tableId = Tables.getNameToIdMap(getConfiguration().getInstance()).get(tableName);
 +      if (tableId == null)
 +        throw new ThriftTableOperationException(null, tableName, operation, TableOperationExceptionType.NOTFOUND, null);
 +      return tableId;
 +    }
 +    
 +    @Override
 +    public long initiateFlush(TInfo tinfo, TCredentials c, String tableId) throws ThriftSecurityException, ThriftTableOperationException, TException {
 +      security.canFlush(c, tableId);
 +      
 +      String zTablePath = Constants.ZROOT + "/" + getConfiguration().getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId
 +          + Constants.ZTABLE_FLUSH_ID;
 +      
 +      IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 +      byte fid[];
 +      try {
 +        fid = zoo.mutate(zTablePath, null, null, new Mutator() {
 +          @Override
 +          public byte[] mutate(byte[] currentValue) throws Exception {
 +            long flushID = Long.parseLong(new String(currentValue));
 +            flushID++;
 +            return ("" + flushID).getBytes();
 +          }
 +        });
 +      } catch (NoNodeException nne) {
 +        throw new ThriftTableOperationException(tableId, null, TableOperation.FLUSH, TableOperationExceptionType.NOTFOUND, null);
 +      } catch (Exception e) {
 +        log.warn(e.getMessage(), e);
 +        throw new ThriftTableOperationException(tableId, null, TableOperation.FLUSH, TableOperationExceptionType.OTHER, null);
 +      }
 +      return Long.parseLong(new String(fid));
 +    }
 +    
 +    @Override
 +    public void waitForFlush(TInfo tinfo, TCredentials c, String tableId, ByteBuffer startRow, ByteBuffer endRow, long flushID, long maxLoops)
 +        throws ThriftSecurityException, ThriftTableOperationException, TException {
 +      security.canFlush(c, tableId);
 +      
 +      if (endRow != null && startRow != null && ByteBufferUtil.toText(startRow).compareTo(ByteBufferUtil.toText(endRow)) >= 0)
 +        throw new ThriftTableOperationException(tableId, null, TableOperation.FLUSH, TableOperationExceptionType.BAD_RANGE,
 +            "start row must be less than end row");
 +      
 +      Set<TServerInstance> serversToFlush = new HashSet<TServerInstance>(tserverSet.getCurrentServers());
 +      
 +      for (long l = 0; l < maxLoops; l++) {
 +        
 +        for (TServerInstance instance : serversToFlush) {
 +          try {
 +            final TServerConnection server = tserverSet.getConnection(instance);
 +            if (server != null)
 +              server.flush(masterLock, tableId, ByteBufferUtil.toBytes(startRow), ByteBufferUtil.toBytes(endRow));
 +          } catch (TException ex) {
 +            log.error(ex.toString());
 +          }
 +        }
 +        
 +        if (l == maxLoops - 1)
 +          break;
 +        
 +        UtilWaitThread.sleep(50);
 +        
 +        serversToFlush.clear();
 +        
 +        try {
 +          Connector conn = getConnector();
 +          Scanner scanner = new IsolatedScanner(conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS));
 +          Constants.METADATA_FLUSH_COLUMN.fetch(scanner);
 +          Constants.METADATA_DIRECTORY_COLUMN.fetch(scanner);
 +          scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
 +          scanner.fetchColumnFamily(Constants.METADATA_LOG_COLUMN_FAMILY);
 +          scanner.setRange(new KeyExtent(new Text(tableId), null, ByteBufferUtil.toText(startRow)).toMetadataRange());
 +          
 +          RowIterator ri = new RowIterator(scanner);
 +          
 +          int tabletsToWaitFor = 0;
 +          int tabletCount = 0;
 +          
 +          Text ert = ByteBufferUtil.toText(endRow);
 +          
 +          while (ri.hasNext()) {
 +            Iterator<Entry<Key,Value>> row = ri.next();
 +            long tabletFlushID = -1;
 +            int logs = 0;
 +            boolean online = false;
 +            
 +            TServerInstance server = null;
 +            
 +            Entry<Key,Value> entry = null;
 +            while (row.hasNext()) {
 +              entry = row.next();
 +              Key key = entry.getKey();
 +              
 +              if (Constants.METADATA_FLUSH_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier())) {
 +                tabletFlushID = Long.parseLong(entry.getValue().toString());
 +              }
 +              
 +              if (Constants.METADATA_LOG_COLUMN_FAMILY.equals(key.getColumnFamily()))
 +                logs++;
 +              
 +              if (Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY.equals(key.getColumnFamily())) {
 +                online = true;
 +                server = new TServerInstance(entry.getValue(), key.getColumnQualifier());
 +              }
 +              
 +            }
 +            
 +            // when tablet is not online and has no logs, there is no reason to wait for it
 +            if ((online || logs > 0) && tabletFlushID < flushID) {
 +              tabletsToWaitFor++;
 +              if (server != null)
 +                serversToFlush.add(server);
 +            }
 +            
 +            tabletCount++;
 +            
 +            Text tabletEndRow = new KeyExtent(entry.getKey().getRow(), (Text) null).getEndRow();
 +            if (tabletEndRow == null || (ert != null && tabletEndRow.compareTo(ert) >= 0))
 +              break;
 +          }
 +          
 +          if (tabletsToWaitFor == 0)
 +            break;
 +          
 +          // TODO detect case of table offline AND tablets w/ logs? - ACCUMULO-1296
 +          
 +          if (tabletCount == 0 && !Tables.exists(instance, tableId))
 +            throw new ThriftTableOperationException(tableId, null, TableOperation.FLUSH, TableOperationExceptionType.NOTFOUND, null);
 +          
 +        } catch (AccumuloException e) {
 +          log.debug("Failed to scan !METADATA table to wait for flush " + tableId, e);
 +        } catch (TabletDeletedException tde) {
 +          log.debug("Failed to scan !METADATA table to wait for flush " + tableId, tde);
 +        } catch (AccumuloSecurityException e) {
 +          log.warn(e.getMessage(), e);
 +          throw new ThriftSecurityException();
 +        } catch (TableNotFoundException e) {
 +          log.error(e.getMessage(), e);
 +          throw new ThriftTableOperationException();
 +        }
 +      }
 +      
 +    }
 +    
 +    @Override
 +    public MasterMonitorInfo getMasterStats(TInfo info, TCredentials credentials) throws ThriftSecurityException, TException {
 +      final MasterMonitorInfo result = new MasterMonitorInfo();
 +      
 +      result.tServerInfo = new ArrayList<TabletServerStatus>();
 +      result.tableMap = new DefaultMap<String,TableInfo>(new TableInfo());
 +      for (Entry<TServerInstance,TabletServerStatus> serverEntry : tserverStatus.entrySet()) {
 +        final TabletServerStatus status = serverEntry.getValue();
 +        result.tServerInfo.add(status);
 +        for (Entry<String,TableInfo> entry : status.tableMap.entrySet()) {
 +          String table = entry.getKey();
 +          TableInfo summary = result.tableMap.get(table);
 +          Monitor.add(summary, entry.getValue());
 +        }
 +      }
 +      result.badTServers = new HashMap<String,Byte>();
 +      synchronized (badServers) {
 +        for (TServerInstance bad : badServers.keySet()) {
 +          result.badTServers.put(bad.hostPort(), TabletServerState.UNRESPONSIVE.getId());
 +        }
 +      }
 +      result.state = getMasterState();
 +      result.goalState = getMasterGoalState();
 +      result.unassignedTablets = Master.this.displayUnassigned();
 +      result.serversShuttingDown = new HashSet<String>();
 +      synchronized (serversToShutdown) {
 +        for (TServerInstance server : serversToShutdown)
 +          result.serversShuttingDown.add(server.hostPort());
 +      }
 +      DeadServerList obit = new DeadServerList(ZooUtil.getRoot(instance) + Constants.ZDEADTSERVERS);
 +      result.deadTabletServers = obit.getList();
 +      return result;
 +    }
 +    
 +    private void alterTableProperty(TCredentials c, String tableName, String property, String value, TableOperation op) throws ThriftSecurityException,
 +        ThriftTableOperationException {
 +      final String tableId = checkTableId(tableName, op);
 +      if (!security.canAlterTable(c, tableId))
 +        throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +      
 +      try {
 +        if (value == null || value.isEmpty()) {
 +          TablePropUtil.removeTableProperty(tableId, property);
 +        } else if (!TablePropUtil.setTableProperty(tableId, property, value)) {
 +          throw new Exception("Invalid table property.");
 +        }
 +      } catch (KeeperException.NoNodeException e) {
 +        // race condition... table no longer exists?  This call will throw an exception if the table was deleted:
 +        checkTableId(tableName, op);
 +        log.info("Error altering table property", e);
 +        throw new ThriftTableOperationException(tableId, tableName, op, TableOperationExceptionType.OTHER, "Problem altering table property");
 +      } catch (Exception e) {
 +        log.error("Problem altering table property", e);
 +        throw new ThriftTableOperationException(tableId, tableName, op, TableOperationExceptionType.OTHER, "Problem altering table property");
 +      }
 +    }
 +    
 +    @Override
 +    public void removeTableProperty(TInfo info, TCredentials credentials, String tableName, String property) throws ThriftSecurityException,
 +        ThriftTableOperationException, TException {
 +      alterTableProperty(credentials, tableName, property, null, TableOperation.REMOVE_PROPERTY);
 +    }
 +    
 +    @Override
 +    public void setTableProperty(TInfo info, TCredentials credentials, String tableName, String property, String value) throws ThriftSecurityException,
 +        ThriftTableOperationException, TException {
 +      alterTableProperty(credentials, tableName, property, value, TableOperation.SET_PROPERTY);
 +    }
 +    
 +    @Override
 +    public void shutdown(TInfo info, TCredentials c, boolean stopTabletServers) throws ThriftSecurityException, TException {
 +      security.canPerformSystemActions(c);
 +      Master.this.shutdown(stopTabletServers);
 +    }
 +    
 +    @Override
 +    public void shutdownTabletServer(TInfo info, TCredentials c, String tabletServer, boolean force) throws ThriftSecurityException, TException {
 +      security.canPerformSystemActions(c);
 +      
 +      final InetSocketAddress addr = AddressUtil.parseAddress(tabletServer, Property.TSERV_CLIENTPORT);
 +      final String addrString = org.apache.accumulo.core.util.AddressUtil.toString(addr);
 +      final TServerInstance doomed = tserverSet.find(addrString);
 +      if (!force) {
 +        final TServerConnection server = tserverSet.getConnection(doomed);
 +        if (server == null) {
 +          log.warn("No server found for name " + tabletServer);
 +          return;
 +        }
 +      }
 +      
 +      long tid = fate.startTransaction();
 +      fate.seedTransaction(tid, new TraceRepo<Master>(new ShutdownTServer(doomed, force)), false);
 +      fate.waitForCompletion(tid);
 +      fate.delete(tid);
 +    }
 +    
 +    @Override
 +    public void reportSplitExtent(TInfo info, TCredentials credentials, String serverName, TabletSplit split) throws TException {
 +      KeyExtent oldTablet = new KeyExtent(split.oldTablet);
 +      if (migrations.remove(oldTablet) != null) {
 +        log.info("Canceled migration of " + split.oldTablet);
 +      }
 +      for (TServerInstance instance : tserverSet.getCurrentServers()) {
 +        if (serverName.equals(instance.hostPort())) {
 +          nextEvent.event("%s reported split %s, %s", serverName, new KeyExtent(split.newTablets.get(0)), new KeyExtent(split.newTablets.get(1)));
 +          return;
 +        }
 +      }
 +      log.warn("Got a split from a server we don't recognize: " + serverName);
 +    }
 +    
 +    @Override
 +    public void reportTabletStatus(TInfo info, TCredentials credentials, String serverName, TabletLoadState status, TKeyExtent ttablet) throws TException {
 +      KeyExtent tablet = new KeyExtent(ttablet);
 +      
 +      switch (status) {
 +        case LOAD_FAILURE:
 +          log.error(serverName + " reports assignment failed for tablet " + tablet);
 +          break;
 +        case LOADED:
 +          nextEvent.event("tablet %s was loaded on %s", tablet, serverName);
 +          break;
 +        case UNLOADED:
 +          nextEvent.event("tablet %s was unloaded from %s", tablet, serverName);
 +          break;
 +        case UNLOAD_ERROR:
 +          log.error(serverName + " reports unload failed for tablet " + tablet);
 +          break;
 +        case UNLOAD_FAILURE_NOT_SERVING:
 +          if (log.isTraceEnabled()) {
 +            log.trace(serverName + " reports unload failed: not serving tablet, could be a split: " + tablet);
 +          }
 +          break;
 +        case CHOPPED:
 +          nextEvent.event("tablet %s chopped", tablet);
 +          break;
 +      }
 +    }
 +    
 +    @Override
 +    public void setMasterGoalState(TInfo info, TCredentials c, MasterGoalState state) throws ThriftSecurityException, TException {
 +      security.canPerformSystemActions(c);
 +      
 +      Master.this.setMasterGoalState(state);
 +    }
 +    
 +    private void updatePlugins(String property) {
 +      if (property.equals(Property.MASTER_TABLET_BALANCER.getKey())) {
 +        TabletBalancer balancer = createInstanceFromPropertyName(instance.getConfiguration(), Property.MASTER_TABLET_BALANCER, TabletBalancer.class,
 +            new DefaultLoadBalancer());
 +        balancer.init(serverConfig);
 +        tabletBalancer = balancer;
 +        log.info("tablet balancer changed to " + tabletBalancer.getClass().getName());
 +      }
 +    }
 +    
 +    @Override
 +    public void removeSystemProperty(TInfo info, TCredentials c, String property) throws ThriftSecurityException, TException {
 +      security.canPerformSystemActions(c);
 +      
 +      try {
 +        SystemPropUtil.removeSystemProperty(property);
 +        updatePlugins(property);
 +      } catch (Exception e) {
 +        log.error("Problem removing config property in zookeeper", e);
 +        throw new TException(e.getMessage());
 +      }
 +    }
 +    
 +    @Override
 +    public void setSystemProperty(TInfo info, TCredentials c, String property, String value) throws ThriftSecurityException, TException {
 +      security.canPerformSystemActions(c);
 +      
 +      try {
 +        SystemPropUtil.setSystemProperty(property, value);
 +        updatePlugins(property);
 +      } catch (Exception e) {
 +        log.error("Problem setting config property in zookeeper", e);
 +        throw new TException(e.getMessage());
 +      }
 +    }
 +    
 +    private void authenticate(TCredentials c) throws ThriftSecurityException {
 +      if (!security.authenticateUser(c, c))
 +        throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.BAD_CREDENTIALS);
 +      
 +    }
 +    
 +    @Override
 +    public long beginTableOperation(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
 +      authenticate(credentials);
 +      return fate.startTransaction();
 +    }
 +    
 +    @Override
 +    public void executeTableOperation(TInfo tinfo, TCredentials c, long opid, org.apache.accumulo.core.master.thrift.TableOperation op,
 +        List<ByteBuffer> arguments, Map<String,String> options, boolean autoCleanup) throws ThriftSecurityException, ThriftTableOperationException, TException {
 +      authenticate(c);
 +      
 +      switch (op) {
 +        case CREATE: {
 +          String tableName = ByteBufferUtil.toString(arguments.get(0));
 +          if (!security.canCreateTable(c))
 +            throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +          checkNotMetadataTable(tableName, TableOperation.CREATE);
 +          checkTableName(tableName, TableOperation.CREATE);
 +          
 +          org.apache.accumulo.core.client.admin.TimeType timeType = org.apache.accumulo.core.client.admin.TimeType.valueOf(ByteBufferUtil.toString(arguments
 +              .get(1)));
 +          fate.seedTransaction(opid, new TraceRepo<Master>(new CreateTable(c.getPrincipal(), tableName, timeType, options)), autoCleanup);
 +          
 +          break;
 +        }
 +        case RENAME: {
 +          String oldTableName = ByteBufferUtil.toString(arguments.get(0));
 +          String newTableName = ByteBufferUtil.toString(arguments.get(1));
 +          
 +          String tableId = checkTableId(oldTableName, TableOperation.RENAME);
 +          checkNotMetadataTable(oldTableName, TableOperation.RENAME);
 +          checkNotMetadataTable(newTableName, TableOperation.RENAME);
 +          checkTableName(newTableName, TableOperation.RENAME);
 +          if (!security.canRenameTable(c, tableId))
 +            throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +          
 +          fate.seedTransaction(opid, new TraceRepo<Master>(new RenameTable(tableId, oldTableName, newTableName)), autoCleanup);
 +          
 +          break;
 +        }
 +        case CLONE: {
 +          String srcTableId = ByteBufferUtil.toString(arguments.get(0));
 +          String tableName = ByteBufferUtil.toString(arguments.get(1));
 +          
 +          checkNotMetadataTable(tableName, TableOperation.CLONE);
 +          checkTableName(tableName, TableOperation.CLONE);
 +          if (!security.canCloneTable(c, srcTableId))
 +            throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +          
 +          Map<String,String> propertiesToSet = new HashMap<String,String>();
 +          Set<String> propertiesToExclude = new HashSet<String>();
 +          
 +          for (Entry<String,String> entry : options.entrySet()) {
 +            if (entry.getKey().startsWith(TableOperationsImpl.CLONE_EXCLUDE_PREFIX)) {
 +              propertiesToExclude.add(entry.getKey().substring(TableOperationsImpl.CLONE_EXCLUDE_PREFIX.length()));
 +              continue;
 +            }
 +            
 +            if (!TablePropUtil.isPropertyValid(entry.getKey(), entry.getValue())) {
 +              throw new ThriftTableOperationException(null, tableName, TableOperation.CLONE, TableOperationExceptionType.OTHER, "Property or value not valid "
 +                  + entry.getKey() + "=" + entry.getValue());
 +            }
 +            
 +            propertiesToSet.put(entry.getKey(), entry.getValue());
 +          }
 +          
 +          fate.seedTransaction(opid, new TraceRepo<Master>(new CloneTable(c.getPrincipal(), srcTableId, tableName, propertiesToSet, propertiesToExclude)),
 +              autoCleanup);
 +          
 +          break;
 +        }
 +        case DELETE: {
 +          String tableName = ByteBufferUtil.toString(arguments.get(0));
 +          final String tableId = checkTableId(tableName, TableOperation.DELETE);
 +          checkNotMetadataTable(tableName, TableOperation.DELETE);
 +          if (!security.canDeleteTable(c, tableId))
 +            throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +          
 +          fate.seedTransaction(opid, new TraceRepo<Master>(new DeleteTable(tableId)), autoCleanup);
 +          break;
 +        }
 +        case ONLINE: {
 +          String tableName = ByteBufferUtil.toString(arguments.get(0));
 +          final String tableId = checkTableId(tableName, TableOperation.ONLINE);
 +          checkNotMetadataTable(tableName, TableOperation.ONLINE);
 +          
 +          if (!security.canOnlineOfflineTable(c, tableId))
 +            throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +          
 +          fate.seedTransaction(opid, new TraceRepo<Master>(new ChangeTableState(tableId, TableOperation.ONLINE)), autoCleanup);
 +          break;
 +        }
 +        case OFFLINE: {
 +          String tableName = ByteBufferUtil.toString(arguments.get(0));
 +          final String tableId = checkTableId(tableName, TableOperation.OFFLINE);
 +          checkNotMetadataTable(tableName, TableOperation.OFFLINE);
 +          
 +          if (!security.canOnlineOfflineTable(c, tableId))
 +            throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +          
 +          fate.seedTransaction(opid, new TraceRepo<Master>(new ChangeTableState(tableId, TableOperation.OFFLINE)), autoCleanup);
 +          break;
 +        }
 +        case MERGE: {
 +          String tableName = ByteBufferUtil.toString(arguments.get(0));
 +          Text startRow = ByteBufferUtil.toText(arguments.get(1));
 +          Text endRow = ByteBufferUtil.toText(arguments.get(2));
 +          final String tableId = checkTableId(tableName, TableOperation.MERGE);
 +          if (tableName.equals(Constants.METADATA_TABLE_NAME)) {
 +            if (startRow.compareTo(new Text("0")) < 0) {
 +              startRow = new Text("0");
 +              if (endRow.getLength() != 0 && endRow.compareTo(startRow) < 0)
 +                throw new ThriftTableOperationException(null, tableName, TableOperation.MERGE, TableOperationExceptionType.OTHER,
 +                    "end-row specification is in the root tablet, which cannot be merged or split");
 +            }
 +          }
 +          log.debug("Creating merge op: " + tableId + " " + startRow + " " + endRow);
 +          
 +          if (!security.canMerge(c, tableId))
 +            throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +          
 +          fate.seedTransaction(opid, new TraceRepo<Master>(new TableRangeOp(MergeInfo.Operation.MERGE, tableId, startRow, endRow)), autoCleanup);
 +          break;
 +        }
 +        case DELETE_RANGE: {
 +          String tableName = ByteBufferUtil.toString(arguments.get(0));
 +          Text startRow = ByteBufferUtil.toText(arguments.get(1));
 +          Text endRow = ByteBufferUtil.toText(arguments.get(2));
 +          
 +          final String tableId = checkTableId(tableName, TableOperation.DELETE_RANGE);
 +          checkNotMetadataTable(tableName, TableOperation.DELETE_RANGE);
 +          
 +          if (!security.canDeleteRange(c, tableId))
 +            throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +          
 +          fate.seedTransaction(opid, new TraceRepo<Master>(new TableRangeOp(MergeInfo.Operation.DELETE, tableId, startRow, endRow)), autoCleanup);
 +          break;
 +        }
 +        case BULK_IMPORT: {
 +          String tableName = ByteBufferUtil.toString(arguments.get(0));
 +          String dir = ByteBufferUtil.toString(arguments.get(1));
 +          String failDir = ByteBufferUtil.toString(arguments.get(2));
 +          boolean setTime = Boolean.parseBoolean(ByteBufferUtil.toString(arguments.get(3)));
 +          
 +          final String tableId = checkTableId(tableName, TableOperation.BULK_IMPORT);
 +          checkNotMetadataTable(tableName, TableOperation.BULK_IMPORT);
 +          
 +          if (!security.canBulkImport(c, tableId))
 +            throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +          
 +          fate.seedTransaction(opid, new TraceRepo<Master>(new BulkImport(tableId, dir, failDir, setTime)), autoCleanup);
 +          break;
 +        }
 +        case COMPACT: {
 +          String tableId = ByteBufferUtil.toString(arguments.get(0));
 +          byte[] startRow = ByteBufferUtil.toBytes(arguments.get(1));
 +          byte[] endRow = ByteBufferUtil.toBytes(arguments.get(2));
 +          List<IteratorSetting> iterators = IteratorUtil.decodeIteratorSettings(ByteBufferUtil.toBytes(arguments.get(3)));
 +          
 +          if (!security.canCompact(c, tableId))
 +            throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +          
 +          fate.seedTransaction(opid, new TraceRepo<Master>(new CompactRange(tableId, startRow, endRow, iterators)), autoCleanup);
 +          break;
 +        }
 +        case COMPACT_CANCEL: {
 +          String tableId = ByteBufferUtil.toString(arguments.get(0));
 +          
 +          if (!security.canCompact(c, tableId))
 +            throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +          
 +          fate.seedTransaction(opid, new TraceRepo<Master>(new CancelCompactions(tableId)), autoCleanup);
 +          break;
 +        }
 +        case IMPORT: {
 +          String tableName = ByteBufferUtil.toString(arguments.get(0));
 +          String exportDir = ByteBufferUtil.toString(arguments.get(1));
 +          
 +          if (!security.canImport(c))
 +            throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +          
 +          checkNotMetadataTable(tableName, TableOperation.CREATE);
 +          checkTableName(tableName, TableOperation.CREATE);
 +          
 +          fate.seedTransaction(opid, new TraceRepo<Master>(new ImportTable(c.getPrincipal(), tableName, exportDir)), autoCleanup);
 +          break;
 +        }
 +        case EXPORT: {
 +          String tableName = ByteBufferUtil.toString(arguments.get(0));
 +          String exportDir = ByteBufferUtil.toString(arguments.get(1));
 +          
 +          String tableId = checkTableId(tableName, TableOperation.EXPORT);
 +          
 +          if (!security.canExport(c, tableId))
 +            throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +          
 +          checkNotMetadataTable(tableName, TableOperation.EXPORT);
 +          
 +          fate.seedTransaction(opid, new TraceRepo<Master>(new ExportTable(tableName, tableId, exportDir)), autoCleanup);
 +          break;
 +        }
 +        
 +        default:
 +          throw new UnsupportedOperationException();
 +      }
 +      
 +    }
 +    
 +    @Override
 +    public String waitForTableOperation(TInfo tinfo, TCredentials credentials, long opid) throws ThriftSecurityException, ThriftTableOperationException,
 +        TException {
 +      authenticate(credentials);
 +      
 +      TStatus status = fate.waitForCompletion(opid);
 +      if (status == TStatus.FAILED) {
 +        Exception e = fate.getException(opid);
 +        if (e instanceof ThriftTableOperationException)
 +          throw (ThriftTableOperationException) e;
 +        if (e instanceof ThriftSecurityException)
 +          throw (ThriftSecurityException) e;
 +        else if (e instanceof RuntimeException)
 +          throw (RuntimeException) e;
 +        else
 +          throw new RuntimeException(e);
 +      }
 +      
 +      String ret = fate.getReturn(opid);
 +      if (ret == null)
 +        ret = ""; // thrift does not like returning null
 +      return ret;
 +    }
 +    
 +    @Override
 +    public void finishTableOperation(TInfo tinfo, TCredentials credentials, long opid) throws ThriftSecurityException, TException {
 +      authenticate(credentials);
 +      fate.delete(opid);
 +    }
 +  }
 +  
 +  public MergeInfo getMergeInfo(KeyExtent tablet) {
 +    if (tablet.isRootTablet())
 +      return new MergeInfo();
 +    return getMergeInfo(tablet.getTableId());
 +  }
 +  
 +  public MergeInfo getMergeInfo(Text tableId) {
 +    synchronized (mergeLock) {
 +      try {
 +        String path = ZooUtil.getRoot(instance.getInstanceID()) + Constants.ZTABLES + "/" + tableId.toString() + "/merge";
 +        if (!ZooReaderWriter.getInstance().exists(path))
 +          return new MergeInfo();
 +        byte[] data = ZooReaderWriter.getInstance().getData(path, new Stat());
 +        DataInputBuffer in = new DataInputBuffer();
 +        in.reset(data, data.length);
 +        MergeInfo info = new MergeInfo();
 +        info.readFields(in);
 +        return info;
 +      } catch (KeeperException.NoNodeException ex) {
 +        log.info("Error reading merge state, it probably just finished");
 +        return new MergeInfo();
 +      } catch (Exception ex) {
 +        log.warn("Unexpected error reading merge state", ex);
 +        return new MergeInfo();
 +      }
 +    }
 +  }
 +  
 +  public void setMergeState(MergeInfo info, MergeState state) throws IOException, KeeperException, InterruptedException {
 +    synchronized (mergeLock) {
 +      String path = ZooUtil.getRoot(instance.getInstanceID()) + Constants.ZTABLES + "/" + info.getRange().getTableId().toString() + "/merge";
 +      info.setState(state);
 +      if (state.equals(MergeState.NONE)) {
 +        ZooReaderWriter.getInstance().recursiveDelete(path, NodeMissingPolicy.SKIP);
 +      } else {
 +        DataOutputBuffer out = new DataOutputBuffer();
 +        try {
 +          info.write(out);
 +        } catch (IOException ex) {
 +          throw new RuntimeException("Unlikely", ex);
 +        }
 +        ZooReaderWriter.getInstance().putPersistentData(path, out.getData(),
 +            state.equals(MergeState.STARTED) ? ZooUtil.NodeExistsPolicy.FAIL : ZooUtil.NodeExistsPolicy.OVERWRITE);
 +      }
 +      mergeLock.notifyAll();
 +    }
 +    nextEvent.event("Merge state of %s set to %s", info.getRange(), state);
 +  }
 +  
 +  public void clearMergeState(Text tableId) throws IOException, KeeperException, InterruptedException {
 +    synchronized (mergeLock) {
 +      String path = ZooUtil.getRoot(instance.getInstanceID()) + Constants.ZTABLES + "/" + tableId.toString() + "/merge";
 +      ZooReaderWriter.getInstance().recursiveDelete(path, NodeMissingPolicy.SKIP);
 +      mergeLock.notifyAll();
 +    }
 +    nextEvent.event("Merge state of %s cleared", tableId);
 +  }
 +  
 +  private void setMasterGoalState(MasterGoalState state) {
 +    try {
 +      ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMASTER_GOAL_STATE, state.name().getBytes(),
 +          NodeExistsPolicy.OVERWRITE);
 +    } catch (Exception ex) {
 +      log.error("Unable to set master goal state in zookeeper");
 +    }
 +  }
 +  
 +  MasterGoalState getMasterGoalState() {
 +    while (true)
 +      try {
 +        byte[] data = ZooReaderWriter.getInstance().getData(ZooUtil.getRoot(instance) + Constants.ZMASTER_GOAL_STATE, null);
 +        return MasterGoalState.valueOf(new String(data));
 +      } catch (Exception e) {
 +        log.error("Problem getting real goal state: " + e);
 +        UtilWaitThread.sleep(1000);
 +      }
 +  }
 +  
 +  private void shutdown(boolean stopTabletServers) {
 +    if (stopTabletServers) {
 +      setMasterGoalState(MasterGoalState.CLEAN_STOP);
 +      EventCoordinator.Listener eventListener = nextEvent.getListener();
 +      do {
 +        waitAround(eventListener);
 +      } while (tserverSet.size() > 0);
 +    }
 +    setMasterState(MasterState.STOP);
 +  }
 +  
 +  public boolean hasCycled(long time) {
 +    for (TabletGroupWatcher watcher : watchers) {
 +      if (watcher.stats.lastScanFinished() < time)
 +        return false;
 +    }
 +    
 +    return true;
 +  }
 +  
 +  public void clearMigrations(String tableId) {
 +    synchronized (migrations) {
 +      Iterator<KeyExtent> iterator = migrations.keySet().iterator();
 +      while (iterator.hasNext()) {
 +        KeyExtent extent = iterator.next();
 +        if (extent.getTableId().toString().equals(tableId)) {
 +          iterator.remove();
 +        }
 +      }
 +    }
 +  }
 +  
 +  static enum TabletGoalState {
 +    HOSTED, UNASSIGNED, DELETED
 +  };
 +  
 +  TabletGoalState getSystemGoalState(TabletLocationState tls) {
 +    switch (getMasterState()) {
 +      case NORMAL:
 +        return TabletGoalState.HOSTED;
 +      case HAVE_LOCK: // fall-through intended
 +      case INITIAL: // fall-through intended
 +      case SAFE_MODE:
 +        if (tls.extent.isMeta())
 +          return TabletGoalState.HOSTED;
 +        return TabletGoalState.UNASSIGNED;
 +      case UNLOAD_METADATA_TABLETS:
 +        if (tls.extent.isRootTablet())
 +          return TabletGoalState.HOSTED;
 +        return TabletGoalState.UNASSIGNED;
 +      case UNLOAD_ROOT_TABLET:
 +        return TabletGoalState.UNASSIGNED;
 +      case STOP:
 +        return TabletGoalState.UNASSIGNED;
 +    }
 +    // unreachable
 +    return TabletGoalState.HOSTED;
 +  }
 +  
 +  TabletGoalState getTableGoalState(KeyExtent extent) {
 +    TableState tableState = TableManager.getInstance().getTableState(extent.getTableId().toString());
 +    if (tableState == null)
 +      return TabletGoalState.DELETED;
 +    switch (tableState) {
 +      case DELETING:
 +        return TabletGoalState.DELETED;
 +      case OFFLINE:
 +      case NEW:
 +        return TabletGoalState.UNASSIGNED;
 +      default:
 +        return TabletGoalState.HOSTED;
 +    }
 +  }
 +  
 +  TabletGoalState getGoalState(TabletLocationState tls, MergeInfo mergeInfo) {
 +    KeyExtent extent = tls.extent;
 +    // Shutting down?
 +    TabletGoalState state = getSystemGoalState(tls);
 +    if (state == TabletGoalState.HOSTED) {
 +      if (tls.current != null && serversToShutdown.contains(tls.current)) {
 +        return TabletGoalState.UNASSIGNED;
 +      }
 +      // Handle merge transitions
 +      if (mergeInfo.getRange() != null) {
 +        log.debug("mergeInfo overlaps: " + extent + " " + mergeInfo.overlaps(extent));
 +        if (mergeInfo.overlaps(extent)) {
 +          switch (mergeInfo.getState()) {
 +            case NONE:
 +            case COMPLETE:
 +              break;
 +            case STARTED:
 +            case SPLITTING:
 +              return TabletGoalState.HOSTED;
 +            case WAITING_FOR_CHOPPED:
 +              if (tls.getState(onlineTabletServers()).equals(TabletState.HOSTED)) {
 +                if (tls.chopped)
 +                  return TabletGoalState.UNASSIGNED;
 +              } else {
 +                if (tls.chopped && tls.walogs.isEmpty())
 +                  return TabletGoalState.UNASSIGNED;
 +              }
 +              
 +              return TabletGoalState.HOSTED;
 +            case WAITING_FOR_OFFLINE:
 +            case MERGING:
 +              return TabletGoalState.UNASSIGNED;
 +          }
 +        }
 +      }
 +      
 +      // taking table offline?
 +      state = getTableGoalState(extent);
 +      if (state == TabletGoalState.HOSTED) {
 +        // Maybe this tablet needs to be migrated
 +        TServerInstance dest = migrations.get(extent);
 +        if (dest != null && tls.current != null && !dest.equals(tls.current)) {
 +          return TabletGoalState.UNASSIGNED;
 +        }
 +      }
 +    }
 +    return state;
 +  }
 +  
 +  private class TabletGroupWatcher extends Daemon {
 +    
 +    final TabletStateStore store;
 +    final TabletGroupWatcher dependentWatcher;
 +    
 +    final TableStats stats = new TableStats();
 +    
 +    TabletGroupWatcher(TabletStateStore store, TabletGroupWatcher dependentWatcher) {
 +      this.store = store;
 +      this.dependentWatcher = dependentWatcher;
 +    }
 +    
 +    Map<Text,TableCounts> getStats() {
 +      return stats.getLast();
 +    }
 +    
 +    TableCounts getStats(Text tableId) {
 +      return stats.getLast(tableId);
 +    }
 +    
 +    @Override
 +    public void run() {
 +      
 +      Thread.currentThread().setName("Watching " + store.name());
 +      int[] oldCounts = new int[TabletState.values().length];
 +      EventCoordinator.Listener eventListener = nextEvent.getListener();
 +      
 +      while (stillMaster()) {
 +        int totalUnloaded = 0;
 +        int unloaded = 0;
 +        try {
 +          Map<Text,MergeStats> mergeStatsCache = new HashMap<Text,MergeStats>();
 +          
 +          // Get the current status for the current list of tservers
 +          SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<TServerInstance,TabletServerStatus>();
 +          for (TServerInstance entry : tserverSet.getCurrentServers()) {
 +            currentTServers.put(entry, tserverStatus.get(entry));
 +          }
 +          
 +          if (currentTServers.size() == 0) {
 +            eventListener.waitForEvents(TIME_TO_WAIT_BETWEEN_SCANS);
 +            continue;
 +          }
 +          
 +          // Don't move tablets to servers that are shutting down
 +          SortedMap<TServerInstance,TabletServerStatus> destinations = new TreeMap<TServerInstance,TabletServerStatus>(currentTServers);
 +          destinations.keySet().removeAll(serversToShutdown);
 +          
 +          List<Assignment> assignments = new ArrayList<Assignment>();
 +          List<Assignment> assigned = new ArrayList<Assignment>();
 +          List<TabletLocationState> assignedToDeadServers = new ArrayList<TabletLocationState>();
 +          Map<KeyExtent,TServerInstance> unassigned = new HashMap<KeyExtent,TServerInstance>();
 +          
 +          int[] counts = new int[TabletState.values().length];
 +          stats.begin();
 +          // Walk through the tablets in our store, and work tablets
 +          // towards their goal
 +          for (TabletLocationState tls : store) {
 +            if (tls == null) {
 +              continue;
 +            }
 +            // ignore entries for tables that do not exist in zookeeper
 +            if (TableManager.getInstance().getTableState(tls.extent.getTableId().toString()) == null)
 +              continue;
 +            
 +            // Don't overwhelm the tablet servers with work
 +            if (unassigned.size() + unloaded > MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
 +              flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
 +              assignments.clear();
 +              assigned.clear();
 +              assignedToDeadServers.clear();
 +              unassigned.clear();
 +              unloaded = 0;
 +              eventListener.waitForEvents(TIME_TO_WAIT_BETWEEN_SCANS);
 +            }
 +            Text tableId = tls.extent.getTableId();
 +            MergeStats mergeStats = mergeStatsCache.get(tableId);
 +            if (mergeStats == null) {
 +              mergeStatsCache.put(tableId, mergeStats = new MergeStats(getMergeInfo(tls.extent)));
 +            }
 +            TabletGoalState goal = getGoalState(tls, mergeStats.getMergeInfo());
 +            TServerInstance server = tls.getServer();
 +            TabletState state = tls.getState(currentTServers.keySet());
 +            stats.update(tableId, state);
 +            mergeStats.update(tls.extent, state, tls.chopped, !tls.walogs.isEmpty());
 +            sendChopRequest(mergeStats.getMergeInfo(), state, tls);
 +            sendSplitRequest(mergeStats.getMergeInfo(), state, tls);
 +            
 +            // Always follow through with assignments
 +            if (state == TabletState.ASSIGNED) {
 +              goal = TabletGoalState.HOSTED;
 +            }
 +            
 +            // if we are shutting down all the tabletservers, we have to do it in order
 +            if (goal == TabletGoalState.UNASSIGNED && state == TabletState.HOSTED) {
 +              if (serversToShutdown.equals(currentTServers.keySet())) {
 +                if (dependentWatcher != null && dependentWatcher.assignedOrHosted() > 0) {
 +                  goal = TabletGoalState.HOSTED;
 +                }
 +              }
 +            }
 +            
 +            if (goal == TabletGoalState.HOSTED) {
 +              if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) {
 +                if (recoveryManager.recoverLogs(tls.extent, tls.walogs))
 +                  continue;
 +              }
 +              switch (state) {
 +                case HOSTED:
 +                  if (server.equals(migrations.get(tls.extent)))
 +                    migrations.remove(tls.extent);
 +                  break;
 +                case ASSIGNED_TO_DEAD_SERVER:
 +                  assignedToDeadServers.add(tls);
 +                  if (server.equals(migrations.get(tls.extent)))
 +                    migrations.remove(tls.extent);
 +                  // log.info("Current servers " + currentTServers.keySet());
 +                  break;
 +                case UNASSIGNED:
 +                  // maybe it's a finishing migration
 +                  TServerInstance dest = migrations.get(tls.extent);
 +                  if (dest != null) {
 +                    // if destination is still good, assign it
 +                    if (destinations.keySet().contains(dest)) {
 +                      assignments.add(new Assignment(tls.extent, dest));
 +                    } else {
 +                      // get rid of this migration
 +                      migrations.remove(tls.extent);
 +                      unassigned.put(tls.extent, server);
 +                    }
 +                  } else {
 +                    unassigned.put(tls.extent, server);
 +                  }
 +                  break;
 +                case ASSIGNED:
 +                  // Send another reminder
 +                  assigned.add(new Assignment(tls.extent, tls.future));
 +                  break;
 +              }
 +            } else {
 +              switch (state) {
 +                case UNASSIGNED:
 +                  break;
 +                case ASSIGNED_TO_DEAD_SERVER:
 +                  assignedToDeadServers.add(tls);
 +                  // log.info("Current servers " + currentTServers.keySet());
 +                  break;
 +                case HOSTED:
 +                  TServerConnection conn = tserverSet.getConnection(server);
 +                  if (conn != null) {
 +                    conn.unloadTablet(masterLock, tls.extent, goal != TabletGoalState.DELETED);
 +                    unloaded++;
 +                    totalUnloaded++;
 +                  } else {
 +                    log.warn("Could not connect to server " + server);
 +                  }
 +                  break;
 +                case ASSIGNED:
 +                  break;
 +              }
 +            }
 +            counts[state.ordinal()]++;
 +          }
 +          
 +          flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
 +          
 +          // provide stats after flushing changes to avoid race conditions w/ delete table
 +          stats.end();
 +          
 +          // Report changes
 +          for (TabletState state : TabletState.values()) {
 +            int i = state.ordinal();
 +            if (counts[i] > 0 && counts[i] != oldCounts[i]) {
 +              nextEvent.event("[%s]: %d tablets are %s", store.name(), counts[i], state.name());
 +            }
 +          }
 +          log.debug(String.format("[%s]: scan time %.2f seconds", store.name(), stats.getScanTime() / 1000.));
 +          oldCounts = counts;
 +          if (totalUnloaded > 0) {
 +            nextEvent.event("[%s]: %d tablets unloaded", store.name(), totalUnloaded);
 +          }
 +          
 +          updateMergeState(mergeStatsCache);
 +          
 +          log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
 +          eventListener.waitForEvents(TIME_TO_WAIT_BETWEEN_SCANS);
 +        } catch (Exception ex) {
 +          log.error("Error processing table state for store " + store.name(), ex);
 +          UtilWaitThread.sleep(WAIT_BETWEEN_ERRORS);
 +        }
 +      }
 +    }
 +    
 +    private int assignedOrHosted() {
 +      int result = 0;
 +      for (TableCounts counts : stats.getLast().values()) {
 +        result += counts.assigned() + counts.hosted();
 +      }
 +      return result;
 +    }
 +    
 +    private void sendSplitRequest(MergeInfo info, TabletState state, TabletLocationState tls) {
 +      // Already split?
 +      if (!info.getState().equals(MergeState.SPLITTING))
 +        return;
 +      // Merges don't split
 +      if (!info.isDelete())
 +        return;
 +      // Online and ready to split?
 +      if (!state.equals(TabletState.HOSTED))
 +        return;
 +      // Does this extent cover the end points of the delete?
 +      KeyExtent range = info.getRange();
 +      if (tls.extent.overlaps(range)) {
 +        for (Text splitPoint : new Text[] {range.getPrevEndRow(), range.getEndRow()}) {
 +          if (splitPoint == null)
 +            continue;
 +          if (!tls.extent.contains(splitPoint))
 +            continue;
 +          if (splitPoint.equals(tls.extent.getEndRow()))
 +            continue;
 +          if (splitPoint.equals(tls.extent.getPrevEndRow()))
 +            continue;
 +          try {
 +            TServerConnection conn;
 +            conn = tserverSet.getConnection(tls.current);
 +            if (conn != null) {
 +              log.info("Asking " + tls.current + " to split " + tls.extent + " at " + splitPoint);
 +              conn.splitTablet(masterLock, tls.extent, splitPoint);
 +            } else {
 +              log.warn("Not connected to server " + tls.current);
 +            }
 +          } catch (NotServingTabletException e) {
 +            log.debug("Error asking tablet server to split a tablet: " + e);
 +          } catch (Exception e) {
 +            log.warn("Error asking tablet server to split a tablet: " + e);
 +          }
 +        }
 +      }
 +    }
 +    
 +    private void sendChopRequest(MergeInfo info, TabletState state, TabletLocationState tls) {
 +      // Don't bother if we're in the wrong state
 +      if (!info.getState().equals(MergeState.WAITING_FOR_CHOPPED))
 +        return;
 +      // Tablet must be online
 +      if (!state.equals(TabletState.HOSTED))
 +        return;
 +      // Tablet isn't already chopped
 +      if (tls.chopped)
 +        return;
 +      // Tablet ranges intersect
 +      if (info.needsToBeChopped(tls.extent)) {
 +        TServerConnection conn;
 +        try {
 +          conn = tserverSet.getConnection(tls.current);
 +          if (conn != null) {
 +            log.info("Asking " + tls.current + " to chop " + tls.extent);
 +            conn.chop(masterLock, tls.extent);
 +          } else {
 +            log.warn("Could not connect to server " + tls.current);
 +          }
 +        } catch (TException e) {
 +          log.warn("Communications error asking tablet server to chop a tablet");
 +        }
 +      }
 +    }
 +    
 +    private void updateMergeState(Map<Text,MergeStats> mergeStatsCache) {
 +      for (MergeStats stats : mergeStatsCache.values()) {
 +        try {
 +          MergeState update = stats.nextMergeState(getConnector(), Master.this);
 +          // when next state is MERGING, its important to persist this before
 +          // starting the merge... the verification check that is done before
 +          // moving into the merging state could fail if merge starts but does
 +          // not finish
 +          if (update == MergeState.COMPLETE)
 +            update = MergeState.NONE;
 +          if (update != stats.getMergeInfo().getState()) {
 +            setMergeState(stats.getMergeInfo(), update);
 +          }
 +          
 +          if (update == MergeState.MERGING) {
 +            try {
 +              if (stats.getMergeInfo().isDelete()) {
 +                deleteTablets(stats.getMergeInfo());
 +              } else {
 +                mergeMetadataRecords(stats.getMergeInfo());
 +              }
 +              setMergeState(stats.getMergeInfo(), update = MergeState.COMPLETE);
 +            } catch (Exception ex) {
 +              log.error("Unable merge metadata table records", ex);
 +            }
 +          }
 +        } catch (Exception ex) {
 +          log.error("Unable to update merge state for merge " + stats.getMergeInfo().getRange(), ex);
 +        }
 +      }
 +    }
 +    
 +    private void deleteTablets(MergeInfo info) throws AccumuloException {
 +      KeyExtent range = info.getRange();
 +      log.debug("Deleting tablets for " + range);
 +      char timeType = '\0';
 +      KeyExtent followingTablet = null;
 +      if (range.getEndRow() != null) {
 +        Key nextExtent = new Key(range.getEndRow()).followingKey(PartialKey.ROW);
 +        followingTablet = getHighTablet(new KeyExtent(range.getTableId(), nextExtent.getRow(), range.getEndRow()));
 +        log.debug("Found following tablet " + followingTablet);
 +      }
 +      try {
 +        Connector conn = getConnector();
 +        Text start = range.getPrevEndRow();
 +        if (start == null) {
 +          start = new Text();
 +        }
 +        log.debug("Making file deletion entries for " + range);
 +        Range deleteRange = new Range(KeyExtent.getMetadataEntry(range.getTableId(), start), false, KeyExtent.getMetadataEntry(range.getTableId(),
 +            range.getEndRow()), true);
 +        Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
 +        scanner.setRange(deleteRange);
 +        Constants.METADATA_DIRECTORY_COLUMN.fetch(scanner);
 +        Constants.METADATA_TIME_COLUMN.fetch(scanner);
 +        scanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);
 +        scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
 +        Set<String> datafiles = new TreeSet<String>();
 +        for (Entry<Key,Value> entry : scanner) {
 +          Key key = entry.getKey();
 +          if (key.compareColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY) == 0) {
 +            datafiles.add(key.getColumnQualifier().toString());
 +            if (datafiles.size() > 1000) {
 +              MetadataTable.addDeleteEntries(range, datafiles, SecurityConstants.getSystemCredentials());
 +              datafiles.clear();
 +            }
 +          } else if (Constants.METADATA_TIME_COLUMN.hasColumns(key)) {
 +            timeType = entry.getValue().toString().charAt(0);
 +          } else if (key.compareColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY) == 0) {
 +            throw new IllegalStateException("Tablet " + key.getRow() + " is assigned during a merge!");
 +          } else if (Constants.METADATA_DIRECTORY_COLUMN.hasColumns(key)) {
 +            datafiles.add(entry.getValue().toString());
 +            if (datafiles.size() > 1000) {
 +              MetadataTable.addDeleteEntries(range, datafiles, SecurityConstants.getSystemCredentials());
 +              datafiles.clear();
 +            }
 +          }
 +        }
 +        MetadataTable.addDeleteEntries(range, datafiles, SecurityConstants.getSystemCredentials());
 +        BatchWriter bw = conn.createBatchWriter(Constants.METADATA_TABLE_NAME, new BatchWriterConfig());
 +        try {
 +          deleteTablets(deleteRange, bw, conn);
 +        } finally {
 +          bw.close();
 +        }
 +        
 +        if (followingTablet != null) {
 +          log.debug("Updating prevRow of " + followingTablet + " to " + range.getPrevEndRow());
 +          bw = conn.createBatchWriter(Constants.METADATA_TABLE_NAME, new BatchWriterConfig());
 +          try {
 +            Mutation m = new Mutation(followingTablet.getMetadataEntry());
 +            Constants.METADATA_PREV_ROW_COLUMN.put(m, KeyExtent.encodePrevEndRow(range.getPrevEndRow()));
 +            Constants.METADATA_CHOPPED_COLUMN.putDelete(m);
 +            bw.addMutation(m);
 +            bw.flush();
 +          } finally {
 +            bw.close();
 +          }
 +        } else {
 +          // Recreate the default tablet to hold the end of the table
 +          log.debug("Recreating the last tablet to point to " + range.getPrevEndRow());
 +          MetadataTable.addTablet(new KeyExtent(range.getTableId(), null, range.getPrevEndRow()), Constants.DEFAULT_TABLET_LOCATION,
 +              SecurityConstants.getSystemCredentials(), timeType, masterLock);
 +        }
 +      } catch (Exception ex) {
 +        throw new AccumuloException(ex);
 +      }
 +    }
 +    
 +    private void mergeMetadataRecords(MergeInfo info) throws AccumuloException {
 +      KeyExtent range = info.getRange();
 +      log.debug("Merging metadata for " + range);
 +      KeyExtent stop = getHighTablet(range);
 +      log.debug("Highest tablet is " + stop);
 +      Value firstPrevRowValue = null;
 +      Text stopRow = stop.getMetadataEntry();
 +      Text start = range.getPrevEndRow();
 +      if (start == null) {
 +        start = new Text();
 +      }
 +      Range scanRange = new Range(KeyExtent.getMetadataEntry(range.getTableId(), start), false, stopRow, false);
 +      if (range.isMeta())
 +        scanRange = scanRange.clip(Constants.METADATA_ROOT_TABLET_KEYSPACE);
 +      
 +      BatchWriter bw = null;
 +      try {
 +        long fileCount = 0;
 +        Connector conn = getConnector();
 +        // Make file entries in highest tablet
 +        bw = conn.createBatchWriter(Constants.METADATA_TABLE_NAME, new BatchWriterConfig());
 +        Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
 +        scanner.setRange(scanRange);
 +        Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner);
 +        Constants.METADATA_TIME_COLUMN.fetch(scanner);
 +        Constants.METADATA_DIRECTORY_COLUMN.fetch(scanner);
 +        scanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);
 +        Mutation m = new Mutation(stopRow);
 +        String maxLogicalTime = null;
 +        for (Entry<Key,Value> entry : scanner) {
 +          Key key = entry.getKey();
 +          Value value = entry.getValue();
 +          if (key.getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
 +            m.put(key.getColumnFamily(), key.getColumnQualifier(), value);
 +            fileCount++;
 +          } else if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key) && firstPrevRowValue == null) {
 +            log.debug("prevRow entry for lowest tablet is " + value);
 +            firstPrevRowValue = new Value(value);
 +          } else if (Constants.METADATA_TIME_COLUMN.hasColumns(key)) {
 +            maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, value.toString());
 +          } else if (Constants.METADATA_DIRECTORY_COLUMN.hasColumns(key)) {
 +            if (!range.isMeta())
 +              bw.addMutation(MetadataTable.createDeleteMutation(range.getTableId().toString(), entry.getValue().toString()));
 +          }
 +        }
 +        
 +        // read the logical time from the last tablet in the merge range, it is not included in
 +        // the loop above
 +        scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
 +        Range last = new Range(stopRow);
 +        if (range.isMeta())
 +          last = last.clip(Constants.METADATA_ROOT_TABLET_KEYSPACE);
 +        scanner.setRange(last);
 +        Constants.METADATA_TIME_COLUMN.fetch(scanner);
 +        for (Entry<Key,Value> entry : scanner) {
 +          if (Constants.METADATA_TIME_COLUMN.hasColumns(entry.getKey())) {
 +            maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, entry.getValue().toString());
 +          }
 +        }
 +        
 +        if (maxLogicalTime != null)
 +          Constants.METADATA_TIME_COLUMN.put(m, new Value(maxLogicalTime.getBytes()));
 +        
 +        if (!m.getUpdates().isEmpty()) {
 +          bw.addMutation(m);
 +        }
 +        
 +        bw.flush();
 +        
 +        log.debug("Moved " + fileCount + " files to " + stop);
 +        
 +        if (firstPrevRowValue == null) {
 +          log.debug("tablet already merged");
 +          return;
 +        }
 +        
 +        stop.setPrevEndRow(KeyExtent.decodePrevEndRow(firstPrevRowValue));
 +        Mutation updatePrevRow = stop.getPrevRowUpdateMutation();
 +        log.debug("Setting the prevRow for last tablet: " + stop);
 +        bw.addMutation(updatePrevRow);
 +        bw.flush();
 +        
 +        deleteTablets(scanRange, bw, conn);
 +        
 +        // Clean-up the last chopped marker
 +        m = new Mutation(stopRow);
 +        Constants.METADATA_CHOPPED_COLUMN.putDelete(m);
 +        bw.addMutation(m);
 +        bw.flush();
 +        
 +      } catch (Exception ex) {
 +        throw new AccumuloException(ex);
 +      } finally {
 +        if (bw != null)
 +          try {
 +            bw.close();
 +          } catch (Exception ex) {
 +            throw new AccumuloException(ex);
 +          }
 +      }
 +    }
 +    
 +    private void deleteTablets(Range scanRange, BatchWriter bw, Connector conn) throws TableNotFoundException, MutationsRejectedException {
 +      Scanner scanner;
 +      Mutation m;
 +      // Delete everything in the other tablets
 +      // group all deletes into tablet into one mutation, this makes tablets
 +      // either disappear entirely or not all.. this is important for the case
 +      // where the process terminates in the loop below...
 +      scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
 +      log.debug("Deleting range " + scanRange);
 +      scanner.setRange(scanRange);
 +      RowIterator rowIter = new RowIterator(scanner);
 +      while (rowIter.hasNext()) {
 +        Iterator<Entry<Key,Value>> row = rowIter.next();
 +        m = null;
 +        while (row.hasNext()) {
 +          Entry<Key,Value> entry = row.next();
 +          Key key = entry.getKey();
 +          
 +          if (m == null)
 +            m = new Mutation(key.getRow());
 +          
 +          m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
 +          log.debug("deleting entry " + key);
 +        }
 +        bw.addMutation(m);
 +      }
 +      
 +      bw.flush();
 +    }
 +    
 +    private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException {
 +      try {
 +        Connector conn = getConnector();
 +        Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
 +        Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner);
 +        KeyExtent start = new KeyExtent(range.getTableId(), range.getEndRow(), null);
 +        scanner.setRange(new Range(start.getMetadataEntry(), null));
 +        Iterator<Entry<Key,Value>> iterator = scanner.iterator();
 +        if (!iterator.hasNext()) {
 +          throw new AccumuloException("No last tablet for a merge " + range);
 +        }
 +        Entry<Key,Value> entry = iterator.next();
 +        KeyExtent highTablet = new KeyExtent(entry.getKey().getRow(), KeyExtent.decodePrevEndRow(entry.getValue()));
 +        if (highTablet.getTableId() != range.getTableId()) {
 +          throw new AccumuloException("No last tablet for merge " + range + " " + highTablet);
 +        }
 +        return highTablet;
 +      } catch (Exception ex) {
 +        throw new AccumuloException("Unexpected failure finding the last tablet for a merge " + range, ex);
 +      }
 +    }
 +    
 +    private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> currentTServers, List<Assignment> assignments, List<Assignment> assigned,
 +        List<TabletLocationState> assignedToDeadServers, Map<KeyExtent,TServerInstance> unassigned) throws DistributedStoreException, TException {
 +      if (!assignedToDeadServers.isEmpty()) {
 +        int maxServersToShow = min(assignedToDeadServers.size(), 100);
 +        log.debug(assignedToDeadServers.size() + " assigned to dead servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "...");
 +        store.unassign(assignedToDeadServers);
 +        nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size());
 +      }
 +      
 +      if (!currentTServers.isEmpty()) {
 +        Map<KeyExtent,TServerInstance> assignedOut = new HashMap<KeyExtent,TServerInstance>();
 +        tabletBalancer.getAssignments(Collections.unmodifiableSortedMap(currentTServers), Collections.unmodifiableMap(unassigned), assignedOut);
 +        for (Entry<KeyExtent,TServerInstance> assignment : assignedOut.entrySet()) {
 +          if (unassigned.containsKey(assignment.getKey())) {
 +            if (assignment.getValue() != null) {
 +              log.debug(store.name() + " assigning tablet " + assignment);
 +              assignments.add(new Assignment(assignment.getKey(), assignment.getValue()));
 +            }
 +          } else {
 +            log.warn(store.name() + " load balancer assigning tablet that was not nominated for assignment " + assignment.getKey());
 +          }
 +        }
 +        if (!unassigned.isEmpty() && assignedOut.isEmpty())
 +          log.warn("Load balancer failed to assign any tablets");
 +      }
 +      
 +      if (assignments.size() > 0) {
 +        log.info(String.format("Assigning %d tablets", assignments.size()));
 +        store.setFutureLocations(assignments);
 +      }
 +      assignments.addAll(assigned);
 +      for (Assignment a : assignments) {
 +        TServerConnection conn = tserverSet.getConnection(a.server);
 +        if (conn != null) {
 +          conn.assignTablet(masterLock, a.tablet);
 +        } else {
 +          log.warn("Could not connect to server " + a.server);
 +        }
 +      }
 +    }
 +    
 +  }
 +  
 +  private class MigrationCleanupThread extends Daemon {
 +    
 +    @Override
 +    public void run() {
 +      setName("Migration Cleanup Thread");
 +      while (stillMaster()) {
 +        if (!migrations.isEmpty()) {
 +          try {
 +            cleanupMutations();
 +          } catch (Exception ex) {
 +            log.error("Error cleaning up migrations", ex);
 +          }
 +        }
 +        UtilWaitThread.sleep(TIME_BETWEEN_MIGRATION_CLEANUPS);
 +      }
 +    }
 +    
 +    // If a migrating tablet splits, and the tablet dies before sending the
 +    // master a message, the migration will refer to a non-existing tablet,
 +    // so it can never complete. Periodically scan the metadata table and
 +    // remove any migrating tablets that no longer exist.
 +    private void cleanupMutations() throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +      Connector connector = getConnector();
 +      Scanner scanner = connector.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
 +      Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner);
 +      Set<KeyExtent> found = new HashSet<KeyExtent>();
 +      for (Entry<Key,Value> entry : scanner) {
 +        KeyExtent extent = new KeyExtent(entry.getKey().getRow(), entry.getValue());
 +        if (migrations.containsKey(extent)) {
 +          found.add(extent);
 +        }
 +      }
 +      migrations.keySet().retainAll(found);
 +    }
 +  }
 +  
 +  private class StatusThread extends Daemon {
 +    
 +    @Override
 +    public void run() {
 +      setName("Status Thread");
 +      EventCoordinator.Listener eventListener = nextEvent.getListener();
 +      while (stillMaster()) {
 +        int count = 0;
 +        long wait = DEFAULT_WAIT_FOR_WATCHER;
 +        try {
 +          switch (getMasterGoalState()) {
 +            case NORMAL:
 +              setMasterState(MasterState.NORMAL);
 +              break;
 +            case SAFE_MODE:
 +              if (getMasterState() == MasterState.NORMAL) {
 +                setMasterState(MasterState.SAFE_MODE);
 +              }
 +              if (getMasterState() == MasterState.HAVE_LOCK) {
 +                setMasterState(MasterState.SAFE_MODE);
 +              }
 +              break;
 +            case CLEAN_STOP:
 +              switch (getMasterState()) {
 +                case NORMAL:
 +                  setMasterState(MasterState.SAFE_MODE);
 +                  break;
 +                case SAFE_MODE:
 +                  count = nonMetaDataTabletsAssignedOrHosted();
 +                  log.debug(String.format("There are %d non-metadata tablets assigned or hosted", count));
 +                  if (count == 0)
 +                    setMasterState(MasterState.UNLOAD_METADATA_TABLETS);
 +                  break;
 +                case UNLOAD_METADATA_TABLETS:
 +                  count = assignedOrHosted(METADATA_TABLE_ID);
 +                  log.debug(String.format("There are %d metadata tablets assigned or hosted", count));
 +                  // Assumes last tablet hosted is the root tablet;
 +                  // it's possible
 +                  // that's not the case (root tablet is offline?)
 +                  if (count == 1)
 +                    setMasterState(MasterState.UNLOAD_ROOT_TABLET);
 +                  break;
 +                case UNLOAD_ROOT_TABLET:
 +                  count = assignedOrHosted(METADATA_TABLE_ID);
 +                  if (count > 0)
 +                    log.debug(String.format("The root tablet is still assigned or hosted"));
 +                  if (count == 0) {
 +                    Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
 +                    log.debug("stopping " + currentServers.size() + " tablet servers");
 +                    for (TServerInstance server : currentServers) {
 +                      try {
 +                        serversToShutdown.add(server);
 +                        tserverSet.getConnection(server).fastHalt(masterLock);
 +                      } catch (TException e) {
 +                        // its probably down, and we don't care
 +                      } finally {
 +                        tserverSet.remove(server);
 +                      }
 +                    }
 +                    if (currentServers.size() == 0)
 +                      setMasterState(MasterState.STOP);
 +                  }
 +                  break;
 +                default:
 +                  break;
 +              }
 +          }
 +          wa

<TRUNCATED>