You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bi...@apache.org on 2012/09/10 16:59:44 UTC
svn commit: r1382923 - in /accumulo/branches/1.4/src/core/src:
main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
Author: billie
Date: Mon Sep 10 14:59:43 2012
New Revision: 1382923
URL: http://svn.apache.org/viewvc?rev=1382923&view=rev
Log:
ACCUMULO-507: setRanges now writes the ranges to a file that is added to the distributed cache, and stores only that file's path in the configuration instead of the ranges themselves
Modified:
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java?rev=1382923&r1=1382922&r2=1382923&view=diff
==============================================================================
--- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java (original)
+++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java Mon Sep 10 14:59:43 2012
@@ -310,17 +310,24 @@ public abstract class InputFormatBase<K,
*/
public static void setRanges(Configuration conf, Collection<Range> ranges) {
ArgumentChecker.notNull(ranges);
- ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size());
try {
+ FileSystem fs = FileSystem.get(conf);
+ Path file = new Path(fs.getWorkingDirectory(), conf.get("mapred.job.name") + System.currentTimeMillis() + ".ranges");
+ conf.set(RANGES, file.toString());
+ FSDataOutputStream fos = fs.create(file, false);
+ fs.setPermission(file, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
+ fs.deleteOnExit(file);
+
+ fos.writeInt(ranges.size());
for (Range r : ranges) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- r.write(new DataOutputStream(baos));
- rangeStrings.add(new String(Base64.encodeBase64(baos.toByteArray())));
+ r.write(fos);
}
- } catch (IOException ex) {
- throw new IllegalArgumentException("Unable to encode ranges to Base64", ex);
+ fos.close();
+
+ DistributedCache.addCacheFile(file.toUri(), conf);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to write ranges to file", e);
}
- conf.setStrings(RANGES, rangeStrings.toArray(new String[0]));
}
/**
@@ -793,12 +800,21 @@ public abstract class InputFormatBase<K,
*/
protected static List<Range> getRanges(Configuration conf) throws IOException {
ArrayList<Range> ranges = new ArrayList<Range>();
- for (String rangeString : conf.getStringCollection(RANGES)) {
- ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes()));
- Range range = new Range();
- range.readFields(new DataInputStream(bais));
- ranges.add(range);
+ FileSystem fs = FileSystem.get(conf);
+ String rangePath = conf.get(RANGES);
+ if (rangePath == null)
+ return ranges;
+ Path file = new Path(rangePath);
+
+ FSDataInputStream fdis = fs.open(file);
+ int numRanges = fdis.readInt();
+ while (numRanges > 0) {
+ Range r = new Range();
+ r.readFields(fdis);
+ ranges.add(r);
+ numRanges--;
}
+ fdis.close();
return ranges;
}
Modified: accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java?rev=1382923&r1=1382922&r2=1382923&view=diff
==============================================================================
--- accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java (original)
+++ accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java Mon Sep 10 14:59:43 2012
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.HashSet;
import java.util.List;
import org.apache.accumulo.core.client.BatchWriter;
@@ -32,6 +33,7 @@ import org.apache.accumulo.core.client.m
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
import org.apache.accumulo.core.security.Authorizations;
@@ -277,14 +279,17 @@ public class AccumuloInputFormatTest {
static class TestMapper extends Mapper<Key,Value,Key,Value> {
Key key = null;
+ int first = 0;
int count = 0;
@Override
protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
if (key != null)
assertEquals(key.getRow().toString(), new String(v.get()));
- assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
- assertEquals(new String(v.get()), String.format("%09x", count));
+ else
+ first = Integer.parseInt(k.getRow().toString(), 16) - 1;
+ assertEquals(k.getRow(), new Text(String.format("%09x", first + count + 1)));
+ assertEquals(new String(v.get()), String.format("%09x", first + count));
key = new Key(k);
count++;
}
@@ -309,10 +314,14 @@ public class AccumuloInputFormatTest {
job.setNumReduceTasks(0);
AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable", new Authorizations());
AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance");
+ HashSet<Range> ranges = new HashSet<Range>();
+ ranges.add(new Range("000000000", "000000010"));
+ ranges.add(new Range("000000100", "000000110"));
+ AccumuloInputFormat.setRanges(job.getConfiguration(), ranges);
AccumuloInputFormat input = new AccumuloInputFormat();
List<InputSplit> splits = input.getSplits(job);
- assertEquals(splits.size(), 1);
+ assertEquals(splits.size(), 2);
TestMapper mapper = (TestMapper) job.getMapperClass().newInstance();
for (InputSplit split : splits) {
@@ -322,6 +331,7 @@ public class AccumuloInputFormatTest {
Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, split);
reader.initialize(split, context);
mapper.run(context);
+ assertEquals(mapper.count, 16);
}
}
@@ -342,7 +352,9 @@ public class AccumuloInputFormatTest {
AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable2", new Authorizations());
AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance");
AccumuloInputFormat input = new AccumuloInputFormat();
- RangeInputSplit ris = new RangeInputSplit();
+ List<InputSplit> splits = input.getSplits(job);
+ assertEquals(splits.size(), 1);
+ RangeInputSplit ris = (RangeInputSplit) splits.get(0);
TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
RecordReader<Key,Value> rr = input.createRecordReader(ris, tac);
rr.initialize(ris, tac);
@@ -352,5 +364,6 @@ public class AccumuloInputFormatTest {
while (rr.nextKeyValue()) {
mapper.map(rr.getCurrentKey(), rr.getCurrentValue(), context);
}
+ assertEquals(mapper.count, 100);
}
}