You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/11/07 06:24:26 UTC
[5/6] git commit: ACCUMULO-1854 Lift duplicated code between AIF and AOF into a helper class
ACCUMULO-1854 Lift duplicated code between AccumuloInputFormat (AIF) and AccumuloOutputFormat (AOF) into a helper class, SequencedFormatHelper
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0f10a6ff
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0f10a6ff
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0f10a6ff
Branch: refs/heads/ACCUMULO-1854-multi-aif
Commit: 0f10a6ffb0400424d30d3f49312bb500265cb276
Parents: 1fe2238
Author: Josh Elser <el...@apache.org>
Authored: Wed Nov 6 17:51:06 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Wed Nov 6 17:51:06 2013 -0500
----------------------------------------------------------------------
.../client/mapreduce/AccumuloOutputFormat.java | 130 +++----------
.../core/client/mapreduce/InputFormatBase.java | 184 +++++--------------
.../client/mapreduce/SequencedFormatHelper.java | 145 +++++++++++++++
.../mapreduce/AccumuloInputFormatTest.java | 7 +-
4 files changed, 214 insertions(+), 252 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f10a6ff/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
index 5e5e43d..dd9762e 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
@@ -96,14 +96,9 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
private static final int DEFAULT_MAX_LATENCY = 60 * 1000; // 1 minute
private static final int DEFAULT_NUM_WRITE_THREADS = 2;
- private static final int DEFAULT_SEQUENCE = 0;
private static final String SEQ_DELIM = ".";
- private static final String COMMA = ",";
private static final String CONFIGURED_SEQUENCES = PREFIX + ".configuredSeqs";
- private static final String DEFAULT_SEQ_USED = PREFIX + ".defaultSequenceUsed";
- private static final String PROCESSED_SEQUENCES = PREFIX + ".processedSeqs";
- private static final String TRUE = "true";
@@ -113,17 +108,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
* @return A unique number to provide to future AccumuloInputFormat calls
*/
public static synchronized int nextSequence(Configuration conf) {
- String value = conf.get(CONFIGURED_SEQUENCES);
- if (null == value) {
- conf.set(CONFIGURED_SEQUENCES, "1");
- return 1;
- } else {
- String[] splitValues = StringUtils.split(value, COMMA);
- int newValue = Integer.parseInt(splitValues[splitValues.length-1]) + 1;
-
- conf.set(CONFIGURED_SEQUENCES, value + COMMA + newValue);
- return newValue;
- }
+ return SequencedFormatHelper.nextSequence(conf, PREFIX);
}
/**
@@ -133,84 +118,11 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
* @throws NoSuchElementException
*/
protected static synchronized int nextSequenceToProcess(Configuration conf) throws NoSuchElementException {
- String[] processedConfs = conf.getStrings(PROCESSED_SEQUENCES);
-
- // We haven't set anything, so we need to find the first to return
- if (null == processedConfs || 0 == processedConfs.length) {
- // Check to see if the default sequence was used
- boolean defaultSeqUsed = conf.getBoolean(DEFAULT_SEQ_USED, false);
-
- // If so, set that we're processing it and return the value of the default
- if (defaultSeqUsed) {
- conf.set(PROCESSED_SEQUENCES, Integer.toString(DEFAULT_SEQUENCE));
- return DEFAULT_SEQUENCE;
- }
-
- String[] loadedConfs = conf.getStrings(CONFIGURED_SEQUENCES);
-
- // There was *nothing* loaded, fail.
- if (null == loadedConfs || 0 == loadedConfs.length) {
- throw new NoSuchElementException("Sequence was requested to process but none exist to return");
- }
-
- // We have loaded configuration(s), use the first
- int firstLoaded = Integer.parseInt(loadedConfs[0]);
- conf.setInt(PROCESSED_SEQUENCES, firstLoaded);
-
- return firstLoaded;
- }
-
- // We've previously parsed some confs, need to find the next one to load
- int lastProcessedSeq = Integer.valueOf(processedConfs[processedConfs.length - 1]);
- String[] configuredSequencesArray = conf.getStrings(CONFIGURED_SEQUENCES);
-
- // We only have the default sequence, no specifics.
- // Getting here, we already know that we processed that default
- if (null == configuredSequencesArray) {
- return -1;
- }
-
- List<Integer> configuredSequences = new ArrayList<Integer>(configuredSequencesArray.length + 1);
-
- // If we used the default sequence ID, add that into the list of configured sequences
- if (conf.getBoolean(DEFAULT_SEQ_USED, false)) {
- configuredSequences.add(DEFAULT_SEQUENCE);
- }
-
- // Add the rest of any sequences to our list
- for (String configuredSequence : configuredSequencesArray) {
- configuredSequences.add(Integer.parseInt(configuredSequence));
- }
-
- int lastParsedSeqIndex = configuredSequences.size() - 1;
-
- // Find the next sequence number after the one we last processed
- for (; lastParsedSeqIndex >= 0; lastParsedSeqIndex--) {
- int lastLoadedValue = configuredSequences.get(lastParsedSeqIndex);
-
- if (lastLoadedValue == lastProcessedSeq) {
- break;
- }
- }
-
- // We either had no sequences to match or we matched the last configured sequence
- // Both of which are equivalent to no (more) sequences to process
- if (-1 == lastParsedSeqIndex || lastParsedSeqIndex + 1 >= configuredSequences.size()) {
- return -1;
- }
-
- // Get the value of the sequence at that offset
- int nextSequence = configuredSequences.get(lastParsedSeqIndex + 1);
- conf.set(PROCESSED_SEQUENCES, conf.get(PROCESSED_SEQUENCES) + COMMA + nextSequence);
-
- return nextSequence;
+ return SequencedFormatHelper.nextSequenceToProcess(conf, PREFIX);
}
protected static void setDefaultSequenceUsed(Configuration conf) {
- String value = conf.get(DEFAULT_SEQ_USED);
- if (null == value || !TRUE.equals(value)) {
- conf.setBoolean(DEFAULT_SEQ_USED, true);
- }
+ SequencedFormatHelper.setDefaultSequenceUsed(conf, PREFIX);
}
protected static String merge(String name, Integer sequence) {
@@ -266,7 +178,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
*/
public static void setOutputInfo(Configuration conf, String user, byte[] passwd, boolean createTables, String defaultTable) {
setDefaultSequenceUsed(conf);
- setOutputInfo(conf, DEFAULT_SEQUENCE, user, passwd, createTables, defaultTable);
+ setOutputInfo(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, user, passwd, createTables, defaultTable);
}
/**
@@ -306,7 +218,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
setDefaultSequenceUsed(conf);
- setZooKeeperInstance(conf, DEFAULT_SEQUENCE, instanceName, zooKeepers);
+ setZooKeeperInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, instanceName, zooKeepers);
}
public static void setZooKeeperInstance(Configuration conf, int sequence, String instanceName, String zooKeepers) {
@@ -329,7 +241,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
public static void setMockInstance(Configuration conf, String instanceName) {
setDefaultSequenceUsed(conf);
- setMockInstance(conf, DEFAULT_SEQUENCE, instanceName);
+ setMockInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, instanceName);
}
public static void setMockInstance(Configuration conf, int sequence, String instanceName) {
@@ -347,7 +259,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
public static void setMaxMutationBufferSize(Configuration conf, long numberOfBytes) {
setDefaultSequenceUsed(conf);
- setMaxMutationBufferSize(conf, DEFAULT_SEQUENCE, numberOfBytes);
+ setMaxMutationBufferSize(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, numberOfBytes);
}
public static void setMaxMutationBufferSize(Configuration conf, int sequence, long numberOfBytes) {
@@ -363,7 +275,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
public static void setMaxLatency(Configuration conf, int numberOfMilliseconds) {
setDefaultSequenceUsed(conf);
- setMaxLatency(conf, DEFAULT_SEQUENCE, numberOfMilliseconds);
+ setMaxLatency(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, numberOfMilliseconds);
}
public static void setMaxLatency(Configuration conf, int sequence, int numberOfMilliseconds) {
@@ -379,7 +291,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
public static void setMaxWriteThreads(Configuration conf, int numberOfThreads) {
setDefaultSequenceUsed(conf);
- setMaxWriteThreads(conf, DEFAULT_SEQUENCE, numberOfThreads);
+ setMaxWriteThreads(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, numberOfThreads);
}
public static void setMaxWriteThreads(Configuration conf, int sequence, int numberOfThreads) {
@@ -395,7 +307,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
public static void setLogLevel(Configuration conf, Level level) {
setDefaultSequenceUsed(conf);
- setLogLevel(conf, DEFAULT_SEQUENCE, level);
+ setLogLevel(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, level);
}
public static void setLogLevel(Configuration conf, int sequence, Level level) {
@@ -412,7 +324,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
public static void setSimulationMode(Configuration conf) {
setDefaultSequenceUsed(conf);
- setSimulationMode(conf, DEFAULT_SEQUENCE);
+ setSimulationMode(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
public static void setSimulationMode(Configuration conf, int sequence) {
@@ -427,7 +339,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
}
protected static String getUsername(Configuration conf) {
- return getUsername(conf, DEFAULT_SEQUENCE);
+ return getUsername(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
protected static String getUsername(Configuration conf, int sequence) {
@@ -444,7 +356,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
return getPassword(job.getConfiguration());
}
protected static byte[] getPassword(Configuration conf) {
- return getPassword(conf, DEFAULT_SEQUENCE);
+ return getPassword(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
/**
@@ -463,7 +375,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
}
protected static boolean canCreateTables(Configuration conf) {
- return canCreateTables(conf, DEFAULT_SEQUENCE);
+ return canCreateTables(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
protected static boolean canCreateTables(Configuration conf, int sequence) {
@@ -478,7 +390,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
}
protected static String getDefaultTableName(Configuration conf) {
- return getDefaultTableName(conf, DEFAULT_SEQUENCE);
+ return getDefaultTableName(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
protected static String getDefaultTableName(Configuration conf, int sequence) {
@@ -493,7 +405,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
}
protected static Instance getInstance(Configuration conf) {
- return getInstance(conf, DEFAULT_SEQUENCE);
+ return getInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
protected static Instance getInstance(Configuration conf, int sequence) {
@@ -510,7 +422,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
}
protected static long getMaxMutationBufferSize(Configuration conf) {
- return getMaxMutationBufferSize(conf, DEFAULT_SEQUENCE);
+ return getMaxMutationBufferSize(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
protected static long getMaxMutationBufferSize(Configuration conf, int sequence) {
@@ -525,7 +437,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
}
protected static int getMaxLatency(Configuration conf) {
- return getMaxLatency(conf, DEFAULT_SEQUENCE);
+ return getMaxLatency(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
protected static int getMaxLatency(Configuration conf, int sequence) {
@@ -540,7 +452,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
}
protected static int getMaxWriteThreads(Configuration conf) {
- return getMaxWriteThreads(conf, DEFAULT_SEQUENCE);
+ return getMaxWriteThreads(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
protected static int getMaxWriteThreads(Configuration conf, int sequence) {
@@ -555,7 +467,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
}
protected static Level getLogLevel(Configuration conf) {
- return getLogLevel(conf, DEFAULT_SEQUENCE);
+ return getLogLevel(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
protected static Level getLogLevel(Configuration conf, int sequence) {
@@ -573,7 +485,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
}
protected static boolean getSimulationMode(Configuration conf) {
- return getSimulationMode(conf, DEFAULT_SEQUENCE);
+ return getSimulationMode(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
protected static boolean getSimulationMode(Configuration conf, int sequence) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f10a6ff/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
index 7042f19..5c87c13 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
@@ -151,124 +151,30 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
private static final String ITERATORS_DELIM = ",";
private static final String SEQ_DELIM = ".";
- protected static final int DEFAULT_SEQUENCE = 0;
- private static final String COMMA = ",";
- private static final String CONFIGURED_SEQUENCES = PREFIX + ".configuredsSeqs";
- private static final String DEFAULT_SEQ_USED = PREFIX + ".defaultSequenceUsed";
- private static final String PROCESSED_SEQUENCES = PREFIX + ".processedSeqs";
- private static final String TRUE = "true";
private static final String READ_OFFLINE = PREFIX + ".read.offline";
+ protected static String merge(String name, Integer sequence) {
+ return name + SEQ_DELIM + sequence;
+ }
+
+
/**
* Get a unique identifier for these configurations
*
* @return A unique number to provide to future AccumuloInputFormat calls
*/
public static synchronized int nextSequence(Configuration conf) {
- String value = conf.get(CONFIGURED_SEQUENCES);
- if (null == value) {
- conf.set(CONFIGURED_SEQUENCES, "1");
- return 1;
- } else {
- String[] splitValues = StringUtils.split(value, COMMA);
- int newValue = Integer.parseInt(splitValues[splitValues.length-1]) + 1;
-
- conf.set(CONFIGURED_SEQUENCES, value + COMMA + newValue);
- return newValue;
- }
+ return SequencedFormatHelper.nextSequence(conf, PREFIX);
}
- /**
- * Using the provided Configuration, return the next sequence number to process.
- * @param conf A Configuration object used to store AccumuloInputFormat information into
- * @return The next sequence number to process, -1 when finished.
- * @throws NoSuchElementException
- */
- protected static synchronized int nextSequenceToProcess(Configuration conf) throws NoSuchElementException {
- String[] processedConfs = conf.getStrings(PROCESSED_SEQUENCES);
-
- // We haven't set anything, so we need to find the first to return
- if (null == processedConfs || 0 == processedConfs.length) {
- // Check to see if the default sequence was used
- boolean defaultSeqUsed = conf.getBoolean(DEFAULT_SEQ_USED, false);
-
- // If so, set that we're processing it and return the value of the default
- if (defaultSeqUsed) {
- conf.set(PROCESSED_SEQUENCES, Integer.toString(DEFAULT_SEQUENCE));
- return DEFAULT_SEQUENCE;
- }
-
- String[] loadedConfs = conf.getStrings(CONFIGURED_SEQUENCES);
-
- // There was *nothing* loaded, fail.
- if (null == loadedConfs || 0 == loadedConfs.length) {
- throw new NoSuchElementException("Sequence was requested to process but none exist to return");
- }
-
- // We have loaded configuration(s), use the first
- int firstLoaded = Integer.parseInt(loadedConfs[0]);
- conf.setInt(PROCESSED_SEQUENCES, firstLoaded);
-
- return firstLoaded;
- }
-
- // We've previously parsed some confs, need to find the next one to load
- int lastProcessedSeq = Integer.valueOf(processedConfs[processedConfs.length - 1]);
- String[] configuredSequencesArray = conf.getStrings(CONFIGURED_SEQUENCES);
-
- // We only have the default sequence, no specifics.
- // Getting here, we already know that we processed that default
- if (null == configuredSequencesArray) {
- return -1;
- }
-
- List<Integer> configuredSequences = new ArrayList<Integer>(configuredSequencesArray.length + 1);
-
- // If we used the default sequence ID, add that into the list of configured sequences
- if (conf.getBoolean(DEFAULT_SEQ_USED, false)) {
- configuredSequences.add(DEFAULT_SEQUENCE);
- }
-
- // Add the rest of any sequences to our list
- for (String configuredSequence : configuredSequencesArray) {
- configuredSequences.add(Integer.parseInt(configuredSequence));
- }
-
- int lastParsedSeqIndex = configuredSequences.size() - 1;
-
- // Find the next sequence number after the one we last processed
- for (; lastParsedSeqIndex >= 0; lastParsedSeqIndex--) {
- int lastLoadedValue = configuredSequences.get(lastParsedSeqIndex);
-
- if (lastLoadedValue == lastProcessedSeq) {
- break;
- }
- }
-
- // We either had no sequences to match or we matched the last configured sequence
- // Both of which are equivalent to no (more) sequences to process
- if (-1 == lastParsedSeqIndex || lastParsedSeqIndex + 1 >= configuredSequences.size()) {
- return -1;
- }
-
- // Get the value of the sequence at that offset
- int nextSequence = configuredSequences.get(lastParsedSeqIndex + 1);
- conf.set(PROCESSED_SEQUENCES, conf.get(PROCESSED_SEQUENCES) + COMMA + nextSequence);
-
- return nextSequence;
+ protected static int nextSequenceToProcess(Configuration conf) {
+ return SequencedFormatHelper.nextSequenceToProcess(conf, PREFIX);
}
protected static void setDefaultSequenceUsed(Configuration conf) {
- String value = conf.get(DEFAULT_SEQ_USED);
- if (null == value || !TRUE.equals(value)) {
- conf.setBoolean(DEFAULT_SEQ_USED, true);
- }
- }
-
- protected static String merge(String name, Integer sequence) {
- return name + SEQ_DELIM + sequence;
+ SequencedFormatHelper.setDefaultSequenceUsed(conf, PREFIX);
}
public static Map<String,String> getRelevantEntries(Configuration conf) {
@@ -302,7 +208,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
*/
public static void setIsolated(Configuration conf, boolean enable) {
setDefaultSequenceUsed(conf);
- setIsolated(conf, DEFAULT_SEQUENCE, enable);
+ setIsolated(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, enable);
}
/**
@@ -334,7 +240,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
*/
public static void setLocalIterators(Configuration conf, boolean enable) {
setDefaultSequenceUsed(conf);
- setLocalIterators(conf, DEFAULT_SEQUENCE, enable);
+ setLocalIterators(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, enable);
}
/**
@@ -372,7 +278,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
*/
public static void setInputInfo(Configuration conf, String user, byte[] passwd, String table, Authorizations auths) {
setDefaultSequenceUsed(conf);
- setInputInfo(conf, DEFAULT_SEQUENCE, user, passwd, table, auths);
+ setInputInfo(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, user, passwd, table, auths);
}
/**
@@ -422,7 +328,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
*/
public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
setDefaultSequenceUsed(conf);
- setZooKeeperInstance(conf, DEFAULT_SEQUENCE, instanceName, zooKeepers);
+ setZooKeeperInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, instanceName, zooKeepers);
}
/**
@@ -463,7 +369,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
*/
public static void setMockInstance(Configuration conf, String instanceName) {
setDefaultSequenceUsed(conf);
- setMockInstance(conf, DEFAULT_SEQUENCE, instanceName);
+ setMockInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, instanceName);
}
/**
@@ -497,7 +403,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
*/
public static void setRanges(Configuration conf, Collection<Range> ranges) {
setDefaultSequenceUsed(conf);
- setRanges(conf, DEFAULT_SEQUENCE, ranges);
+ setRanges(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, ranges);
}
/**
@@ -539,7 +445,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
*/
public static void disableAutoAdjustRanges(Configuration conf) {
setDefaultSequenceUsed(conf);
- disableAutoAdjustRanges(conf, DEFAULT_SEQUENCE);
+ disableAutoAdjustRanges(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
/**
@@ -569,7 +475,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
*/
public static void setRegex(JobContext job, RegexType type, String regex) {
setDefaultSequenceUsed(job.getConfiguration());
- setRegex(job, DEFAULT_SEQUENCE, type, regex);
+ setRegex(job, SequencedFormatHelper.DEFAULT_SEQUENCE, type, regex);
}
/**
@@ -626,7 +532,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
*/
public static void setMaxVersions(Configuration conf, int maxVersions) throws IOException {
setDefaultSequenceUsed(conf);
- setMaxVersions(conf, DEFAULT_SEQUENCE, maxVersions);
+ setMaxVersions(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, maxVersions);
}
/**
@@ -675,7 +581,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
public static void setScanOffline(Configuration conf, boolean scanOff) {
setDefaultSequenceUsed(conf);
- setScanOffline(conf, DEFAULT_SEQUENCE, scanOff);
+ setScanOffline(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, scanOff);
}
/**
@@ -727,7 +633,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
*/
public static void fetchColumns(Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
setDefaultSequenceUsed(conf);
- fetchColumns(conf, DEFAULT_SEQUENCE, columnFamilyColumnQualifierPairs);
+ fetchColumns(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, columnFamilyColumnQualifierPairs);
}
/**
@@ -771,7 +677,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
*/
public static void setLogLevel(Configuration conf, Level level) {
setDefaultSequenceUsed(conf);
- setLogLevel(conf, DEFAULT_SEQUENCE, level);
+ setLogLevel(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, level);
}
/**
@@ -806,7 +712,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
*/
public static void addIterator(Configuration conf, IteratorSetting cfg) {
setDefaultSequenceUsed(conf);
- addIterator(conf, DEFAULT_SEQUENCE, cfg);
+ addIterator(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, cfg);
}
/**
@@ -866,7 +772,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
*/
public static void setIterator(JobContext job, int priority, String iteratorClass, String iteratorName) {
setDefaultSequenceUsed(job.getConfiguration());
- setIterator(job, DEFAULT_SEQUENCE, priority, iteratorClass, iteratorName);
+ setIterator(job, SequencedFormatHelper.DEFAULT_SEQUENCE, priority, iteratorClass, iteratorName);
}
/**
@@ -916,7 +822,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
*/
public static void setIteratorOption(JobContext job, String iteratorName, String key, String value) {
setDefaultSequenceUsed(job.getConfiguration());
- setIteratorOption(job, DEFAULT_SEQUENCE, iteratorName, key, value);
+ setIteratorOption(job, SequencedFormatHelper.DEFAULT_SEQUENCE, iteratorName, key, value);
}
/**
@@ -967,7 +873,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
* @see #setIsolated(Configuration, boolean)
*/
protected static boolean isIsolated(Configuration conf) {
- return isIsolated(conf, DEFAULT_SEQUENCE);
+ return isIsolated(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
/**
@@ -998,7 +904,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
* @see #setLocalIterators(Configuration, boolean)
*/
protected static boolean usesLocalIterators(Configuration conf) {
- return usesLocalIterators(conf, DEFAULT_SEQUENCE);
+ return usesLocalIterators(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
/**
@@ -1029,7 +935,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
* @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
*/
protected static String getUsername(Configuration conf) {
- return getUsername(conf, DEFAULT_SEQUENCE);
+ return getUsername(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
/**
@@ -1064,7 +970,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
* @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
*/
protected static byte[] getPassword(Configuration conf) {
- return getPassword(conf, DEFAULT_SEQUENCE);
+ return getPassword(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
/**
@@ -1096,7 +1002,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
* @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
*/
protected static String getTablename(Configuration conf) {
- return getTablename(conf, DEFAULT_SEQUENCE);
+ return getTablename(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
/**
@@ -1127,7 +1033,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
* @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
*/
protected static Authorizations getAuthorizations(Configuration conf) {
- return getAuthorizations(conf, DEFAULT_SEQUENCE);
+ return getAuthorizations(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
/**
@@ -1160,7 +1066,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
* @see #setMockInstance(Configuration, String)
*/
protected static Instance getInstance(Configuration conf) {
- return getInstance(conf, DEFAULT_SEQUENCE);
+ return getInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
/**
@@ -1195,7 +1101,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
* if the table name set on the configuration doesn't exist
*/
protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException {
- return getTabletLocator(conf, DEFAULT_SEQUENCE);
+ return getTabletLocator(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
/**
@@ -1236,7 +1142,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
* @see #setRanges(Configuration, Collection)
*/
protected static List<Range> getRanges(Configuration conf) throws IOException {
- return getRanges(conf, DEFAULT_SEQUENCE);
+ return getRanges(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
/**
@@ -1265,7 +1171,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
* @see #setRegex(JobContext, RegexType, String)
*/
protected static String getRegex(JobContext job, RegexType type) {
- return getRegex(job, DEFAULT_SEQUENCE, type);
+ return getRegex(job, SequencedFormatHelper.DEFAULT_SEQUENCE, type);
}
/**
@@ -1317,7 +1223,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
* @see #fetchColumns(Configuration, Collection)
*/
protected static Set<Pair<Text,Text>> getFetchedColumns(Configuration conf) {
- return getFetchedColumns(conf, DEFAULT_SEQUENCE);
+ return getFetchedColumns(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
/**
@@ -1355,7 +1261,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
* @see #disableAutoAdjustRanges(Configuration)
*/
protected static boolean getAutoAdjustRanges(Configuration conf) {
- return getAutoAdjustRanges(conf, DEFAULT_SEQUENCE);
+ return getAutoAdjustRanges(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
/**
@@ -1386,7 +1292,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
* @see #setLogLevel(Configuration, Level)
*/
protected static Level getLogLevel(Configuration conf) {
- return getLogLevel(conf, DEFAULT_SEQUENCE);
+ return getLogLevel(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
/**
@@ -1421,7 +1327,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
* if the configuration is improperly configured
*/
protected static void validateOptions(Configuration conf) throws IOException {
- validateOptions(conf, DEFAULT_SEQUENCE);
+ validateOptions(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
// InputFormat doesn't have the equivalent of OutputFormat's
@@ -1479,7 +1385,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
*/
protected static int getMaxVersions(Configuration conf) {
setDefaultSequenceUsed(conf);
- return getMaxVersions(conf, DEFAULT_SEQUENCE);
+ return getMaxVersions(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
/**
@@ -1495,7 +1401,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
}
protected static boolean isOfflineScan(Configuration conf) {
- return isOfflineScan(conf, DEFAULT_SEQUENCE);
+ return isOfflineScan(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
protected static boolean isOfflineScan(Configuration conf, int sequence) {
@@ -1520,7 +1426,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
* @see #addIterator(Configuration, IteratorSetting)
*/
protected static List<AccumuloIterator> getIterators(Configuration conf) {
- return getIterators(conf, DEFAULT_SEQUENCE);
+ return getIterators(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
/**
@@ -1565,7 +1471,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
* @see #addIterator(Configuration, IteratorSetting)
*/
protected static List<AccumuloIteratorOption> getIteratorOptions(Configuration conf) {
- return getIteratorOptions(conf, DEFAULT_SEQUENCE);
+ return getIteratorOptions(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
/**
@@ -1635,7 +1541,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
* @deprecated Use {@link #setupIterators(Configuration,Scanner)} instead
*/
protected void setupIterators(TaskAttemptContext attempt, Scanner scanner) throws AccumuloException {
- setupIterators(attempt.getConfiguration(), DEFAULT_SEQUENCE, scanner);
+ setupIterators(attempt.getConfiguration(), SequencedFormatHelper.DEFAULT_SEQUENCE, scanner);
}
/**
@@ -1667,7 +1573,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
* @deprecated Use {@link #setupMaxVersions(Configuration,Scanner)} instead
*/
protected void setupMaxVersions(TaskAttemptContext attempt, Scanner scanner) {
- setupMaxVersions(attempt.getConfiguration(), DEFAULT_SEQUENCE, scanner);
+ setupMaxVersions(attempt.getConfiguration(), SequencedFormatHelper.DEFAULT_SEQUENCE, scanner);
}
/**
@@ -2046,7 +1952,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
}
RangeInputSplit(String table, Range range, String[] locations) {
- this(table, range, locations, DEFAULT_SEQUENCE);
+ this(table, range, locations, SequencedFormatHelper.DEFAULT_SEQUENCE);
}
RangeInputSplit(String table, Range range, String[] locations, int sequence) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f10a6ff/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java
new file mode 100644
index 0000000..ff18754
--- /dev/null
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java
@@ -0,0 +1,145 @@
+package org.apache.accumulo.core.client.mapreduce;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.accumulo.core.util.ArgumentChecker;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Convenience class with methods useful for dealing with multiple configurations of AccumuloInputFormat and/or AccumuloOutputFormat in the same
+ * Configuration object.
+ * <p>
+ * Each configuration is identified by an integer "sequence" number. {@link #DEFAULT_SEQUENCE} identifies the default configuration; additional sequences are
+ * handed out by {@link #nextSequence(Configuration, String)} starting at 1. All bookkeeping is stored in the Configuration itself, under keys formed by
+ * appending a fixed suffix to a caller-supplied prefix.
+ */
+public class SequencedFormatHelper {
+
+  private static final String COMMA = ",";
+  protected static final int DEFAULT_SEQUENCE = 0;
+
+  // Key suffixes appended to the caller's prefix
+  private static final String DEFAULT_SEQ_USED = ".defaultSequenceUsed";
+  private static final String CONFIGURED_SEQUENCES = ".configuredSeqs";
+  private static final String PROCESSED_SEQUENCES = ".processedSeqs";
+
+  /**
+   * Reserve a new sequence number for a configuration stored under the given prefix.
+   *
+   * @param conf
+   *          the Configuration in which the comma-separated list of configured sequences is recorded
+   * @param prefix
+   *          the key prefix under which this format's sequence bookkeeping is stored
+   * @return a number one greater than the last sequence recorded for this prefix (1 if none recorded yet), to provide to future AccumuloInputFormat or
+   *         AccumuloOutputFormat calls
+   */
+  public static synchronized int nextSequence(Configuration conf, String prefix) {
+    ArgumentChecker.notNull(conf, prefix);
+
+    final String configuredSequencesKey = prefix + CONFIGURED_SEQUENCES;
+
+    final String value = conf.get(configuredSequencesKey);
+    // split() returns null for a null value and an empty array for "" or ",,,"; both mean nothing was recorded
+    final String[] recorded = StringUtils.split(value, COMMA);
+
+    if (null == recorded || 0 == recorded.length) {
+      // First explicit sequence starts right after the default sequence
+      final int first = DEFAULT_SEQUENCE + 1;
+      conf.set(configuredSequencesKey, Integer.toString(first));
+      return first;
+    }
+
+    // Sequences are appended in increasing order, so the last entry is the largest
+    final int newValue = Integer.parseInt(recorded[recorded.length - 1]) + 1;
+    conf.set(configuredSequencesKey, value + COMMA + newValue);
+    return newValue;
+  }
+
+  /**
+   * Record that the default sequence ({@link #DEFAULT_SEQUENCE}) was configured for the given prefix, so that
+   * {@link #nextSequenceToProcess(Configuration, String)} will return it first.
+   *
+   * @param conf
+   *          the Configuration to record the flag in
+   * @param prefix
+   *          the key prefix under which this format's sequence bookkeeping is stored
+   */
+  protected static void setDefaultSequenceUsed(Configuration conf, String prefix) {
+    ArgumentChecker.notNull(conf, prefix);
+
+    // Setting the flag is idempotent, so there is no need to inspect the current value first
+    conf.setBoolean(prefix + DEFAULT_SEQ_USED, true);
+  }
+
+  /**
+   * Using the provided Configuration, return the next sequence number to process and record it as processed.
+   * <p>
+   * Sequences are visited in this order: the default sequence (if {@link #setDefaultSequenceUsed(Configuration, String)} was called), followed by the
+   * sequences handed out by {@link #nextSequence(Configuration, String)} in the order they were created.
+   *
+   * @param conf
+   *          a Configuration object holding AccumuloInputFormat/AccumuloOutputFormat information; it is updated to record the returned sequence
+   * @param prefix
+   *          the key prefix under which this format's sequence bookkeeping is stored
+   * @return the next sequence number to process, or -1 when every configured sequence has already been processed
+   * @throws NoSuchElementException
+   *           if nothing has been processed yet and no sequence (default or explicit) has been configured
+   */
+  protected static synchronized int nextSequenceToProcess(Configuration conf, String prefix) throws NoSuchElementException {
+    // conf is dereferenced below, so check it along with prefix (consistent with the other methods)
+    ArgumentChecker.notNull(conf, prefix);
+
+    final String processedSequencesKey = prefix + PROCESSED_SEQUENCES;
+    final String defaultSequenceUsedKey = prefix + DEFAULT_SEQ_USED;
+    final String configuredSequencesKey = prefix + CONFIGURED_SEQUENCES;
+
+    final boolean defaultSeqUsed = conf.getBoolean(defaultSequenceUsedKey, false);
+    final String[] processed = conf.getStrings(processedSequencesKey);
+    final String[] configured = conf.getStrings(configuredSequencesKey);
+
+    // Nothing processed yet: hand out the very first sequence
+    if (null == processed || 0 == processed.length) {
+      final int first;
+      if (defaultSeqUsed) {
+        first = DEFAULT_SEQUENCE;
+      } else if (null == configured || 0 == configured.length) {
+        // There was *nothing* configured, fail.
+        throw new NoSuchElementException("Sequence was requested to process but none exist to return");
+      } else {
+        first = Integer.parseInt(configured[0]);
+      }
+
+      conf.set(processedSequencesKey, Integer.toString(first));
+      return first;
+    }
+
+    final int lastProcessed = Integer.parseInt(processed[processed.length - 1]);
+
+    // Build the full processing order: default first (if used), then explicit sequences as recorded
+    final List<Integer> processingOrder = new ArrayList<Integer>((null == configured ? 0 : configured.length) + 1);
+    if (defaultSeqUsed) {
+      processingOrder.add(DEFAULT_SEQUENCE);
+    }
+    if (null != configured) {
+      for (String sequence : configured) {
+        processingOrder.add(Integer.parseInt(sequence));
+      }
+    }
+
+    final int lastIndex = processingOrder.lastIndexOf(lastProcessed);
+
+    // Either the last processed sequence is unknown or it was the final one; both mean nothing is left to process
+    if (-1 == lastIndex || lastIndex + 1 >= processingOrder.size()) {
+      return -1;
+    }
+
+    final int next = processingOrder.get(lastIndex + 1);
+    conf.set(processedSequencesKey, conf.get(processedSequencesKey) + COMMA + next);
+
+    return next;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f10a6ff/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
index 98f3e7a..a6f5c48 100644
--- a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
+++ b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
public class AccumuloInputFormatTest {
@@ -101,7 +100,7 @@ public class AccumuloInputFormatTest {
AccumuloInputFormat.setIterator(job, 1, "org.apache.accumulo.core.iterators.WholeRowIterator", "WholeRow");
Configuration conf = job.getConfiguration();
- String iterators = conf.get("AccumuloInputFormat.iterators." + InputFormatBase.DEFAULT_SEQUENCE);
+ String iterators = conf.get("AccumuloInputFormat.iterators." + SequencedFormatHelper.DEFAULT_SEQUENCE);
assertEquals("1:org.apache.accumulo.core.iterators.WholeRowIterator:WholeRow", iterators);
}
@@ -164,7 +163,7 @@ public class AccumuloInputFormatTest {
final String rawConfigOpt = new AccumuloIteratorOption("iterator", key, value).toString();
- assertEquals(rawConfigOpt, job.getConfiguration().get("AccumuloInputFormat.iterators.options." + InputFormatBase.DEFAULT_SEQUENCE));
+ assertEquals(rawConfigOpt, job.getConfiguration().get("AccumuloInputFormat.iterators.options." + SequencedFormatHelper.DEFAULT_SEQUENCE));
List<AccumuloIteratorOption> opts = AccumuloInputFormat.getIteratorOptions(job.getConfiguration());
assertEquals(1, opts.size());
@@ -228,7 +227,7 @@ public class AccumuloInputFormatTest {
AccumuloInputFormat.setIteratorOption(job, "someIterator", "aKey", "aValue");
Configuration conf = job.getConfiguration();
- String options = conf.get("AccumuloInputFormat.iterators.options." + InputFormatBase.DEFAULT_SEQUENCE);
+ String options = conf.get("AccumuloInputFormat.iterators.options." + SequencedFormatHelper.DEFAULT_SEQUENCE);
assertEquals(new String("someIterator:aKey:aValue"), options);
}