Posted to commits@accumulo.apache.org by el...@apache.org on 2013/11/23 02:41:57 UTC

[04/19] WIP Merge branch 'ACCUMULO-1854-merge' into ACCUMULO-1854-1.5-merge

http://git-wip-us.apache.org/repos/asf/accumulo/blob/16a2e0f0/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
index c8731b1,0000000..ff14107
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
@@@ -1,532 -1,0 +1,549 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.mapreduce.lib.util;
 +
 +import java.io.ByteArrayInputStream;
 +import java.io.ByteArrayOutputStream;
 +import java.io.DataInputStream;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
 +import java.nio.charset.Charset;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.StringTokenizer;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.ClientSideIteratorScanner;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.IsolatedScanner;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.impl.Tables;
 +import org.apache.accumulo.core.client.impl.TabletLocator;
 +import org.apache.accumulo.core.client.mock.MockTabletLocator;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.CredentialHelper;
 +import org.apache.accumulo.core.security.TablePermission;
 +import org.apache.accumulo.core.util.ArgumentChecker;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.TextUtil;
 +import org.apache.commons.codec.binary.Base64;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.util.StringUtils;
 +
 +/**
 + * @since 1.5.0
 + */
 +public class InputConfigurator extends ConfiguratorBase {
 +  
 +  /**
 +   * Configuration keys for {@link Scanner}.
 +   * 
 +   * @since 1.5.0
 +   */
 +  public static enum ScanOpts {
 +    TABLE_NAME, AUTHORIZATIONS, RANGES, COLUMNS, ITERATORS
 +  }
 +  
 +  /**
 +   * Configuration keys for various features.
 +   * 
 +   * @since 1.5.0
 +   */
 +  public static enum Features {
 +    AUTO_ADJUST_RANGES, SCAN_ISOLATION, USE_LOCAL_ITERATORS, SCAN_OFFLINE
 +  }
 +  
 +  /**
 +   * Sets the name of the input table over which this job will scan.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param tableName
 +   *          the table to use as input for this job
 +   * @since 1.5.0
 +   */
 +  public static void setInputTableName(Class<?> implementingClass, Configuration conf, String tableName) {
 +    ArgumentChecker.notNull(tableName);
 +    conf.set(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME), tableName);
 +  }
 +  
 +  /**
 +   * Gets the table name from the configuration.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return the table name
 +   * @since 1.5.0
 +   * @see #setInputTableName(Class, Configuration, String)
 +   */
 +  public static String getInputTableName(Class<?> implementingClass, Configuration conf) {
 +    return conf.get(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME));
 +  }
 +  
 +  /**
 +   * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorizations. Defaults to the empty set.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param auths
 +   *          the user's authorizations
 +   * @since 1.5.0
 +   */
 +  public static void setScanAuthorizations(Class<?> implementingClass, Configuration conf, Authorizations auths) {
 +    if (auths != null && !auths.isEmpty())
 +      conf.set(enumToConfKey(implementingClass, ScanOpts.AUTHORIZATIONS), auths.serialize());
 +  }
 +  
 +  /**
 +   * Gets the authorizations to set for the scans from the configuration.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return the Accumulo scan authorizations
 +   * @since 1.5.0
 +   * @see #setScanAuthorizations(Class, Configuration, Authorizations)
 +   */
 +  public static Authorizations getScanAuthorizations(Class<?> implementingClass, Configuration conf) {
 +    String authString = conf.get(enumToConfKey(implementingClass, ScanOpts.AUTHORIZATIONS));
 +    return authString == null ? Constants.NO_AUTHS : new Authorizations(authString.getBytes());
 +  }
 +  
 +  /**
 +   * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param ranges
 +   *          the ranges that will be mapped over
 +   * @since 1.5.0
 +   */
 +  public static void setRanges(Class<?> implementingClass, Configuration conf, Collection<Range> ranges) {
 +    ArgumentChecker.notNull(ranges);
 +    ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size());
 +    try {
 +      for (Range r : ranges) {
 +        ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +        r.write(new DataOutputStream(baos));
 +        rangeStrings.add(new String(Base64.encodeBase64(baos.toByteArray()), Charset.forName("UTF-8")));
 +      }
 +    } catch (IOException ex) {
 +      throw new IllegalArgumentException("Unable to encode ranges to Base64", ex);
 +    }
 +    conf.setStrings(enumToConfKey(implementingClass, ScanOpts.RANGES), rangeStrings.toArray(new String[0]));
 +  }
 +  
 +  /**
 +   * Gets the ranges to scan over from a job.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return the ranges
 +   * @throws IOException
 +   *           if the ranges have been encoded improperly
 +   * @since 1.5.0
 +   * @see #setRanges(Class, Configuration, Collection)
 +   */
 +  public static List<Range> getRanges(Class<?> implementingClass, Configuration conf) throws IOException {
 +    ArrayList<Range> ranges = new ArrayList<Range>();
 +    for (String rangeString : conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.RANGES))) {
 +      ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes(Charset.forName("UTF-8"))));
 +      Range range = new Range();
 +      range.readFields(new DataInputStream(bais));
 +      ranges.add(range);
 +    }
 +    return ranges;
 +  }
 +  
 +  /**
 +   * Restricts the columns that will be mapped over for this job.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param columnFamilyColumnQualifierPairs
 +   *          pairs of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
 +   *          selected. An empty set is the default and is equivalent to scanning all columns.
 +   * @since 1.5.0
 +   */
 +  public static void fetchColumns(Class<?> implementingClass, Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
 +    ArgumentChecker.notNull(columnFamilyColumnQualifierPairs);
++    String[] columnStrings = serializeColumns(columnFamilyColumnQualifierPairs);
++    conf.setStrings(enumToConfKey(implementingClass, ScanOpts.COLUMNS), columnStrings);
++  }
++
++  public static String[] serializeColumns(Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
++    ArgumentChecker.notNull(columnFamilyColumnQualifierPairs);
 +    ArrayList<String> columnStrings = new ArrayList<String>(columnFamilyColumnQualifierPairs.size());
 +    for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
 +      if (column.getFirst() == null)
 +        throw new IllegalArgumentException("Column family can not be null");
-       
-       String col = new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst())), Charset.forName("UTF-8"));
++
++      String col = new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst())));
 +      if (column.getSecond() != null)
-         col += ":" + new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond())), Charset.forName("UTF-8"));
++        col += ":" + new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond())));
 +      columnStrings.add(col);
 +    }
-     conf.setStrings(enumToConfKey(implementingClass, ScanOpts.COLUMNS), columnStrings.toArray(new String[0]));
++
++    return columnStrings.toArray(new String[0]);
 +  }
 +  
 +  /**
 +   * Gets the columns to be mapped over from this job.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return a set of columns
 +   * @since 1.5.0
 +   * @see #fetchColumns(Class, Configuration, Collection)
 +   */
 +  public static Set<Pair<Text,Text>> getFetchedColumns(Class<?> implementingClass, Configuration conf) {
++    ArgumentChecker.notNull(conf);
++
++    return deserializeFetchedColumns(conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.COLUMNS)));
++  }
++
++  public static Set<Pair<Text,Text>> deserializeFetchedColumns(Collection<String> serialized) {
 +    Set<Pair<Text,Text>> columns = new HashSet<Pair<Text,Text>>();
-     for (String col : conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.COLUMNS))) {
++
++    if (null == serialized) {
++      return columns;
++    }
++
++    for (String col : serialized) {
 +      int idx = col.indexOf(":");
-       Text cf = new Text(idx < 0 ? Base64.decodeBase64(col.getBytes(Charset.forName("UTF-8"))) : Base64.decodeBase64(col.substring(0, idx).getBytes(
-           Charset.forName("UTF-8"))));
++      Text cf = new Text(idx < 0 ? Base64.decodeBase64(col.getBytes(Charset.forName("UTF-8"))) : Base64.decodeBase64(col.substring(0, idx).getBytes(Charset.forName("UTF-8"))));
 +      Text cq = idx < 0 ? null : new Text(Base64.decodeBase64(col.substring(idx + 1).getBytes()));
 +      columns.add(new Pair<Text,Text>(cf, cq));
 +    }
 +    return columns;
 +  }
 +  
 +  /**
 +   * Encode an iterator on the input for this job.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param cfg
 +   *          the configuration of the iterator
 +   * @since 1.5.0
 +   */
 +  public static void addIterator(Class<?> implementingClass, Configuration conf, IteratorSetting cfg) {
 +    ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +    String newIter;
 +    try {
 +      cfg.write(new DataOutputStream(baos));
 +      newIter = new String(Base64.encodeBase64(baos.toByteArray()), Charset.forName("UTF-8"));
 +      baos.close();
 +    } catch (IOException e) {
 +      throw new IllegalArgumentException("unable to serialize IteratorSetting");
 +    }
 +    
 +    String iterators = conf.get(enumToConfKey(implementingClass, ScanOpts.ITERATORS));
 +    // No iterators specified yet, create a new string
 +    if (iterators == null || iterators.isEmpty()) {
 +      iterators = newIter;
 +    } else {
 +      // append the next iterator & reset
 +      iterators = iterators.concat(StringUtils.COMMA_STR + newIter);
 +    }
 +    // Store the iterators w/ the job
 +    conf.set(enumToConfKey(implementingClass, ScanOpts.ITERATORS), iterators);
 +  }
 +  
 +  /**
 +   * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return a list of iterators
 +   * @since 1.5.0
 +   * @see #addIterator(Class, Configuration, IteratorSetting)
 +   */
 +  public static List<IteratorSetting> getIterators(Class<?> implementingClass, Configuration conf) {
 +    String iterators = conf.get(enumToConfKey(implementingClass, ScanOpts.ITERATORS));
 +    
 +    // If no iterators are present, return an empty list
 +    if (iterators == null || iterators.isEmpty())
 +      return new ArrayList<IteratorSetting>();
 +    
 +    // Compose the set of iterators encoded in the job configuration
 +    StringTokenizer tokens = new StringTokenizer(iterators, StringUtils.COMMA_STR);
 +    List<IteratorSetting> list = new ArrayList<IteratorSetting>();
 +    try {
 +      while (tokens.hasMoreTokens()) {
 +        String itstring = tokens.nextToken();
 +        ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(itstring.getBytes()));
 +        list.add(new IteratorSetting(new DataInputStream(bais)));
 +        bais.close();
 +      }
 +    } catch (IOException e) {
 +      throw new IllegalArgumentException("couldn't decode iterator settings");
 +    }
 +    return list;
 +  }
 +  
 +  /**
 +   * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries.
 +   * Disabling this feature will cause exactly one Map task to be created for each specified range.
 +   * 
 +   * <p>
 +   * By default, this feature is <b>enabled</b>.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param enableFeature
 +   *          the feature is enabled if true, disabled otherwise
 +   * @see #setRanges(Class, Configuration, Collection)
 +   * @since 1.5.0
 +   */
 +  public static void setAutoAdjustRanges(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
 +    conf.setBoolean(enumToConfKey(implementingClass, Features.AUTO_ADJUST_RANGES), enableFeature);
 +  }
 +  
 +  /**
 +   * Determines whether a configuration has auto-adjust ranges enabled.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return false if the feature is disabled, true otherwise
 +   * @since 1.5.0
 +   * @see #setAutoAdjustRanges(Class, Configuration, boolean)
 +   */
 +  public static Boolean getAutoAdjustRanges(Class<?> implementingClass, Configuration conf) {
 +    return conf.getBoolean(enumToConfKey(implementingClass, Features.AUTO_ADJUST_RANGES), true);
 +  }
 +  
 +  /**
 +   * Controls the use of the {@link IsolatedScanner} in this job.
 +   * 
 +   * <p>
 +   * By default, this feature is <b>disabled</b>.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param enableFeature
 +   *          the feature is enabled if true, disabled otherwise
 +   * @since 1.5.0
 +   */
 +  public static void setScanIsolation(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
 +    conf.setBoolean(enumToConfKey(implementingClass, Features.SCAN_ISOLATION), enableFeature);
 +  }
 +  
 +  /**
 +   * Determines whether a configuration has isolation enabled.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return true if the feature is enabled, false otherwise
 +   * @since 1.5.0
 +   * @see #setScanIsolation(Class, Configuration, boolean)
 +   */
 +  public static Boolean isIsolated(Class<?> implementingClass, Configuration conf) {
 +    return conf.getBoolean(enumToConfKey(implementingClass, Features.SCAN_ISOLATION), false);
 +  }
 +  
 +  /**
 +   * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map
 +   * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task.
 +   * 
 +   * <p>
 +   * By default, this feature is <b>disabled</b>.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param enableFeature
 +   *          the feature is enabled if true, disabled otherwise
 +   * @since 1.5.0
 +   */
 +  public static void setLocalIterators(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
 +    conf.setBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), enableFeature);
 +  }
 +  
 +  /**
 +   * Determines whether a configuration uses local iterators.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return true if the feature is enabled, false otherwise
 +   * @since 1.5.0
 +   * @see #setLocalIterators(Class, Configuration, boolean)
 +   */
 +  public static Boolean usesLocalIterators(Class<?> implementingClass, Configuration conf) {
 +    return conf.getBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), false);
 +  }
 +  
 +  /**
 +   * <p>
 +   * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the
 +   * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will
 +   * fail.
 +   * 
 +   * <p>
 +   * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS.
 +   * 
 +   * <p>
 +   * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be
 +   * on the mapper's classpath. The accumulo-site.xml may need to be on the mapper's classpath if HDFS or the Accumulo directory in HDFS are non-standard.
 +   * 
 +   * <p>
 +   * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map
 +   * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The
 +   * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file.
 +   * 
 +   * <p>
 +   * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it will support
 +   * speculative execution better. When reading an online table, speculative execution can put more load on an already slow tablet server.
 +   * 
 +   * <p>
 +   * By default, this feature is <b>disabled</b>.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param enableFeature
 +   *          the feature is enabled if true, disabled otherwise
 +   * @since 1.5.0
 +   */
 +  public static void setOfflineTableScan(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
 +    conf.setBoolean(enumToConfKey(implementingClass, Features.SCAN_OFFLINE), enableFeature);
 +  }
 +  
 +  /**
 +   * Determines whether a configuration has the offline table scan feature enabled.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return true if the feature is enabled, false otherwise
 +   * @since 1.5.0
 +   * @see #setOfflineTableScan(Class, Configuration, boolean)
 +   */
 +  public static Boolean isOfflineScan(Class<?> implementingClass, Configuration conf) {
 +    return conf.getBoolean(enumToConfKey(implementingClass, Features.SCAN_OFFLINE), false);
 +  }
 +  
 +  /**
 +   * Initializes an Accumulo {@link TabletLocator} based on the configuration.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return an Accumulo tablet locator
 +   * @throws TableNotFoundException
 +   *           if the table name set on the configuration doesn't exist
 +   * @since 1.5.0
 +   */
 +  public static TabletLocator getTabletLocator(Class<?> implementingClass, Configuration conf) throws TableNotFoundException {
 +    String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE));
 +    if ("MockInstance".equals(instanceType))
 +      return new MockTabletLocator();
 +    Instance instance = getInstance(implementingClass, conf);
 +    String tableName = getInputTableName(implementingClass, conf);
 +    return TabletLocator.getInstance(instance, new Text(Tables.getTableId(instance, tableName)));
 +  }
 +  
 +  // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
 +  /**
 +   * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}.
 +   * 
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @throws IOException
 +   *           if the context is improperly configured
 +   * @since 1.5.0
 +   */
 +  public static void validateOptions(Class<?> implementingClass, Configuration conf) throws IOException {
 +    if (!isConnectorInfoSet(implementingClass, conf))
 +      throw new IOException("Input info has not been set.");
 +    String instanceKey = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE));
 +    if (!"MockInstance".equals(instanceKey) && !"ZooKeeperInstance".equals(instanceKey))
 +      throw new IOException("Instance info has not been set.");
 +    // validate that we can connect as configured
 +    try {
 +      Connector c = getInstance(implementingClass, conf).getConnector(getPrincipal(implementingClass, conf),
 +          CredentialHelper.extractToken(getTokenClass(implementingClass, conf), getToken(implementingClass, conf)));
 +      if (!c.securityOperations().authenticateUser(getPrincipal(implementingClass, conf),
 +          CredentialHelper.extractToken(getTokenClass(implementingClass, conf), getToken(implementingClass, conf))))
 +        throw new IOException("Unable to authenticate user");
 +      if (!c.securityOperations().hasTablePermission(getPrincipal(implementingClass, conf), getInputTableName(implementingClass, conf), TablePermission.READ))
 +        throw new IOException("Unable to access table");
 +      
 +      if (!conf.getBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), false)) {
 +        // validate that any scan-time iterators can be loaded by the tablet servers
 +        for (IteratorSetting iter : getIterators(implementingClass, conf)) {
 +          if (!c.tableOperations().testClassLoad(getInputTableName(implementingClass, conf), iter.getIteratorClass(), SortedKeyValueIterator.class.getName()))
 +            throw new AccumuloException("Servers are unable to load " + iter.getIteratorClass() + " as a " + SortedKeyValueIterator.class.getName());
 +        }
 +      }
 +      
 +    } catch (AccumuloException e) {
 +      throw new IOException(e);
 +    } catch (AccumuloSecurityException e) {
 +      throw new IOException(e);
 +    } catch (TableNotFoundException e) {
 +      throw new IOException(e);
 +    }
 +  }
 +  
 +}
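
For context, here is a minimal, standalone sketch (not part of the patch; the class name InputConfiguratorSketch, the table "mytable", the column, and the iterator priority are made up for illustration) of driving these InputConfigurator helpers against a plain Hadoop Configuration, including the serializeColumns/deserializeFetchedColumns round trip this change introduces:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

public class InputConfiguratorSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Class<?> prefix = AccumuloInputFormat.class; // used by enumToConfKey to namespace the keys

    // Store the scan options under prefixed keys in the Configuration.
    InputConfigurator.setInputTableName(prefix, conf, "mytable");
    InputConfigurator.setScanAuthorizations(prefix, conf, new Authorizations("foo"));
    InputConfigurator.setRanges(prefix, conf, Collections.singleton(new Range(new Text("a"), new Text("z"))));
    InputConfigurator.fetchColumns(prefix, conf, Collections.singleton(new Pair<Text,Text>(new Text("colf"), new Text("colq"))));
    InputConfigurator.addIterator(prefix, conf, new IteratorSetting(50, "wholeRow", WholeRowIterator.class));

    // Read them back with the matching getters.
    List<Range> ranges = InputConfigurator.getRanges(prefix, conf);
    Set<Pair<Text,Text>> columns = InputConfigurator.getFetchedColumns(prefix, conf);
    List<IteratorSetting> iterators = InputConfigurator.getIterators(prefix, conf);

    // The new serializeColumns/deserializeFetchedColumns pair round-trips the same
    // Base64 column encoding without touching a Configuration.
    String[] serialized = InputConfigurator.serializeColumns(columns);
    Set<Pair<Text,Text>> roundTripped = InputConfigurator.deserializeFetchedColumns(Arrays.asList(serialized));

    System.out.println(ranges.size() + " range(s), " + iterators.size() + " iterator(s), columns: " + roundTripped);
  }
}

In normal use the static setters on InputFormatBase/AccumuloInputFormat delegate to these helpers, so a job would typically go through the InputFormat rather than calling InputConfigurator directly.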

http://git-wip-us.apache.org/repos/asf/accumulo/blob/16a2e0f0/core/src/main/java/org/apache/accumulo/core/security/CredentialHelper.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/security/CredentialHelper.java
index 69e3ba1,0000000..5ebab3f
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/security/CredentialHelper.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/CredentialHelper.java
@@@ -1,131 -1,0 +1,131 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.security;
 +
 +import java.io.ByteArrayInputStream;
 +import java.io.DataInputStream;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.nio.charset.Charset;
 +
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.security.thrift.TCredentials;
 +import org.apache.commons.codec.binary.Base64;
 +import org.apache.commons.io.output.ByteArrayOutputStream;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TDeserializer;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.TSerializer;
 +
 +public class CredentialHelper {
 +  static Logger log = Logger.getLogger(CredentialHelper.class);
 +  
 +  public static TCredentials create(String principal, AuthenticationToken token, String instanceID) throws AccumuloSecurityException {
 +    String className = token.getClass().getName();
 +    return new TCredentials(principal, className, ByteBuffer.wrap(toBytes(token)), instanceID);
 +  }
 +  
 +  public static String asBase64String(TCredentials cred) throws AccumuloSecurityException {
 +    return new String(Base64.encodeBase64(asByteArray(cred)), Charset.forName("UTF-8"));
 +  }
 +  
 +  public static byte[] asByteArray(TCredentials cred) throws AccumuloSecurityException {
 +    TSerializer ts = new TSerializer();
 +    try {
 +      return ts.serialize(cred);
 +    } catch (TException e) {
 +      // This really shouldn't happen
 +      log.error(e, e);
 +      throw new AccumuloSecurityException(cred.getPrincipal(), SecurityErrorCode.SERIALIZATION_ERROR);
 +    }
 +  }
 +  
 +  public static TCredentials fromBase64String(String string) throws AccumuloSecurityException {
 +    return fromByteArray(Base64.decodeBase64(string.getBytes(Charset.forName("UTF-8"))));
 +  }
 +  
 +  public static TCredentials fromByteArray(byte[] serializedCredential) throws AccumuloSecurityException {
 +    if (serializedCredential == null)
 +      return null;
 +    TDeserializer td = new TDeserializer();
 +    try {
 +      TCredentials toRet = new TCredentials();
 +      td.deserialize(toRet, serializedCredential);
 +      return toRet;
 +    } catch (TException e) {
 +      // This really shouldn't happen
 +      log.error(e, e);
 +      throw new AccumuloSecurityException("unknown", SecurityErrorCode.SERIALIZATION_ERROR);
 +    }
 +  }
 +  
 +  public static AuthenticationToken extractToken(TCredentials toAuth) throws AccumuloSecurityException {
 +    return extractToken(toAuth.tokenClassName, toAuth.getToken());
 +  }
 +  
 +  public static TCredentials createSquelchError(String principal, AuthenticationToken token, String instanceID) {
 +    try {
 +      return create(principal, token, instanceID);
 +    } catch (AccumuloSecurityException e) {
 +      log.error(e, e);
 +      return null;
 +    }
 +  }
 +  
 +  public static String tokenAsBase64(AuthenticationToken token) throws AccumuloSecurityException {
 +    return new String(Base64.encodeBase64(toBytes(token)), Charset.forName("UTF-8"));
 +  }
 +  
-   private static byte[] toBytes(AuthenticationToken token) throws AccumuloSecurityException {
++  public static byte[] toBytes(AuthenticationToken token) throws AccumuloSecurityException {
 +    try {
 +      ByteArrayOutputStream bais = new ByteArrayOutputStream();
 +      token.write(new DataOutputStream(bais));
 +      byte[] serializedToken = bais.toByteArray();
 +      bais.close();
 +      return serializedToken;
 +    } catch (IOException e) {
 +      log.error(e, e);
 +      throw new AccumuloSecurityException("unknown", SecurityErrorCode.SERIALIZATION_ERROR);
 +    }
 +    
 +  }
 +  
 +  public static AuthenticationToken extractToken(String tokenClass, byte[] token) throws AccumuloSecurityException {
 +    try {
 +      Object obj = Class.forName(tokenClass).newInstance();
 +      if (obj instanceof AuthenticationToken) {
 +        AuthenticationToken toRet = (AuthenticationToken) obj;
 +        toRet.readFields(new DataInputStream(new ByteArrayInputStream(token)));
 +        return toRet;
 +      }
 +    } catch (ClassNotFoundException cnfe) {
 +      log.error(cnfe, cnfe);
 +    } catch (InstantiationException e) {
 +      log.error(e, e);
 +    } catch (IllegalAccessException e) {
 +      log.error(e, e);
 +    } catch (IOException e) {
 +      log.error(e, e);
 +      throw new AccumuloSecurityException("unknown", SecurityErrorCode.SERIALIZATION_ERROR);
 +    }
 +    throw new AccumuloSecurityException("unknown", SecurityErrorCode.INVALID_TOKEN);
 +  }
 +  
 +}
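
To illustrate what the widened visibility of toBytes(AuthenticationToken) allows, a minimal sketch follows (not part of the patch; it assumes PasswordToken as the token implementation and uses a made-up principal and instance id):

import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.CredentialHelper;
import org.apache.accumulo.core.security.thrift.TCredentials;

public class CredentialHelperSketch {
  public static void main(String[] args) throws Exception {
    AuthenticationToken token = new PasswordToken("secret");

    // Token only: serialize to bytes (now possible from outside the class), then
    // rebuild it from the token class name plus those bytes.
    byte[] raw = CredentialHelper.toBytes(token);
    AuthenticationToken copy = CredentialHelper.extractToken(token.getClass().getName(), raw);

    // Full credentials: wrap principal, token, and instance id in a TCredentials,
    // ship it as a Base64 string, and decode it again on the other side.
    TCredentials creds = CredentialHelper.create("root", token, "some-instance-id");
    String encoded = CredentialHelper.asBase64String(creds);
    TCredentials decoded = CredentialHelper.fromBase64String(encoded);
    AuthenticationToken decodedToken = CredentialHelper.extractToken(decoded);

    System.out.println(decoded.getPrincipal() + " " + copy.getClass().getName() + " " + decodedToken.getClass().getName());
  }
}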

http://git-wip-us.apache.org/repos/asf/accumulo/blob/16a2e0f0/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest1.java
----------------------------------------------------------------------
diff --cc core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest1.java
index 0000000,0000000..7239b01
new file mode 100644
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest1.java
@@@ -1,0 -1,0 +1,534 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.core.client.mapreduce;
++
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertTrue;
++
++import java.io.IOException;
++import java.util.Collection;
++import java.util.Collections;
++import java.util.List;
++import java.util.regex.Pattern;
++
++import org.apache.accumulo.core.client.BatchWriter;
++import org.apache.accumulo.core.client.Connector;
++import org.apache.accumulo.core.client.Instance;
++import org.apache.accumulo.core.client.IteratorSetting;
++import org.apache.accumulo.core.client.mapreduce.InputFormatBase.AccumuloIterator;
++import org.apache.accumulo.core.client.mapreduce.InputFormatBase.AccumuloIteratorOption;
++import org.apache.accumulo.core.client.mapreduce.InputFormatBase.RegexType;
++import org.apache.accumulo.core.client.mock.MockInstance;
++import org.apache.accumulo.core.data.Key;
++import org.apache.accumulo.core.data.Mutation;
++import org.apache.accumulo.core.data.Value;
++import org.apache.accumulo.core.iterators.user.WholeRowIterator;
++import org.apache.accumulo.core.security.Authorizations;
++import org.apache.accumulo.core.util.Pair;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.io.Text;
++import org.apache.hadoop.mapreduce.InputSplit;
++import org.apache.hadoop.mapreduce.Job;
++import org.apache.hadoop.mapreduce.JobContext;
++import org.apache.hadoop.mapreduce.JobID;
++import org.apache.hadoop.mapreduce.Mapper;
++import org.apache.hadoop.mapreduce.RecordReader;
++import org.apache.hadoop.mapreduce.TaskAttemptContext;
++import org.apache.hadoop.mapreduce.TaskAttemptID;
++import org.apache.log4j.Level;
++import org.junit.After;
++import org.junit.Assert;
++import org.junit.Test;
++
++public class AccumuloInputFormatTest {
++
++  @After
++  public void tearDown() throws Exception {}
++
++  /**
++   * Test basic setting & getting of max versions.
++   * 
++   * @throws IOException
++   *           Signals that an I/O exception has occurred.
++   */
++  @Test
++  public void testMaxVersions() throws IOException {
++    JobContext job = new JobContext(new Configuration(), new JobID());
++    AccumuloInputFormat.setMaxVersions(job.getConfiguration(), 1);
++    int version = AccumuloInputFormat.getMaxVersions(job.getConfiguration());
++    assertEquals(1, version);
++  }
++
++  /**
++   * Test max versions with an invalid value.
++   * 
++   * @throws IOException
++   *           Signals that an I/O exception has occurred.
++   */
++  @Test(expected = IOException.class)
++  public void testMaxVersionsLessThan1() throws IOException {
++    JobContext job = new JobContext(new Configuration(), new JobID());
++    AccumuloInputFormat.setMaxVersions(job.getConfiguration(), 0);
++  }
++
++  /**
++   * Test no max version configured.
++   */
++  @Test
++  public void testNoMaxVersion() {
++    JobContext job = new JobContext(new Configuration(), new JobID());
++    assertEquals(-1, AccumuloInputFormat.getMaxVersions(job.getConfiguration()));
++  }
++
++  /**
++   * Check that the iterator configuration is getting stored in the Job conf correctly.
++   */
++  @SuppressWarnings("deprecation")
++  @Test
++  public void testSetIterator() {
++    JobContext job = new JobContext(new Configuration(), new JobID());
++
++    AccumuloInputFormat.setIterator(job, 1, "org.apache.accumulo.core.iterators.WholeRowIterator", "WholeRow");
++    Configuration conf = job.getConfiguration();
++    String iterators = conf.get("AccumuloInputFormat.iterators");
++    assertEquals("1:org.apache.accumulo.core.iterators.WholeRowIterator:WholeRow", iterators);
++  }
++
++  @Test
++  public void testAddIterator() {
++    JobContext job = new JobContext(new Configuration(), new JobID());
++
++    AccumuloInputFormat.addIterator(job.getConfiguration(), new IteratorSetting(1, "WholeRow", WholeRowIterator.class));
++    AccumuloInputFormat.addIterator(job.getConfiguration(), new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
++    IteratorSetting iter = new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator");
++    iter.addOption("v1", "1");
++    iter.addOption("junk", "\0omg:!\\xyzzy");
++    AccumuloInputFormat.addIterator(job.getConfiguration(), iter);
++
++    List<AccumuloIterator> list = AccumuloInputFormat.getIterators(job.getConfiguration());
++
++    // Check the list size
++    assertTrue(list.size() == 3);
++
++    // Walk the list and make sure our settings are correct
++    AccumuloIterator setting = list.get(0);
++    assertEquals(1, setting.getPriority());
++    assertEquals("org.apache.accumulo.core.iterators.user.WholeRowIterator", setting.getIteratorClass());
++    assertEquals("WholeRow", setting.getIteratorName());
++
++    setting = list.get(1);
++    assertEquals(2, setting.getPriority());
++    assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass());
++    assertEquals("Versions", setting.getIteratorName());
++
++    setting = list.get(2);
++    assertEquals(3, setting.getPriority());
++    assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass());
++    assertEquals("Count", setting.getIteratorName());
++
++    List<AccumuloIteratorOption> iteratorOptions = AccumuloInputFormat.getIteratorOptions(job.getConfiguration());
++    assertEquals(2, iteratorOptions.size());
++    assertEquals("Count", iteratorOptions.get(0).getIteratorName());
++    assertEquals("Count", iteratorOptions.get(1).getIteratorName());
++    assertEquals("v1", iteratorOptions.get(0).getKey());
++    assertEquals("1", iteratorOptions.get(0).getValue());
++    assertEquals("junk", iteratorOptions.get(1).getKey());
++    assertEquals("\0omg:!\\xyzzy", iteratorOptions.get(1).getValue());
++  }
++
++  /**
++   * Test adding iterator options where the keys and values contain both the FIELD_SEPARATOR character (':') and ITERATOR_SEPARATOR (',') characters. There
++   * should be no exceptions thrown when trying to parse these types of option entries.
++   * 
++   * This test makes sure that the expected raw values, as appears in the Job, are equal to what's expected.
++   */
++  @Test
++  public void testIteratorOptionEncoding() throws Throwable {
++    String key = "colon:delimited:key";
++    String value = "comma,delimited,value";
++    IteratorSetting someSetting = new IteratorSetting(1, "iterator", "Iterator.class");
++    someSetting.addOption(key, value);
++    Job job = new Job();
++    AccumuloInputFormat.addIterator(job.getConfiguration(), someSetting);
++
++    final String rawConfigOpt = new AccumuloIteratorOption("iterator", key, value).toString();
++
++    assertEquals(rawConfigOpt, job.getConfiguration().get("AccumuloInputFormat.iterators.options"));
++
++    List<AccumuloIteratorOption> opts = AccumuloInputFormat.getIteratorOptions(job.getConfiguration());
++    assertEquals(1, opts.size());
++    assertEquals(opts.get(0).getKey(), key);
++    assertEquals(opts.get(0).getValue(), value);
++
++    someSetting.addOption(key + "2", value);
++    someSetting.setPriority(2);
++    someSetting.setName("it2");
++    AccumuloInputFormat.addIterator(job.getConfiguration(), someSetting);
++    opts = AccumuloInputFormat.getIteratorOptions(job.getConfiguration());
++    assertEquals(3, opts.size());
++    for (AccumuloIteratorOption opt : opts) {
++      assertEquals(opt.getKey().substring(0, key.length()), key);
++      assertEquals(opt.getValue(), value);
++    }
++  }
++
++  /**
++   * Test getting iterator settings for multiple iterators set
++   */
++  @SuppressWarnings("deprecation")
++  @Test
++  public void testGetIteratorSettings() {
++    JobContext job = new JobContext(new Configuration(), new JobID());
++
++    AccumuloInputFormat.setIterator(job, 1, "org.apache.accumulo.core.iterators.WholeRowIterator", "WholeRow");
++    AccumuloInputFormat.setIterator(job, 2, "org.apache.accumulo.core.iterators.VersioningIterator", "Versions");
++    AccumuloInputFormat.setIterator(job, 3, "org.apache.accumulo.core.iterators.CountingIterator", "Count");
++
++    List<AccumuloIterator> list = AccumuloInputFormat.getIterators(job);
++
++    // Check the list size
++    assertTrue(list.size() == 3);
++
++    // Walk the list and make sure our settings are correct
++    AccumuloIterator setting = list.get(0);
++    assertEquals(1, setting.getPriority());
++    assertEquals("org.apache.accumulo.core.iterators.WholeRowIterator", setting.getIteratorClass());
++    assertEquals("WholeRow", setting.getIteratorName());
++
++    setting = list.get(1);
++    assertEquals(2, setting.getPriority());
++    assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass());
++    assertEquals("Versions", setting.getIteratorName());
++
++    setting = list.get(2);
++    assertEquals(3, setting.getPriority());
++    assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass());
++    assertEquals("Count", setting.getIteratorName());
++
++  }
++
++  /**
++   * Check that the iterator options are getting stored in the Job conf correctly.
++   */
++  @SuppressWarnings("deprecation")
++  @Test
++  public void testSetIteratorOption() {
++    JobContext job = new JobContext(new Configuration(), new JobID());
++    AccumuloInputFormat.setIteratorOption(job, "someIterator", "aKey", "aValue");
++
++    Configuration conf = job.getConfiguration();
++    String options = conf.get("AccumuloInputFormat.iterators.options");
++    assertEquals(new String("someIterator:aKey:aValue"), options);
++  }
++
++  /**
++   * Test getting iterator options for multiple options set
++   */
++  @SuppressWarnings("deprecation")
++  @Test
++  public void testGetIteratorOption() {
++    JobContext job = new JobContext(new Configuration(), new JobID());
++
++    AccumuloInputFormat.setIteratorOption(job, "iterator1", "key1", "value1");
++    AccumuloInputFormat.setIteratorOption(job, "iterator2", "key2", "value2");
++    AccumuloInputFormat.setIteratorOption(job, "iterator3", "key3", "value3");
++
++    List<AccumuloIteratorOption> list = AccumuloInputFormat.getIteratorOptions(job);
++
++    // Check the list size
++    assertEquals(3, list.size());
++
++    // Walk the list and make sure all the options are correct
++    AccumuloIteratorOption option = list.get(0);
++    assertEquals("iterator1", option.getIteratorName());
++    assertEquals("key1", option.getKey());
++    assertEquals("value1", option.getValue());
++
++    option = list.get(1);
++    assertEquals("iterator2", option.getIteratorName());
++    assertEquals("key2", option.getKey());
++    assertEquals("value2", option.getValue());
++
++    option = list.get(2);
++    assertEquals("iterator3", option.getIteratorName());
++    assertEquals("key3", option.getKey());
++    assertEquals("value3", option.getValue());
++  }
++
++  @SuppressWarnings("deprecation")
++  @Test
++  public void testSetRegex() {
++    JobContext job = new JobContext(new Configuration(), new JobID());
++
++    String regex = ">\"*%<>\'\\";
++
++    AccumuloInputFormat.setRegex(job, RegexType.ROW, regex);
++
++    assertTrue(regex.equals(AccumuloInputFormat.getRegex(job, RegexType.ROW)));
++  }
++
++  static class TestMapper extends Mapper<Key,Value,Key,Value> {
++    Key key = null;
++    int count = 0;
++
++    @Override
++    protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
++      if (key != null)
++        assertEquals(key.getRow().toString(), new String(v.get()));
++      assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
++      assertEquals(new String(v.get()), String.format("%09x", count));
++      key = new Key(k);
++      count++;
++    }
++  }
++
++  @Test
++  public void testMap() throws Exception {
++    MockInstance mockInstance = new MockInstance("testmapinstance");
++    Connector c = mockInstance.getConnector("root", new byte[] {});
++    c.tableOperations().create("testtable");
++    BatchWriter bw = c.createBatchWriter("testtable", 10000L, 1000L, 4);
++    for (int i = 0; i < 100; i++) {
++      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
++      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
++      bw.addMutation(m);
++    }
++    bw.close();
++
++    Job job = new Job(new Configuration());
++    job.setInputFormatClass(AccumuloInputFormat.class);
++    job.setMapperClass(TestMapper.class);
++    job.setNumReduceTasks(0);
++    AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable", new Authorizations());
++    AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance");
++
++    AccumuloInputFormat input = new AccumuloInputFormat();
++    List<InputSplit> splits = input.getSplits(job);
++    assertEquals(splits.size(), 1);
++
++    TestMapper mapper = (TestMapper) job.getMapperClass().newInstance();
++    for (InputSplit split : splits) {
++      RangeInputSplit risplit = (RangeInputSplit) split;
++      Assert.assertEquals("root", risplit.getUsername());
++      Assert.assertArrayEquals(new byte[0], risplit.getPassword());
++      Assert.assertEquals("testtable", risplit.getTable());
++      Assert.assertEquals(new Authorizations(), risplit.getAuths());
++      Assert.assertEquals("testmapinstance", risplit.getInstanceName());
++      
++      TaskAttemptID id = new TaskAttemptID();
++      TaskAttemptContext attempt = new TaskAttemptContext(job.getConfiguration(), id);
++      RecordReader<Key,Value> reader = input.createRecordReader(split, attempt);
++      Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, split);
++      reader.initialize(split, context);
++      mapper.run(context);
++    }
++  }
++
++  @Test
++  public void testSimple() throws Exception {
++    MockInstance mockInstance = new MockInstance("testmapinstance");
++    Connector c = mockInstance.getConnector("root", new byte[] {});
++    c.tableOperations().create("testtable2");
++    BatchWriter bw = c.createBatchWriter("testtable2", 10000L, 1000L, 4);
++    for (int i = 0; i < 100; i++) {
++      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
++      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
++      bw.addMutation(m);
++    }
++    bw.close();
++
++    JobContext job = new JobContext(new Configuration(), new JobID());
++    AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable2", new Authorizations());
++    AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance");
++    AccumuloInputFormat input = new AccumuloInputFormat();
++    RangeInputSplit ris = new RangeInputSplit();
++    TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
++    RecordReader<Key,Value> rr = input.createRecordReader(ris, tac);
++    rr.initialize(ris, tac);
++
++    TestMapper mapper = new TestMapper();
++    Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), tac.getTaskAttemptID(), rr, null, null, null, ris);
++    while (rr.nextKeyValue()) {
++      mapper.map(rr.getCurrentKey(), rr.getCurrentValue(), context);
++    }
++  }
++
++  @SuppressWarnings("deprecation")
++  @Test
++  public void testRegex() throws Exception {
++    MockInstance mockInstance = new MockInstance("testmapinstance");
++    Connector c = mockInstance.getConnector("root", new byte[] {});
++    c.tableOperations().create("testtable3");
++    BatchWriter bw = c.createBatchWriter("testtable3", 10000L, 1000L, 4);
++    for (int i = 0; i < 100; i++) {
++      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
++      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
++      bw.addMutation(m);
++    }
++    bw.close();
++
++    JobContext job = new JobContext(new Configuration(), new JobID());
++    AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable3", new Authorizations());
++    AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance");
++    final String regex = ".*1.*";
++    AccumuloInputFormat.setRegex(job, RegexType.ROW, regex);
++    AccumuloInputFormat input = new AccumuloInputFormat();
++    RangeInputSplit ris = new RangeInputSplit();
++    TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
++    RecordReader<Key,Value> rr = input.createRecordReader(ris, tac);
++    rr.initialize(ris, tac);
++
++    Pattern p = Pattern.compile(regex);
++    while (rr.nextKeyValue()) {
++      Assert.assertTrue(p.matcher(rr.getCurrentKey().getRow().toString()).matches());
++    }
++  }
++
++  @SuppressWarnings("deprecation")
++  @Test
++  public void testCorrectRangeInputSplits() throws Exception {
++    JobContext job = new JobContext(new Configuration(), new JobID());
++
++    String username = "user", table = "table", rowRegex = "row.*", colfRegex = "colf.*", colqRegex = "colq.*";
++    String valRegex = "val.*", instance = "instance";
++    byte[] password = "password".getBytes();
++    Authorizations auths = new Authorizations("foo");
++    Collection<Pair<Text,Text>> fetchColumns = Collections.singleton(new Pair<Text,Text>(new Text("foo"), new Text("bar")));
++    boolean isolated = true, localIters = true;
++    int maxVersions = 5;
++    Level level = Level.WARN;
++
++    Instance inst = new MockInstance(instance);
++    Connector connector = inst.getConnector(username, password);
++    connector.tableOperations().create(table);
++
++    AccumuloInputFormat.setInputInfo(job, username, password, table, auths);
++    AccumuloInputFormat.setMockInstance(job, instance);
++    AccumuloInputFormat.setRegex(job, RegexType.ROW, rowRegex);
++    AccumuloInputFormat.setRegex(job, RegexType.COLUMN_FAMILY, colfRegex);
++    AccumuloInputFormat.setRegex(job, RegexType.COLUMN_QUALIFIER, colqRegex);
++    AccumuloInputFormat.setRegex(job, RegexType.VALUE, valRegex);
++    AccumuloInputFormat.setIsolated(job, isolated);
++    AccumuloInputFormat.setLocalIterators(job, localIters);
++    AccumuloInputFormat.setMaxVersions(job, maxVersions);
++    AccumuloInputFormat.fetchColumns(job, fetchColumns);
++    AccumuloInputFormat.setLogLevel(job, level);
++    
++    AccumuloInputFormat aif = new AccumuloInputFormat();
++    
++    List<InputSplit> splits = aif.getSplits(job);
++    
++    Assert.assertEquals(1, splits.size());
++    
++    InputSplit split = splits.get(0);
++    
++    Assert.assertEquals(RangeInputSplit.class, split.getClass());
++    
++    RangeInputSplit risplit = (RangeInputSplit) split;
++    
++    Assert.assertEquals(username, risplit.getUsername());
++    Assert.assertEquals(table, risplit.getTable());
++    Assert.assertArrayEquals(password, risplit.getPassword());
++    Assert.assertEquals(auths, risplit.getAuths());
++    Assert.assertEquals(instance, risplit.getInstanceName());
++    Assert.assertEquals(rowRegex, risplit.getRowRegex());
++    Assert.assertEquals(colfRegex, risplit.getColfamRegex());
++    Assert.assertEquals(colqRegex, risplit.getColqualRegex());
++    Assert.assertEquals(valRegex, risplit.getValueRegex());
++    Assert.assertEquals(isolated, risplit.isIsolatedScan());
++    Assert.assertEquals(localIters, risplit.usesLocalIterators());
++    Assert.assertEquals(maxVersions, risplit.getMaxVersions().intValue());
++    Assert.assertEquals(fetchColumns, risplit.getFetchedColumns());
++    Assert.assertEquals(level, risplit.getLogLevel());
++  }
++
++  @Test
++  public void testPartialInputSplitDelegationToConfiguration() throws Exception {
++    MockInstance mockInstance = new MockInstance("testPartialInputSplitDelegationToConfiguration");
++    Connector c = mockInstance.getConnector("root", new byte[] {});
++    c.tableOperations().create("testtable");
++    BatchWriter bw = c.createBatchWriter("testtable", 10000L, 1000L, 4);
++    for (int i = 0; i < 100; i++) {
++      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
++      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
++      bw.addMutation(m);
++    }
++    bw.close();
++
++    Job job = new Job(new Configuration());
++    job.setInputFormatClass(AccumuloInputFormat.class);
++    job.setMapperClass(TestMapper.class);
++    job.setNumReduceTasks(0);
++    AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable", new Authorizations());
++    AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testPartialInputSplitDelegationToConfiguration");
++
++    AccumuloInputFormat input = new AccumuloInputFormat();
++    List<InputSplit> splits = input.getSplits(job);
++    assertEquals(splits.size(), 1);
++
++    TestMapper mapper = (TestMapper) job.getMapperClass().newInstance();
++    
++    RangeInputSplit emptySplit = new RangeInputSplit();
++    
++    // Using an empty split should fall back to the information in the Job's Configuration
++    TaskAttemptID id = new TaskAttemptID();
++    TaskAttemptContext attempt = new TaskAttemptContext(job.getConfiguration(), id);
++    RecordReader<Key,Value> reader = input.createRecordReader(emptySplit, attempt);
++    Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, emptySplit);
++    reader.initialize(emptySplit, context);
++    mapper.run(context);
++  }
++
++  @Test(expected = IOException.class)
++  public void testPartialFailedInputSplitDelegationToConfiguration() throws Exception {
++    MockInstance mockInstance = new MockInstance("testPartialFailedInputSplitDelegationToConfiguration");
++    Connector c = mockInstance.getConnector("root", new byte[] {});
++    c.tableOperations().create("testtable");
++    BatchWriter bw = c.createBatchWriter("testtable", 10000L, 1000L, 4);
++    for (int i = 0; i < 100; i++) {
++      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
++      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
++      bw.addMutation(m);
++    }
++    bw.close();
++
++    Job job = new Job(new Configuration());
++    job.setInputFormatClass(AccumuloInputFormat.class);
++    job.setMapperClass(TestMapper.class);
++    job.setNumReduceTasks(0);
++    AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable", new Authorizations());
++    AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testPartialFailedInputSplitDelegationToConfiguration");
++
++    AccumuloInputFormat input = new AccumuloInputFormat();
++    List<InputSplit> splits = input.getSplits(job);
++    assertEquals(splits.size(), 1);
++
++    TestMapper mapper = (TestMapper) job.getMapperClass().newInstance();
++    
++    RangeInputSplit emptySplit = new RangeInputSplit();
++    emptySplit.setUsername("root");
++    emptySplit.setPassword("anythingelse".getBytes());
++    
++    // Using an empty split should fall back to the information in the Job's Configuration
++    TaskAttemptID id = new TaskAttemptID();
++    TaskAttemptContext attempt = new TaskAttemptContext(job.getConfiguration(), id);
++    RecordReader<Key,Value> reader = input.createRecordReader(emptySplit, attempt);
++    Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, emptySplit);
++    reader.initialize(emptySplit, context);
++    mapper.run(context);
++  }
++}
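
The setOfflineTableScan javadoc in InputConfigurator above describes a clone-then-offline workflow for map reduce input; the following is a minimal sketch of that preparation step (not part of the patch; the table names are made up and the Connector is assumed to be already authenticated):

import java.util.HashMap;
import java.util.HashSet;

import org.apache.accumulo.core.client.Connector;

public class OfflineScanSketch {
  // 'connector' is assumed to be an already-authenticated Connector to the instance.
  static void prepareOfflineClone(Connector connector) throws Exception {
    // Compact first so each tablet of the source is backed by a single file; the clone
    // shares those files, which makes the direct-from-HDFS read faster.
    connector.tableOperations().compact("mytable", null, null, true, true);

    // Clone the table and take only the clone offline; the original stays available.
    connector.tableOperations().clone("mytable", "mytable_mr_clone", true,
        new HashMap<String,String>(), new HashSet<String>());
    connector.tableOperations().offline("mytable_mr_clone");

    // A job would then use "mytable_mr_clone" as its input table and enable
    // InputConfigurator.setOfflineTableScan(implementingClass, conf, true).
  }
}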

http://git-wip-us.apache.org/repos/asf/accumulo/blob/16a2e0f0/core/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java
----------------------------------------------------------------------
diff --cc core/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java
index 0000000,0000000..22fb6e1
new file mode 100644
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java
@@@ -1,0 -1,0 +1,100 @@@
++package org.apache.accumulo.core.client.mapreduce;
++
++import java.io.ByteArrayInputStream;
++import java.io.ByteArrayOutputStream;
++import java.io.DataInputStream;
++import java.io.DataOutputStream;
++import java.io.IOException;
++import java.util.Arrays;
++import java.util.HashSet;
++import java.util.Set;
++
++import org.apache.accumulo.core.data.Key;
++import org.apache.accumulo.core.data.Range;
++import org.apache.accumulo.core.security.Authorizations;
++import org.apache.accumulo.core.util.Pair;
++import org.apache.hadoop.io.Text;
++import org.apache.log4j.Level;
++import org.junit.Assert;
++import org.junit.Test;
++
++public class RangeInputSplitTest {
++
++  @Test
++  public void testSimpleWritable() throws IOException {
++    RangeInputSplit split = new RangeInputSplit(new Range(new Key("a"), new Key("b")), new String[]{"localhost"});
++    
++    ByteArrayOutputStream baos = new ByteArrayOutputStream();
++    DataOutputStream dos = new DataOutputStream(baos);
++    split.write(dos);
++    
++    RangeInputSplit newSplit = new RangeInputSplit();
++    
++    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
++    DataInputStream dis = new DataInputStream(bais);
++    newSplit.readFields(dis);
++    
++    Assert.assertEquals(split.getRange(), newSplit.getRange());
++    Assert.assertTrue(Arrays.equals(split.getLocations(), newSplit.getLocations()));
++  }
++
++
++
++  @Test
++  public void testAllFieldsWritable() throws IOException {
++    RangeInputSplit split = new RangeInputSplit(new Range(new Key("a"), new Key("b")), new String[]{"localhost"});
++    
++    Set<Pair<Text,Text>> fetchedColumns = new HashSet<Pair<Text,Text>>();
++    
++    fetchedColumns.add(new Pair<Text,Text>(new Text("colf1"), new Text("colq1")));
++    fetchedColumns.add(new Pair<Text,Text>(new Text("colf2"), new Text("colq2")));
++    
++    split.setAuths(new Authorizations("foo"));
++    split.setOffline(true);
++    split.setIsolatedScan(true);
++    split.setUsesLocalIterators(true);
++    split.setMaxVersions(5);
++    split.setRowRegex("row");
++    split.setColfamRegex("colf");
++    split.setColqualRegex("colq");
++    split.setValueRegex("value");
++    split.setFetchedColumns(fetchedColumns);
++    split.setPassword("password".getBytes());
++    split.setUsername("root");
++    split.setInstanceName("instance");
++    split.setMockInstance(true);
++    split.setZooKeepers("localhost");
++    split.setLogLevel(Level.WARN);
++    
++    ByteArrayOutputStream baos = new ByteArrayOutputStream();
++    DataOutputStream dos = new DataOutputStream(baos);
++    split.write(dos);
++    
++    RangeInputSplit newSplit = new RangeInputSplit();
++    
++    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
++    DataInputStream dis = new DataInputStream(bais);
++    newSplit.readFields(dis);
++    
++    Assert.assertEquals(split.getRange(), newSplit.getRange());
++    Assert.assertArrayEquals(split.getLocations(), newSplit.getLocations());
++    
++    Assert.assertEquals(split.getAuths(), newSplit.getAuths());
++    Assert.assertEquals(split.isOffline(), newSplit.isOffline());
++    Assert.assertEquals(split.isIsolatedScan(), newSplit.isIsolatedScan());
++    Assert.assertEquals(split.usesLocalIterators(), newSplit.usesLocalIterators());
++    Assert.assertEquals(split.getMaxVersions(), newSplit.getMaxVersions());
++    Assert.assertEquals(split.getRowRegex(), newSplit.getRowRegex());
++    Assert.assertEquals(split.getColfamRegex(), newSplit.getColfamRegex());
++    Assert.assertEquals(split.getColqualRegex(), newSplit.getColqualRegex());
++    Assert.assertEquals(split.getValueRegex(), newSplit.getValueRegex());
++    Assert.assertEquals(split.getFetchedColumns(), newSplit.getFetchedColumns());
++    Assert.assertEquals(new String(split.getPassword()), new String(newSplit.getPassword()));
++    Assert.assertEquals(split.getUsername(), newSplit.getUsername());
++    Assert.assertEquals(split.getInstanceName(), newSplit.getInstanceName());
++    Assert.assertEquals(split.isMockInstance(), newSplit.isMockInstance());
++    Assert.assertEquals(split.getZooKeepers(), newSplit.getZooKeepers());
++    Assert.assertEquals(split.getLogLevel(), newSplit.getLogLevel());
++  }
++  
++}