Posted to commits@accumulo.apache.org by cj...@apache.org on 2013/10/05 04:28:31 UTC

[1/2] Squashed commit of the following:

Updated Branches:
  refs/heads/master 7da1164d8 -> b96701f22


http://git-wip-us.apache.org/repos/asf/accumulo/blob/b96701f2/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
index a1c3f70..ce021a9 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
@@ -16,6 +16,8 @@
  */
 package org.apache.accumulo.core.client.mapreduce.lib.util;
 
+import static org.apache.accumulo.core.util.ArgumentChecker.notNull;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -42,11 +44,11 @@ import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.TabletLocator;
 import org.apache.accumulo.core.client.mock.MockTabletLocator;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.conf.TableQueryConfig;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.commons.codec.binary.Base64;
@@ -58,25 +60,27 @@ import org.apache.hadoop.util.StringUtils;
  * @since 1.5.0
  */
 public class InputConfigurator extends ConfiguratorBase {
-
+  
   /**
    * Configuration keys for {@link Scanner}.
    * 
    * @since 1.5.0
    */
-  public static enum ScanOpts {
-    TABLE_NAME, AUTHORIZATIONS, RANGES, COLUMNS, ITERATORS
+  public static enum ScanOpts { 
+    TABLE_NAME, AUTHORIZATIONS, RANGES, COLUMNS, ITERATORS, TABLE_CONFIGS
   }
-
+  
   /**
    * Configuration keys for various features.
    * 
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static enum Features {
     AUTO_ADJUST_RANGES, SCAN_ISOLATION, USE_LOCAL_ITERATORS, SCAN_OFFLINE
   }
-
+  
   /**
    * Sets the name of the input table, over which this job will scan.
    * 
@@ -87,27 +91,29 @@ public class InputConfigurator extends ConfiguratorBase {
    * @param tableName
    *          the table to use when the tablename is null in the write call
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setInputTableName(Class<?> implementingClass, Configuration conf, String tableName) {
-    ArgumentChecker.notNull(tableName);
+    notNull(tableName);
     conf.set(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME), tableName);
   }
-
+  
   /**
-   * Gets the table name from the configuration.
+   * Gets the name of the input table, over which this job will scan.
    * 
    * @param implementingClass
    *          the class whose name will be used as a prefix for the property configuration key
    * @param conf
    *          the Hadoop configuration object to configure
-   * @return the table name
    * @since 1.5.0
-   * @see #setInputTableName(Class, Configuration, String)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static String getInputTableName(Class<?> implementingClass, Configuration conf) {
     return conf.get(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME));
   }
-
+  
   /**
    * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorization. Defaults to the empty set.
    * 
@@ -123,7 +129,7 @@ public class InputConfigurator extends ConfiguratorBase {
     if (auths != null && !auths.isEmpty())
       conf.set(enumToConfKey(implementingClass, ScanOpts.AUTHORIZATIONS), auths.serialize());
   }
-
+  
   /**
    * Gets the authorizations to set for the scans from the configuration.
    * 
@@ -139,9 +145,9 @@ public class InputConfigurator extends ConfiguratorBase {
     String authString = conf.get(enumToConfKey(implementingClass, ScanOpts.AUTHORIZATIONS));
     return authString == null ? Authorizations.EMPTY : new Authorizations(authString.getBytes());
   }
-
+  
   /**
-   * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
+   * Sets the input ranges to scan on all input tables for this job. If not set, the entire table will be scanned.
    * 
    * @param implementingClass
    *          the class whose name will be used as a prefix for the property configuration key
@@ -149,23 +155,28 @@ public class InputConfigurator extends ConfiguratorBase {
    *          the Hadoop configuration object to configure
    * @param ranges
    *          the ranges that will be mapped over
+   * @throws IllegalArgumentException
+   *           if the ranges cannot be encoded into base 64
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setRanges(Class<?> implementingClass, Configuration conf, Collection<Range> ranges) {
-    ArgumentChecker.notNull(ranges);
+    notNull(ranges);
+    
     ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size());
     try {
       for (Range r : ranges) {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         r.write(new DataOutputStream(baos));
-        rangeStrings.add(new String(Base64.encodeBase64(baos.toByteArray()), Constants.UTF8));
+        rangeStrings.add(new String(Base64.encodeBase64(baos.toByteArray())));
       }
+      conf.setStrings(enumToConfKey(implementingClass, ScanOpts.RANGES), rangeStrings.toArray(new String[0]));
     } catch (IOException ex) {
       throw new IllegalArgumentException("Unable to encode ranges to Base64", ex);
     }
-    conf.setStrings(enumToConfKey(implementingClass, ScanOpts.RANGES), rangeStrings.toArray(new String[0]));
   }
-
+  
   /**
    * Gets the ranges to scan over from a job.
    * 
@@ -178,20 +189,60 @@ public class InputConfigurator extends ConfiguratorBase {
    *           if the ranges have been encoded improperly
    * @since 1.5.0
    * @see #setRanges(Class, Configuration, Collection)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static List<Range> getRanges(Class<?> implementingClass, Configuration conf) throws IOException {
-    ArrayList<Range> ranges = new ArrayList<Range>();
-    for (String rangeString : conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.RANGES))) {
-      ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes(Constants.UTF8)));
+    
+    Collection<String> encodedRanges = conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.RANGES));
+    List<Range> ranges = new ArrayList<Range>();
+    for (String rangeString : encodedRanges) {
+      ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes()));
       Range range = new Range();
       range.readFields(new DataInputStream(bais));
       ranges.add(range);
     }
     return ranges;
   }
-
+  
   /**
-   * Restricts the columns that will be mapped over for this job.
+   * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return a list of iterators
+   * @since 1.5.0
+   * @see #addIterator(Class, Configuration, IteratorSetting)
+   * @deprecated since 1.6.0
+   */
+  @Deprecated
+  public static List<IteratorSetting> getIterators(Class<?> implementingClass, Configuration conf) {
+    String iterators = conf.get(enumToConfKey(implementingClass, ScanOpts.ITERATORS));
+    
+    // If no iterators are present, return an empty list
+    if (iterators == null || iterators.isEmpty())
+      return new ArrayList<IteratorSetting>();
+    
+    // Compose the set of iterators encoded in the job configuration
+    StringTokenizer tokens = new StringTokenizer(iterators, StringUtils.COMMA_STR);
+    List<IteratorSetting> list = new ArrayList<IteratorSetting>();
+    try {
+      while (tokens.hasMoreTokens()) {
+        String itstring = tokens.nextToken();
+        ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(itstring.getBytes()));
+        list.add(new IteratorSetting(new DataInputStream(bais)));
+        bais.close();
+      }
+    } catch (IOException e) {
+      throw new IllegalArgumentException("couldn't decode iterator settings");
+    }
+    return list;
+  }
+  
+  /**
+   * Restricts the columns that will be mapped over for this job. This applies the columns to all tables that have been set on the job.
    * 
    * @param implementingClass
    *          the class whose name will be used as a prefix for the property configuration key
@@ -200,15 +251,20 @@ public class InputConfigurator extends ConfiguratorBase {
    * @param columnFamilyColumnQualifierPairs
    *          a pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
    *          selected. An empty set is the default and is equivalent to scanning the all columns.
+   * @throws IllegalArgumentException
+   *           if the column family is null
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void fetchColumns(Class<?> implementingClass, Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
-    ArgumentChecker.notNull(columnFamilyColumnQualifierPairs);
-    ArrayList<String> columnStrings = new ArrayList<String>(columnFamilyColumnQualifierPairs.size());
+    notNull(columnFamilyColumnQualifierPairs);
+    ArrayList<String> columnStrings = new ArrayList<String>();
     for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
+      
       if (column.getFirst() == null)
         throw new IllegalArgumentException("Column family can not be null");
-
+      
       String col = new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst())), Constants.UTF8);
       if (column.getSecond() != null)
         col += ":" + new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond())), Constants.UTF8);
@@ -216,7 +272,7 @@ public class InputConfigurator extends ConfiguratorBase {
     }
     conf.setStrings(enumToConfKey(implementingClass, ScanOpts.COLUMNS), columnStrings.toArray(new String[0]));
   }
-
+  
   /**
    * Gets the columns to be mapped over from this job.
    * 
@@ -227,7 +283,9 @@ public class InputConfigurator extends ConfiguratorBase {
    * @return a set of columns
    * @since 1.5.0
    * @see #fetchColumns(Class, Configuration, Collection)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static Set<Pair<Text,Text>> getFetchedColumns(Class<?> implementingClass, Configuration conf) {
     Set<Pair<Text,Text>> columns = new HashSet<Pair<Text,Text>>();
     for (String col : conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.COLUMNS))) {
@@ -238,9 +296,9 @@ public class InputConfigurator extends ConfiguratorBase {
     }
     return columns;
   }
-
+  
   /**
-   * Encode an iterator on the input for this job.
+   * Encode an iterator on the input for all tables associated with this job.
    * 
    * @param implementingClass
    *          the class whose name will be used as a prefix for the property configuration key
@@ -248,8 +306,12 @@ public class InputConfigurator extends ConfiguratorBase {
    *          the Hadoop configuration object to configure
    * @param cfg
    *          the configuration of the iterator
+   * @throws IllegalArgumentException
+   *           if the iterator can't be serialized into the configuration
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void addIterator(Class<?> implementingClass, Configuration conf, IteratorSetting cfg) {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     String newIter;
@@ -260,8 +322,9 @@ public class InputConfigurator extends ConfiguratorBase {
     } catch (IOException e) {
       throw new IllegalArgumentException("unable to serialize IteratorSetting");
     }
-
-    String iterators = conf.get(enumToConfKey(implementingClass, ScanOpts.ITERATORS));
+    
+    String confKey = enumToConfKey(implementingClass, ScanOpts.ITERATORS);
+    String iterators = conf.get(confKey);
     // No iterators specified yet, create a new string
     if (iterators == null || iterators.isEmpty()) {
       iterators = newIter;
@@ -270,43 +333,9 @@ public class InputConfigurator extends ConfiguratorBase {
       iterators = iterators.concat(StringUtils.COMMA_STR + newIter);
     }
     // Store the iterators w/ the job
-    conf.set(enumToConfKey(implementingClass, ScanOpts.ITERATORS), iterators);
-  }
-
-  /**
-   * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
-   * 
-   * @param implementingClass
-   *          the class whose name will be used as a prefix for the property configuration key
-   * @param conf
-   *          the Hadoop configuration object to configure
-   * @return a list of iterators
-   * @since 1.5.0
-   * @see #addIterator(Class, Configuration, IteratorSetting)
-   */
-  public static List<IteratorSetting> getIterators(Class<?> implementingClass, Configuration conf) {
-    String iterators = conf.get(enumToConfKey(implementingClass, ScanOpts.ITERATORS));
-
-    // If no iterators are present, return an empty list
-    if (iterators == null || iterators.isEmpty())
-      return new ArrayList<IteratorSetting>();
-
-    // Compose the set of iterators encoded in the job configuration
-    StringTokenizer tokens = new StringTokenizer(iterators, StringUtils.COMMA_STR);
-    List<IteratorSetting> list = new ArrayList<IteratorSetting>();
-    try {
-      while (tokens.hasMoreTokens()) {
-        String itstring = tokens.nextToken();
-        ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(itstring.getBytes()));
-        list.add(new IteratorSetting(new DataInputStream(bais)));
-        bais.close();
-      }
-    } catch (IOException e) {
-      throw new IllegalArgumentException("couldn't decode iterator settings");
-    }
-    return list;
+    conf.set(confKey, iterators);
   }
-
+  
   /**
    * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries.
    * Disabling this feature will cause exactly one Map task to be created for each specified range. The default setting is enabled. *
@@ -322,11 +351,13 @@ public class InputConfigurator extends ConfiguratorBase {
    *          the feature is enabled if true, disabled otherwise
    * @see #setRanges(Class, Configuration, Collection)
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setAutoAdjustRanges(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
     conf.setBoolean(enumToConfKey(implementingClass, Features.AUTO_ADJUST_RANGES), enableFeature);
   }
-
+  
   /**
    * Determines whether a configuration has auto-adjust ranges enabled.
    * 
@@ -337,11 +368,13 @@ public class InputConfigurator extends ConfiguratorBase {
    * @return false if the feature is disabled, true otherwise
    * @since 1.5.0
    * @see #setAutoAdjustRanges(Class, Configuration, boolean)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static Boolean getAutoAdjustRanges(Class<?> implementingClass, Configuration conf) {
     return conf.getBoolean(enumToConfKey(implementingClass, Features.AUTO_ADJUST_RANGES), true);
   }
-
+  
   /**
    * Controls the use of the {@link IsolatedScanner} in this job.
    * 
@@ -355,11 +388,13 @@ public class InputConfigurator extends ConfiguratorBase {
    * @param enableFeature
    *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setScanIsolation(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
     conf.setBoolean(enumToConfKey(implementingClass, Features.SCAN_ISOLATION), enableFeature);
   }
-
+  
   /**
    * Determines whether a configuration has isolation enabled.
    * 
@@ -370,11 +405,13 @@ public class InputConfigurator extends ConfiguratorBase {
    * @return true if the feature is enabled, false otherwise
    * @since 1.5.0
    * @see #setScanIsolation(Class, Configuration, boolean)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static Boolean isIsolated(Class<?> implementingClass, Configuration conf) {
     return conf.getBoolean(enumToConfKey(implementingClass, Features.SCAN_ISOLATION), false);
   }
-
+  
   /**
    * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map
    * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task.
@@ -389,11 +426,13 @@ public class InputConfigurator extends ConfiguratorBase {
    * @param enableFeature
    *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setLocalIterators(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
     conf.setBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), enableFeature);
   }
-
+  
   /**
    * Determines whether a configuration uses local iterators.
    * 
@@ -404,11 +443,13 @@ public class InputConfigurator extends ConfiguratorBase {
    * @return true if the feature is enabled, false otherwise
    * @since 1.5.0
    * @see #setLocalIterators(Class, Configuration, boolean)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static Boolean usesLocalIterators(Class<?> implementingClass, Configuration conf) {
     return conf.getBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), false);
   }
-
+  
   /**
    * <p>
    * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the
@@ -441,11 +482,13 @@ public class InputConfigurator extends ConfiguratorBase {
    * @param enableFeature
    *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setOfflineTableScan(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
     conf.setBoolean(enumToConfKey(implementingClass, Features.SCAN_OFFLINE), enableFeature);
   }
-
+  
   /**
    * Determines whether a configuration has the offline table scan feature enabled.
    * 
@@ -456,11 +499,98 @@ public class InputConfigurator extends ConfiguratorBase {
    * @return true if the feature is enabled, false otherwise
    * @since 1.5.0
    * @see #setOfflineTableScan(Class, Configuration, boolean)
+   * @deprecated since 1.6.0
    */
+  @Deprecated 
   public static Boolean isOfflineScan(Class<?> implementingClass, Configuration conf) {
     return conf.getBoolean(enumToConfKey(implementingClass, Features.SCAN_OFFLINE), false);
   }
-
+  
+  /**
+   * Sets configurations for multiple tables at a time.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param tconf
+   *          an array of {@link TableQueryConfig} objects to associate with the job
+   * @since 1.6.0
+   */
+  public static void setTableQueryConfigs(Class<?> implementingClass, Configuration conf, TableQueryConfig... tconf) {
+    List<String> tableQueryConfigStrings = new ArrayList<String>();
+    for (TableQueryConfig queryConfig : tconf) {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      try {
+        queryConfig.write(new DataOutputStream(baos));
+      } catch (IOException e) {
+        throw new IllegalStateException("Configuration for " + queryConfig.getTableName() + " could not be serialized.");
+      }
+      tableQueryConfigStrings.add(new String(Base64.encodeBase64(baos.toByteArray())));
+    }
+    String confKey = enumToConfKey(implementingClass, ScanOpts.TABLE_CONFIGS);
+    conf.setStrings(confKey, tableQueryConfigStrings.toArray(new String[0]));
+  }
+  
+  /**
+   * Returns all {@link TableQueryConfig} objects associated with this job.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return all of the table query configs for the job
+   * @since 1.6.0
+   */
+  public static List<TableQueryConfig> getTableQueryConfigs(Class<?> implementingClass, Configuration conf) {
+    List<TableQueryConfig> configs = new ArrayList<TableQueryConfig>();
+    Collection<String> configStrings = conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.TABLE_CONFIGS));
+    if (configStrings != null) {
+      for (String str : configStrings) {
+        try {
+          byte[] bytes = Base64.decodeBase64(str.getBytes());
+          ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+          configs.add(new TableQueryConfig(new DataInputStream(bais)));
+          bais.close();
+        } catch (IOException e) {
+          throw new IllegalStateException("The table query configurations could not be deserialized from the given configuration");
+        }
+      }
+    }
+    TableQueryConfig defaultQueryConfig;
+    try {
+      defaultQueryConfig = getDefaultTableConfig(implementingClass, conf);
+    } catch (IOException e) {
+      throw new IllegalStateException("There was an error deserializing the default table configuration.");
+    }
+    if (defaultQueryConfig != null)
+      configs.add(defaultQueryConfig);
+    
+    return configs;
+  }
+  
+  /**
+   * Returns the {@link TableQueryConfig} for the given table
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param tableName
+   *          the table name for which to fetch the table query config
+   * @return the table query config for the given table name, or null if it does not exist
+   * @since 1.6.0
+   */
+  public static TableQueryConfig getTableQueryConfig(Class<?> implementingClass, Configuration conf, String tableName) {
+    List<TableQueryConfig> queryConfigs = getTableQueryConfigs(implementingClass, conf);
+    for (TableQueryConfig queryConfig : queryConfigs) {
+      if (queryConfig.getTableName().equals(tableName)) {
+        return queryConfig;
+      }
+    }
+    return null;
+  }
+  
   /**
    * Initializes an Accumulo {@link TabletLocator} based on the configuration.
    * 
@@ -468,20 +598,21 @@ public class InputConfigurator extends ConfiguratorBase {
    *          the class whose name will be used as a prefix for the property configuration key
    * @param conf
    *          the Hadoop configuration object to configure
+   * @param tableName
+   *          The table name for which to initialize the {@link TabletLocator}
    * @return an Accumulo tablet locator
    * @throws TableNotFoundException
    *           if the table name set on the configuration doesn't exist
    * @since 1.5.0
    */
-  public static TabletLocator getTabletLocator(Class<?> implementingClass, Configuration conf) throws TableNotFoundException {
+  public static TabletLocator getTabletLocator(Class<?> implementingClass, Configuration conf, String tableName) throws TableNotFoundException {
     String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE));
     if ("MockInstance".equals(instanceType))
       return new MockTabletLocator();
     Instance instance = getInstance(implementingClass, conf);
-    String tableName = getInputTableName(implementingClass, conf);
     return TabletLocator.getLocator(instance, new Text(Tables.getTableId(instance, tableName)));
   }
-
+  
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
   /**
    * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}.
@@ -507,17 +638,23 @@ public class InputConfigurator extends ConfiguratorBase {
       Connector c = getInstance(implementingClass, conf).getConnector(principal, token);
       if (!c.securityOperations().authenticateUser(principal, token))
         throw new IOException("Unable to authenticate user");
-      if (!c.securityOperations().hasTablePermission(principal, getInputTableName(implementingClass, conf), TablePermission.READ))
-        throw new IOException("Unable to access table");
-
-      if (!conf.getBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), false)) {
-        // validate that any scan-time iterators can be loaded by the the tablet servers
-        for (IteratorSetting iter : getIterators(implementingClass, conf)) {
-          if (!c.tableOperations().testClassLoad(getInputTableName(implementingClass, conf), iter.getIteratorClass(), SortedKeyValueIterator.class.getName()))
-            throw new AccumuloException("Servers are unable to load " + iter.getIteratorClass() + " as a " + SortedKeyValueIterator.class.getName());
+      
+      for (TableQueryConfig tableConfig : getTableQueryConfigs(implementingClass, conf)) {
+        if (!c.securityOperations().hasTablePermission(getPrincipal(implementingClass, conf), tableConfig.getTableName(), TablePermission.READ))
+          throw new IOException("Unable to access table");
+      }
+      
+      for (TableQueryConfig tableConfig : getTableQueryConfigs(implementingClass, conf)) {
+        if (!tableConfig.shouldUseLocalIterators()) {
+          if (tableConfig.getIterators() != null) {
+            for (IteratorSetting iter : tableConfig.getIterators()) {
+              if (!c.tableOperations().testClassLoad(tableConfig.getTableName(), iter.getIteratorClass(), SortedKeyValueIterator.class.getName()))
+                throw new AccumuloException("Servers are unable to load " + iter.getIteratorClass() + " as a " + SortedKeyValueIterator.class.getName());
+              
+            }
+          }
         }
       }
-
     } catch (AccumuloException e) {
       throw new IOException(e);
     } catch (AccumuloSecurityException e) {
@@ -526,5 +663,36 @@ public class InputConfigurator extends ConfiguratorBase {
       throw new IOException(e);
     }
   }
-
+  
+  /**
+   * Returns the {@link TableQueryConfig} for the configuration based on the properties set using the single-table input methods.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration from which to retrieve the table query configuration
+   * @return the config object built from the single input table properties set on the job
+   * @throws IOException
+   *           if the stored ranges cannot be decoded from the configuration
+   * @since 1.6.0
+   */
+  protected static TableQueryConfig getDefaultTableConfig(Class<?> implementingClass, Configuration conf) throws IOException {
+    String tableName = getInputTableName(implementingClass, conf);
+    if (tableName != null) {
+      TableQueryConfig queryConfig = new TableQueryConfig(getInputTableName(implementingClass, conf));
+      List<IteratorSetting> itrs = getIterators(implementingClass, conf);
+      if (itrs != null)
+        queryConfig.setIterators(itrs);
+      Set<Pair<Text,Text>> columns = getFetchedColumns(implementingClass, conf);
+      if (columns != null)
+        queryConfig.fetchColumns(columns);
+      List<Range> ranges = getRanges(implementingClass, conf);
+      if (ranges != null)
+        queryConfig.setRanges(ranges);
+      
+      queryConfig.setAutoAdjustRanges(getAutoAdjustRanges(implementingClass, conf)).setUseIsolatedScanners(isIsolated(implementingClass, conf))
+          .setUseLocalIterators(usesLocalIterators(implementingClass, conf)).setOfflineScan(isOfflineScan(implementingClass, conf));
+      return queryConfig;
+    }
+    return null;
+  }
 }
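
For context, here is a minimal usage sketch of the multi-table API added above. It is not part of this commit; the instance, user, and table names ("users", "events") are hypothetical, and the calls mirror the wrapper methods exercised in the tests further down:

import java.util.Collections;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.TableQueryConfig;
import org.apache.accumulo.core.data.Range;
import org.apache.hadoop.mapreduce.Job;

public class MultiTableJobSketch {
  public static void main(String[] args) throws Exception {
    Job job = new Job();
    AccumuloInputFormat.setConnectorInfo(job, "user", new PasswordToken("pass"));
    AccumuloInputFormat.setMockInstance(job, "instance");

    // Each table carries its own ranges and iterators in a TableQueryConfig.
    TableQueryConfig users = new TableQueryConfig("users")
        .setRanges(Collections.singletonList(new Range("a", "m")));
    TableQueryConfig events = new TableQueryConfig("events")
        .setIterators(Collections.singletonList(new IteratorSetting(50, "wholeRow",
            "org.apache.accumulo.core.iterators.user.WholeRowIterator")));

    AccumuloInputFormat.setTableQueryConfigs(job, users, events);

    // A single table's config can be read back out of the job.
    TableQueryConfig fetched = AccumuloInputFormat.getTableQueryConfig(job, "users");
    System.out.println(fetched.getTableName() + " ranges=" + fetched.getRanges());
  }
}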

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b96701f2/core/src/main/java/org/apache/accumulo/core/conf/TableQueryConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/TableQueryConfig.java b/core/src/main/java/org/apache/accumulo/core/conf/TableQueryConfig.java
new file mode 100644
index 0000000..a5278fa
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/conf/TableQueryConfig.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.conf;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This class holds a query configuration for a table. It contains all of the properties needed to specify how rows should be returned from the table.
+ */
+public class TableQueryConfig implements Writable {
+  
+  private String tableName;
+  private List<IteratorSetting> iterators;
+  private List<Range> ranges;
+  private Set<Pair<Text,Text>> columns;
+  
+  private boolean autoAdjustRanges = true;
+  private boolean useLocalIterators = false;
+  private boolean useIsolatedScanners = false;
+  private boolean offlineScan = false;
+  
+  public TableQueryConfig(String tableName) {
+    checkNotNull(tableName);
+    this.tableName = tableName;
+  }
+  
+  public TableQueryConfig(DataInput input) throws IOException {
+    readFields(input);
+  }
+  
+  /**
+   * Returns the table name associated with this configuration
+   */
+  public String getTableName() {
+    return tableName;
+  }
+  
+  /**
+   * Sets the input ranges to scan for the table associated with this configuration. If not set, the entire table will be scanned.
+   * 
+   * @param ranges
+   *          the ranges that will be mapped over
+   * @since 1.6.0
+   */
+  public TableQueryConfig setRanges(List<Range> ranges) {
+    this.ranges = ranges;
+    return this;
+  }
+  
+  /**
+   * Returns the ranges to be queried in the configuration
+   */
+  public List<Range> getRanges() {
+    return ranges != null ? ranges : new ArrayList<Range>();
+  }
+  
+  /**
+   * Restricts the columns that will be mapped over for the table associated with this configuration.
+   * 
+   * @param columns
+   *          pairs of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
+   *          selected. An empty set is the default and is equivalent to scanning all columns.
+   * @since 1.6.0
+   */
+  public TableQueryConfig fetchColumns(Set<Pair<Text,Text>> columns) {
+    this.columns = columns;
+    return this;
+  }
+  
+  /**
+   * Returns the columns to be fetched for this configuration
+   */
+  public Set<Pair<Text,Text>> getFetchedColumns() {
+    return columns != null ? columns : new HashSet<Pair<Text,Text>>();
+  }
+  
+  /**
+   * Sets the iterators to be used in the query.
+   * 
+   * @param iterators
+   *          the configurations for the iterators
+   * @since 1.6.0
+   */
+  public TableQueryConfig setIterators(List<IteratorSetting> iterators) {
+    this.iterators = iterators;
+    return this;
+  }
+  
+  /**
+   * Returns the iterators to be set on this configuration
+   */
+  public List<IteratorSetting> getIterators() {
+    return iterators != null ? iterators : new ArrayList<IteratorSetting>();
+  }
+  
+  /**
+   * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries.
+   * Disabling this feature will cause exactly one Map task to be created for each specified range. The default setting is enabled.
+   * 
+   * <p>
+   * By default, this feature is <b>enabled</b>.
+   * 
+   * @param autoAdjustRanges
+   *          the feature is enabled if true, disabled otherwise
+   * @see #setRanges(java.util.List)
+   * @since 1.6.0
+   */
+  public TableQueryConfig setAutoAdjustRanges(boolean autoAdjustRanges) {
+    this.autoAdjustRanges = autoAdjustRanges;
+    return this;
+  }
+  
+  /**
+   * Determines whether a configuration has auto-adjust ranges enabled.
+   * 
+   * @return false if the feature is disabled, true otherwise
+   * @since 1.6.0
+   * @see #setAutoAdjustRanges(boolean)
+   */
+  public boolean shouldAutoAdjustRanges() {
+    return autoAdjustRanges;
+  }
+  
+  /**
+   * Controls the use of the {@link org.apache.accumulo.core.client.ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack
+   * to be constructed within the Map task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be
+   * available on the classpath for the task.
+   * 
+   * <p>
+   * By default, this feature is <b>disabled</b>.
+   * 
+   * @param useLocalIterators
+   *          the feature is enabled if true, disabled otherwise
+   * @since 1.6.0
+   */
+  public TableQueryConfig setUseLocalIterators(boolean useLocalIterators) {
+    this.useLocalIterators = useLocalIterators;
+    return this;
+  }
+  
+  /**
+   * Determines whether a configuration uses local iterators.
+   * 
+   * @return true if the feature is enabled, false otherwise
+   * @since 1.6.0
+   * @see #setUseLocalIterators(boolean)
+   */
+  public boolean shouldUseLocalIterators() {
+    return useLocalIterators;
+  }
+  
+  /**
+   * <p>
+   * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the
+   * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will
+   * fail.
+   * 
+   * <p>
+   * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS.
+   * 
+   * <p>
+   * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be
+   * on the mapper's classpath. The accumulo-site.xml may need to be on the mapper's classpath if HDFS or the Accumulo directory in HDFS are non-standard.
+   * 
+   * <p>
+   * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map
+   * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The
+   * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file.
+   * 
+   * <p>
+   * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it will support
+   * speculative execution better. When reading an online table, speculative execution can put more load on an already slow tablet server.
+   * 
+   * <p>
+   * By default, this feature is <b>disabled</b>.
+   * 
+   * @param offlineScan
+   *          the feature is enabled if true, disabled otherwise
+   * @since 1.6.0
+   */
+  public TableQueryConfig setOfflineScan(boolean offlineScan) {
+    this.offlineScan = offlineScan;
+    return this;
+  }
+  
+  /**
+   * Determines whether a configuration has the offline table scan feature enabled.
+   * 
+   * @return true if the feature is enabled, false otherwise
+   * @since 1.6.0
+   * @see #setOfflineScan(boolean)
+   */
+  public boolean isOfflineScan() {
+    return offlineScan;
+  }
+  
+  /**
+   * Controls the use of the {@link org.apache.accumulo.core.client.IsolatedScanner} in this job.
+   * 
+   * <p>
+   * By default, this feature is <b>disabled</b>.
+   * 
+   * @param useIsolatedScanners
+   *          the feature is enabled if true, disabled otherwise
+   * @since 1.6.0
+   */
+  public TableQueryConfig setUseIsolatedScanners(boolean useIsolatedScanners) {
+    this.useIsolatedScanners = useIsolatedScanners;
+    return this;
+  }
+  
+  /**
+   * Determines whether a configuration has isolation enabled.
+   * 
+   * @return true if the feature is enabled, false otherwise
+   * @since 1.6.0
+   * @see #setUseIsolatedScanners(boolean)
+   */
+  public boolean shouldUseIsolatedScanners() {
+    return useIsolatedScanners;
+  }
+  
+  @Override
+  public void write(DataOutput dataOutput) throws IOException {
+    dataOutput.writeUTF(tableName);
+    if (iterators != null) {
+      dataOutput.writeInt(iterators.size());
+      for (IteratorSetting setting : iterators)
+        setting.write(dataOutput);
+    } else {
+      dataOutput.writeInt(0);
+    }
+    if (ranges != null) {
+      dataOutput.writeInt(ranges.size());
+      for (Range range : ranges)
+        range.write(dataOutput);
+    } else {
+      dataOutput.writeInt(0);
+    }
+    if (columns != null) {
+      dataOutput.writeInt(columns.size());
+      for (Pair<Text,Text> column : columns) {
+        if (column.getSecond() == null) {
+          dataOutput.writeInt(1);
+          column.getFirst().write(dataOutput);
+        } else {
+          dataOutput.writeInt(2);
+          column.getFirst().write(dataOutput);
+          column.getSecond().write(dataOutput);
+        }
+      }
+    } else {
+      dataOutput.writeInt(0);
+    }
+    dataOutput.writeBoolean(autoAdjustRanges);
+    dataOutput.writeBoolean(useLocalIterators);
+    dataOutput.writeBoolean(useIsolatedScanners);
+    dataOutput.writeBoolean(offlineScan);
+  }
+  
+  @Override
+  public void readFields(DataInput dataInput) throws IOException {
+    this.tableName = dataInput.readUTF();
+    // load iterators
+    long iterSize = dataInput.readInt();
+    if (iterSize > 0)
+      iterators = new ArrayList<IteratorSetting>();
+    for (int i = 0; i < iterSize; i++)
+      iterators.add(new IteratorSetting(dataInput));
+    // load ranges
+    long rangeSize = dataInput.readInt();
+    if (rangeSize > 0)
+      ranges = new ArrayList<Range>();
+    for (int i = 0; i < rangeSize; i++) {
+      Range range = new Range();
+      range.readFields(dataInput);
+      ranges.add(range);
+    }
+    // load columns
+    long columnSize = dataInput.readInt();
+    if (columnSize > 0)
+      columns = new HashSet<Pair<Text,Text>>();
+    for (int i = 0; i < columnSize; i++) {
+      long numPairs = dataInput.readInt();
+      Text colFam = new Text();
+      colFam.readFields(dataInput);
+      if (numPairs == 1) {
+        columns.add(new Pair<Text,Text>(colFam, null));
+      } else if (numPairs == 2) {
+        Text colQual = new Text();
+        colQual.readFields(dataInput);
+        columns.add(new Pair<Text,Text>(colFam, colQual));
+      }
+    }
+    autoAdjustRanges = dataInput.readBoolean();
+    useLocalIterators = dataInput.readBoolean();
+    useIsolatedScanners = dataInput.readBoolean();
+    offlineScan = dataInput.readBoolean();
+  }
+  
+  @Override
+  public boolean equals(Object o) {
+    if (this == o)
+      return true;
+    if (o == null || getClass() != o.getClass())
+      return false;
+    
+    TableQueryConfig that = (TableQueryConfig) o;
+    
+    if (autoAdjustRanges != that.autoAdjustRanges)
+      return false;
+    if (offlineScan != that.offlineScan)
+      return false;
+    if (useIsolatedScanners != that.useIsolatedScanners)
+      return false;
+    if (useLocalIterators != that.useLocalIterators)
+      return false;
+    if (columns != null ? !columns.equals(that.columns) : that.columns != null)
+      return false;
+    if (iterators != null ? !iterators.equals(that.iterators) : that.iterators != null)
+      return false;
+    if (ranges != null ? !ranges.equals(that.ranges) : that.ranges != null)
+      return false;
+    if (tableName != null ? !tableName.equals(that.tableName) : that.tableName != null)
+      return false;
+    
+    return true;
+  }
+  
+  @Override
+  public int hashCode() {
+    int result = tableName != null ? tableName.hashCode() : 0;
+    result = 31 * result + (iterators != null ? iterators.hashCode() : 0);
+    result = 31 * result + (ranges != null ? ranges.hashCode() : 0);
+    result = 31 * result + (columns != null ? columns.hashCode() : 0);
+    result = 31 * result + (autoAdjustRanges ? 1 : 0);
+    result = 31 * result + (useLocalIterators ? 1 : 0);
+    result = 31 * result + (useIsolatedScanners ? 1 : 0);
+    result = 31 * result + (offlineScan ? 1 : 0);
+    return result;
+  }
+}
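
As a small round-trip sketch (not part of this commit; the table name and settings are hypothetical), the Writable implementation above can be exercised like this, with write() and the DataInput constructor mirroring each other field for field:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;

import org.apache.accumulo.core.conf.TableQueryConfig;
import org.apache.accumulo.core.data.Range;

public class TableQueryConfigRoundTrip {
  public static void main(String[] args) throws IOException {
    TableQueryConfig original = new TableQueryConfig("mytable")
        .setRanges(Collections.singletonList(new Range("a", "b")))
        .setAutoAdjustRanges(false)
        .setUseIsolatedScanners(true);

    // Serialize the config, then rebuild it from the raw bytes.
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    original.write(new DataOutputStream(baos));
    TableQueryConfig copy = new TableQueryConfig(
        new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));

    System.out.println(original.equals(copy)); // prints true
  }
}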

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b96701f2/core/src/main/java/org/apache/accumulo/core/util/ArgumentChecker.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ArgumentChecker.java b/core/src/main/java/org/apache/accumulo/core/util/ArgumentChecker.java
index 0c8ba07..2379873 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/ArgumentChecker.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/ArgumentChecker.java
@@ -60,4 +60,9 @@ public class ArgumentChecker {
     if (i <= 0)
       throw new IllegalArgumentException("integer should be > 0, was " + i);
   }
+  
+  public static final void notEmpty(Iterable<?> arg) {
+    if (!arg.iterator().hasNext())
+      throw new IllegalArgumentException("Argument should not be empty");
+  }
 }
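
A quick sketch of the new precondition helper (not part of this commit; the values are hypothetical):

import java.util.Arrays;
import java.util.Collections;

import org.apache.accumulo.core.util.ArgumentChecker;

public class NotEmptyExample {
  public static void main(String[] args) {
    ArgumentChecker.notEmpty(Arrays.asList("a", "b"));   // passes silently
    ArgumentChecker.notEmpty(Collections.emptyList());   // throws IllegalArgumentException
  }
}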

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b96701f2/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormatTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormatTest.java
index 4f527e1..d440a6c 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormatTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormatTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.accumulo.core.client.BatchWriter;
@@ -31,12 +32,15 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.TableQueryConfig;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.user.RegExFilter;
 import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.Text;
@@ -55,6 +59,7 @@ public class AccumuloInputFormatTest {
   private static final String PREFIX = AccumuloInputFormatTest.class.getSimpleName();
   private static final String INSTANCE_NAME = PREFIX + "_mapred_instance";
   private static final String TEST_TABLE_1 = PREFIX + "_mapred_table_1";
+  private static final String TEST_TABLE_2 = PREFIX + "_mapred_table_2";
   
   /**
    * Check that the iterator configuration is getting stored in the Job conf correctly.
@@ -64,7 +69,6 @@ public class AccumuloInputFormatTest {
   @Test
   public void testSetIterator() throws IOException {
     JobConf job = new JobConf();
-    
     IteratorSetting is = new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator");
     AccumuloInputFormat.addIterator(job, is);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -204,10 +208,11 @@ public class AccumuloInputFormatTest {
       @Override
       public void map(Key k, Value v, OutputCollector<Key,Value> output, Reporter reporter) throws IOException {
         try {
+          String tableName = ((InputFormatBase.RangeInputSplit) reporter.getInputSplit()).getTableName();
           if (key != null)
             assertEquals(key.getRow().toString(), new String(v.get()));
-          assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
-          assertEquals(new String(v.get()), String.format("%09x", count));
+          assertEquals(new Text(String.format("%s_%09x", tableName, count + 1)), k.getRow());
+          assertEquals(String.format("%s_%09x", tableName, count), new String(v.get()));
         } catch (AssertionError e) {
           e1 = e;
         }
@@ -232,13 +237,14 @@ public class AccumuloInputFormatTest {
     @Override
     public int run(String[] args) throws Exception {
       
-      if (args.length != 3) {
-        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table>");
+      if (args.length != 4) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table1> <table2>");
       }
       
       String user = args[0];
       String pass = args[1];
-      String table = args[2];
+      String table1 = args[2];
+      String table2 = args[3];
       
       JobConf job = new JobConf(getConf());
       job.setJarByClass(this.getClass());
@@ -246,9 +252,13 @@ public class AccumuloInputFormatTest {
       job.setInputFormat(AccumuloInputFormat.class);
       
       AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
-      AccumuloInputFormat.setInputTableName(job, table);
       AccumuloInputFormat.setMockInstance(job, INSTANCE_NAME);
       
+      TableQueryConfig tableConfig1 = new TableQueryConfig(table1);
+      TableQueryConfig tableConfig2 = new TableQueryConfig(table2);
+      
+      AccumuloInputFormat.setTableQueryConfigs(job, tableConfig1, tableConfig2);
+      
       job.setMapperClass(TestMapper.class);
       job.setMapOutputKeyClass(Key.class);
       job.setMapOutputValueClass(Value.class);
@@ -269,16 +279,70 @@ public class AccumuloInputFormatTest {
     MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
     Connector c = mockInstance.getConnector("root", new PasswordToken(""));
     c.tableOperations().create(TEST_TABLE_1);
+    c.tableOperations().create(TEST_TABLE_2);
     BatchWriter bw = c.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig());
+    BatchWriter bw2 = c.createBatchWriter(TEST_TABLE_2, new BatchWriterConfig());
     for (int i = 0; i < 100; i++) {
-      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
-      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
-      bw.addMutation(m);
+      Mutation t1m = new Mutation(new Text(String.format("%s_%09x", TEST_TABLE_1, i + 1)));
+      t1m.put(new Text(), new Text(), new Value(String.format("%s_%09x", TEST_TABLE_1, i).getBytes()));
+      bw.addMutation(t1m);
+      Mutation t2m = new Mutation(new Text(String.format("%s_%09x", TEST_TABLE_2, i + 1)));
+      t2m.put(new Text(), new Text(), new Value(String.format("%s_%09x", TEST_TABLE_2, i).getBytes()));
+      bw2.addMutation(t2m);
     }
     bw.close();
+    bw2.close();
     
-    MRTester.main(new String[] {"root", "", TEST_TABLE_1});
+    MRTester.main(new String[] {"root", "", TEST_TABLE_1, TEST_TABLE_2});
     assertNull(e1);
     assertNull(e2);
   }
+  
+  /**
+   * Verify {@link org.apache.accumulo.core.conf.TableQueryConfig} objects get correctly serialized in the JobContext.
+   */
+  @Test
+  public void testTableQueryConfigSerialization() throws IOException {
+    
+    JobConf job = new JobConf();
+    
+    TableQueryConfig table1 = new TableQueryConfig(TEST_TABLE_1).setRanges(Collections.singletonList(new Range("a", "b")))
+        .fetchColumns(Collections.singleton(new Pair<Text, Text>(new Text("CF1"), new Text("CQ1"))))
+        .setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1")));
+    
+    TableQueryConfig table2 = new TableQueryConfig(TEST_TABLE_2).setRanges(Collections.singletonList(new Range("a", "b")))
+        .fetchColumns(Collections.singleton(new Pair<Text, Text>(new Text("CF1"), new Text("CQ1"))))
+        .setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1")));
+    
+    AccumuloInputFormat.setTableQueryConfigs(job, table1, table2);
+    
+    assertEquals(table1, AccumuloInputFormat.getTableQueryConfig(job, TEST_TABLE_1));
+    assertEquals(table2, AccumuloInputFormat.getTableQueryConfig(job, TEST_TABLE_2));
+  }
+  
+  /**
+   * Verify that the union of the legacy single-table input and the new multi-table input is returned for backwards compatibility.
+   */
+  @Test
+  public void testTableQueryConfigSingleAndMultitableMethods() throws IOException {
+    
+    JobConf job = new JobConf();
+    
+    TableQueryConfig table1 = new TableQueryConfig(TEST_TABLE_1).setRanges(Collections.singletonList(new Range("a", "b")))
+        .fetchColumns(Collections.singleton(new Pair<Text, Text>(new Text("CF1"), new Text("CQ1"))))
+        .setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1")));
+    
+    TableQueryConfig table2 = new TableQueryConfig(TEST_TABLE_2).setRanges(Collections.singletonList(new Range("a", "b")))
+        .fetchColumns(Collections.singleton(new Pair<Text, Text>(new Text("CF1"), new Text("CQ1"))))
+        .setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1")));
+    
+    AccumuloInputFormat.setTableQueryConfigs(job, table1);
+    AccumuloInputFormat.setInputTableName(job, table2.getTableName());
+    AccumuloInputFormat.setRanges(job, table2.getRanges());
+    AccumuloInputFormat.fetchColumns(job, table2.getFetchedColumns());
+    AccumuloInputFormat.addIterator(job, table2.getIterators().get(0));
+    
+    assertEquals(table1, AccumuloInputFormat.getTableQueryConfig(job, TEST_TABLE_1));
+    assertEquals(table2, AccumuloInputFormat.getTableQueryConfig(job, TEST_TABLE_2));
+  }
 }
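
As a sketch (not part of this commit; the class name and output types are hypothetical), a mapper in a multi-table job can recover the source table from its input split, mirroring the TestMapper changes above:

import java.io.IOException;

import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PerTableMapper extends Mapper<Key,Value,Text,Value> {
  @Override
  protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
    // The split carries the source table's name in the multi-table API.
    String tableName = ((InputFormatBase.RangeInputSplit) context.getInputSplit()).getTableName();
    context.write(new Text(tableName), v);
  }
}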

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b96701f2/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
index c9539c4..6f92dec 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.accumulo.core.client.BatchWriter;
@@ -31,12 +32,15 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.TableQueryConfig;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.user.RegExFilter;
 import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -53,6 +57,7 @@ public class AccumuloInputFormatTest {
   private static final String PREFIX = AccumuloInputFormatTest.class.getSimpleName();
   private static final String INSTANCE_NAME = PREFIX + "_mapreduce_instance";
   private static final String TEST_TABLE_1 = PREFIX + "_mapreduce_table_1";
+  private static final String TEST_TABLE_2 = PREFIX + "_mapreduce_table_2";
   
   /**
    * Check that the iterator configuration is getting stored in the Job conf correctly.
@@ -196,6 +201,7 @@ public class AccumuloInputFormatTest {
   private static AssertionError e2 = null;
   
   private static class MRTester extends Configured implements Tool {
+    
     private static class TestMapper extends Mapper<Key,Value,Key,Value> {
       Key key = null;
       int count = 0;
@@ -203,10 +209,11 @@ public class AccumuloInputFormatTest {
       @Override
       protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
         try {
+          String tableName = ((InputFormatBase.RangeInputSplit) context.getInputSplit()).getTableName();
           if (key != null)
             assertEquals(key.getRow().toString(), new String(v.get()));
-          assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
-          assertEquals(new String(v.get()), String.format("%09x", count));
+          assertEquals(new Text(String.format("%s_%09x", tableName, count + 1)), k.getRow());
+          assertEquals(String.format("%s_%09x", tableName, count), new String(v.get()));
         } catch (AssertionError e) {
           e1 = e;
         }
@@ -227,13 +234,14 @@ public class AccumuloInputFormatTest {
     @Override
     public int run(String[] args) throws Exception {
       
-      if (args.length != 3) {
-        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table>");
+      if (args.length != 4) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table1> <table2>");
       }
       
       String user = args[0];
       String pass = args[1];
-      String table = args[2];
+      String table1 = args[2];
+      String table2 = args[3];
       
       Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
       job.setJarByClass(this.getClass());
@@ -241,7 +249,11 @@ public class AccumuloInputFormatTest {
       job.setInputFormatClass(AccumuloInputFormat.class);
       
       AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
-      AccumuloInputFormat.setInputTableName(job, table);
+      
+      TableQueryConfig tableConfig1 = new TableQueryConfig(table1);
+      TableQueryConfig tableConfig2 = new TableQueryConfig(table2);
+      
+      AccumuloInputFormat.setTableQueryConfigs(job, tableConfig1, tableConfig2);
       AccumuloInputFormat.setMockInstance(job, INSTANCE_NAME);
       
       job.setMapperClass(TestMapper.class);
@@ -261,21 +273,78 @@ public class AccumuloInputFormatTest {
     }
   }
   
+  /**
+   * Generate incrementing counts and attach the table name to each key/value pair so that ordering and multi-table input can be verified.
+   */
   @Test
   public void testMap() throws Exception {
     MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
     Connector c = mockInstance.getConnector("root", new PasswordToken(""));
     c.tableOperations().create(TEST_TABLE_1);
+    c.tableOperations().create(TEST_TABLE_2);
     BatchWriter bw = c.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig());
+    BatchWriter bw2 = c.createBatchWriter(TEST_TABLE_2, new BatchWriterConfig());
     for (int i = 0; i < 100; i++) {
-      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
-      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
-      bw.addMutation(m);
+      Mutation t1m = new Mutation(new Text(String.format("%s_%09x", TEST_TABLE_1, i + 1)));
+      t1m.put(new Text(), new Text(), new Value(String.format("%s_%09x", TEST_TABLE_1, i).getBytes()));
+      bw.addMutation(t1m);
+      Mutation t2m = new Mutation(new Text(String.format("%s_%09x", TEST_TABLE_2, i + 1)));
+      t2m.put(new Text(), new Text(), new Value(String.format("%s_%09x", TEST_TABLE_2, i).getBytes()));
+      bw2.addMutation(t2m);
     }
     bw.close();
+    bw2.close();
     
-    MRTester.main(new String[] {"root", "", TEST_TABLE_1});
+    MRTester.main(new String[] {"root", "", TEST_TABLE_1, TEST_TABLE_2});
     assertNull(e1);
     assertNull(e2);
   }
+  
+  /**
+   * Verify that {@link TableQueryConfig} objects are correctly serialized into the JobContext.
+   */
+  @Test
+  public void testTableQueryConfigSerialization() throws IOException {
+    
+    Job job = new Job();
+    
+    TableQueryConfig table1 = new TableQueryConfig(TEST_TABLE_1).setRanges(Collections.singletonList(new Range("a", "b")))
+        .fetchColumns(Collections.singleton(new Pair<Text, Text>(new Text("CF1"), new Text("CQ1"))))
+        .setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1")));
+    
+    TableQueryConfig table2 = new TableQueryConfig(TEST_TABLE_2).setRanges(Collections.singletonList(new Range("a", "b")))
+        .fetchColumns(Collections.singleton(new Pair<Text, Text>(new Text("CF1"), new Text("CQ1"))))
+        .setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1")));
+    
+    AccumuloInputFormat.setTableQueryConfigs(job, table1, table2);
+    
+    assertEquals(table1, AccumuloInputFormat.getTableQueryConfig(job, TEST_TABLE_1));
+    assertEquals(table2, AccumuloInputFormat.getTableQueryConfig(job, TEST_TABLE_2));
+  }
+  
+  /**
+   * Verify that the union of the legacy single-table input and the new multi-table input is returned, for backwards compatibility.
+   */
+  @Test
+  public void testTableQueryConfigSingleAndMultitableMethods() throws IOException {
+    
+    Job job = new Job();
+    
+    TableQueryConfig table1 = new TableQueryConfig(TEST_TABLE_1).setRanges(Collections.singletonList(new Range("a", "b")))
+        .fetchColumns(Collections.singleton(new Pair<Text, Text>(new Text("CF1"), new Text("CQ1"))))
+        .setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1")));
+    
+    TableQueryConfig table2 = new TableQueryConfig(TEST_TABLE_2).setRanges(Collections.singletonList(new Range("a", "b")))
+        .fetchColumns(Collections.singleton(new Pair<Text, Text>(new Text("CF1"), new Text("CQ1"))))
+        .setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1")));
+    
+    AccumuloInputFormat.setTableQueryConfigs(job, table1);
+    AccumuloInputFormat.setInputTableName(job, table2.getTableName());
+    AccumuloInputFormat.setRanges(job, table2.getRanges());
+    AccumuloInputFormat.fetchColumns(job, table2.getFetchedColumns());
+    AccumuloInputFormat.addIterator(job, table2.getIterators().get(0));
+    
+    assertEquals(table1, AccumuloInputFormat.getTableQueryConfig(job, TEST_TABLE_1));
+    assertEquals(table2, AccumuloInputFormat.getTableQueryConfig(job, TEST_TABLE_2));
+  }
 }
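
For reference, a condensed sketch of the multi-table job setup the test above exercises, using only methods that appear in this patch (the instance name and ZooKeeper quorum below are illustrative placeholders, not values from the patch):

    import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
    import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    import org.apache.accumulo.core.conf.TableQueryConfig;
    import org.apache.hadoop.mapreduce.Job;

    public class MultiTableJobSetup {
      public static Job configure(String user, String pass, String table1, String table2) throws Exception {
        Job job = new Job();
        job.setInputFormatClass(AccumuloInputFormat.class);
        AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
        AccumuloInputFormat.setZooKeeperInstance(job, "myinstance", "zkhost:2181"); // placeholders
        // one TableQueryConfig per input table; ranges, columns, and iterators are set per table
        AccumuloInputFormat.setTableQueryConfigs(job, new TableQueryConfig(table1), new TableQueryConfig(table2));
        return job;
      }
    }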

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b96701f2/core/src/test/java/org/apache/accumulo/core/conf/TableQueryConfigTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/conf/TableQueryConfigTest.java b/core/src/test/java/org/apache/accumulo/core/conf/TableQueryConfigTest.java
new file mode 100644
index 0000000..ac3340f
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/conf/TableQueryConfigTest.java
@@ -0,0 +1,91 @@
+package org.apache.accumulo.core.conf;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TableQueryConfigTest {
+  
+  private static final String TEST_TABLE = "TEST_TABLE";
+  private TableQueryConfig tableQueryConfig;
+  
+  @Before
+  public void setUp() {
+    tableQueryConfig = new TableQueryConfig(TEST_TABLE);
+  }
+  
+  @Test
+  public void testSerialization_OnlyTable() throws IOException {
+    byte[] serialized = serialize(tableQueryConfig);
+    TableQueryConfig actualConfig = deserialize(serialized);
+    
+    assertEquals(tableQueryConfig, actualConfig);
+  }
+  
+  @Test
+  public void testSerialization_ranges() throws IOException {
+    List<Range> ranges = new ArrayList<Range>();
+    ranges.add(new Range("a", "b"));
+    ranges.add(new Range("c", "d"));
+    tableQueryConfig.setRanges(ranges);
+    
+    byte[] serialized = serialize(tableQueryConfig);
+    TableQueryConfig actualConfig = deserialize(serialized);
+    
+    assertEquals(ranges, actualConfig.getRanges());
+  }
+  
+  @Test
+  public void testSerialization_columns() throws IOException {
+    Set<Pair<Text,Text>> columns = new HashSet<Pair<Text,Text>>();
+    columns.add(new Pair<Text,Text>(new Text("cf1"), new Text("cq1")));
+    columns.add(new Pair<Text,Text>(new Text("cf2"), null));
+    tableQueryConfig.fetchColumns(columns);
+    
+    byte[] serialized = serialize(tableQueryConfig);
+    TableQueryConfig actualConfig = deserialize(serialized);
+    
+    assertEquals(actualConfig.getFetchedColumns(), columns);
+  }
+  
+  @Test
+  public void testSerialization_iterators() throws IOException {
+    List<IteratorSetting> settings = new ArrayList<IteratorSetting>();
+    settings.add(new IteratorSetting(50, "iter", "iterclass"));
+    settings.add(new IteratorSetting(55, "iter2", "iterclass2"));
+    tableQueryConfig.setIterators(settings);
+    byte[] serialized = serialize(tableQueryConfig);
+    TableQueryConfig actualConfig = deserialize(serialized);
+    assertEquals(actualConfig.getIterators(), settings);
+    
+  }
+  
+  private byte[] serialize(TableQueryConfig tableQueryConfig) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    tableQueryConfig.write(new DataOutputStream(baos));
+    baos.close();
+    return baos.toByteArray();
+  }
+  
+  private TableQueryConfig deserialize(byte[] bytes) throws IOException {
+    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+    TableQueryConfig actualConfig = new TableQueryConfig(new DataInputStream(bais));
+    bais.close();
+    return actualConfig;
+  }
+}
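
The Writable round trip tested above is what lets InputConfigurator pack these configs into the job configuration. A minimal sketch of building a fully-populated config with the chained setters used in the tests (the table, range, column, and iterator values are illustrative; RegExFilter is the iterator class already used elsewhere in this patch's tests):

    import java.util.Collections;

    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.accumulo.core.conf.TableQueryConfig;
    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.core.util.Pair;
    import org.apache.hadoop.io.Text;

    public class TableQueryConfigExample {
      public static TableQueryConfig build() {
        // each setter returns the config, so per-table scan options chain fluently
        return new TableQueryConfig("mytable")
            .setRanges(Collections.singletonList(new Range("a", "z")))
            .fetchColumns(Collections.singleton(new Pair<Text,Text>(new Text("cf"), new Text("cq"))))
            .setIterators(Collections.singletonList(
                new IteratorSetting(60, "regex", "org.apache.accumulo.core.iterators.user.RegExFilter")));
      }
    }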


[2/2] git commit: Squashed commit of the following:

Posted by cj...@apache.org.
Squashed commit of the following:

commit 3227a822379718d6c1297f11d7af37a716f78a60
Author: Corey J. Nolet <cj...@gmail.com>
Date:   Tue Oct 1 23:20:34 2013 -0400

    Adding the following:
    - Deprecation to InputConfigurator, mapred.InputFormatBase, mapreduce.InputFormatBase
    - Comments to TableQueryConfig
    - Multi-table support to mapred.InputFormatBase

    ACCUMULO-391

commit 6648e8a1c97939f740b24f9368ecda9f7072cbd2
Author: Corey J. Nolet <cj...@gmail.com>
Date:   Tue Oct 1 21:45:37 2013 -0400

    Fixing some more formatting. Adding license headers. ACCUMULO-391

commit 53bcc85689510fc988c9e9f6aff0da0cb7091c6c
Author: Corey J. Nolet <cj...@gmail.com>
Date:   Mon Sep 30 21:01:55 2013 -0400

    Cleaning up tests. Adding test for legacy input for base + new multi-table methods. ACCUMULO-391

commit e4e05c804ea7f486290181f0246cf6b2880f5d1a
Author: Corey J. Nolet <cj...@gmail.com>
Date:   Sun Sep 29 21:05:55 2013 -0400

    Fixing some formatting. Adding some comments. ACCUMULO-391

commit 10b4eb8206ab4395ef2d4df375b52a7ffe77d655
Author: Corey J. Nolet <cj...@gmail.com>
Date:   Sun Sep 29 20:37:07 2013 -0400

    ACCUMULO-1732 Using table id in RangeInputSplit so that it can be resolved back to "working" table name in mappers. Scanner uses the "working" table name while everything else can still safely use the original configured table name.

commit 7b8585f0333c09674f7612b4dc24887f684413fe
Author: Corey J. Nolet <cj...@gmail.com>
Date:   Sat Sep 28 23:23:48 2013 -0400

    Removing deprecation for now until we have some discussions. Updating/adding comments. ACCUMULO-391

commit 273ee49530de28c2c5dfe39c80ab0c90c3c3a95f
Author: Corey J. Nolet <cj...@gmail.com>
Date:   Sat Sep 28 23:01:04 2013 -0400

    The legacy mapred InputFormatBase now verifies (and fixes the scanner for) a possible change in table name that could happen between the configuration of the map/reduce job and the actual processing of the scanner for a specific split. In that case, the most recent table name associated with the id is always used for the scanner (though the table name that was expected during job setup is still used in the RangeInputSplit). ACCUMULO-391

commit e6a7c962f707487d832ba4b16c1f9066d13ff8f1
Author: Corey J. Nolet <cj...@gmail.com>
Date:   Sat Sep 28 22:53:42 2013 -0400

    The original single-table setters/getters now populate a "default" TableQueryConfig object under the hood. This should make the switch over much easier. Deprecated single table methods in light of the API changes for the new configuration object. ACCUMULO-391

commit fdf4cadb16c29fc03a610cf83399ee26d7f83bc9
Author: Corey J. Nolet <cj...@gmail.com>
Date:   Sat Sep 28 21:58:40 2013 -0400

    Adding new TableQueryConfig object for setting multiple table info in the InputFormatBase
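
As the commit messages above note, the legacy single-table setters now populate a "default" TableQueryConfig under the hood, so the two call styles below should configure an equivalent scan (a sketch; the table name and range values are illustrative):

    import java.util.Collections;

    import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
    import org.apache.accumulo.core.conf.TableQueryConfig;
    import org.apache.accumulo.core.data.Range;
    import org.apache.hadoop.mapreduce.Job;

    public class LegacyVsMultiTable {
      public static void configureLegacy(Job job) {
        // deprecated by this patch, but still feeds the "default" TableQueryConfig
        AccumuloInputFormat.setInputTableName(job, "t1");
        AccumuloInputFormat.setRanges(job, Collections.singletonList(new Range("a", "b")));
      }

      public static void configureMultiTable(Job job) {
        // the equivalent explicit form
        AccumuloInputFormat.setTableQueryConfigs(job,
            new TableQueryConfig("t1").setRanges(Collections.singletonList(new Range("a", "b"))));
      }
    }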


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/b96701f2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/b96701f2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/b96701f2

Branch: refs/heads/master
Commit: b96701f220ecb3e891a71741179b867429fa1d39
Parents: 7da1164
Author: Corey J. Nolet <cj...@gmail.com>
Authored: Wed Oct 2 20:43:43 2013 -0400
Committer: Corey J. Nolet <cj...@gmail.com>
Committed: Wed Oct 2 20:43:43 2013 -0400

----------------------------------------------------------------------
 .../core/client/mapred/AccumuloInputFormat.java |   1 +
 .../core/client/mapred/InputFormatBase.java     | 440 ++++++++++-------
 .../client/mapreduce/AccumuloInputFormat.java   |   2 +-
 .../core/client/mapreduce/InputFormatBase.java  | 475 ++++++++++++-------
 .../mapreduce/lib/util/InputConfigurator.java   | 346 ++++++++++----
 .../accumulo/core/conf/TableQueryConfig.java    | 369 ++++++++++++++
 .../accumulo/core/util/ArgumentChecker.java     |   5 +
 .../client/mapred/AccumuloInputFormatTest.java  |  86 +++-
 .../mapreduce/AccumuloInputFormatTest.java      |  89 +++-
 .../core/conf/TableQueryConfigTest.java         |  91 ++++
 10 files changed, 1452 insertions(+), 452 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/b96701f2/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java
index bbbd0c3..b3b9fc7 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.mapred.Reporter;
  * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, String)}
  * <li>{@link AccumuloInputFormat#setScanAuthorizations(JobConf, Authorizations)}
  * <li>{@link AccumuloInputFormat#setZooKeeperInstance(JobConf, String, String)} OR {@link AccumuloInputFormat#setMockInstance(JobConf, String)}
+ * <li>{@link AccumuloInputFormat#setTableQueryConfigs(JobConf, org.apache.accumulo.core.conf.TableQueryConfig...)}</li>
  * </ul>
  * 
  * Other static methods are optional.
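
A minimal sketch of those required calls for the mapred API (credentials and the instance name below are placeholders; a ZooKeeper instance would be configured with setZooKeeperInstance instead of the mock):

    import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
    import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    import org.apache.accumulo.core.conf.TableQueryConfig;
    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.hadoop.mapred.JobConf;

    public class MapredSetup {
      public static JobConf configure() throws Exception {
        JobConf job = new JobConf();
        AccumuloInputFormat.setConnectorInfo(job, "user", new PasswordToken("pass"));
        AccumuloInputFormat.setScanAuthorizations(job, new Authorizations());
        AccumuloInputFormat.setMockInstance(job, "test-instance"); // or setZooKeeperInstance(...)
        AccumuloInputFormat.setTableQueryConfigs(job, new TableQueryConfig("t1"), new TableQueryConfig("t2"));
        return job;
      }
    }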

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b96701f2/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
index c796cd2..ae51581 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -47,6 +48,7 @@ import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
+import org.apache.accumulo.core.conf.TableQueryConfig;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.PartialKey;
@@ -68,6 +70,8 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * This abstract {@link InputFormat} class allows MapReduce jobs to use Accumulo as the source of K,V pairs.
  * <p>
@@ -79,10 +83,10 @@ import org.apache.log4j.Logger;
  * See {@link AccumuloInputFormat} for an example implementation.
  */
 public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
-
+  
   private static final Class<?> CLASS = AccumuloInputFormat.class;
   protected static final Logger log = Logger.getLogger(CLASS);
-
+  
   /**
    * Sets the connector information needed to communicate with Accumulo in this job.
    * 
@@ -102,7 +106,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setConnectorInfo(JobConf job, String principal, AuthenticationToken token) throws AccumuloSecurityException {
     InputConfigurator.setConnectorInfo(CLASS, job, principal, token);
   }
-
+  
   /**
    * Sets the connector information needed to communicate with Accumulo in this job.
    * 
@@ -121,7 +125,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setConnectorInfo(JobConf job, String principal, String tokenFile) throws AccumuloSecurityException {
     InputConfigurator.setConnectorInfo(CLASS, job, principal, tokenFile);
   }
-
+  
   /**
    * Determines if the connector has been configured.
    * 
@@ -134,7 +138,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static Boolean isConnectorInfoSet(JobConf job) {
     return InputConfigurator.isConnectorInfoSet(CLASS, job);
   }
-
+  
   /**
    * Gets the user name from the configuration.
    * 
@@ -147,7 +151,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static String getPrincipal(JobConf job) {
     return InputConfigurator.getPrincipal(CLASS, job);
   }
-
+  
   /**
    * Gets the serialized token class from either the configuration or the token file.
    * 
@@ -158,7 +162,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static String getTokenClass(JobConf job) {
     return getAuthenticationToken(job).getClass().getName();
   }
-
+  
   /**
    * Gets the serialized token from either the configuration or the token file.
    * 
@@ -169,7 +173,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static byte[] getToken(JobConf job) {
     return AuthenticationTokenSerializer.serialize(getAuthenticationToken(job));
   }
-
+  
   /**
    * Gets the authenticated token from either the specified token file or directly from the configuration, whichever was used when the job was configured.
    * 
@@ -183,7 +187,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static AuthenticationToken getAuthenticationToken(JobConf job) {
     return InputConfigurator.getAuthenticationToken(CLASS, job);
   }
-
+  
   /**
    * Configures a {@link ZooKeeperInstance} for this job.
    * 
@@ -198,7 +202,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setZooKeeperInstance(JobConf job, String instanceName, String zooKeepers) {
     InputConfigurator.setZooKeeperInstance(CLASS, job, instanceName, zooKeepers);
   }
-
+  
   /**
    * Configures a {@link MockInstance} for this job.
    * 
@@ -211,7 +215,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setMockInstance(JobConf job, String instanceName) {
     InputConfigurator.setMockInstance(CLASS, job, instanceName);
   }
-
+  
   /**
    * Initializes an Accumulo {@link Instance} based on the configuration.
    * 
@@ -225,7 +229,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static Instance getInstance(JobConf job) {
     return InputConfigurator.getInstance(CLASS, job);
   }
-
+  
   /**
    * Sets the log level for this job.
    * 
@@ -238,7 +242,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setLogLevel(JobConf job, Level level) {
     InputConfigurator.setLogLevel(CLASS, job, level);
   }
-
+  
   /**
    * Gets the log level from this configuration.
    * 
@@ -251,7 +255,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static Level getLogLevel(JobConf job) {
     return InputConfigurator.getLogLevel(CLASS, job);
   }
-
+  
   /**
    * Sets the name of the input table, over which this job will scan.
    * 
@@ -260,11 +264,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @param tableName
    *          the table to use when the tablename is null in the write call
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setInputTableName(JobConf job, String tableName) {
     InputConfigurator.setInputTableName(CLASS, job, tableName);
   }
-
+  
   /**
    * Gets the table name from the configuration.
    * 
@@ -273,11 +279,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @return the table name
    * @since 1.5.0
    * @see #setInputTableName(JobConf, String)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static String getInputTableName(JobConf job) {
     return InputConfigurator.getInputTableName(CLASS, job);
   }
-
+  
   /**
    * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorization. Defaults to the empty set.
    * 
@@ -290,7 +298,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public static void setScanAuthorizations(JobConf job, Authorizations auths) {
     InputConfigurator.setScanAuthorizations(CLASS, job, auths);
   }
-
+  
   /**
    * Gets the authorizations to set for the scans from the configuration.
    * 
@@ -303,7 +311,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static Authorizations getScanAuthorizations(JobConf job) {
     return InputConfigurator.getScanAuthorizations(CLASS, job);
   }
-
+  
   /**
    * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
    * 
@@ -312,11 +320,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @param ranges
    *          the ranges that will be mapped over
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setRanges(JobConf job, Collection<Range> ranges) {
     InputConfigurator.setRanges(CLASS, job, ranges);
   }
-
+  
   /**
    * Gets the ranges to scan over from a job.
    * 
@@ -327,11 +337,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    *           if the ranges have been encoded improperly
    * @since 1.5.0
    * @see #setRanges(JobConf, Collection)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static List<Range> getRanges(JobConf job) throws IOException {
     return InputConfigurator.getRanges(CLASS, job);
   }
-
+  
   /**
    * Restricts the columns that will be mapped over for this job.
    * 
@@ -341,11 +353,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    *          a pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
    *          selected. An empty set is the default and is equivalent to scanning the all columns.
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void fetchColumns(JobConf job, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
     InputConfigurator.fetchColumns(CLASS, job, columnFamilyColumnQualifierPairs);
   }
-
+  
   /**
    * Gets the columns to be mapped over from this job.
    * 
@@ -354,11 +368,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @return a set of columns
    * @since 1.5.0
    * @see #fetchColumns(JobConf, Collection)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static Set<Pair<Text,Text>> getFetchedColumns(JobConf job) {
     return InputConfigurator.getFetchedColumns(CLASS, job);
   }
-
+  
   /**
    * Encode an iterator on the input for this job.
    * 
@@ -367,11 +383,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @param cfg
    *          the configuration of the iterator
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void addIterator(JobConf job, IteratorSetting cfg) {
     InputConfigurator.addIterator(CLASS, job, cfg);
   }
-
+  
   /**
    * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
    * 
@@ -380,11 +398,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @return a list of iterators
    * @since 1.5.0
    * @see #addIterator(JobConf, IteratorSetting)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static List<IteratorSetting> getIterators(JobConf job) {
     return InputConfigurator.getIterators(CLASS, job);
   }
-
+  
   /**
    * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries.
    * Disabling this feature will cause exactly one Map task to be created for each specified range. The default setting is enabled. *
@@ -398,11 +418,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    *          the feature is enabled if true, disabled otherwise
    * @see #setRanges(JobConf, Collection)
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setAutoAdjustRanges(JobConf job, boolean enableFeature) {
     InputConfigurator.setAutoAdjustRanges(CLASS, job, enableFeature);
   }
-
+  
   /**
    * Determines whether a configuration has auto-adjust ranges enabled.
    * 
@@ -411,11 +433,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @return false if the feature is disabled, true otherwise
    * @since 1.5.0
    * @see #setAutoAdjustRanges(JobConf, boolean)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static boolean getAutoAdjustRanges(JobConf job) {
     return InputConfigurator.getAutoAdjustRanges(CLASS, job);
   }
-
+  
   /**
    * Controls the use of the {@link IsolatedScanner} in this job.
    * 
@@ -427,11 +451,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @param enableFeature
    *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setScanIsolation(JobConf job, boolean enableFeature) {
     InputConfigurator.setScanIsolation(CLASS, job, enableFeature);
   }
-
+  
   /**
    * Determines whether a configuration has isolation enabled.
    * 
@@ -440,11 +466,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @return true if the feature is enabled, false otherwise
    * @since 1.5.0
    * @see #setScanIsolation(JobConf, boolean)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static boolean isIsolated(JobConf job) {
     return InputConfigurator.isIsolated(CLASS, job);
   }
-
+  
   /**
    * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map
    * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task.
@@ -457,11 +485,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @param enableFeature
    *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setLocalIterators(JobConf job, boolean enableFeature) {
     InputConfigurator.setLocalIterators(CLASS, job, enableFeature);
   }
-
+  
   /**
    * Determines whether a configuration uses local iterators.
    * 
@@ -470,11 +500,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @return true if the feature is enabled, false otherwise
    * @since 1.5.0
    * @see #setLocalIterators(JobConf, boolean)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static boolean usesLocalIterators(JobConf job) {
     return InputConfigurator.usesLocalIterators(CLASS, job);
   }
-
+  
   /**
    * <p>
    * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the
@@ -505,11 +537,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @param enableFeature
    *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setOfflineTableScan(JobConf job, boolean enableFeature) {
     InputConfigurator.setOfflineTableScan(CLASS, job, enableFeature);
   }
-
+  
   /**
    * Determines whether a configuration has the offline table scan feature enabled.
    * 
@@ -518,11 +552,61 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    * @return true if the feature is enabled, false otherwise
    * @since 1.5.0
    * @see #setOfflineTableScan(JobConf, boolean)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static boolean isOfflineScan(JobConf job) {
     return InputConfigurator.isOfflineScan(CLASS, job);
   }
-
+  
+  /**
+   * Sets the {@link TableQueryConfig} objects on the given Hadoop configuration.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param configs
+   *          the table query configs to be set on the configuration.
+   * @since 1.6.0
+   */
+  public static void setTableQueryConfigs(JobConf job, TableQueryConfig... configs) {
+    checkNotNull(configs);
+    InputConfigurator.setTableQueryConfigs(CLASS, job, configs);
+  }
+  
+  /**
+   * Fetches all {@link TableQueryConfig}s that have been set on the given Hadoop configuration.
+   * 
+   * <p>
+   * Note this also returns the {@link TableQueryConfig} representing the table configurations set through the single-table input methods
+   * ({@link #setInputTableName(JobConf, String)}, {@link #setRanges(JobConf, java.util.Collection)}, {@link #fetchColumns(JobConf, java.util.Collection)},
+   * {@link #addIterator(JobConf, IteratorSetting)}, etc.).
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @return the list of table query configs for the job
+   * @since 1.6.0
+   */
+  public static List<TableQueryConfig> getTableQueryConfigs(JobConf job) {
+    return InputConfigurator.getTableQueryConfigs(CLASS, job);
+  }
+  
+  /**
+   * Fetches a {@link TableQueryConfig} that has been set on the configuration for a specific table.
+   * 
+   * <p>
+   * Null is returned if no configuration has been set for the given table.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param tableName
+   *          the table name for which to grab the config object
+   * @return the {@link TableQueryConfig} for the given table
+   * @since 1.6.0
+   */
+  public static TableQueryConfig getTableQueryConfig(JobConf job, String tableName) {
+    return InputConfigurator.getTableQueryConfig(CLASS, job, tableName);
+  }
+  
   /**
    * Initializes an Accumulo {@link TabletLocator} based on the configuration.
    * 
@@ -533,13 +617,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
    *           if the table name set on the configuration doesn't exist
    * @since 1.5.0
    */
-  protected static TabletLocator getTabletLocator(JobConf job) throws TableNotFoundException {
-    return InputConfigurator.getTabletLocator(CLASS, job);
+  protected static TabletLocator getTabletLocator(JobConf job, String tableName) throws TableNotFoundException {
+    return InputConfigurator.getTabletLocator(CLASS, job, tableName);
   }
-
+  
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
   /**
-   * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}.
+   * Check whether a configuration is fully configured to be used with an Accumulo {@link InputFormat}.
    * 
    * @param job
    *          the Hadoop context for the configured job
@@ -550,7 +634,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   protected static void validateOptions(JobConf job) throws IOException {
     InputConfigurator.validateOptions(CLASS, job);
   }
-
+  
   /**
    * An abstract base class to be used to create {@link RecordReader} instances that convert from Accumulo {@link Key}/{@link Value} pairs to the user's K/V
    * types.
@@ -565,7 +649,7 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
     protected long numKeysRead;
     protected Iterator<Entry<Key,Value>> scannerIterator;
     protected RangeInputSplit split;
-
+    
     /**
      * Apply the configured iterators from the configuration to the scanner.
      * 
@@ -574,13 +658,13 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
      * @param scanner
      *          the scanner to configure
      */
-    protected void setupIterators(JobConf job, Scanner scanner) {
-      List<IteratorSetting> iterators = getIterators(job);
-      for (IteratorSetting iterator : iterators) {
+    protected void setupIterators(JobConf job, Scanner scanner, String tableName) {
+      TableQueryConfig config = getTableQueryConfig(job, tableName);
+      List<IteratorSetting> iterators = config.getIterators();
+      for (IteratorSetting iterator : iterators)
         scanner.addScanIterator(iterator);
-      }
     }
-
+    
     /**
      * Initialize a scanner over the given input split using this task attempt configuration.
      */
@@ -592,32 +676,47 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
       String user = getPrincipal(job);
       AuthenticationToken token = getAuthenticationToken(job);
       Authorizations authorizations = getScanAuthorizations(job);
-
+      
+      TableQueryConfig tableConfig = getTableQueryConfig(job, split.getTableName());
+      
+      // in case the table name changed, we can still use the previous name in terms of configuration,
+      // but for the scanner, we'll need to reference the new table name.
+      String actualNameForId = split.getTableName();
+      if (!(instance instanceof MockInstance)) {
+        try {
+          actualNameForId = Tables.getTableName(instance, split.getTableId());
+          if (!actualNameForId.equals(split.getTableName()))
+            log.debug("Table name changed from " + split.getTableName() + " to " + actualNameForId);
+        } catch (TableNotFoundException e) {
+          throw new IOException("The specified table was not found for id=" + split.getTableId());
+        }
+      }
+      
       try {
         log.debug("Creating connector with user: " + user);
         Connector conn = instance.getConnector(user, token);
         log.debug("Creating scanner for table: " + getInputTableName(job));
         log.debug("Authorizations are: " + authorizations);
-        if (isOfflineScan(job)) {
-          scanner = new OfflineScanner(instance, new Credentials(user, token), Tables.getTableId(instance, getInputTableName(job)), authorizations);
+        if (tableConfig.isOfflineScan()) {
+          scanner = new OfflineScanner(instance, new Credentials(user, token), split.getTableId(), authorizations);
         } else {
-          scanner = conn.createScanner(getInputTableName(job), authorizations);
+          scanner = conn.createScanner(actualNameForId, authorizations);
         }
-        if (isIsolated(job)) {
+        if (tableConfig.shouldUseIsolatedScanners()) {
           log.info("Creating isolated scanner");
           scanner = new IsolatedScanner(scanner);
         }
-        if (usesLocalIterators(job)) {
+        if (tableConfig.shouldUseLocalIterators()) {
           log.info("Using local iterators");
           scanner = new ClientSideIteratorScanner(scanner);
         }
-        setupIterators(job, scanner);
+        setupIterators(job, scanner, split.getTableName());
       } catch (Exception e) {
         throw new IOException(e);
       }
-
+      
       // setup a scanner within the bounds of this split
-      for (Pair<Text,Text> c : getFetchedColumns(job)) {
+      for (Pair<Text,Text> c : tableConfig.getFetchedColumns()) {
         if (c.getSecond() != null) {
           log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond());
           scanner.fetchColumn(c.getFirst(), c.getSecond());
@@ -626,58 +725,58 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
           scanner.fetchColumnFamily(c.getFirst());
         }
       }
-
+      
       scanner.setRange(split.getRange());
-
+      
       numKeysRead = 0;
-
+      
       // do this last after setting all scanner options
       scannerIterator = scanner.iterator();
     }
-
+    
     @Override
     public void close() {}
-
+    
     @Override
     public long getPos() throws IOException {
       return numKeysRead;
     }
-
+    
     @Override
     public float getProgress() throws IOException {
       if (numKeysRead > 0 && currentKey == null)
         return 1.0f;
       return split.getProgress(currentKey);
     }
-
+    
     protected Key currentKey = null;
-
+    
   }
-
+  
   Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobConf job, String tableName, List<Range> ranges) throws TableNotFoundException, AccumuloException,
       AccumuloSecurityException {
-
+    
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-
+    
     Instance instance = getInstance(job);
     Connector conn = instance.getConnector(getPrincipal(job), getAuthenticationToken(job));
     String tableId = Tables.getTableId(instance, tableName);
-
+    
     if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
       Tables.clearCache(instance);
       if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
         throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode ");
       }
     }
-
+    
     for (Range range : ranges) {
       Text startRow;
-
+      
       if (range.getStartKey() != null)
         startRow = range.getStartKey().getRow();
       else
         startRow = new Text();
-
+      
       Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
       Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
       TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
@@ -685,73 +784,73 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
       scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
       scanner.fetchColumnFamily(TabletsSection.FutureLocationColumnFamily.NAME);
       scanner.setRange(metadataRange);
-
+      
       RowIterator rowIter = new RowIterator(scanner);
-
+      
       KeyExtent lastExtent = null;
-
+      
       while (rowIter.hasNext()) {
         Iterator<Entry<Key,Value>> row = rowIter.next();
         String last = "";
         KeyExtent extent = null;
         String location = null;
-
+        
         while (row.hasNext()) {
           Entry<Key,Value> entry = row.next();
           Key key = entry.getKey();
-
+          
           if (key.getColumnFamily().equals(TabletsSection.LastLocationColumnFamily.NAME)) {
             last = entry.getValue().toString();
           }
-
+          
           if (key.getColumnFamily().equals(TabletsSection.CurrentLocationColumnFamily.NAME)
               || key.getColumnFamily().equals(TabletsSection.FutureLocationColumnFamily.NAME)) {
             location = entry.getValue().toString();
           }
-
+          
           if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
             extent = new KeyExtent(key.getRow(), entry.getValue());
           }
-
+          
         }
-
+        
         if (location != null)
           return null;
-
+        
         if (!extent.getTableId().toString().equals(tableId)) {
           throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
         }
-
+        
         if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
           throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
         }
-
+        
         Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
         if (tabletRanges == null) {
           tabletRanges = new HashMap<KeyExtent,List<Range>>();
           binnedRanges.put(last, tabletRanges);
         }
-
+        
         List<Range> rangeList = tabletRanges.get(extent);
         if (rangeList == null) {
           rangeList = new ArrayList<Range>();
           tabletRanges.put(extent, rangeList);
         }
-
+        
         rangeList.add(range);
-
+        
         if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
           break;
         }
-
+        
         lastExtent = extent;
       }
-
+      
     }
-
+    
     return binnedRanges;
   }
-
+  
   /**
    * Read the metadata table to get tablets and match up ranges to them.
    */
@@ -759,110 +858,111 @@ public abstract class InputFormatBase<K,V> implements InputFormat<K,V> {
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     log.setLevel(getLogLevel(job));
     validateOptions(job);
-
-    String tableName = getInputTableName(job);
-    boolean autoAdjust = getAutoAdjustRanges(job);
-    List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(job)) : getRanges(job);
-
-    if (ranges.isEmpty()) {
-      ranges = new ArrayList<Range>(1);
-      ranges.add(new Range());
-    }
-
-    // get the metadata information for these ranges
-    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-    TabletLocator tl;
-    try {
-      if (isOfflineScan(job)) {
-        binnedRanges = binOfflineTable(job, tableName, ranges);
-        while (binnedRanges == null) {
-          // Some tablets were still online, try again
-          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
-          binnedRanges = binOfflineTable(job, tableName, ranges);
-        }
-      } else {
-        Instance instance = getInstance(job);
-        String tableId = null;
-        tl = getTabletLocator(job);
-        // its possible that the cache could contain complete, but old information about a tables tablets... so clear it
-        tl.invalidateCache();
-        while (!tl.binRanges(new Credentials(getPrincipal(job), getAuthenticationToken(job)), ranges, binnedRanges).isEmpty()) {
-          if (!(instance instanceof MockInstance)) {
-            if (tableId == null)
-              tableId = Tables.getTableId(instance, tableName);
-            if (!Tables.exists(instance, tableId))
-              throw new TableDeletedException(tableId);
-            if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
-              throw new TableOfflineException(instance, tableId);
+    
+    LinkedList<InputSplit> splits = new LinkedList<InputSplit>();
+    List<TableQueryConfig> tableConfigs = getTableQueryConfigs(job);
+    for (TableQueryConfig tableConfig : tableConfigs) {
+      
+      boolean autoAdjust = tableConfig.shouldAutoAdjustRanges();
+      String tableId = null;
+      List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges()) : tableConfig.getRanges();
+      if (ranges.isEmpty()) {
+        ranges = new ArrayList<Range>(1);
+        ranges.add(new Range());
+      }
+      
+      // get the metadata information for these ranges
+      Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
+      TabletLocator tl;
+      try {
+        if (tableConfig.isOfflineScan()) {
+          binnedRanges = binOfflineTable(job, tableConfig.getTableName(), ranges);
+          while (binnedRanges == null) {
+            // Some tablets were still online, try again
+            UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+            binnedRanges = binOfflineTable(job, tableConfig.getTableName(), ranges);
           }
-          binnedRanges.clear();
-          log.warn("Unable to locate bins for specified ranges. Retrying.");
-          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+        } else {
+          Instance instance = getInstance(job);
+          tl = getTabletLocator(job, tableConfig.getTableName());
+          // it's possible that the cache could contain complete but stale information about a table's tablets, so clear it
           tl.invalidateCache();
+          Credentials creds = new Credentials(getPrincipal(job), getAuthenticationToken(job));
+          
+          while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) {
+            if (!(instance instanceof MockInstance)) {
+              // resolve the id first; the existence and state checks below would otherwise see a null id on the first pass
+              if (tableId == null)
+                tableId = Tables.getTableId(instance, tableConfig.getTableName());
+              if (!Tables.exists(instance, tableId))
+                throw new TableDeletedException(tableId);
+              if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+                throw new TableOfflineException(instance, tableId);
+            }
+            binnedRanges.clear();
+            log.warn("Unable to locate bins for specified ranges. Retrying.");
+            UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+            tl.invalidateCache();
+          }
         }
+      } catch (Exception e) {
+        throw new IOException(e);
       }
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-
-    ArrayList<InputSplit> splits = new ArrayList<InputSplit>(ranges.size());
-    HashMap<Range,ArrayList<String>> splitsToAdd = null;
-
-    if (!autoAdjust)
-      splitsToAdd = new HashMap<Range,ArrayList<String>>();
-
-    HashMap<String,String> hostNameCache = new HashMap<String,String>();
-
-    for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
-      String ip = tserverBin.getKey().split(":", 2)[0];
-      String location = hostNameCache.get(ip);
-      if (location == null) {
-        InetAddress inetAddress = InetAddress.getByName(ip);
-        location = inetAddress.getHostName();
-        hostNameCache.put(ip, location);
-      }
-
-      for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
-        Range ke = extentRanges.getKey().toDataRange();
-        for (Range r : extentRanges.getValue()) {
-          if (autoAdjust) {
-            // divide ranges into smaller ranges, based on the tablets
-            splits.add(new RangeInputSplit(tableName, ke.clip(r), new String[] {location}));
-          } else {
-            // don't divide ranges
-            ArrayList<String> locations = splitsToAdd.get(r);
-            if (locations == null)
-              locations = new ArrayList<String>(1);
-            locations.add(location);
-            splitsToAdd.put(r, locations);
+      
+      HashMap<Range,ArrayList<String>> splitsToAdd = null;
+      
+      if (!autoAdjust)
+        splitsToAdd = new HashMap<Range,ArrayList<String>>();
+      
+      HashMap<String,String> hostNameCache = new HashMap<String,String>();
+      for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
+        String ip = tserverBin.getKey().split(":", 2)[0];
+        String location = hostNameCache.get(ip);
+        if (location == null) {
+          InetAddress inetAddress = InetAddress.getByName(ip);
+          location = inetAddress.getHostName();
+          hostNameCache.put(ip, location);
+        }
+        for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
+          Range ke = extentRanges.getKey().toDataRange();
+          for (Range r : extentRanges.getValue()) {
+            if (autoAdjust) {
+              // divide ranges into smaller ranges, based on the tablets
+              splits.add(new RangeInputSplit(tableConfig.getTableName(), tableId, ke.clip(r), new String[] {location}));
+            } else {
+              // don't divide ranges
+              ArrayList<String> locations = splitsToAdd.get(r);
+              if (locations == null)
+                locations = new ArrayList<String>(1);
+              locations.add(location);
+              splitsToAdd.put(r, locations);
+            }
           }
         }
       }
+      
+      if (!autoAdjust)
+        for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
+          splits.add(new RangeInputSplit(tableConfig.getTableName(), tableId, entry.getKey(), entry.getValue().toArray(new String[0])));
     }
-
-    if (!autoAdjust)
-      for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
-        splits.add(new RangeInputSplit(tableName, entry.getKey(), entry.getValue().toArray(new String[0])));
+    
     return splits.toArray(new InputSplit[splits.size()]);
   }
-
+  
   /**
    * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs.
    */
   public static class RangeInputSplit extends org.apache.accumulo.core.client.mapreduce.InputFormatBase.RangeInputSplit implements InputSplit {
-
+    
     public RangeInputSplit() {
       super();
     }
-
+    
     public RangeInputSplit(RangeInputSplit split) throws IOException {
       super(split);
     }
-
-    protected RangeInputSplit(String table, Range range, String[] locations) {
-      super(table, range, locations);
+    
+    protected RangeInputSplit(String table, String tableId, Range range, String[] locations) {
+      super(table, tableId, range, locations);
     }
-
   }
-
+  
 }
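
Condensed from the record reader above: because each split now carries a table id, the scanner can be pointed at the table's current name even if the table was renamed between job setup and task execution. A sketch of that lookup factored into a helper (the class and method names here are illustrative; the Tables calls are the ones used in the patch):

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.Instance;
    import org.apache.accumulo.core.client.Scanner;
    import org.apache.accumulo.core.client.TableNotFoundException;
    import org.apache.accumulo.core.client.impl.Tables;
    import org.apache.accumulo.core.client.mock.MockInstance;
    import org.apache.accumulo.core.security.Authorizations;

    public class RenameSafeScanner {
      public static Scanner open(Instance instance, Connector conn, String tableNameAtSetup, String tableId,
          Authorizations auths) throws TableNotFoundException {
        String actualNameForId = tableNameAtSetup;
        if (!(instance instanceof MockInstance)) {
          // resolve the split's table id back to whatever the table is named now
          actualNameForId = Tables.getTableName(instance, tableId);
        }
        return conn.createScanner(actualNameForId, auths);
      }
    }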

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b96701f2/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
index 1cbb606..44dfc33 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
@@ -39,9 +39,9 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
  * <ul>
  * <li>{@link AccumuloInputFormat#setConnectorInfo(Job, String, AuthenticationToken)}
  * <li>{@link AccumuloInputFormat#setConnectorInfo(Job, String, String)}
- * <li>{@link AccumuloInputFormat#setInputTableName(Job, String)}
  * <li>{@link AccumuloInputFormat#setScanAuthorizations(Job, Authorizations)}
  * <li>{@link AccumuloInputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloInputFormat#setMockInstance(Job, String)}
+ * <li>{@link AccumuloInputFormat#setTableQueryConfigs(Job, org.apache.accumulo.core.conf.TableQueryConfig...)} 
  * </ul>
  * 
  * Other static methods are optional.
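
On the read side, a mapper can tell which table a record came from by inspecting its split, as the test mapper earlier in this patch does (the output key scheme below is illustrative):

    import java.io.IOException;

    import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class TableAwareMapper extends Mapper<Key,Value,Text,Value> {
      @Override
      protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
        // each split carries the table name it was created for at job-setup time
        String table = ((InputFormatBase.RangeInputSplit) context.getInputSplit()).getTableName();
        context.write(new Text(table + ":" + k.getRow().toString()), v);
      }
    }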

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b96701f2/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
index 13f9708..acff7e2 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
@@ -16,6 +16,9 @@
  */
 package org.apache.accumulo.core.client.mapreduce;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -26,6 +29,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -50,7 +54,7 @@ import org.apache.accumulo.core.client.impl.TabletLocator;
 import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
+import org.apache.accumulo.core.conf.TableQueryConfig;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
@@ -157,6 +161,21 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   }
 
   /**
+   * Gets the table name from the configuration.
+   * 
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the table name
+   * @since 1.5.0
+   * @see #setInputTableName(Job, String)
+   * @deprecated since 1.6.0
+   */
+  @Deprecated
+  protected static String getInputTableName(JobContext context) {
+    return InputConfigurator.getInputTableName(CLASS, getConfiguration(context));
+  }
+
+  /**
    * Gets the serialized token class from either the configuration or the token file.
    * 
    * @since 1.5.0
@@ -268,25 +287,14 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @param tableName
    *          the table to use when the tablename is null in the write call
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setInputTableName(Job job, String tableName) {
     InputConfigurator.setInputTableName(CLASS, job.getConfiguration(), tableName);
   }
 
   /**
-   * Gets the table name from the configuration.
-   * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return the table name
-   * @since 1.5.0
-   * @see #setInputTableName(Job, String)
-   */
-  protected static String getInputTableName(JobContext context) {
-    return InputConfigurator.getInputTableName(CLASS, getConfiguration(context));
-  }
-
-  /**
    * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorization. Defaults to the empty set.
    * 
    * @param job
@@ -313,14 +321,16 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   }
 
   /**
-   * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
+   * Sets the input ranges to scan for all tables associated with this job.
    * 
    * @param job
    *          the Hadoop job instance to be configured
    * @param ranges
    *          the ranges that will be mapped over
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setRanges(Job job, Collection<Range> ranges) {
     InputConfigurator.setRanges(CLASS, job.getConfiguration(), ranges);
   }
@@ -331,17 +341,18 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @param context
    *          the Hadoop context for the configured job
    * @return the ranges
-   * @throws IOException
-   *           if the ranges have been encoded improperly
    * @since 1.5.0
    * @see #setRanges(Job, Collection)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static List<Range> getRanges(JobContext context) throws IOException {
     return InputConfigurator.getRanges(CLASS, getConfiguration(context));
   }
 
   /**
-   * Restricts the columns that will be mapped over for this job.
+   * Restricts the columns that will be mapped over for the default input table of this job.
    * 
    * @param job
    *          the Hadoop job instance to be configured
@@ -349,7 +360,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          a pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
    *          selected. An empty set is the default and is equivalent to scanning the all columns.
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void fetchColumns(Job job, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
     InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), columnFamilyColumnQualifierPairs);
   }
@@ -362,20 +375,24 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @return a set of columns
    * @since 1.5.0
    * @see #fetchColumns(Job, Collection)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static Set<Pair<Text,Text>> getFetchedColumns(JobContext context) {
     return InputConfigurator.getFetchedColumns(CLASS, getConfiguration(context));
   }
 
   /**
-   * Encode an iterator on the input for this job.
+   * Encode an iterator on the default input table for this job.
    * 
    * @param job
    *          the Hadoop job instance to be configured
    * @param cfg
    *          the configuration of the iterator
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void addIterator(Job job, IteratorSetting cfg) {
     InputConfigurator.addIterator(CLASS, job.getConfiguration(), cfg);
   }
@@ -388,7 +405,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @return a list of iterators
    * @since 1.5.0
    * @see #addIterator(Job, IteratorSetting)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static List<IteratorSetting> getIterators(JobContext context) {
     return InputConfigurator.getIterators(CLASS, getConfiguration(context));
   }
@@ -406,7 +425,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          the feature is enabled if true, disabled otherwise
    * @see #setRanges(Job, Collection)
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setAutoAdjustRanges(Job job, boolean enableFeature) {
     InputConfigurator.setAutoAdjustRanges(CLASS, job.getConfiguration(), enableFeature);
   }
@@ -419,12 +440,63 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @return false if the feature is disabled, true otherwise
    * @since 1.5.0
    * @see #setAutoAdjustRanges(Job, boolean)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static boolean getAutoAdjustRanges(JobContext context) {
     return InputConfigurator.getAutoAdjustRanges(CLASS, getConfiguration(context));
   }
 
   /**
+   * Sets the {@link TableQueryConfig} objects on the given Hadoop configuration.
+   * 
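+   * <p>
+   * A minimal usage sketch. The table names are hypothetical, and the {@code TableQueryConfig} constructor and setters shown ({@code setRanges},
+   * {@code setIterators}) are assumed to mirror the getters this patch reads:
+   * 
+   * <pre>
+   * {@code
+   * TableQueryConfig records = new TableQueryConfig("records");
+   * records.setRanges(Collections.singletonList(new Range("a", "z")));
+   * TableQueryConfig events = new TableQueryConfig("events");
+   * events.setIterators(Collections.singletonList(new IteratorSetting(50, "vers", VersioningIterator.class)));
+   * InputFormatBase.setTableQueryConfigs(job, records, events);
+   * }
+   * </pre>
+   * 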
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param configs
+   *          the table query configs to be set on the configuration.
+   * @since 1.6.0
+   */
+  public static void setTableQueryConfigs(Job job, TableQueryConfig... configs) {
+    checkNotNull(configs);
+    InputConfigurator.setTableQueryConfigs(CLASS, getConfiguration(job), configs);
+  }
+
+  /**
+   * Fetches all {@link TableQueryConfig}s that have been set on the given Hadoop configuration.
+   * 
+   * <p>
+   * Note that this also returns the {@link TableQueryConfig} representing the table configuration set through the deprecated single-table input methods, such as
+   * {@link #setInputTableName(org.apache.hadoop.mapreduce.Job, String)}, {@link #setRanges(org.apache.hadoop.mapreduce.Job, java.util.Collection)},
+   * {@link #fetchColumns(org.apache.hadoop.mapreduce.Job, java.util.Collection)},
+   * {@link #addIterator(org.apache.hadoop.mapreduce.Job, org.apache.accumulo.core.client.IteratorSetting)}, etc.
+   * 
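+   * <p>
+   * For example, a job could report which tables it is configured to scan (a sketch; the output depends on what was set on the job):
+   * 
+   * <pre>
+   * {@code
+   * for (TableQueryConfig config : InputFormatBase.getTableQueryConfigs(job))
+   *   System.out.println("scanning table " + config.getTableName());
+   * }
+   * </pre>
+   * 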
+   * @param job
+   *          the Hadoop context for the configured job
+   * @return all of the {@link TableQueryConfig} objects set on the job's configuration
+   * @since 1.6.0
+   */
+  public static List<TableQueryConfig> getTableQueryConfigs(JobContext job) {
+    return InputConfigurator.getTableQueryConfigs(CLASS, getConfiguration(job));
+  }
+
+  /**
+   * Fetches a {@link TableQueryConfig} that has been set on the configuration for a specific table.
+   * 
+   * <p>
+   * null is returned if no configuration has been set for the given table.
+   * 
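+   * A guarded lookup, sketched with a hypothetical table name:
+   * 
+   * <pre>
+   * {@code
+   * TableQueryConfig config = InputFormatBase.getTableQueryConfig(job, "records");
+   * if (config == null)
+   *   throw new IllegalStateException("no query config set for table 'records'");
+   * }
+   * </pre>
+   * 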
+   * @param job
+   *          the Hadoop context for the configured job
+   * @param tableName
+   *          the table name for which to grab the config object
+   * @return the {@link TableQueryConfig} for the given table
+   * @since 1.6.0
+   */
+  public static TableQueryConfig getTableQueryConfig(JobContext job, String tableName) {
+    return InputConfigurator.getTableQueryConfig(CLASS, getConfiguration(job), tableName);
+  }
+
+  /**
    * Controls the use of the {@link IsolatedScanner} in this job.
    * 
    * <p>
@@ -435,7 +507,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @param enableFeature
    *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setScanIsolation(Job job, boolean enableFeature) {
     InputConfigurator.setScanIsolation(CLASS, job.getConfiguration(), enableFeature);
   }
@@ -448,7 +522,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @return true if the feature is enabled, false otherwise
    * @since 1.5.0
    * @see #setScanIsolation(Job, boolean)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static boolean isIsolated(JobContext context) {
     return InputConfigurator.isIsolated(CLASS, getConfiguration(context));
   }
@@ -465,7 +541,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @param enableFeature
    *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setLocalIterators(Job job, boolean enableFeature) {
     InputConfigurator.setLocalIterators(CLASS, job.getConfiguration(), enableFeature);
   }
@@ -478,7 +556,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @return true if the feature is enabled, false otherwise
    * @since 1.5.0
    * @see #setLocalIterators(Job, boolean)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static boolean usesLocalIterators(JobContext context) {
     return InputConfigurator.usesLocalIterators(CLASS, getConfiguration(context));
   }
@@ -513,7 +593,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @param enableFeature
    *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   public static void setOfflineTableScan(Job job, boolean enableFeature) {
     InputConfigurator.setOfflineTableScan(CLASS, job.getConfiguration(), enableFeature);
   }
@@ -526,7 +608,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @return true if the feature is enabled, false otherwise
    * @since 1.5.0
    * @see #setOfflineTableScan(Job, boolean)
+   * @deprecated since 1.6.0
    */
+  @Deprecated
   protected static boolean isOfflineScan(JobContext context) {
     return InputConfigurator.isOfflineScan(CLASS, getConfiguration(context));
   }
@@ -536,13 +620,15 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * 
    * @param context
    *          the Hadoop context for the configured job
+   * @param table
+   *          the table for which to initialize the locator
    * @return an Accumulo tablet locator
    * @throws TableNotFoundException
    *           if the table name set on the configuration doesn't exist
    * @since 1.5.0
    */
-  protected static TabletLocator getTabletLocator(JobContext context) throws TableNotFoundException {
-    return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context));
+  protected static TabletLocator getTabletLocator(JobContext context, String table) throws TableNotFoundException {
+    return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context), table);
   }
 
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
@@ -575,60 +661,79 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     protected long numKeysRead;
     protected Iterator<Entry<Key,Value>> scannerIterator;
     protected RangeInputSplit split;
-
+  
     /**
-     * Apply the configured iterators from the configuration to the scanner.
+     * Apply the configured iterators from the configuration to the scanner. This applies both the default iterators and the per-table iterators.
      * 
      * @param context
      *          the Hadoop context for the configured job
      * @param scanner
      *          the scanner to configure
+     * @param tableName
+     *          the table name for which to set up the iterators
      */
-    protected void setupIterators(TaskAttemptContext context, Scanner scanner) {
-      List<IteratorSetting> iterators = getIterators(context);
-      for (IteratorSetting iterator : iterators) {
+    protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName) {
+      TableQueryConfig config = getTableQueryConfig(context, tableName);
+      List<IteratorSetting> iterators = config.getIterators();
+      for (IteratorSetting iterator : iterators)
         scanner.addScanIterator(iterator);
-      }
     }
-
+  
     /**
      * Initialize a scanner over the given input split using this task attempt configuration.
      */
     @Override
     public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException {
+    
       Scanner scanner;
       split = (RangeInputSplit) inSplit;
-      log.debug("Initializing input split: " + split.range);
+      log.debug("Initializing input split: " + split.getRange());
       Instance instance = getInstance(attempt);
       String principal = getPrincipal(attempt);
+    
+      TableQueryConfig tableConfig = getTableQueryConfig(attempt, split.getTableName());
+    
+      // the split's table name may be stale if the table was renamed: the old name still keys the
+      // configuration, but the scanner must be created against the table's current name.
+      String actualNameForId = split.getTableName();
+      if (!(instance instanceof MockInstance)) {
+        try {
+          actualNameForId = Tables.getTableName(instance, split.getTableId());
+          if (!actualNameForId.equals(split.getTableName()))
+            log.debug("Table name changed from " + split.getTableName() + " to " + actualNameForId);
+        } catch (TableNotFoundException e) {
+          throw new IOException("The specified table was not found for id=" + split.getTableId());
+        }
+      }
+    
       AuthenticationToken token = getAuthenticationToken(attempt);
       Authorizations authorizations = getScanAuthorizations(attempt);
-
       try {
         log.debug("Creating connector with user: " + principal);
+      
         Connector conn = instance.getConnector(principal, token);
-        log.debug("Creating scanner for table: " + getInputTableName(attempt));
+        log.debug("Creating scanner for table: " + split.getTableName());
         log.debug("Authorizations are: " + authorizations);
-        if (isOfflineScan(attempt)) {
-          scanner = new OfflineScanner(instance, new Credentials(principal, token), Tables.getTableId(instance, getInputTableName(attempt)), authorizations);
+        if (tableConfig.isOfflineScan()) {
+          scanner = new OfflineScanner(instance, new Credentials(principal, token), split.getTableId(), authorizations);
         } else {
-          scanner = conn.createScanner(getInputTableName(attempt), authorizations);
+          scanner = conn.createScanner(actualNameForId, authorizations);
         }
-        if (isIsolated(attempt)) {
+        if (tableConfig.shouldUseIsolatedScanners()) {
           log.info("Creating isolated scanner");
           scanner = new IsolatedScanner(scanner);
         }
-        if (usesLocalIterators(attempt)) {
+        if (tableConfig.shouldUseLocalIterators()) {
           log.info("Using local iterators");
           scanner = new ClientSideIteratorScanner(scanner);
         }
-        setupIterators(attempt, scanner);
+        setupIterators(attempt, scanner, split.getTableName());
       } catch (Exception e) {
         throw new IOException(e);
       }
-
+    
       // setup a scanner within the bounds of this split
-      for (Pair<Text,Text> c : getFetchedColumns(attempt)) {
+      for (Pair<Text,Text> c : tableConfig.getFetchedColumns()) {
         if (c.getSecond() != null) {
           log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond());
           scanner.fetchColumn(c.getFirst(), c.getSecond());
@@ -637,35 +742,34 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
           scanner.fetchColumnFamily(c.getFirst());
         }
       }
-
-      scanner.setRange(split.range);
-
+    
+      scanner.setRange(split.getRange());
       numKeysRead = 0;
-
+    
       // do this last after setting all scanner options
       scannerIterator = scanner.iterator();
     }
-
+  
     @Override
     public void close() {}
-
+  
     @Override
     public float getProgress() throws IOException {
       if (numKeysRead > 0 && currentKey == null)
         return 1.0f;
       return split.getProgress(currentKey);
     }
-
+  
     protected K currentK = null;
     protected V currentV = null;
     protected Key currentKey = null;
     protected Value currentValue = null;
-
+  
     @Override
     public K getCurrentKey() throws IOException, InterruptedException {
       return currentK;
     }
-
+  
     @Override
     public V getCurrentValue() throws IOException, InterruptedException {
       return currentV;
@@ -674,28 +778,28 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
 
   Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext context, String tableName, List<Range> ranges) throws TableNotFoundException,
       AccumuloException, AccumuloSecurityException {
-
+  
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-
+  
     Instance instance = getInstance(context);
     Connector conn = instance.getConnector(getPrincipal(context), getAuthenticationToken(context));
     String tableId = Tables.getTableId(instance, tableName);
-
+  
     if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
       Tables.clearCache(instance);
       if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
         throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode ");
       }
     }
-
+  
     for (Range range : ranges) {
       Text startRow;
-
+    
       if (range.getStartKey() != null)
         startRow = range.getStartKey().getRow();
       else
         startRow = new Text();
-
+    
       Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
       Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
       TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
@@ -703,164 +807,169 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
       scanner.fetchColumnFamily(TabletsSection.FutureLocationColumnFamily.NAME);
       scanner.setRange(metadataRange);
-
+    
       RowIterator rowIter = new RowIterator(scanner);
-
       KeyExtent lastExtent = null;
-
       while (rowIter.hasNext()) {
         Iterator<Entry<Key,Value>> row = rowIter.next();
         String last = "";
         KeyExtent extent = null;
         String location = null;
-
+      
         while (row.hasNext()) {
           Entry<Key,Value> entry = row.next();
           Key key = entry.getKey();
-
+        
           if (key.getColumnFamily().equals(TabletsSection.LastLocationColumnFamily.NAME)) {
             last = entry.getValue().toString();
           }
-
+        
           if (key.getColumnFamily().equals(TabletsSection.CurrentLocationColumnFamily.NAME)
               || key.getColumnFamily().equals(TabletsSection.FutureLocationColumnFamily.NAME)) {
             location = entry.getValue().toString();
           }
-
+        
           if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
             extent = new KeyExtent(key.getRow(), entry.getValue());
           }
-
+        
         }
-
+      
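+        // a location means this tablet is still hosted; report null so the caller retries until the
+        // table is fully offline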
         if (location != null)
           return null;
-
+      
         if (!extent.getTableId().toString().equals(tableId)) {
           throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
         }
-
+      
         if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
           throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
         }
-
+      
         Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
         if (tabletRanges == null) {
           tabletRanges = new HashMap<KeyExtent,List<Range>>();
           binnedRanges.put(last, tabletRanges);
         }
-
+      
         List<Range> rangeList = tabletRanges.get(extent);
         if (rangeList == null) {
           rangeList = new ArrayList<Range>();
           tabletRanges.put(extent, rangeList);
         }
-
+      
         rangeList.add(range);
-
+      
         if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
           break;
         }
-
+      
         lastExtent = extent;
       }
-
+    
     }
-
+  
     return binnedRanges;
   }
 
   /**
-   * Read the metadata table to get tablets and match up ranges to them.
+   * Gets the splits of the tables that have been set on the job.
+   * 
+   * @param conf
+   *          the Hadoop job context
+   * @return the splits computed from the ranges of each configured table
+   * @throws IOException
+   *           if a table set on the job doesn't exist or an error occurs initializing the tablet locator
    */
-  @Override
-  public List<InputSplit> getSplits(JobContext context) throws IOException {
-    log.setLevel(getLogLevel(context));
-    validateOptions(context);
-
-    String tableName = getInputTableName(context);
-    boolean autoAdjust = getAutoAdjustRanges(context);
-    List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(context)) : getRanges(context);
-
-    if (ranges.isEmpty()) {
-      ranges = new ArrayList<Range>(1);
-      ranges.add(new Range());
-    }
-
-    // get the metadata information for these ranges
-    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-    TabletLocator tl;
-    try {
-      if (isOfflineScan(context)) {
-        binnedRanges = binOfflineTable(context, tableName, ranges);
-        while (binnedRanges == null) {
-          // Some tablets were still online, try again
-          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
-          binnedRanges = binOfflineTable(context, tableName, ranges);
-        }
-      } else {
-        Instance instance = getInstance(context);
-        String tableId = null;
-        tl = getTabletLocator(context);
-        // its possible that the cache could contain complete, but old information about a tables tablets... so clear it
-        tl.invalidateCache();
-        while (!tl.binRanges(new Credentials(getPrincipal(context), getAuthenticationToken(context)), ranges, binnedRanges).isEmpty()) {
-          if (!(instance instanceof MockInstance)) {
-            if (tableId == null)
-              tableId = Tables.getTableId(instance, tableName);
-            if (!Tables.exists(instance, tableId))
-              throw new TableDeletedException(tableId);
-            if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
-              throw new TableOfflineException(instance, tableId);
+  @Override
+  public List<InputSplit> getSplits(JobContext conf) throws IOException {
+    log.setLevel(getLogLevel(conf));
+    validateOptions(conf);
+  
+    LinkedList<InputSplit> splits = new LinkedList<InputSplit>();
+    List<TableQueryConfig> tableConfigs = getTableQueryConfigs(conf);
+    for (TableQueryConfig tableConfig : tableConfigs) {
+    
+      boolean autoAdjust = tableConfig.shouldAutoAdjustRanges();
+      String tableId = null;
+      List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges()) : tableConfig.getRanges();
+      if (ranges.isEmpty()) {
+        ranges = new ArrayList<Range>(1);
+        ranges.add(new Range());
+      }
+    
+      // get the metadata information for these ranges
+      Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
+      TabletLocator tl;
+      try {
+        if (tableConfig.isOfflineScan()) {
+          binnedRanges = binOfflineTable(conf, tableConfig.getTableName(), ranges);
+          while (binnedRanges == null) {
+            // Some tablets were still online, try again
+            UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+            binnedRanges = binOfflineTable(conf, tableConfig.getTableName(), ranges);
+          
           }
-          binnedRanges.clear();
-          log.warn("Unable to locate bins for specified ranges. Retrying.");
-          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+        } else {
+          Instance instance = getInstance(conf);
+          tl = getTabletLocator(conf, tableConfig.getTableName());
+          // it's possible that the cache could contain complete, but old, information about a table's tablets... so clear it
           tl.invalidateCache();
+          Credentials creds = new Credentials(getPrincipal(conf), getAuthenticationToken(conf));
+        
+          while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) {
+            if (!(instance instanceof MockInstance)) {
+              // resolve the table id before checking it; testing a still-null id would fail
+              if (tableId == null)
+                tableId = Tables.getTableId(instance, tableConfig.getTableName());
+              if (!Tables.exists(instance, tableId))
+                throw new TableDeletedException(tableId);
+              if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+                throw new TableOfflineException(instance, tableId);
+            }
+            binnedRanges.clear();
+            log.warn("Unable to locate bins for specified ranges. Retrying.");
+            UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+            tl.invalidateCache();
+          }
         }
+      } catch (Exception e) {
+        throw new IOException(e);
       }
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-
-    ArrayList<InputSplit> splits = new ArrayList<InputSplit>(ranges.size());
-    HashMap<Range,ArrayList<String>> splitsToAdd = null;
-
-    if (!autoAdjust)
-      splitsToAdd = new HashMap<Range,ArrayList<String>>();
-
-    HashMap<String,String> hostNameCache = new HashMap<String,String>();
-
-    for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
-      String ip = tserverBin.getKey().split(":", 2)[0];
-      String location = hostNameCache.get(ip);
-      if (location == null) {
-        InetAddress inetAddress = InetAddress.getByName(ip);
-        location = inetAddress.getHostName();
-        hostNameCache.put(ip, location);
-      }
-
-      for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
-        Range ke = extentRanges.getKey().toDataRange();
-        for (Range r : extentRanges.getValue()) {
-          if (autoAdjust) {
-            // divide ranges into smaller ranges, based on the tablets
-            splits.add(new RangeInputSplit(tableName, ke.clip(r), new String[] {location}));
-          } else {
-            // don't divide ranges
-            ArrayList<String> locations = splitsToAdd.get(r);
-            if (locations == null)
-              locations = new ArrayList<String>(1);
-            locations.add(location);
-            splitsToAdd.put(r, locations);
+    
+      HashMap<Range,ArrayList<String>> splitsToAdd = null;
+    
+      if (!autoAdjust)
+        splitsToAdd = new HashMap<Range,ArrayList<String>>();
+    
+      HashMap<String,String> hostNameCache = new HashMap<String,String>();
+      for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
+        String ip = tserverBin.getKey().split(":", 2)[0];
+        String location = hostNameCache.get(ip);
+        if (location == null) {
+          InetAddress inetAddress = InetAddress.getByName(ip);
+          location = inetAddress.getHostName();
+          hostNameCache.put(ip, location);
+        }
+        for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
+          Range ke = extentRanges.getKey().toDataRange();
+          for (Range r : extentRanges.getValue()) {
+            if (autoAdjust) {
+              // divide ranges into smaller ranges, based on the tablets
+              splits.add(new RangeInputSplit(tableConfig.getTableName(), tableId, ke.clip(r), new String[] {location}));
+            } else {
+              // don't divide ranges
+              ArrayList<String> locations = splitsToAdd.get(r);
+              if (locations == null)
+                locations = new ArrayList<String>(1);
+              locations.add(location);
+              splitsToAdd.put(r, locations);
+            }
           }
         }
       }
+    
+      if (!autoAdjust)
+        for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
+          splits.add(new RangeInputSplit(tableConfig.getTableName(), tableId, entry.getKey(), entry.getValue().toArray(new String[0])));
     }
-
-    if (!autoAdjust)
-      for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
-        splits.add(new RangeInputSplit(tableName, entry.getKey(), entry.getValue().toArray(new String[0])));
     return splits;
   }
 
@@ -870,30 +979,53 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   public static class RangeInputSplit extends InputSplit implements Writable {
     private Range range;
     private String[] locations;
-
+    private String tableId;
+    private String tableName;
+  
     public RangeInputSplit() {
       range = new Range();
       locations = new String[0];
+      tableId = "";
+      tableName = "";
     }
-
+  
     public RangeInputSplit(RangeInputSplit split) throws IOException {
       this.setRange(split.getRange());
       this.setLocations(split.getLocations());
+      this.setTableName(split.getTableName());
+      this.setTableId(split.getTableId());
     }
-
-    protected RangeInputSplit(String table, Range range, String[] locations) {
+  
+    protected RangeInputSplit(String table, String tableId, Range range, String[] locations) {
       this.range = range;
       this.locations = locations;
+      this.tableName = table;
+      this.tableId = tableId;
     }
-
+  
     public Range getRange() {
       return range;
     }
-
+  
     public void setRange(Range range) {
       this.range = range;
     }
-
+  
+    public String getTableName() {
+      return tableName;
+    }
+  
+    public void setTableName(String tableName) {
+      this.tableName = tableName;
+    }
+  
+    public void setTableId(String tableId) {
+      this.tableId = tableId;
+    }
+  
+    public String getTableId() {
+      return tableId;
+    }
+  
     private static byte[] extractBytes(ByteSequence seq, int numBytes) {
       byte[] bytes = new byte[numBytes + 1];
       bytes[0] = 0;
@@ -905,7 +1037,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       }
       return bytes;
     }
-
+  
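+    /**
+     * Treats the padded byte sequences as unsigned big-endian integers and returns how far {@code position} lies between {@code start} and {@code end}.
+     */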
     public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) {
       int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length());
       BigInteger startBI = new BigInteger(extractBytes(start, maxDepth));
@@ -913,7 +1045,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth));
       return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue());
     }
-
+  
     public float getProgress(Key currentKey) {
       if (currentKey == null)
         return 0f;
@@ -932,7 +1064,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       // if we can't figure it out, then claim no progress
       return 0f;
     }
-
+  
     /**
      * This implementation of length is only an estimate, it does not provide exact values. Do not have your code rely on this return value.
      */
@@ -942,41 +1074,43 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow();
       int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength()));
       long diff = 0;
-
+    
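+      // estimate the key-space distance between the start and stop rows from their first few bytes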
       byte[] start = startRow.getBytes();
       byte[] stop = stopRow.getBytes();
       for (int i = 0; i < maxCommon; ++i) {
         diff |= 0xff & (start[i] ^ stop[i]);
         diff <<= Byte.SIZE;
       }
-
+    
       if (startRow.getLength() != stopRow.getLength())
         diff |= 0xff;
-
+    
       return diff + 1;
     }
-
+  
     @Override
     public String[] getLocations() throws IOException {
       return locations;
     }
-
+  
     public void setLocations(String[] locations) {
       this.locations = locations;
     }
-
+  
     @Override
     public void readFields(DataInput in) throws IOException {
       range.readFields(in);
+      tableName = in.readUTF();
+      tableId = in.readUTF();
       int numLocs = in.readInt();
       locations = new String[numLocs];
       for (int i = 0; i < numLocs; ++i)
         locations[i] = in.readUTF();
     }
-
+  
     @Override
     public void write(DataOutput out) throws IOException {
       range.write(out);
+      out.writeUTF(tableName);
+      out.writeUTF(tableId);
       out.writeInt(locations.length);
       for (int i = 0; i < locations.length; ++i)
         out.writeUTF(locations[i]);
@@ -994,5 +1128,4 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       throw new RuntimeException(e);
     }
   }
-
 }