You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/11/07 06:24:23 UTC

[2/6] git commit: ACCUMULO-1854 Add AIF test to retrieve relevant configuration entries. Remove the internal AtomicIntegers in favor of state management through the Configuration object.

ACCUMULO-1854 Add AIF test to retrieve relevant configuration entries.
Remove the internal AtomicIntegers in favor of state management through
the Configuration object.

Using static concurrent structures in the InputFormat didn't work well
when the class was called multiple times and its state was altered
between calls. By holding that state in the Configuration object instead,
we can alleviate some of these problems and do better duplicate checking.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5d3c3d5f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5d3c3d5f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5d3c3d5f

Branch: refs/heads/ACCUMULO-1854-multi-aif
Commit: 5d3c3d5fdd2691f87a5319672c1d9f7dd29f457f
Parents: 4111d1e
Author: Josh Elser <el...@apache.org>
Authored: Wed Nov 6 14:09:06 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Wed Nov 6 14:09:06 2013 -0500

----------------------------------------------------------------------
 .../core/client/mapreduce/InputFormatBase.java  | 167 ++++++++++++++++---
 .../mapreduce/AccumuloInputFormatTest.java      |   8 +-
 .../mapreduce/AccumuloOutputFormatTest.java     |  47 +++++-
 3 files changed, 181 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/5d3c3d5f/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
index 32b7af5..32240b7 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
@@ -32,6 +32,7 @@ import java.net.URLEncoder;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -41,7 +42,8 @@ import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.StringTokenizer;
-import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.servlet.jsp.jstl.core.Config;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -81,6 +83,7 @@ import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -149,32 +152,135 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
 
   private static final String SEQ_DELIM = ".";
   protected static final int DEFAULT_SEQUENCE = 0;
+  
+  private static final String COMMA = ",";
+  private static final String CONFIGURED_SEQUENCES = PREFIX + ".configuredsSeqs";
+  private static final String DEFAULT_SEQ_USED = PREFIX + ".defaultSequenceUsed";
+  private static final String PROCESSED_SEQUENCES = PREFIX + ".processedSeqs";
+  private static final String TRUE = "true";
 
   private static final String READ_OFFLINE = PREFIX + ".read.offline";
 
-  private static final AtomicInteger NUM_CONFIGURATIONS_LOADED = new AtomicInteger(0);
-  private static final AtomicInteger NUM_CONFIGURATIONS_PROCESSED = new AtomicInteger(0);
-
   /**
    * Get a unique identifier for these configurations
    * 
    * @return A unique number to provide to future AccumuloInputFormat calls
    */
-  public static int nextSequence() {
-    return NUM_CONFIGURATIONS_LOADED.incrementAndGet();
+  public static synchronized int nextSequence(Configuration conf) {
+    String value = conf.get(CONFIGURED_SEQUENCES);
+    if (null == value) {
+      conf.set(CONFIGURED_SEQUENCES, "1");
+      return 1;
+    } else {
+      String[] splitValues = StringUtils.split(value, COMMA);
+      int newValue = Integer.parseInt(splitValues[splitValues.length-1]) + 1;
+      
+      conf.set(CONFIGURED_SEQUENCES, value + COMMA + newValue);
+      return newValue;
+    }
+  }
+  
+  /**
+   * Using the provided Configuration, return the next sequence number to process.
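+   * @param conf The Configuration object in which AccumuloInputFormat sequence state is stored
+   * @return The next sequence number to process, or -1 when there are no more sequences to process.
+   * @throws NoSuchElementException if nothing has been processed yet and neither the default sequence nor any configured sequence exists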
+   */
+  protected static synchronized int nextSequenceToProcess(Configuration conf) throws NoSuchElementException {
+    String[] processedConfs = conf.getStrings(PROCESSED_SEQUENCES);
+    
+    // We haven't set anything, so we need to find the first to return
+    if (null == processedConfs || 0 == processedConfs.length) {
+      // Check to see if the default sequence was used
+      boolean defaultSeqUsed = conf.getBoolean(DEFAULT_SEQ_USED, false);
+      
+      // If so, set that we're processing it and return the value of the default
+      if (defaultSeqUsed) {
+        conf.set(PROCESSED_SEQUENCES, Integer.toString(DEFAULT_SEQUENCE));
+        return DEFAULT_SEQUENCE;
+      }
+      
+      String[] loadedConfs = conf.getStrings(CONFIGURED_SEQUENCES);
+      
+      // There was *nothing* loaded, fail.
+      if (null == loadedConfs || 0 == loadedConfs.length) {
+        throw new NoSuchElementException("Sequence was requested to process but none exist to return");
+      }
+      
+      // We have loaded configuration(s), use the first
+      int firstLoaded = Integer.parseInt(loadedConfs[0]);
+      conf.setInt(PROCESSED_SEQUENCES, firstLoaded);
+      
+      return firstLoaded;
+    }
+    
+    // We've previously parsed some confs, need to find the next one to load
+    int lastProcessedSeq = Integer.valueOf(processedConfs[processedConfs.length - 1]);
+    String[] configuredSequencesArray = conf.getStrings(CONFIGURED_SEQUENCES);
+    
+    // We only have the default sequence, no specifics.
+    // Getting here, we already know that we processed that default
+    if (null == configuredSequencesArray) {
+      return -1;
+    }
+
+    List<Integer> configuredSequences = new ArrayList<Integer>(configuredSequencesArray.length + 1);
+    
+    // If we used the default sequence ID, add that into the list of configured sequences
+    if (conf.getBoolean(DEFAULT_SEQ_USED, false)) {
+      configuredSequences.add(DEFAULT_SEQUENCE);
+      for (String configuredSequence : configuredSequencesArray) {
+        configuredSequences.add(Integer.parseInt(configuredSequence));
+      }
+    }
+    
+    int lastParsedSeqIndex = configuredSequences.size() - 1;
+    
+    // Find the next sequence number after the one we last processed
+    for (; lastParsedSeqIndex >= 0; lastParsedSeqIndex--) {
+      int lastLoadedValue = configuredSequences.get(lastParsedSeqIndex);
+      
+      if (lastLoadedValue == lastProcessedSeq) {
+        break;
+      }
+    }
+    
+    // We either had no sequences to match or we matched the last configured sequence
+    // Both of which are equivalent to no (more) sequences to process
+    if (-1 == lastParsedSeqIndex || lastParsedSeqIndex + 1 >= configuredSequences.size()) {
+      return -1;
+    }
+    
+    // Get the value of the sequence at that offset
+    int nextSequence = configuredSequences.get(lastParsedSeqIndex + 1);
+    conf.set(PROCESSED_SEQUENCES, conf.get(PROCESSED_SEQUENCES) + COMMA + nextSequence);
+    
+    return nextSequence;
+  }
+  
+  protected static void setDefaultSequenceUsed(Configuration conf) {
+    String value = conf.get(DEFAULT_SEQ_USED);
+    if (null == value || !TRUE.equals(value)) {
+      conf.setBoolean(DEFAULT_SEQ_USED, true);
+    }
   }
 
   protected static String merge(String name, Integer sequence) {
     return name + SEQ_DELIM + sequence;
   }
   
-  /**
-   * For testing purposes, allows internal counters to be reset. Clients should not have
-   * to call this directly.
-   */
-  public static void resetCounters() {
-    NUM_CONFIGURATIONS_LOADED.set(0);
-    NUM_CONFIGURATIONS_PROCESSED.set(0);
+  public static Map<String,String> getRelevantEntries(Configuration conf) {
+    ArgumentChecker.notNull(conf);
+    
+    HashMap<String,String> confEntries = new HashMap<String,String>();
+    for (Entry<String,String> entry : conf) {
+      final String key = entry.getKey();
+      if (0 == key.indexOf(PREFIX)) {
+        confEntries.put(key, entry.getValue());
+      }
+    }
+    
+    return confEntries;
   }
 
   /**
@@ -193,6 +299,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          if true, enable usage of the IsolatedScanner. Otherwise, disable.
    */
   public static void setIsolated(Configuration conf, boolean enable) {
+    setDefaultSequenceUsed(conf);
     setIsolated(conf, DEFAULT_SEQUENCE, enable);
   }
 
@@ -224,6 +331,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          if true, enable usage of the ClientSideInteratorScanner. Otherwise, disable.
    */
   public static void setLocalIterators(Configuration conf, boolean enable) {
+    setDefaultSequenceUsed(conf);
     setLocalIterators(conf, DEFAULT_SEQUENCE, enable);
   }
 
@@ -261,6 +369,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          the authorizations used to restrict data read
    */
   public static void setInputInfo(Configuration conf, String user, byte[] passwd, String table, Authorizations auths) {
+    setDefaultSequenceUsed(conf);
     setInputInfo(conf, DEFAULT_SEQUENCE, user, passwd, table, auths);
   }
 
@@ -310,6 +419,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          a comma-separated list of zookeeper servers
    */
   public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
+    setDefaultSequenceUsed(conf);
     setZooKeeperInstance(conf, DEFAULT_SEQUENCE, instanceName, zooKeepers);
   }
 
@@ -350,6 +460,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          the accumulo instance name
    */
   public static void setMockInstance(Configuration conf, String instanceName) {
+    setDefaultSequenceUsed(conf);
     setMockInstance(conf, DEFAULT_SEQUENCE, instanceName);
   }
 
@@ -383,6 +494,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          the ranges that will be mapped over
    */
   public static void setRanges(Configuration conf, Collection<Range> ranges) {
+    setDefaultSequenceUsed(conf);
     setRanges(conf, DEFAULT_SEQUENCE, ranges);
   }
 
@@ -424,6 +536,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          the Hadoop configuration object
    */
   public static void disableAutoAdjustRanges(Configuration conf) {
+    setDefaultSequenceUsed(conf);
     disableAutoAdjustRanges(conf, DEFAULT_SEQUENCE);
   }
 
@@ -453,6 +566,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @param regex
    */
   public static void setRegex(JobContext job, RegexType type, String regex) {
+    setDefaultSequenceUsed(job.getConfiguration());
     setRegex(job, DEFAULT_SEQUENCE, type, regex);
   }
 
@@ -509,6 +623,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *           if maxVersions is < 1
    */
   public static void setMaxVersions(Configuration conf, int maxVersions) throws IOException {
+    setDefaultSequenceUsed(conf);
     setMaxVersions(conf, DEFAULT_SEQUENCE, maxVersions);
   }
 
@@ -557,6 +672,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    */
 
   public static void setScanOffline(Configuration conf, boolean scanOff) {
+    setDefaultSequenceUsed(conf);
     setScanOffline(conf, DEFAULT_SEQUENCE, scanOff);
   }
 
@@ -608,6 +724,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          selected. An empty set is the default and is equivalent to scanning the all columns.
    */
   public static void fetchColumns(Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
+    setDefaultSequenceUsed(conf);
     fetchColumns(conf, DEFAULT_SEQUENCE, columnFamilyColumnQualifierPairs);
   }
 
@@ -651,6 +768,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          the logging level
    */
   public static void setLogLevel(Configuration conf, Level level) {
+    setDefaultSequenceUsed(conf);
     setLogLevel(conf, DEFAULT_SEQUENCE, level);
   }
 
@@ -685,6 +803,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    *          The configuration of the iterator
    */
   public static void addIterator(Configuration conf, IteratorSetting cfg) {
+    setDefaultSequenceUsed(conf);
     addIterator(conf, DEFAULT_SEQUENCE, cfg);
   }
 
@@ -744,6 +863,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @deprecated since 1.4, see {@link #addIterator(Configuration, IteratorSetting)}
    */
   public static void setIterator(JobContext job, int priority, String iteratorClass, String iteratorName) {
+    setDefaultSequenceUsed(job.getConfiguration());
     setIterator(job, DEFAULT_SEQUENCE, priority, iteratorClass, iteratorName);
   }
 
@@ -793,6 +913,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @deprecated since 1.4, see {@link #addIterator(Configuration, IteratorSetting)}
    */
   public static void setIteratorOption(JobContext job, String iteratorName, String key, String value) {
+    setDefaultSequenceUsed(job.getConfiguration());
     setIteratorOption(job, DEFAULT_SEQUENCE, iteratorName, key, value);
   }
 
@@ -1355,6 +1476,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    * @see #setMaxVersions(Configuration, int)
    */
   protected static int getMaxVersions(Configuration conf) {
+    setDefaultSequenceUsed(conf);
     return getMaxVersions(conf, DEFAULT_SEQUENCE);
   }
 
@@ -1762,20 +1884,13 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
   @Override
   public List<InputSplit> getSplits(JobContext job) throws IOException {
     final Configuration conf = job.getConfiguration();
-    final int sequence;
-    if (0 == NUM_CONFIGURATIONS_LOADED.get()) {
-      sequence = DEFAULT_SEQUENCE;
-      
-      log.debug("No sequence numbers were given, falling back to the default sequence number");
-    } else {
-      sequence = NUM_CONFIGURATIONS_PROCESSED.incrementAndGet();
-      
-      if (sequence > NUM_CONFIGURATIONS_LOADED.get()) {
-        log.warn("Attempting to load AccumuloInputFormat information from Configuration using a sequence number that wasn't assigned");
-      }
+    final int sequence = nextSequenceToProcess(conf);
+    
+    if (-1 == sequence) {
+      log.debug("No more splits to process");
+      return Collections.emptyList();
     }
 
-
     log.setLevel(getLogLevel(conf, sequence));
     validateOptions(conf, sequence);
 
@@ -1863,6 +1978,8 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     if (!autoAdjust)
       for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
         splits.add(new RangeInputSplit(tableName, entry.getKey(), entry.getValue().toArray(new String[0]), sequence));
+    
+    log.info("Returning splits:" + splits);
     return splits;
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5d3c3d5f/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
index 0df1f52..98f3e7a 100644
--- a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
+++ b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
@@ -53,12 +53,6 @@ import org.junit.Test;
 
 public class AccumuloInputFormatTest {
   
-  @Before
-  public void setup() {
-    AccumuloInputFormat.resetCounters();
-    AccumuloOutputFormat.resetCounters();
-  }
-  
   @After
   public void tearDown() throws Exception {}
   
@@ -397,7 +391,7 @@ public class AccumuloInputFormatTest {
   @Test
   public void testMultipleConfigurations() throws Exception {
     Configuration conf = new Configuration();
-    int seq1 = AccumuloInputFormat.nextSequence(), seq2 = AccumuloInputFormat.nextSequence();
+    int seq1 = AccumuloInputFormat.nextSequence(conf), seq2 = AccumuloInputFormat.nextSequence(conf);
     
     AccumuloInputFormat.setZooKeeperInstance(conf, seq1, "instance1", "zookeeper1");
     AccumuloInputFormat.setInputInfo(conf, seq1, "user1", "password1".getBytes(), "table1", new Authorizations("1"));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5d3c3d5f/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
index ef48cc4..bc1bd1a 100644
--- a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
+++ b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
@@ -21,8 +21,10 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.BatchWriter;
@@ -34,6 +36,7 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -43,7 +46,6 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.junit.Before;
 import org.junit.Test;
 
 /**
@@ -75,12 +77,6 @@ public class AccumuloOutputFormatTest {
     }
   }
   
-  @Before
-  public void setup() {
-    AccumuloInputFormat.resetCounters();
-    AccumuloOutputFormat.resetCounters();
-  }
-  
   @Test
   public void testMR() throws Exception {
     MockInstance mockInstance = new MockInstance("testmrinstance");
@@ -136,8 +132,6 @@ public class AccumuloOutputFormatTest {
   
   @Test
   public void testMultiInstanceConfiguration() throws Exception {
-    MockInstance mockInstance1 = new MockInstance("testinstance1"), mockInstance2 = new MockInstance("testinstance2");
-    
     int seq1 = AccumuloOutputFormat.nextSequence(), seq2 = AccumuloOutputFormat.nextSequence();
     
     Configuration conf = new Configuration();
@@ -162,6 +156,41 @@ public class AccumuloOutputFormatTest {
     
     Instance inst2 = AccumuloOutputFormat.getInstance(conf, seq2);
     assertEquals("testinstance2", inst2.getInstanceName());
+  }
+  
+  @Test
+  public void testConfigEntries() throws Exception {
+    Configuration conf = new Configuration();
+    int seq1 = AccumuloOutputFormat.nextSequence(), seq2 = AccumuloOutputFormat.nextSequence();
+    
+    AccumuloOutputFormat.setOutputInfo(conf, seq1, "root1", "1".getBytes(), false, "testtable1");
+    AccumuloOutputFormat.setZooKeeperInstance(conf, seq1, "instance1", "zk1");
+    
+    AccumuloOutputFormat.setOutputInfo(conf, seq2, "root2", "2".getBytes(), true, "testtable2");
+    AccumuloOutputFormat.setZooKeeperInstance(conf, seq2, "instance2", "zk2");
+    
+    final String prefix = AccumuloOutputFormat.class.getSimpleName();
+    HashMap<String,String> expected = new HashMap<String,String>();
+    expected.put(prefix + ".username.1", "root1");
+    expected.put(prefix + ".password.1", new String(Base64.encodeBase64("1".getBytes())));
+    expected.put(prefix + ".createtables.1", "false");
+    expected.put(prefix + ".defaulttable.1", "testtable1");
+    expected.put(prefix + ".instanceName.1", "instance1");
+    expected.put(prefix + ".zooKeepers.1", "zk1");
+    expected.put(prefix + ".configured.1", "true");
+    expected.put(prefix + ".instanceConfigured.1", "true");
+
+    expected.put(prefix + ".username.2", "root2");
+    expected.put(prefix + ".password.2", new String(Base64.encodeBase64("2".getBytes())));
+    expected.put(prefix + ".createtables.2", "true");
+    expected.put(prefix + ".defaulttable.2", "testtable2");
+    expected.put(prefix + ".instanceName.2", "instance2");
+    expected.put(prefix + ".zooKeepers.2", "zk2");
+    expected.put(prefix + ".configured.2", "true");
+    expected.put(prefix + ".instanceConfigured.2", "true");
+    
+    Map<String,String> actual = AccumuloOutputFormat.getRelevantEntries(conf);
     
+    assertEquals(expected, actual);
   }
 }