You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/11/01 01:56:15 UTC

[36/54] [partial] ACCUMULO-658, ACCUMULO-656 Split server into separate modules

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
new file mode 100644
index 0000000..32a2ab7
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master;
+
+import static java.lang.Math.min;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.master.Master.TabletGoalState;
+import org.apache.accumulo.master.state.MergeStats;
+import org.apache.accumulo.master.state.TableCounts;
+import org.apache.accumulo.master.state.TableStats;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager.FileType;
+import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
+import org.apache.accumulo.server.master.state.Assignment;
+import org.apache.accumulo.server.master.state.DistributedStoreException;
+import org.apache.accumulo.server.master.state.MergeInfo;
+import org.apache.accumulo.server.master.state.MergeState;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.TabletState;
+import org.apache.accumulo.server.master.state.TabletStateStore;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.tables.TableManager;
+import org.apache.accumulo.server.tablets.TabletTime;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
+
+class TabletGroupWatcher extends Daemon {
+  
+  /** The master on whose behalf this watcher works; used for tserver connections, goal states, migrations and events. */
+  private final Master master;
+  /** The source of tablet location state that this watcher scans and updates. */
+  final TabletStateStore store;
+  /**
+   * Another watcher (may be null) that is consulted when every tablet server is being shut down: while it still reports assigned or hosted
+   * tablets, this watcher keeps its own tablets hosted.
+   */
+  final TabletGroupWatcher dependentWatcher;
+  
+  /** Per-table tablet state counts gathered during each scan of the store. */
+  final TableStats stats = new TableStats();
+  
+  /**
+   * Creates a watcher over one tablet state store.
+   * 
+   * @param master
+   *          the owning master
+   * @param store
+   *          the store whose tablets are watched
+   * @param dependentWatcher
+   *          watcher consulted during a full shutdown, or null if there is none
+   */
+  TabletGroupWatcher(Master master, TabletStateStore store, TabletGroupWatcher dependentWatcher) {
+    this.dependentWatcher = dependentWatcher;
+    this.store = store;
+    this.master = master;
+  }
+  
+  Map<Text,TableCounts> getStats() {
+    return stats.getLast();
+  }
+  
+  TableCounts getStats(Text tableId) {
+    return stats.getLast(tableId);
+  }
+  
+  /**
+   * Main loop of the watcher thread. For as long as this process is still the master, it repeatedly walks every tablet in the store, compares
+   * the tablet's current state with its goal state, and collects the work needed to close the gap: new assignments, reminders for assignments
+   * already made, cleanup of tablets assigned to dead servers, and unload requests. The collected work is flushed in chunks so the tablet
+   * servers are not overwhelmed. After each full pass the per-state counts that changed are published as events, merge operations seen during
+   * the pass are advanced, and the thread waits for the next event.
+   * <p>
+   * Any exception raised during a pass is logged and the pass is retried after a pause; it does not terminate the loop.
+   */
+  @Override
+  public void run() {
+    
+    Thread.currentThread().setName("Watching " + store.name());
+    // per-state tablet counts from the previous pass, so only changes get reported
+    int[] oldCounts = new int[TabletState.values().length];
+    EventCoordinator.Listener eventListener = this.master.nextEvent.getListener();
+    
+    while (this.master.stillMaster()) {
+      // slow things down a little, otherwise we spam the logs when there are many wake-up events
+      UtilWaitThread.sleep(100);
+
+      // totalUnloaded counts unload requests for the whole pass (reported at the end);
+      // unloaded counts them since the last flush and is reset with each chunk
+      int totalUnloaded = 0;
+      int unloaded = 0;
+      try {
+        // one MergeStats per table, created lazily the first time a tablet of that table is seen
+        Map<Text,MergeStats> mergeStatsCache = new HashMap<Text,MergeStats>();
+        
+        // Get the current status for the current list of tservers
+        SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<TServerInstance,TabletServerStatus>();
+        for (TServerInstance entry : this.master.tserverSet.getCurrentServers()) {
+          currentTServers.put(entry, this.master.tserverStatus.get(entry));
+        }
+        
+        // with no tablet servers there is nothing to assign to; wait and start a new pass
+        if (currentTServers.size() == 0) {
+          eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+          continue;
+        }
+        
+        // Don't move tablets to servers that are shutting down
+        SortedMap<TServerInstance,TabletServerStatus> destinations = new TreeMap<TServerInstance,TabletServerStatus>(currentTServers);
+        destinations.keySet().removeAll(this.master.serversToShutdown);
+        
+        // work collected during the scan and handed to flushChanges():
+        // assignments - tablets with a chosen destination that still has to be recorded and sent
+        // assigned - tablets already given a future location; the load request is sent again
+        // assignedToDeadServers - tablets whose server is no longer live
+        // unassigned - tablets needing a destination, mapped to the server reported by tls.getServer()
+        List<Assignment> assignments = new ArrayList<Assignment>();
+        List<Assignment> assigned = new ArrayList<Assignment>();
+        List<TabletLocationState> assignedToDeadServers = new ArrayList<TabletLocationState>();
+        Map<KeyExtent,TServerInstance> unassigned = new HashMap<KeyExtent,TServerInstance>();
+        
+        int[] counts = new int[TabletState.values().length];
+        stats.begin();
+        // Walk through the tablets in our store, and work tablets
+        // towards their goal
+        for (TabletLocationState tls : store) {
+          if (tls == null) {
+            continue;
+          }
+          // ignore entries for tables that do not exist in zookeeper
+          if (TableManager.getInstance().getTableState(tls.extent.getTableId().toString()) == null)
+            continue;
+          
+          // Don't overwhelm the tablet servers with work
+          if (unassigned.size() + unloaded > Master.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
+            flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
+            assignments.clear();
+            assigned.clear();
+            assignedToDeadServers.clear();
+            unassigned.clear();
+            unloaded = 0;
+            eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+          }
+          Text tableId = tls.extent.getTableId();
+          MergeStats mergeStats = mergeStatsCache.get(tableId);
+          if (mergeStats == null) {
+            mergeStatsCache.put(tableId, mergeStats = new MergeStats(this.master.getMergeInfo(tls.extent)));
+          }
+          TabletGoalState goal = this.master.getGoalState(tls, mergeStats.getMergeInfo());
+          TServerInstance server = tls.getServer();
+          TabletState state = tls.getState(currentTServers.keySet());
+          stats.update(tableId, state);
+          mergeStats.update(tls.extent, state, tls.chopped, !tls.walogs.isEmpty());
+          // both requests return immediately unless the merge is in the matching state
+          sendChopRequest(mergeStats.getMergeInfo(), state, tls);
+          sendSplitRequest(mergeStats.getMergeInfo(), state, tls);
+          
+          // Always follow through with assignments
+          if (state == TabletState.ASSIGNED) {
+            goal = TabletGoalState.HOSTED;
+          }
+          
+          // if we are shutting down all the tabletservers, we have to do it in order
+          if (goal == TabletGoalState.UNASSIGNED && state == TabletState.HOSTED) {
+            if (this.master.serversToShutdown.equals(currentTServers.keySet())) {
+              // keep these tablets hosted while the dependent watcher still has tablets assigned or hosted
+              if (dependentWatcher != null && dependentWatcher.assignedOrHosted() > 0) {
+                goal = TabletGoalState.HOSTED;
+              }
+            }
+          }
+          
+          if (goal == TabletGoalState.HOSTED) {
+            // a tablet with write-ahead logs that is not yet hosted is offered to the recovery manager first;
+            // when recoverLogs returns true the tablet is skipped for this pass (and not counted below)
+            // NOTE(review): presumably true means recovery is still in progress - confirm against RecoveryManager
+            if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) {
+              if (this.master.recoveryManager.recoverLogs(tls.extent, tls.walogs))
+                continue;
+            }
+            switch (state) {
+              case HOSTED:
+                // the tablet is hosted where the migration wanted it: the migration is complete
+                if (server.equals(this.master.migrations.get(tls.extent)))
+                  this.master.migrations.remove(tls.extent);
+                break;
+              case ASSIGNED_TO_DEAD_SERVER:
+                assignedToDeadServers.add(tls);
+                // a migration targeting the dead server is dropped
+                if (server.equals(this.master.migrations.get(tls.extent)))
+                  this.master.migrations.remove(tls.extent);
+                // log.info("Current servers " + currentTServers.keySet());
+                break;
+              case UNASSIGNED:
+                // maybe it's a finishing migration
+                TServerInstance dest = this.master.migrations.get(tls.extent);
+                if (dest != null) {
+                  // if destination is still good, assign it
+                  if (destinations.keySet().contains(dest)) {
+                    assignments.add(new Assignment(tls.extent, dest));
+                  } else {
+                    // get rid of this migration
+                    this.master.migrations.remove(tls.extent);
+                    unassigned.put(tls.extent, server);
+                  }
+                } else {
+                  unassigned.put(tls.extent, server);
+                }
+                break;
+              case ASSIGNED:
+                // Send another reminder
+                assigned.add(new Assignment(tls.extent, tls.future));
+                break;
+            }
+          } else {
+            // the goal is something other than HOSTED: only tablets that are hosted or stuck on a dead server need work
+            switch (state) {
+              case UNASSIGNED:
+                break;
+              case ASSIGNED_TO_DEAD_SERVER:
+                assignedToDeadServers.add(tls);
+                // log.info("Current servers " + currentTServers.keySet());
+                break;
+              case HOSTED:
+                TServerConnection conn = this.master.tserverSet.getConnection(server);
+                if (conn != null) {
+                  // the last argument is false only when the goal is DELETED
+                  conn.unloadTablet(this.master.masterLock, tls.extent, goal != TabletGoalState.DELETED);
+                  unloaded++;
+                  totalUnloaded++;
+                } else {
+                  Master.log.warn("Could not connect to server " + server);
+                }
+                break;
+              case ASSIGNED:
+                break;
+            }
+          }
+          counts[state.ordinal()]++;
+        }
+        
+        // flush whatever is left over from the last (partial) chunk
+        flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
+        
+        // provide stats after flushing changes to avoid race conditions w/ delete table
+        stats.end();
+        
+        // Report changes
+        for (TabletState state : TabletState.values()) {
+          int i = state.ordinal();
+          if (counts[i] > 0 && counts[i] != oldCounts[i]) {
+            this.master.nextEvent.event("[%s]: %d tablets are %s", store.name(), counts[i], state.name());
+          }
+        }
+        Master.log.debug(String.format("[%s]: scan time %.2f seconds", store.name(), stats.getScanTime() / 1000.));
+        oldCounts = counts;
+        if (totalUnloaded > 0) {
+          this.master.nextEvent.event("[%s]: %d tablets unloaded", store.name(), totalUnloaded);
+        }
+        
+        // advance every merge operation that was seen during this pass
+        updateMergeState(mergeStatsCache);
+        
+        Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
+        eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+      } catch (Exception ex) {
+        // a failed pass is logged and retried after a pause; the loop keeps running
+        Master.log.error("Error processing table state for store " + store.name(), ex);
+        UtilWaitThread.sleep(Master.WAIT_BETWEEN_ERRORS);
+      }
+    }
+  }
+  
+  private int assignedOrHosted() {
+    int result = 0;
+    for (TableCounts counts : stats.getLast().values()) {
+      result += counts.assigned() + counts.hosted();
+    }
+    return result;
+  }
+  
+  private void sendSplitRequest(MergeInfo info, TabletState state, TabletLocationState tls) {
+    // Already split?
+    if (!info.getState().equals(MergeState.SPLITTING))
+      return;
+    // Merges don't split
+    if (!info.isDelete())
+      return;
+    // Online and ready to split?
+    if (!state.equals(TabletState.HOSTED))
+      return;
+    // Does this extent cover the end points of the delete?
+    KeyExtent range = info.getExtent();
+    if (tls.extent.overlaps(range)) {
+      for (Text splitPoint : new Text[] {range.getPrevEndRow(), range.getEndRow()}) {
+        if (splitPoint == null)
+          continue;
+        if (!tls.extent.contains(splitPoint))
+          continue;
+        if (splitPoint.equals(tls.extent.getEndRow()))
+          continue;
+        if (splitPoint.equals(tls.extent.getPrevEndRow()))
+          continue;
+        try {
+          TServerConnection conn;
+          conn = this.master.tserverSet.getConnection(tls.current);
+          if (conn != null) {
+            Master.log.info("Asking " + tls.current + " to split " + tls.extent + " at " + splitPoint);
+            conn.splitTablet(this.master.masterLock, tls.extent, splitPoint);
+          } else {
+            Master.log.warn("Not connected to server " + tls.current);
+          }
+        } catch (NotServingTabletException e) {
+          Master.log.debug("Error asking tablet server to split a tablet: " + e);
+        } catch (Exception e) {
+          Master.log.warn("Error asking tablet server to split a tablet: " + e);
+        }
+      }
+    }
+  }
+  
+  /**
+   * Asks the tablet server hosting a tablet to chop it in preparation for a merge. Nothing is sent unless the merge is in the
+   * WAITING_FOR_CHOPPED state, the tablet is hosted, it has not been chopped yet, and the merge reports that the tablet needs chopping.
+   * <p>
+   * This is best effort: a communication failure is logged (with the server, the tablet and the cause) and otherwise ignored.
+   * 
+   * @param info
+   *          the merge operation for the tablet's table
+   * @param state
+   *          the tablet's current state
+   * @param tls
+   *          the tablet's location information
+   */
+  private void sendChopRequest(MergeInfo info, TabletState state, TabletLocationState tls) {
+    // Don't bother if we're in the wrong state
+    if (!info.getState().equals(MergeState.WAITING_FOR_CHOPPED))
+      return;
+    // Tablet must be online
+    if (!state.equals(TabletState.HOSTED))
+      return;
+    // Tablet isn't already chopped
+    if (tls.chopped)
+      return;
+    // Tablet ranges intersect
+    if (!info.needsToBeChopped(tls.extent))
+      return;
+    try {
+      TServerConnection conn = this.master.tserverSet.getConnection(tls.current);
+      if (conn != null) {
+        Master.log.info("Asking " + tls.current + " to chop " + tls.extent);
+        conn.chop(this.master.masterLock, tls.extent);
+      } else {
+        Master.log.warn("Could not connect to server " + tls.current);
+      }
+    } catch (TException e) {
+      // report what failed and why, in the same "message: exception" form used for split requests
+      Master.log.warn("Communications error asking tablet server " + tls.current + " to chop tablet " + tls.extent + ": " + e);
+    }
+  }
+  
+  /**
+   * Advances every merge (or delete-range) operation observed during the last scan. For each one the next state is computed; COMPLETE is
+   * recorded as NONE, and any change of state is persisted through the master. When the new state is MERGING the metadata work is carried out
+   * right away, and on success the operation is marked COMPLETE.
+   * <p>
+   * Failures are logged per operation and never propagate, so one failing merge does not block the others.
+   * 
+   * @param mergeStatsCache
+   *          statistics gathered during the scan, keyed by table id
+   */
+  private void updateMergeState(Map<Text,MergeStats> mergeStatsCache) {
+    // the loop variable is deliberately not called "stats", which is the name of a field of this class
+    for (MergeStats mergeStats : mergeStatsCache.values()) {
+      try {
+        MergeState update = mergeStats.nextMergeState(this.master.getConnector(), this.master);
+        if (update == MergeState.COMPLETE)
+          update = MergeState.NONE;
+        // when next state is MERGING, its important to persist this before
+        // starting the merge... the verification check that is done before
+        // moving into the merging state could fail if merge starts but does
+        // not finish
+        if (update != mergeStats.getMergeInfo().getState()) {
+          this.master.setMergeState(mergeStats.getMergeInfo(), update);
+        }
+        
+        if (update == MergeState.MERGING) {
+          try {
+            if (mergeStats.getMergeInfo().isDelete()) {
+              deleteTablets(mergeStats.getMergeInfo());
+            } else {
+              mergeMetadataRecords(mergeStats.getMergeInfo());
+            }
+            this.master.setMergeState(mergeStats.getMergeInfo(), MergeState.COMPLETE);
+          } catch (Exception ex) {
+            // the state stays MERGING; say which operation failed so it can be found in the logs
+            Master.log.error("Unable to merge metadata table records for " + mergeStats.getMergeInfo().getExtent(), ex);
+          }
+        }
+      } catch (Exception ex) {
+        Master.log.error("Unable to update merge state for merge " + mergeStats.getMergeInfo().getExtent(), ex);
+      }
+    }
+  }
+  
+  /**
+   * Carries out the metadata side of a delete-range operation. In order, it:
+   * <ol>
+   * <li>looks up the tablet that follows the deleted range (when the range has an end row),</li>
+   * <li>scans the metadata rows of the range and records delete entries for their data files and tablet directories,</li>
+   * <li>deletes the metadata rows of the range,</li>
+   * <li>either points the following tablet's prev row at the start of the deleted range, or, when there is no following tablet, recreates
+   * the last tablet of the table.</li>
+   * </ol>
+   * The rows are written to the root table when the extent belongs to the metadata table, and to the metadata table otherwise.
+   * 
+   * @param info
+   *          the delete operation; its extent is the range being removed
+   * @throws AccumuloException
+   *           if the following tablet cannot be found, or wrapping any other failure (including the IllegalStateException raised when a tablet
+   *           in the range still has a current location)
+   */
+  private void deleteTablets(MergeInfo info) throws AccumuloException {
+    KeyExtent extent = info.getExtent();
+    String targetSystemTable = extent.isMeta() ? RootTable.NAME : MetadataTable.NAME;
+    Master.log.debug("Deleting tablets for " + extent);
+    // stays '\0' unless a time column is seen in the scan below; only used when the last tablet is recreated
+    char timeType = '\0';
+    // remains null when the deleted range has no end row, i.e. it runs to the end of the table
+    KeyExtent followingTablet = null;
+    if (extent.getEndRow() != null) {
+      Key nextExtent = new Key(extent.getEndRow()).followingKey(PartialKey.ROW);
+      followingTablet = getHighTablet(new KeyExtent(extent.getTableId(), nextExtent.getRow(), extent.getEndRow()));
+      Master.log.debug("Found following tablet " + followingTablet);
+    }
+    try {
+      Connector conn = this.master.getConnector();
+      Text start = extent.getPrevEndRow();
+      if (start == null) {
+        start = new Text();
+      }
+      Master.log.debug("Making file deletion entries for " + extent);
+      // metadata rows after the start row (exclusive) up to and including the row for the range's end row
+      Range deleteRange = new Range(KeyExtent.getMetadataEntry(extent.getTableId(), start), false, KeyExtent.getMetadataEntry(extent.getTableId(),
+          extent.getEndRow()), true);
+      Scanner scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY);
+      scanner.setRange(deleteRange);
+      TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
+      TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
+      scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+      scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
+      // files and directories to hand to addDeleteEntries; written out whenever more than 1000 have accumulated
+      Set<FileRef> datafiles = new TreeSet<FileRef>();
+      for (Entry<Key,Value> entry : scanner) {
+        Key key = entry.getKey();
+        if (key.compareColumnFamily(DataFileColumnFamily.NAME) == 0) {
+          datafiles.add(new FileRef(this.master.fs, key));
+          if (datafiles.size() > 1000) {
+            MetadataTableUtil.addDeleteEntries(extent, datafiles, SystemCredentials.get());
+            datafiles.clear();
+          }
+        } else if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
+          // the first character of the time value is kept as the time type; the last row seen wins
+          timeType = entry.getValue().toString().charAt(0);
+        } else if (key.compareColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME) == 0) {
+          // a current location means a tablet in the range is still assigned; abort (wrapped in AccumuloException below)
+          throw new IllegalStateException("Tablet " + key.getRow() + " is assigned during a merge!");
+        } else if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
+          // the tablet's directory gets a delete entry as well
+          datafiles.add(new FileRef(entry.getValue().toString(), this.master.fs.getFullPath(FileType.TABLE, entry.getValue().toString())));
+          if (datafiles.size() > 1000) {
+            MetadataTableUtil.addDeleteEntries(extent, datafiles, SystemCredentials.get());
+            datafiles.clear();
+          }
+        }
+      }
+      // write out whatever is left in the batch
+      MetadataTableUtil.addDeleteEntries(extent, datafiles, SystemCredentials.get());
+      BatchWriter bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig());
+      try {
+        deleteTablets(info, deleteRange, bw, conn);
+      } finally {
+        bw.close();
+      }
+      
+      if (followingTablet != null) {
+        Master.log.debug("Updating prevRow of " + followingTablet + " to " + extent.getPrevEndRow());
+        bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig());
+        try {
+          // one mutation both re-points the prev row and removes the chopped marker of the following tablet
+          Mutation m = new Mutation(followingTablet.getMetadataEntry());
+          TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, KeyExtent.encodePrevEndRow(extent.getPrevEndRow()));
+          ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m);
+          bw.addMutation(m);
+          bw.flush();
+        } finally {
+          bw.close();
+        }
+      } else {
+        // Recreate the default tablet to hold the end of the table
+        Master.log.debug("Recreating the last tablet to point to " + extent.getPrevEndRow());
+        String tdir = master.getFileSystem().choose(ServerConstants.getTablesDirs()) + "/" + extent.getTableId() + Constants.DEFAULT_TABLET_LOCATION;
+        MetadataTableUtil.addTablet(new KeyExtent(extent.getTableId(), null, extent.getPrevEndRow()), tdir,
+            SystemCredentials.get(), timeType, this.master.masterLock);
+      }
+    } catch (Exception ex) {
+      throw new AccumuloException(ex);
+    }
+  }
+  
+  /**
+   * Carries out the metadata side of a merge. The highest tablet of the range survives: the file entries of the lower tablets are copied into
+   * its row, its time is set to the maximum time found across the merged tablets, its prev row is set to the prev row of the lowest tablet,
+   * the rows of the lower tablets are deleted, and finally its chopped marker is removed.
+   * <p>
+   * If the scan finds no prev row entry below the highest tablet, the merge is treated as already done and the method returns after flushing.
+   * 
+   * @param info
+   *          the merge operation; its extent is the range being merged
+   * @throws AccumuloException
+   *           if the highest tablet cannot be found, or wrapping any failure while reading or writing the metadata
+   */
+  private void mergeMetadataRecords(MergeInfo info) throws AccumuloException {
+    KeyExtent range = info.getExtent();
+    Master.log.debug("Merging metadata for " + range);
+    KeyExtent stop = getHighTablet(range);
+    Master.log.debug("Highest tablet is " + stop);
+    // prev row of the first tablet seen in the scan, i.e. the lowest tablet in the range
+    Value firstPrevRowValue = null;
+    Text stopRow = stop.getMetadataEntry();
+    Text start = range.getPrevEndRow();
+    if (start == null) {
+      start = new Text();
+    }
+    // both ends exclusive: the row of the highest tablet itself is not part of this range
+    Range scanRange = new Range(KeyExtent.getMetadataEntry(range.getTableId(), start), false, stopRow, false);
+    String targetSystemTable = MetadataTable.NAME;
+    if (range.isMeta()) {
+      targetSystemTable = RootTable.NAME;
+    }
+    
+    BatchWriter bw = null;
+    try {
+      long fileCount = 0;
+      Connector conn = this.master.getConnector();
+      // Make file entries in highest tablet
+      bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig());
+      Scanner scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY);
+      scanner.setRange(scanRange);
+      TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+      TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
+      TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
+      scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+      // accumulates everything that is written to the row of the highest tablet
+      Mutation m = new Mutation(stopRow);
+      String maxLogicalTime = null;
+      for (Entry<Key,Value> entry : scanner) {
+        Key key = entry.getKey();
+        Value value = entry.getValue();
+        if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
+          // copy the file entry, unchanged, into the highest tablet's row
+          m.put(key.getColumnFamily(), key.getColumnQualifier(), value);
+          fileCount++;
+        } else if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) && firstPrevRowValue == null) {
+          Master.log.debug("prevRow entry for lowest tablet is " + value);
+          firstPrevRowValue = new Value(value);
+        } else if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
+          maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, value.toString());
+        } else if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
+          // the directory of a tablet that is being merged away is queued for deletion
+          // NOTE(review): relies on MetadataTableUtil.createDeleteMutation building the delete marker - confirm
+          bw.addMutation(MetadataTableUtil.createDeleteMutation(range.getTableId().toString(), entry.getValue().toString()));
+        }
+      }
+      
+      // read the logical time from the last tablet in the merge range, it is not included in
+      // the loop above
+      scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY);
+      scanner.setRange(new Range(stopRow));
+      TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
+      for (Entry<Key,Value> entry : scanner) {
+        if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) {
+          maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, entry.getValue().toString());
+        }
+      }
+      
+      if (maxLogicalTime != null)
+        TabletsSection.ServerColumnFamily.TIME_COLUMN.put(m, new Value(maxLogicalTime.getBytes()));
+      
+      // an empty mutation is not handed to the batch writer
+      if (!m.getUpdates().isEmpty()) {
+        bw.addMutation(m);
+      }
+      
+      // the file entries are flushed before the prev row is changed and before any row is deleted
+      bw.flush();
+      
+      Master.log.debug("Moved " + fileCount + " files to " + stop);
+      
+      // no prev row below the highest tablet: there are no lower tablets left to fold in
+      if (firstPrevRowValue == null) {
+        Master.log.debug("tablet already merged");
+        return;
+      }
+      
+      stop.setPrevEndRow(KeyExtent.decodePrevEndRow(firstPrevRowValue));
+      Mutation updatePrevRow = stop.getPrevRowUpdateMutation();
+      Master.log.debug("Setting the prevRow for last tablet: " + stop);
+      bw.addMutation(updatePrevRow);
+      bw.flush();
+      
+      // remove the rows of the lower tablets
+      deleteTablets(info, scanRange, bw, conn);
+      
+      // Clean-up the last chopped marker
+      m = new Mutation(stopRow);
+      ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m);
+      bw.addMutation(m);
+      bw.flush();
+      
+    } catch (Exception ex) {
+      throw new AccumuloException(ex);
+    } finally {
+      // note: an exception thrown by close() here replaces any exception already propagating from the try block
+      if (bw != null)
+        try {
+          bw.close();
+        } catch (Exception ex) {
+          throw new AccumuloException(ex);
+        }
+    }
+  }
+  
+  private void deleteTablets(MergeInfo info, Range scanRange, BatchWriter bw, Connector conn) throws TableNotFoundException, MutationsRejectedException {
+    Scanner scanner;
+    Mutation m;
+    // Delete everything in the other tablets
+    // group all deletes into tablet into one mutation, this makes tablets
+    // either disappear entirely or not all.. this is important for the case
+    // where the process terminates in the loop below...
+    scanner = conn.createScanner(info.getExtent().isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY);
+    Master.log.debug("Deleting range " + scanRange);
+    scanner.setRange(scanRange);
+    RowIterator rowIter = new RowIterator(scanner);
+    while (rowIter.hasNext()) {
+      Iterator<Entry<Key,Value>> row = rowIter.next();
+      m = null;
+      while (row.hasNext()) {
+        Entry<Key,Value> entry = row.next();
+        Key key = entry.getKey();
+        
+        if (m == null)
+          m = new Mutation(key.getRow());
+        
+        m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
+        Master.log.debug("deleting entry " + key);
+      }
+      bw.addMutation(m);
+    }
+    
+    bw.flush();
+  }
+  
+  /**
+   * Finds the tablet that contains the end row of the given range, by reading the first prev-row entry at or after the metadata row for that
+   * end row. The root table is scanned for the metadata table's own tablets, the metadata table otherwise.
+   * 
+   * @param range
+   *          the merge range whose highest tablet is wanted
+   * @return the extent of the tablet found
+   * @throws AccumuloException
+   *           if no entry is found, if the entry found belongs to a different table, or wrapping any other failure during the lookup
+   */
+  private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException {
+    try {
+      Connector conn = this.master.getConnector();
+      Scanner scanner = conn.createScanner(range.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY);
+      TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+      KeyExtent start = new KeyExtent(range.getTableId(), range.getEndRow(), null);
+      scanner.setRange(new Range(start.getMetadataEntry(), null));
+      Iterator<Entry<Key,Value>> iterator = scanner.iterator();
+      if (!iterator.hasNext()) {
+        throw new AccumuloException("No last tablet for a merge " + range);
+      }
+      Entry<Key,Value> entry = iterator.next();
+      KeyExtent highTablet = new KeyExtent(entry.getKey().getRow(), KeyExtent.decodePrevEndRow(entry.getValue()));
+      // table ids are Text objects: compare them by value, not by reference, so the check
+      // does not depend on both extents happening to share the same Text instance
+      if (!highTablet.getTableId().equals(range.getTableId())) {
+        throw new AccumuloException("No last tablet for merge " + range + " " + highTablet);
+      }
+      return highTablet;
+    } catch (Exception ex) {
+      // the AccumuloExceptions thrown above are caught here too and re-wrapped with this message
+      throw new AccumuloException("Unexpected failure finding the last tablet for a merge " + range, ex);
+    }
+  }
+  
+  private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> currentTServers, List<Assignment> assignments, List<Assignment> assigned,
+      List<TabletLocationState> assignedToDeadServers, Map<KeyExtent,TServerInstance> unassigned) throws DistributedStoreException, TException {
+    if (!assignedToDeadServers.isEmpty()) {
+      int maxServersToShow = min(assignedToDeadServers.size(), 100);
+      Master.log.debug(assignedToDeadServers.size() + " assigned to dead servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "...");
+      store.unassign(assignedToDeadServers);
+      this.master.nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size());
+    }
+    
+    if (!currentTServers.isEmpty()) {
+      Map<KeyExtent,TServerInstance> assignedOut = new HashMap<KeyExtent,TServerInstance>();
+      this.master.tabletBalancer.getAssignments(Collections.unmodifiableSortedMap(currentTServers), Collections.unmodifiableMap(unassigned), assignedOut);
+      for (Entry<KeyExtent,TServerInstance> assignment : assignedOut.entrySet()) {
+        if (unassigned.containsKey(assignment.getKey())) {
+          if (assignment.getValue() != null) {
+            Master.log.debug(store.name() + " assigning tablet " + assignment);
+            assignments.add(new Assignment(assignment.getKey(), assignment.getValue()));
+          }
+        } else {
+          Master.log.warn(store.name() + " load balancer assigning tablet that was not nominated for assignment " + assignment.getKey());
+        }
+      }
+      if (!unassigned.isEmpty() && assignedOut.isEmpty())
+        Master.log.warn("Load balancer failed to assign any tablets");
+    }
+    
+    if (assignments.size() > 0) {
+      Master.log.info(String.format("Assigning %d tablets", assignments.size()));
+      store.setFutureLocations(assignments);
+    }
+    assignments.addAll(assigned);
+    for (Assignment a : assignments) {
+      TServerConnection conn = this.master.tserverSet.getConnection(a.server);
+      if (conn != null) {
+        conn.assignTablet(this.master.masterLock, a.tablet);
+      } else {
+        Master.log.warn("Could not connect to server " + a.server);
+      }
+      master.assignedTablet(a.tablet);
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java b/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
new file mode 100644
index 0000000..dbe73ac
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.recovery;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.util.NamingThreadFactory;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.fs.VolumeManager.FileType;
+import org.apache.accumulo.server.master.recovery.HadoopLogCloser;
+import org.apache.accumulo.server.master.recovery.LogCloser;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+
+public class RecoveryManager {
+
+  private static Logger log = Logger.getLogger(RecoveryManager.class);
+
+  private Map<String,Long> recoveryDelay = new HashMap<String,Long>();
+  private Set<String> closeTasksQueued = new HashSet<String>();
+  private Set<String> sortsQueued = new HashSet<String>();
+  private ScheduledExecutorService executor;
+  private Master master;
+  private ZooCache zooCache;
+
+  public RecoveryManager(Master master) {
+    this.master = master;
+    executor = Executors.newScheduledThreadPool(4, new NamingThreadFactory("Walog sort starter "));
+    zooCache = new ZooCache();
+    try {
+      List<String> workIDs = new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).getWorkQueued();
+      sortsQueued.addAll(workIDs);
+    } catch (Exception e) {
+      log.warn(e, e);
+    }
+  }
+
+  private class LogSortTask implements Runnable {
+    private String source;
+    private String destination;
+    private String sortId;
+    private LogCloser closer;
+
+    public LogSortTask(LogCloser closer, String source, String destination, String sortId) {
+      this.closer = closer;
+      this.source = source;
+      this.destination = destination;
+      this.sortId = sortId;
+    }
+
+    @Override
+    public void run() {
+      boolean rescheduled = false;
+      try {
+
+        long time = closer.close(master.getConfiguration().getConfiguration(), master.getFileSystem(), new Path(source));
+
+        if (time > 0) {
+          executor.schedule(this, time, TimeUnit.MILLISECONDS);
+          rescheduled = true;
+        } else {
+          initiateSort(sortId, source, destination);
+        }
+      } catch (FileNotFoundException e) {
+        log.debug("Unable to initate log sort for " + source + ": " + e);
+      } catch (Exception e) {
+        log.warn("Failed to initiate log sort " + source, e);
+      } finally {
+        if (!rescheduled) {
+          synchronized (RecoveryManager.this) {
+            closeTasksQueued.remove(sortId);
+          }
+        }
+      }
+    }
+
+  }
+
+  private void initiateSort(String sortId, String source, final String destination) throws KeeperException, InterruptedException, IOException {
+    String work = source + "|" + destination;
+    new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).addWork(sortId, work.getBytes());
+
+    synchronized (this) {
+      sortsQueued.add(sortId);
+    }
+
+    final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + sortId;
+    log.info("Created zookeeper entry " + path + " with data " + work);
+  }
+
+  public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) throws IOException {
+    boolean recoveryNeeded = false;
+    ;
+    for (Collection<String> logs : walogs) {
+      for (String walog : logs) {
+        String hostFilename[] = walog.split("/", 2);
+        String host = hostFilename[0];
+        String filename = hostFilename[1];
+        String parts[] = filename.split("/");
+        String sortId = parts[parts.length - 1];
+        String dest = master.getFileSystem().choose(ServerConstants.getRecoveryDirs()) + "/" + sortId;
+        filename = master.getFileSystem().getFullPath(FileType.WAL, walog).toString();
+        log.debug("Recovering " + filename + " to " + dest);
+
+        boolean sortQueued;
+        synchronized (this) {
+          sortQueued = sortsQueued.contains(sortId);
+        }
+
+        if (sortQueued && zooCache.get(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + sortId) == null) {
+          synchronized (this) {
+            sortsQueued.remove(sortId);
+          }
+        }
+
+        if (master.getFileSystem().exists(new Path(dest, "finished"))) {
+          synchronized (this) {
+            closeTasksQueued.remove(sortId);
+            recoveryDelay.remove(sortId);
+            sortsQueued.remove(sortId);
+          }
+          continue;
+        }
+
+        recoveryNeeded = true;
+        synchronized (this) {
+          if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) {
+            AccumuloConfiguration aconf = master.getConfiguration().getConfiguration();
+            LogCloser closer = aconf.instantiateClassProperty(Property.MASTER_WALOG_CLOSER_IMPLEMETATION, LogCloser.class, new HadoopLogCloser());
+            Long delay = recoveryDelay.get(sortId);
+            if (delay == null) {
+              delay = master.getSystemConfiguration().getTimeInMillis(Property.MASTER_RECOVERY_DELAY);
+            } else {
+              delay = Math.min(2 * delay, 1000 * 60 * 5l);
+            }
+
+            log.info("Starting recovery of " + filename + " (in : " + (delay / 1000) + "s) created for " + host + ", tablet " + extent + " holds a reference");
+
+            executor.schedule(new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS);
+            closeTasksQueued.add(sortId);
+            recoveryDelay.put(sortId, delay);
+          }
+        }
+      }
+    }
+    return recoveryNeeded;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
new file mode 100644
index 0000000..6b692d8
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.state;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.cli.ClientOpts;
+import org.apache.accumulo.server.master.state.CurrentState;
+import org.apache.accumulo.server.master.state.MergeInfo;
+import org.apache.accumulo.server.master.state.MergeState;
+import org.apache.accumulo.server.master.state.MetaDataTableScanner;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
+import org.apache.accumulo.server.master.state.TabletState;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Tallies tablet observations for the merge (or delete) operation described by a {@link MergeInfo} and, from those tallies, computes the next
+ * {@link MergeState} of the operation.
+ * <p>
+ * Callers feed one tablet at a time to {@link #update(KeyExtent, TabletState, boolean, boolean)} and then ask {@link #nextMergeState(Connector, CurrentState)}
+ * for the resulting state.
+ */
+public class MergeStats {
+  final static private Logger log = Logger.getLogger(MergeStats.class);
+  // the merge operation these statistics describe
+  MergeInfo info;
+  // overlapping tablets observed in state HOSTED
+  int hosted = 0;
+  // overlapping tablets observed in state UNASSIGNED
+  int unassigned = 0;
+  // tablets that need chopping, are flagged chopped, and are either hosted or have no write-ahead logs
+  int chopped = 0;
+  // overlapping tablets for which info.needsToBeChopped() is true
+  int needsToBeChopped = 0;
+  // all tablets observed that overlap the merge range
+  int total = 0;
+  // true once a tablet ending at the range's prev end row has been seen, or the range has no prev end row
+  boolean lowerSplit = false;
+  // true once a tablet starting at the range's end row has been seen, or the range has no end row
+  boolean upperSplit = false;
+  
+  /**
+   * Starts an empty set of statistics for the given merge. Unless the merge state is NONE, a missing end row or prev end row of the merge extent marks the
+   * corresponding split as already present.
+   */
+  public MergeStats(MergeInfo info) {
+    this.info = info;
+    if (info.getState().equals(MergeState.NONE))
+      return;
+    if (info.getExtent().getEndRow() == null)
+      upperSplit = true;
+    if (info.getExtent().getPrevEndRow() == null)
+      lowerSplit = true;
+  }
+  
+  /**
+   * Returns the merge description passed to the constructor.
+   */
+  public MergeInfo getMergeInfo() {
+    return info;
+  }
+  
+  /**
+   * Records one observed tablet. Does nothing when the merge state is NONE.
+   * 
+   * @param ke
+   *          extent of the observed tablet
+   * @param state
+   *          current state of the tablet
+   * @param chopped
+   *          whether the tablet is flagged as chopped
+   * @param hasWALs
+   *          whether the tablet still references write-ahead logs
+   */
+  public void update(KeyExtent ke, TabletState state, boolean chopped, boolean hasWALs) {
+    if (info.getState().equals(MergeState.NONE))
+      return;
+    // split detection looks at every tablet, including ones that do not overlap the merge range
+    if (!upperSplit && info.getExtent().getEndRow().equals(ke.getPrevEndRow())) {
+      log.info("Upper split found");
+      upperSplit = true;
+    }
+    if (!lowerSplit && info.getExtent().getPrevEndRow().equals(ke.getEndRow())) {
+      log.info("Lower split found");
+      lowerSplit = true;
+    }
+    if (!info.overlaps(ke))
+      return;
+    if (info.needsToBeChopped(ke)) {
+      this.needsToBeChopped++;
+      // a tablet flagged chopped only counts if it is hosted or has no write-ahead logs
+      if (chopped) {
+        if (state.equals(TabletState.HOSTED)) {
+          this.chopped++;
+        } else if (!hasWALs) {
+          this.chopped++;
+        }
+      }
+    }
+    this.total++;
+    if (state.equals(TabletState.HOSTED))
+      this.hosted++;
+    if (state.equals(TabletState.UNASSIGNED))
+      this.unassigned++;
+  }
+  
+  /**
+   * Computes the state the merge should move to, based on the tallies gathered so far.
+   * <p>
+   * The state checks below run in sequence within a single call, so an operation may advance through several states at once. The current state is returned
+   * unchanged when it is NONE or when no overlapping tablet has been observed.
+   * 
+   * @param connector
+   *          used to re-scan tablet metadata before moving to MERGING
+   * @param master
+   *          supplies the set of online tablet servers for that re-scan
+   * @return the next merge state, possibly the current one
+   */
+  public MergeState nextMergeState(Connector connector, CurrentState master) throws Exception {
+    MergeState state = info.getState();
+    if (state == MergeState.NONE)
+      return state;
+    if (total == 0) {
+      log.trace("failed to see any tablets for this range, ignoring " + info.getExtent());
+      return state;
+    }
+    log.info("Computing next merge state for " + info.getExtent() + " which is presently " + state + " isDelete : " + info.isDelete());
+    if (state == MergeState.STARTED) {
+      state = MergeState.SPLITTING;
+    }
+    if (state == MergeState.SPLITTING) {
+      log.info(hosted + " are hosted, total " + total);
+      if (!info.isDelete() && total == 1) {
+        log.info("Merge range is already contained in a single tablet " + info.getExtent());
+        state = MergeState.COMPLETE;
+      } else if (hosted == total) {
+        // a delete additionally waits until both boundary splits have been observed
+        if (info.isDelete()) {
+          if (!lowerSplit)
+            log.info("Waiting for " + info + " lower split to occur " + info.getExtent());
+          else if (!upperSplit)
+            log.info("Waiting for " + info + " upper split to occur " + info.getExtent());
+          else
+            state = MergeState.WAITING_FOR_CHOPPED;
+        } else {
+          state = MergeState.WAITING_FOR_CHOPPED;
+        }
+      } else {
+        log.info("Waiting for " + hosted + " hosted tablets to be " + total + " " + info.getExtent());
+      }
+    }
+    if (state == MergeState.WAITING_FOR_CHOPPED) {
+      log.info(chopped + " tablets are chopped " + info.getExtent());
+      if (chopped == needsToBeChopped) {
+        state = MergeState.WAITING_FOR_OFFLINE;
+      } else {
+        log.info("Waiting for " + chopped + " chopped tablets to be " + needsToBeChopped + " " + info.getExtent());
+      }
+    }
+    if (state == MergeState.WAITING_FOR_OFFLINE) {
+      if (chopped != needsToBeChopped) {
+        log.warn("Unexpected state: chopped tablets should be " + needsToBeChopped + " was " + chopped + " merge " + info.getExtent());
+        // Perhaps a split occurred after we chopped, but before we went offline: start over
+        state = MergeState.WAITING_FOR_CHOPPED;
+      } else {
+        log.info(chopped + " tablets are chopped, " + unassigned + " are offline " + info.getExtent());
+        // every overlapping tablet must be unassigned, and the metadata re-scan must agree, before merging
+        if (unassigned == total && chopped == needsToBeChopped) {
+          if (verifyMergeConsistency(connector, master))
+            state = MergeState.MERGING;
+          else
+            log.info("Merge consistency check failed " + info.getExtent());
+        } else {
+          log.info("Waiting for " + unassigned + " unassigned tablets to be " + total + " " + info.getExtent());
+        }
+      }
+    }
+    if (state == MergeState.MERGING) {
+      if (hosted != 0) {
+        // Shouldn't happen
+        log.error("Unexpected state: hosted tablets should be zero " + hosted + " merge " + info.getExtent());
+        state = MergeState.WAITING_FOR_OFFLINE;
+      }
+      if (unassigned != total) {
+        // Shouldn't happen
+        log.error("Unexpected state: unassigned tablets should be " + total + " was " + unassigned + " merge " + info.getExtent());
+        state = MergeState.WAITING_FOR_CHOPPED;
+      }
+      log.info(unassigned + " tablets are unassigned " + info.getExtent());
+    }
+    return state;
+  }
+  
+  /**
+   * Re-scans the tablet metadata for the merge range and checks that it agrees with the tallies held by this object.
+   * <p>
+   * The check fails when a location entry cannot be parsed, when a tablet that needs chopping still has write-ahead logs, when the first tablet starts after
+   * the start of the range or is not UNASSIGNED, or when consecutive tablets are not contiguous. Otherwise the result is whether the chopped and unassigned
+   * counts of the re-scan equal this object's counts and every tablet counted by the re-scan is unassigned.
+   * 
+   * @return true if the metadata is consistent with the gathered statistics
+   */
+  private boolean verifyMergeConsistency(Connector connector, CurrentState master) throws TableNotFoundException, IOException {
+    MergeStats verify = new MergeStats(info);
+    KeyExtent extent = info.getExtent();
+    // tablets of the metadata table are described in the root table, all others in the metadata table
+    Scanner scanner = connector.createScanner(extent.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY);
+    MetaDataTableScanner.configureScanner(scanner, master);
+    Text start = extent.getPrevEndRow();
+    if (start == null) {
+      start = new Text();
+    }
+    Text tableId = extent.getTableId();
+    Text first = KeyExtent.getMetadataEntry(tableId, start);
+    // start is exclusive and the end is open; the loop below decides where to stop
+    Range range = new Range(first, false, null, true);
+    scanner.setRange(range);
+    KeyExtent prevExtent = null;
+    
+    log.debug("Scanning range " + range);
+    for (Entry<Key,Value> entry : scanner) {
+      TabletLocationState tls;
+      try {
+        tls = MetaDataTableScanner.createTabletLocationState(entry.getKey(), entry.getValue());
+      } catch (BadLocationStateException e) {
+        log.error(e, e);
+        return false;
+      }
+      log.debug("consistency check: " + tls + " walogs " + tls.walogs.size());
+      // the scan has run into another table's entries
+      if (!tls.extent.getTableId().equals(tableId)) {
+        break;
+      }
+      
+      if (!tls.walogs.isEmpty() && verify.getMergeInfo().needsToBeChopped(tls.extent)) {
+        log.debug("failing consistency: needs to be chopped" + tls.extent);
+        return false;
+      }
+      
+      if (prevExtent == null) {
+        // this is the first tablet observed, it must be offline and its prev row must be less than the start of the merge range
+        if (tls.extent.getPrevEndRow() != null && tls.extent.getPrevEndRow().compareTo(start) > 0) {
+          log.debug("failing consistency: prev row is too high " + start);
+          return false;
+        }
+        
+        if (tls.getState(master.onlineTabletServers()) != TabletState.UNASSIGNED) {
+          log.debug("failing consistency: assigned or hosted " + tls);
+          return false;
+        }
+        
+      } else if (!tls.extent.isPreviousExtent(prevExtent)) {
+        log.debug("hole in " + MetadataTable.NAME);
+        return false;
+      }
+      
+      prevExtent = tls.extent;
+      
+      verify.update(tls.extent, tls.getState(master.onlineTabletServers()), tls.chopped, !tls.walogs.isEmpty());
+      // stop when we've seen the tablet just beyond our range
+      if (tls.extent.getPrevEndRow() != null && extent.getEndRow() != null && tls.extent.getPrevEndRow().compareTo(extent.getEndRow()) > 0) {
+        break;
+      }
+    }
+    log.debug("chopped " + chopped + " v.chopped " + verify.chopped + " unassigned " + unassigned + " v.unassigned " + verify.unassigned + " verify.total "
+        + verify.total);
+    return chopped == verify.chopped && unassigned == verify.unassigned && unassigned == verify.total;
+  }
+  
+  /**
+   * Command-line utility: for every table, prints the table name together with the state, operation and extent of the merge information stored under the
+   * table's "merge" node in ZooKeeper. A table without such a node is printed with the values of a freshly constructed {@link MergeInfo}.
+   */
+  public static void main(String[] args) throws Exception {
+    ClientOpts opts = new ClientOpts();
+    opts.parseArgs(MergeStats.class.getName(), args);
+    
+    Connector conn = opts.getConnector();
+    Map<String,String> tableIdMap = conn.tableOperations().tableIdMap();
+    for (String table : tableIdMap.keySet()) {
+      String tableId = tableIdMap.get(table);
+      String path = ZooUtil.getRoot(conn.getInstance().getInstanceID()) + Constants.ZTABLES + "/" + tableId.toString() + "/merge";
+      MergeInfo info = new MergeInfo();
+      if (ZooReaderWriter.getInstance().exists(path)) {
+        byte[] data = ZooReaderWriter.getInstance().getData(path, new Stat());
+        DataInputBuffer in = new DataInputBuffer();
+        in.reset(data, data.length);
+        info.readFields(in);
+      }
+      System.out.println(String.format("%25s  %10s %10s %s", table, info.getState(), info.getOperation(), info.getExtent()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/state/SetGoalState.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/SetGoalState.java b/server/master/src/main/java/org/apache/accumulo/master/state/SetGoalState.java
new file mode 100644
index 0000000..eff8baa
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/SetGoalState.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.state;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.master.thrift.MasterGoalState;
+import org.apache.accumulo.core.security.SecurityUtil;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.Accumulo;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+
+/**
+ * Command-line entry point for changing the master's goal state.
+ */
+public class SetGoalState {
+  
+  /**
+   * Utility program that will change the goal state for the master from the command line.
+   * <p>
+   * Expects exactly one argument naming a {@link MasterGoalState} constant. Any other input prints the usage message and exits with status -1. Otherwise the
+   * argument is written, overwriting any existing value, to the master goal state node in ZooKeeper.
+   * 
+   * @param args
+   *          a single element: the name of the desired goal state
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length != 1 || !isGoalState(args[0])) {
+      System.err.println("Usage: accumulo " + SetGoalState.class.getName() + " [NORMAL|SAFE_MODE|CLEAN_STOP]");
+      System.exit(-1);
+    }
+    SecurityUtil.serverLogin();
+
+    VolumeManager fs = VolumeManagerImpl.get();
+    Accumulo.waitForZookeeperAndHdfs(fs);
+    ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZMASTER_GOAL_STATE, args[0].getBytes(),
+        NodeExistsPolicy.OVERWRITE);
+  }
+  
+  /**
+   * Tells whether the given text is the exact name of a {@link MasterGoalState} constant.
+   * <p>
+   * An enum's valueOf never returns null; it throws IllegalArgumentException for an unknown name, so the exception is what signals invalid input.
+   */
+  private static boolean isGoalState(String name) {
+    try {
+      MasterGoalState.valueOf(name);
+      return true;
+    } catch (IllegalArgumentException e) {
+      return false;
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/state/TableCounts.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/TableCounts.java b/server/master/src/main/java/org/apache/accumulo/master/state/TableCounts.java
new file mode 100644
index 0000000..4ebd745
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/TableCounts.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.state;
+
+import org.apache.accumulo.server.master.state.TabletState;
+
+/**
+ * Holds one counter per {@link TabletState} and exposes read accessors for four of them.
+ */
+public class TableCounts {
+  // one slot for every TabletState constant, addressed by the constant's ordinal
+  int[] counts = new int[TabletState.values().length];
+  
+  /**
+   * @return the counter for tablets in state UNASSIGNED
+   */
+  public int unassigned() {
+    return tally(TabletState.UNASSIGNED);
+  }
+  
+  /**
+   * @return the counter for tablets in state ASSIGNED
+   */
+  public int assigned() {
+    return tally(TabletState.ASSIGNED);
+  }
+  
+  /**
+   * @return the counter for tablets in state ASSIGNED_TO_DEAD_SERVER
+   */
+  public int assignedToDeadServers() {
+    return tally(TabletState.ASSIGNED_TO_DEAD_SERVER);
+  }
+  
+  /**
+   * @return the counter for tablets in state HOSTED
+   */
+  public int hosted() {
+    return tally(TabletState.HOSTED);
+  }
+  
+  // Reads the slot that belongs to the given state.
+  private int tally(TabletState state) {
+    return counts[state.ordinal()];
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/state/TableStats.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/TableStats.java b/server/master/src/main/java/org/apache/accumulo/master/state/TableStats.java
new file mode 100644
index 0000000..f088a5d
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/TableStats.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.state;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.server.master.state.TabletState;
+import org.apache.hadoop.io.Text;
+
+public class TableStats {
+  private Map<Text,TableCounts> last = new HashMap<Text,TableCounts>();
+  private Map<Text,TableCounts> next;
+  private long startScan = 0;
+  private long endScan = 0;
+  
+  public synchronized void begin() {
+    next = new HashMap<Text,TableCounts>();
+    startScan = System.currentTimeMillis();
+  }
+  
+  public synchronized void update(Text tableId, TabletState state) {
+    TableCounts counts = next.get(tableId);
+    if (counts == null) {
+      counts = new TableCounts();
+      next.put(tableId, counts);
+    }
+    counts.counts[state.ordinal()]++;
+  }
+  
+  public synchronized void end() {
+    last = next;
+    next = null;
+    endScan = System.currentTimeMillis();
+  }
+  
+  public synchronized Map<Text,TableCounts> getLast() {
+    return last;
+  }
+  
+  public synchronized TableCounts getLast(Text tableId) {
+    TableCounts result = last.get(tableId);
+    if (result == null)
+      return new TableCounts();
+    return result;
+  }
+  
+  public synchronized long getScanTime() {
+    if (endScan <= startScan)
+      return System.currentTimeMillis() - startScan;
+    return endScan - startScan;
+  }
+  
+  public synchronized long lastScanFinished() {
+    return endScan;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
new file mode 100644
index 0000000..a340b50
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@ -0,0 +1,616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.ServerClient;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.thrift.ClientService;
+import org.apache.accumulo.core.client.impl.thrift.ClientService.Client;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.tablets.UniqueNameAllocator;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
+import org.apache.accumulo.trace.instrument.TraceExecutorService;
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+
+/*
+ * Bulk import makes requests of tablet servers, and those requests can take a
+ * long time. Our communications to the tablet server may fail, so we won't know
+ * the status of the request. The master will repeat failed requests so now
+ * there are multiple requests to the tablet server. The tablet server will not
+ * execute the request multiple times, so long as the marker it wrote in the
+ * metadata table stays there. The master needs to know when all requests have
+ * finished so it can remove the markers. Did it start? Did it finish? We can see
+ * that *a* request completed by seeing the flag written into the metadata
+ * table, but we won't know if some other rogue thread is still waiting to start
+ * a thread and repeat the operation.
+ * 
+ * The master can ask the tablet server if it has any requests still running.
+ * Except the tablet server might have some thread about to start a request, but
+ * before it has made any bookkeeping about the request. To prevent problems
+ * like this, an Arbitrator is used. Before starting any new request, the tablet
+ * server checks the Arbitrator to see if the request is still valid.
+ * 
+ */
+
+/**
+ * First step of the bulk import operation: validates the error directory, starts the arbitrator for the transaction, moves the importable files from the
+ * source directory into a freshly created bulk directory under the table's directory, and hands off to {@link LoadFiles}.
+ */
+public class BulkImport extends MasterRepo {
+  public static final String FAILURES_TXT = "failures.txt";
+  
+  private static final long serialVersionUID = 1L;
+  
+  private static final Logger log = Logger.getLogger(BulkImport.class);
+  
+  private String tableId;
+  private String sourceDir;
+  private String errorDir;
+  private boolean setTime;
+  
+  /**
+   * @param tableId
+   *          id of the table to import into
+   * @param sourceDir
+   *          directory holding the files to import
+   * @param errorDir
+   *          directory that receives files that fail to import; must exist and be empty when {@link #call(long, Master)} runs
+   * @param setTime
+   *          flag passed through unchanged to the {@link LoadFiles} step
+   */
+  public BulkImport(String tableId, String sourceDir, String errorDir, boolean setTime) {
+    this.tableId = tableId;
+    this.sourceDir = sourceDir;
+    this.errorDir = errorDir;
+    this.setTime = setTime;
+  }
+  
+  /**
+   * Tries to take the table read lock and, when the table is online, reserves the source directory and then the error directory.
+   * 
+   * @return 100 when the read lock could not be obtained; otherwise the result of the directory reservations (the error directory is only reserved once the
+   *         source reservation returned 0)
+   * @throws ThriftTableOperationException
+   *           with type OFFLINE when the table is not online
+   */
+  @Override
+  public long isReady(long tid, Master master) throws Exception {
+    if (!Utils.getReadLock(tableId, tid).tryLock())
+      return 100;
+    
+    Instance instance = HdfsZooInstance.getInstance();
+    // make sure the table state is read fresh rather than from a stale cache
+    Tables.clearCache(instance);
+    if (Tables.getTableState(instance, tableId) == TableState.ONLINE) {
+      long reserve1, reserve2;
+      reserve1 = reserve2 = Utils.reserveHdfsDirectory(sourceDir, tid);
+      if (reserve1 == 0)
+        reserve2 = Utils.reserveHdfsDirectory(errorDir, tid);
+      return reserve2;
+    } else {
+      throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.OFFLINE, null);
+    }
+  }
+  
+  /**
+   * Verifies that the error directory exists, is a directory and is empty, starts the bulk arbitrator for this transaction and moves the source files into a
+   * new bulk directory.
+   * 
+   * @return the {@link LoadFiles} step for the prepared bulk directory
+   * @throws ThriftTableOperationException
+   *           with BULK_BAD_ERROR_DIRECTORY when the error directory is unusable, or BULK_BAD_INPUT_DIRECTORY when preparing the bulk directory fails with
+   *           an IOException
+   */
+  @Override
+  //TODO Remove deprecation warning suppression when Hadoop1 support is dropped
+  @SuppressWarnings("deprecation")
+  public Repo<Master> call(long tid, Master master) throws Exception {
+    log.debug(" tid " + tid + " sourceDir " + sourceDir);
+    
+    Utils.getReadLock(tableId, tid).lock();
+    
+    // check that the error directory exists and is empty
+    VolumeManager fs = master.getFileSystem();
+    
+    Path errorPath = new Path(errorDir);
+    FileStatus errorStatus = null;
+    try {
+      errorStatus = fs.getFileStatus(errorPath);
+    } catch (FileNotFoundException ex) {
+      // ignored: a missing directory is reported below through the null status
+    }
+    if (errorStatus == null)
+      throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, errorDir
+          + " does not exist");
+    if (!errorStatus.isDir())
+      throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, errorDir
+          + " is not a directory");
+    if (fs.listStatus(errorPath).length != 0)
+      throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, errorDir
+          + " is not empty");
+    
+    ZooArbitrator.start(Constants.BULK_ARBITRATOR_TYPE, tid);
+    
+    // move the files into the directory
+    try {
+      String bulkDir = prepareBulkImport(fs, sourceDir, tableId);
+      log.debug(" tid " + tid + " bulkDir " + bulkDir);
+      return new LoadFiles(tableId, sourceDir, bulkDir, errorDir, setTime);
+    } catch (IOException ex) {
+      log.error("error preparing the bulk import directory", ex);
+      throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_INPUT_DIRECTORY, sourceDir + ": "
+          + ex);
+    }
+  }
+  
+  /**
+   * Creates a new, uniquely named bulk directory below the table's directory on the file system that matches the source directory.
+   * 
+   * @return the path of the newly created bulk directory
+   * @throws IllegalStateException
+   *           when the source directory does not match any of the configured tables directories, or when the generated directory name already exists
+   */
+  private Path createNewBulkDir(VolumeManager fs, String tableId) throws IOException {
+    
+    // check for "no match" before dereferencing: calling toString() on the result first would turn a
+    // missing match into a NullPointerException and make the check below unreachable
+    Path matchedTablesDir = fs.matchingFileSystem(new Path(sourceDir), ServerConstants.getTablesDirs());
+    if (matchedTablesDir == null)
+      throw new IllegalStateException(sourceDir + " is not in a known namespace");
+    String tableDir = matchedTablesDir.toString();
+    
+    Path directory = new Path(tableDir + "/" + tableId);
+    fs.mkdirs(directory);
+    
+    // only one should be able to create the lock file
+    // the purpose of the lock file is to avoid a race
+    // condition between the call to fs.exists() and
+    // fs.mkdirs()... if only hadoop had a mkdir() function
+    // that failed when the dir existed
+    
+    UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
+    
+    while (true) {
+      Path newBulkDir = new Path(directory, Constants.BULK_PREFIX + namer.getNextName());
+      if (fs.exists(newBulkDir)) // sanity check
+        throw new IllegalStateException("Dir exist when it should not " + newBulkDir);
+      if (fs.mkdirs(newBulkDir))
+        return newBulkDir;
+      log.warn("Failed to create " + newBulkDir + " for unknown reason");
+      
+      UtilWaitThread.sleep(3000);
+    }
+  }
+
+  /**
+   * Creates the bulk directory, flags the load as in progress in the metadata table and moves every importable file from {@code dir} into the bulk
+   * directory under a newly allocated name. Files with an unknown extension, entries that do not look like map files and the {@code _logs} directory are
+   * skipped with a log message; a failure to move an individual file is logged and does not abort the operation.
+   * 
+   * @return the bulk directory as a string
+   */
+  //TODO Remove deprecation warning suppression when Hadoop1 support is dropped
+  @SuppressWarnings("deprecation")
+  private String prepareBulkImport(VolumeManager fs, String dir, String tableId) throws IOException {
+    Path bulkDir = createNewBulkDir(fs, tableId);
+    
+    MetadataTableUtil.addBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
+    
+    Path dirPath = new Path(dir);
+    FileStatus[] mapFiles = fs.listStatus(dirPath);
+    
+    UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
+    
+    for (FileStatus fileStatus : mapFiles) {
+      String[] nameParts = fileStatus.getPath().getName().split("\\.");
+      String extension = "";
+      if (nameParts.length > 1) {
+        extension = nameParts[nameParts.length - 1];
+        
+        if (!FileOperations.getValidExtensions().contains(extension)) {
+          log.warn(fileStatus.getPath() + " does not have a valid extension, ignoring");
+          continue;
+        }
+      } else {
+        // assume it is a map file
+        extension = Constants.MAPFILE_EXTENSION;
+      }
+      
+      if (extension.equals(Constants.MAPFILE_EXTENSION)) {
+        // a map file is a directory that contains a regular data file
+        if (!fileStatus.isDir()) {
+          log.warn(fileStatus.getPath() + " is not a map file, ignoring");
+          continue;
+        }
+        
+        if (fileStatus.getPath().getName().equals("_logs")) {
+          log.info(fileStatus.getPath() + " is probably a log directory from a map/reduce task, skipping");
+          continue;
+        }
+        try {
+          FileStatus dataStatus = fs.getFileStatus(new Path(fileStatus.getPath(), MapFile.DATA_FILE_NAME));
+          if (dataStatus.isDir()) {
+            log.warn(fileStatus.getPath() + " is not a map file, ignoring");
+            continue;
+          }
+        } catch (FileNotFoundException fnfe) {
+          log.warn(fileStatus.getPath() + " is not a map file, ignoring");
+          continue;
+        }
+      }
+      
+      String newName = "I" + namer.getNextName() + "." + extension;
+      Path newPath = new Path(bulkDir, newName);
+      try {
+        fs.rename(fileStatus.getPath(), newPath);
+        log.debug("Moved " + fileStatus.getPath() + " to " + newPath);
+      } catch (IOException ex) {
+        // best effort: keep moving the remaining files, but keep the stack trace for diagnosis
+        log.error("Could not move: " + fileStatus.getPath().toString() + " " + ex.getMessage(), ex);
+      }
+    }
+    return bulkDir.toString();
+  }
+  
+  /**
+   * Releases the directory reservations and the table read lock taken by this step.
+   */
+  @Override
+  public void undo(long tid, Master environment) throws Exception {
+    // unreserve source/error directories
+    Utils.unreserveHdfsDirectory(sourceDir, tid);
+    Utils.unreserveHdfsDirectory(errorDir, tid);
+    Utils.getReadLock(tableId, tid).unlock();
+  }
+}
+
+/**
+ * Final step of a bulk import: removes the in-progress flag and the loaded markers from the metadata table, releases the directory reservations and the
+ * table read lock, and cleans up the arbitrator entry for the transaction.
+ */
+class CleanUpBulkImport extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+  
+  private static final Logger log = Logger.getLogger(CleanUpBulkImport.class);
+  
+  private String tableId;
+  private String source;
+  private String bulk;
+  private String error;
+  
+  /**
+   * @param tableId
+   *          id of the table that was imported into
+   * @param source
+   *          reserved source directory
+   * @param bulk
+   *          bulk directory the files were moved into
+   * @param error
+   *          reserved error directory
+   */
+  public CleanUpBulkImport(String tableId, String source, String bulk, String error) {
+    this.tableId = tableId;
+    this.source = source;
+    this.bulk = bulk;
+    this.error = error;
+  }
+  
+  /**
+   * Performs the cleanup; the statements run in a fixed order and this is the last step of the operation.
+   * 
+   * @return always {@code null}
+   */
+  @Override
+  public Repo<Master> call(long tid, Master master) throws Exception {
+    log.debug("removing the bulk processing flag file in " + bulk);
+    final Path bulkPath = new Path(bulk);
+    final String bulkName = bulkPath.getName();
+    // the flag is keyed by "/<parent directory name>/<bulk directory name>"
+    final String flagPath = "/" + bulkPath.getParent().getName() + "/" + bulkName;
+    MetadataTableUtil.removeBulkLoadInProgressFlag(flagPath);
+    MetadataTableUtil.addDeleteEntry(tableId, "/" + bulkName);
+    
+    log.debug("removing the metadata table markers for loaded files");
+    MetadataTableUtil.removeBulkLoadEntries(master.getConnector(), tableId, tid);
+    
+    log.debug("releasing HDFS reservations for " + source + " and " + error);
+    Utils.unreserveHdfsDirectory(source, tid);
+    Utils.unreserveHdfsDirectory(error, tid);
+    Utils.getReadLock(tableId, tid).unlock();
+    
+    log.debug("completing bulk import transaction " + tid);
+    ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
+    return null;
+  }
+}
+
+class CompleteBulkImport extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+  
+  private String tableId;
+  private String source;
+  private String bulk;
+  private String error;
+  
+  public CompleteBulkImport(String tableId, String source, String bulk, String error) {
+    this.tableId = tableId;
+    this.source = source;
+    this.bulk = bulk;
+    this.error = error;
+  }
+  
+  @Override
+  public Repo<Master> call(long tid, Master master) throws Exception {
+    ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid);
+    return new CopyFailed(tableId, source, bulk, error);
+  }
+}
+
+/**
+ * Bulk import step that deals with the files listed in the failures file of the error directory: failed files that were never loaded are moved into the
+ * error directory, failed files that were nevertheless loaded are copied there through a distributed work queue. Continues with
+ * {@link CleanUpBulkImport}.
+ */
+class CopyFailed extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+  
+  private String tableId;
+  private String source;
+  private String bulk;
+  private String error;
+  
+  /**
+   * @param tableId
+   *          id of the table being imported into
+   * @param source
+   *          source directory of the import
+   * @param bulk
+   *          bulk directory holding the moved files
+   * @param error
+   *          error directory; expected to contain the failures file when there were failures
+   */
+  public CopyFailed(String tableId, String source, String bulk, String error) {
+    this.tableId = tableId;
+    this.source = source;
+    this.bulk = bulk;
+    this.error = error;
+  }
+  
+  /**
+   * Asks every online tablet server whether it is still working on transaction {@code tid}.
+   * 
+   * @return 0 when every online tablet server reported the transaction as not active, 500 otherwise. Servers without a connection, or whose check fails
+   *         with a TException, are not counted as finished.
+   */
+  @Override
+  public long isReady(long tid, Master master) throws Exception {
+    Set<TServerInstance> finished = new HashSet<TServerInstance>();
+    Set<TServerInstance> running = master.onlineTabletServers();
+    for (TServerInstance server : running) {
+      try {
+        TServerConnection client = master.getConnection(server);
+        if (client != null && !client.isActive(tid))
+          finished.add(server);
+      } catch (TException ex) {
+        // NOTE(review): 'log' is not declared in this class; presumably inherited from MasterRepo - confirm
+        log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex);
+      }
+    }
+    if (finished.containsAll(running))
+      return 0;
+    return 500;
+  }
+  
+  /**
+   * Processes the failures file, if there is one, and removes it afterwards.
+   * 
+   * @return the {@link CleanUpBulkImport} step
+   */
+  @Override
+  public Repo<Master> call(long tid, Master master) throws Exception {
+    // This needs to execute after the arbiter is stopped
+    
+    VolumeManager fs = master.getFileSystem();
+    
+    // no failures file: nothing to move or copy
+    if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT)))
+      return new CleanUpBulkImport(tableId, source, bulk, error);
+    
+    // both maps: "/<parent dir name>/<file name>" -> path exactly as listed in the failures file
+    HashMap<String,String> failures = new HashMap<String,String>();
+    HashMap<String,String> loadedFailures = new HashMap<String,String>();
+    
+    FSDataInputStream failFile = fs.open(new Path(error, BulkImport.FAILURES_TXT));
+    BufferedReader in = new BufferedReader(new InputStreamReader(failFile));
+    try {
+      String line = null;
+      while ((line = in.readLine()) != null) {
+        Path path = new Path(line);
+        // skip files that already have a same-named entry in the error directory
+        if (!fs.exists(new Path(error, path.getName())))
+          failures.put("/" + path.getParent().getName() + "/" + path.getName(), line);
+      }
+    } finally {
+      // closes the underlying stream; the wrapping reader is not closed separately
+      failFile.close();
+    }
+    
+    /*
+     * I thought I could move files that have no file references in the table. However its possible a clone references a file. Therefore only move files that
+     * have no loaded markers.
+     */
+    
+    // determine which failed files were loaded
+    Connector conn = master.getConnector();
+    Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
+    mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
+    mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
+    
+    for (Entry<Key,Value> entry : mscanner) {
+      // only entries whose value is this transaction's id are considered
+      if (Long.parseLong(entry.getValue().toString()) == tid) {
+        String loadedFile = entry.getKey().getColumnQualifier().toString();
+        // a failed file with a loaded marker moves from 'failures' to 'loadedFailures'
+        String absPath = failures.remove(loadedFile);
+        if (absPath != null) {
+          loadedFailures.put(loadedFile, absPath);
+        }
+      }
+    }
+    
+    // move failed files that were not loaded
+    for (String failure : failures.values()) {
+      Path orig = new Path(failure);
+      Path dest = new Path(error, orig.getName());
+      fs.rename(orig, dest);
+      log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed");
+    }
+    
+    // failed files that were loaded are not moved; a copy into the error directory is queued as work instead
+    if (loadedFailures.size() > 0) {
+      DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID()
+          + Constants.ZBULK_FAILED_COPYQ);
+      
+      HashSet<String> workIds = new HashSet<String>();
+      
+      for (String failure : loadedFailures.values()) {
+        Path orig = new Path(failure);
+        Path dest = new Path(error, orig.getName());
+        
+        // already present in the error directory: nothing to queue
+        if (fs.exists(dest))
+          continue;
+        
+        // work item: id is the file name, payload is "<source path>,<destination path>"
+        bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes());
+        workIds.add(orig.getName());
+        log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
+      }
+      
+      bifCopyQueue.waitUntilDone(workIds);
+    }
+    
+    fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT));
+    return new CleanUpBulkImport(tableId, source, bulk, error);
+  }
+  
+}
+
+/**
+ * Bulk import step that asks tablet servers to load the files of the bulk directory, one file per request, retrying the files that were not loaded. The
+ * files that still are not loaded after the last attempt are written to the failures file in the error directory. Continues with
+ * {@link CompleteBulkImport}.
+ */
+class LoadFiles extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+  
+  // shared by all LoadFiles instances; created lazily by initializeThreadPool
+  private static ExecutorService threadPool = null;
+  private static final Logger log = Logger.getLogger(BulkImport.class);
+  
+  private String tableId;
+  private String source;
+  private String bulk;
+  private String errorDir;
+  private boolean setTime;
+  
+  /**
+   * @param tableId
+   *          id of the table to import into
+   * @param source
+   *          source directory of the import
+   * @param bulk
+   *          bulk directory holding the files to load
+   * @param errorDir
+   *          error directory; receives the failures file
+   * @param setTime
+   *          flag passed through unchanged to the tablet servers' bulkImportFiles call
+   */
+  public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
+    this.tableId = tableId;
+    this.source = source;
+    this.bulk = bulk;
+    this.errorDir = errorDir;
+    this.setTime = setTime;
+  }
+  
+  /**
+   * @return 500 while no tablet server is online, 0 otherwise
+   */
+  @Override
+  public long isReady(long tid, Master master) throws Exception {
+    if (master.onlineTabletServers().size() == 0)
+      return 500;
+    return 0;
+  }
+  
+  /**
+   * Creates the shared thread pool on first use, sized by {@link Property#MASTER_BULK_THREADPOOL_SIZE}.
+   */
+  void initializeThreadPool(Master master) {
+    // the pool is a static field, so guard it with the class lock: locking on 'this' would let two
+    // LoadFiles instances create separate pools concurrently
+    synchronized (LoadFiles.class) {
+      if (threadPool == null) {
+        int threadPoolSize = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
+        ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
+        pool.allowCoreThreadTimeOut(true);
+        threadPool = new TraceExecutorService(pool);
+      }
+    }
+  }
+  
+  /**
+   * Checks that the error directory is writable, then submits one load request per file to the thread pool, for at most
+   * {@link Property#MASTER_BULK_RETRIES} attempts (at least one). Finally writes the paths of the files that were not loaded to the failures file.
+   * 
+   * @return the {@link CompleteBulkImport} step
+   * @throws ThriftTableOperationException
+   *           with BULK_BAD_ERROR_DIRECTORY when a marker file cannot be created in the error directory
+   */
+  @Override
+  public Repo<Master> call(final long tid, final Master master) throws Exception {
+    initializeThreadPool(master);
+    final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
+    VolumeManager fs = master.getFileSystem();
+    List<FileStatus> files = new ArrayList<FileStatus>();
+    for (FileStatus entry : fs.listStatus(new Path(bulk))) {
+      files.add(entry);
+    }
+    log.debug("tid " + tid + " importing " + files.size() + " files");
+    
+    // probe that the error directory is writable by creating and removing a marker file
+    Path writable = new Path(this.errorDir, ".iswritable");
+    if (!fs.createNewFile(writable)) {
+      // Maybe this is a re-try... clear the flag and try again
+      fs.delete(writable);
+      if (!fs.createNewFile(writable))
+        throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
+            "Unable to write to " + this.errorDir);
+    }
+    fs.delete(writable);
+    
+    final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
+    for (FileStatus f : files)
+      filesToLoad.add(f.getPath().toString());
+    
+    final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
+    for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
+      List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
+      
+      if (master.onlineTabletServers().size() == 0)
+        log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
+      
+      while (master.onlineTabletServers().size() == 0) {
+        UtilWaitThread.sleep(500);
+      }
+      
+      // Use the threadpool to assign files one-at-a-time to the server
+      final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
+      for (final String file : filesToLoad) {
+        results.add(threadPool.submit(new Callable<List<String>>() {
+          @Override
+          public List<String> call() {
+            List<String> failures = new ArrayList<String>();
+            ClientService.Client client = null;
+            String server = null;
+            try {
+              // get a connection to a random tablet server, do not prefer cached connections because
+              // this is running on the master and there are lots of connections to tablet servers
+              // serving the !METADATA tablets
+              long timeInMillis = master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
+              Pair<String,Client> pair = ServerClient.getConnection(master.getInstance(), false, timeInMillis);
+              client = pair.getSecond();
+              server = pair.getFirst();
+              List<String> attempt = Collections.singletonList(file);
+              log.debug("Asking " + pair.getFirst() + " to bulk import " + file);
+              List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), SystemCredentials.get().toThrift(master.getInstance()), tid, tableId, attempt,
+                  errorDir, setTime);
+              if (fail.isEmpty()) {
+                loaded.add(file);
+              } else {
+                failures.addAll(fail);
+              }
+            } catch (Exception ex) {
+              // the file is not recorded as loaded, so it stays in filesToLoad for the next attempt
+              log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
+            } finally {
+              ServerClient.close(client);
+            }
+            return failures;
+          }
+        }));
+      }
+      // wait for every request of this attempt; the returned failure lists are not needed because
+      // anything that was not recorded in 'loaded' remains in filesToLoad
+      for (Future<List<String>> f : results)
+        f.get();
+      filesToLoad.removeAll(loaded);
+      if (filesToLoad.size() > 0) {
+        log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed");
+        UtilWaitThread.sleep(100);
+      }
+    }
+    
+    // record whatever could not be loaded (possibly nothing); the file is overwritten if it exists
+    FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
+    BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile));
+    try {
+      for (String f : filesToLoad) {
+        out.write(f);
+        out.write("\n");
+      }
+    } finally {
+      out.close();
+    }
+    
+    // return the next step, which will perform cleanup
+    return new CompleteBulkImport(tableId, source, bulk, errorDir);
+  }
+  
+  /**
+   * Renders at most {@code max} elements of a collection for logging, e.g. {@code [a, b, c]}. When the collection holds more than {@code max} elements the
+   * first {@code max} are shown followed by {@code ...}, e.g. {@code [a, b...]}. An empty collection renders as {@code []}.
+   * 
+   * @param potentiallyLongList
+   *          elements to render, in iteration order
+   * @param max
+   *          maximum number of elements to include
+   * @return the bracketed, comma separated sample
+   */
+  static String sampleList(Collection<?> potentiallyLongList, int max) {
+    StringBuilder result = new StringBuilder("[");
+    int shown = 0;
+    for (Object obj : potentiallyLongList) {
+      if (shown >= max) {
+        // there is at least one more element than we are willing to print
+        result.append("...");
+        break;
+      }
+      if (shown > 0)
+        result.append(", ");
+      result.append(obj);
+      shown++;
+    }
+    result.append("]");
+    return result.toString();
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
new file mode 100644
index 0000000..dd4c229
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+
+/**
+ * Last step of cancelling compactions: releases the read lock on the table.
+ */
+class FinishCancelCompaction extends MasterRepo {
+  private static final long serialVersionUID = 1L;
+  private String tableId;
+  
+  /**
+   * @param tableId
+   *          id of the table whose read lock is released by this step
+   */
+  public FinishCancelCompaction(String tableId) {
+    this.tableId = tableId;
+  }
+
+  /**
+   * Unlocks the table read lock for this transaction.
+   * 
+   * @return always {@code null}
+   */
+  @Override
+  public Repo<Master> call(long tid, Master environment) throws Exception {
+    Utils.getReadLock(this.tableId, tid).unlock();
+    return null;
+  }
+  
+  /**
+   * Does nothing.
+   */
+  @Override
+  public void undo(long tid, Master environment) throws Exception {
+    // intentionally empty
+  }
+}
+
+/**
+ * Raises the table's compaction-cancel id in ZooKeeper to at least the table's current compaction id, then continues with
+ * {@link FinishCancelCompaction}.
+ */
+public class CancelCompactions extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+  private String tableId;
+  
+  /**
+   * @param tableId
+   *          id of the table whose compactions are cancelled
+   */
+  public CancelCompactions(String tableId) {
+    this.tableId = tableId;
+  }
+
+  /**
+   * Reserves the table for the COMPACT_CANCEL operation.
+   * 
+   * @return the result of the table reservation
+   */
+  @Override
+  public long isReady(long tid, Master environment) throws Exception {
+    return Utils.reserveTable(tableId, tid, false, true, TableOperation.COMPACT_CANCEL);
+  }
+  
+  /**
+   * Reads the current compaction id (the first comma separated token of the compact-id node) and stores the larger of it and the existing value in the
+   * compact-cancel-id node.
+   * 
+   * @return the {@link FinishCancelCompaction} step
+   */
+  @Override
+  public Repo<Master> call(long tid, Master environment) throws Exception {
+    // both nodes live under the table's ZooKeeper node
+    final String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId;
+    final String zCompactID = zTablePath + Constants.ZTABLE_COMPACT_ID;
+    final String zCancelID = zTablePath + Constants.ZTABLE_COMPACT_CANCEL_ID;
+    
+    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+    
+    String compactNodeValue = new String(zoo.getData(zCompactID, null));
+    String[] tokens = compactNodeValue.split(",");
+    final long compactId = Long.parseLong(tokens[0]);
+    
+    zoo.mutate(zCancelID, null, null, new Mutator() {
+      @Override
+      public byte[] mutate(byte[] cancelNodeValue) throws Exception {
+        long cancelId = Long.parseLong(new String(cancelNodeValue));
+        // never move the cancel id backwards
+        long newest = Math.max(cancelId, compactId);
+        return Long.toString(newest).getBytes();
+      }
+    });
+
+    return new FinishCancelCompaction(tableId);
+  }
+  
+  /**
+   * Releases the table reservation taken in {@link #isReady(long, Master)}.
+   */
+  @Override
+  public void undo(long tid, Master environment) throws Exception {
+    Utils.unreserveTable(tableId, tid, false);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChangeTableState.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChangeTableState.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChangeTableState.java
new file mode 100644
index 0000000..697c15e
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChangeTableState.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.tables.TableManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Master operation that moves a table to {@link TableState#ONLINE} or {@link TableState#OFFLINE}. Only {@link TableOperation#ONLINE} and
+ * {@link TableOperation#OFFLINE} are accepted as the requested operation.
+ */
+public class ChangeTableState extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+  // resolved once per class instead of on every call()
+  private static final Logger log = Logger.getLogger(ChangeTableState.class);
+  
+  private String tableId;
+  private TableOperation top;
+  
+  /**
+   * @param tableId
+   *          id of the table whose state is to be changed
+   * @param top
+   *          the requested operation; must be {@link TableOperation#ONLINE} or {@link TableOperation#OFFLINE}
+   * @throws IllegalArgumentException
+   *           if {@code top} is any other value, including {@code null}
+   */
+  public ChangeTableState(String tableId, TableOperation top) {
+    // String.valueOf keeps a null operation from turning into a NullPointerException while the message is built
+    if (top != TableOperation.ONLINE && top != TableOperation.OFFLINE)
+      throw new IllegalArgumentException(String.valueOf(top));
+    
+    this.tableId = tableId;
+    this.top = top;
+  }
+  
+  /**
+   * Attempts to reserve the table for this transaction.
+   * 
+   * @return the value reported by {@code Utils.reserveTable}
+   */
+  @Override
+  public long isReady(long tid, Master environment) throws Exception {
+    // reserve the table so that this op does not run concurrently with create, clone, or delete table
+    return Utils.reserveTable(tableId, tid, true, true, top);
+  }
+  
+  /**
+   * Transitions the table to the state matching the requested operation, releases the table reservation, then logs the change and raises a master event.
+   * 
+   * @return {@code null}; this operation has no follow-on step
+   */
+  @Override
+  public Repo<Master> call(long tid, Master env) throws Exception {
+    
+    // the constructor only admits ONLINE and OFFLINE, so anything that is not OFFLINE is ONLINE
+    TableState ts = TableState.ONLINE;
+    if (top == TableOperation.OFFLINE)
+      ts = TableState.OFFLINE;
+    
+    TableManager.getInstance().transitionTableState(tableId, ts);
+    Utils.unreserveTable(tableId, tid, true);
+    log.debug("Changed table state " + tableId + " " + ts);
+    env.getEventCoordinator().event("Set table state of %s to %s", tableId, ts);
+    return null;
+  }
+  
+  /**
+   * Releases the table reservation taken in {@link #isReady(long, Master)}.
+   */
+  @Override
+  public void undo(long tid, Master env) throws Exception {
+    Utils.unreserveTable(tableId, tid, true);
+  }
+}