You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/11/01 01:55:55 UTC

[16/54] [partial] ACCUMULO-658, ACCUMULO-656 Split server into separate modules

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/TabletGroupWatcher.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/TabletGroupWatcher.java b/server/src/main/java/org/apache/accumulo/server/master/TabletGroupWatcher.java
deleted file mode 100644
index 9492bd7..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/TabletGroupWatcher.java
+++ /dev/null
@@ -1,659 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master;
-
-import static java.lang.Math.min;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.RowIterator;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.PartialKey;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.master.thrift.TabletServerStatus;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
-import org.apache.accumulo.core.util.Daemon;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.fs.FileRef;
-import org.apache.accumulo.server.fs.VolumeManager.FileType;
-import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
-import org.apache.accumulo.server.master.Master.TabletGoalState;
-import org.apache.accumulo.server.master.state.Assignment;
-import org.apache.accumulo.server.master.state.DistributedStoreException;
-import org.apache.accumulo.server.master.state.MergeInfo;
-import org.apache.accumulo.server.master.state.MergeState;
-import org.apache.accumulo.server.master.state.MergeStats;
-import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.accumulo.server.master.state.TableCounts;
-import org.apache.accumulo.server.master.state.TableStats;
-import org.apache.accumulo.server.master.state.TabletLocationState;
-import org.apache.accumulo.server.master.state.TabletState;
-import org.apache.accumulo.server.master.state.TabletStateStore;
-import org.apache.accumulo.server.master.state.tables.TableManager;
-import org.apache.accumulo.server.security.SystemCredentials;
-import org.apache.accumulo.server.tabletserver.TabletTime;
-import org.apache.accumulo.server.util.MetadataTableUtil;
-import org.apache.hadoop.io.Text;
-import org.apache.thrift.TException;
-
-class TabletGroupWatcher extends Daemon {
-  
-  private final Master master;
-  final TabletStateStore store;
-  final TabletGroupWatcher dependentWatcher;
-  
-  final TableStats stats = new TableStats();
-  
-  /**
-   * Creates a watcher over the tablets recorded in the given state store.
-   * 
-   * @param master
-   *          the master on whose behalf tablets are assigned, unloaded and merged
-   * @param store
-   *          the tablet state store that {@link #run()} iterates over
-   * @param dependentWatcher
-   *          another watcher whose assigned/hosted tablet count is consulted before unloading tablets when every tablet server is being shut down; may be
-   *          null
-   */
-  TabletGroupWatcher(Master master, TabletStateStore store, TabletGroupWatcher dependentWatcher) {
-    this.master = master;
-    this.store = store;
-    this.dependentWatcher = dependentWatcher;
-  }
-  
-  /**
-   * Returns the per-table tablet counts reported by {@code stats.getLast()}, keyed by table id.
-   */
-  Map<Text,TableCounts> getStats() {
-    return stats.getLast();
-  }
-  
-  /**
-   * Returns the tablet counts reported by {@code stats.getLast(tableId)} for a single table.
-   * 
-   * @param tableId
-   *          id of the table to look up
-   */
-  TableCounts getStats(Text tableId) {
-    return stats.getLast(tableId);
-  }
-  
-  /**
-   * Main loop of the watcher. For as long as {@code master.stillMaster()} holds, each pass walks every tablet in the store, determines its goal state,
-   * queues assignments and dead-server unassignments, asks servers to unload tablets that should not be hosted, flushes the queued changes, reports
-   * changed per-state counts as events, advances any merge in progress and then waits for the next event. Any exception thrown during a pass is logged and
-   * the loop resumes after sleeping for {@code Master.WAIT_BETWEEN_ERRORS}.
-   */
-  @Override
-  public void run() {
-    
-    Thread.currentThread().setName("Watching " + store.name());
-    int[] oldCounts = new int[TabletState.values().length];
-    EventCoordinator.Listener eventListener = this.master.nextEvent.getListener();
-    
-    while (this.master.stillMaster()) {
-      // slow things down a little, otherwise we spam the logs when there are many wake-up events
-      UtilWaitThread.sleep(100);
-
-      // totalUnloaded counts unload requests for the whole pass; unloaded is reset every time a chunk of work is flushed
-      int totalUnloaded = 0;
-      int unloaded = 0;
-      try {
-        // one MergeStats per table seen in this pass, created lazily below
-        Map<Text,MergeStats> mergeStatsCache = new HashMap<Text,MergeStats>();
-        
-        // Get the current status for the current list of tservers
-        SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<TServerInstance,TabletServerStatus>();
-        for (TServerInstance entry : this.master.tserverSet.getCurrentServers()) {
-          currentTServers.put(entry, this.master.tserverStatus.get(entry));
-        }
-        
-        // with no tablet servers there is nothing to work towards; wait and try again
-        if (currentTServers.size() == 0) {
-          eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
-          continue;
-        }
-        
-        // Don't move tablets to servers that are shutting down
-        SortedMap<TServerInstance,TabletServerStatus> destinations = new TreeMap<TServerInstance,TabletServerStatus>(currentTServers);
-        destinations.keySet().removeAll(this.master.serversToShutdown);
-        
-        List<Assignment> assignments = new ArrayList<Assignment>();
-        List<Assignment> assigned = new ArrayList<Assignment>();
-        List<TabletLocationState> assignedToDeadServers = new ArrayList<TabletLocationState>();
-        Map<KeyExtent,TServerInstance> unassigned = new HashMap<KeyExtent,TServerInstance>();
-        
-        // per-state tablet counts for this pass, indexed by TabletState ordinal
-        int[] counts = new int[TabletState.values().length];
-        stats.begin();
-        // Walk through the tablets in our store, and work tablets
-        // towards their goal
-        for (TabletLocationState tls : store) {
-          if (tls == null) {
-            continue;
-          }
-          // ignore entries for tables that do not exist in zookeeper
-          if (TableManager.getInstance().getTableState(tls.extent.getTableId().toString()) == null)
-            continue;
-          
-          // Don't overwhelm the tablet servers with work
-          if (unassigned.size() + unloaded > Master.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
-            flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
-            assignments.clear();
-            assigned.clear();
-            assignedToDeadServers.clear();
-            unassigned.clear();
-            unloaded = 0;
-            eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
-          }
-          Text tableId = tls.extent.getTableId();
-          MergeStats mergeStats = mergeStatsCache.get(tableId);
-          if (mergeStats == null) {
-            mergeStatsCache.put(tableId, mergeStats = new MergeStats(this.master.getMergeInfo(tls.extent)));
-          }
-          TabletGoalState goal = this.master.getGoalState(tls, mergeStats.getMergeInfo());
-          TServerInstance server = tls.getServer();
-          TabletState state = tls.getState(currentTServers.keySet());
-          stats.update(tableId, state);
-          mergeStats.update(tls.extent, state, tls.chopped, !tls.walogs.isEmpty());
-          sendChopRequest(mergeStats.getMergeInfo(), state, tls);
-          sendSplitRequest(mergeStats.getMergeInfo(), state, tls);
-          
-          // Always follow through with assignments
-          if (state == TabletState.ASSIGNED) {
-            goal = TabletGoalState.HOSTED;
-          }
-          
-          // if we are shutting down all the tabletservers, we have to do it in order
-          if (goal == TabletGoalState.UNASSIGNED && state == TabletState.HOSTED) {
-            if (this.master.serversToShutdown.equals(currentTServers.keySet())) {
-              // keep this tablet hosted while the dependent watcher still has assigned or hosted tablets
-              if (dependentWatcher != null && dependentWatcher.assignedOrHosted() > 0) {
-                goal = TabletGoalState.HOSTED;
-              }
-            }
-          }
-          
-          if (goal == TabletGoalState.HOSTED) {
-            if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) {
-              // when recoverLogs returns true the tablet is skipped for this pass; note this also skips the counts[] update below
-              // NOTE(review): presumably true means log recovery is still in progress - confirm against the recovery manager
-              if (this.master.recoveryManager.recoverLogs(tls.extent, tls.walogs))
-                continue;
-            }
-            switch (state) {
-              case HOSTED:
-                // the tablet is hosted on the server it was migrating to, so the migration is finished
-                if (server.equals(this.master.migrations.get(tls.extent)))
-                  this.master.migrations.remove(tls.extent);
-                break;
-              case ASSIGNED_TO_DEAD_SERVER:
-                assignedToDeadServers.add(tls);
-                if (server.equals(this.master.migrations.get(tls.extent)))
-                  this.master.migrations.remove(tls.extent);
-                // log.info("Current servers " + currentTServers.keySet());
-                break;
-              case UNASSIGNED:
-                // maybe it's a finishing migration
-                TServerInstance dest = this.master.migrations.get(tls.extent);
-                if (dest != null) {
-                  // if destination is still good, assign it
-                  if (destinations.keySet().contains(dest)) {
-                    assignments.add(new Assignment(tls.extent, dest));
-                  } else {
-                    // get rid of this migration
-                    this.master.migrations.remove(tls.extent);
-                    unassigned.put(tls.extent, server);
-                  }
-                } else {
-                  unassigned.put(tls.extent, server);
-                }
-                break;
-              case ASSIGNED:
-                // Send another reminder
-                assigned.add(new Assignment(tls.extent, tls.future));
-                break;
-            }
-          } else {
-            switch (state) {
-              case UNASSIGNED:
-                break;
-              case ASSIGNED_TO_DEAD_SERVER:
-                assignedToDeadServers.add(tls);
-                // log.info("Current servers " + currentTServers.keySet());
-                break;
-              case HOSTED:
-                // the tablet should not be hosted: ask its server to unload it
-                TServerConnection conn = this.master.tserverSet.getConnection(server);
-                if (conn != null) {
-                  // the last argument is false only when the goal is DELETED
-                  conn.unloadTablet(this.master.masterLock, tls.extent, goal != TabletGoalState.DELETED);
-                  unloaded++;
-                  totalUnloaded++;
-                } else {
-                  Master.log.warn("Could not connect to server " + server);
-                }
-                break;
-              case ASSIGNED:
-                break;
-            }
-          }
-          counts[state.ordinal()]++;
-        }
-        
-        flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
-        
-        // provide stats after flushing changes to avoid race conditions w/ delete table
-        stats.end();
-        
-        // Report changes
-        for (TabletState state : TabletState.values()) {
-          int i = state.ordinal();
-          if (counts[i] > 0 && counts[i] != oldCounts[i]) {
-            this.master.nextEvent.event("[%s]: %d tablets are %s", store.name(), counts[i], state.name());
-          }
-        }
-        Master.log.debug(String.format("[%s]: scan time %.2f seconds", store.name(), stats.getScanTime() / 1000.));
-        // remember this pass's counts so the next pass only reports states whose count changed
-        oldCounts = counts;
-        if (totalUnloaded > 0) {
-          this.master.nextEvent.event("[%s]: %d tablets unloaded", store.name(), totalUnloaded);
-        }
-        
-        updateMergeState(mergeStatsCache);
-        
-        Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
-        eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
-      } catch (Exception ex) {
-        Master.log.error("Error processing table state for store " + store.name(), ex);
-        UtilWaitThread.sleep(Master.WAIT_BETWEEN_ERRORS);
-      }
-    }
-  }
-  
-  /**
-   * Adds up the assigned and hosted tablet counts of every table in the statistics returned by {@code stats.getLast()}.
-   * 
-   * @return the combined number of assigned and hosted tablets
-   */
-  private int assignedOrHosted() {
-    int total = 0;
-    Iterator<TableCounts> perTable = stats.getLast().values().iterator();
-    while (perTable.hasNext()) {
-      TableCounts tableCounts = perTable.next();
-      total += tableCounts.assigned() + tableCounts.hosted();
-    }
-    return total;
-  }
-  
-  /**
-   * While a delete-range operation is in its SPLITTING state, asks the server currently hosting the tablet to split it at each end point of the delete
-   * range that lies inside the tablet and is not already one of the tablet's own boundaries. Failures are logged and otherwise ignored.
-   * 
-   * @param info
-   *          the merge/delete operation for the tablet's table
-   * @param state
-   *          the tablet's current state
-   * @param tls
-   *          the tablet and its location information
-   */
-  private void sendSplitRequest(MergeInfo info, TabletState state, TabletLocationState tls) {
-    // Only a delete that is still splitting needs split requests; plain merges never split
-    if (!info.getState().equals(MergeState.SPLITTING) || !info.isDelete())
-      return;
-    // The tablet must be online before it can be asked to split
-    if (!state.equals(TabletState.HOSTED))
-      return;
-    // Nothing to do unless this tablet overlaps the range being deleted
-    KeyExtent deleteExtent = info.getExtent();
-    if (!tls.extent.overlaps(deleteExtent))
-      return;
-    Text[] boundaries = new Text[] {deleteExtent.getPrevEndRow(), deleteExtent.getEndRow()};
-    for (int i = 0; i < boundaries.length; i++) {
-      Text splitPoint = boundaries[i];
-      // skip open-ended boundaries, boundaries outside this tablet, and boundaries that already coincide with an edge of this tablet
-      boolean skip = splitPoint == null || !tls.extent.contains(splitPoint) || splitPoint.equals(tls.extent.getEndRow())
-          || splitPoint.equals(tls.extent.getPrevEndRow());
-      if (skip)
-        continue;
-      try {
-        TServerConnection conn = this.master.tserverSet.getConnection(tls.current);
-        if (conn == null) {
-          Master.log.warn("Not connected to server " + tls.current);
-        } else {
-          Master.log.info("Asking " + tls.current + " to split " + tls.extent + " at " + splitPoint);
-          conn.splitTablet(this.master.masterLock, tls.extent, splitPoint);
-        }
-      } catch (NotServingTabletException e) {
-        Master.log.debug("Error asking tablet server to split a tablet: " + e);
-      } catch (Exception e) {
-        Master.log.warn("Error asking tablet server to split a tablet: " + e);
-      }
-    }
-  }
-  
-  /**
-   * While a merge is in its WAITING_FOR_CHOPPED state, asks the server currently hosting the tablet to chop it, provided the tablet is hosted, is not
-   * already marked chopped and {@code info.needsToBeChopped} says the tablet is affected. Communication failures are logged, with their cause, and
-   * otherwise ignored.
-   * 
-   * @param info
-   *          the merge operation for the tablet's table
-   * @param state
-   *          the tablet's current state
-   * @param tls
-   *          the tablet and its location information
-   */
-  private void sendChopRequest(MergeInfo info, TabletState state, TabletLocationState tls) {
-    // Don't bother if we're in the wrong state
-    if (!info.getState().equals(MergeState.WAITING_FOR_CHOPPED))
-      return;
-    // Tablet must be online
-    if (!state.equals(TabletState.HOSTED))
-      return;
-    // Tablet isn't already chopped
-    if (tls.chopped)
-      return;
-    // Tablet ranges intersect
-    if (info.needsToBeChopped(tls.extent)) {
-      TServerConnection conn;
-      try {
-        conn = this.master.tserverSet.getConnection(tls.current);
-        if (conn != null) {
-          Master.log.info("Asking " + tls.current + " to chop " + tls.extent);
-          conn.chop(this.master.masterLock, tls.extent);
-        } else {
-          Master.log.warn("Could not connect to server " + tls.current);
-        }
-      } catch (TException e) {
-        // pass the exception to the logger so the cause of the failure is not lost
-        Master.log.warn("Communications error asking tablet server to chop a tablet", e);
-      }
-    }
-  }
-  
-  /**
-   * Advances the merge state machine for every table seen during the last scan. The next state is computed from the collected merge statistics and
-   * persisted if it differs from the recorded one. When the next state is MERGING, the metadata changes are carried out immediately (a tablet delete or a
-   * metadata merge, depending on the operation) and the state is then set to COMPLETE. Failures are logged per table and do not stop the processing of the
-   * remaining tables.
-   * 
-   * @param mergeStatsCache
-   *          merge statistics gathered during the scan, keyed by table id
-   */
-  private void updateMergeState(Map<Text,MergeStats> mergeStatsCache) {
-    // the loop variable is deliberately not called "stats", which would shadow the TableStats field of that name
-    for (MergeStats mergeStats : mergeStatsCache.values()) {
-      try {
-        MergeState update = mergeStats.nextMergeState(this.master.getConnector(), this.master);
-        // when next state is MERGING, its important to persist this before
-        // starting the merge... the verification check that is done before
-        // moving into the merging state could fail if merge starts but does
-        // not finish
-        if (update == MergeState.COMPLETE)
-          update = MergeState.NONE;
-        if (update != mergeStats.getMergeInfo().getState()) {
-          this.master.setMergeState(mergeStats.getMergeInfo(), update);
-        }
-        
-        if (update == MergeState.MERGING) {
-          try {
-            if (mergeStats.getMergeInfo().isDelete()) {
-              deleteTablets(mergeStats.getMergeInfo());
-            } else {
-              mergeMetadataRecords(mergeStats.getMergeInfo());
-            }
-            this.master.setMergeState(mergeStats.getMergeInfo(), MergeState.COMPLETE);
-          } catch (Exception ex) {
-            Master.log.error("Unable to merge metadata table records", ex);
-          }
-        }
-      } catch (Exception ex) {
-        Master.log.error("Unable to update merge state for merge " + mergeStats.getMergeInfo().getExtent(), ex);
-      }
-    }
-  }
-  
-  /**
-   * Carries out the metadata side of a delete-range operation. In order, it: looks up the tablet that follows the deleted range (if the range has an end
-   * row); records delete entries for every data file and tablet directory found in the range; deletes the metadata rows in the range; and finally either
-   * points the following tablet's prevRow at the start of the deleted range, or, when there is no following tablet, recreates the last tablet of the
-   * table.
-   * 
-   * @param info
-   *          describes the extent being deleted
-   * @throws AccumuloException
-   *           wrapping any failure, including the IllegalStateException raised when a tablet in the range still has a current location
-   */
-  private void deleteTablets(MergeInfo info) throws AccumuloException {
-    KeyExtent extent = info.getExtent();
-    // tablets of the metadata table are tracked in the root table, everything else in the metadata table
-    String targetSystemTable = extent.isMeta() ? RootTable.NAME : MetadataTable.NAME;
-    Master.log.debug("Deleting tablets for " + extent);
-    // set from the first character of the last TIME_COLUMN value seen in the range; stays '\0' if none is seen
-    char timeType = '\0';
-    KeyExtent followingTablet = null;
-    if (extent.getEndRow() != null) {
-      Key nextExtent = new Key(extent.getEndRow()).followingKey(PartialKey.ROW);
-      followingTablet = getHighTablet(new KeyExtent(extent.getTableId(), nextExtent.getRow(), extent.getEndRow()));
-      Master.log.debug("Found following tablet " + followingTablet);
-    }
-    try {
-      Connector conn = this.master.getConnector();
-      Text start = extent.getPrevEndRow();
-      if (start == null) {
-        start = new Text();
-      }
-      Master.log.debug("Making file deletion entries for " + extent);
-      // exclusive of the metadata row for the range's start, inclusive of the metadata row for the range's end row
-      Range deleteRange = new Range(KeyExtent.getMetadataEntry(extent.getTableId(), start), false, KeyExtent.getMetadataEntry(extent.getTableId(),
-          extent.getEndRow()), true);
-      Scanner scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY);
-      scanner.setRange(deleteRange);
-      TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
-      TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
-      scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
-      scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
-      Set<FileRef> datafiles = new TreeSet<FileRef>();
-      for (Entry<Key,Value> entry : scanner) {
-        Key key = entry.getKey();
-        if (key.compareColumnFamily(DataFileColumnFamily.NAME) == 0) {
-          datafiles.add(new FileRef(this.master.fs, key));
-          // write delete entries in batches so the pending set stays small
-          if (datafiles.size() > 1000) {
-            MetadataTableUtil.addDeleteEntries(extent, datafiles, SystemCredentials.get());
-            datafiles.clear();
-          }
-        } else if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
-          timeType = entry.getValue().toString().charAt(0);
-        } else if (key.compareColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME) == 0) {
-          // a current location means a tablet in the range is still assigned; abort rather than delete it
-          throw new IllegalStateException("Tablet " + key.getRow() + " is assigned during a merge!");
-        } else if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
-          // the tablet directory itself is queued for deletion alongside the data files
-          datafiles.add(new FileRef(entry.getValue().toString(), this.master.fs.getFullPath(FileType.TABLE, entry.getValue().toString())));
-          if (datafiles.size() > 1000) {
-            MetadataTableUtil.addDeleteEntries(extent, datafiles, SystemCredentials.get());
-            datafiles.clear();
-          }
-        }
-      }
-      // write whatever is left over from the last partial batch
-      MetadataTableUtil.addDeleteEntries(extent, datafiles, SystemCredentials.get());
-      BatchWriter bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig());
-      try {
-        deleteTablets(info, deleteRange, bw, conn);
-      } finally {
-        bw.close();
-      }
-      
-      if (followingTablet != null) {
-        Master.log.debug("Updating prevRow of " + followingTablet + " to " + extent.getPrevEndRow());
-        bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig());
-        try {
-          // the same mutation that rewrites prevRow also removes the chopped marker from the following tablet
-          Mutation m = new Mutation(followingTablet.getMetadataEntry());
-          TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, KeyExtent.encodePrevEndRow(extent.getPrevEndRow()));
-          ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m);
-          bw.addMutation(m);
-          bw.flush();
-        } finally {
-          bw.close();
-        }
-      } else {
-        // Recreate the default tablet to hold the end of the table
-        Master.log.debug("Recreating the last tablet to point to " + extent.getPrevEndRow());
-        String tdir = master.getFileSystem().choose(ServerConstants.getTablesDirs()) + "/" + extent.getTableId() + Constants.DEFAULT_TABLET_LOCATION;
-        MetadataTableUtil.addTablet(new KeyExtent(extent.getTableId(), null, extent.getPrevEndRow()), tdir,
-            SystemCredentials.get(), timeType, this.master.masterLock);
-      }
-    } catch (Exception ex) {
-      throw new AccumuloException(ex);
-    }
-  }
-  
-  /**
-   * Carries out the metadata side of a merge. The data file entries of every tablet in the merge range below the highest tablet are copied into the
-   * highest tablet's metadata row, together with the maximum logical time seen across all tablets in the range; delete entries are written for the
-   * directories of the lower tablets; the highest tablet's prevRow is set to the prevRow of the lowest tablet; the rows of the lower tablets are deleted;
-   * and the chopped marker on the highest tablet is removed.
-   * 
-   * @param info
-   *          describes the extent being merged
-   * @throws AccumuloException
-   *           wrapping any failure while reading or writing the metadata, or while closing the batch writer
-   */
-  private void mergeMetadataRecords(MergeInfo info) throws AccumuloException {
-    KeyExtent range = info.getExtent();
-    Master.log.debug("Merging metadata for " + range);
-    KeyExtent stop = getHighTablet(range);
-    Master.log.debug("Highest tablet is " + stop);
-    Value firstPrevRowValue = null;
-    Text stopRow = stop.getMetadataEntry();
-    Text start = range.getPrevEndRow();
-    if (start == null) {
-      start = new Text();
-    }
-    // both ends exclusive: the scan covers the tablets below the highest one, whose own row (stopRow) is left out
-    Range scanRange = new Range(KeyExtent.getMetadataEntry(range.getTableId(), start), false, stopRow, false);
-    String targetSystemTable = MetadataTable.NAME;
-    if (range.isMeta()) {
-      targetSystemTable = RootTable.NAME;
-    }
-    
-    BatchWriter bw = null;
-    try {
-      long fileCount = 0;
-      Connector conn = this.master.getConnector();
-      // Make file entries in highest tablet
-      bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig());
-      Scanner scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY);
-      scanner.setRange(scanRange);
-      TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
-      TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
-      TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
-      scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
-      // accumulates everything that has to be written to the highest tablet's row
-      Mutation m = new Mutation(stopRow);
-      String maxLogicalTime = null;
-      for (Entry<Key,Value> entry : scanner) {
-        Key key = entry.getKey();
-        Value value = entry.getValue();
-        if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
-          // re-home the file entry under the highest tablet
-          m.put(key.getColumnFamily(), key.getColumnQualifier(), value);
-          fileCount++;
-        } else if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) && firstPrevRowValue == null) {
-          // only the first prevRow encountered is kept; it becomes the prevRow of the merged tablet
-          Master.log.debug("prevRow entry for lowest tablet is " + value);
-          firstPrevRowValue = new Value(value);
-        } else if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
-          maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, value.toString());
-        } else if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
-          // the lower tablet's directory is no longer needed; queue a delete entry for it
-          bw.addMutation(MetadataTableUtil.createDeleteMutation(range.getTableId().toString(), entry.getValue().toString()));
-        }
-      }
-      
-      // read the logical time from the last tablet in the merge range, it is not included in
-      // the loop above
-      scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY);
-      scanner.setRange(new Range(stopRow));
-      TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
-      for (Entry<Key,Value> entry : scanner) {
-        if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) {
-          maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, entry.getValue().toString());
-        }
-      }
-      
-      // NOTE(review): getBytes() with no argument uses the platform default charset - confirm that is intended
-      if (maxLogicalTime != null)
-        TabletsSection.ServerColumnFamily.TIME_COLUMN.put(m, new Value(maxLogicalTime.getBytes()));
-      
-      if (!m.getUpdates().isEmpty()) {
-        bw.addMutation(m);
-      }
-      
-      bw.flush();
-      
-      Master.log.debug("Moved " + fileCount + " files to " + stop);
-      
-      // no prevRow entry was found below the highest tablet, so there are no lower tablets left to fold in
-      if (firstPrevRowValue == null) {
-        Master.log.debug("tablet already merged");
-        return;
-      }
-      
-      stop.setPrevEndRow(KeyExtent.decodePrevEndRow(firstPrevRowValue));
-      Mutation updatePrevRow = stop.getPrevRowUpdateMutation();
-      Master.log.debug("Setting the prevRow for last tablet: " + stop);
-      bw.addMutation(updatePrevRow);
-      bw.flush();
-      
-      // the highest tablet now covers the whole range, so the rows of the lower tablets can be removed
-      deleteTablets(info, scanRange, bw, conn);
-      
-      // Clean-up the last chopped marker
-      m = new Mutation(stopRow);
-      ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m);
-      bw.addMutation(m);
-      bw.flush();
-      
-    } catch (Exception ex) {
-      throw new AccumuloException(ex);
-    } finally {
-      // NOTE(review): an exception thrown here replaces any exception already propagating from the try block
-      if (bw != null)
-        try {
-          bw.close();
-        } catch (Exception ex) {
-          throw new AccumuloException(ex);
-        }
-    }
-  }
-  
-  /**
-   * Deletes every metadata entry found in the given range, one mutation per row, and flushes the writer.
-   * 
-   * @param info
-   *          the operation in progress; its extent decides whether the root table or the metadata table is scanned
-   * @param scanRange
-   *          the range of metadata rows to delete
-   * @param bw
-   *          writer that receives the delete mutations; it is flushed but not closed here
-   * @param conn
-   *          connector used to create the scanner
-   */
-  private void deleteTablets(MergeInfo info, Range scanRange, BatchWriter bw, Connector conn) throws TableNotFoundException, MutationsRejectedException {
-    // All deletes for one tablet row are grouped into a single mutation so that a tablet either
-    // disappears entirely or not at all.. this is important for the case where the process
-    // terminates part way through the loop below
-    String systemTable = info.getExtent().isMeta() ? RootTable.NAME : MetadataTable.NAME;
-    Scanner scanner = conn.createScanner(systemTable, Authorizations.EMPTY);
-    Master.log.debug("Deleting range " + scanRange);
-    scanner.setRange(scanRange);
-    for (RowIterator rows = new RowIterator(scanner); rows.hasNext();) {
-      Iterator<Entry<Key,Value>> columns = rows.next();
-      Mutation rowDeletes = null;
-      while (columns.hasNext()) {
-        Key key = columns.next().getKey();
-        // the mutation is created lazily from the first key of the row
-        if (rowDeletes == null)
-          rowDeletes = new Mutation(key.getRow());
-        rowDeletes.putDelete(key.getColumnFamily(), key.getColumnQualifier());
-        Master.log.debug("deleting entry " + key);
-      }
-      bw.addMutation(rowDeletes);
-    }
-    
-    bw.flush();
-  }
-  
-  /**
-   * Finds the tablet that contains the end row of the given range by reading the first prevRow entry at or after the metadata row for that end row.
-   * 
-   * @param range
-   *          the extent whose last tablet is wanted
-   * @return the extent of the tablet found
-   * @throws AccumuloException
-   *           if no entry is found, if the entry found belongs to a different table, or if the lookup fails for any other reason; failures raised inside
-   *           the lookup are wrapped as the cause of the thrown exception
-   */
-  private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException {
-    try {
-      Connector conn = this.master.getConnector();
-      Scanner scanner = conn.createScanner(range.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY);
-      TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
-      KeyExtent start = new KeyExtent(range.getTableId(), range.getEndRow(), null);
-      // open-ended scan starting at the metadata row for the range's end row; only the first entry is used
-      scanner.setRange(new Range(start.getMetadataEntry(), null));
-      Iterator<Entry<Key,Value>> iterator = scanner.iterator();
-      if (!iterator.hasNext()) {
-        throw new AccumuloException("No last tablet for a merge " + range);
-      }
-      Entry<Key,Value> entry = iterator.next();
-      KeyExtent highTablet = new KeyExtent(entry.getKey().getRow(), KeyExtent.decodePrevEndRow(entry.getValue()));
-      // Compare table ids by value. An identity comparison (!=) is only correct if both Text
-      // instances are guaranteed to be the same object, which nothing in this class ensures.
-      if (!highTablet.getTableId().equals(range.getTableId())) {
-        throw new AccumuloException("No last tablet for merge " + range + " " + highTablet);
-      }
-      return highTablet;
-    } catch (Exception ex) {
-      throw new AccumuloException("Unexpected failure finding the last tablet for a merge " + range, ex);
-    }
-  }
-  
-  /**
-   * Applies the changes queued up while scanning tablets: tablets assigned to dead servers are unassigned in the store, the balancer is asked to place the
-   * unassigned tablets, the resulting assignments are recorded as future locations, and finally every new or previously pending assignment is sent to its
-   * tablet server.
-   * 
-   * @param currentTServers
-   *          the servers handed to the balancer as candidate destinations
-   * @param assignments
-   *          assignments already decided by the caller; balancer choices and the contents of {@code assigned} are appended to this list
-   * @param assigned
-   *          assignments that were made earlier and are sent to their servers again
-   * @param assignedToDeadServers
-   *          tablets whose server is no longer alive
-   * @param unassigned
-   *          tablets nominated for assignment, mapped to the server recorded for them
-   */
-  private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> currentTServers, List<Assignment> assignments, List<Assignment> assigned,
-      List<TabletLocationState> assignedToDeadServers, Map<KeyExtent,TServerInstance> unassigned) throws DistributedStoreException, TException {
-    if (!assignedToDeadServers.isEmpty()) {
-      // cap how many tablets are listed in the debug message
-      int shown = min(assignedToDeadServers.size(), 100);
-      List<TabletLocationState> sample = assignedToDeadServers.subList(0, shown);
-      Master.log.debug(assignedToDeadServers.size() + " assigned to dead servers: " + sample + "...");
-      store.unassign(assignedToDeadServers);
-      this.master.nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size());
-    }
-    
-    if (!currentTServers.isEmpty()) {
-      Map<KeyExtent,TServerInstance> balancerChoices = new HashMap<KeyExtent,TServerInstance>();
-      this.master.tabletBalancer.getAssignments(Collections.unmodifiableSortedMap(currentTServers), Collections.unmodifiableMap(unassigned), balancerChoices);
-      for (Entry<KeyExtent,TServerInstance> choice : balancerChoices.entrySet()) {
-        KeyExtent tablet = choice.getKey();
-        // the balancer may only place tablets that were actually nominated
-        if (!unassigned.containsKey(tablet)) {
-          Master.log.warn(store.name() + " load balancer assigning tablet that was not nominated for assignment " + tablet);
-          continue;
-        }
-        TServerInstance target = choice.getValue();
-        if (target != null) {
-          Master.log.debug(store.name() + " assigning tablet " + choice);
-          assignments.add(new Assignment(tablet, target));
-        }
-      }
-      if (!unassigned.isEmpty() && balancerChoices.isEmpty())
-        Master.log.warn("Load balancer failed to assign any tablets");
-    }
-    
-    if (!assignments.isEmpty()) {
-      Master.log.info(String.format("Assigning %d tablets", assignments.size()));
-      store.setFutureLocations(assignments);
-    }
-    // earlier assignments are only re-sent to their servers, they are not written to the store again
-    assignments.addAll(assigned);
-    for (Assignment pending : assignments) {
-      TServerConnection conn = this.master.tserverSet.getConnection(pending.server);
-      if (conn == null) {
-        Master.log.warn("Could not connect to server " + pending.server);
-      } else {
-        conn.assignTablet(this.master.masterLock, pending.tablet);
-      }
-      master.assignedTablet(pending.tablet);
-    }
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java b/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
deleted file mode 100644
index ec3371c..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.balancer;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import java.util.SortedMap;
-
-import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.master.thrift.TableInfo;
-import org.apache.accumulo.core.master.thrift.TabletServerStatus;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
-import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.accumulo.server.master.state.TabletMigration;
-import org.apache.thrift.TException;
-
-/**
- * A chaotic load balancer used for testing. It constantly shuffles tablets, preventing them from resting in a single location for very long. This is not
- * designed for performance, do not use on production systems. I'm calling it the LokiLoadBalancer.
- */
-public class ChaoticLoadBalancer extends TabletBalancer {
-  Random r = new Random();
-  
-  /**
-   * Randomly hands every unassigned tablet to a tablet server, preferring servers that currently report fewer tablets than the per-server average.
-   * <p>
-   * Each preferred server gets a quota of (average - tablets it already reports); once the quota is used up the server is no longer preferred. When no preferred
-   * server is left (all servers are at or above the average, or all quotas are exhausted) the choice falls back to a random pick among all current servers, so
-   * a tablet always receives a destination as long as at least one server exists.
-   * 
-   * @param current
-   *          the online tablet servers and their last reported status; a null status or null table map is counted as zero tablets
-   * @param unassigned
-   *          the tablets that need a server; only the keys are used
-   * @param assignments
-   *          receives one entry per unassigned tablet; left untouched when there is no tablet server at all
-   */
-  @Override
-  public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
-      Map<KeyExtent,TServerInstance> assignments) {
-    // Without any tablet server there is nothing to choose from, and the average below would be a division by zero
-    if (current.isEmpty())
-      return;
-    
-    long total = assignments.size() + unassigned.size();
-    long avg = (long) Math.ceil(((double) total) / current.size());
-    Map<TServerInstance,Long> toAssign = new HashMap<TServerInstance,Long>();
-    List<TServerInstance> tServerArray = new ArrayList<TServerInstance>();
-    for (Entry<TServerInstance,TabletServerStatus> e : current.entrySet()) {
-      long numTablets = 0;
-      TabletServerStatus status = e.getValue();
-      // a server that has not reported a status yet is treated as hosting no tablets
-      if (status != null && status.getTableMap() != null) {
-        for (TableInfo ti : status.getTableMap().values()) {
-          numTablets += ti.tablets;
-        }
-      }
-      if (numTablets < avg) {
-        tServerArray.add(e.getKey());
-        toAssign.put(e.getKey(), avg - numTablets);
-      }
-    }
-    
-    for (KeyExtent ke : unassigned.keySet()) {
-      // Random.nextInt rejects a bound of zero, so make sure there is always an option: fall back to every current server
-      if (tServerArray.isEmpty())
-        tServerArray.addAll(current.keySet());
-      int index = r.nextInt(tServerArray.size());
-      TServerInstance dest = tServerArray.get(index);
-      assignments.put(ke, dest);
-      Long quota = toAssign.get(dest);
-      // servers added by the fallback above carry no quota and simply stay available
-      if (quota == null)
-        continue;
-      long remaining = quota.longValue() - 1;
-      if (remaining == 0) {
-        tServerArray.remove(index);
-        toAssign.remove(dest);
-      } else {
-        toAssign.put(dest, remaining);
-      }
-    }
-  }
-  
-  /**
-   * Will balance randomly, maintaining distribution
-   */
-  @Override
-  public long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) {
-    Map<TServerInstance,Long> numTablets = new HashMap<TServerInstance,Long>();
-    List<TServerInstance> underCapacityTServer = new ArrayList<TServerInstance>();
-    
-    if (!migrations.isEmpty())
-      return 100;
-    
-    boolean moveMetadata = r.nextInt(4) == 0;
-    long totalTablets = 0;
-    for (Entry<TServerInstance,TabletServerStatus> e : current.entrySet()) {
-      long tabletCount = 0;
-      for (TableInfo ti : e.getValue().getTableMap().values()) {
-        tabletCount += ti.tablets;
-      }
-      numTablets.put(e.getKey(), tabletCount);
-      underCapacityTServer.add(e.getKey());
-      totalTablets += tabletCount;
-    }
-    // totalTablets is fuzzy due to asynchronicity of the stats
-    // *1.2 to handle fuzziness, and prevent locking for 'perfect' balancing scenarios
-    long avg = (long) Math.ceil(((double) totalTablets) / current.size() * 1.2);
-    
-    for (Entry<TServerInstance,TabletServerStatus> e : current.entrySet()) {
-      for (String table : e.getValue().getTableMap().keySet()) {
-        if (!moveMetadata && MetadataTable.NAME.equals(table))
-          continue;
-        try {
-          for (TabletStats ts : getOnlineTabletsForTable(e.getKey(), table)) {
-            KeyExtent ke = new KeyExtent(ts.extent);
-            int index = r.nextInt(underCapacityTServer.size());
-            TServerInstance dest = underCapacityTServer.get(index);
-            if (dest.equals(e.getKey()))
-              continue;
-            migrationsOut.add(new TabletMigration(ke, e.getKey(), dest));
-            if (numTablets.put(dest, numTablets.get(dest) + 1) > avg)
-              underCapacityTServer.remove(index);
-            if (numTablets.put(e.getKey(), numTablets.get(e.getKey()) - 1) <= avg && !underCapacityTServer.contains(e.getKey()))
-              underCapacityTServer.add(e.getKey());
-            
-            // We can get some craziness with only 1 tserver, so lets make sure there's always an option!
-            if (underCapacityTServer.isEmpty())
-              underCapacityTServer.addAll(numTablets.keySet());
-          }
-        } catch (ThriftSecurityException e1) {
-          // Shouldn't happen, but carry on if it does
-          e1.printStackTrace();
-        } catch (TException e1) {
-          // Shouldn't happen, but carry on if it does
-          e1.printStackTrace();
-        }
-      }
-    }
-    
-    return 100;
-  }
-  
-  /**
-   * Passes the configuration straight to the superclass implementation; this balancer reads no settings of its own here.
-   * 
-   * @param conf
-   *          the server configuration handed on to {@code super.init}
-   */
-  @Override
-  public void init(ServerConfiguration conf) {
-    super.init(conf);
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java b/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
deleted file mode 100644
index 9b88d74..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.balancer;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedMap;
-
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.master.thrift.TableInfo;
-import org.apache.accumulo.core.master.thrift.TabletServerStatus;
-import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
-import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.accumulo.server.master.state.TabletMigration;
-import org.apache.log4j.Logger;
-
-public class DefaultLoadBalancer extends TabletBalancer {
-  
-  private static final Logger log = Logger.getLogger(DefaultLoadBalancer.class);
-  
-  Iterator<TServerInstance> assignments;
-  // if tableToBalance is set, then only balance the given table
-  String tableToBalance = null;
-  
-  /**
-   * Creates a balancer that is not restricted to a single table ({@code tableToBalance} stays null).
-   */
-  public DefaultLoadBalancer() {
-    
-  }
-  
-  /**
-   * Creates a balancer that only balances the given table.
-   * 
-   * @param table
-   *          stored in {@code tableToBalance}; it is compared against the keys of the per-server table maps
-   */
-  public DefaultLoadBalancer(String table) {
-    tableToBalance = table;
-  }
-  
-  /**
-   * Copies the given servers into a new list and shuffles it.
-   * 
-   * @param locations
-   *          the servers to shuffle; the set itself is not modified
-   * @return a new list holding the same servers in random order
-   */
-  List<TServerInstance> randomize(Set<TServerInstance> locations) {
-    List<TServerInstance> shuffled = new ArrayList<TServerInstance>(locations.size());
-    shuffled.addAll(locations);
-    Collections.shuffle(shuffled);
-    return shuffled;
-  }
-  
-  /**
-   * Picks a tablet server for a single tablet.
-   * <p>
-   * If a last location is known and the first server sorting at or after it runs on the same host, that server is returned. Otherwise servers are handed out
-   * one at a time from a shuffled copy of the current servers, which is re-created whenever it is used up or yields a server that is no longer current.
-   * 
-   * @param locations
-   *          the current tablet servers
-   * @param extent
-   *          the tablet to place; not consulted by this implementation
-   * @param last
-   *          the last known server of the tablet, may be null
-   * @return the chosen server, or null when there are no servers
-   */
-  public TServerInstance getAssignment(SortedMap<TServerInstance,TabletServerStatus> locations, KeyExtent extent, TServerInstance last) {
-    if (locations.isEmpty())
-      return null;
-    
-    if (last != null) {
-      // Maintain locality
-      // NOTE(review): the empty second argument presumably makes the probe sort before any real instance at the same location - confirm
-      TServerInstance probe = new TServerInstance(last.getLocation(), "");
-      SortedMap<TServerInstance,TabletServerStatus> atOrAfter = locations.tailMap(probe);
-      if (!atOrAfter.isEmpty()) {
-        TServerInstance candidate = atOrAfter.firstKey();
-        if (candidate.host().equals(last.host()))
-          return candidate;
-      }
-    }
-    
-    // Walk through the locations and hand them back, one at a time, starting over with a fresh shuffle when the iterator is missing or used up
-    Iterator<TServerInstance> remaining = assignments;
-    if (remaining == null || !remaining.hasNext()) {
-      remaining = randomize(locations.keySet()).iterator();
-      assignments = remaining;
-    }
-    TServerInstance next = remaining.next();
-    if (locations.containsKey(next))
-      return next;
-    
-    // The iterator handed back something not in the current list: drop it and answer from a fresh shuffle
-    assignments = null;
-    return randomize(locations.keySet()).iterator().next();
-  }
-  
-  /**
-   * A tablet server together with its status and the tablet count used for balancing. Orders by count first and by server second.
-   */
-  static class ServerCounts implements Comparable<ServerCounts> {
-    public final TServerInstance server;
-    public final int count;
-    public final TabletServerStatus status;
-    
-    ServerCounts(int count, TServerInstance server, TabletServerStatus status) {
-      this.count = count;
-      this.server = server;
-      this.status = status;
-    }
-    
-    /**
-     * Compares by tablet count, using the server ordering to break ties.
-     * 
-     * @param obj
-     *          the other entry
-     * @return a negative value, zero or a positive value if this entry sorts before, equal to or after the other one
-     */
-    @Override
-    public int compareTo(ServerCounts obj) {
-      // explicit comparison instead of subtraction, which can overflow and flip the sign
-      if (count != obj.count)
-        return count < obj.count ? -1 : 1;
-      return server.compareTo(obj.server);
-    }
-  }
-  
-  /**
-   * Proposes migrations that even out the number of online tablets per server. When {@code tableToBalance} is set, only tablets of that table are counted.
-   * 
-   * @param current
-   *          the tablet servers and their status; a null status or null table map counts as zero tablets
-   * @param result
-   *          receives the proposed migrations
-   * @return true if at least one server had to unload more tablets than its counterpart needed to load, so another pass is needed; false otherwise,
-   *         including when fewer than two servers are given
-   */
-  public boolean getMigrations(Map<TServerInstance,TabletServerStatus> current, List<TabletMigration> result) {
-    boolean moreBalancingNeeded = false;
-    try {
-      // no moves possible
-      if (current.size() < 2) {
-        return false;
-      }
-      
-      // Sort by total number of online tablets, per server
-      int total = 0;
-      ArrayList<ServerCounts> totals = new ArrayList<ServerCounts>();
-      for (Entry<TServerInstance,TabletServerStatus> entry : current.entrySet()) {
-        int serverTotal = 0;
-        if (entry.getValue() != null && entry.getValue().tableMap != null) {
-          for (Entry<String,TableInfo> e : entry.getValue().tableMap.entrySet()) {
-            /**
-             * The check below was on entry.getKey(), but that resolves to a tabletserver not a tablename. Believe it should be e.getKey() which is a tablename
-             */
-            if (tableToBalance == null || tableToBalance.equals(e.getKey()))
-              serverTotal += e.getValue().onlineTablets;
-          }
-        }
-        totals.add(new ServerCounts(serverTotal, entry.getKey(), entry.getValue()));
-        total += serverTotal;
-      }
-      
-      // order from high to low: ascending sort followed by a reverse, so index 0 is the most loaded server
-      Collections.sort(totals);
-      Collections.reverse(totals);
-      // every server should end up with "even" tablets; the first "numServersOverEven" servers take one extra to absorb the remainder
-      int even = total / totals.size();
-      int numServersOverEven = total % totals.size();
-      
-      // Move tablets from the servers with too many to the servers with
-      // the fewest but only nominate tablets to move once. This allows us
-      // to fill new servers with tablets from a mostly balanced server
-      // very quickly. However, it may take several balancing passes to move
-      // tablets from one hugely overloaded server to many slightly
-      // under-loaded servers.
-      int end = totals.size() - 1;
-      // tablets already promised to the server at index "end" by earlier iterations
-      int movedAlready = 0;
-      for (int tooManyIndex = 0; tooManyIndex < totals.size(); tooManyIndex++) {
-        ServerCounts tooMany = totals.get(tooManyIndex);
-        int goal = even;
-        if (tooManyIndex < numServersOverEven) {
-          goal++;
-        }
-        int needToUnload = tooMany.count - goal;
-        ServerCounts tooLittle = totals.get(end);
-        int needToLoad = goal - tooLittle.count - movedAlready;
-        if (needToUnload < 1 && needToLoad < 1) {
-          break;
-        }
-        if (needToUnload >= needToLoad) {
-          // the receiving server gets everything it needs; continue with the next least loaded server
-          result.addAll(move(tooMany, tooLittle, needToLoad));
-          end--;
-          movedAlready = 0;
-        } else {
-          // the receiving server still has room; remember what it was promised and keep filling it
-          result.addAll(move(tooMany, tooLittle, needToUnload));
-          movedAlready += needToUnload;
-        }
-        if (needToUnload > needToLoad)
-          moreBalancingNeeded = true;
-      }
-      
-    } finally {
-      log.debug("balance ended with " + result.size() + " migrations");
-    }
-    return moreBalancingNeeded;
-  }
-  
-  /**
-   * Plain holder pairing a table with an integer difference. NOTE(review): nothing in DefaultLoadBalancer itself references this class - confirm whether it is
-   * still used elsewhere.
-   */
-  static class TableDiff {
-    int diff;
-    String table;
-    
-    public TableDiff(int diff, String table) {
-      this.diff = diff;
-      this.table = table;
-    }
-  };
-  
-  /**
-   * Select a tablet based on differences between table loads; if the loads are even, use the busiest table
-   * <p>
-   * Proposes up to {@code count} migrations from {@code tooMuch} to {@code tooLittle}. Stops early, returning what has been proposed so far, when the tablets
-   * of the chosen table cannot be fetched or no tablet can be selected.
-   * 
-   * @param tooMuch
-   *          the server to take tablets from
-   * @param tooLittle
-   *          the server to give tablets to
-   * @param count
-   *          the number of migrations wanted; zero yields an empty list immediately
-   * @return the proposed migrations, possibly fewer than {@code count}
-   */
-  List<TabletMigration> move(ServerCounts tooMuch, ServerCounts tooLittle, int count) {
-    
-    List<TabletMigration> result = new ArrayList<TabletMigration>();
-    if (count == 0)
-      return result;
-    
-    // per-table cache of the tablets still available on the source server, filled lazily below
-    Map<String,Map<KeyExtent,TabletStats>> onlineTablets = new HashMap<String,Map<KeyExtent,TabletStats>>();
-    // Copy counts so we can update them as we propose migrations
-    Map<String,Integer> tooMuchMap = tabletCountsPerTable(tooMuch.status);
-    Map<String,Integer> tooLittleMap = tabletCountsPerTable(tooLittle.status);
-    
-    for (int i = 0; i < count; i++) {
-      String table;
-      Integer tooLittleCount;
-      if (tableToBalance == null) {
-        // find a table to migrate
-        // look for an uneven table count
-        int biggestDifference = 0;
-        String biggestDifferenceTable = null;
-        for (Entry<String,Integer> tableEntry : tooMuchMap.entrySet()) {
-          String tableID = tableEntry.getKey();
-          // a table the destination does not host yet counts as zero there
-          if (tooLittleMap.get(tableID) == null)
-            tooLittleMap.put(tableID, 0);
-          int diff = tableEntry.getValue() - tooLittleMap.get(tableID);
-          if (diff > biggestDifference) {
-            biggestDifference = diff;
-            biggestDifferenceTable = tableID;
-          }
-        }
-        // a difference below two is treated as even; fall back to the busiest table of the source server
-        if (biggestDifference < 2) {
-          table = busiest(tooMuch.status.tableMap);
-        } else {
-          table = biggestDifferenceTable;
-        }
-      } else {
-        // just balance the given table
-        table = tableToBalance;
-      }
-      Map<KeyExtent,TabletStats> onlineTabletsForTable = onlineTablets.get(table);
-      try {
-        if (onlineTabletsForTable == null) {
-          onlineTabletsForTable = new HashMap<KeyExtent,TabletStats>();
-          for (TabletStats stat : getOnlineTabletsForTable(tooMuch.server, table))
-            onlineTabletsForTable.put(new KeyExtent(stat.extent), stat);
-          onlineTablets.put(table, onlineTabletsForTable);
-        }
-      } catch (Exception ex) {
-        // any failure while fetching the tablet list ends the selection; migrations proposed so far are kept
-        log.error("Unable to select a tablet to move", ex);
-        return result;
-      }
-      KeyExtent extent = selectTablet(tooMuch.server, onlineTabletsForTable);
-      // take the tablet out of the cache so it is only nominated once; the null check follows the removal
-      onlineTabletsForTable.remove(extent);
-      if (extent == null)
-        return result;
-      tooMuchMap.put(table, tooMuchMap.get(table) - 1);
-      /**
-       * If a table grows from 1 tablet then tooLittleMap.get(table) can return a null, since there is only one tabletserver that holds all of the tablets. Here
-       * we check to see if in fact that is the case and if so set the value to 0.
-       */
-      tooLittleCount = tooLittleMap.get(table);
-      if (tooLittleCount == null) {
-        tooLittleCount = 0;
-      }
-      tooLittleMap.put(table, tooLittleCount + 1);
-      
-      result.add(new TabletMigration(extent, tooMuch.server, tooLittle.server));
-    }
-    return result;
-  }
-  
-  /**
-   * Extracts the number of online tablets per table from a server status.
-   * 
-   * @param status
-   *          the server status, may be null
-   * @return a new, modifiable map from table to online tablet count; empty when the status or its table map is null
-   */
-  static Map<String,Integer> tabletCountsPerTable(TabletServerStatus status) {
-    Map<String,Integer> counts = new HashMap<String,Integer>();
-    if (status == null || status.tableMap == null)
-      return counts;
-    for (Entry<String,TableInfo> perTable : status.tableMap.entrySet()) {
-      TableInfo info = perTable.getValue();
-      counts.put(perTable.getKey(), info.onlineTablets);
-    }
-    return counts;
-  }
-  
-  /**
-   * Picks the tablet with the greatest split creation time; among equal times the one visited last in the map's iteration order is kept.
-   * 
-   * @param tserver
-   *          not used by this implementation
-   * @param extents
-   *          the candidate tablets and their statistics
-   * @return the selected tablet, or null when there are no candidates or none has a split creation time of at least zero
-   */
-  static KeyExtent selectTablet(TServerInstance tserver, Map<KeyExtent,TabletStats> extents) {
-    KeyExtent chosen = null;
-    long newestSplit = 0;
-    for (Entry<KeyExtent,TabletStats> candidate : extents.entrySet()) {
-      long created = candidate.getValue().splitCreationTime;
-      if (created < newestSplit)
-        continue;
-      newestSplit = created;
-      chosen = candidate.getKey();
-    }
-    return chosen;
-  }
-  
-  /**
-   * Defines what it means for a table to be busy: the sum of its ingest rate and its query rate.
-   * 
-   * @param tables
-   *          the tables of one server and their statistics
-   * @return the key of the entry with the highest combined rate (the first one encountered on ties), or null when the map is empty
-   */
-  private static String busiest(Map<String,TableInfo> tables) {
-    String winner = null;
-    double highest = Double.NEGATIVE_INFINITY;
-    for (Entry<String,TableInfo> candidate : tables.entrySet()) {
-      TableInfo stats = candidate.getValue();
-      double load = stats.ingestRate + stats.queryRate;
-      if (load > highest) {
-        winner = candidate.getKey();
-        highest = load;
-      }
-    }
-    return winner;
-  }
-  
-  /**
-   * Chooses a server for every unassigned tablet by calling {@link #getAssignment(SortedMap, KeyExtent, TServerInstance)} with the tablet's last location.
-   * 
-   * @param current
-   *          the current tablet servers
-   * @param unassigned
-   *          the tablets to place, mapped to their last known server
-   * @param assignments
-   *          receives one entry per unassigned tablet; the value is whatever getAssignment returned, which is null when there are no servers
-   */
-  @Override
-  public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
-      Map<KeyExtent,TServerInstance> assignments) {
-    Iterator<Entry<KeyExtent,TServerInstance>> pending = unassigned.entrySet().iterator();
-    while (pending.hasNext()) {
-      Entry<KeyExtent,TServerInstance> tablet = pending.next();
-      KeyExtent extent = tablet.getKey();
-      TServerInstance lastLocation = tablet.getValue();
-      assignments.put(extent, getAssignment(current, extent, lastLocation));
-    }
-  }
-  
-  @Override
-  public long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) {
-    // do we have any servers?
-    if (current.size() > 0) {
-      // Don't migrate if we have migrations in progress
-      if (migrations.size() == 0) {
-        if (getMigrations(current, migrationsOut))
-          return 1 * 1000;
-      }
-    }
-    return 5 * 1000;
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java b/server/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java
deleted file mode 100644
index 3e0a2bf..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.balancer;
-
-import java.lang.reflect.Constructor;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedMap;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.master.thrift.TabletServerStatus;
-import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.accumulo.server.master.state.TabletMigration;
-import org.apache.accumulo.server.security.SystemCredentials;
-import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
-import org.apache.log4j.Logger;
-
-public class TableLoadBalancer extends TabletBalancer {
-  
-  private static final Logger log = Logger.getLogger(TableLoadBalancer.class);
-  
-  Map<String,TabletBalancer> perTableBalancers = new HashMap<String,TabletBalancer>();
-  
-  /**
-   * Loads the named balancer class and instantiates it through its public single-String constructor, passing the table.
-   * 
-   * @param clazzName
-   *          the name of the balancer class to load
-   * @param table
-   *          the constructor argument
-   * @return the new balancer instance
-   * @throws Exception
-   *           if the class cannot be loaded, has no such constructor, or cannot be instantiated
-   */
-  private TabletBalancer constructNewBalancerForTable(String clazzName, String table) throws Exception {
-    return AccumuloVFSClassLoader.loadClass(clazzName, TabletBalancer.class).getConstructor(String.class).newInstance(table);
-  }
-  
-  /**
-   * Reads the {@code Property.TABLE_LOAD_BALANCER} setting from the configuration of the given table.
-   * 
-   * @param table
-   *          the table whose configuration is consulted
-   * @return the configured balancer class name. NOTE(review): may presumably be null; getBalancerForTable guards against that - confirm.
-   */
-  protected String getLoadBalancerClassNameForTable(String table) {
-    return configuration.getTableConfiguration(table).get(Property.TABLE_LOAD_BALANCER);
-  }
-  
-  protected TabletBalancer getBalancerForTable(String table) {
-    TabletBalancer balancer = perTableBalancers.get(table);
-    
-    String clazzName = getLoadBalancerClassNameForTable(table);
-    
-    if (clazzName == null)
-      clazzName = DefaultLoadBalancer.class.getName();
-    if (balancer != null) {
-      if (clazzName.equals(balancer.getClass().getName()) == false) {
-        // the balancer class for this table does not match the class specified in the configuration
-        try {
-          // attempt to construct a balancer with the specified class
-          TabletBalancer newBalancer = constructNewBalancerForTable(clazzName, table);
-          if (newBalancer != null) {
-            balancer = newBalancer;
-            perTableBalancers.put(table, balancer);
-            balancer.init(configuration);
-          }
-        } catch (Exception e) {
-          log.warn("Failed to load table balancer class " + clazzName + " for table " + table, e);
-        }
-      }
-    }
-    if (balancer == null) {
-      try {
-        balancer = constructNewBalancerForTable(clazzName, table);
-        log.info("Loaded class " + clazzName + " for table " + table);
-      } catch (Exception e) {
-        log.warn("Failed to load table balancer class " + clazzName + " for table " + table, e);
-      }
-      
-      if (balancer == null) {
-        log.info("Using balancer " + DefaultLoadBalancer.class.getName() + " for table " + table);
-        balancer = new DefaultLoadBalancer(table);
-      }
-      perTableBalancers.put(table, balancer);
-      balancer.init(configuration);
-    }
-    return balancer;
-  }
-  
-  /**
-   * Groups the unassigned tablets by table id and lets the balancer of each table choose the servers for its tablets.
-   * 
-   * @param current
-   *          the current tablet servers, passed on unchanged to every per-table balancer
-   * @param unassigned
-   *          the tablets to place, mapped to their last known server
-   * @param assignments
-   *          receives the assignments produced by all per-table balancers
-   */
-  @Override
-  public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
-      Map<KeyExtent,TServerInstance> assignments) {
-    // separate the unassigned into tables
-    Map<String,Map<KeyExtent,TServerInstance>> byTable = new HashMap<String,Map<KeyExtent,TServerInstance>>();
-    for (Entry<KeyExtent,TServerInstance> tablet : unassigned.entrySet()) {
-      String tableId = tablet.getKey().getTableId().toString();
-      Map<KeyExtent,TServerInstance> forTable = byTable.get(tableId);
-      if (forTable == null) {
-        forTable = new HashMap<KeyExtent,TServerInstance>();
-        byTable.put(tableId, forTable);
-      }
-      forTable.put(tablet.getKey(), tablet.getValue());
-    }
-    // each table's balancer writes into a scratch map that is then merged into the caller's map
-    for (Entry<String,Map<KeyExtent,TServerInstance>> group : byTable.entrySet()) {
-      TabletBalancer tableBalancer = getBalancerForTable(group.getKey());
-      Map<KeyExtent,TServerInstance> proposed = new HashMap<KeyExtent,TServerInstance>();
-      tableBalancer.getAssignments(current, group.getValue(), proposed);
-      assignments.putAll(proposed);
-    }
-  }
-  
-  private TableOperations tops = null;
-  
-  /**
-   * Returns the table operations handle, creating and caching it on first use with the system credentials.
-   * 
-   * @return the cached handle, or null if it could not be obtained; the failure is logged and the next call tries again
-   */
-  protected TableOperations getTableOperations() {
-    if (tops != null)
-      return tops;
-    try {
-      tops = configuration.getInstance().getConnector(SystemCredentials.get().getPrincipal(), SystemCredentials.get().getToken()).tableOperations();
-    } catch (AccumuloException e) {
-      log.error("Unable to access table operations from within table balancer", e);
-    } catch (AccumuloSecurityException e) {
-      log.error("Unable to access table operations from within table balancer", e);
-    }
-    return tops;
-  }
-  
-  /**
-   * Runs the balancer of every table and collects the proposed migrations.
-   * 
-   * @param current
-   *          the current tablet servers, passed on to every per-table balancer
-   * @param migrations
-   *          migrations in progress, passed on to every per-table balancer
-   * @param migrationsOut
-   *          receives the migrations proposed by all per-table balancers
-   * @return the smallest wait time returned by any per-table balancer, capped at 5000 milliseconds; 5000 when the table operations are unavailable
-   */
-  @Override
-  public long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) {
-    long shortestWait = 5 * 1000;
-    TableOperations tableOps = getTableOperations();
-    if (tableOps != null) {
-      // Iterate over the tables and balance each of them
-      for (String tableId : tableOps.tableIdMap().values()) {
-        List<TabletMigration> proposed = new ArrayList<TabletMigration>();
-        long tableWait = getBalancerForTable(tableId).balance(current, migrations, proposed);
-        shortestWait = Math.min(shortestWait, tableWait);
-        migrationsOut.addAll(proposed);
-      }
-    }
-    return shortestWait;
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java b/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
deleted file mode 100644
index fd76ce2..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.balancer;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-
-import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.master.thrift.TabletServerStatus;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
-import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
-import org.apache.accumulo.core.util.ThriftUtil;
-import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.accumulo.server.master.state.TabletMigration;
-import org.apache.accumulo.server.security.SystemCredentials;
-import org.apache.accumulo.trace.instrument.Tracer;
-import org.apache.log4j.Logger;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransportException;
-
-public abstract class TabletBalancer {
-  
-  private static final Logger log = Logger.getLogger(TabletBalancer.class);
-  
-  protected ServerConfiguration configuration;
-  
-  /**
-   * Initialize the TabletBalancer. This gives the balancer the opportunity to read the configuration.
-   * 
-   * @param conf
-   *          the server configuration; stored in the {@code configuration} field
-   */
-  public void init(ServerConfiguration conf) {
-    configuration = conf;
-  }
-  
-  /**
-   * Assign tablets to tablet servers. This method is called whenever the master finds tablets that are unassigned.
-   * 
-   * @param current
-   *          The current table-summary state of all the online tablet servers. Read-only. The TabletServerStatus for each server may be null if the tablet
-   *          server has not yet responded to a recent request for status.
-   * @param unassigned
-   *          A map from unassigned tablet to the last known tablet server. Read-only.
-   * @param assignments
-   *          A map from tablet to assigned server. Write-only. NOTE(review): a null server value seems to mean that no server was chosen for the tablet;
-   *          confirm how the caller treats it.
-   */
-  abstract public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
-      Map<KeyExtent,TServerInstance> assignments);
-  
-  /**
-   * Ask the balancer if any migrations are necessary. This method will not be called when there are unassigned tablets.
-   * 
-   * @param current
-   *          The current table-summary state of all the online tablet servers. Read-only.
-   * @param migrations
-   *          the current set of migrations. Read-only.
-   * @param migrationsOut
-   *          new migrations to perform; should not contain tablets in the current set of migrations. Write-only.
-   * @return the time, in milliseconds, to wait before re-balancing.
-   */
-  public abstract long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut);
-  
-  /**
-   * Fetch the tablets for the given table by asking the tablet server. Useful if your balance strategy needs details at the tablet level to decide what tablets
-   * to move.
-   * 
-   * @param tserver
-   *          The tablet server to ask.
-   * @param tableId
-   *          The table id
-   * @return a list of tablet statistics, or null if the request failed with a transport error (the failure is logged); callers must check for null
-   * @throws ThriftSecurityException
-   *           tablet server disapproves of your internal System password.
-   * @throws TException
-   *           any other problem
-   */
-  public List<TabletStats> getOnlineTabletsForTable(TServerInstance tserver, String tableId) throws ThriftSecurityException, TException {
-    log.debug("Scanning tablet server " + tserver + " for table " + tableId);
-    Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), tserver.getLocation(), configuration.getConfiguration());
-    try {
-      return client.getTabletStats(Tracer.traceInfo(), SystemCredentials.get().toThrift(configuration.getInstance()), tableId);
-    } catch (TTransportException e) {
-      log.error("Unable to connect to " + tserver + ": " + e);
-      return null;
-    } finally {
-      // the client is handed back on every path, including the early returns above
-      ThriftUtil.returnClient(client);
-    }
-  }
-  
-  /**
-   * Utility to ensure that the migrations from balance() are consistent:
-   * <ul>
-   * <li>Tablet objects are not null
-   * <li>Source and destination tablet servers are not null and current
-   * </ul>
-   * Every rejected migration is reported with a warning naming the first check it failed.
-   * 
-   * @param current
-   *          the tablet servers considered current
-   * @param migrations
-   *          the migrations to check; the list itself is not modified
-   * @return A list of TabletMigration object that passed sanity checks.
-   */
-  public static List<TabletMigration> checkMigrationSanity(Set<TServerInstance> current, List<TabletMigration> migrations) {
-    List<TabletMigration> accepted = new ArrayList<TabletMigration>(migrations.size());
-    for (TabletMigration m : migrations) {
-      // the checks run in a fixed order and the first failing one decides the message
-      String problem = null;
-      if (m.tablet == null)
-        problem = "Balancer gave back a null tablet ";
-      else if (m.newServer == null)
-        problem = "Balancer did not set the destination ";
-      else if (m.oldServer == null)
-        problem = "Balancer did not set the source ";
-      else if (!current.contains(m.oldServer))
-        problem = "Balancer wants to move a tablet from a server that is not current: ";
-      else if (!current.contains(m.newServer))
-        problem = "Balancer wants to move a tablet to a server that is not current: ";
-      
-      if (problem == null)
-        accepted.add(m);
-      else
-        log.warn(problem + m);
-    }
-    return accepted;
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java b/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
deleted file mode 100644
index 3eda844..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.recovery;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.server.master.Master;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.log4j.Logger;
-
-public class HadoopLogCloser implements LogCloser {
-  
-  private static Logger log = Logger.getLogger(HadoopLogCloser.class);
-
-  @Override
-  public long close(Master master, VolumeManager fs, Path source) throws IOException {
-    FileSystem ns = fs.getFileSystemByPath(source);
-    if (ns instanceof DistributedFileSystem) {
-      DistributedFileSystem dfs = (DistributedFileSystem) ns;
-      try {
-        if (!dfs.recoverLease(source)) {
-          log.info("Waiting for file to be closed " + source.toString());
-          return master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_LEASE_RECOVERY_WAITING_PERIOD);
-        }
-        log.info("Recovered lease on " + source.toString());
-      } catch (FileNotFoundException ex) {
-        throw ex;
-      } catch (Exception ex) {
-        log.warn("Error recovering lease on " + source.toString(), ex);
-        ns.append(source).close();
-        log.info("Recovered lease on " + source.toString() + " using append");
-      }
-    } else if (ns instanceof LocalFileSystem) {
-      // ignore
-    } else {
-      throw new IllegalStateException("Don't know how to recover a lease for " + fs.getClass().getName());
-    }
-    return 0;
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java b/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
deleted file mode 100644
index 42497ff..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.recovery;
-
-import java.io.IOException;
-
-import org.apache.accumulo.server.master.Master;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.hadoop.fs.Path;
-
-public interface LogCloser {
-  public long close(Master master, VolumeManager fs, Path path) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java b/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
deleted file mode 100644
index a5bb0c7..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.recovery;
-
-import java.io.IOException;
-
-import org.apache.accumulo.server.master.Master;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.log4j.Logger;
-
-public class MapRLogCloser implements LogCloser {
-  
-  private static Logger log = Logger.getLogger(MapRLogCloser.class);
-  
-  @Override
-  public long close(Master m, VolumeManager fs, Path path) throws IOException {
-    log.info("Recovering file " + path.toString() + " by changing permission to readonly");
-    FileSystem ns = fs.getFileSystemByPath(path);
-    FsPermission roPerm = new FsPermission((short) 0444);
-    try {
-      ns.setPermission(path, roPerm);
-      return 0;
-    } catch (IOException ex) {
-      log.error("error recovering lease ", ex);
-      // lets do this again
-      return 1000;
-    }
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java b/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
deleted file mode 100644
index 5ce7a66..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.recovery;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.util.NamingThreadFactory;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.fs.VolumeManager.FileType;
-import org.apache.accumulo.server.master.Master;
-import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
-import org.apache.accumulo.server.zookeeper.ZooCache;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
-
-public class RecoveryManager {
-  
-  private static Logger log = Logger.getLogger(RecoveryManager.class);
-  
-  private Map<String,Long> recoveryDelay = new HashMap<String,Long>();
-  private Set<String> closeTasksQueued = new HashSet<String>();
-  private Set<String> sortsQueued = new HashSet<String>();
-  private ScheduledExecutorService executor;
-  private Master master;
-  private ZooCache zooCache;
-  
-  public RecoveryManager(Master master) {
-    this.master = master;
-    executor = Executors.newScheduledThreadPool(4, new NamingThreadFactory("Walog sort starter "));
-    zooCache = new ZooCache();
-    try {
-      List<String> workIDs = new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).getWorkQueued();
-      sortsQueued.addAll(workIDs);
-    } catch (Exception e) {
-      log.warn(e, e);
-    }
-  }
-  
-  private class LogSortTask implements Runnable {
-    private String source;
-    private String destination;
-    private String sortId;
-    private LogCloser closer;
-    
-    public LogSortTask(LogCloser closer, String source, String destination, String sortId) {
-      this.closer = closer;
-      this.source = source;
-      this.destination = destination;
-      this.sortId = sortId;
-    }
-    
-    @Override
-    public void run() {
-      boolean rescheduled = false;
-      try {
-        
-        long time = closer.close(master, master.getFileSystem(), new Path(source));
-        
-        if (time > 0) {
-          executor.schedule(this, time, TimeUnit.MILLISECONDS);
-          rescheduled = true;
-        } else {
-          initiateSort(sortId, source, destination);
-        }
-      } catch (FileNotFoundException e) {
-        log.debug("Unable to initate log sort for " + source + ": " + e);
-      } catch (Exception e) {
-        log.warn("Failed to initiate log sort " + source, e);
-      } finally {
-        if (!rescheduled) {
-          synchronized (RecoveryManager.this) {
-            closeTasksQueued.remove(sortId);
-          }
-        }
-      }
-    }
-    
-  }
-  
-  private void initiateSort(String sortId, String source, final String destination) throws KeeperException, InterruptedException, IOException {
-    String work =  source + "|" + destination; 
-    new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).addWork(sortId, work.getBytes());
-    
-    synchronized (this) {
-      sortsQueued.add(sortId);
-    }
-    
-    final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + sortId;
-    log.info("Created zookeeper entry " + path + " with data " + work);
-  }
-  
-  public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) throws IOException {
-    boolean recoveryNeeded = false;
-    ;
-    for (Collection<String> logs : walogs) {
-      for (String walog : logs) {
-        String hostFilename[] = walog.split("/", 2);
-        String host = hostFilename[0];
-        String filename = hostFilename[1];
-        String parts[] = filename.split("/");
-        String sortId = parts[parts.length - 1];
-        String dest = master.getFileSystem().choose(ServerConstants.getRecoveryDirs()) + "/" + sortId;
-        filename = master.getFileSystem().getFullPath(FileType.WAL, walog).toString();
-        log.debug("Recovering " + filename + " to " + dest);
-        
-        boolean sortQueued;
-        synchronized (this) {
-          sortQueued = sortsQueued.contains(sortId);
-        }
-        
-        if (sortQueued && zooCache.get(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + sortId) == null) {
-          synchronized (this) {
-            sortsQueued.remove(sortId);
-          }
-        }
-
-        if (master.getFileSystem().exists(new Path(dest, "finished"))) {
-          synchronized (this) {
-            closeTasksQueued.remove(sortId);
-            recoveryDelay.remove(sortId);
-            sortsQueued.remove(sortId);
-          }
-          continue;
-        }
-        
-        recoveryNeeded = true;
-        synchronized (this) {
-          if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) {
-            AccumuloConfiguration aconf = master.getConfiguration().getConfiguration();
-            LogCloser closer = Property.createInstanceFromPropertyName(aconf, Property.MASTER_WALOG_CLOSER_IMPLEMETATION, LogCloser.class, new HadoopLogCloser());
-            Long delay = recoveryDelay.get(sortId);
-            if (delay == null) {
-              delay = master.getSystemConfiguration().getTimeInMillis(Property.MASTER_RECOVERY_DELAY);
-            } else {
-              delay = Math.min(2 * delay, 1000 * 60 * 5l);
-            }
-            
-            log.info("Starting recovery of " + filename + " (in : " + (delay / 1000) + "s) created for " + host + ", tablet " + extent + " holds a reference");
-            
-            executor.schedule(new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS);
-            closeTasksQueued.add(sortId);
-            recoveryDelay.put(sortId, delay);
-          }
-        }
-      }
-    }
-    return recoveryNeeded;
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/state/Assignment.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/state/Assignment.java b/server/src/main/java/org/apache/accumulo/server/master/state/Assignment.java
deleted file mode 100644
index 40b7a93..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/state/Assignment.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.state;
-
-import org.apache.accumulo.core.data.KeyExtent;
-
-public class Assignment {
-  public KeyExtent tablet;
-  public TServerInstance server;
-  
-  public Assignment(KeyExtent tablet, TServerInstance server) {
-    this.tablet = tablet;
-    this.server = server;
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java b/server/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
deleted file mode 100644
index f4d98bf..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.state;
-
-import java.util.Collection;
-import java.util.Set;
-
-public interface CurrentState {
-  
-  Set<String> onlineTables();
-  
-  Set<TServerInstance> onlineTabletServers();
-  
-  Collection<MergeInfo> merges();
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java b/server/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java
deleted file mode 100644
index b2ea7d6..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.state;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.accumulo.core.master.thrift.DeadServer;
-import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.data.Stat;
-
-public class DeadServerList {
-  private static final Logger log = Logger.getLogger(DeadServerList.class);
-  private final String path;
-  
-  public DeadServerList(String path) {
-    this.path = path;
-    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
-    try {
-      zoo.mkdirs(path);
-    } catch (Exception ex) {
-      log.error("Unable to make parent directories of " + path, ex);
-    }
-  }
-  
-  public List<DeadServer> getList() {
-    List<DeadServer> result = new ArrayList<DeadServer>();
-    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
-    try {
-      List<String> children = zoo.getChildren(path);
-      if (children != null) {
-        for (String child : children) {
-          Stat stat = new Stat();
-          byte[] data = zoo.getData(path + "/" + child, stat);
-          DeadServer server = new DeadServer(child, stat.getMtime(), new String(data));
-          result.add(server);
-        }
-      }
-    } catch (Exception ex) {
-      log.error(ex, ex);
-    }
-    return result;
-  }
-  
-  public void delete(String server) {
-    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
-    try {
-      zoo.recursiveDelete(path + "/" + server, NodeMissingPolicy.SKIP);
-    } catch (Exception ex) {
-      log.error(ex, ex);
-    }
-  }
-  
-  public void post(String server, String cause) {
-    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
-    try {
-      zoo.putPersistentData(path + "/" + server, cause.getBytes(), NodeExistsPolicy.SKIP);
-    } catch (Exception ex) {
-      log.error(ex, ex);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStore.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStore.java b/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStore.java
deleted file mode 100644
index ad658df..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStore.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.state;
-
-import java.util.List;
-
-/*
- * An abstract version of ZooKeeper that we can write tests against.
- */
-public interface DistributedStore {
-  
-  public List<String> getChildren(String path) throws DistributedStoreException;
-  
-  public byte[] get(String path) throws DistributedStoreException;
-  
-  public void put(String path, byte[] bs) throws DistributedStoreException;
-  
-  public void remove(String path) throws DistributedStoreException;
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java b/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java
deleted file mode 100644
index 3d3a725..0000000
--- a/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.state;
-
-public class DistributedStoreException extends Exception {
-  
-  private static final long serialVersionUID = 1L;
-  
-  public DistributedStoreException(String why) {
-    super(why);
-  }
-  
-  public DistributedStoreException(Exception cause) {
-    super(cause);
-  }
-  
-  public DistributedStoreException(String why, Exception cause) {
-    super(why, cause);
-  }
-}