You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/11/07 06:24:22 UTC

[1/6] git commit: ACCUMULO-1854 Add helper method to extract AOF configuration entries from a Configuration object.

Updated Branches:
  refs/heads/ACCUMULO-1854-multi-aif 921617d4d -> c50a22296


ACCUMULO-1854 Add helper method to extract AOF configuration entries
from a Configuration object.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4111d1ed
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4111d1ed
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4111d1ed

Branch: refs/heads/ACCUMULO-1854-multi-aif
Commit: 4111d1ede797addb29ba1983f751f197cb42deca
Parents: 921617d
Author: Josh Elser <el...@apache.org>
Authored: Wed Nov 6 14:08:23 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Wed Nov 6 14:08:23 2013 -0500

----------------------------------------------------------------------
 .../client/mapreduce/AccumuloOutputFormat.java  | 22 +++++++++++++++++++-
 1 file changed, 21 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/4111d1ed/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
index 9b9041a..66e85fd 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
@@ -19,6 +19,9 @@ package org.apache.accumulo.core.client.mapreduce;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.accumulo.core.client.AccumuloException;
@@ -112,6 +115,20 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
     NUM_CONFIGURATIONS_PROCESSED.set(0);
   }
   
+  public static Map<String,String> getRelevantEntries(Configuration conf) {
+    ArgumentChecker.notNull(conf);
+    
+    HashMap<String,String> confEntries = new HashMap<String,String>();
+    for (Entry<String,String> entry : conf) {
+      final String key = entry.getKey();
+      if (0 == key.indexOf(PREFIX)) {
+        confEntries.put(key, entry.getValue());
+      }
+    }
+    
+    return confEntries;
+  }
+  
   /**
    * Configure the output format.
    * 
@@ -612,8 +629,10 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
     final Configuration conf = job.getConfiguration();
     
     if (0 == sequencesToCheck) {
+      log.debug("No configurations loaded, checking the default");
       checkConfiguration(conf, sequencesToCheck);
     } else {
+      log.debug(sequencesToCheck + " configurations loaded");
       for (int i = 1; i <= sequencesToCheck; i++) {
         checkConfiguration(conf, i);
       }
@@ -621,8 +640,9 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   private void checkConfiguration(Configuration conf, int sequence) throws IOException {
-    if (!conf.getBoolean(merge(OUTPUT_INFO_HAS_BEEN_SET, sequence), false))
+    if (!conf.getBoolean(merge(OUTPUT_INFO_HAS_BEEN_SET, sequence), false)) {
       throw new IOException("Output info for sequence " + sequence + " has not been set.");
+    }
     if (!conf.getBoolean(merge(INSTANCE_HAS_BEEN_SET, sequence), false))
       throw new IOException("Instance info for sequence " + sequence + " has not been set.");
     try {


[4/6] git commit: ACCUMULO-1854 Make the same changes to AOF as AIF has.

Posted by el...@apache.org.
ACCUMULO-1854 Make the same changes to AccumuloOutputFormat (AOF) that were made to AccumuloInputFormat (AIF).


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1fe22381
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1fe22381
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1fe22381

Branch: refs/heads/ACCUMULO-1854-multi-aif
Commit: 1fe223813a246a9943dbc9eda1a71de07ae27f12
Parents: c5dc070
Author: Josh Elser <el...@apache.org>
Authored: Wed Nov 6 15:44:19 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Wed Nov 6 15:44:19 2013 -0500

----------------------------------------------------------------------
 .../client/mapreduce/AccumuloOutputFormat.java  | 163 +++++++++++++++----
 .../mapreduce/AccumuloOutputFormatTest.java     |   8 +-
 2 files changed, 136 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/1fe22381/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
index 66e85fd..5e5e43d 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
@@ -17,10 +17,13 @@
 package org.apache.accumulo.core.client.mapreduce;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -41,6 +44,7 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -91,28 +95,126 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   private static final long DEFAULT_MAX_MUTATION_BUFFER_SIZE = 50 * 1024 * 1024; // 50MB
   private static final int DEFAULT_MAX_LATENCY = 60 * 1000; // 1 minute
   private static final int DEFAULT_NUM_WRITE_THREADS = 2;
-
-  private static final AtomicInteger NUM_CONFIGURATIONS_LOADED = new AtomicInteger(0);
-  private static final AtomicInteger NUM_CONFIGURATIONS_PROCESSED = new AtomicInteger(0);
+  
   private static final int DEFAULT_SEQUENCE = 0;
   private static final String SEQ_DELIM = ".";
+  
+  private static final String COMMA = ",";
+  private static final String CONFIGURED_SEQUENCES = PREFIX + ".configuredSeqs";
+  private static final String DEFAULT_SEQ_USED = PREFIX + ".defaultSequenceUsed";
+  private static final String PROCESSED_SEQUENCES = PREFIX + ".processedSeqs";
+  private static final String TRUE = "true";
+
+
 
   /**
    * Get a unique identifier for these configurations
    * 
    * @return A unique number to provide to future AccumuloInputFormat calls
    */
-  public static int nextSequence() {
-    return NUM_CONFIGURATIONS_LOADED.incrementAndGet();
+  public static synchronized int nextSequence(Configuration conf) {
+    String value = conf.get(CONFIGURED_SEQUENCES);
+    if (null == value) {
+      conf.set(CONFIGURED_SEQUENCES, "1");
+      return 1;
+    } else {
+      String[] splitValues = StringUtils.split(value, COMMA);
+      int newValue = Integer.parseInt(splitValues[splitValues.length-1]) + 1;
+      
+      conf.set(CONFIGURED_SEQUENCES, value + COMMA + newValue);
+      return newValue;
+    }
   }
+  
+  /**
+   * Using the provided Configuration, return the next sequence number to process.
+   * @param conf A Configuration object used to store AccumuloInputFormat information into
+   * @return The next sequence number to process, -1 when finished.
+   * @throws NoSuchElementException
+   */
+  protected static synchronized int nextSequenceToProcess(Configuration conf) throws NoSuchElementException {
+    String[] processedConfs = conf.getStrings(PROCESSED_SEQUENCES);
+    
+    // We haven't set anything, so we need to find the first to return
+    if (null == processedConfs || 0 == processedConfs.length) {
+      // Check to see if the default sequence was used
+      boolean defaultSeqUsed = conf.getBoolean(DEFAULT_SEQ_USED, false);
+      
+      // If so, set that we're processing it and return the value of the default
+      if (defaultSeqUsed) {
+        conf.set(PROCESSED_SEQUENCES, Integer.toString(DEFAULT_SEQUENCE));
+        return DEFAULT_SEQUENCE;
+      }
+      
+      String[] loadedConfs = conf.getStrings(CONFIGURED_SEQUENCES);
+      
+      // There was *nothing* loaded, fail.
+      if (null == loadedConfs || 0 == loadedConfs.length) {
+        throw new NoSuchElementException("Sequence was requested to process but none exist to return");
+      }
+      
+      // We have loaded configuration(s), use the first
+      int firstLoaded = Integer.parseInt(loadedConfs[0]);
+      conf.setInt(PROCESSED_SEQUENCES, firstLoaded);
+      
+      return firstLoaded;
+    }
+    
+    // We've previously parsed some confs, need to find the next one to load
+    int lastProcessedSeq = Integer.valueOf(processedConfs[processedConfs.length - 1]);
+    String[] configuredSequencesArray = conf.getStrings(CONFIGURED_SEQUENCES);
+    
+    // We only have the default sequence, no specifics.
+    // Getting here, we already know that we processed that default
+    if (null == configuredSequencesArray) {
+      return -1;
+    }
 
-  protected static String merge(String name, Integer sequence) {
-    return name + SEQ_DELIM + sequence;
+    List<Integer> configuredSequences = new ArrayList<Integer>(configuredSequencesArray.length + 1);
+    
+    // If we used the default sequence ID, add that into the list of configured sequences
+    if (conf.getBoolean(DEFAULT_SEQ_USED, false)) {
+      configuredSequences.add(DEFAULT_SEQUENCE);
+    }
+
+    // Add the rest of any sequences to our list
+    for (String configuredSequence : configuredSequencesArray) {
+      configuredSequences.add(Integer.parseInt(configuredSequence));
+    }
+    
+    int lastParsedSeqIndex = configuredSequences.size() - 1;
+    
+    // Find the next sequence number after the one we last processed
+    for (; lastParsedSeqIndex >= 0; lastParsedSeqIndex--) {
+      int lastLoadedValue = configuredSequences.get(lastParsedSeqIndex);
+      
+      if (lastLoadedValue == lastProcessedSeq) {
+        break;
+      }
+    }
+    
+    // We either had no sequences to match or we matched the last configured sequence
+    // Both of which are equivalent to no (more) sequences to process
+    if (-1 == lastParsedSeqIndex || lastParsedSeqIndex + 1 >= configuredSequences.size()) {
+      return -1;
+    }
+    
+    // Get the value of the sequence at that offset
+    int nextSequence = configuredSequences.get(lastParsedSeqIndex + 1);
+    conf.set(PROCESSED_SEQUENCES, conf.get(PROCESSED_SEQUENCES) + COMMA + nextSequence);
+    
+    return nextSequence;
   }
   
-  public static void resetCounters() {
-    NUM_CONFIGURATIONS_LOADED.set(0);
-    NUM_CONFIGURATIONS_PROCESSED.set(0);
+  protected static void setDefaultSequenceUsed(Configuration conf) {
+    String value = conf.get(DEFAULT_SEQ_USED);
+    if (null == value || !TRUE.equals(value)) {
+      conf.setBoolean(DEFAULT_SEQ_USED, true);
+    }
+  }
+
+  protected static String merge(String name, Integer sequence) {
+    return name + SEQ_DELIM + sequence;
   }
   
   public static Map<String,String> getRelevantEntries(Configuration conf) {
@@ -163,6 +265,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
    *          the table to use when the tablename is null in the write call
    */
   public static void setOutputInfo(Configuration conf, String user, byte[] passwd, boolean createTables, String defaultTable) {
+    setDefaultSequenceUsed(conf);
     setOutputInfo(conf, DEFAULT_SEQUENCE, user, passwd, createTables, defaultTable);
   }
   
@@ -202,6 +305,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
+    setDefaultSequenceUsed(conf);
     setZooKeeperInstance(conf, DEFAULT_SEQUENCE, instanceName, zooKeepers);
   }
   
@@ -224,6 +328,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   public static void setMockInstance(Configuration conf, String instanceName) {
+    setDefaultSequenceUsed(conf);
     setMockInstance(conf, DEFAULT_SEQUENCE, instanceName);
   }
   
@@ -241,6 +346,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   public static void setMaxMutationBufferSize(Configuration conf, long numberOfBytes) {
+    setDefaultSequenceUsed(conf);
     setMaxMutationBufferSize(conf, DEFAULT_SEQUENCE, numberOfBytes);
   }
   
@@ -256,6 +362,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   public static void setMaxLatency(Configuration conf, int numberOfMilliseconds) {
+    setDefaultSequenceUsed(conf);
     setMaxLatency(conf, DEFAULT_SEQUENCE, numberOfMilliseconds);
   }
   
@@ -271,6 +378,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   public static void setMaxWriteThreads(Configuration conf, int numberOfThreads) {
+    setDefaultSequenceUsed(conf);
     setMaxWriteThreads(conf, DEFAULT_SEQUENCE, numberOfThreads);
   }
   
@@ -286,6 +394,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   public static void setLogLevel(Configuration conf, Level level) {
+    setDefaultSequenceUsed(conf);
     setLogLevel(conf, DEFAULT_SEQUENCE, level);
   }
   
@@ -302,6 +411,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   public static void setSimulationMode(Configuration conf) {
+    setDefaultSequenceUsed(conf);
     setSimulationMode(conf, DEFAULT_SEQUENCE);
   }
   
@@ -625,17 +735,15 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   
   @Override
   public void checkOutputSpecs(JobContext job) throws IOException {
-    final int sequencesToCheck = NUM_CONFIGURATIONS_LOADED.get();
-    final Configuration conf = job.getConfiguration();
+    Configuration conf = job.getConfiguration();
     
-    if (0 == sequencesToCheck) {
-      log.debug("No configurations loaded, checking the default");
-      checkConfiguration(conf, sequencesToCheck);
-    } else {
-      log.debug(sequencesToCheck + " configurations loaded");
-      for (int i = 1; i <= sequencesToCheck; i++) {
-        checkConfiguration(conf, i);
-      }
+    // Avoid using the above methods as they will alter the conf.
+    // We just want to inspect what is loaded
+    String[] loadedConfs = conf.getStrings(CONFIGURED_SEQUENCES);
+    
+    for (String loadedConf : loadedConfs) {
+      int sequence = Integer.parseInt(loadedConf);
+      checkConfiguration(conf, sequence);
     }
   }
   
@@ -663,18 +771,9 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   
   @Override
   public RecordWriter<Text,Mutation> getRecordWriter(TaskAttemptContext attempt) throws IOException {
-    final int sequence;
-    if (0 == NUM_CONFIGURATIONS_LOADED.get()) {
-      sequence = DEFAULT_SEQUENCE;
-      
-      log.debug("No sequence numbers were given, falling back to the default sequence number");
-    } else {
-      sequence = NUM_CONFIGURATIONS_PROCESSED.incrementAndGet();
-      
-      if (sequence > NUM_CONFIGURATIONS_LOADED.get()) {
-        log.warn("Attempting to use AccumuloOutputFormat information from Configuration using a sequence number that wasn't assigned");
-      }
-    }
+    final int sequence = nextSequenceToProcess(attempt.getConfiguration());
+    
+    log.debug("Creating RecordWriter for sequence " + sequence);
     
     try {
       return new AccumuloRecordWriter(attempt, sequence);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1fe22381/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
index bc1bd1a..5599cae 100644
--- a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
+++ b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
@@ -132,9 +132,9 @@ public class AccumuloOutputFormatTest {
   
   @Test
   public void testMultiInstanceConfiguration() throws Exception {
-    int seq1 = AccumuloOutputFormat.nextSequence(), seq2 = AccumuloOutputFormat.nextSequence();
-    
     Configuration conf = new Configuration();
+    int seq1 = AccumuloOutputFormat.nextSequence(conf), seq2 = AccumuloOutputFormat.nextSequence(conf);
+    
     AccumuloOutputFormat.setOutputInfo(conf, seq1, "root1", "1".getBytes(), false, "testtable1");
     AccumuloOutputFormat.setMockInstance(conf, seq1, "testinstance1");
     
@@ -161,7 +161,7 @@ public class AccumuloOutputFormatTest {
   @Test
   public void testConfigEntries() throws Exception {
     Configuration conf = new Configuration();
-    int seq1 = AccumuloOutputFormat.nextSequence(), seq2 = AccumuloOutputFormat.nextSequence();
+    int seq1 = AccumuloOutputFormat.nextSequence(conf), seq2 = AccumuloOutputFormat.nextSequence(conf);
     
     AccumuloOutputFormat.setOutputInfo(conf, seq1, "root1", "1".getBytes(), false, "testtable1");
     AccumuloOutputFormat.setZooKeeperInstance(conf, seq1, "instance1", "zk1");
@@ -189,6 +189,8 @@ public class AccumuloOutputFormatTest {
     expected.put(prefix + ".configured.2", "true");
     expected.put(prefix + ".instanceConfigured.2", "true");
     
+    expected.put(prefix + ".configuredSeqs", "1,2");
+    
     Map<String,String> actual = AccumuloOutputFormat.getRelevantEntries(conf);
     
     assertEquals(expected, actual);


[6/6] git commit: ACCUMULO-1854 Ended up re-implementing some of the old approach to get back to a functional state.

Posted by el...@apache.org.
ACCUMULO-1854 Ended up re-implementing some of the old approach to get
back to a functional state.

Couldn't rely solely on the Configuration to track this state, because
getSplits is not guaranteed to receive the same Configuration object each
time it is called. Since getSplits is always called serially by one client,
we can use that fact to keep some state and not read the same data many times.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c50a2229
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c50a2229
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c50a2229

Branch: refs/heads/ACCUMULO-1854-multi-aif
Commit: c50a22296d80042a86639129a02e2b9468dc3330
Parents: 0f10a6f
Author: Josh Elser <el...@apache.org>
Authored: Thu Nov 7 00:16:17 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Thu Nov 7 00:20:00 2013 -0500

----------------------------------------------------------------------
 .../core/client/mapreduce/InputFormatBase.java  | 38 ++++++++++++++---
 .../client/mapreduce/SequencedFormatHelper.java | 45 +++++++++++++++++---
 .../mapreduce/AccumuloOutputFormatTest.java     |  6 +++
 .../client/mapreduce/InputFormatBaseTest.java   |  1 +
 4 files changed, 77 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/c50a2229/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
index 5c87c13..9ce98ba 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
@@ -42,8 +42,8 @@ import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.StringTokenizer;
-
-import javax.servlet.jsp.jstl.core.Config;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -83,7 +83,6 @@ import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -151,15 +150,22 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   private static final String ITERATORS_DELIM = ",";
 
   private static final String SEQ_DELIM = ".";
-  
 
   private static final String READ_OFFLINE = PREFIX + ".read.offline";
+  
+  private static final AtomicBoolean DEFAULT_SEQUENCE_READ = new AtomicBoolean(false);
+  private static final AtomicInteger SEQUENCES_READ = new AtomicInteger(0);
 
   protected static String merge(String name, Integer sequence) {
     return name + SEQ_DELIM + sequence;
   }
 
 
+  protected static void resetInternals() {
+    DEFAULT_SEQUENCE_READ.set(false);
+    SEQUENCES_READ.set(0);
+  }
+  
   /**
    * Get a unique identifier for these configurations
    * 
@@ -169,8 +175,25 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     return SequencedFormatHelper.nextSequence(conf, PREFIX);
   }
   
-  protected static int nextSequenceToProcess(Configuration conf) {
-    return SequencedFormatHelper.nextSequenceToProcess(conf, PREFIX);
+  protected static synchronized int nextSequenceToProcess(Configuration conf) {
+    boolean isDefaultSequenceUsed = SequencedFormatHelper.isDefaultSequenceUsed(conf, PREFIX);
+    
+    if (isDefaultSequenceUsed && !DEFAULT_SEQUENCE_READ.get()) {
+      DEFAULT_SEQUENCE_READ.set(true);
+      return 0;
+    }
+    
+    Integer[] configuredSequences = SequencedFormatHelper.configuredSequences(conf, PREFIX);
+    
+    int sequenceOffset = SEQUENCES_READ.getAndAdd(1);    
+    
+    if (0 == configuredSequences.length && !isDefaultSequenceUsed) {
+      throw new NoSuchElementException();
+    } else if (sequenceOffset >= configuredSequences.length) {
+      return -1;
+    }
+    
+    return configuredSequences[sequenceOffset];
   }
   
   protected static void setDefaultSequenceUsed(Configuration conf) {
@@ -1791,6 +1814,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    */
   @Override
   public List<InputSplit> getSplits(JobContext job) throws IOException {
+    // Disclaimer: the only reason this works as it does is because getSplits is 
+    // called serially by the JobClient before the job starts (one node, one thread).
+    // If it was called by multiple nodes, this approach would fail miserably.
     final Configuration conf = job.getConfiguration();
     final int sequence = nextSequenceToProcess(conf);
     

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c50a2229/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java
index ff18754..ab6dd3a 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java
@@ -16,11 +16,11 @@ public class SequencedFormatHelper {
 
   private static final String COMMA = ",";
   private static final String TRUE = "true";
-  protected static final int DEFAULT_SEQUENCE = 0;
+  public static final int DEFAULT_SEQUENCE = 0;
 
-  private static final String DEFAULT_SEQ_USED = ".defaultSequenceUsed";
-  private static final String CONFIGURED_SEQUENCES = ".configuredSeqs";
-  private static final String PROCESSED_SEQUENCES = ".processedSeqs";
+  public static final String DEFAULT_SEQ_USED = ".defaultSequenceUsed";
+  public static final String CONFIGURED_SEQUENCES = ".configuredSeqs";
+  public static final String PROCESSED_SEQUENCES = ".processedSeqs";
 
   /**
    * Get a unique identifier for these configurations
@@ -44,15 +44,46 @@ public class SequencedFormatHelper {
       return newValue;
     }
   }
+  
+  /**
+   * Returns all configured sequences but not the default sequence
+   * @param conf
+   * @param prefix
+   * @return
+   */
+  public static Integer[] configuredSequences(Configuration conf, String prefix) {
+    ArgumentChecker.notNull(conf, prefix);
+    
+    final String configuredSequences = prefix + CONFIGURED_SEQUENCES;
+    String[] values = conf.getStrings(configuredSequences);
+    if (null == values) {
+      return new Integer[0];
+    }
+    
+    Integer[] intValues = new Integer[values.length];
+    for (int i = 0; i < values.length; i++) {
+      intValues[i] = Integer.parseInt(values[i]);
+    }
+    
+    return intValues;
+  }
 
+  protected static boolean isDefaultSequenceUsed(Configuration conf, String prefix) {
+    ArgumentChecker.notNull(conf, prefix);
+    
+    final String defaultSequenceUsedKey = prefix + DEFAULT_SEQ_USED;
+    
+    return conf.getBoolean(defaultSequenceUsedKey, false);
+  }
+  
   protected static void setDefaultSequenceUsed(Configuration conf, String prefix) {
     ArgumentChecker.notNull(conf, prefix);
 
-    final String configuredSequences = prefix + DEFAULT_SEQ_USED;
+    final String defaultSequenceUsedKey = prefix + DEFAULT_SEQ_USED;
 
-    String value = conf.get(configuredSequences);
+    String value = conf.get(defaultSequenceUsedKey);
     if (null == value || !TRUE.equals(value)) {
-      conf.setBoolean(configuredSequences, true);
+      conf.setBoolean(defaultSequenceUsedKey, true);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c50a2229/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
index 5599cae..c4c2e76 100644
--- a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
+++ b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.junit.Before;
 import org.junit.Test;
 
 /**
@@ -77,6 +78,11 @@ public class AccumuloOutputFormatTest {
     }
   }
   
+  @Before
+  public void clearInputFormatState() {
+    InputFormatBase.resetInternals();
+  }
+  
   @Test
   public void testMR() throws Exception {
     MockInstance mockInstance = new MockInstance("testmrinstance");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c50a2229/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/InputFormatBaseTest.java
----------------------------------------------------------------------
diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/InputFormatBaseTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/InputFormatBaseTest.java
index 9d167a9..f52c7a1 100644
--- a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/InputFormatBaseTest.java
+++ b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/InputFormatBaseTest.java
@@ -14,6 +14,7 @@ public class InputFormatBaseTest {
   
   @Before
   public void setup() {
+    InputFormatBase.resetInternals();
     conf = new Configuration();
   }
 


[3/6] git commit: ACCUMULO-1854 More unit tests and fix a bug.

Posted by el...@apache.org.
ACCUMULO-1854 Add more unit tests and fix a bug where non-default sequences were only processed if the default sequence was also used.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c5dc070f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c5dc070f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c5dc070f

Branch: refs/heads/ACCUMULO-1854-multi-aif
Commit: c5dc070f0c10c0f9b00647934edd35e22a6b036c
Parents: 5d3c3d5
Author: Josh Elser <el...@apache.org>
Authored: Wed Nov 6 14:46:05 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Wed Nov 6 14:46:05 2013 -0500

----------------------------------------------------------------------
 .../core/client/mapreduce/InputFormatBase.java  |  8 +-
 .../client/mapreduce/InputFormatBaseTest.java   | 82 ++++++++++++++++++++
 2 files changed, 87 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/c5dc070f/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
index 32240b7..7042f19 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
@@ -229,9 +229,11 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     // If we used the default sequence ID, add that into the list of configured sequences
     if (conf.getBoolean(DEFAULT_SEQ_USED, false)) {
       configuredSequences.add(DEFAULT_SEQUENCE);
-      for (String configuredSequence : configuredSequencesArray) {
-        configuredSequences.add(Integer.parseInt(configuredSequence));
-      }
+    }
+
+    // Add the rest of any sequences to our list
+    for (String configuredSequence : configuredSequencesArray) {
+      configuredSequences.add(Integer.parseInt(configuredSequence));
     }
     
     int lastParsedSeqIndex = configuredSequences.size() - 1;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c5dc070f/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/InputFormatBaseTest.java
----------------------------------------------------------------------
diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/InputFormatBaseTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/InputFormatBaseTest.java
new file mode 100644
index 0000000..9d167a9
--- /dev/null
+++ b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/InputFormatBaseTest.java
@@ -0,0 +1,82 @@
+package org.apache.accumulo.core.client.mapreduce;
+
+import java.util.NoSuchElementException;
+
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+/** Tests for the sequence bookkeeping exposed by InputFormatBase.nextSequenceToProcess(Configuration). */
+public class InputFormatBaseTest {
+  // Recreated before every test so sequence state written into it cannot leak between tests.
+  private Configuration conf;
+  
+  @Before
+  public void setup() {
+    conf = new Configuration();
+  }
+  // Only default-sequence setters used: sequence 0 is handed out once, then -1 signals completion.
+  @Test
+  public void testDefaultSequence() {
+    AccumuloInputFormat.setInputInfo(conf, "root", "password".getBytes(), "table", new Authorizations("foo"));
+    // setInputInfo without a sequence number marks the default sequence (0) as used.
+    Assert.assertEquals(0, InputFormatBase.nextSequenceToProcess(conf));
+    Assert.assertEquals(-1, InputFormatBase.nextSequenceToProcess(conf));
+  }
+  // Several default-sequence setters still describe a single sequence (0).
+  @Test
+  public void testDefaultSequenceInputAndConnection() {
+    AccumuloInputFormat.setInputInfo(conf, "root", "password".getBytes(), "table", new Authorizations("foo"));
+    AccumuloInputFormat.setZooKeeperInstance(conf, "instance1", "zk1");
+    
+    Assert.assertEquals(0, InputFormatBase.nextSequenceToProcess(conf));
+    Assert.assertEquals(-1, InputFormatBase.nextSequenceToProcess(conf));
+  }
+  // Default sequence plus one explicitly allocated sequence: 0 is processed before 1.
+  @Test
+  public void testDefaultWithCustomSequence() {
+    AccumuloInputFormat.setInputInfo(conf, "root", "password".getBytes(), "table", new Authorizations("foo"));
+    AccumuloInputFormat.setZooKeeperInstance(conf, "instance", "zk");
+    // Explicit sequence numbers start at 1; 0 is the default sequence.
+    int seq = AccumuloInputFormat.nextSequence(conf);
+    
+    Assert.assertEquals(1, seq);
+    
+    AccumuloInputFormat.setInputInfo(conf, seq, "root1", "password1".getBytes(), "table1", new Authorizations("foo1"));
+    AccumuloInputFormat.setZooKeeperInstance(conf, seq, "instance1", "zk1");
+    // The default sequence comes first, then the explicit one, then -1 once everything is processed.
+    Assert.assertEquals(0, InputFormatBase.nextSequenceToProcess(conf));
+    Assert.assertEquals(1, InputFormatBase.nextSequenceToProcess(conf));
+    Assert.assertEquals(-1, InputFormatBase.nextSequenceToProcess(conf));
+  }
+  // Only explicit sequences (no default): processing follows allocation order 1, 2, 3.
+  @Test
+  public void testMultipleSequences() {
+    int seq = AccumuloInputFormat.nextSequence(conf);
+    
+    AccumuloInputFormat.setInputInfo(conf,  seq, "root1", "password1".getBytes(), "table1", new Authorizations("foo1"));
+    AccumuloInputFormat.setZooKeeperInstance(conf, seq, "instance1", "zk1");
+    
+    seq = AccumuloInputFormat.nextSequence(conf);
+    
+    AccumuloInputFormat.setInputInfo(conf, seq, "root2", "password2".getBytes(), "table2", new Authorizations("foo2"));
+    AccumuloInputFormat.setZooKeeperInstance(conf, seq, "instance2", "zk2");
+    
+    seq = AccumuloInputFormat.nextSequence(conf);
+    
+    AccumuloInputFormat.setInputInfo(conf, seq, "root3", "password3".getBytes(), "table3", new Authorizations("foo3"));
+    AccumuloInputFormat.setZooKeeperInstance(conf, seq, "instance3", "zk3");
+    // No default sequence was used, so processing starts at the first explicit sequence.
+    Assert.assertEquals(1, InputFormatBase.nextSequenceToProcess(conf));
+    Assert.assertEquals(2, InputFormatBase.nextSequenceToProcess(conf));
+    Assert.assertEquals(3, InputFormatBase.nextSequenceToProcess(conf));
+    Assert.assertEquals(-1, InputFormatBase.nextSequenceToProcess(conf));
+  }
+  // With nothing configured there is no sequence to hand out, which is reported as an exception.
+  @Test(expected = NoSuchElementException.class)
+  public void testNoSequences() {
+    // When nothing was set, we should error
+    InputFormatBase.nextSequenceToProcess(conf);
+  }
+}


[5/6] git commit: ACCUMULO-1854 Lift duplicated code between AIF and AOF into a helper class

Posted by el...@apache.org.
ACCUMULO-1854 Lift duplicated code between AIF and AOF into a helper
class


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0f10a6ff
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0f10a6ff
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0f10a6ff

Branch: refs/heads/ACCUMULO-1854-multi-aif
Commit: 0f10a6ffb0400424d30d3f49312bb500265cb276
Parents: 1fe2238
Author: Josh Elser <el...@apache.org>
Authored: Wed Nov 6 17:51:06 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Wed Nov 6 17:51:06 2013 -0500

----------------------------------------------------------------------
 .../client/mapreduce/AccumuloOutputFormat.java  | 130 +++----------
 .../core/client/mapreduce/InputFormatBase.java  | 184 +++++--------------
 .../client/mapreduce/SequencedFormatHelper.java | 145 +++++++++++++++
 .../mapreduce/AccumuloInputFormatTest.java      |   7 +-
 4 files changed, 214 insertions(+), 252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f10a6ff/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
index 5e5e43d..dd9762e 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
@@ -96,14 +96,9 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   private static final int DEFAULT_MAX_LATENCY = 60 * 1000; // 1 minute
   private static final int DEFAULT_NUM_WRITE_THREADS = 2;
   
-  private static final int DEFAULT_SEQUENCE = 0;
   private static final String SEQ_DELIM = ".";
   
-  private static final String COMMA = ",";
   private static final String CONFIGURED_SEQUENCES = PREFIX + ".configuredSeqs";
-  private static final String DEFAULT_SEQ_USED = PREFIX + ".defaultSequenceUsed";
-  private static final String PROCESSED_SEQUENCES = PREFIX + ".processedSeqs";
-  private static final String TRUE = "true";
 
 
 
@@ -113,17 +108,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
    * @return A unique number to provide to future AccumuloInputFormat calls
    */
   public static synchronized int nextSequence(Configuration conf) {
-    String value = conf.get(CONFIGURED_SEQUENCES);
-    if (null == value) {
-      conf.set(CONFIGURED_SEQUENCES, "1");
-      return 1;
-    } else {
-      String[] splitValues = StringUtils.split(value, COMMA);
-      int newValue = Integer.parseInt(splitValues[splitValues.length-1]) + 1;
-      
-      conf.set(CONFIGURED_SEQUENCES, value + COMMA + newValue);
-      return newValue;
-    }
+    return SequencedFormatHelper.nextSequence(conf, PREFIX);
   }
   
   /**
@@ -133,84 +118,11 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
    * @throws NoSuchElementException
    */
   protected static synchronized int nextSequenceToProcess(Configuration conf) throws NoSuchElementException {
-    String[] processedConfs = conf.getStrings(PROCESSED_SEQUENCES);
-    
-    // We haven't set anything, so we need to find the first to return
-    if (null == processedConfs || 0 == processedConfs.length) {
-      // Check to see if the default sequence was used
-      boolean defaultSeqUsed = conf.getBoolean(DEFAULT_SEQ_USED, false);
-      
-      // If so, set that we're processing it and return the value of the default
-      if (defaultSeqUsed) {
-        conf.set(PROCESSED_SEQUENCES, Integer.toString(DEFAULT_SEQUENCE));
-        return DEFAULT_SEQUENCE;
-      }
-      
-      String[] loadedConfs = conf.getStrings(CONFIGURED_SEQUENCES);
-      
-      // There was *nothing* loaded, fail.
-      if (null == loadedConfs || 0 == loadedConfs.length) {
-        throw new NoSuchElementException("Sequence was requested to process but none exist to return");
-      }
-      
-      // We have loaded configuration(s), use the first
-      int firstLoaded = Integer.parseInt(loadedConfs[0]);
-      conf.setInt(PROCESSED_SEQUENCES, firstLoaded);
-      
-      return firstLoaded;
-    }
-    
-    // We've previously parsed some confs, need to find the next one to load
-    int lastProcessedSeq = Integer.valueOf(processedConfs[processedConfs.length - 1]);
-    String[] configuredSequencesArray = conf.getStrings(CONFIGURED_SEQUENCES);
-    
-    // We only have the default sequence, no specifics.
-    // Getting here, we already know that we processed that default
-    if (null == configuredSequencesArray) {
-      return -1;
-    }
-
-    List<Integer> configuredSequences = new ArrayList<Integer>(configuredSequencesArray.length + 1);
-    
-    // If we used the default sequence ID, add that into the list of configured sequences
-    if (conf.getBoolean(DEFAULT_SEQ_USED, false)) {
-      configuredSequences.add(DEFAULT_SEQUENCE);
-    }
-
-    // Add the rest of any sequences to our list
-    for (String configuredSequence : configuredSequencesArray) {
-      configuredSequences.add(Integer.parseInt(configuredSequence));
-    }
-    
-    int lastParsedSeqIndex = configuredSequences.size() - 1;
-    
-    // Find the next sequence number after the one we last processed
-    for (; lastParsedSeqIndex >= 0; lastParsedSeqIndex--) {
-      int lastLoadedValue = configuredSequences.get(lastParsedSeqIndex);
-      
-      if (lastLoadedValue == lastProcessedSeq) {
-        break;
-      }
-    }
-    
-    // We either had no sequences to match or we matched the last configured sequence
-    // Both of which are equivalent to no (more) sequences to process
-    if (-1 == lastParsedSeqIndex || lastParsedSeqIndex + 1 >= configuredSequences.size()) {
-      return -1;
-    }
-    
-    // Get the value of the sequence at that offset
-    int nextSequence = configuredSequences.get(lastParsedSeqIndex + 1);
-    conf.set(PROCESSED_SEQUENCES, conf.get(PROCESSED_SEQUENCES) + COMMA + nextSequence);
-    
-    return nextSequence;
+    return SequencedFormatHelper.nextSequenceToProcess(conf, PREFIX);
   }
   
   protected static void setDefaultSequenceUsed(Configuration conf) {
-    String value = conf.get(DEFAULT_SEQ_USED);
-    if (null == value || !TRUE.equals(value)) {
-      conf.setBoolean(DEFAULT_SEQ_USED, true);
-    }
+    SequencedFormatHelper.setDefaultSequenceUsed(conf, PREFIX);
   }
 
   protected static String merge(String name, Integer sequence) {
@@ -266,7 +178,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
    */
   public static void setOutputInfo(Configuration conf, String user, byte[] passwd, boolean createTables, String defaultTable) {
     setDefaultSequenceUsed(conf);
-    setOutputInfo(conf, DEFAULT_SEQUENCE, user, passwd, createTables, defaultTable);
+    setOutputInfo(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, user, passwd, createTables, defaultTable);
   }
   
   /**
@@ -306,7 +218,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   
   public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
     setDefaultSequenceUsed(conf);
-    setZooKeeperInstance(conf, DEFAULT_SEQUENCE, instanceName, zooKeepers);
+    setZooKeeperInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, instanceName, zooKeepers);
   }
   
   public static void setZooKeeperInstance(Configuration conf, int sequence, String instanceName, String zooKeepers) {
@@ -329,7 +241,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   
   public static void setMockInstance(Configuration conf, String instanceName) {
     setDefaultSequenceUsed(conf);
-    setMockInstance(conf, DEFAULT_SEQUENCE, instanceName);
+    setMockInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, instanceName);
   }
   
   public static void setMockInstance(Configuration conf, int sequence, String instanceName) {
@@ -347,7 +259,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   
   public static void setMaxMutationBufferSize(Configuration conf, long numberOfBytes) {
     setDefaultSequenceUsed(conf);
-    setMaxMutationBufferSize(conf, DEFAULT_SEQUENCE, numberOfBytes);
+    setMaxMutationBufferSize(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, numberOfBytes);
   }
   
   public static void setMaxMutationBufferSize(Configuration conf, int sequence, long numberOfBytes) {
@@ -363,7 +275,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   
   public static void setMaxLatency(Configuration conf, int numberOfMilliseconds) {
     setDefaultSequenceUsed(conf);
-    setMaxLatency(conf, DEFAULT_SEQUENCE, numberOfMilliseconds);
+    setMaxLatency(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, numberOfMilliseconds);
   }
   
   public static void setMaxLatency(Configuration conf, int sequence, int numberOfMilliseconds) {
@@ -379,7 +291,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   
   public static void setMaxWriteThreads(Configuration conf, int numberOfThreads) {
     setDefaultSequenceUsed(conf);
-    setMaxWriteThreads(conf, DEFAULT_SEQUENCE, numberOfThreads);
+    setMaxWriteThreads(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, numberOfThreads);
   }
   
   public static void setMaxWriteThreads(Configuration conf, int sequence, int numberOfThreads) {
@@ -395,7 +307,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   
   public static void setLogLevel(Configuration conf, Level level) {
     setDefaultSequenceUsed(conf);
-    setLogLevel(conf, DEFAULT_SEQUENCE, level);
+    setLogLevel(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, level);
   }
   
   public static void setLogLevel(Configuration conf, int sequence, Level level) {
@@ -412,7 +324,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   
   public static void setSimulationMode(Configuration conf) {
     setDefaultSequenceUsed(conf);
-    setSimulationMode(conf, DEFAULT_SEQUENCE);
+    setSimulationMode(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
   
   public static void setSimulationMode(Configuration conf, int sequence) {
@@ -427,7 +339,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   protected static String getUsername(Configuration conf) {
-    return getUsername(conf, DEFAULT_SEQUENCE);
+    return getUsername(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
   
   protected static String getUsername(Configuration conf, int sequence) {
@@ -444,7 +356,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
     return getPassword(job.getConfiguration());
   }
   protected static byte[] getPassword(Configuration conf) {
-    return getPassword(conf, DEFAULT_SEQUENCE);
+    return getPassword(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
   
   /**
@@ -463,7 +375,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   protected static boolean canCreateTables(Configuration conf) {
-    return canCreateTables(conf, DEFAULT_SEQUENCE);
+    return canCreateTables(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
   
   protected static boolean canCreateTables(Configuration conf, int sequence) {
@@ -478,7 +390,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   protected static String getDefaultTableName(Configuration conf) {
-    return getDefaultTableName(conf, DEFAULT_SEQUENCE);
+    return getDefaultTableName(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
   
   protected static String getDefaultTableName(Configuration conf, int sequence) {
@@ -493,7 +405,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   protected static Instance getInstance(Configuration conf) {
-    return getInstance(conf, DEFAULT_SEQUENCE);
+    return getInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
   
   protected static Instance getInstance(Configuration conf, int sequence) {
@@ -510,7 +422,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   protected static long getMaxMutationBufferSize(Configuration conf) {
-    return getMaxMutationBufferSize(conf, DEFAULT_SEQUENCE);
+    return getMaxMutationBufferSize(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
   
   protected static long getMaxMutationBufferSize(Configuration conf, int sequence) {
@@ -525,7 +437,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   protected static int getMaxLatency(Configuration conf) {
-    return getMaxLatency(conf, DEFAULT_SEQUENCE);
+    return getMaxLatency(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
   
   protected static int getMaxLatency(Configuration conf, int sequence) {
@@ -540,7 +452,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   protected static int getMaxWriteThreads(Configuration conf) {
-    return getMaxWriteThreads(conf, DEFAULT_SEQUENCE);
+    return getMaxWriteThreads(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
   
   protected static int getMaxWriteThreads(Configuration conf, int sequence) {
@@ -555,7 +467,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   protected static Level getLogLevel(Configuration conf) {
-    return getLogLevel(conf, DEFAULT_SEQUENCE);
+    return getLogLevel(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
   
   protected static Level getLogLevel(Configuration conf, int sequence) {
@@ -573,7 +485,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   protected static boolean getSimulationMode(Configuration conf) {
-    return getSimulationMode(conf, DEFAULT_SEQUENCE);
+    return getSimulationMode(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
   
   protected static boolean getSimulationMode(Configuration conf, int sequence) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f10a6ff/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
index 7042f19..5c87c13 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
@@ -151,124 +151,30 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   private static final String ITERATORS_DELIM = ",";
 
   private static final String SEQ_DELIM = ".";
-  protected static final int DEFAULT_SEQUENCE = 0;
   
-  private static final String COMMA = ",";
-  private static final String CONFIGURED_SEQUENCES = PREFIX + ".configuredsSeqs";
-  private static final String DEFAULT_SEQ_USED = PREFIX + ".defaultSequenceUsed";
-  private static final String PROCESSED_SEQUENCES = PREFIX + ".processedSeqs";
-  private static final String TRUE = "true";
 
   private static final String READ_OFFLINE = PREFIX + ".read.offline";
 
+  protected static String merge(String name, Integer sequence) {
+    return name + SEQ_DELIM + sequence;
+  }
+
+
   /**
    * Get a unique identifier for these configurations
    * 
    * @return A unique number to provide to future AccumuloInputFormat calls
    */
   public static synchronized int nextSequence(Configuration conf) {
-    String value = conf.get(CONFIGURED_SEQUENCES);
-    if (null == value) {
-      conf.set(CONFIGURED_SEQUENCES, "1");
-      return 1;
-    } else {
-      String[] splitValues = StringUtils.split(value, COMMA);
-      int newValue = Integer.parseInt(splitValues[splitValues.length-1]) + 1;
-      
-      conf.set(CONFIGURED_SEQUENCES, value + COMMA + newValue);
-      return newValue;
-    }
+    return SequencedFormatHelper.nextSequence(conf, PREFIX);
   }
   
-  /**
-   * Using the provided Configuration, return the next sequence number to process.
-   * @param conf A Configuration object used to store AccumuloInputFormat information into
-   * @return The next sequence number to process, -1 when finished.
-   * @throws NoSuchElementException
-   */
-  protected static synchronized int nextSequenceToProcess(Configuration conf) throws NoSuchElementException {
-    String[] processedConfs = conf.getStrings(PROCESSED_SEQUENCES);
-    
-    // We haven't set anything, so we need to find the first to return
-    if (null == processedConfs || 0 == processedConfs.length) {
-      // Check to see if the default sequence was used
-      boolean defaultSeqUsed = conf.getBoolean(DEFAULT_SEQ_USED, false);
-      
-      // If so, set that we're processing it and return the value of the default
-      if (defaultSeqUsed) {
-        conf.set(PROCESSED_SEQUENCES, Integer.toString(DEFAULT_SEQUENCE));
-        return DEFAULT_SEQUENCE;
-      }
-      
-      String[] loadedConfs = conf.getStrings(CONFIGURED_SEQUENCES);
-      
-      // There was *nothing* loaded, fail.
-      if (null == loadedConfs || 0 == loadedConfs.length) {
-        throw new NoSuchElementException("Sequence was requested to process but none exist to return");
-      }
-      
-      // We have loaded configuration(s), use the first
-      int firstLoaded = Integer.parseInt(loadedConfs[0]);
-      conf.setInt(PROCESSED_SEQUENCES, firstLoaded);
-      
-      return firstLoaded;
-    }
-    
-    // We've previously parsed some confs, need to find the next one to load
-    int lastProcessedSeq = Integer.valueOf(processedConfs[processedConfs.length - 1]);
-    String[] configuredSequencesArray = conf.getStrings(CONFIGURED_SEQUENCES);
-    
-    // We only have the default sequence, no specifics.
-    // Getting here, we already know that we processed that default
-    if (null == configuredSequencesArray) {
-      return -1;
-    }
-
-    List<Integer> configuredSequences = new ArrayList<Integer>(configuredSequencesArray.length + 1);
-    
-    // If we used the default sequence ID, add that into the list of configured sequences
-    if (conf.getBoolean(DEFAULT_SEQ_USED, false)) {
-      configuredSequences.add(DEFAULT_SEQUENCE);
-    }
-
-    // Add the rest of any sequences to our list
-    for (String configuredSequence : configuredSequencesArray) {
-      configuredSequences.add(Integer.parseInt(configuredSequence));
-    }
-    
-    int lastParsedSeqIndex = configuredSequences.size() - 1;
-    
-    // Find the next sequence number after the one we last processed
-    for (; lastParsedSeqIndex >= 0; lastParsedSeqIndex--) {
-      int lastLoadedValue = configuredSequences.get(lastParsedSeqIndex);
-      
-      if (lastLoadedValue == lastProcessedSeq) {
-        break;
-      }
-    }
-    
-    // We either had no sequences to match or we matched the last configured sequence
-    // Both of which are equivalent to no (more) sequences to process
-    if (-1 == lastParsedSeqIndex || lastParsedSeqIndex + 1 >= configuredSequences.size()) {
-      return -1;
-    }
-    
-    // Get the value of the sequence at that offset
-    int nextSequence = configuredSequences.get(lastParsedSeqIndex + 1);
-    conf.set(PROCESSED_SEQUENCES, conf.get(PROCESSED_SEQUENCES) + COMMA + nextSequence);
-    
-    return nextSequence;
+  protected static int nextSequenceToProcess(Configuration conf) {
+    return SequencedFormatHelper.nextSequenceToProcess(conf, PREFIX);
   }
   
   protected static void setDefaultSequenceUsed(Configuration conf) {
-    String value = conf.get(DEFAULT_SEQ_USED);
-    if (null == value || !TRUE.equals(value)) {
-      conf.setBoolean(DEFAULT_SEQ_USED, true);
-    }
-  }
-
-  protected static String merge(String name, Integer sequence) {
-    return name + SEQ_DELIM + sequence;
+    SequencedFormatHelper.setDefaultSequenceUsed(conf, PREFIX);
   }
   
   public static Map<String,String> getRelevantEntries(Configuration conf) {
@@ -302,7 +208,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    */
   public static void setIsolated(Configuration conf, boolean enable) {
     setDefaultSequenceUsed(conf);
-    setIsolated(conf, DEFAULT_SEQUENCE, enable);
+    setIsolated(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, enable);
   }
 
   /**
@@ -334,7 +240,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    */
   public static void setLocalIterators(Configuration conf, boolean enable) {
     setDefaultSequenceUsed(conf);
-    setLocalIterators(conf, DEFAULT_SEQUENCE, enable);
+    setLocalIterators(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, enable);
   }
 
   /**
@@ -372,7 +278,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    */
   public static void setInputInfo(Configuration conf, String user, byte[] passwd, String table, Authorizations auths) {
     setDefaultSequenceUsed(conf);
-    setInputInfo(conf, DEFAULT_SEQUENCE, user, passwd, table, auths);
+    setInputInfo(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, user, passwd, table, auths);
   }
 
   /**
@@ -422,7 +328,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    */
   public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
     setDefaultSequenceUsed(conf);
-    setZooKeeperInstance(conf, DEFAULT_SEQUENCE, instanceName, zooKeepers);
+    setZooKeeperInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, instanceName, zooKeepers);
   }
 
   /**
@@ -463,7 +369,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    */
   public static void setMockInstance(Configuration conf, String instanceName) {
     setDefaultSequenceUsed(conf);
-    setMockInstance(conf, DEFAULT_SEQUENCE, instanceName);
+    setMockInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, instanceName);
   }
 
   /**
@@ -497,7 +403,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    */
   public static void setRanges(Configuration conf, Collection<Range> ranges) {
     setDefaultSequenceUsed(conf);
-    setRanges(conf, DEFAULT_SEQUENCE, ranges);
+    setRanges(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, ranges);
   }
 
   /**
@@ -539,7 +445,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    */
   public static void disableAutoAdjustRanges(Configuration conf) {
     setDefaultSequenceUsed(conf);
-    disableAutoAdjustRanges(conf, DEFAULT_SEQUENCE);
+    disableAutoAdjustRanges(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -569,7 +475,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    */
   public static void setRegex(JobContext job, RegexType type, String regex) {
     setDefaultSequenceUsed(job.getConfiguration());
-    setRegex(job, DEFAULT_SEQUENCE, type, regex);
+    setRegex(job, SequencedFormatHelper.DEFAULT_SEQUENCE, type, regex);
   }
 
   /**
@@ -626,7 +532,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    */
   public static void setMaxVersions(Configuration conf, int maxVersions) throws IOException {
     setDefaultSequenceUsed(conf);
-    setMaxVersions(conf, DEFAULT_SEQUENCE, maxVersions);
+    setMaxVersions(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, maxVersions);
   }
 
   /**
@@ -675,7 +581,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
 
   public static void setScanOffline(Configuration conf, boolean scanOff) {
     setDefaultSequenceUsed(conf);
-    setScanOffline(conf, DEFAULT_SEQUENCE, scanOff);
+    setScanOffline(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, scanOff);
   }
 
   /**
@@ -727,7 +633,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    */
   public static void fetchColumns(Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
     setDefaultSequenceUsed(conf);
-    fetchColumns(conf, DEFAULT_SEQUENCE, columnFamilyColumnQualifierPairs);
+    fetchColumns(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, columnFamilyColumnQualifierPairs);
   }
 
   /**
@@ -771,7 +677,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    */
   public static void setLogLevel(Configuration conf, Level level) {
     setDefaultSequenceUsed(conf);
-    setLogLevel(conf, DEFAULT_SEQUENCE, level);
+    setLogLevel(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, level);
   }
 
   /**
@@ -806,7 +712,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    */
   public static void addIterator(Configuration conf, IteratorSetting cfg) {
     setDefaultSequenceUsed(conf);
-    addIterator(conf, DEFAULT_SEQUENCE, cfg);
+    addIterator(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, cfg);
   }
 
   /**
@@ -866,7 +772,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    */
   public static void setIterator(JobContext job, int priority, String iteratorClass, String iteratorName) {
     setDefaultSequenceUsed(job.getConfiguration());
-    setIterator(job, DEFAULT_SEQUENCE, priority, iteratorClass, iteratorName);
+    setIterator(job, SequencedFormatHelper.DEFAULT_SEQUENCE, priority, iteratorClass, iteratorName);
   }
 
   /**
@@ -916,7 +822,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    */
   public static void setIteratorOption(JobContext job, String iteratorName, String key, String value) {
     setDefaultSequenceUsed(job.getConfiguration());
-    setIteratorOption(job, DEFAULT_SEQUENCE, iteratorName, key, value);
+    setIteratorOption(job, SequencedFormatHelper.DEFAULT_SEQUENCE, iteratorName, key, value);
   }
 
   /**
@@ -967,7 +873,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @see #setIsolated(Configuration, boolean)
    */
   protected static boolean isIsolated(Configuration conf) {
-    return isIsolated(conf, DEFAULT_SEQUENCE);
+    return isIsolated(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -998,7 +904,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @see #setLocalIterators(Configuration, boolean)
    */
   protected static boolean usesLocalIterators(Configuration conf) {
-    return usesLocalIterators(conf, DEFAULT_SEQUENCE);
+    return usesLocalIterators(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1029,7 +935,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
    */
   protected static String getUsername(Configuration conf) {
-    return getUsername(conf, DEFAULT_SEQUENCE);
+    return getUsername(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1064,7 +970,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
    */
   protected static byte[] getPassword(Configuration conf) {
-    return getPassword(conf, DEFAULT_SEQUENCE);
+    return getPassword(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1096,7 +1002,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
    */
   protected static String getTablename(Configuration conf) {
-    return getTablename(conf, DEFAULT_SEQUENCE);
+    return getTablename(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1127,7 +1033,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
    */
   protected static Authorizations getAuthorizations(Configuration conf) {
-    return getAuthorizations(conf, DEFAULT_SEQUENCE);
+    return getAuthorizations(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1160,7 +1066,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @see #setMockInstance(Configuration, String)
    */
   protected static Instance getInstance(Configuration conf) {
-    return getInstance(conf, DEFAULT_SEQUENCE);
+    return getInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1195,7 +1101,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *           if the table name set on the configuration doesn't exist
    */
   protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException {
-    return getTabletLocator(conf, DEFAULT_SEQUENCE);
+    return getTabletLocator(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1236,7 +1142,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @see #setRanges(Configuration, Collection)
    */
   protected static List<Range> getRanges(Configuration conf) throws IOException {
-    return getRanges(conf, DEFAULT_SEQUENCE);
+    return getRanges(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1265,7 +1171,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @see #setRegex(JobContext, RegexType, String)
    */
   protected static String getRegex(JobContext job, RegexType type) {
-    return getRegex(job, DEFAULT_SEQUENCE, type);
+    return getRegex(job, SequencedFormatHelper.DEFAULT_SEQUENCE, type);
   }
 
   /**
@@ -1317,7 +1223,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @see #fetchColumns(Configuration, Collection)
    */
   protected static Set<Pair<Text,Text>> getFetchedColumns(Configuration conf) {
-    return getFetchedColumns(conf, DEFAULT_SEQUENCE);
+    return getFetchedColumns(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1355,7 +1261,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @see #disableAutoAdjustRanges(Configuration)
    */
   protected static boolean getAutoAdjustRanges(Configuration conf) {
-    return getAutoAdjustRanges(conf, DEFAULT_SEQUENCE);
+    return getAutoAdjustRanges(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1386,7 +1292,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @see #setLogLevel(Configuration, Level)
    */
   protected static Level getLogLevel(Configuration conf) {
-    return getLogLevel(conf, DEFAULT_SEQUENCE);
+    return getLogLevel(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1421,7 +1327,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *           if the configuration is improperly configured
    */
   protected static void validateOptions(Configuration conf) throws IOException {
-    validateOptions(conf, DEFAULT_SEQUENCE);
+    validateOptions(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   // InputFormat doesn't have the equivalent of OutputFormat's
@@ -1479,7 +1385,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    */
   protected static int getMaxVersions(Configuration conf) {
     setDefaultSequenceUsed(conf);
-    return getMaxVersions(conf, DEFAULT_SEQUENCE);
+    return getMaxVersions(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1495,7 +1401,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   }
 
   protected static boolean isOfflineScan(Configuration conf) {
-    return isOfflineScan(conf, DEFAULT_SEQUENCE);
+    return isOfflineScan(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   protected static boolean isOfflineScan(Configuration conf, int sequence) {
@@ -1520,7 +1426,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @see #addIterator(Configuration, IteratorSetting)
    */
   protected static List<AccumuloIterator> getIterators(Configuration conf) {
-    return getIterators(conf, DEFAULT_SEQUENCE);
+    return getIterators(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1565,7 +1471,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @see #addIterator(Configuration, IteratorSetting)
    */
   protected static List<AccumuloIteratorOption> getIteratorOptions(Configuration conf) {
-    return getIteratorOptions(conf, DEFAULT_SEQUENCE);
+    return getIteratorOptions(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1635,7 +1541,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
      * @deprecated Use {@link #setupIterators(Configuration,Scanner)} instead
      */
     protected void setupIterators(TaskAttemptContext attempt, Scanner scanner) throws AccumuloException {
-      setupIterators(attempt.getConfiguration(), DEFAULT_SEQUENCE, scanner);
+      setupIterators(attempt.getConfiguration(), SequencedFormatHelper.DEFAULT_SEQUENCE, scanner);
     }
 
     /**
@@ -1667,7 +1573,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
      * @deprecated Use {@link #setupMaxVersions(Configuration,Scanner)} instead
      */
     protected void setupMaxVersions(TaskAttemptContext attempt, Scanner scanner) {
-      setupMaxVersions(attempt.getConfiguration(), DEFAULT_SEQUENCE, scanner);
+      setupMaxVersions(attempt.getConfiguration(), SequencedFormatHelper.DEFAULT_SEQUENCE, scanner);
     }
 
     /**
@@ -2046,7 +1952,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     }
 
     RangeInputSplit(String table, Range range, String[] locations) {
-      this(table, range, locations, DEFAULT_SEQUENCE);
+      this(table, range, locations, SequencedFormatHelper.DEFAULT_SEQUENCE);
     }
 
     RangeInputSplit(String table, Range range, String[] locations, int sequence) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f10a6ff/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java
new file mode 100644
index 0000000..ff18754
--- /dev/null
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.accumulo.core.util.ArgumentChecker;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Convenience class with methods useful to dealing with multiple configurations of AccumuloInputFormat and/or AccumuloOutputFormat in the same Configuration
+ * object.
+ * <p>
+ * All state is kept in the Configuration itself, under keys built from a caller-supplied prefix, so each prefix tracks an independent set of sequences.
+ */
+public class SequencedFormatHelper {
+
+  private static final String COMMA = ",";
+  private static final String TRUE = "true";
+  protected static final int DEFAULT_SEQUENCE = 0;
+
+  private static final String DEFAULT_SEQ_USED = ".defaultSequenceUsed";
+  private static final String CONFIGURED_SEQUENCES = ".configuredSeqs";
+  private static final String PROCESSED_SEQUENCES = ".processedSeqs";
+
+  /**
+   * Get a unique identifier for these configurations and record it in the Configuration.
+   * 
+   * @param conf
+   *          the Configuration in which assigned sequence numbers are recorded
+   * @param prefix
+   *          the prefix for the Configuration keys used to track sequences
+   * @return A unique number to provide to future AccumuloInputFormat calls
+   */
+  public static synchronized int nextSequence(Configuration conf, String prefix) {
+    ArgumentChecker.notNull(conf, prefix);
+
+    final String configuredSequences = prefix + CONFIGURED_SEQUENCES;
+
+    String value = conf.get(configuredSequences);
+
+    // An unset or blank entry means nothing has been handed out yet; splitting "" would yield an empty array
+    if (StringUtils.isBlank(value)) {
+      conf.set(configuredSequences, "1");
+      return 1;
+    }
+
+    // Sequences are appended in increasing order, so the last entry is the largest assigned so far
+    String[] splitValues = StringUtils.split(value, COMMA);
+    int newValue = Integer.parseInt(splitValues[splitValues.length - 1]) + 1;
+
+    conf.set(configuredSequences, value + COMMA + newValue);
+    return newValue;
+  }
+
+  /**
+   * Record in the Configuration that the default sequence ({@link #DEFAULT_SEQUENCE}) has been configured.
+   * 
+   * @param conf
+   *          the Configuration to update
+   * @param prefix
+   *          the prefix for the Configuration keys used to track sequences
+   */
+  protected static void setDefaultSequenceUsed(Configuration conf, String prefix) {
+    ArgumentChecker.notNull(conf, prefix);
+
+    final String defaultSequenceUsed = prefix + DEFAULT_SEQ_USED;
+
+    // Only write when the flag isn't already "true" (a missing value compares unequal as well)
+    if (!TRUE.equals(conf.get(defaultSequenceUsed))) {
+      conf.setBoolean(defaultSequenceUsed, true);
+    }
+  }
+
+  /**
+   * Using the provided Configuration, return the next sequence number to process and record it as processed.
+   * <p>
+   * If the default sequence was used, it is returned first; the sequences from {@link #nextSequence(Configuration, String)} follow in the order they were
+   * assigned.
+   * 
+   * @param conf
+   *          A Configuration object used to store AccumuloInputFormat information into
+   * @param prefix
+   *          the prefix for the Configuration keys used to track sequences
+   * @return The next sequence number to process, -1 when finished.
+   * @throws NoSuchElementException
+   *           if this is the first call and neither the default sequence nor any other sequence was configured
+   */
+  protected static synchronized int nextSequenceToProcess(Configuration conf, String prefix) throws NoSuchElementException {
+    ArgumentChecker.notNull(conf, prefix);
+
+    final String processedSequences = prefix + PROCESSED_SEQUENCES, defaultSequenceUsed = prefix + DEFAULT_SEQ_USED, configuredSequences = prefix
+        + CONFIGURED_SEQUENCES;
+
+    String[] processedConfs = conf.getStrings(processedSequences);
+
+    // We haven't set anything, so we need to find the first to return
+    if (null == processedConfs || 0 == processedConfs.length) {
+      // Check to see if the default sequence was used
+      boolean defaultSeqUsed = conf.getBoolean(defaultSequenceUsed, false);
+
+      // If so, set that we're processing it and return the value of the default
+      if (defaultSeqUsed) {
+        conf.set(processedSequences, Integer.toString(DEFAULT_SEQUENCE));
+        return DEFAULT_SEQUENCE;
+      }
+
+      String[] loadedConfs = conf.getStrings(configuredSequences);
+
+      // There was *nothing* loaded, fail.
+      if (null == loadedConfs || 0 == loadedConfs.length) {
+        throw new NoSuchElementException("Sequence was requested to process but none exist to return");
+      }
+
+      // We have loaded configuration(s), use the first
+      int firstLoaded = Integer.parseInt(loadedConfs[0]);
+      conf.setInt(processedSequences, firstLoaded);
+
+      return firstLoaded;
+    }
+
+    // We've previously parsed some confs, need to find the next one to load
+    int lastProcessedSeq = Integer.parseInt(processedConfs[processedConfs.length - 1]);
+    String[] configuredSequencesArray = conf.getStrings(configuredSequences);
+
+    // We only have the default sequence, no specifics.
+    // Getting here, we already know that we processed that default
+    if (null == configuredSequencesArray) {
+      return -1;
+    }
+
+    List<Integer> configuredSequencesList = new ArrayList<Integer>(configuredSequencesArray.length + 1);
+
+    // If we used the default sequence ID, add that into the list of configured sequences
+    if (conf.getBoolean(defaultSequenceUsed, false)) {
+      configuredSequencesList.add(DEFAULT_SEQUENCE);
+    }
+
+    // Add the rest of any sequences to our list
+    for (String configuredSequence : configuredSequencesArray) {
+      configuredSequencesList.add(Integer.parseInt(configuredSequence));
+    }
+
+    int lastParsedSeqIndex = configuredSequencesList.size() - 1;
+
+    // Find the next sequence number after the one we last processed
+    for (; lastParsedSeqIndex >= 0; lastParsedSeqIndex--) {
+      int lastLoadedValue = configuredSequencesList.get(lastParsedSeqIndex);
+
+      if (lastLoadedValue == lastProcessedSeq) {
+        break;
+      }
+    }
+
+    // We either had no sequences to match or we matched the last configured sequence
+    // Both of which are equivalent to no (more) sequences to process
+    if (-1 == lastParsedSeqIndex || lastParsedSeqIndex + 1 >= configuredSequencesList.size()) {
+      return -1;
+    }
+
+    // Get the value of the sequence at that offset
+    int nextSequence = configuredSequencesList.get(lastParsedSeqIndex + 1);
+    conf.set(processedSequences, conf.get(processedSequences) + COMMA + nextSequence);
+
+    return nextSequence;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f10a6ff/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
index 98f3e7a..a6f5c48 100644
--- a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
+++ b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 public class AccumuloInputFormatTest {
@@ -101,7 +100,7 @@ public class AccumuloInputFormatTest {
     
     AccumuloInputFormat.setIterator(job, 1, "org.apache.accumulo.core.iterators.WholeRowIterator", "WholeRow");
     Configuration conf = job.getConfiguration();
-    String iterators = conf.get("AccumuloInputFormat.iterators." + InputFormatBase.DEFAULT_SEQUENCE);
+    String iterators = conf.get("AccumuloInputFormat.iterators." + SequencedFormatHelper.DEFAULT_SEQUENCE);
     assertEquals("1:org.apache.accumulo.core.iterators.WholeRowIterator:WholeRow", iterators);
   }
   
@@ -164,7 +163,7 @@ public class AccumuloInputFormatTest {
     
     final String rawConfigOpt = new AccumuloIteratorOption("iterator", key, value).toString();
     
-    assertEquals(rawConfigOpt, job.getConfiguration().get("AccumuloInputFormat.iterators.options." + InputFormatBase.DEFAULT_SEQUENCE));
+    assertEquals(rawConfigOpt, job.getConfiguration().get("AccumuloInputFormat.iterators.options." + SequencedFormatHelper.DEFAULT_SEQUENCE));
     
     List<AccumuloIteratorOption> opts = AccumuloInputFormat.getIteratorOptions(job.getConfiguration());
     assertEquals(1, opts.size());
@@ -228,7 +227,7 @@ public class AccumuloInputFormatTest {
     AccumuloInputFormat.setIteratorOption(job, "someIterator", "aKey", "aValue");
     
     Configuration conf = job.getConfiguration();
-    String options = conf.get("AccumuloInputFormat.iterators.options." + InputFormatBase.DEFAULT_SEQUENCE);
+    String options = conf.get("AccumuloInputFormat.iterators.options." + SequencedFormatHelper.DEFAULT_SEQUENCE);
     assertEquals(new String("someIterator:aKey:aValue"), options);
   }
   


[2/6] git commit: ACCUMULO-1854 Add AIF test to retrieve relevant configuration entries. Remove the internal AtomicIntegers in favor of state management through the Configuration object.

Posted by el...@apache.org.
ACCUMULO-1854 Add AIF test to retrieve relevant configuration entries.
Remove the internal AtomicIntegers in favor of state management through
the Configuration object.

Using static concurrent structures in the InputFormat didn't work well
when the class was called multiple times and its shared state was altered.
By holding the state in the Configuration used by the input format, we
avoid most of these problems and can check for duplicates more reliably.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5d3c3d5f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5d3c3d5f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5d3c3d5f

Branch: refs/heads/ACCUMULO-1854-multi-aif
Commit: 5d3c3d5fdd2691f87a5319672c1d9f7dd29f457f
Parents: 4111d1e
Author: Josh Elser <el...@apache.org>
Authored: Wed Nov 6 14:09:06 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Wed Nov 6 14:09:06 2013 -0500

----------------------------------------------------------------------
 .../core/client/mapreduce/InputFormatBase.java  | 167 ++++++++++++++++---
 .../mapreduce/AccumuloInputFormatTest.java      |   8 +-
 .../mapreduce/AccumuloOutputFormatTest.java     |  47 +++++-
 3 files changed, 181 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/5d3c3d5f/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
index 32b7af5..32240b7 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
@@ -32,6 +32,7 @@ import java.net.URLEncoder;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -41,7 +42,8 @@ import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.StringTokenizer;
-import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.servlet.jsp.jstl.core.Config;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -81,6 +83,7 @@ import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -149,32 +152,135 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
 
   private static final String SEQ_DELIM = ".";
   protected static final int DEFAULT_SEQUENCE = 0;
+  
+  private static final String COMMA = ",";
+  private static final String CONFIGURED_SEQUENCES = PREFIX + ".configuredsSeqs";
+  private static final String DEFAULT_SEQ_USED = PREFIX + ".defaultSequenceUsed";
+  private static final String PROCESSED_SEQUENCES = PREFIX + ".processedSeqs";
+  private static final String TRUE = "true";
 
   private static final String READ_OFFLINE = PREFIX + ".read.offline";
 
-  private static final AtomicInteger NUM_CONFIGURATIONS_LOADED = new AtomicInteger(0);
-  private static final AtomicInteger NUM_CONFIGURATIONS_PROCESSED = new AtomicInteger(0);
-
   /**
    * Get a unique identifier for these configurations
    * 
    * @return A unique number to provide to future AccumuloInputFormat calls
    */
-  public static int nextSequence() {
-    return NUM_CONFIGURATIONS_LOADED.incrementAndGet();
+  public static synchronized int nextSequence(Configuration conf) {
+    String value = conf.get(CONFIGURED_SEQUENCES);
+    if (null == value) {
+      conf.set(CONFIGURED_SEQUENCES, "1");
+      return 1;
+    } else {
+      String[] splitValues = StringUtils.split(value, COMMA);
+      int newValue = Integer.parseInt(splitValues[splitValues.length-1]) + 1;
+      
+      conf.set(CONFIGURED_SEQUENCES, value + COMMA + newValue);
+      return newValue;
+    }
+  }
+  
+  /**
+   * Using the provided Configuration, return the next sequence number to process.
+   * @param conf A Configuration object used to store AccumuloInputFormat information into
+   * @return The next sequence number to process, -1 when finished.
+   * @throws NoSuchElementException
+   */
+  protected static synchronized int nextSequenceToProcess(Configuration conf) throws NoSuchElementException {
+    String[] processedConfs = conf.getStrings(PROCESSED_SEQUENCES);
+    
+    // We haven't set anything, so we need to find the first to return
+    if (null == processedConfs || 0 == processedConfs.length) {
+      // Check to see if the default sequence was used
+      boolean defaultSeqUsed = conf.getBoolean(DEFAULT_SEQ_USED, false);
+      
+      // If so, set that we're processing it and return the value of the default
+      if (defaultSeqUsed) {
+        conf.set(PROCESSED_SEQUENCES, Integer.toString(DEFAULT_SEQUENCE));
+        return DEFAULT_SEQUENCE;
+      }
+      
+      String[] loadedConfs = conf.getStrings(CONFIGURED_SEQUENCES);
+      
+      // There was *nothing* loaded, fail.
+      if (null == loadedConfs || 0 == loadedConfs.length) {
+        throw new NoSuchElementException("Sequence was requested to process but none exist to return");
+      }
+      
+      // We have loaded configuration(s), use the first
+      int firstLoaded = Integer.parseInt(loadedConfs[0]);
+      conf.setInt(PROCESSED_SEQUENCES, firstLoaded);
+      
+      return firstLoaded;
+    }
+    
+    // We've previously parsed some confs, need to find the next one to load
+    int lastProcessedSeq = Integer.valueOf(processedConfs[processedConfs.length - 1]);
+    String[] configuredSequencesArray = conf.getStrings(CONFIGURED_SEQUENCES);
+    
+    // We only have the default sequence, no specifics.
+    // Getting here, we already know that we processed that default
+    if (null == configuredSequencesArray) {
+      return -1;
+    }
+
+    List<Integer> configuredSequences = new ArrayList<Integer>(configuredSequencesArray.length + 1);
+    
+    // If we used the default sequence ID, add that into the list of configured sequences
+    if (conf.getBoolean(DEFAULT_SEQ_USED, false)) {
+      configuredSequences.add(DEFAULT_SEQUENCE);
+      for (String configuredSequence : configuredSequencesArray) {
+        configuredSequences.add(Integer.parseInt(configuredSequence));
+      }
+    }
+    
+    int lastParsedSeqIndex = configuredSequences.size() - 1;
+    
+    // Find the next sequence number after the one we last processed
+    for (; lastParsedSeqIndex >= 0; lastParsedSeqIndex--) {
+      int lastLoadedValue = configuredSequences.get(lastParsedSeqIndex);
+      
+      if (lastLoadedValue == lastProcessedSeq) {
+        break;
+      }
+    }
+    
+    // We either had no sequences to match or we matched the last configured sequence
+    // Both of which are equivalent to no (more) sequences to process
+    if (-1 == lastParsedSeqIndex || lastParsedSeqIndex + 1 >= configuredSequences.size()) {
+      return -1;
+    }
+    
+    // Get the value of the sequence at that offset
+    int nextSequence = configuredSequences.get(lastParsedSeqIndex + 1);
+    conf.set(PROCESSED_SEQUENCES, conf.get(PROCESSED_SEQUENCES) + COMMA + nextSequence);
+    
+    return nextSequence;
+  }
+  
+  protected static void setDefaultSequenceUsed(Configuration conf) {
+    String value = conf.get(DEFAULT_SEQ_USED);
+    if (null == value || !TRUE.equals(value)) {
+      conf.setBoolean(DEFAULT_SEQ_USED, true);
+    }
   }
 
   protected static String merge(String name, Integer sequence) {
     return name + SEQ_DELIM + sequence;
   }
   
-  /**
-   * For testing purposes, allows internal counters to be reset. Clients should not have
-   * to call this directly.
-   */
-  public static void resetCounters() {
-    NUM_CONFIGURATIONS_LOADED.set(0);
-    NUM_CONFIGURATIONS_PROCESSED.set(0);
+  public static Map<String,String> getRelevantEntries(Configuration conf) {
+    ArgumentChecker.notNull(conf);
+    
+    HashMap<String,String> confEntries = new HashMap<String,String>();
+    for (Entry<String,String> entry : conf) {
+      final String key = entry.getKey();
+      if (0 == key.indexOf(PREFIX)) {
+        confEntries.put(key, entry.getValue());
+      }
+    }
+    
+    return confEntries;
   }
 
   /**
@@ -193,6 +299,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          if true, enable usage of the IsolatedScanner. Otherwise, disable.
    */
   public static void setIsolated(Configuration conf, boolean enable) {
+    setDefaultSequenceUsed(conf);
     setIsolated(conf, DEFAULT_SEQUENCE, enable);
   }
 
@@ -224,6 +331,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          if true, enable usage of the ClientSideInteratorScanner. Otherwise, disable.
    */
   public static void setLocalIterators(Configuration conf, boolean enable) {
+    setDefaultSequenceUsed(conf);
     setLocalIterators(conf, DEFAULT_SEQUENCE, enable);
   }
 
@@ -261,6 +369,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          the authorizations used to restrict data read
    */
   public static void setInputInfo(Configuration conf, String user, byte[] passwd, String table, Authorizations auths) {
+    setDefaultSequenceUsed(conf);
     setInputInfo(conf, DEFAULT_SEQUENCE, user, passwd, table, auths);
   }
 
@@ -310,6 +419,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          a comma-separated list of zookeeper servers
    */
   public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
+    setDefaultSequenceUsed(conf);
     setZooKeeperInstance(conf, DEFAULT_SEQUENCE, instanceName, zooKeepers);
   }
 
@@ -350,6 +460,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          the accumulo instance name
    */
   public static void setMockInstance(Configuration conf, String instanceName) {
+    setDefaultSequenceUsed(conf);
     setMockInstance(conf, DEFAULT_SEQUENCE, instanceName);
   }
 
@@ -383,6 +494,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          the ranges that will be mapped over
    */
   public static void setRanges(Configuration conf, Collection<Range> ranges) {
+    setDefaultSequenceUsed(conf);
     setRanges(conf, DEFAULT_SEQUENCE, ranges);
   }
 
@@ -424,6 +536,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          the Hadoop configuration object
    */
   public static void disableAutoAdjustRanges(Configuration conf) {
+    setDefaultSequenceUsed(conf);
     disableAutoAdjustRanges(conf, DEFAULT_SEQUENCE);
   }
 
@@ -453,6 +566,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @param regex
    */
   public static void setRegex(JobContext job, RegexType type, String regex) {
+    setDefaultSequenceUsed(job.getConfiguration());
     setRegex(job, DEFAULT_SEQUENCE, type, regex);
   }
 
@@ -509,6 +623,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *           if maxVersions is < 1
    */
   public static void setMaxVersions(Configuration conf, int maxVersions) throws IOException {
+    setDefaultSequenceUsed(conf);
     setMaxVersions(conf, DEFAULT_SEQUENCE, maxVersions);
   }
 
@@ -557,6 +672,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    */
 
   public static void setScanOffline(Configuration conf, boolean scanOff) {
+    setDefaultSequenceUsed(conf);
     setScanOffline(conf, DEFAULT_SEQUENCE, scanOff);
   }
 
@@ -608,6 +724,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          selected. An empty set is the default and is equivalent to scanning the all columns.
    */
   public static void fetchColumns(Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
+    setDefaultSequenceUsed(conf);
     fetchColumns(conf, DEFAULT_SEQUENCE, columnFamilyColumnQualifierPairs);
   }
 
@@ -651,6 +768,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          the logging level
    */
   public static void setLogLevel(Configuration conf, Level level) {
+    setDefaultSequenceUsed(conf);
     setLogLevel(conf, DEFAULT_SEQUENCE, level);
   }
 
@@ -685,6 +803,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          The configuration of the iterator
    */
   public static void addIterator(Configuration conf, IteratorSetting cfg) {
+    setDefaultSequenceUsed(conf);
     addIterator(conf, DEFAULT_SEQUENCE, cfg);
   }
 
@@ -744,6 +863,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @deprecated since 1.4, see {@link #addIterator(Configuration, IteratorSetting)}
    */
   public static void setIterator(JobContext job, int priority, String iteratorClass, String iteratorName) {
+    setDefaultSequenceUsed(job.getConfiguration());
     setIterator(job, DEFAULT_SEQUENCE, priority, iteratorClass, iteratorName);
   }
 
@@ -793,6 +913,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @deprecated since 1.4, see {@link #addIterator(Configuration, IteratorSetting)}
    */
   public static void setIteratorOption(JobContext job, String iteratorName, String key, String value) {
+    setDefaultSequenceUsed(job.getConfiguration());
     setIteratorOption(job, DEFAULT_SEQUENCE, iteratorName, key, value);
   }
 
@@ -1355,6 +1476,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @see #setMaxVersions(Configuration, int)
    */
   protected static int getMaxVersions(Configuration conf) {
+    setDefaultSequenceUsed(conf);
     return getMaxVersions(conf, DEFAULT_SEQUENCE);
   }
 
@@ -1762,20 +1884,13 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   @Override
   public List<InputSplit> getSplits(JobContext job) throws IOException {
     final Configuration conf = job.getConfiguration();
-    final int sequence;
-    if (0 == NUM_CONFIGURATIONS_LOADED.get()) {
-      sequence = DEFAULT_SEQUENCE;
-      
-      log.debug("No sequence numbers were given, falling back to the default sequence number");
-    } else {
-      sequence = NUM_CONFIGURATIONS_PROCESSED.incrementAndGet();
-      
-      if (sequence > NUM_CONFIGURATIONS_LOADED.get()) {
-        log.warn("Attempting to load AccumuloInputFormat information from Configuration using a sequence number that wasn't assigned");
-      }
+    final int sequence = nextSequenceToProcess(conf);
+    
+    if (-1 == sequence) {
+      log.debug("No more splits to process");
+      return Collections.emptyList();
     }
 
-
     log.setLevel(getLogLevel(conf, sequence));
     validateOptions(conf, sequence);
 
@@ -1863,6 +1978,8 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     if (!autoAdjust)
       for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
         splits.add(new RangeInputSplit(tableName, entry.getKey(), entry.getValue().toArray(new String[0]), sequence));
+    
+    log.info("Returning splits:" + splits);
     return splits;
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5d3c3d5f/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
index 0df1f52..98f3e7a 100644
--- a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
+++ b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
@@ -53,12 +53,6 @@ import org.junit.Test;
 
 public class AccumuloInputFormatTest {
   
-  @Before
-  public void setup() {
-    AccumuloInputFormat.resetCounters();
-    AccumuloOutputFormat.resetCounters();
-  }
-  
   @After
   public void tearDown() throws Exception {}
   
@@ -397,7 +391,7 @@ public class AccumuloInputFormatTest {
   @Test
   public void testMultipleConfigurations() throws Exception {
     Configuration conf = new Configuration();
-    int seq1 = AccumuloInputFormat.nextSequence(), seq2 = AccumuloInputFormat.nextSequence();
+    int seq1 = AccumuloInputFormat.nextSequence(conf), seq2 = AccumuloInputFormat.nextSequence(conf);
     
     AccumuloInputFormat.setZooKeeperInstance(conf, seq1, "instance1", "zookeeper1");
     AccumuloInputFormat.setInputInfo(conf, seq1, "user1", "password1".getBytes(), "table1", new Authorizations("1"));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5d3c3d5f/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
index ef48cc4..bc1bd1a 100644
--- a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
+++ b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
@@ -21,8 +21,10 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.BatchWriter;
@@ -34,6 +36,7 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -43,7 +46,6 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.junit.Before;
 import org.junit.Test;
 
 /**
@@ -75,12 +77,6 @@ public class AccumuloOutputFormatTest {
     }
   }
   
-  @Before
-  public void setup() {
-    AccumuloInputFormat.resetCounters();
-    AccumuloOutputFormat.resetCounters();
-  }
-  
   @Test
   public void testMR() throws Exception {
     MockInstance mockInstance = new MockInstance("testmrinstance");
@@ -136,8 +132,6 @@ public class AccumuloOutputFormatTest {
   
   @Test
   public void testMultiInstanceConfiguration() throws Exception {
-    MockInstance mockInstance1 = new MockInstance("testinstance1"), mockInstance2 = new MockInstance("testinstance2");
-    
     int seq1 = AccumuloOutputFormat.nextSequence(), seq2 = AccumuloOutputFormat.nextSequence();
     
     Configuration conf = new Configuration();
@@ -162,6 +156,49 @@ public class AccumuloOutputFormatTest {
     
     Instance inst2 = AccumuloOutputFormat.getInstance(conf, seq2);
     assertEquals("testinstance2", inst2.getInstanceName());
+  }
+  
+  /**
+   * Verifies that getRelevantEntries(conf) returns exactly the entries written by setOutputInfo and
+   * setZooKeeperInstance for two sequence numbers. Expected keys are built from the values that
+   * nextSequence() actually returned rather than the literals 1 and 2: nothing resets the sequence
+   * counter before this test, so the assigned numbers can depend on which tests ran earlier.
+   */
+  @Test
+  public void testConfigEntries() throws Exception {
+    Configuration conf = new Configuration();
+    int seq1 = AccumuloOutputFormat.nextSequence(), seq2 = AccumuloOutputFormat.nextSequence();
+    
+    AccumuloOutputFormat.setOutputInfo(conf, seq1, "root1", "1".getBytes(), false, "testtable1");
+    AccumuloOutputFormat.setZooKeeperInstance(conf, seq1, "instance1", "zk1");
+    
+    AccumuloOutputFormat.setOutputInfo(conf, seq2, "root2", "2".getBytes(), true, "testtable2");
+    AccumuloOutputFormat.setZooKeeperInstance(conf, seq2, "instance2", "zk2");
+    
+    final String prefix = AccumuloOutputFormat.class.getSimpleName();
+    // Per-sequence key suffixes, e.g. ".1" when nextSequence() returned 1.
+    final String suffix1 = "." + seq1, suffix2 = "." + seq2;
+    HashMap<String,String> expected = new HashMap<String,String>();
+    expected.put(prefix + ".username" + suffix1, "root1");
+    expected.put(prefix + ".password" + suffix1, new String(Base64.encodeBase64("1".getBytes())));
+    expected.put(prefix + ".createtables" + suffix1, "false");
+    expected.put(prefix + ".defaulttable" + suffix1, "testtable1");
+    expected.put(prefix + ".instanceName" + suffix1, "instance1");
+    expected.put(prefix + ".zooKeepers" + suffix1, "zk1");
+    expected.put(prefix + ".configured" + suffix1, "true");
+    expected.put(prefix + ".instanceConfigured" + suffix1, "true");
+
+    expected.put(prefix + ".username" + suffix2, "root2");
+    expected.put(prefix + ".password" + suffix2, new String(Base64.encodeBase64("2".getBytes())));
+    expected.put(prefix + ".createtables" + suffix2, "true");
+    expected.put(prefix + ".defaulttable" + suffix2, "testtable2");
+    expected.put(prefix + ".instanceName" + suffix2, "instance2");
+    expected.put(prefix + ".zooKeepers" + suffix2, "zk2");
+    expected.put(prefix + ".configured" + suffix2, "true");
+    expected.put(prefix + ".instanceConfigured" + suffix2, "true");
+    
+    Map<String,String> actual = AccumuloOutputFormat.getRelevantEntries(conf);
     
+    assertEquals(expected, actual);
   }
 }