Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/12/03 22:18:51 UTC

[13/14] git commit: Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT

Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT

Conflicts:
	core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
	core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
	core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
	core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
	server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
	server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8a34937a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8a34937a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8a34937a

Branch: refs/heads/master
Commit: 8a34937aa38bd8b989c21a89dc62697db645e6fa
Parents: f1f28be cccdb8c
Author: Christopher Tubbs <ct...@apache.org>
Authored: Tue Dec 3 16:15:01 2013 -0500
Committer: Christopher Tubbs <ct...@apache.org>
Committed: Tue Dec 3 16:15:01 2013 -0500

----------------------------------------------------------------------
 .../core/client/mapred/InputFormatBase.java     |  5 +-
 .../core/client/mapreduce/InputFormatBase.java  |  4 +-
 .../mapreduce/AccumuloInputFormatTest.java      |  4 +-
 .../apache/accumulo/server/init/Initialize.java | 97 ++++++++++----------
 .../accumulo/server/init/InitializeTest.java    | 17 ++--
 .../accumulo/tserver/log/MultiReaderTest.java   |  6 +-
 6 files changed, 69 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/8a34937a/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
index b6a8258,fe75453..25db582
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
@@@ -17,22 -17,48 +17,21 @@@
  package org.apache.accumulo.core.client.mapred;
  
  import java.io.IOException;
 -import java.net.InetAddress;
 -import java.nio.ByteBuffer;
 -import java.util.ArrayList;
  import java.util.Collection;
 -import java.util.HashMap;
 -import java.util.Iterator;
  import java.util.List;
 -import java.util.Map;
 -import java.util.Map.Entry;
  import java.util.Set;
  
 -import org.apache.accumulo.core.Constants;
 -import org.apache.accumulo.core.client.AccumuloException;
 -import org.apache.accumulo.core.client.AccumuloSecurityException;
  import org.apache.accumulo.core.client.ClientSideIteratorScanner;
 -import org.apache.accumulo.core.client.Connector;
 -import org.apache.accumulo.core.client.Instance;
  import org.apache.accumulo.core.client.IsolatedScanner;
  import org.apache.accumulo.core.client.IteratorSetting;
 -import org.apache.accumulo.core.client.RowIterator;
  import org.apache.accumulo.core.client.Scanner;
 -import org.apache.accumulo.core.client.TableDeletedException;
  import org.apache.accumulo.core.client.TableNotFoundException;
 -import org.apache.accumulo.core.client.TableOfflineException;
 -import org.apache.accumulo.core.client.ZooKeeperInstance;
 -import org.apache.accumulo.core.client.impl.OfflineScanner;
 -import org.apache.accumulo.core.client.impl.Tables;
  import org.apache.accumulo.core.client.impl.TabletLocator;
- import org.apache.accumulo.core.client.mapred.RangeInputSplit;
  import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
 -import org.apache.accumulo.core.client.mock.MockInstance;
 -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
  import org.apache.accumulo.core.data.Key;
 -import org.apache.accumulo.core.data.KeyExtent;
 -import org.apache.accumulo.core.data.PartialKey;
  import org.apache.accumulo.core.data.Range;
  import org.apache.accumulo.core.data.Value;
 -import org.apache.accumulo.core.master.state.tables.TableState;
 -import org.apache.accumulo.core.security.Authorizations;
 -import org.apache.accumulo.core.security.CredentialHelper;
 -import org.apache.accumulo.core.security.thrift.TCredentials;
  import org.apache.accumulo.core.util.Pair;
 -import org.apache.accumulo.core.util.UtilWaitThread;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.InputFormat;
  import org.apache.hadoop.mapred.InputSplit;
@@@ -328,16 -544,367 +327,16 @@@ public abstract class InputFormatBase<K
       * @param scanner
       *          the scanner to configure
       */
 -    protected void setupIterators(List<IteratorSetting> iterators, Scanner scanner) {
 -      for (IteratorSetting iterator : iterators) {
 -        scanner.addScanIterator(iterator);
 -      }
 -    }
 -
 -    /**
 -     * Initialize a scanner over the given input split using this task attempt configuration.
 -     */
 -    public void initialize(InputSplit inSplit, JobConf job) throws IOException {
 -      Scanner scanner;
 -      split = (RangeInputSplit) inSplit;
 -      log.debug("Initializing input split: " + split.getRange());
 -
 -      Instance instance = split.getInstance();
 -      if (null == instance) {
 -        instance = getInstance(job);
 -      }
 -
 -      String principal = split.getPrincipal();
 -      if (null == principal) {
 -        principal = getPrincipal(job);
 -      }
 -
 -      AuthenticationToken token = split.getToken();
 -      if (null == token) {
 -        String tokenClass = getTokenClass(job);
 -        byte[] tokenBytes = getToken(job);
 -        try {
 -          token = CredentialHelper.extractToken(tokenClass, tokenBytes);
 -        } catch (AccumuloSecurityException e) {
 -          throw new IOException(e);
 -        }
 -      }
 -
 -      Authorizations authorizations = split.getAuths();
 -      if (null == authorizations) {
 -        authorizations = getScanAuthorizations(job);
 -      }
 -
 -      String table = split.getTable();
 -      if (null == table) {
 -        table = getInputTableName(job);
 -      }
 -
 -      Boolean isOffline = split.isOffline();
 -      if (null == isOffline) {
 -        isOffline = isOfflineScan(job);
 -      }
 -
 -      Boolean isIsolated = split.isIsolatedScan();
 -      if (null == isIsolated) {
 -        isIsolated = isIsolated(job);
 -      }
 -
 -      Boolean usesLocalIterators = split.usesLocalIterators();
 -      if (null == usesLocalIterators) {
 -        usesLocalIterators = usesLocalIterators(job);
 -      }
 -
 +    protected void setupIterators(JobConf job, Scanner scanner, RangeInputSplit split) {
        List<IteratorSetting> iterators = split.getIterators();
-       
++
        if (null == iterators) {
          iterators = getIterators(job);
        }
-       
+ 
 -      Set<Pair<Text,Text>> columns = split.getFetchedColumns();
 -      if (null == columns) {
 -        columns = getFetchedColumns(job);
 -      }
 -
 -      try {
 -        log.debug("Creating connector with user: " + principal);
 -        Connector conn = instance.getConnector(principal, token);
 -        log.debug("Creating scanner for table: " + table);
 -        log.debug("Authorizations are: " + authorizations);
 -        if (isOfflineScan(job)) {
 -          String tokenClass = token.getClass().getCanonicalName();
 -          ByteBuffer tokenBuffer = ByteBuffer.wrap(CredentialHelper.toBytes(token));
 -          scanner = new OfflineScanner(instance, new TCredentials(principal, tokenClass, tokenBuffer, instance.getInstanceID()), Tables.getTableId(instance,
 -              table), authorizations);
 -        } else {
 -          scanner = conn.createScanner(table, authorizations);
 -        }
 -        if (isIsolated) {
 -          log.info("Creating isolated scanner");
 -          scanner = new IsolatedScanner(scanner);
 -        }
 -        if (usesLocalIterators) {
 -          log.info("Using local iterators");
 -          scanner = new ClientSideIteratorScanner(scanner);
 -        }
 -        setupIterators(iterators, scanner);
 -      } catch (Exception e) {
 -        throw new IOException(e);
 -      }
 -
 -      // setup a scanner within the bounds of this split
 -      for (Pair<Text,Text> c : columns) {
 -        if (c.getSecond() != null) {
 -          log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond());
 -          scanner.fetchColumn(c.getFirst(), c.getSecond());
 -        } else {
 -          log.debug("Fetching column family " + c.getFirst());
 -          scanner.fetchColumnFamily(c.getFirst());
 -        }
 -      }
 -
 -      scanner.setRange(split.getRange());
 -
 -      numKeysRead = 0;
 -
 -      // do this last after setting all scanner options
 -      scannerIterator = scanner.iterator();
 -    }
 -
 -    @Override
 -    public void close() {}
 -
 -    @Override
 -    public long getPos() throws IOException {
 -      return numKeysRead;
 -    }
 -
 -    @Override
 -    public float getProgress() throws IOException {
 -      if (numKeysRead > 0 && currentKey == null)
 -        return 1.0f;
 -      return split.getProgress(currentKey);
 -    }
 -
 -    protected Key currentKey = null;
 -
 -  }
 -
 -  Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobConf job, String tableName, List<Range> ranges) throws TableNotFoundException, AccumuloException,
 -      AccumuloSecurityException {
 -
 -    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
 -
 -    Instance instance = getInstance(job);
 -    Connector conn = instance.getConnector(getPrincipal(job), CredentialHelper.extractToken(getTokenClass(job), getToken(job)));
 -    String tableId = Tables.getTableId(instance, tableName);
 -
 -    if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
 -      Tables.clearCache(instance);
 -      if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
 -        throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode ");
 -      }
 -    }
 -
 -    for (Range range : ranges) {
 -      Text startRow;
 -
 -      if (range.getStartKey() != null)
 -        startRow = range.getStartKey().getRow();
 -      else
 -        startRow = new Text();
 -
 -      Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
 -      Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
 -      Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner);
 -      scanner.fetchColumnFamily(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY);
 -      scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
 -      scanner.fetchColumnFamily(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY);
 -      scanner.setRange(metadataRange);
 -
 -      RowIterator rowIter = new RowIterator(scanner);
 -
 -      KeyExtent lastExtent = null;
 -
 -      while (rowIter.hasNext()) {
 -        Iterator<Entry<Key,Value>> row = rowIter.next();
 -        String last = "";
 -        KeyExtent extent = null;
 -        String location = null;
 -
 -        while (row.hasNext()) {
 -          Entry<Key,Value> entry = row.next();
 -          Key key = entry.getKey();
 -
 -          if (key.getColumnFamily().equals(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY)) {
 -            last = entry.getValue().toString();
 -          }
 -
 -          if (key.getColumnFamily().equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)
 -              || key.getColumnFamily().equals(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY)) {
 -            location = entry.getValue().toString();
 -          }
 -
 -          if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) {
 -            extent = new KeyExtent(key.getRow(), entry.getValue());
 -          }
 -
 -        }
 -
 -        if (location != null)
 -          return null;
 -
 -        if (!extent.getTableId().toString().equals(tableId)) {
 -          throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
 -        }
 -
 -        if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
 -          throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
 -        }
 -
 -        Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
 -        if (tabletRanges == null) {
 -          tabletRanges = new HashMap<KeyExtent,List<Range>>();
 -          binnedRanges.put(last, tabletRanges);
 -        }
 -
 -        List<Range> rangeList = tabletRanges.get(extent);
 -        if (rangeList == null) {
 -          rangeList = new ArrayList<Range>();
 -          tabletRanges.put(extent, rangeList);
 -        }
 -
 -        rangeList.add(range);
 -
 -        if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
 -          break;
 -        }
 -
 -        lastExtent = extent;
 -      }
 -
 -    }
 -
 -    return binnedRanges;
 -  }
 -
 -  /**
 -   * Read the metadata table to get tablets and match up ranges to them.
 -   */
 -  @Override
 -  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
 -    Level logLevel = getLogLevel(job);
 -    log.setLevel(logLevel);
 -
 -    validateOptions(job);
 -
 -    String tableName = getInputTableName(job);
 -    boolean autoAdjust = getAutoAdjustRanges(job);
 -    List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(job)) : getRanges(job);
 -    Instance instance = getInstance(job);
 -    boolean offline = isOfflineScan(job);
 -    boolean isolated = isIsolated(job);
 -    boolean localIterators = usesLocalIterators(job);
 -    boolean mockInstance = (null != instance && MockInstance.class.equals(instance.getClass()));
 -    Set<Pair<Text,Text>> fetchedColumns = getFetchedColumns(job);
 -    Authorizations auths = getScanAuthorizations(job);
 -    String principal = getPrincipal(job);
 -    String tokenClass = getTokenClass(job);
 -    byte[] tokenBytes = getToken(job);
 -
 -    AuthenticationToken token;
 -    try {
 -      token = CredentialHelper.extractToken(tokenClass, tokenBytes);
 -    } catch (AccumuloSecurityException e) {
 -      throw new IOException(e);
 -    }
 -
 -    List<IteratorSetting> iterators = getIterators(job);
 -
 -    if (ranges.isEmpty()) {
 -      ranges = new ArrayList<Range>(1);
 -      ranges.add(new Range());
 -    }
 -
 -    // get the metadata information for these ranges
 -    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
 -    TabletLocator tl;
 -    try {
 -      if (isOfflineScan(job)) {
 -        binnedRanges = binOfflineTable(job, tableName, ranges);
 -        while (binnedRanges == null) {
 -          // Some tablets were still online, try again
 -          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
 -          binnedRanges = binOfflineTable(job, tableName, ranges);
 -        }
 -      } else {
 -        String tableId = null;
 -        tl = getTabletLocator(job);
 -        // its possible that the cache could contain complete, but old information about a tables tablets... so clear it
 -        tl.invalidateCache();
 -        while (!tl.binRanges(ranges, binnedRanges, new TCredentials(principal, tokenClass, ByteBuffer.wrap(tokenBytes), instance.getInstanceID())).isEmpty()) {
 -          if (!(instance instanceof MockInstance)) {
 -            if (tableId == null)
 -              tableId = Tables.getTableId(instance, tableName);
 -            if (!Tables.exists(instance, tableId))
 -              throw new TableDeletedException(tableId);
 -            if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
 -              throw new TableOfflineException(instance, tableId);
 -          }
 -          binnedRanges.clear();
 -          log.warn("Unable to locate bins for specified ranges. Retrying.");
 -          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
 -          tl.invalidateCache();
 -        }
 -      }
 -    } catch (Exception e) {
 -      throw new IOException(e);
 -    }
 -
 -    ArrayList<RangeInputSplit> splits = new ArrayList<RangeInputSplit>(ranges.size());
 -    HashMap<Range,ArrayList<String>> splitsToAdd = null;
 -
 -    if (!autoAdjust)
 -      splitsToAdd = new HashMap<Range,ArrayList<String>>();
 -
 -    HashMap<String,String> hostNameCache = new HashMap<String,String>();
 -
 -    for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
 -      String ip = tserverBin.getKey().split(":", 2)[0];
 -      String location = hostNameCache.get(ip);
 -      if (location == null) {
 -        InetAddress inetAddress = InetAddress.getByName(ip);
 -        location = inetAddress.getHostName();
 -        hostNameCache.put(ip, location);
 -      }
 -
 -      for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
 -        Range ke = extentRanges.getKey().toDataRange();
 -        for (Range r : extentRanges.getValue()) {
 -          if (autoAdjust) {
 -            // divide ranges into smaller ranges, based on the tablets
 -            splits.add(new RangeInputSplit(ke.clip(r), new String[] {location}));
 -          } else {
 -            // don't divide ranges
 -            ArrayList<String> locations = splitsToAdd.get(r);
 -            if (locations == null)
 -              locations = new ArrayList<String>(1);
 -            locations.add(location);
 -            splitsToAdd.put(r, locations);
 -          }
 -        }
 -      }
 -    }
 -
 -    if (!autoAdjust)
 -      for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
 -        splits.add(new RangeInputSplit(entry.getKey(), entry.getValue().toArray(new String[0])));
 -
 -    for (RangeInputSplit split : splits) {
 -      split.setTable(tableName);
 -      split.setOffline(offline);
 -      split.setIsolatedScan(isolated);
 -      split.setUsesLocalIterators(localIterators);
 -      split.setMockInstance(mockInstance);
 -      split.setFetchedColumns(fetchedColumns);
 -      split.setPrincipal(principal);
 -      split.setToken(token);
 -      split.setInstanceName(instance.getInstanceName());
 -      split.setZooKeepers(instance.getZooKeepers());
 -      split.setAuths(auths);
 -      split.setIterators(iterators);
 -      split.setLogLevel(logLevel);
 +      for (IteratorSetting iterator : iterators)
 +        scanner.addScanIterator(iterator);
      }
 -
 -    return splits.toArray(new InputSplit[splits.size()]);
    }
  
  }
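
For context, the merged mapred API above replaces the old setupIterators(List<IteratorSetting>, Scanner) with setupIterators(JobConf, Scanner, RangeInputSplit), which prefers iterators carried by the split and falls back to the job configuration. Below is a minimal sketch of a subclass using the new hook; it is not part of this commit, the class names, iterator name, and priority are invented for illustration, and both classes are left abstract since only the hook is shown:

  import org.apache.accumulo.core.client.IteratorSetting;
  import org.apache.accumulo.core.client.Scanner;
  import org.apache.accumulo.core.client.mapred.InputFormatBase;
  import org.apache.accumulo.core.client.mapred.RangeInputSplit;
  import org.apache.accumulo.core.data.Key;
  import org.apache.accumulo.core.data.Value;
  import org.apache.accumulo.core.iterators.user.VersioningIterator;
  import org.apache.hadoop.mapred.JobConf;

  public abstract class ExampleInputFormat extends InputFormatBase<Key,Value> {
    protected abstract static class ExampleReader extends RecordReaderBase<Key,Value> {
      @Override
      protected void setupIterators(JobConf job, Scanner scanner, RangeInputSplit split) {
        // apply the split-carried iterators first (falling back to the job config)
        super.setupIterators(job, scanner, split);
        // then append one reader-local scan-time iterator on top
        scanner.addScanIterator(new IteratorSetting(50, "versExample", VersioningIterator.class));
      }
    }
  }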

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8a34937a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
index 9525796,1404a25..329c6b1
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
@@@ -301,47 -510,813 +301,47 @@@ public abstract class InputFormatBase<K
     * @param context
     *          the Hadoop context for the configured job
     * @return an Accumulo tablet locator
 -   * @throws TableNotFoundException
 +   * @throws org.apache.accumulo.core.client.TableNotFoundException
     *           if the table name set on the configuration doesn't exist
     * @since 1.5.0
 +   * @deprecated since 1.6.0
     */
 +  @Deprecated
    protected static TabletLocator getTabletLocator(JobContext context) throws TableNotFoundException {
 -    return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context));
 -  }
 -
 -  // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
 -  /**
 -   * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}.
 -   * 
 -   * @param context
 -   *          the Hadoop context for the configured job
 -   * @throws IOException
 -   *           if the context is improperly configured
 -   * @since 1.5.0
 -   */
 -  protected static void validateOptions(JobContext context) throws IOException {
 -    InputConfigurator.validateOptions(CLASS, getConfiguration(context));
 +    return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context), InputConfigurator.getInputTableName(CLASS, getConfiguration(context)));
    }
  
 -  /**
 -   * An abstract base class to be used to create {@link RecordReader} instances that convert from Accumulo {@link Key}/{@link Value} pairs to the user's K/V
 -   * types.
 -   * 
 -   * Subclasses must implement {@link #nextKeyValue()} and use it to update the following variables:
 -   * <ul>
 -   * <li>K {@link #currentK}</li>
 -   * <li>V {@link #currentV}</li>
 -   * <li>Key {@link #currentKey} (used for progress reporting)</li>
 -   * <li>int {@link #numKeysRead} (used for progress reporting)</li>
 -   * </ul>
 -   */
 -  protected abstract static class RecordReaderBase<K,V> extends RecordReader<K,V> {
 -    protected long numKeysRead;
 -    protected Iterator<Entry<Key,Value>> scannerIterator;
 -    protected RangeInputSplit split;
 +  protected abstract static class RecordReaderBase<K,V> extends AbstractRecordReader<K,V> {
  
      /**
 -     * Apply the configured iterators from the configuration to the scanner.
 +     * Apply the configured iterators from the configuration to the scanner for the specified table name
       * 
 +     * @param context
 +     *          the Hadoop context for the configured job
       * @param scanner
       *          the scanner to configure
 +     * @since 1.6.0
       */
 -    protected void setupIterators(List<IteratorSetting> iterators, Scanner scanner) {
 -      for (IteratorSetting iterator : iterators) {
 -        scanner.addScanIterator(iterator);
 -      }
 +    @Override
 +    protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName, RangeInputSplit split) {
 +      setupIterators(context, scanner, split);
      }
-     
+ 
      /**
 -     * Initialize a scanner over the given input split using this task attempt configuration.
 +     * Apply the configured iterators from the configuration to the scanner.
 +     * 
 +     * @param context
 +     *          the Hadoop context for the configured job
 +     * @param scanner
 +     *          the scanner to configure
       */
 -    @Override
 -    public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException {
 -      Scanner scanner;
 -      split = (RangeInputSplit) inSplit;
 -      log.debug("Initializing input split: " + split.getRange());
 -
 -      Instance instance = split.getInstance();
 -      if (null == instance) {
 -        instance = getInstance(attempt);
 -      }
 -
 -      String principal = split.getPrincipal();
 -      if (null == principal) {
 -        principal = getPrincipal(attempt);
 -      }
 -
 -      AuthenticationToken token = split.getToken();
 -      if (null == token) {
 -        String tokenClass = getTokenClass(attempt);
 -        byte[] tokenBytes = getToken(attempt);
 -        try {
 -          token = CredentialHelper.extractToken(tokenClass, tokenBytes);
 -        } catch (AccumuloSecurityException e) {
 -          throw new IOException(e);
 -        }
 -      }
 -
 -      Authorizations authorizations = split.getAuths();
 -      if (null == authorizations) {
 -        authorizations = getScanAuthorizations(attempt);
 -      }
 -
 -      String table = split.getTable();
 -      if (null == table) {
 -        table = getInputTableName(attempt);
 -      }
 -
 -      Boolean isOffline = split.isOffline();
 -      if (null == isOffline) {
 -        isOffline = isOfflineScan(attempt);
 -      }
 -
 -      Boolean isIsolated = split.isIsolatedScan();
 -      if (null == isIsolated) {
 -        isIsolated = isIsolated(attempt);
 -      }
 -
 -      Boolean usesLocalIterators = split.usesLocalIterators();
 -      if (null == usesLocalIterators) {
 -        usesLocalIterators = usesLocalIterators(attempt);
 -      }
 -
 +    protected void setupIterators(TaskAttemptContext context, Scanner scanner, RangeInputSplit split) {
-       List<IteratorSetting> iterators = split.getIterators(); 
+       List<IteratorSetting> iterators = split.getIterators();
        if (null == iterators) {
 -        iterators = getIterators(attempt);
 -      }
 -
 -      Set<Pair<Text,Text>> columns = split.getFetchedColumns();
 -      if (null == columns) {
 -        columns = getFetchedColumns(attempt);
 -      }
 -
 -      try {
 -        log.debug("Creating connector with user: " + principal);
 -        Connector conn = instance.getConnector(principal, token);
 -        log.debug("Creating scanner for table: " + table);
 -        log.debug("Authorizations are: " + authorizations);
 -        if (isOfflineScan(attempt)) {
 -          String tokenClass = token.getClass().getCanonicalName();
 -          ByteBuffer tokenBuffer = ByteBuffer.wrap(CredentialHelper.toBytes(token));
 -          scanner = new OfflineScanner(instance, new TCredentials(principal, tokenClass, tokenBuffer, instance.getInstanceID()), Tables.getTableId(instance,
 -              table), authorizations);
 -        } else {
 -          scanner = conn.createScanner(table, authorizations);
 -        }
 -        if (isIsolated) {
 -          log.info("Creating isolated scanner");
 -          scanner = new IsolatedScanner(scanner);
 -        }
 -        if (usesLocalIterators) {
 -          log.info("Using local iterators");
 -          scanner = new ClientSideIteratorScanner(scanner);
 -        }
 -        setupIterators(iterators, scanner);
 -      } catch (Exception e) {
 -        throw new IOException(e);
 -      }
 -
 -      // setup a scanner within the bounds of this split
 -      for (Pair<Text,Text> c : columns) {
 -        if (c.getSecond() != null) {
 -          log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond());
 -          scanner.fetchColumn(c.getFirst(), c.getSecond());
 -        } else {
 -          log.debug("Fetching column family " + c.getFirst());
 -          scanner.fetchColumnFamily(c.getFirst());
 -        }
 -      }
 -
 -      scanner.setRange(split.getRange());
 -
 -      numKeysRead = 0;
 -
 -      // do this last after setting all scanner options
 -      scannerIterator = scanner.iterator();
 -    }
 -
 -    @Override
 -    public void close() {}
 -
 -    @Override
 -    public float getProgress() throws IOException {
 -      if (numKeysRead > 0 && currentKey == null)
 -        return 1.0f;
 -      return split.getProgress(currentKey);
 -    }
 -
 -    protected K currentK = null;
 -    protected V currentV = null;
 -    protected Key currentKey = null;
 -    protected Value currentValue = null;
 -
 -    @Override
 -    public K getCurrentKey() throws IOException, InterruptedException {
 -      return currentK;
 -    }
 -
 -    @Override
 -    public V getCurrentValue() throws IOException, InterruptedException {
 -      return currentV;
 -    }
 -  }
 -
 -  Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext context, String tableName, List<Range> ranges) throws TableNotFoundException,
 -      AccumuloException, AccumuloSecurityException {
 -
 -    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
 -
 -    Instance instance = getInstance(context);
 -    Connector conn = instance.getConnector(getPrincipal(context), CredentialHelper.extractToken(getTokenClass(context), getToken(context)));
 -    String tableId = Tables.getTableId(instance, tableName);
 -
 -    if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
 -      Tables.clearCache(instance);
 -      if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
 -        throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode ");
 -      }
 -    }
 -
 -    for (Range range : ranges) {
 -      Text startRow;
 -
 -      if (range.getStartKey() != null)
 -        startRow = range.getStartKey().getRow();
 -      else
 -        startRow = new Text();
 -
 -      Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
 -      Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
 -      Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner);
 -      scanner.fetchColumnFamily(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY);
 -      scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
 -      scanner.fetchColumnFamily(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY);
 -      scanner.setRange(metadataRange);
 -
 -      RowIterator rowIter = new RowIterator(scanner);
 -
 -      KeyExtent lastExtent = null;
 -
 -      while (rowIter.hasNext()) {
 -        Iterator<Entry<Key,Value>> row = rowIter.next();
 -        String last = "";
 -        KeyExtent extent = null;
 -        String location = null;
 -
 -        while (row.hasNext()) {
 -          Entry<Key,Value> entry = row.next();
 -          Key key = entry.getKey();
 -
 -          if (key.getColumnFamily().equals(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY)) {
 -            last = entry.getValue().toString();
 -          }
 -
 -          if (key.getColumnFamily().equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)
 -              || key.getColumnFamily().equals(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY)) {
 -            location = entry.getValue().toString();
 -          }
 -
 -          if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) {
 -            extent = new KeyExtent(key.getRow(), entry.getValue());
 -          }
 -
 -        }
 -
 -        if (location != null)
 -          return null;
 -
 -        if (!extent.getTableId().toString().equals(tableId)) {
 -          throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
 -        }
 -
 -        if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
 -          throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
 -        }
 -
 -        Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
 -        if (tabletRanges == null) {
 -          tabletRanges = new HashMap<KeyExtent,List<Range>>();
 -          binnedRanges.put(last, tabletRanges);
 -        }
 -
 -        List<Range> rangeList = tabletRanges.get(extent);
 -        if (rangeList == null) {
 -          rangeList = new ArrayList<Range>();
 -          tabletRanges.put(extent, rangeList);
 -        }
 -
 -        rangeList.add(range);
 -
 -        if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
 -          break;
 -        }
 -
 -        lastExtent = extent;
 -      }
 -
 -    }
 -
 -    return binnedRanges;
 -  }
 -
 -  /**
 -   * Read the metadata table to get tablets and match up ranges to them.
 -   */
 -  @Override
 -  public List<InputSplit> getSplits(JobContext context) throws IOException {
 -    Level logLevel = getLogLevel(context);
 -    log.setLevel(logLevel);
 -
 -    validateOptions(context);
 -
 -    String tableName = getInputTableName(context);
 -    boolean autoAdjust = getAutoAdjustRanges(context);
 -    List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(context)) : getRanges(context);
 -    Instance instance = getInstance(context);
 -    boolean offline = isOfflineScan(context);
 -    boolean isolated = isIsolated(context);
 -    boolean localIterators = usesLocalIterators(context);
 -    boolean mockInstance = (null != instance && MockInstance.class.equals(instance.getClass()));
 -    Set<Pair<Text,Text>> fetchedColumns = getFetchedColumns(context);
 -    Authorizations auths = getScanAuthorizations(context);
 -    String principal = getPrincipal(context);
 -    String tokenClass = getTokenClass(context);
 -    byte[] tokenBytes = getToken(context);
 -
 -    AuthenticationToken token;
 -    try {
 -      token = CredentialHelper.extractToken(tokenClass, tokenBytes);
 -    } catch (AccumuloSecurityException e) {
 -      throw new IOException(e);
 -    }
 -
 -    List<IteratorSetting> iterators = getIterators(context);
 -
 -    if (ranges.isEmpty()) {
 -      ranges = new ArrayList<Range>(1);
 -      ranges.add(new Range());
 -    }
 -
 -    // get the metadata information for these ranges
 -    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
 -    TabletLocator tl;
 -    try {
 -      if (isOfflineScan(context)) {
 -        binnedRanges = binOfflineTable(context, tableName, ranges);
 -        while (binnedRanges == null) {
 -          // Some tablets were still online, try again
 -          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
 -          binnedRanges = binOfflineTable(context, tableName, ranges);
 -        }
 -      } else {
 -        String tableId = null;
 -        tl = getTabletLocator(context);
 -        // its possible that the cache could contain complete, but old information about a tables tablets... so clear it
 -        tl.invalidateCache();
 -        while (!tl.binRanges(ranges, binnedRanges, new TCredentials(principal, tokenClass, ByteBuffer.wrap(tokenBytes), instance.getInstanceID())).isEmpty()) {
 -          if (!(instance instanceof MockInstance)) {
 -            if (tableId == null)
 -              tableId = Tables.getTableId(instance, tableName);
 -            if (!Tables.exists(instance, tableId))
 -              throw new TableDeletedException(tableId);
 -            if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
 -              throw new TableOfflineException(instance, tableId);
 -          }
 -          binnedRanges.clear();
 -          log.warn("Unable to locate bins for specified ranges. Retrying.");
 -          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
 -          tl.invalidateCache();
 -        }
 -      }
 -    } catch (Exception e) {
 -      throw new IOException(e);
 -    }
 -
 -    ArrayList<InputSplit> splits = new ArrayList<InputSplit>(ranges.size());
 -    HashMap<Range,ArrayList<String>> splitsToAdd = null;
 -
 -    if (!autoAdjust)
 -      splitsToAdd = new HashMap<Range,ArrayList<String>>();
 -
 -    HashMap<String,String> hostNameCache = new HashMap<String,String>();
 -
 -    for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
 -      String ip = tserverBin.getKey().split(":", 2)[0];
 -      String location = hostNameCache.get(ip);
 -      if (location == null) {
 -        InetAddress inetAddress = InetAddress.getByName(ip);
 -        location = inetAddress.getHostName();
 -        hostNameCache.put(ip, location);
 -      }
 -
 -      for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
 -        Range ke = extentRanges.getKey().toDataRange();
 -        for (Range r : extentRanges.getValue()) {
 -          if (autoAdjust) {
 -            // divide ranges into smaller ranges, based on the tablets
 -            splits.add(new RangeInputSplit(ke.clip(r), new String[] {location}));
 -          } else {
 -            // don't divide ranges
 -            ArrayList<String> locations = splitsToAdd.get(r);
 -            if (locations == null)
 -              locations = new ArrayList<String>(1);
 -            locations.add(location);
 -            splitsToAdd.put(r, locations);
 -          }
 -        }
 +        iterators = getIterators(context);
        }
 -    }
 -
 -    if (!autoAdjust)
 -      for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
 -        splits.add(new RangeInputSplit(entry.getKey(), entry.getValue().toArray(new String[0])));
 -
 -    for (InputSplit inputSplit : splits) {
 -      RangeInputSplit split = (RangeInputSplit) inputSplit;
 -
 -      split.setTable(tableName);
 -      split.setOffline(offline);
 -      split.setIsolatedScan(isolated);
 -      split.setUsesLocalIterators(localIterators);
 -      split.setMockInstance(mockInstance);
 -      split.setFetchedColumns(fetchedColumns);
 -      split.setPrincipal(principal);
 -      split.setToken(token);
 -      split.setInstanceName(instance.getInstanceName());
 -      split.setZooKeepers(instance.getZooKeepers());
 -      split.setAuths(auths);
 -      split.setIterators(iterators);
 -      split.setLogLevel(logLevel);
 -    }
 -
 -    return splits;
 -  }
 -
 -  // ----------------------------------------------------------------------------------------------------
 -  // Everything below this line is deprecated and should go away in future versions
 -  // ----------------------------------------------------------------------------------------------------
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #setScanIsolation(Job, boolean)} instead.
 -   */
 -  @Deprecated
 -  public static void setIsolated(Configuration conf, boolean enable) {
 -    InputConfigurator.setScanIsolation(CLASS, conf, enable);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #setLocalIterators(Job, boolean)} instead.
 -   */
 -  @Deprecated
 -  public static void setLocalIterators(Configuration conf, boolean enable) {
 -    InputConfigurator.setLocalIterators(CLASS, conf, enable);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #setConnectorInfo(Job, String, AuthenticationToken)}, {@link #setInputTableName(Job, String)}, and
 -   *             {@link #setScanAuthorizations(Job, Authorizations)} instead.
 -   */
 -  @Deprecated
 -  public static void setInputInfo(Configuration conf, String user, byte[] passwd, String table, Authorizations auths) {
 -    try {
 -      InputConfigurator.setConnectorInfo(CLASS, conf, user, new PasswordToken(passwd));
 -    } catch (AccumuloSecurityException e) {
 -      throw new RuntimeException(e);
 -    }
 -    InputConfigurator.setInputTableName(CLASS, conf, table);
 -    InputConfigurator.setScanAuthorizations(CLASS, conf, auths);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #setZooKeeperInstance(Job, String, String)} instead.
 -   */
 -  @Deprecated
 -  public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
 -    InputConfigurator.setZooKeeperInstance(CLASS, conf, instanceName, zooKeepers);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #setMockInstance(Job, String)} instead.
 -   */
 -  @Deprecated
 -  public static void setMockInstance(Configuration conf, String instanceName) {
 -    InputConfigurator.setMockInstance(CLASS, conf, instanceName);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #setRanges(Job, Collection)} instead.
 -   */
 -  @Deprecated
 -  public static void setRanges(Configuration conf, Collection<Range> ranges) {
 -    InputConfigurator.setRanges(CLASS, conf, ranges);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #setAutoAdjustRanges(Job, boolean)} instead.
 -   */
 -  @Deprecated
 -  public static void disableAutoAdjustRanges(Configuration conf) {
 -    InputConfigurator.setAutoAdjustRanges(CLASS, conf, false);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #addIterator(Job, IteratorSetting)} to add the {@link VersioningIterator} instead.
 -   */
 -  @Deprecated
 -  public static void setMaxVersions(Configuration conf, int maxVersions) throws IOException {
 -    IteratorSetting vers = new IteratorSetting(1, "vers", VersioningIterator.class);
 -    try {
 -      VersioningIterator.setMaxVersions(vers, maxVersions);
 -    } catch (IllegalArgumentException e) {
 -      throw new IOException(e);
 -    }
 -    InputConfigurator.addIterator(CLASS, conf, vers);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #setOfflineTableScan(Job, boolean)} instead.
 -   */
 -  @Deprecated
 -  public static void setScanOffline(Configuration conf, boolean scanOff) {
 -    InputConfigurator.setOfflineTableScan(CLASS, conf, scanOff);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #fetchColumns(Job, Collection)} instead.
 -   */
 -  @Deprecated
 -  public static void fetchColumns(Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
 -    InputConfigurator.fetchColumns(CLASS, conf, columnFamilyColumnQualifierPairs);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #setLogLevel(Job, Level)} instead.
 -   */
 -  @Deprecated
 -  public static void setLogLevel(Configuration conf, Level level) {
 -    InputConfigurator.setLogLevel(CLASS, conf, level);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #addIterator(Job, IteratorSetting)} instead.
 -   */
 -  @Deprecated
 -  public static void addIterator(Configuration conf, IteratorSetting cfg) {
 -    InputConfigurator.addIterator(CLASS, conf, cfg);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #isIsolated(JobContext)} instead.
 -   */
 -  @Deprecated
 -  protected static boolean isIsolated(Configuration conf) {
 -    return InputConfigurator.isIsolated(CLASS, conf);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #usesLocalIterators(JobContext)} instead.
 -   */
 -  @Deprecated
 -  protected static boolean usesLocalIterators(Configuration conf) {
 -    return InputConfigurator.usesLocalIterators(CLASS, conf);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #getPrincipal(JobContext)} instead.
 -   */
 -  @Deprecated
 -  protected static String getPrincipal(Configuration conf) {
 -    return InputConfigurator.getPrincipal(CLASS, conf);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #getToken(JobContext)} instead.
 -   */
 -  @Deprecated
 -  protected static byte[] getToken(Configuration conf) {
 -    return InputConfigurator.getToken(CLASS, conf);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #getInputTableName(JobContext)} instead.
 -   */
 -  @Deprecated
 -  protected static String getTablename(Configuration conf) {
 -    return InputConfigurator.getInputTableName(CLASS, conf);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #getScanAuthorizations(JobContext)} instead.
 -   */
 -  @Deprecated
 -  protected static Authorizations getAuthorizations(Configuration conf) {
 -    return InputConfigurator.getScanAuthorizations(CLASS, conf);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #getInstance(JobContext)} instead.
 -   */
 -  @Deprecated
 -  protected static Instance getInstance(Configuration conf) {
 -    return InputConfigurator.getInstance(CLASS, conf);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #getTabletLocator(JobContext)} instead.
 -   */
 -  @Deprecated
 -  protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException {
 -    return InputConfigurator.getTabletLocator(CLASS, conf);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #getRanges(JobContext)} instead.
 -   */
 -  @Deprecated
 -  protected static List<Range> getRanges(Configuration conf) throws IOException {
 -    return InputConfigurator.getRanges(CLASS, conf);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #getFetchedColumns(JobContext)} instead.
 -   */
 -  @Deprecated
 -  protected static Set<Pair<Text,Text>> getFetchedColumns(Configuration conf) {
 -    return InputConfigurator.getFetchedColumns(CLASS, conf);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #getAutoAdjustRanges(JobContext)} instead.
 -   */
 -  @Deprecated
 -  protected static boolean getAutoAdjustRanges(Configuration conf) {
 -    return InputConfigurator.getAutoAdjustRanges(CLASS, conf);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #getLogLevel(JobContext)} instead.
 -   */
 -  @Deprecated
 -  protected static Level getLogLevel(Configuration conf) {
 -    return InputConfigurator.getLogLevel(CLASS, conf);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #validateOptions(JobContext)} instead.
 -   */
 -  @Deprecated
 -  protected static void validateOptions(Configuration conf) throws IOException {
 -    InputConfigurator.validateOptions(CLASS, conf);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #addIterator(Job, IteratorSetting)} to add the {@link VersioningIterator} instead.
 -   */
 -  @Deprecated
 -  protected static int getMaxVersions(Configuration conf) {
 -    // This is so convoluted, because the only reason to get the number of maxVersions is to construct the same type of IteratorSetting object we have to
 -    // deconstruct to get at this option in the first place, but to preserve correct behavior, this appears necessary.
 -    List<IteratorSetting> iteratorSettings = InputConfigurator.getIterators(CLASS, conf);
 -    for (IteratorSetting setting : iteratorSettings) {
 -      if ("vers".equals(setting.getName()) && 1 == setting.getPriority() && VersioningIterator.class.getName().equals(setting.getIteratorClass())) {
 -        if (setting.getOptions().containsKey("maxVersions"))
 -          return Integer.parseInt(setting.getOptions().get("maxVersions"));
 -        else
 -          return -1;
 -      }
 -    }
 -    return -1;
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #isOfflineScan(JobContext)} instead.
 -   */
 -  @Deprecated
 -  protected static boolean isOfflineScan(Configuration conf) {
 -    return InputConfigurator.isOfflineScan(CLASS, conf);
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #getIterators(JobContext)} instead.
 -   */
 -  @Deprecated
 -  protected static List<AccumuloIterator> getIterators(Configuration conf) {
 -    List<IteratorSetting> iteratorSettings = InputConfigurator.getIterators(CLASS, conf);
 -    List<AccumuloIterator> deprecatedIterators = new ArrayList<AccumuloIterator>(iteratorSettings.size());
 -    for (IteratorSetting setting : iteratorSettings) {
 -      AccumuloIterator deprecatedIter = new AccumuloIterator(new String(setting.getPriority() + AccumuloIterator.FIELD_SEP + setting.getIteratorClass()
 -          + AccumuloIterator.FIELD_SEP + setting.getName()));
 -      deprecatedIterators.add(deprecatedIter);
 -    }
 -    return deprecatedIterators;
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link #getIterators(JobContext)} instead.
 -   */
 -  @Deprecated
 -  protected static List<AccumuloIteratorOption> getIteratorOptions(Configuration conf) {
 -    List<IteratorSetting> iteratorSettings = InputConfigurator.getIterators(CLASS, conf);
 -    List<AccumuloIteratorOption> deprecatedIteratorOptions = new ArrayList<AccumuloIteratorOption>(iteratorSettings.size());
 -    for (IteratorSetting setting : iteratorSettings) {
 -      for (Entry<String,String> opt : setting.getOptions().entrySet()) {
 -        String deprecatedOption;
 -        try {
 -          deprecatedOption = new String(setting.getName() + AccumuloIteratorOption.FIELD_SEP + URLEncoder.encode(opt.getKey(), "UTF-8")
 -              + AccumuloIteratorOption.FIELD_SEP + URLEncoder.encode(opt.getValue(), "UTF-8"));
 -        } catch (UnsupportedEncodingException e) {
 -          throw new RuntimeException(e);
 -        }
 -        deprecatedIteratorOptions.add(new AccumuloIteratorOption(deprecatedOption));
 -      }
 -    }
 -    return deprecatedIteratorOptions;
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link IteratorSetting} instead.
 -   */
 -  @Deprecated
 -  static class AccumuloIterator {
 -
 -    private static final String FIELD_SEP = ":";
 -
 -    private int priority;
 -    private String iteratorClass;
 -    private String iteratorName;
 -
 -    public AccumuloIterator(int priority, String iteratorClass, String iteratorName) {
 -      this.priority = priority;
 -      this.iteratorClass = iteratorClass;
 -      this.iteratorName = iteratorName;
 -    }
 -
 -    // Parses out a setting given an string supplied from an earlier toString() call
 -    public AccumuloIterator(String iteratorSetting) {
 -      // Parse the string to expand the iterator
 -      StringTokenizer tokenizer = new StringTokenizer(iteratorSetting, FIELD_SEP);
 -      priority = Integer.parseInt(tokenizer.nextToken());
 -      iteratorClass = tokenizer.nextToken();
 -      iteratorName = tokenizer.nextToken();
 -    }
 -
 -    public int getPriority() {
 -      return priority;
 -    }
 -
 -    public String getIteratorClass() {
 -      return iteratorClass;
 -    }
 -
 -    public String getIteratorName() {
 -      return iteratorName;
 -    }
 -
 -    @Override
 -    public String toString() {
 -      return new String(priority + FIELD_SEP + iteratorClass + FIELD_SEP + iteratorName);
 -    }
 -
 -  }
 -
 -  /**
 -   * @deprecated since 1.5.0; Use {@link IteratorSetting} instead.
 -   */
 -  @Deprecated
 -  static class AccumuloIteratorOption {
 -    private static final String FIELD_SEP = ":";
 -
 -    private String iteratorName;
 -    private String key;
 -    private String value;
 -
 -    public AccumuloIteratorOption(String iteratorName, String key, String value) {
 -      this.iteratorName = iteratorName;
 -      this.key = key;
 -      this.value = value;
 -    }
 -
 -    // Parses out an option given a string supplied from an earlier toString() call
 -    public AccumuloIteratorOption(String iteratorOption) {
 -      StringTokenizer tokenizer = new StringTokenizer(iteratorOption, FIELD_SEP);
 -      this.iteratorName = tokenizer.nextToken();
 -      try {
 -        this.key = URLDecoder.decode(tokenizer.nextToken(), "UTF-8");
 -        this.value = URLDecoder.decode(tokenizer.nextToken(), "UTF-8");
 -      } catch (UnsupportedEncodingException e) {
 -        throw new RuntimeException(e);
 -      }
 -    }
 -
 -    public String getIteratorName() {
 -      return iteratorName;
 -    }
 -
 -    public String getKey() {
 -      return key;
 -    }
 -
 -    public String getValue() {
 -      return value;
 -    }
 -
 -    @Override
 -    public String toString() {
 -      try {
 -        return new String(iteratorName + FIELD_SEP + URLEncoder.encode(key, "UTF-8") + FIELD_SEP + URLEncoder.encode(value, "UTF-8"));
 -      } catch (UnsupportedEncodingException e) {
 -        throw new RuntimeException(e);
 -      }
 -    }
 -
 -  }
 -
 -  // use reflection to pull the Configuration out of the JobContext for Hadoop 1 and Hadoop 2 compatibility
 -  static Configuration getConfiguration(JobContext context) {
 -    try {
 -      Class<?> c = InputFormatBase.class.getClassLoader().loadClass("org.apache.hadoop.mapreduce.JobContext");
 -      Method m = c.getMethod("getConfiguration");
 -      Object o = m.invoke(context, new Object[0]);
 -      return (Configuration) o;
 -    } catch (Exception e) {
 -      throw new RuntimeException(e);
 +      for (IteratorSetting iterator : iterators)
 +        scanner.addScanIterator(iterator);
      }
    }
 -
  }
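
Likewise for the mapreduce API: the table-name-aware hook setupIterators(TaskAttemptContext, Scanner, String, RangeInputSplit) added for 1.6.0 delegates to the split-based overload, so subclasses that only care about one table can keep overriding the latter. A minimal sketch under the same caveats as above (invented names, classes left abstract, not from the commit itself):

  import org.apache.accumulo.core.client.IteratorSetting;
  import org.apache.accumulo.core.client.Scanner;
  import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
  import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
  import org.apache.accumulo.core.data.Key;
  import org.apache.accumulo.core.data.Value;
  import org.apache.accumulo.core.iterators.user.VersioningIterator;
  import org.apache.hadoop.mapreduce.TaskAttemptContext;

  public abstract class ExampleInputFormat extends InputFormatBase<Key,Value> {
    protected abstract static class ExampleReader extends RecordReaderBase<Key,Value> {
      @Override
      protected void setupIterators(TaskAttemptContext context, Scanner scanner, RangeInputSplit split) {
        // this overload is what the new table-name-aware hook delegates to
        super.setupIterators(context, scanner, split);
        scanner.addScanIterator(new IteratorSetting(50, "versExample", VersioningIterator.class));
      }
    }
  }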

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8a34937a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
----------------------------------------------------------------------
diff --cc core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
index 20b5f95,d826895..b0f0b71
--- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
@@@ -246,13 -281,11 +246,13 @@@ public class AccumuloInputFormatTest 
        String user = args[0];
        String pass = args[1];
        String table = args[2];
 +
        String instanceName = args[3];
        String inputFormatClassName = args[4];
-       @SuppressWarnings({"rawtypes", "unchecked"})
-       Class<? extends InputFormat> inputFormatClass = (Class<? extends InputFormat>) Class.forName(inputFormatClassName);
+       @SuppressWarnings("unchecked")
+       Class<? extends InputFormat<?,?>> inputFormatClass = (Class<? extends InputFormat<?,?>>) Class.forName(inputFormatClassName);
  
 +      @SuppressWarnings("deprecation")
        Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
        job.setJarByClass(this.getClass());
  
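
The test change above tightens the reflective lookup of the InputFormat implementation from a rawtype to wildcard generics, so a single @SuppressWarnings("unchecked") covers the one unavoidable cast. Roughly the pattern, as a standalone sketch (the helper class is invented):

  import org.apache.hadoop.mapreduce.InputFormat;

  public final class InputFormatLookup {
    @SuppressWarnings("unchecked")
    static Class<? extends InputFormat<?,?>> resolve(String className) throws ClassNotFoundException {
      // unchecked: Class.forName returns Class<?>; the cast narrows it to the wildcarded type
      return (Class<? extends InputFormat<?,?>>) Class.forName(className);
    }
  }

The wildcarded class is still usable wherever Job expects a Class<? extends InputFormat>, which is why the "rawtypes" entry could be dropped from the suppression list.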

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8a34937a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 2ae0763,0000000..d6e04a8
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@@ -1,552 -1,0 +1,555 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.init;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.util.Arrays;
 +import java.util.HashMap;
 +import java.util.Locale;
 +import java.util.Map.Entry;
 +import java.util.UUID;
 +
 +import jline.console.ConsoleReader;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.Help;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.conf.SiteConfiguration;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.file.FileOperations;
 +import org.apache.accumulo.core.file.FileSKVWriter;
 +import org.apache.accumulo.core.iterators.user.VersioningIterator;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.master.thrift.MasterGoalState;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 +import org.apache.accumulo.core.security.SecurityUtil;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.constraints.MetadataConstraints;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.fs.VolumeManagerImpl;
 +import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;
 +import org.apache.accumulo.server.security.AuditedSecurityOperation;
 +import org.apache.accumulo.server.security.SystemCredentials;
 +import org.apache.accumulo.server.tables.TableManager;
 +import org.apache.accumulo.server.tablets.TabletTime;
 +import org.apache.accumulo.server.util.TablePropUtil;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.ZooDefs.Ids;
 +
 +import com.beust.jcommander.Parameter;
 +
 +/**
 + * This class is used to set up the directory structure and the root tablet to get an instance started.
 + */
 +public class Initialize {
 +  private static final Logger log = Logger.getLogger(Initialize.class);
 +  private static final String DEFAULT_ROOT_USER = "root";
 +  public static final String TABLE_TABLETS_TABLET_DIR = "/table_info";
-   
++
 +  private static ConsoleReader reader = null;
 +  private static IZooReaderWriter zoo = ZooReaderWriter.getInstance();
-   
++
 +  private static ConsoleReader getConsoleReader() throws IOException {
 +    if (reader == null)
 +      reader = new ConsoleReader();
 +    return reader;
 +  }
++
 +  /**
 +   * Sets this class's ZooKeeper reader/writer.
-    *
-    * @param izoo reader/writer
++   * 
++   * @param izoo
++   *          reader/writer
 +   */
 +  static void setZooReaderWriter(IZooReaderWriter izoo) {
 +    zoo = izoo;
 +  }
++
 +  /**
 +   * Gets this class's ZooKeeper reader/writer.
-    *
++   * 
 +   * @return reader/writer
 +   */
 +  static IZooReaderWriter getZooReaderWriter() {
 +    return zoo;
 +  }
 +
 +  private static HashMap<String,String> initialMetadataConf = new HashMap<String,String>();
 +  static {
 +    initialMetadataConf.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "32K");
 +    initialMetadataConf.put(Property.TABLE_FILE_REPLICATION.getKey(), "5");
 +    initialMetadataConf.put(Property.TABLE_WALOG_ENABLED.getKey(), "true");
 +    initialMetadataConf.put(Property.TABLE_MAJC_RATIO.getKey(), "1");
 +    initialMetadataConf.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "64M");
 +    initialMetadataConf.put(Property.TABLE_CONSTRAINT_PREFIX.getKey() + "1", MetadataConstraints.class.getName());
 +    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers", "10," + VersioningIterator.class.getName());
 +    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers.opt.maxVersions", "1");
 +    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers", "10," + VersioningIterator.class.getName());
 +    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers.opt.maxVersions", "1");
 +    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers", "10," + VersioningIterator.class.getName());
 +    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers.opt.maxVersions", "1");
 +    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.bulkLoadFilter", "20," + MetadataBulkLoadFilter.class.getName());
 +    initialMetadataConf.put(Property.TABLE_FAILURES_IGNORE.getKey(), "false");
 +    initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "tablet",
 +        String.format("%s,%s", TabletsSection.TabletColumnFamily.NAME, TabletsSection.CurrentLocationColumnFamily.NAME));
 +    initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "server", String.format("%s,%s,%s,%s", DataFileColumnFamily.NAME,
 +        LogColumnFamily.NAME, TabletsSection.ServerColumnFamily.NAME, TabletsSection.FutureLocationColumnFamily.NAME));
 +    initialMetadataConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), "tablet,server");
 +    initialMetadataConf.put(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey(), "");
 +    initialMetadataConf.put(Property.TABLE_INDEXCACHE_ENABLED.getKey(), "true");
 +    initialMetadataConf.put(Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "true");
 +  }
-   
++
 +  static boolean checkInit(Configuration conf, VolumeManager fs, SiteConfiguration sconf) throws IOException {
 +    String fsUri;
 +    if (!sconf.get(Property.INSTANCE_DFS_URI).equals(""))
 +      fsUri = sconf.get(Property.INSTANCE_DFS_URI);
 +    else
 +      fsUri = FileSystem.getDefaultUri(conf).toString();
 +    log.info("Hadoop Filesystem is " + fsUri);
 +    log.info("Accumulo data dirs are " + Arrays.asList(ServerConstants.getBaseDirs()));
 +    log.info("Zookeeper server is " + sconf.get(Property.INSTANCE_ZK_HOST));
 +    log.info("Checking if Zookeeper is available. If this hangs, then you need to make sure zookeeper is running");
 +    if (!zookeeperAvailable()) {
 +      log.fatal("Zookeeper needs to be up and running in order to init. Exiting ...");
 +      return false;
 +    }
 +    if (sconf.get(Property.INSTANCE_SECRET).equals(Property.INSTANCE_SECRET.getDefaultValue())) {
 +      ConsoleReader c = getConsoleReader();
 +      c.beep();
 +      c.println();
 +      c.println();
 +      c.println("Warning!!! Your instance secret is still set to the default, this is not secure. We highly recommend you change it.");
 +      c.println();
 +      c.println();
 +      c.println("You can change the instance secret in accumulo by using:");
 +      c.println("   bin/accumulo " + org.apache.accumulo.server.util.ChangeSecret.class.getName() + " oldPassword newPassword.");
 +      c.println("You will also need to edit your secret in your configuration file by adding the property instance.secret to your conf/accumulo-site.xml. Without this accumulo will not operate correctly");
 +    }
 +    try {
 +      if (isInitialized(fs)) {
 +        String instanceDfsDir = sconf.get(Property.INSTANCE_DFS_DIR);
 +        log.fatal("It appears the directory " + fsUri + instanceDfsDir + " was previously initialized.");
 +        String instanceDfsUri = sconf.get(Property.INSTANCE_DFS_URI);
 +        if ("".equals(instanceDfsUri)) {
 +          log.fatal("You are using the default URI for the filesystem. Set the property " + Property.INSTANCE_DFS_URI + " to use a different filesystem,");
 +        } else {
 +          log.fatal("Change the property " + Property.INSTANCE_DFS_URI + " to use a different filesystem,");
 +        }
 +        log.fatal("or change the property " + Property.INSTANCE_DFS_DIR + " to use a different directory.");
 +        log.fatal("The current value of " + Property.INSTANCE_DFS_URI + " is |" + instanceDfsUri + "|");
 +        log.fatal("The current value of " + Property.INSTANCE_DFS_DIR + " is |" + instanceDfsDir + "|");
 +        return false;
 +      }
 +    } catch (IOException e) {
 +      throw new IOException("Failed to check if filesystem already initialized", e);
 +    }
 +
 +    return true;
 +  }
 +
 +  public static boolean doInit(Opts opts, Configuration conf, VolumeManager fs) throws IOException {
 +    if (!checkInit(conf, fs, ServerConfiguration.getSiteConfiguration())) {
 +      return false;
 +    }
 +
 +    // prompt user for instance name and root password early; if they abort,
 +    // we won't leave an inconsistent HDFS/ZooKeeper structure
 +    String instanceNamePath;
 +    try {
 +      instanceNamePath = getInstanceNamePath(opts);
 +    } catch (Exception e) {
 +      log.fatal("Failed to talk to zookeeper", e);
 +      return false;
 +    }
 +    opts.rootpass = getRootPassword(opts);
 +    return initialize(opts, instanceNamePath, fs);
 +  }
-   
++
 +  public static boolean initialize(Opts opts, String instanceNamePath, VolumeManager fs) {
-     
++
 +    UUID uuid = UUID.randomUUID();
 +    // the actual disk locations of the root table and tablets
 +    final Path rootTablet = new Path(fs.choose(ServerConstants.getTablesDirs()) + "/" + RootTable.ID + RootTable.ROOT_TABLET_LOCATION);
 +    try {
 +      initZooKeeper(opts, uuid.toString(), instanceNamePath, rootTablet);
 +    } catch (Exception e) {
 +      log.fatal("Failed to initialize zookeeper", e);
 +      return false;
 +    }
-     
++
 +    try {
 +      initFileSystem(opts, fs, uuid, rootTablet);
 +    } catch (Exception e) {
 +      log.fatal("Failed to initialize filesystem", e);
 +      return false;
 +    }
-     
++
 +    try {
 +      initSecurity(opts, uuid.toString());
 +    } catch (Exception e) {
 +      log.fatal("Failed to initialize security", e);
 +      return false;
 +    }
 +    return true;
 +  }
-   
++
 +  private static boolean zookeeperAvailable() {
 +    try {
 +      return zoo.exists("/");
 +    } catch (KeeperException e) {
 +      return false;
 +    } catch (InterruptedException e) {
 +      return false;
 +    }
 +  }
-   
++
 +  private static Path[] paths(String[] paths) {
 +    Path[] result = new Path[paths.length];
 +    for (int i = 0; i < paths.length; i++) {
 +      result[i] = new Path(paths[i]);
 +    }
 +    return result;
 +  }
-   
-   //TODO Remove deprecation warning suppression when Hadoop1 support is dropped
++
++  // TODO Remove deprecation warning suppression when Hadoop1 support is dropped
 +  @SuppressWarnings("deprecation")
 +  private static void initFileSystem(Opts opts, VolumeManager fs, UUID uuid, Path rootTablet) throws IOException {
 +    FileStatus fstat;
 +
 +    // the actual disk locations of the metadata table and tablets
 +    final Path[] metadataTableDirs = paths(ServerConstants.getMetadataTableDirs());
-     
++
 +    String tableMetadataTabletDir = fs.choose(ServerConstants.prefix(ServerConstants.getMetadataTableDirs(), TABLE_TABLETS_TABLET_DIR));
 +    String defaultMetadataTabletDir = fs.choose(ServerConstants.prefix(ServerConstants.getMetadataTableDirs(), Constants.DEFAULT_TABLET_LOCATION));
 +
 +    fs.mkdirs(new Path(ServerConstants.getDataVersionLocation(), "" + ServerConstants.DATA_VERSION));
-     
++
 +    // create an instance id
 +    fs.mkdirs(ServerConstants.getInstanceIdLocation());
 +    fs.createNewFile(new Path(ServerConstants.getInstanceIdLocation(), uuid.toString()));
-     
++
 +    // initialize initial metadata config in zookeeper
 +    initMetadataConfig();
-     
++
 +    // create metadata table
 +    for (Path mtd : metadataTableDirs) {
 +      try {
 +        fstat = fs.getFileStatus(mtd);
 +        if (!fstat.isDir()) {
 +          log.fatal("location " + mtd.toString() + " exists but is not a directory");
 +          return;
 +        }
 +      } catch (FileNotFoundException fnfe) {
 +        if (!fs.mkdirs(mtd)) {
 +          log.fatal("unable to create directory " + mtd.toString());
 +          return;
 +        }
 +      }
 +    }
-     
++
 +    // create root table and tablet
 +    try {
 +      fstat = fs.getFileStatus(rootTablet);
 +      if (!fstat.isDir()) {
 +        log.fatal("location " + rootTablet.toString() + " exists but is not a directory");
 +        return;
 +      }
 +    } catch (FileNotFoundException fnfe) {
 +      if (!fs.mkdirs(rootTablet)) {
 +        log.fatal("unable to create directory " + rootTablet.toString());
 +        return;
 +      }
 +    }
-     
++
 +    // populate the root tablet with info about the default tablet
 +    // the root tablet contains the key extent and locations of all the
 +    // metadata tablets
 +    String initRootTabFile = rootTablet + "/00000_00000." + FileOperations.getNewFileExtension(AccumuloConfiguration.getDefaultConfiguration());
 +    FileSystem ns = fs.getFileSystemByPath(new Path(initRootTabFile));
 +    FileSKVWriter mfw = FileOperations.getInstance().openWriter(initRootTabFile, ns, ns.getConf(), AccumuloConfiguration.getDefaultConfiguration());
 +    mfw.startDefaultLocalityGroup();
-     
++
 +    Text tableExtent = new Text(KeyExtent.getMetadataEntry(new Text(MetadataTable.ID), MetadataSchema.TabletsSection.getRange().getEndKey().getRow()));
-     
++
 +    // table tablet's directory
 +    Key tableDirKey = new Key(tableExtent, TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnFamily(),
 +        TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnQualifier(), 0);
 +    mfw.append(tableDirKey, new Value(tableMetadataTabletDir.getBytes()));
-     
++
 +    // table tablet time
 +    Key tableTimeKey = new Key(tableExtent, TabletsSection.ServerColumnFamily.TIME_COLUMN.getColumnFamily(),
 +        TabletsSection.ServerColumnFamily.TIME_COLUMN.getColumnQualifier(), 0);
 +    mfw.append(tableTimeKey, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes()));
-     
++
 +    // table tablet's prevrow
 +    Key tablePrevRowKey = new Key(tableExtent, TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.getColumnFamily(),
 +        TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.getColumnQualifier(), 0);
 +    mfw.append(tablePrevRowKey, KeyExtent.encodePrevEndRow(null));
-     
++
 +    // default tablet info
 +    Text defaultExtent = new Text(KeyExtent.getMetadataEntry(new Text(MetadataTable.ID), null));
-     
++
 +    // default's directory
 +    Key defaultDirKey = new Key(defaultExtent, TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnFamily(),
 +        TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnQualifier(), 0);
 +    mfw.append(defaultDirKey, new Value(defaultMetadataTabletDir.getBytes()));
-     
++
 +    // default's time
 +    Key defaultTimeKey = new Key(defaultExtent, TabletsSection.ServerColumnFamily.TIME_COLUMN.getColumnFamily(),
 +        TabletsSection.ServerColumnFamily.TIME_COLUMN.getColumnQualifier(), 0);
 +    mfw.append(defaultTimeKey, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes()));
-     
++
 +    // default's prevrow
 +    Key defaultPrevRowKey = new Key(defaultExtent, TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.getColumnFamily(),
 +        TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.getColumnQualifier(), 0);
 +    mfw.append(defaultPrevRowKey, KeyExtent.encodePrevEndRow(MetadataSchema.TabletsSection.getRange().getEndKey().getRow()));
-     
++
 +    mfw.close();
-     
++
 +    // create table and default tablets directories
 +    for (String s : Arrays.asList(tableMetadataTabletDir, defaultMetadataTabletDir)) {
 +      Path dir = new Path(s);
 +      try {
 +        fstat = fs.getFileStatus(dir);
 +        if (!fstat.isDir()) {
 +          log.fatal("location " + dir.toString() + " exists but is not a directory");
 +          return;
 +        }
 +      } catch (FileNotFoundException fnfe) {
 +        // the directory is absent; create it (covers both the table info
 +        // and default tablet directories)
 +        if (!fs.mkdirs(dir)) {
 +          log.fatal("unable to create directory " + dir.toString());
 +          return;
 +        }
 +      }
 +    }
 +  }
-   
++
 +  private static void initZooKeeper(Opts opts, String uuid, String instanceNamePath, Path rootTablet) throws KeeperException, InterruptedException {
 +    // setup basic data in zookeeper
 +    ZooUtil.putPersistentData(zoo.getZooKeeper(), Constants.ZROOT, new byte[0], -1, NodeExistsPolicy.SKIP, Ids.OPEN_ACL_UNSAFE);
 +    ZooUtil.putPersistentData(zoo.getZooKeeper(), Constants.ZROOT + Constants.ZINSTANCES, new byte[0], -1, NodeExistsPolicy.SKIP, Ids.OPEN_ACL_UNSAFE);
-     
++
 +    // setup instance name
 +    if (opts.clearInstanceName)
 +      zoo.recursiveDelete(instanceNamePath, NodeMissingPolicy.SKIP);
 +    zoo.putPersistentData(instanceNamePath, uuid.getBytes(), NodeExistsPolicy.FAIL);
-     
++
 +    // setup the instance
 +    String zkInstanceRoot = Constants.ZROOT + "/" + uuid;
 +    zoo.putPersistentData(zkInstanceRoot, new byte[0], NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZTABLES, Constants.ZTABLES_INITIAL_ID, NodeExistsPolicy.FAIL);
 +    TableManager.prepareNewTableState(uuid, RootTable.ID, RootTable.NAME, TableState.ONLINE, NodeExistsPolicy.FAIL);
 +    TableManager.prepareNewTableState(uuid, MetadataTable.ID, MetadataTable.NAME, TableState.ONLINE, NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZTSERVERS, new byte[0], NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZPROBLEMS, new byte[0], NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET, new byte[0], NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_WALOGS, new byte[0], NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_PATH, rootTablet.toString().getBytes(), NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZTRACERS, new byte[0], NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTERS, new byte[0], NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_LOCK, new byte[0], NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_GOAL_STATE, MasterGoalState.NORMAL.toString().getBytes(), NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZGC, new byte[0], NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZGC_LOCK, new byte[0], NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZCONFIG, new byte[0], NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZTABLE_LOCKS, new byte[0], NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZHDFS_RESERVATIONS, new byte[0], NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZNEXT_FILE, new byte[] {'0'}, NodeExistsPolicy.FAIL);
 +    zoo.putPersistentData(zkInstanceRoot + Constants.ZRECOVERY, new byte[] {'0'}, NodeExistsPolicy.FAIL);
 +  }
-   
++
 +  private static String getInstanceNamePath(Opts opts) throws IOException, KeeperException, InterruptedException {
 +    // setup the instance name
 +    String instanceName, instanceNamePath = null;
 +    boolean exists = true;
 +    do {
 +      if (opts.cliInstanceName == null) {
 +        instanceName = getConsoleReader().readLine("Instance name : ");
 +      } else {
 +        instanceName = opts.cliInstanceName;
 +      }
 +      if (instanceName == null)
 +        System.exit(0);
 +      instanceName = instanceName.trim();
 +      if (instanceName.length() == 0)
 +        continue;
 +      instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName;
 +      if (opts.clearInstanceName) {
 +        exists = false;
 +        break;
 +      } else if (exists = zoo.exists(instanceNamePath)) {
 +        String decision = getConsoleReader().readLine("Instance name \"" + instanceName + "\" exists. Delete existing entry from zookeeper? [Y/N] : ");
 +        if (decision == null)
 +          System.exit(0);
 +        if (decision.length() == 1 && decision.toLowerCase(Locale.ENGLISH).charAt(0) == 'y') {
 +          opts.clearInstanceName = true;
 +          exists = false;
 +        }
 +      }
 +    } while (exists);
 +    return instanceNamePath;
 +  }
-   
++
 +  private static byte[] getRootPassword(Opts opts) throws IOException {
 +    if (opts.cliPassword != null) {
 +      return opts.cliPassword.getBytes();
 +    }
 +    String rootpass;
 +    String confirmpass;
 +    do {
 +      rootpass = getConsoleReader()
 +          .readLine("Enter initial password for " + DEFAULT_ROOT_USER + " (this may not be applicable for your security setup): ", '*');
 +      if (rootpass == null)
 +        System.exit(0);
 +      confirmpass = getConsoleReader().readLine("Confirm initial password for " + DEFAULT_ROOT_USER + ": ", '*');
 +      if (confirmpass == null)
 +        System.exit(0);
 +      if (!rootpass.equals(confirmpass))
 +        log.error("Passwords do not match");
 +    } while (!rootpass.equals(confirmpass));
 +    return rootpass.getBytes();
 +  }
-   
++
 +  private static void initSecurity(Opts opts, String iid) throws AccumuloSecurityException, ThriftSecurityException {
 +    AuditedSecurityOperation.getInstance(iid, true).initializeSecurity(SystemCredentials.get().toThrift(HdfsZooInstance.getInstance()), DEFAULT_ROOT_USER,
 +        opts.rootpass);
 +  }
-   
++
 +  public static void initMetadataConfig(String tableId) throws IOException {
 +    try {
 +      Configuration conf = CachedConfiguration.getInstance();
 +      int max = conf.getInt("dfs.replication.max", 512);
 +      // Hadoop 0.23 switched the min value configuration name
 +      int min = Math.max(conf.getInt("dfs.replication.min", 1), conf.getInt("dfs.namenode.replication.min", 1));
 +      if (max < 5)
 +        setMetadataReplication(max, "max");
 +      if (min > 5)
 +        setMetadataReplication(min, "min");
 +      for (Entry<String,String> entry : initialMetadataConf.entrySet()) {
 +        if (!TablePropUtil.setTableProperty(tableId, entry.getKey(), entry.getValue()))
 +          throw new IOException("Cannot create per-table property " + entry.getKey());
 +      }
 +    } catch (Exception e) {
 +      log.fatal("error talking to zookeeper", e);
 +      throw new IOException(e);
 +    }
 +  }
-   
++
 +  protected static void initMetadataConfig() throws IOException {
 +    initMetadataConfig(RootTable.ID);
 +    initMetadataConfig(MetadataTable.ID);
 +  }
-   
++
 +  private static void setMetadataReplication(int replication, String reason) throws IOException {
 +    String rep = getConsoleReader().readLine(
 +        "Your HDFS replication " + reason + " is not compatible with our default " + MetadataTable.NAME + " replication of 5. What do you want to set your "
 +            + MetadataTable.NAME + " replication to? (" + replication + ") ");
 +    if (rep == null || rep.length() == 0)
 +      rep = Integer.toString(replication);
 +    else
 +      // Let's make sure it's a number
 +      Integer.parseInt(rep);
 +    initialMetadataConf.put(Property.TABLE_FILE_REPLICATION.getKey(), rep);
 +  }
-   
++
 +  public static boolean isInitialized(VolumeManager fs) throws IOException {
 +    return (fs.exists(ServerConstants.getInstanceIdLocation()) || fs.exists(ServerConstants.getDataVersionLocation()));
 +  }
-   
++
 +  static class Opts extends Help {
 +    @Parameter(names = "--reset-security", description = "just update the security information")
 +    boolean resetSecurity = false;
 +    @Parameter(names = "--clear-instance-name", description = "delete any existing instance name without prompting")
 +    boolean clearInstanceName = false;
 +    @Parameter(names = "--instance-name", description = "the instance name, if not provided, will prompt")
 +    String cliInstanceName;
 +    @Parameter(names = "--password", description = "set the password on the command line")
 +    String cliPassword;
-     
++
 +    byte[] rootpass = null;
 +  }
-   
++
 +  public static void main(String[] args) {
 +    Opts opts = new Opts();
 +    opts.parseArgs(Initialize.class.getName(), args);
-     
++
 +    try {
 +      SecurityUtil.serverLogin();
 +      Configuration conf = CachedConfiguration.getInstance();
-       
++
 +      @SuppressWarnings("deprecation")
 +      VolumeManager fs = VolumeManagerImpl.get(SiteConfiguration.getSiteConfiguration());
-       
++
 +      if (opts.resetSecurity) {
 +        if (isInitialized(fs)) {
 +          opts.rootpass = getRootPassword(opts);
 +          initSecurity(opts, HdfsZooInstance.getInstance().getInstanceID());
 +        } else {
 +          log.fatal("Attempted to reset security on accumulo before it was initialized");
 +        }
 +      } else if (!doInit(opts, conf, fs))
 +        System.exit(-1);
 +    } catch (Exception e) {
 +      log.fatal(e, e);
 +      throw new RuntimeException(e);
 +    }
 +  }
 +}
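
For reference, the directory checks in initFileSystem above all follow one idiom: stat the path, require a directory if it exists, and create it if it does not. Below is a minimal sketch of that idiom as a shared helper; the InitDirs class and ensureDirectory method are hypothetical names, not part of this commit, while VolumeManager, FileStatus, and Path are the types the patch already uses.

  import java.io.FileNotFoundException;
  import java.io.IOException;

  import org.apache.accumulo.server.fs.VolumeManager;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.Path;

  class InitDirs {
    // Returns true if dir exists as a directory or was created; returns false
    // if the path exists as a regular file or the create attempt failed.
    static boolean ensureDirectory(VolumeManager fs, Path dir) throws IOException {
      try {
        FileStatus fstat = fs.getFileStatus(dir);
        return fstat.isDir(); // present: acceptable only if it is a directory
      } catch (FileNotFoundException fnfe) {
        return fs.mkdirs(dir); // absent: try to create it
      }
    }
  }

A helper along these lines would keep the fatal logging at each call site while making the create-or-verify behavior uniform across the metadata, root tablet, and table info directories. (FileStatus.isDir() is deprecated on Hadoop 2; the patch suppresses that warning the same way.)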

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8a34937a/server/base/src/test/java/org/apache/accumulo/server/init/InitializeTest.java
----------------------------------------------------------------------
diff --cc server/base/src/test/java/org/apache/accumulo/server/init/InitializeTest.java
index 6d0f417,0000000..d308d06
mode 100644,000000..100644
--- a/server/base/src/test/java/org/apache/accumulo/server/init/InitializeTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/init/InitializeTest.java
@@@ -1,152 -1,0 +1,157 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.init;
 +
++import static org.easymock.EasyMock.anyObject;
++import static org.easymock.EasyMock.createMock;
++import static org.easymock.EasyMock.expect;
++import static org.easymock.EasyMock.expectLastCall;
++import static org.easymock.EasyMock.replay;
++import static org.junit.Assert.assertFalse;
++import static org.junit.Assert.assertTrue;
++
 +import java.io.IOException;
- import java.net.URI;
 +
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.conf.SiteConfiguration;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.Path;
- import org.easymock.Capture;
- import static org.easymock.EasyMock.*;
 +import org.junit.After;
 +import org.junit.Before;
++import org.junit.Ignore;
 +import org.junit.Test;
- import static org.junit.Assert.*;
 +
 +/**
 + * This test is not thread-safe.
 + */
 +public class InitializeTest {
 +  private Configuration conf;
 +  private VolumeManager fs;
 +  private SiteConfiguration sconf;
 +  private IZooReaderWriter zooOrig;
 +  private IZooReaderWriter zoo;
 +
 +  @Before
 +  public void setUp() throws Exception {
 +    conf = createMock(Configuration.class);
 +    fs = createMock(VolumeManager.class);
 +    sconf = createMock(SiteConfiguration.class);
 +    zoo = createMock(IZooReaderWriter.class);
 +    zooOrig = Initialize.getZooReaderWriter();
 +    Initialize.setZooReaderWriter(zoo);
 +  }
 +
 +  @After
 +  public void tearDown() {
 +    Initialize.setZooReaderWriter(zooOrig);
 +  }
 +
 +  @Test
 +  public void testIsInitialized_HasInstanceId() throws Exception {
 +    expect(fs.exists(anyObject(Path.class))).andReturn(true);
 +    replay(fs);
 +    assertTrue(Initialize.isInitialized(fs));
 +  }
 +
 +  @Test
 +  public void testIsInitialized_HasDataVersion() throws Exception {
 +    expect(fs.exists(anyObject(Path.class))).andReturn(false);
 +    expect(fs.exists(anyObject(Path.class))).andReturn(true);
 +    replay(fs);
 +    assertTrue(Initialize.isInitialized(fs));
 +  }
 +
 +  @Test
 +  public void testCheckInit_NoZK() throws Exception {
 +    expect(sconf.get(Property.INSTANCE_DFS_URI)).andReturn("hdfs://foo");
 +    expectLastCall().anyTimes();
 +    expect(sconf.get(Property.INSTANCE_ZK_HOST)).andReturn("zk1");
 +    replay(sconf);
 +    expect(zoo.exists("/")).andReturn(false);
 +    replay(zoo);
 +
 +    assertFalse(Initialize.checkInit(conf, fs, sconf));
 +  }
 +
 +  @Test
 +  public void testCheckInit_AlreadyInit() throws Exception {
 +    expect(sconf.get(Property.INSTANCE_DFS_URI)).andReturn("hdfs://foo");
 +    expectLastCall().anyTimes();
 +    expect(sconf.get(Property.INSTANCE_DFS_DIR)).andReturn("/bar");
 +    expect(sconf.get(Property.INSTANCE_ZK_HOST)).andReturn("zk1");
 +    expect(sconf.get(Property.INSTANCE_SECRET)).andReturn(Property.INSTANCE_SECRET.getDefaultValue());
 +    replay(sconf);
 +    expect(zoo.exists("/")).andReturn(true);
 +    replay(zoo);
 +    expect(fs.exists(anyObject(Path.class))).andReturn(true);
 +    replay(fs);
 +
 +    assertFalse(Initialize.checkInit(conf, fs, sconf));
 +  }
 +
- /* Cannot test, need to mock static FileSystem.getDefaultUri()
++  // Cannot test; would need to mock the static FileSystem.getDefaultUri()
++  @Ignore
 +  @Test
 +  public void testCheckInit_AlreadyInit_DefaultUri() throws Exception {
 +    expect(sconf.get(Property.INSTANCE_DFS_URI)).andReturn("");
 +    expectLastCall().anyTimes();
 +    expect(sconf.get(Property.INSTANCE_DFS_DIR)).andReturn("/bar");
 +    expect(sconf.get(Property.INSTANCE_ZK_HOST)).andReturn("zk1");
 +    expect(sconf.get(Property.INSTANCE_SECRET)).andReturn(Property.INSTANCE_SECRET.getDefaultValue());
 +    replay(sconf);
 +    expect(zoo.exists("/")).andReturn(true);
 +    replay(zoo);
 +    // expect(fs.getUri()).andReturn(new URI("hdfs://default"));
 +    expect(fs.exists(anyObject(Path.class))).andReturn(true);
 +    replay(fs);
 +
 +    assertFalse(Initialize.checkInit(conf, fs, sconf));
 +  }
- */
 +
 +  @Test(expected = IOException.class)
 +  public void testCheckInit_FSException() throws Exception {
 +    expect(sconf.get(Property.INSTANCE_DFS_URI)).andReturn("hdfs://foo");
 +    expectLastCall().anyTimes();
 +    expect(sconf.get(Property.INSTANCE_ZK_HOST)).andReturn("zk1");
 +    expect(sconf.get(Property.INSTANCE_SECRET)).andReturn(Property.INSTANCE_SECRET.getDefaultValue());
 +    replay(sconf);
 +    expect(zoo.exists("/")).andReturn(true);
 +    replay(zoo);
 +    expect(fs.exists(anyObject(Path.class))).andThrow(new IOException());
 +    replay(fs);
 +
 +    Initialize.checkInit(conf, fs, sconf);
 +  }
 +
 +  @Test
 +  public void testCheckInit_OK() throws Exception {
 +    expect(sconf.get(Property.INSTANCE_DFS_URI)).andReturn("hdfs://foo");
 +    expectLastCall().anyTimes();
 +    expect(sconf.get(Property.INSTANCE_ZK_HOST)).andReturn("zk1");
 +    expect(sconf.get(Property.INSTANCE_SECRET)).andReturn(Property.INSTANCE_SECRET.getDefaultValue());
 +    replay(sconf);
 +    expect(zoo.exists("/")).andReturn(true);
 +    replay(zoo);
 +    expect(fs.exists(anyObject(Path.class))).andReturn(false);
 +    expect(fs.exists(anyObject(Path.class))).andReturn(false);
 +    replay(fs);
 +
 +    assertTrue(Initialize.checkInit(conf, fs, sconf));
 +  }
 +}
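
The @Ignore'd testCheckInit_AlreadyInit_DefaultUri above is blocked on the static FileSystem.getDefaultUri(Configuration) call inside checkInit, which EasyMock cannot intercept. A sketch of one possible workaround, untested and assuming a real Hadoop Configuration object is acceptable in this test, is to set the default filesystem on a real Configuration instead of mocking the static call ("fs.defaultFS" is the Hadoop 2 key; Hadoop 1 used "fs.default.name"):

  @Test
  public void testCheckInit_AlreadyInit_DefaultUri() throws Exception {
    // Real (non-mock) Configuration, so FileSystem.getDefaultUri(conf)
    // resolves without static mocking; loadDefaults=false keeps it minimal.
    Configuration realConf = new Configuration(false);
    realConf.set("fs.defaultFS", "hdfs://default");

    // The remaining expectations mirror the ignored test, reusing the mocked
    // sconf, zoo, and fs fixture fields from InitializeTest above.
    expect(sconf.get(Property.INSTANCE_DFS_URI)).andReturn("");
    expectLastCall().anyTimes();
    expect(sconf.get(Property.INSTANCE_DFS_DIR)).andReturn("/bar");
    expect(sconf.get(Property.INSTANCE_ZK_HOST)).andReturn("zk1");
    expect(sconf.get(Property.INSTANCE_SECRET)).andReturn(Property.INSTANCE_SECRET.getDefaultValue());
    replay(sconf);
    expect(zoo.exists("/")).andReturn(true);
    replay(zoo);
    expect(fs.exists(anyObject(Path.class))).andReturn(true);
    replay(fs);

    assertFalse(Initialize.checkInit(realConf, fs, sconf));
  }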