You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bi...@apache.org on 2012/09/10 16:59:44 UTC

svn commit: r1382923 - in /accumulo/branches/1.4/src/core/src: main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java

Author: billie
Date: Mon Sep 10 14:59:43 2012
New Revision: 1382923

URL: http://svn.apache.org/viewvc?rev=1382923&view=rev
Log:
ACCUMULO-507: made setRanges write the ranges to a distributed cache file instead of storing them in the configuration

Modified:
    accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
    accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java

Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java?rev=1382923&r1=1382922&r2=1382923&view=diff
==============================================================================
--- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java (original)
+++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java Mon Sep 10 14:59:43 2012
@@ -310,17 +310,24 @@ public abstract class InputFormatBase<K,
    */
   public static void setRanges(Configuration conf, Collection<Range> ranges) {
     ArgumentChecker.notNull(ranges);
-    ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size());
     try {
+      FileSystem fs = FileSystem.get(conf);
+      Path file = new Path(fs.getWorkingDirectory(), conf.get("mapred.job.name") + System.currentTimeMillis() + ".ranges");
+      conf.set(RANGES, file.toString());
+      FSDataOutputStream fos = fs.create(file, false);
+      fs.setPermission(file, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
+      fs.deleteOnExit(file);
+      
+      fos.writeInt(ranges.size());
       for (Range r : ranges) {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        r.write(new DataOutputStream(baos));
-        rangeStrings.add(new String(Base64.encodeBase64(baos.toByteArray())));
+        r.write(fos);
       }
-    } catch (IOException ex) {
-      throw new IllegalArgumentException("Unable to encode ranges to Base64", ex);
+      fos.close();
+      
+      DistributedCache.addCacheFile(file.toUri(), conf);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to write ranges to file", e);
     }
-    conf.setStrings(RANGES, rangeStrings.toArray(new String[0]));
   }
   
   /**
@@ -793,12 +800,21 @@ public abstract class InputFormatBase<K,
    */
   protected static List<Range> getRanges(Configuration conf) throws IOException {
     ArrayList<Range> ranges = new ArrayList<Range>();
-    for (String rangeString : conf.getStringCollection(RANGES)) {
-      ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes()));
-      Range range = new Range();
-      range.readFields(new DataInputStream(bais));
-      ranges.add(range);
+    FileSystem fs = FileSystem.get(conf);
+    String rangePath = conf.get(RANGES);
+    if (rangePath == null)
+      return ranges;
+    Path file = new Path(rangePath);
+    
+    FSDataInputStream fdis = fs.open(file);
+    int numRanges = fdis.readInt();
+    while (numRanges > 0) {
+      Range r = new Range();
+      r.readFields(fdis);
+      ranges.add(r);
+      numRanges--;
     }
+    fdis.close();
     return ranges;
   }
   

Modified: accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java?rev=1382923&r1=1382922&r2=1382923&view=diff
==============================================================================
--- accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java (original)
+++ accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java Mon Sep 10 14:59:43 2012
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 
 import org.apache.accumulo.core.client.BatchWriter;
@@ -32,6 +33,7 @@ import org.apache.accumulo.core.client.m
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 import org.apache.accumulo.core.security.Authorizations;
@@ -277,14 +279,17 @@ public class AccumuloInputFormatTest {
   
   static class TestMapper extends Mapper<Key,Value,Key,Value> {
     Key key = null;
+    int first = 0;
     int count = 0;
     
     @Override
     protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
       if (key != null)
         assertEquals(key.getRow().toString(), new String(v.get()));
-      assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
-      assertEquals(new String(v.get()), String.format("%09x", count));
+      else
+        first = Integer.parseInt(k.getRow().toString(), 16) - 1;
+      assertEquals(k.getRow(), new Text(String.format("%09x", first + count + 1)));
+      assertEquals(new String(v.get()), String.format("%09x", first + count));
       key = new Key(k);
       count++;
     }
@@ -309,10 +314,14 @@ public class AccumuloInputFormatTest {
     job.setNumReduceTasks(0);
     AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable", new Authorizations());
     AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance");
+    HashSet<Range> ranges = new HashSet<Range>();
+    ranges.add(new Range("000000000", "000000010"));
+    ranges.add(new Range("000000100", "000000110"));
+    AccumuloInputFormat.setRanges(job.getConfiguration(), ranges);
     
     AccumuloInputFormat input = new AccumuloInputFormat();
     List<InputSplit> splits = input.getSplits(job);
-    assertEquals(splits.size(), 1);
+    assertEquals(splits.size(), 2);
     
     TestMapper mapper = (TestMapper) job.getMapperClass().newInstance();
     for (InputSplit split : splits) {
@@ -322,6 +331,7 @@ public class AccumuloInputFormatTest {
       Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, split);
       reader.initialize(split, context);
       mapper.run(context);
+      assertEquals(mapper.count, 16);
     }
   }
   
@@ -342,7 +352,9 @@ public class AccumuloInputFormatTest {
     AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable2", new Authorizations());
     AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance");
     AccumuloInputFormat input = new AccumuloInputFormat();
-    RangeInputSplit ris = new RangeInputSplit();
+    List<InputSplit> splits = input.getSplits(job);
+    assertEquals(splits.size(), 1);
+    RangeInputSplit ris = (RangeInputSplit) splits.get(0);
     TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
     RecordReader<Key,Value> rr = input.createRecordReader(ris, tac);
     rr.initialize(ris, tac);
@@ -352,5 +364,6 @@ public class AccumuloInputFormatTest {
     while (rr.nextKeyValue()) {
       mapper.map(rr.getCurrentKey(), rr.getCurrentValue(), context);
     }
+    assertEquals(mapper.count, 100);
   }
 }