Posted to commits@accumulo.apache.org by kt...@apache.org on 2016/06/02 15:04:50 UTC

[1/2] accumulo git commit: ACCUMULO-4165 Added a user facing API for RFile

Repository: accumulo
Updated Branches:
  refs/heads/1.8 c8621bbc8 -> 61a7de4a1


http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
new file mode 100644
index 0000000..3029592
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.rfile;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.sample.RowSampler;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.rfile.RFile.Reader;
+import org.apache.accumulo.core.iterators.user.RegExFilter;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+public class RFileTest {
+
+  // method created to foil findbugs... it complained that a return value was not used when it did not matter
+  private void foo(boolean b) {}
+
+  private String createTmpTestFile() throws IOException {
+    File dir = new File(System.getProperty("user.dir") + "/target/rfile-test");
+    foo(dir.mkdirs());
+    File testFile = File.createTempFile("test", ".rf", dir);
+    foo(testFile.delete());
+    return testFile.getAbsolutePath();
+  }
+
+  String rowStr(int r) {
+    return String.format("%06x", r);
+  }
+
+  String colStr(int c) {
+    return String.format("%04x", c);
+  }
+
+  private SortedMap<Key,Value> createTestData(int rows, int families, int qualifiers) {
+    return createTestData(0, rows, 0, families, qualifiers);
+  }
+
+  private SortedMap<Key,Value> createTestData(int startRow, int rows, int startFamily, int families, int qualifiers) {
+    TreeMap<Key,Value> testData = new TreeMap<>();
+
+    for (int r = 0; r < rows; r++) {
+      String row = rowStr(r + startRow);
+      for (int f = 0; f < families; f++) {
+        String fam = colStr(f + startFamily);
+        for (int q = 0; q < qualifiers; q++) {
+          String qual = colStr(q);
+          Key k = new Key(row, fam, qual);
+          testData.put(k, new Value((k.hashCode() + "").getBytes()));
+        }
+      }
+    }
+
+    return testData;
+  }
+
+  private String createRFile(SortedMap<Key,Value> testData) throws Exception {
+    String testFile = createTmpTestFile();
+
+    try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(FileSystem.getLocal(new Configuration())).build()) {
+      writer.startDefaultLocalityGroup();
+      writer.append(testData.entrySet());
+      // TODO ensure compressors are returned
+    }
+
+    return testFile;
+  }
+
+  @Test
+  public void testIndependance() throws Exception {
+    // test to ensure that two iterators allocated from the same RFile scanner are independent.
+
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+
+    SortedMap<Key,Value> testData = createTestData(10, 10, 10);
+
+    String testFile = createRFile(testData);
+
+    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build();
+    Range range1 = Range.exact(rowStr(5));
+    scanner.setRange(range1);
+    Iterator<Entry<Key,Value>> scnIter1 = scanner.iterator();
+    Iterator<Entry<Key,Value>> mapIter1 = testData.subMap(range1.getStartKey(), range1.getEndKey()).entrySet().iterator();
+
+    Range range2 = new Range(rowStr(3), true, rowStr(4), true);
+    scanner.setRange(range2);
+    Iterator<Entry<Key,Value>> scnIter2 = scanner.iterator();
+    Iterator<Entry<Key,Value>> mapIter2 = testData.subMap(range2.getStartKey(), range2.getEndKey()).entrySet().iterator();
+
+    while (scnIter1.hasNext() || scnIter2.hasNext()) {
+      if (scnIter1.hasNext()) {
+        Assert.assertTrue(mapIter1.hasNext());
+        Assert.assertEquals(scnIter1.next(), mapIter1.next());
+      } else {
+        Assert.assertFalse(mapIter1.hasNext());
+      }
+
+      if (scnIter2.hasNext()) {
+        Assert.assertTrue(mapIter2.hasNext());
+        Assert.assertEquals(scnIter2.next(), mapIter2.next());
+      } else {
+        Assert.assertFalse(mapIter2.hasNext());
+      }
+    }
+
+    Assert.assertFalse(mapIter1.hasNext());
+    Assert.assertFalse(mapIter2.hasNext());
+
+    scanner.close();
+  }
+
+  SortedMap<Key,Value> toMap(Scanner scanner) {
+    TreeMap<Key,Value> map = new TreeMap<>();
+    for (Entry<Key,Value> entry : scanner) {
+      map.put(entry.getKey(), entry.getValue());
+    }
+    return map;
+  }
+
+  @Test
+  public void testMultipleSources() throws Exception {
+    SortedMap<Key,Value> testData1 = createTestData(10, 10, 10);
+    SortedMap<Key,Value> testData2 = createTestData(0, 10, 0, 10, 10);
+
+    String testFile1 = createRFile(testData1);
+    String testFile2 = createRFile(testData2);
+
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+    Scanner scanner = RFile.newScanner().from(testFile1, testFile2).withFileSystem(localFs).build();
+
+    TreeMap<Key,Value> expected = new TreeMap<>(testData1);
+    expected.putAll(testData2);
+
+    Assert.assertEquals(expected, toMap(scanner));
+
+    Range range = new Range(rowStr(3), true, rowStr(14), true);
+    scanner.setRange(range);
+    Assert.assertEquals(expected.subMap(range.getStartKey(), range.getEndKey()), toMap(scanner));
+
+    scanner.close();
+  }
+
+  @Test
+  public void testWriterTableProperties() throws Exception {
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+
+    String testFile = createTmpTestFile();
+
+    Map<String,String> props = new HashMap<>();
+    props.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "1K");
+    props.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(), "1K");
+    RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).withTableProperties(props).build();
+
+    SortedMap<Key,Value> testData1 = createTestData(10, 10, 10);
+    writer.startDefaultLocalityGroup();
+    writer.append(testData1.entrySet());
+    writer.close();
+
+    Reader reader = getReader(localFs, testFile);
+    FileSKVIterator iiter = reader.getIndex();
+
+    int count = 0;
+    while (iiter.hasTop()) {
+      count++;
+      iiter.next();
+    }
+
+    // if the settings were applied, the writer should have created multiple index entries
+    Assert.assertTrue(count > 10);
+
+    reader.close();
+
+    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build();
+    Assert.assertEquals(testData1, toMap(scanner));
+    scanner.close();
+  }
+
+  @Test
+  public void testLocalityGroups() throws Exception {
+
+    SortedMap<Key,Value> testData1 = createTestData(0, 10, 0, 2, 10);
+    SortedMap<Key,Value> testData2 = createTestData(0, 10, 2, 1, 10);
+    SortedMap<Key,Value> defaultData = createTestData(0, 10, 3, 7, 10);
+
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+    String testFile = createTmpTestFile();
+    RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build();
+
+    writer.startNewLocalityGroup("z", colStr(0), colStr(1));
+    writer.append(testData1.entrySet());
+
+    writer.startNewLocalityGroup("h", colStr(2));
+    writer.append(testData2.entrySet());
+
+    writer.startDefaultLocalityGroup();
+    writer.append(defaultData.entrySet());
+
+    writer.close();
+
+    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build();
+
+    scanner.fetchColumnFamily(new Text(colStr(0)));
+    scanner.fetchColumnFamily(new Text(colStr(1)));
+    Assert.assertEquals(testData1, toMap(scanner));
+
+    scanner.clearColumns();
+    scanner.fetchColumnFamily(new Text(colStr(2)));
+    Assert.assertEquals(testData2, toMap(scanner));
+
+    scanner.clearColumns();
+    for (int i = 3; i < 10; i++) {
+      scanner.fetchColumnFamily(new Text(colStr(i)));
+    }
+    Assert.assertEquals(defaultData, toMap(scanner));
+
+    scanner.clearColumns();
+    Assert.assertEquals(createTestData(10, 10, 10), toMap(scanner));
+
+    scanner.close();
+
+    Reader reader = getReader(localFs, testFile);
+    Map<String,ArrayList<ByteSequence>> lGroups = reader.getLocalityGroupCF();
+    Assert.assertTrue(lGroups.containsKey("z"));
+    Assert.assertTrue(lGroups.get("z").size() == 2);
+    Assert.assertTrue(lGroups.get("z").contains(new ArrayByteSequence(colStr(0))));
+    Assert.assertTrue(lGroups.get("z").contains(new ArrayByteSequence(colStr(1))));
+    Assert.assertTrue(lGroups.containsKey("h"));
+    Assert.assertEquals(Arrays.asList(new ArrayByteSequence(colStr(2))), lGroups.get("h"));
+    reader.close();
+  }
+
+  @Test
+  public void testIterators() throws Exception {
+
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+    SortedMap<Key,Value> testData = createTestData(10, 10, 10);
+    String testFile = createRFile(testData);
+
+    IteratorSetting is = new IteratorSetting(50, "regex", RegExFilter.class);
+    RegExFilter.setRegexs(is, ".*00000[78].*", null, null, null, false);
+
+    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build();
+    scanner.addScanIterator(is);
+
+    Assert.assertEquals(createTestData(7, 2, 0, 10, 10), toMap(scanner));
+
+    scanner.close();
+  }
+
+  @Test
+  public void testAuths() throws Exception {
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+    String testFile = createTmpTestFile();
+    RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build();
+
+    Key k1 = new Key("r1", "f1", "q1", "A&B");
+    Key k2 = new Key("r1", "f1", "q2", "A");
+    Key k3 = new Key("r1", "f1", "q3");
+
+    Value v1 = new Value("p".getBytes());
+    Value v2 = new Value("c".getBytes());
+    Value v3 = new Value("t".getBytes());
+
+    writer.startDefaultLocalityGroup();
+    writer.append(k1, v1);
+    writer.append(k2, v2);
+    writer.append(k3, v3);
+    writer.close();
+
+    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).withAuthorizations(new Authorizations("A")).build();
+    Assert.assertEquals(ImmutableMap.of(k2, v2, k3, v3), toMap(scanner));
+    Assert.assertEquals(new Authorizations("A"), scanner.getAuthorizations());
+    scanner.close();
+
+    scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).withAuthorizations(new Authorizations("A", "B")).build();
+    Assert.assertEquals(ImmutableMap.of(k1, v1, k2, v2, k3, v3), toMap(scanner));
+    Assert.assertEquals(new Authorizations("A", "B"), scanner.getAuthorizations());
+    scanner.close();
+
+    scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).withAuthorizations(new Authorizations("B")).build();
+    Assert.assertEquals(ImmutableMap.of(k3, v3), toMap(scanner));
+    Assert.assertEquals(new Authorizations("B"), scanner.getAuthorizations());
+    scanner.close();
+  }
+
+  @Test
+  public void testNoSystemIters() throws Exception {
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+    String testFile = createTmpTestFile();
+    RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build();
+
+    Key k1 = new Key("r1", "f1", "q1");
+    k1.setTimestamp(3);
+
+    Key k2 = new Key("r1", "f1", "q1");
+    k2.setTimestamp(6);
+    k2.setDeleted(true);
+
+    Value v1 = new Value("p".getBytes());
+    Value v2 = new Value("".getBytes());
+
+    writer.startDefaultLocalityGroup();
+    writer.append(k2, v2);
+    writer.append(k1, v1);
+    writer.close();
+
+    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build();
+    Assert.assertFalse(scanner.iterator().hasNext());
+    scanner.close();
+
+    scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).withoutSystemIterators().build();
+    Assert.assertEquals(ImmutableMap.of(k2, v2, k1, v1), toMap(scanner));
+    scanner.setRange(new Range("r2"));
+    Assert.assertFalse(scanner.iterator().hasNext());
+    scanner.close();
+  }
+
+  @Test
+  public void testBounds() throws Exception {
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+    SortedMap<Key,Value> testData = createTestData(10, 10, 10);
+    String testFile = createRFile(testData);
+
+    // set a lower bound row
+    Range bounds = new Range(rowStr(3), false, null, true);
+    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).withBounds(bounds).build();
+    Assert.assertEquals(createTestData(4, 6, 0, 10, 10), toMap(scanner));
+    scanner.close();
+
+    // set an upper bound row
+    bounds = new Range(null, false, rowStr(7), true);
+    scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).withBounds(bounds).build();
+    Assert.assertEquals(createTestData(8, 10, 10), toMap(scanner));
+    scanner.close();
+
+    // set row bounds
+    bounds = new Range(rowStr(3), false, rowStr(7), true);
+    scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).withBounds(bounds).build();
+    Assert.assertEquals(createTestData(4, 4, 0, 10, 10), toMap(scanner));
+    scanner.close();
+
+    // set a bound on a single row and column family
+    bounds = Range.exact(rowStr(3), colStr(5));
+    scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).withBounds(bounds).build();
+    Assert.assertEquals(createTestData(3, 1, 5, 1, 10), toMap(scanner));
+    scanner.close();
+  }
+
+  @Test
+  public void testScannerTableProperties() throws Exception {
+    NewTableConfiguration ntc = new NewTableConfiguration();
+
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+    String testFile = createTmpTestFile();
+    RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build();
+
+    Key k1 = new Key("r1", "f1", "q1");
+    k1.setTimestamp(3);
+
+    Key k2 = new Key("r1", "f1", "q1");
+    k2.setTimestamp(6);
+
+    Value v1 = new Value("p".getBytes());
+    Value v2 = new Value("q".getBytes());
+
+    writer.startDefaultLocalityGroup();
+    writer.append(k2, v2);
+    writer.append(k1, v1);
+    writer.close();
+
+    // pass in a table config that has the versioning iterator configured
+    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).withTableProperties(ntc.getProperties()).build();
+    Assert.assertEquals(ImmutableMap.of(k2, v2), toMap(scanner));
+    scanner.close();
+
+    scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build();
+    Assert.assertEquals(ImmutableMap.of(k2, v2, k1, v1), toMap(scanner));
+    scanner.close();
+  }
+
+  @Test
+  public void testSampling() throws Exception {
+
+    SortedMap<Key,Value> testData1 = createTestData(1000, 2, 1);
+
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+    String testFile = createTmpTestFile();
+
+    SamplerConfiguration sc = new SamplerConfiguration(RowSampler.class).setOptions(ImmutableMap.of("hasher", "murmur3_32", "modulus", "19"));
+
+    RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).withSampler(sc).build();
+    writer.startDefaultLocalityGroup();
+    writer.append(testData1.entrySet());
+    writer.close();
+
+    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build();
+    scanner.setSamplerConfiguration(sc);
+
+    RowSampler rowSampler = new RowSampler();
+    rowSampler.init(sc);
+
+    SortedMap<Key,Value> sampleData = new TreeMap<>();
+    for (Entry<Key,Value> e : testData1.entrySet()) {
+      if (rowSampler.accept(e.getKey())) {
+        sampleData.put(e.getKey(), e.getValue());
+      }
+    }
+
+    Assert.assertTrue(sampleData.size() < testData1.size());
+
+    Assert.assertEquals(sampleData, toMap(scanner));
+
+    scanner.clearSamplerConfiguration();
+
+    Assert.assertEquals(testData1, toMap(scanner));
+
+  }
+
+  @Test
+  public void testAppendScanner() throws Exception {
+    SortedMap<Key,Value> testData = createTestData(10000, 1, 1);
+    String testFile = createRFile(testData);
+
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+
+    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build();
+
+    String testFile2 = createTmpTestFile();
+    RFileWriter writer = RFile.newWriter().to(testFile2).build();
+    writer.startDefaultLocalityGroup();
+    writer.append(scanner);
+    writer.close();
+    scanner.close();
+
+    scanner = RFile.newScanner().from(testFile2).withFileSystem(localFs).build();
+    Assert.assertEquals(testData, toMap(scanner));
+    scanner.close();
+  }
+
+  @Test
+  public void testCache() throws Exception {
+    SortedMap<Key,Value> testData = createTestData(10000, 1, 1);
+    String testFile = createRFile(testData);
+
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).withIndexCache(1000000).withDataCache(10000000).build();
+
+    Random rand = new Random(5);
+
+    for (int i = 0; i < 100; i++) {
+      int r = rand.nextInt(10000);
+      scanner.setRange(new Range(rowStr(r)));
+      Iterator<Entry<Key,Value>> iter = scanner.iterator();
+      Assert.assertTrue(iter.hasNext());
+      Assert.assertEquals(rowStr(r), iter.next().getKey().getRow().toString());
+      Assert.assertFalse(iter.hasNext());
+    }
+
+    scanner.close();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testOutOfOrder() throws Exception {
+    // test that the exception declared in the API is thrown
+    Key k1 = new Key("r1", "f1", "q1");
+    Value v1 = new Value("1".getBytes());
+
+    Key k2 = new Key("r2", "f1", "q1");
+    Value v2 = new Value("2".getBytes());
+
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+    String testFile = createTmpTestFile();
+    try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) {
+      writer.startDefaultLocalityGroup();
+      writer.append(k2, v2);
+      writer.append(k1, v1);
+    }
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testOutOfOrderIterable() throws Exception {
+    // test that the exception declared in the API is thrown
+    Key k1 = new Key("r1", "f1", "q1");
+    Value v1 = new Value("1".getBytes());
+
+    Key k2 = new Key("r2", "f1", "q1");
+    Value v2 = new Value("2".getBytes());
+
+    ArrayList<Entry<Key,Value>> data = new ArrayList<>();
+    data.add(new AbstractMap.SimpleEntry<>(k2, v2));
+    data.add(new AbstractMap.SimpleEntry<>(k1, v1));
+
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+    String testFile = createTmpTestFile();
+    try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) {
+      writer.startDefaultLocalityGroup();
+      writer.append(data);
+    }
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBadVis() throws Exception {
+    // this test has two purposes: ensure an exception is thrown, and ensure the exception documented in the javadoc is thrown
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+    String testFile = createTmpTestFile();
+    try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) {
+      writer.startDefaultLocalityGroup();
+      Key k1 = new Key("r1", "f1", "q1", "(A&(B");
+      writer.append(k1, new Value("".getBytes()));
+    }
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBadVisIterable() throws Exception {
+    // test append(iterable) method
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+    String testFile = createTmpTestFile();
+    try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) {
+      writer.startDefaultLocalityGroup();
+      Key k1 = new Key("r1", "f1", "q1", "(A&(B");
+      Entry<Key,Value> entry = new AbstractMap.SimpleEntry<>(k1, new Value("".getBytes()));
+      writer.append(Collections.singletonList(entry));
+    }
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testNoLocalityGroupStarted() throws Exception {
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+    String testFile = createTmpTestFile();
+    try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) {
+      Key k1 = new Key("r1", "f1", "q1");
+      writer.append(k1, new Value("".getBytes()));
+    }
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testNoLocalityGroupStartedIterable() throws Exception {
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+    String testFile = createTmpTestFile();
+    try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) {
+      Key k1 = new Key("r1", "f1", "q1");
+      Entry<Key,Value> entry = new AbstractMap.SimpleEntry<>(k1, new Value("".getBytes()));
+      writer.append(Collections.singletonList(entry));
+    }
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testDoubleStart() throws Exception {
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+    String testFile = createTmpTestFile();
+    try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) {
+      writer.startDefaultLocalityGroup();
+      writer.startDefaultLocalityGroup();
+    }
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testStartAfter() throws Exception {
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+    String testFile = createTmpTestFile();
+    try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) {
+      writer.startDefaultLocalityGroup();
+      Key k1 = new Key("r1", "f1", "q1");
+      writer.append(k1, new Value("".getBytes()));
+      writer.startNewLocalityGroup("lg1", "fam1");
+    }
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testIllegalColumn() throws Exception {
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+    String testFile = createTmpTestFile();
+    try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) {
+      writer.startNewLocalityGroup("lg1", "fam1");
+      Key k1 = new Key("r1", "f1", "q1");
+      // should not be able to append the column family f1
+      writer.append(k1, new Value("".getBytes()));
+    }
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testWrongGroup() throws Exception {
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+    String testFile = createTmpTestFile();
+    try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) {
+      writer.startNewLocalityGroup("lg1", "fam1");
+      Key k1 = new Key("r1", "fam1", "q1");
+      writer.append(k1, new Value("".getBytes()));
+      writer.startDefaultLocalityGroup();
+      // should not be able to append the column family fam1 to the default locality group
+      Key k2 = new Key("r1", "fam1", "q2");
+      writer.append(k2, new Value("".getBytes()));
+    }
+  }
+
+  private Reader getReader(LocalFileSystem localFs, String testFile) throws IOException {
+    Reader reader = (Reader) FileOperations.getInstance().newReaderBuilder().forFile(testFile).inFileSystem(localFs, localFs.getConf())
+        .withTableConfiguration(AccumuloConfiguration.getDefaultConfiguration()).build();
+    return reader;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index 8db1b21..a60dfa4 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -542,35 +542,35 @@ public class RFileTest {
     try {
       trf.writer.append(nk("r0", "cf1", "cq1", "L1", 55), nv("foo1"));
       assertFalse(true);
-    } catch (IllegalStateException ioe) {
+    } catch (IllegalArgumentException ioe) {
 
     }
 
     try {
       trf.writer.append(nk("r1", "cf0", "cq1", "L1", 55), nv("foo1"));
       assertFalse(true);
-    } catch (IllegalStateException ioe) {
+    } catch (IllegalArgumentException ioe) {
 
     }
 
     try {
       trf.writer.append(nk("r1", "cf1", "cq0", "L1", 55), nv("foo1"));
       assertFalse(true);
-    } catch (IllegalStateException ioe) {
+    } catch (IllegalArgumentException ioe) {
 
     }
 
     try {
       trf.writer.append(nk("r1", "cf1", "cq1", "L0", 55), nv("foo1"));
       assertFalse(true);
-    } catch (IllegalStateException ioe) {
+    } catch (IllegalArgumentException ioe) {
 
     }
 
     try {
       trf.writer.append(nk("r1", "cf1", "cq1", "L1", 56), nv("foo1"));
       assertFalse(true);
-    } catch (IllegalStateException ioe) {
+    } catch (IllegalArgumentException ioe) {
 
     }
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
index ea1f786..28cbdc9 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
@@ -36,14 +36,10 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
-import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
-import org.apache.accumulo.core.iterators.system.DeletingIterator;
 import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
 import org.apache.accumulo.core.iterators.system.MultiIterator;
 import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
 import org.apache.accumulo.core.iterators.system.StatsIterator;
-import org.apache.accumulo.core.iterators.system.VisibilityFilter;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.security.Authorizations;
@@ -185,13 +181,8 @@ class ScanDataSource implements DataSource {
 
     statsIterator = new StatsIterator(multiIter, TabletServer.seekCount, tablet.getScannedCounter());
 
-    DeletingIterator delIter = new DeletingIterator(statsIterator, false);
-
-    ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
-
-    ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, options.getColumnSet());
-
-    VisibilityFilter visFilter = new VisibilityFilter(colFilter, options.getAuthorizations(), options.getDefaultLabels());
+    SortedKeyValueIterator<Key,Value> visFilter = IteratorUtil.setupSystemScanIterators(statsIterator, options.getColumnSet(), options.getAuthorizations(),
+        options.getDefaultLabels());
 
     if (!loadIters) {
       return visFilter;


[2/2] accumulo git commit: ACCUMULO-4165 Added a user facing API for RFile

Posted by kt...@apache.org.
ACCUMULO-4165 Added a user facing API for RFile

Squashed commit of the following:

commit c96cda97507fc611bfdf1699ea022769072adc55
Author: Keith Turner <kt...@apache.org>
Date:   Tue May 31 17:53:20 2016 -0400

    Multiple improvements based on code review.

     * Made AccumuloFileOutputFormat use new RFile public API.
     * Moved checking of visibility validity to within new RFile API impl.
     * Added test for error conditions and documented expected exceptions (see the sketch after this list).
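
     For example, appending keys out of order now throws the documented
     IllegalArgumentException. A minimal sketch, distilled from the
     testOutOfOrder test added in this commit (testFile and localFs as set
     up in that test):

       try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) {
         writer.startDefaultLocalityGroup();
         writer.append(new Key("r2", "f1", "q1"), new Value("2".getBytes()));
         // out of order relative to the key above; throws IllegalArgumentException
         writer.append(new Key("r1", "f1", "q1"), new Value("1".getBytes()));
       }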

commit 005ebb2c0e47e4cf818c955f292c71696a4fff41
Author: Keith Turner <kt...@apache.org>
Date:   Tue May 31 14:35:18 2016 -0400

    updates based on code review comments

commit a5c6ece070fc44923758a1da4aa50849f872fdf4
Author: Keith Turner <kt...@apache.org>
Date:   Fri May 27 15:41:06 2016 -0400

    added a test

commit 911c64cd714364707e1258dcf627b151630a18bf
Author: Keith Turner <kt...@apache.org>
Date:   Fri May 27 14:38:28 2016 -0400

    ACCUMULO-4165 Added a user facing API for RFile


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/61a7de4a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/61a7de4a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/61a7de4a

Branch: refs/heads/1.8
Commit: 61a7de4a167571bea16f2f130ddaaa8768562148
Parents: c8621bb
Author: Keith Turner <ke...@deenlo.com>
Authored: Thu Jun 2 11:03:37 2016 -0400
Committer: Keith Turner <ke...@deenlo.com>
Committed: Thu Jun 2 11:03:37 2016 -0400

----------------------------------------------------------------------
 .../client/admin/NewTableConfiguration.java     |  14 +-
 .../core/client/admin/TableOperations.java      |   4 +-
 .../core/client/impl/OfflineIterator.java       |  14 +-
 .../client/mapred/AccumuloFileOutputFormat.java |  25 +-
 .../mapreduce/AccumuloFileOutputFormat.java     |  27 +-
 .../accumulo/core/client/rfile/FSConfArgs.java  |  47 ++
 .../accumulo/core/client/rfile/RFile.java       | 275 ++++++++
 .../core/client/rfile/RFileScanner.java         | 330 ++++++++++
 .../core/client/rfile/RFileScannerBuilder.java  | 148 +++++
 .../accumulo/core/client/rfile/RFileSource.java |  44 ++
 .../accumulo/core/client/rfile/RFileWriter.java | 236 +++++++
 .../core/client/rfile/RFileWriterBuilder.java   | 148 +++++
 .../accumulo/core/client/sample/Sampler.java    |   1 -
 .../accumulo/core/file/FileOperations.java      |  59 +-
 .../apache/accumulo/core/file/rfile/RFile.java  |   2 +-
 .../core/file/rfile/RFileOperations.java        |  45 +-
 .../accumulo/core/iterators/IteratorUtil.java   |  15 +
 .../sample/impl/SamplerConfigurationImpl.java   |  12 +
 .../accumulo/core/util/LocalityGroupUtil.java   |   2 +
 .../accumulo/core/client/rfile/RFileTest.java   | 647 +++++++++++++++++++
 .../accumulo/core/file/rfile/RFileTest.java     |  10 +-
 .../accumulo/tserver/tablet/ScanDataSource.java |  13 +-
 22 files changed, 2012 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java b/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java
index 994b653..e7dc898 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java
@@ -88,7 +88,7 @@ public class NewTableConfiguration {
    */
   public NewTableConfiguration setProperties(Map<String,String> prop) {
     checkArgument(prop != null, "properties is null");
-    checkDisjoint(prop, samplerConfiguration);
+    SamplerConfigurationImpl.checkDisjoint(prop, samplerConfiguration);
 
     this.properties = new HashMap<String,String>(prop);
     return this;
@@ -114,16 +114,6 @@ public class NewTableConfiguration {
     return Collections.unmodifiableMap(propertyMap);
   }
 
-  private void checkDisjoint(Map<String,String> props, SamplerConfiguration samplerConfiguration) {
-    if (props.isEmpty() || samplerConfiguration == null) {
-      return;
-    }
-
-    Map<String,String> sampleProps = new SamplerConfigurationImpl(samplerConfiguration).toTablePropertiesMap();
-
-    checkArgument(Collections.disjoint(props.keySet(), sampleProps.keySet()), "Properties and derived sampler properties are not disjoint");
-  }
-
   /**
    * Enable building a sample data set on the new table using the given sampler configuration.
    *
@@ -131,7 +121,7 @@ public class NewTableConfiguration {
    */
   public NewTableConfiguration enableSampling(SamplerConfiguration samplerConfiguration) {
     requireNonNull(samplerConfiguration);
-    checkDisjoint(properties, samplerConfiguration);
+    SamplerConfigurationImpl.checkDisjoint(properties, samplerConfiguration);
     this.samplerConfiguration = samplerConfiguration;
     return this;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
index f292902..3e56736 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
@@ -31,6 +31,8 @@ import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
+import org.apache.accumulo.core.client.rfile.RFile;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
@@ -538,7 +540,7 @@ public interface TableOperations {
   Set<Range> splitRangeByTablets(String tableName, Range range, int maxSplits) throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
 
   /**
-   * Bulk import all the files in a directory into a table.
+   * Bulk import all the files in a directory into a table. Files can be created using {@link AccumuloFileOutputFormat} and {@link RFile#newWriter()}.
    *
    * @param tableName
    *          the name of the table

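As the updated javadoc above notes, bulk import files can now be created with RFile.newWriter(). A minimal sketch, assuming a Connector named conn and hypothetical table name and paths; the importDirectory call is the method documented above:

    // Write an RFile into a bulk import directory, then import that directory.
    // conn, "mytable", and all paths are hypothetical.
    try (RFileWriter writer = RFile.newWriter().to("/bulk/dir/f1.rf").build()) {
      writer.startDefaultLocalityGroup();
      writer.append(new Key("row1", "fam1", "qual1"), new Value("val1".getBytes()));
    }
    conn.tableOperations().importDirectory("mytable", "/bulk/dir", "/bulk/fail", false);
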
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
index c5017c3..54fe4ff 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
@@ -52,11 +52,7 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
-import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
-import org.apache.accumulo.core.iterators.system.DeletingIterator;
 import org.apache.accumulo.core.iterators.system.MultiIterator;
-import org.apache.accumulo.core.iterators.system.VisibilityFilter;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
@@ -359,18 +355,12 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
     OfflineIteratorEnvironment iterEnv = new OfflineIteratorEnvironment(authorizations, acuTableConf, false, samplerConfImpl == null ? null
         : samplerConfImpl.toSamplerConfiguration());
 
-    DeletingIterator delIter = new DeletingIterator(multiIter, false);
-
-    ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
-
-    ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, new HashSet<Column>(options.fetchedColumns));
-
     byte[] defaultSecurityLabel;
-
     ColumnVisibility cv = new ColumnVisibility(acuTableConf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY));
     defaultSecurityLabel = cv.getExpression();
 
-    VisibilityFilter visFilter = new VisibilityFilter(colFilter, authorizations, defaultSecurityLabel);
+    SortedKeyValueIterator<Key,Value> visFilter = IteratorUtil.setupSystemScanIterators(multiIter, new HashSet<Column>(options.fetchedColumns), authorizations,
+        defaultSecurityLabel);
 
     return iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, options.serverSideIteratorList,
         options.serverSideIteratorOptions, iterEnv, false));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
index 1e90e27..f2bc4cd 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
@@ -17,20 +17,16 @@
 package org.apache.accumulo.core.client.mapred;
 
 import java.io.IOException;
-import java.util.Arrays;
 
 import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
 import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.client.rfile.RFileWriter;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.FileOperations;
-import org.apache.accumulo.core.file.FileSKVWriter;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.commons.collections.map.LRUMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -163,11 +159,10 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
 
     final String extension = acuConf.get(Property.TABLE_FILE_TYPE);
     final Path file = new Path(getWorkOutputPath(job), getUniqueName(job, "part") + "." + extension);
-
-    final LRUMap validVisibilities = new LRUMap(ConfiguratorBase.getVisibilityCacheSize(conf));
+    final int visCacheSize = ConfiguratorBase.getVisibilityCacheSize(conf);
 
     return new RecordWriter<Key,Value>() {
-      FileSKVWriter out = null;
+      RFileWriter out = null;
 
       @Override
       public void close(Reporter reporter) throws IOException {
@@ -177,17 +172,9 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
 
       @Override
       public void write(Key key, Value value) throws IOException {
-
-        Boolean wasChecked = (Boolean) validVisibilities.get(key.getColumnVisibilityData());
-        if (wasChecked == null) {
-          byte[] cv = key.getColumnVisibilityData().toArray();
-          new ColumnVisibility(cv);
-          validVisibilities.put(new ArrayByteSequence(Arrays.copyOf(cv, cv.length)), Boolean.TRUE);
-        }
-
         if (out == null) {
-          out = FileOperations.getInstance().newWriterBuilder().forFile(file.toString(), file.getFileSystem(conf), conf).withTableConfiguration(acuConf)
-              .build();
+          out = RFile.newWriter().to(file.toString()).withFileSystem(file.getFileSystem(conf)).withTableProperties(acuConf)
+              .withVisibilityCacheSize(visCacheSize).build();
           out.startDefaultLocalityGroup();
         }
         out.append(key, value);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
index b337f56..75afe2b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
@@ -17,19 +17,16 @@
 package org.apache.accumulo.core.client.mapreduce;
 
 import java.io.IOException;
-import java.util.Arrays;
 
+import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
 import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.client.rfile.RFileWriter;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.FileOperations;
-import org.apache.accumulo.core.file.FileSKVWriter;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.commons.collections.map.LRUMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
@@ -161,11 +158,10 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
 
     final String extension = acuConf.get(Property.TABLE_FILE_TYPE);
     final Path file = this.getDefaultWorkFile(context, "." + extension);
-
-    final LRUMap validVisibilities = new LRUMap(1000);
+    final int visCacheSize = ConfiguratorBase.getVisibilityCacheSize(conf);
 
     return new RecordWriter<Key,Value>() {
-      FileSKVWriter out = null;
+      RFileWriter out = null;
 
       @Override
       public void close(TaskAttemptContext context) throws IOException {
@@ -175,22 +171,13 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
 
       @Override
       public void write(Key key, Value value) throws IOException {
-
-        Boolean wasChecked = (Boolean) validVisibilities.get(key.getColumnVisibilityData());
-        if (wasChecked == null) {
-          byte[] cv = key.getColumnVisibilityData().toArray();
-          new ColumnVisibility(cv);
-          validVisibilities.put(new ArrayByteSequence(Arrays.copyOf(cv, cv.length)), Boolean.TRUE);
-        }
-
         if (out == null) {
-          out = FileOperations.getInstance().newWriterBuilder().forFile(file.toString(), file.getFileSystem(conf), conf).withTableConfiguration(acuConf)
-              .build();
+          out = RFile.newWriter().to(file.toString()).withFileSystem(file.getFileSystem(conf)).withTableProperties(acuConf)
+              .withVisibilityCacheSize(visCacheSize).build();
           out.startDefaultLocalityGroup();
         }
         out.append(key, value);
       }
     };
   }
-
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java
new file mode 100644
index 0000000..1679e43
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.rfile;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+class FSConfArgs {
+
+  FileSystem fs;
+  Configuration conf;
+
+  FileSystem getFileSystem() throws IOException {
+    if (fs == null) {
+      fs = FileSystem.get(getConf());
+    }
+    return fs;
+  }
+
+  Configuration getConf() throws IOException {
+    if (fs != null) {
+      return fs.getConf();
+    }
+
+    if (conf == null) {
+      conf = new Configuration();
+    }
+    return conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
new file mode 100644
index 0000000..bc5995e
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.rfile;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+
+/**
+ * RFile is Accumulo's internal storage format for Key Value pairs. This class is a factory that enables creating a {@link Scanner} for reading and a
+ * {@link RFileWriter} for writing RFiles.
+ *
+ * <p>
+ * The {@link Scanner} created by this class makes it easy to experiment with real data from a live system on a developer's workstation. Also, the {@link Scanner}
+ * can be used to write tools to analyze Accumulo's raw data.
+ *
+ * @since 1.8.0
+ */
+public class RFile {
+
+  /**
+   * This is an intermediate interface in a larger builder pattern. Supports setting the required input sources for reading a RFile.
+   *
+   * @since 1.8.0
+   */
+  public static interface InputArguments {
+    /**
+     * Specify RFiles to read from. When multiple inputs are specified the {@link Scanner} constructed will present a merged view.
+     *
+     * @param inputs
+     *          one or more RFiles to read.
+     * @return this
+     */
+    ScannerOptions from(RFileSource... inputs);
+
+    /**
+     * Specify RFiles to read from. When multiple are specified the {@link Scanner} constructed will present a merged view.
+     *
+     * @param files
+     *          one or more RFiles to read.
+     * @return this
+     */
+    ScannerFSOptions from(String... files);
+  }
+
+  /**
+   * This is an intermediate interface in a larger builder pattern. Enables optionally setting a FileSystem to read RFile(s) from.
+   *
+   * @since 1.8.0
+   */
+  public static interface ScannerFSOptions extends ScannerOptions {
+    /**
+     * Optionally provide a FileSystem to open RFiles. If not specified, the FileSystem will be constructed using configuration on the classpath.
+     *
+     * @param fs
+     *          use this FileSystem to open files.
+     * @return this
+     */
+    ScannerOptions withFileSystem(FileSystem fs);
+  }
+
+  /**
+   * This is an intermediate interface in a larger builder pattern. Supports setting optional parameters for reading RFile(s) and building a scanner over
+   * RFile(s).
+   *
+   * @since 1.8.0
+   */
+  public static interface ScannerOptions {
+
+    /**
+     * By default the {@link Scanner} created will set up the default Accumulo system iterators. The iterators do things like the following:
+     *
+     * <ul>
+     * <li>Suppress deleted data</li>
+     * <li>Filter based on {@link Authorizations}</li>
+     * <li>Filter columns specified by functions like {@link Scanner#fetchColumn(Text, Text)} and {@link Scanner#fetchColumnFamily(Text)}</li>
+     * </ul>
+     *
+     * <p>
+     * Calling this method will turn off these system iterators and allow reading the raw data in an RFile. When reading the raw data, deleted data and delete
+     * markers may be seen. Delete markers are {@link Key}s with the delete flag set.
+     *
+     * <p>
+     * Disabling system iterators will cause {@link #withAuthorizations(Authorizations)}, {@link Scanner#fetchColumn(Text, Text)}, and
+     * {@link Scanner#fetchColumnFamily(Text)} to throw runtime exceptions.
+     *
+     * @return this
+     */
+    public ScannerOptions withoutSystemIterators();
+
+    /**
+     * The authorizations passed here will be used to filter Keys from the {@link Scanner} based on the content of the column visibility field.
+     *
+     * @param auths
+     *          scan with these authorizations
+     * @return this
+     */
+    public ScannerOptions withAuthorizations(Authorizations auths);
+
+    /**
+     * Enabling this option will cache RFile data in memory. This option is useful when doing lots of random accesses.
+     *
+     * @param cacheSize
+     *          the size of the data cache in bytes.
+     * @return this
+     */
+    public ScannerOptions withDataCache(long cacheSize);
+
+    /**
+     * Enabling this option will cache RFile indexes in memory. Index data within a RFile is used to find data when seeking to a {@link Key}. This option is
+     * useful when doing lots of random accesses.
+     *
+     * @param cacheSize
+     *          the size of the index cache in bytes.
+     * @return this
+     */
+    public ScannerOptions withIndexCache(long cacheSize);
+
+    /**
+     * This option prevents the {@link Scanner} from reading data outside of a given range. A scanner will not see any data outside of this range even if
+     * the RFile(s) have data outside the range.
+     *
+     * @return this
+     */
+    public ScannerOptions withBounds(Range range);
+
+    /**
+     * Construct the {@link Scanner} with iterators specified in a table's properties. Properties for a table can be obtained by calling
+     * {@link TableOperations#getProperties(String)}
+     *
+     * @param props
+     *          iterable over Accumulo table key value properties.
+     * @return this
+     */
+    public ScannerOptions withTableProperties(Iterable<Entry<String,String>> props);
+
+    /**
+     * @see #withTableProperties(Iterable)
+     * @param props
+     *          a map instead of an Iterable
+     * @return this
+     */
+    public ScannerOptions withTableProperties(Map<String,String> props);
+
+    /**
+     * @return a Scanner over RFile using the specified options.
+     */
+    public Scanner build();
+  }
+
+  /**
+   * Entry point for building a new {@link Scanner} over one or more RFiles.
+   */
+  public static InputArguments newScanner() {
+    return new RFileScannerBuilder();
+  }
+
+  /**
+   * This is an intermediate interface in a larger builder pattern. Supports setting the required output sink to write a RFile to.
+   *
+   * @since 1.8.0
+   */
+  public static interface OutputArguments {
+    /**
+     * @param filename
+     *          name of file to write RFile data
+     * @return this
+     */
+    public WriterFSOptions to(String filename);
+
+    /**
+     * @param out
+     *          output stream to write RFile data
+     * @return this
+     */
+    public WriterOptions to(OutputStream out);
+  }
+
+  /**
+   * This is an intermediate interface in a larger builder pattern. Enables optionally setting a FileSystem to write to.
+   *
+   * @since 1.8.0
+   */
+  public static interface WriterFSOptions extends WriterOptions {
+    /**
+     * Optionally provide a FileSystem to open a file to write a RFile. If not specified, the FileSystem will be constructed using configuration on the
+     * classpath.
+     *
+     * @param fs
+     *          use this FileSystem to open files.
+     * @return this
+     */
+    WriterOptions withFileSystem(FileSystem fs);
+  }
+
+  /**
+   * This is an intermediate interface in a larger builder pattern. Supports setting optional parameters for creating a RFile and building a RFileWriter.
+   *
+   * @since 1.8.0
+   */
+  public static interface WriterOptions {
+    /**
+     * An option to store sample data in the generated RFile.
+     *
+     * @param samplerConf
+     *          configuration to use when generating sample data.
+     * @throws IllegalArgumentException
+     *           if table properties were previously specified and the table properties also specify a sampler.
+     * @return this
+     */
+    public WriterOptions withSampler(SamplerConfiguration samplerConf);
+
+    /**
+     * Create an RFile using the same configuration as an Accumulo table. Properties for a table can be obtained by calling
+     * {@link TableOperations#getProperties(String)}.
+     *
+     * @param props
+     *          iterable over Accumulo table key value properties.
+     * @throws IllegalArgumentException
+     *           if sampler was previously specified and the table properties also specify a sampler.
+     * @return this
+     */
+    public WriterOptions withTableProperties(Iterable<Entry<String,String>> props);
+
+    /**
+     * @see #withTableProperties(Iterable)
+     */
+    public WriterOptions withTableProperties(Map<String,String> props);
+
+    /**
+     * @param maxSize
+     *          As keys are added to an RFile, the visibility field is validated. Validating the visibility field requires parsing it. In order to make
+     *          validation faster, previously seen visibilities are cached. This option allows setting the maximum size of this cache.
+     * @return this
+     */
+    public WriterOptions withVisibilityCacheSize(int maxSize);
+
+    /**
+     * @return a new RFileWriter created with the options previously specified.
+     */
+    public RFileWriter build() throws IOException;
+  }
+
+  /**
+   * Entry point for creating a new RFile writer.
+   */
+  public static OutputArguments newWriter() {
+    return new RFileWriterBuilder();
+  }
+}
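
For context, here is a minimal sketch of reading an RFile with the scanner side of this builder API. The file path, authorizations, range, and cache sizes below are illustrative values, not part of the commit:

    Scanner scanner = RFile.newScanner()
        .from("/tmp/example.rf")                          // hypothetical path
        .withAuthorizations(new Authorizations("A", "B"))
        .withBounds(new Range("row1", "row9"))
        .withIndexCache(1000000)                          // 1 MB index cache
        .withDataCache(10000000)                          // 10 MB data cache
        .build();
    try {
      for (Entry<Key,Value> entry : scanner) {
        System.out.println(entry.getKey() + " -> " + entry.getValue());
      }
    } finally {
      scanner.close();
    }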

http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
new file mode 100644
index 0000000..4dfba68
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.rfile;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.BaseIteratorEnvironment;
+import org.apache.accumulo.core.client.impl.ScannerOptions;
+import org.apache.accumulo.core.client.rfile.RFileScannerBuilder.InputArgs;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
+import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.rfile.RFile.Reader;
+import org.apache.accumulo.core.iterators.IteratorAdapter;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.MultiIterator;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.base.Preconditions;
+
+class RFileScanner extends ScannerOptions implements Scanner {
+
+  private static final byte[] EMPTY_BYTES = new byte[0];
+  private static final Range EMPTY_RANGE = new Range();
+
+  private Range range;
+  private BlockCache dataCache = null;
+  private BlockCache indexCache = null;
+  private Opts opts;
+  private int batchSize = 1000;
+  private long readaheadThreshold = 3;
+
+  private static final long CACHE_BLOCK_SIZE = AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TSERV_DEFAULT_BLOCKSIZE);
+
+  static class Opts {
+    InputArgs in;
+    Authorizations auths = Authorizations.EMPTY;
+    long dataCacheSize;
+    long indexCacheSize;
+    boolean useSystemIterators = true;
+    HashMap<String,String> tableConfig;
+    Range bounds;
+  }
+
+  // This cache exists as a hack to avoid leaking decompressors. When the RFile code is not given a
+  // cache it reads blocks directly from the decompressor. However if a user does not read all data
+  // for a scan this can leave a BCFile block open and a decompressor allocated.
+  //
+  // By providing a cache to the RFile code it forces each block to be read into memory. When a
+  // block is accessed the entire thing is read into memory immediately allocating and deallocating
+  // a decompressor. If the user does not read all data, no decompressors are left allocated.
+  private static class NoopCache implements BlockCache {
+    @Override
+    public CacheEntry cacheBlock(String blockName, byte[] buf, boolean inMemory) {
+      return null;
+    }
+
+    @Override
+    public CacheEntry cacheBlock(String blockName, byte[] buf) {
+      return null;
+    }
+
+    @Override
+    public CacheEntry getBlock(String blockName) {
+      return null;
+    }
+
+    @Override
+    public long getMaxSize() {
+      return Integer.MAX_VALUE;
+    }
+  }
+
+  RFileScanner(Opts opts) {
+    if (!opts.auths.equals(Authorizations.EMPTY) && !opts.useSystemIterators) {
+      throw new IllegalArgumentException("Cannot set authorizations when system iterators are disabled");
+    }
+
+    this.opts = opts;
+    if (opts.indexCacheSize > 0) {
+      this.indexCache = new LruBlockCache(opts.indexCacheSize, CACHE_BLOCK_SIZE);
+    } else {
+      this.indexCache = new NoopCache();
+    }
+
+    if (opts.dataCacheSize > 0) {
+      this.dataCache = new LruBlockCache(opts.dataCacheSize, CACHE_BLOCK_SIZE);
+    } else {
+      this.dataCache = new NoopCache();
+    }
+  }
+
+  @Override
+  public synchronized void fetchColumnFamily(Text col) {
+    Preconditions.checkArgument(opts.useSystemIterators, "Can only fetch columns when using system iterators");
+    super.fetchColumnFamily(col);
+  }
+
+  @Override
+  public synchronized void fetchColumn(Text colFam, Text colQual) {
+    Preconditions.checkArgument(opts.useSystemIterators, "Can only fetch columns when using system iterators");
+    super.fetchColumn(colFam, colQual);
+  }
+
+  @Override
+  public void fetchColumn(IteratorSetting.Column column) {
+    Preconditions.checkArgument(opts.useSystemIterators, "Can only fetch columns when using system iterators");
+    super.fetchColumn(column);
+  }
+
+  @Override
+  public void setClassLoaderContext(String classLoaderContext) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Deprecated
+  @Override
+  public void setTimeOut(int timeOut) {
+    if (timeOut == Integer.MAX_VALUE)
+      setTimeout(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+    else
+      setTimeout(timeOut, TimeUnit.SECONDS);
+  }
+
+  @Deprecated
+  @Override
+  public int getTimeOut() {
+    long timeout = getTimeout(TimeUnit.SECONDS);
+    if (timeout >= Integer.MAX_VALUE)
+      return Integer.MAX_VALUE;
+    return (int) timeout;
+  }
+
+  @Override
+  public void setRange(Range range) {
+    this.range = range;
+  }
+
+  @Override
+  public Range getRange() {
+    return range;
+  }
+
+  @Override
+  public void setBatchSize(int size) {
+    this.batchSize = size;
+  }
+
+  @Override
+  public int getBatchSize() {
+    return batchSize;
+  }
+
+  @Override
+  public void enableIsolation() {}
+
+  @Override
+  public void disableIsolation() {}
+
+  @Override
+  public synchronized void setReadaheadThreshold(long batches) {
+    Preconditions.checkArgument(batches > 0);
+    readaheadThreshold = batches;
+  }
+
+  @Override
+  public synchronized long getReadaheadThreshold() {
+    return readaheadThreshold;
+  }
+
+  @Override
+  public Authorizations getAuthorizations() {
+    return opts.auths;
+  }
+
+  @Override
+  public void addScanIterator(IteratorSetting cfg) {
+    super.addScanIterator(cfg);
+  }
+
+  @Override
+  public void removeScanIterator(String iteratorName) {
+    super.removeScanIterator(iteratorName);
+  }
+
+  @Override
+  public void updateScanIteratorOption(String iteratorName, String key, String value) {
+    super.updateScanIteratorOption(iteratorName, key, value);
+  }
+
+  private class IterEnv extends BaseIteratorEnvironment {
+    @Override
+    public IteratorScope getIteratorScope() {
+      return IteratorScope.scan;
+    }
+
+    @Override
+    public boolean isFullMajorCompaction() {
+      return false;
+    }
+
+    @Override
+    public Authorizations getAuthorizations() {
+      return opts.auths;
+    }
+
+    @Override
+    public boolean isSamplingEnabled() {
+      return RFileScanner.this.getSamplerConfiguration() != null;
+    }
+
+    @Override
+    public SamplerConfiguration getSamplerConfiguration() {
+      return RFileScanner.this.getSamplerConfiguration();
+    }
+  }
+
+  @Override
+  public Iterator<Entry<Key,Value>> iterator() {
+    try {
+      RFileSource[] sources = opts.in.getSources();
+      List<SortedKeyValueIterator<Key,Value>> readers = new ArrayList<>(sources.length);
+      for (int i = 0; i < sources.length; i++) {
+        FSDataInputStream inputStream = (FSDataInputStream) sources[i].getInputStream();
+        readers.add(new RFile.Reader(new CachableBlockFile.Reader(inputStream, sources[i].getLength(), opts.in.getConf(), dataCache, indexCache,
+            AccumuloConfiguration.getDefaultConfiguration())));
+      }
+
+      if (getSamplerConfiguration() != null) {
+        for (int i = 0; i < readers.size(); i++) {
+          readers.set(i, ((Reader) readers.get(i)).getSample(new SamplerConfigurationImpl(getSamplerConfiguration())));
+        }
+      }
+
+      SortedKeyValueIterator<Key,Value> iterator;
+      if (opts.bounds != null) {
+        iterator = new MultiIterator(readers, opts.bounds);
+      } else {
+        iterator = new MultiIterator(readers, false);
+      }
+
+      Set<ByteSequence> families = Collections.emptySet();
+
+      if (opts.useSystemIterators) {
+        SortedSet<Column> cols = this.getFetchedColumns();
+        families = LocalityGroupUtil.families(cols);
+        iterator = IteratorUtil.setupSystemScanIterators(iterator, cols, getAuthorizations(), EMPTY_BYTES);
+      }
+
+      try {
+        if (opts.tableConfig != null && opts.tableConfig.size() > 0) {
+          ConfigurationCopy conf = new ConfigurationCopy(opts.tableConfig);
+          iterator = IteratorUtil.loadIterators(IteratorScope.scan, iterator, null, conf, serverSideIteratorList, serverSideIteratorOptions, new IterEnv());
+        } else {
+          iterator = IteratorUtil.loadIterators(iterator, serverSideIteratorList, serverSideIteratorOptions, new IterEnv(), false, null);
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      iterator.seek(getRange() == null ? EMPTY_RANGE : getRange(), families, !families.isEmpty());
+      return new IteratorAdapter(iterator);
+
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close() {
+    if (dataCache instanceof LruBlockCache) {
+      ((LruBlockCache) dataCache).shutdown();
+    }
+
+    if (indexCache instanceof LruBlockCache) {
+      ((LruBlockCache) indexCache).shutdown();
+    }
+
+    try {
+      for (RFileSource source : opts.in.getSources()) {
+        source.getInputStream().close();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
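
Because RFileScanner extends the common ScannerOptions base class, client-side iterators attach the same way they do on a table scanner. A minimal sketch, reusing the hypothetical file path from the earlier example (the iterator name and priority are arbitrary):

    Scanner scanner = RFile.newScanner().from("/tmp/example.rf").build();
    IteratorSetting regex = new IteratorSetting(50, "regex", RegExFilter.class);
    RegExFilter.setRegexs(regex, "row[0-5]", null, null, null, false);
    scanner.addScanIterator(regex);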

http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java
new file mode 100644
index 0000000..92e07b4
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.rfile;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.rfile.RFile.ScannerFSOptions;
+import org.apache.accumulo.core.client.rfile.RFile.ScannerOptions;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+
+class RFileScannerBuilder implements RFile.InputArguments, RFile.ScannerFSOptions, RFile.ScannerOptions {
+
+  static class InputArgs extends FSConfArgs {
+    private Path[] paths;
+    private RFileSource[] sources;
+
+    InputArgs(String... files) {
+      this.paths = new Path[files.length];
+      for (int i = 0; i < files.length; i++) {
+        this.paths[i] = new Path(files[i]);
+      }
+    }
+
+    InputArgs(RFileSource... sources) {
+      this.sources = sources;
+    }
+
+    RFileSource[] getSources() throws IOException {
+      if (sources == null) {
+        sources = new RFileSource[paths.length];
+        for (int i = 0; i < paths.length; i++) {
+          sources[i] = new RFileSource(getFileSystem().open(paths[i]), getFileSystem().getFileStatus(paths[i]).getLen());
+        }
+      } else {
+        for (int i = 0; i < sources.length; i++) {
+          if (!(sources[i].getInputStream() instanceof FSDataInputStream)) {
+            sources[i] = new RFileSource(new FSDataInputStream(sources[i].getInputStream()), sources[i].getLength());
+          }
+        }
+      }
+
+      return sources;
+    }
+  }
+
+  private RFileScanner.Opts opts = new RFileScanner.Opts();
+
+  @Override
+  public ScannerOptions withoutSystemIterators() {
+    opts.useSystemIterators = false;
+    return this;
+  }
+
+  @Override
+  public ScannerOptions withAuthorizations(Authorizations auths) {
+    Objects.requireNonNull(auths);
+    opts.auths = auths;
+    return this;
+  }
+
+  @Override
+  public ScannerOptions withDataCache(long cacheSize) {
+    Preconditions.checkArgument(cacheSize > 0);
+    opts.dataCacheSize = cacheSize;
+    return this;
+  }
+
+  @Override
+  public ScannerOptions withIndexCache(long cacheSize) {
+    Preconditions.checkArgument(cacheSize > 0);
+    opts.indexCacheSize = cacheSize;
+    return this;
+  }
+
+  @Override
+  public Scanner build() {
+    return new RFileScanner(opts);
+  }
+
+  @Override
+  public ScannerOptions withFileSystem(FileSystem fs) {
+    Objects.requireNonNull(fs);
+    opts.in.fs = fs;
+    return this;
+  }
+
+  @Override
+  public ScannerOptions from(RFileSource... inputs) {
+    opts.in = new InputArgs(inputs);
+    return this;
+  }
+
+  @Override
+  public ScannerFSOptions from(String... files) {
+    opts.in = new InputArgs(files);
+    return this;
+  }
+
+  @Override
+  public ScannerOptions withTableProperties(Iterable<Entry<String,String>> tableConfig) {
+    Objects.requireNonNull(tableConfig);
+    this.opts.tableConfig = new HashMap<>();
+    for (Entry<String,String> entry : tableConfig) {
+      this.opts.tableConfig.put(entry.getKey(), entry.getValue());
+    }
+    return this;
+  }
+
+  @Override
+  public ScannerOptions withTableProperties(Map<String,String> tableConfig) {
+    Objects.requireNonNull(tableConfig);
+    this.opts.tableConfig = new HashMap<>(tableConfig);
+    return this;
+  }
+
+  @Override
+  public ScannerOptions withBounds(Range range) {
+    Objects.requireNonNull(range);
+    this.opts.bounds = range;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSource.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSource.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSource.java
new file mode 100644
index 0000000..21298c3
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSource.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.rfile;
+
+import java.io.InputStream;
+
+/**
+ * RFile metadata is stored at the end of the file. In order to read an RFile, its length must be known. This class provides a way to pass an InputStream and
+ * length for reading an RFile.
+ *
+ * @since 1.8.0
+ */
+public class RFileSource {
+  private final InputStream in;
+  private final long len;
+
+  public RFileSource(InputStream in, long len) {
+    this.in = in;
+    this.len = len;
+  }
+
+  public InputStream getInputStream() {
+    return in;
+  }
+
+  public long getLength() {
+    return len;
+  }
+}
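
Note that the scanner builder wraps a plain InputStream in an FSDataInputStream, which requires the stream to be seekable, so the simplest way to build a source from a local file is through Hadoop's LocalFileSystem. A sketch under that assumption (the path is hypothetical):

    LocalFileSystem fs = FileSystem.getLocal(new Configuration());
    Path path = new Path("/tmp/example.rf");
    RFileSource source = new RFileSource(fs.open(path), fs.getFileStatus(path).getLen());
    Scanner scanner = RFile.newScanner().from(source).build();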

http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java
new file mode 100644
index 0000000..aad4908
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.rfile;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.commons.collections.map.LRUMap;
+
+import com.google.common.base.Preconditions;
+
+// formatter was adding spaces that checkstyle did not like, so turned off formatter
+//@formatter:off
+/**
+ * This class provides an API for writing RFiles. It can be used to create files for bulk import into Accumulo using
+ * {@link TableOperations#importDirectory(String, String, String, boolean)}.
+ *
+ * <p>
+ * An RFileWriter has the following constraints. Violating these constraints will result in runtime exceptions.
+ *
+ * <ul>
+ * <li>Before appending any keys, a locality group must be started by calling one of the startNewLocalityGroup functions or startDefaultLocalityGroup.</li>
+ * <li>Keys must be appended in sorted order within a locality group.</li>
+ * <li>Locality groups must have a mutually exclusive set of column families.</li>
+ * <li>The default locality group must be started last.</li>
+ * </ul>
+ *
+ * <p>
+ * Below is an example of using RFileWriter.
+ *
+ * <p>
+ *
+ * <pre>
+ * {@code
+ *    Iterable<Entry<Key, Value>> localityGroup1Data = ...
+ *    Iterable<Entry<Key, Value>> localityGroup2Data = ...
+ *    Iterable<Entry<Key, Value>> defaultGroupData = ...
+ *
+ *     try(RFileWriter writer = RFile.newWriter().to(file).build()){
+ *
+ *       //Start a locality group before appending data.
+ *       writer.startNewLocalityGroup("groupA", "columnFam1", "columnFam2");
+ *       //Append data to the locality group that was started above.  Must append in sorted order.
+ *       writer.append(localityGroup1Data);
+ *
+ *       //Add another locality group.
+ *       writer.startNewLocalityGroup("groupB", "columnFam3", "columnFam4");
+ *       writer.append(localityGroup2Data);
+ *
+ *       //The default locality group must be started last.  The column families for the default group do not need to be specified.
+ *       writer.startDefaultLocalityGroup();
+ *       //Data appended here cannot contain any column families specified in previous locality groups.
+ *       writer.append(defaultGroupData);
+ *
+ *       //This is a try-with-resources so the writer is closed here at the end of the code block.
+ *     }
+ * }
+ * </pre>
+ *
+ * <p>
+ * Create instances by calling {@link RFile#newWriter()}.
+ *
+ * @since 1.8.0
+ */
+// @formatter:on
+public class RFileWriter implements AutoCloseable {
+
+  private FileSKVWriter writer;
+  private final LRUMap validVisibilities;
+  private boolean startedLG;
+  private boolean startedDefaultLG;
+
+  RFileWriter(FileSKVWriter fileSKVWriter, int visCacheSize) {
+    this.writer = fileSKVWriter;
+    this.validVisibilities = new LRUMap(visCacheSize);
+  }
+
+  private void _startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException {
+    Preconditions.checkState(!startedDefaultLG, "Cannont start a locality group after starting the default locality group");
+    writer.startNewLocalityGroup(name, columnFamilies);
+    startedLG = true;
+  }
+
+  /**
+   * Before appending any data, a locality group must be started. The default locality group must be started last.
+   *
+   * @param name
+   *          locality group name, used for informational purposes
+   * @param families
+   *          the column families the locality group can contain
+   *
+   * @throws IllegalStateException
+   *           When default locality group already started.
+   */
+  public void startNewLocalityGroup(String name, List<byte[]> families) throws IOException {
+    HashSet<ByteSequence> fams = new HashSet<ByteSequence>();
+    for (byte[] family : families) {
+      fams.add(new ArrayByteSequence(family));
+    }
+    _startNewLocalityGroup(name, fams);
+  }
+
+  /**
+   * See javadoc for {@link #startNewLocalityGroup(String, List)}.
+   */
+  public void startNewLocalityGroup(String name, byte[]... families) throws IOException {
+    startNewLocalityGroup(name, Arrays.asList(families));
+  }
+
+  /**
+   * See javadoc for {@link #startNewLocalityGroup(String, List)}.
+   *
+   * @param families
+   *          will be encoded using UTF-8
+   *
+   * @throws IllegalStateException
+   *           When default locality group already started.
+   */
+  public void startNewLocalityGroup(String name, Set<String> families) throws IOException {
+    HashSet<ByteSequence> fams = new HashSet<ByteSequence>();
+    for (String family : families) {
+      fams.add(new ArrayByteSequence(family));
+    }
+    _startNewLocalityGroup(name, fams);
+  }
+
+  /**
+   * See javadoc for {@link #startNewLocalityGroup(String, List)}.
+   *
+   * @param families
+   *          will be encoded using UTF-8
+   *
+   * @throws IllegalStateException
+   *           When default locality group already started.
+   */
+  public void startNewLocalityGroup(String name, String... families) throws IOException {
+    HashSet<ByteSequence> fams = new HashSet<ByteSequence>();
+    for (String family : families) {
+      fams.add(new ArrayByteSequence(family));
+    }
+    _startNewLocalityGroup(name, fams);
+  }
+
+  /**
+   * Starts the default locality group, in which the column families do not need to be specified. This locality group must be started after all other locality
+   * groups. Data appended to it cannot contain column families that were in a previous locality group.
+   *
+   * @throws IllegalStateException
+   *           When default locality group already started.
+   */
+  public void startDefaultLocalityGroup() throws IOException {
+    Preconditions.checkState(!startedDefaultLG);
+    writer.startDefaultLocalityGroup();
+    startedDefaultLG = true;
+    startedLG = true;
+  }
+
+  /**
+   * Append the key and value to the last locality group that was started.
+   *
+   * @param key
+   *          This key must be greater than or equal to the last key appended. For non-default locality groups, the key's column family must be one of the
+   *          column families specified when calling startNewLocalityGroup(). Must be non-null.
+   * @param val
+   *          value to append, must be non-null.
+   *
+   * @throws IllegalArgumentException
+   *           This is thrown when data is appended out of order OR when the key contains an invalid visibility OR when a column family is not valid for a
+   *           locality group.
+   * @throws IllegalStateException
+   *           Thrown when no locality group was started.
+   */
+  public void append(Key key, Value val) throws IOException {
+    Preconditions.checkState(startedLG, "No locality group was started");
+    Boolean wasChecked = (Boolean) validVisibilities.get(key.getColumnVisibilityData());
+    if (wasChecked == null) {
+      byte[] cv = key.getColumnVisibilityData().toArray();
+      new ColumnVisibility(cv);
+      validVisibilities.put(new ArrayByteSequence(Arrays.copyOf(cv, cv.length)), Boolean.TRUE);
+    }
+    writer.append(key, val);
+  }
+
+  /**
+   * Append the keys and values to the last locality group that was started.
+   *
+   * @param keyValues
+   *          The keys must be in sorted order. The first key returned by the iterable must be greater than or equal to the last key appended. For non-default
+   *          locality groups, the key's column family must be one of the column families specified when calling startNewLocalityGroup(). Must be non-null.
+   *
+   * @throws IllegalArgumentException
+   *           This is thrown when data is appended out of order OR when the key contains an invalid visibility OR when a column family is not valid for a
+   *           locality group.
+   * @throws IllegalStateException
+   *           When no locality group was started.
+   */
+  public void append(Iterable<Entry<Key,Value>> keyValues) throws IOException {
+    Preconditions.checkState(startedLG, "No locality group was started");
+    for (Entry<Key,Value> entry : keyValues) {
+      append(entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    writer.close();
+  }
+}
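
Since append enforces sorted order within a locality group, collecting entries in a SortedMap before writing is an easy way to satisfy that constraint. A minimal sketch (the path and data are illustrative):

    SortedMap<Key,Value> data = new TreeMap<>();
    data.put(new Key("row1", "fam1", "qual1"), new Value("v1".getBytes()));
    data.put(new Key("row2", "fam1", "qual1"), new Value("v2".getBytes()));

    try (RFileWriter writer = RFile.newWriter().to("/tmp/example.rf").build()) {
      writer.startDefaultLocalityGroup();
      writer.append(data.entrySet());
    }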

http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
new file mode 100644
index 0000000..e4a141c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.rfile;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+import org.apache.accumulo.core.client.rfile.RFile.WriterFSOptions;
+import org.apache.accumulo.core.client.rfile.RFile.WriterOptions;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions, RFile.WriterOptions {
+
+  private static class OutputArgs extends FSConfArgs {
+    private Path path;
+    private OutputStream out;
+
+    OutputArgs(String filename) {
+      this.path = new Path(filename);
+    }
+
+    OutputArgs(OutputStream out) {
+      this.out = out;
+    }
+
+    OutputStream getOutputStream() {
+      return out;
+    }
+  }
+
+  private OutputArgs out;
+  private SamplerConfiguration sampler = null;
+  private Map<String,String> tableConfig = Collections.emptyMap();
+  private int visCacheSize = 1000;
+
+  @Override
+  public WriterOptions withSampler(SamplerConfiguration samplerConf) {
+    Objects.requireNonNull(samplerConf);
+    SamplerConfigurationImpl.checkDisjoint(tableConfig, samplerConf);
+    this.sampler = samplerConf;
+    return this;
+  }
+
+  @Override
+  public RFileWriter build() throws IOException {
+    FileOperations fileops = FileOperations.getInstance();
+    AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration();
+    HashMap<String,String> userProps = new HashMap<>();
+    if (sampler != null) {
+      userProps.putAll(new SamplerConfigurationImpl(sampler).toTablePropertiesMap());
+    }
+    userProps.putAll(tableConfig);
+
+    if (userProps.size() > 0) {
+      acuconf = new ConfigurationCopy(Iterables.concat(acuconf, userProps.entrySet()));
+    }
+
+    if (out.getOutputStream() != null) {
+      FSDataOutputStream fsdo;
+      if (out.getOutputStream() instanceof FSDataOutputStream) {
+        fsdo = (FSDataOutputStream) out.getOutputStream();
+      } else {
+        fsdo = new FSDataOutputStream(out.getOutputStream(), new FileSystem.Statistics("foo"));
+      }
+      return new RFileWriter(fileops.newWriterBuilder().forOutputStream(".rf", fsdo, out.getConf()).withTableConfiguration(acuconf).build(), visCacheSize);
+    } else {
+      return new RFileWriter(fileops.newWriterBuilder().forFile(out.path.toString(), out.getFileSystem(), out.getConf()).withTableConfiguration(acuconf)
+          .build(), visCacheSize);
+    }
+  }
+
+  @Override
+  public WriterOptions withFileSystem(FileSystem fs) {
+    Objects.requireNonNull(fs);
+    out.fs = fs;
+    return this;
+  }
+
+  @Override
+  public WriterFSOptions to(String filename) {
+    Objects.requireNonNull(filename);
+    this.out = new OutputArgs(filename);
+    return this;
+  }
+
+  @Override
+  public WriterOptions to(OutputStream out) {
+    Objects.requireNonNull(out);
+    this.out = new OutputArgs(out);
+    return this;
+  }
+
+  @Override
+  public WriterOptions withTableProperties(Iterable<Entry<String,String>> tableConfig) {
+    Objects.requireNonNull(tableConfig);
+    HashMap<String,String> cfg = new HashMap<>();
+    for (Entry<String,String> entry : tableConfig) {
+      cfg.put(entry.getKey(), entry.getValue());
+    }
+
+    SamplerConfigurationImpl.checkDisjoint(cfg, sampler);
+    this.tableConfig = cfg;
+    return this;
+  }
+
+  @Override
+  public WriterOptions withTableProperties(Map<String,String> tableConfig) {
+    Objects.requireNonNull(tableConfig);
+    return withTableProperties(tableConfig.entrySet());
+  }
+
+  @Override
+  public WriterOptions withVisibilityCacheSize(int maxSize) {
+    Preconditions.checkArgument(maxSize > 0);
+    this.visCacheSize = maxSize;
+    return this;
+  }
+}
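
The withSampler option stores sample data alongside the full data, and the same SamplerConfiguration can later be set on a scanner to read back only the sample. A sketch using RowSampler (the hasher and modulus option values are arbitrary, and the path is hypothetical):

    SamplerConfiguration samplerConf = new SamplerConfiguration(RowSampler.class.getName());
    samplerConf.addOption("hasher", "murmur3_32");
    samplerConf.addOption("modulus", "19");

    try (RFileWriter writer = RFile.newWriter().to("/tmp/sampled.rf").withSampler(samplerConf).build()) {
      writer.startDefaultLocalityGroup();
      writer.append(new Key("row1", "fam1", "qual1"), new Value("v1".getBytes()));
    }

    Scanner sample = RFile.newScanner().from("/tmp/sampled.rf").build();
    sample.setSamplerConfiguration(samplerConf);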

http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/sample/Sampler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/sample/Sampler.java b/core/src/main/java/org/apache/accumulo/core/client/sample/Sampler.java
index 03bd9d7..8b4db95 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/sample/Sampler.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/sample/Sampler.java
@@ -36,7 +36,6 @@ import org.apache.accumulo.core.data.Key;
  *
  * @since 1.8.0
  */
-
 public interface Sampler {
 
   /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
index 314bbae..4724bbe 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 
 public abstract class FileOperations {
@@ -76,7 +77,7 @@ public abstract class FileOperations {
    * </pre>
    */
   public NeedsFile<GetFileSizeOperationBuilder> getFileSize() {
-    return (NeedsFile<GetFileSizeOperationBuilder>) new GetFileSizeOperation();
+    return new GetFileSizeOperation();
   }
 
   /**
@@ -92,8 +93,8 @@ public abstract class FileOperations {
    *     .build();
    * </pre>
    */
-  public NeedsFile<OpenWriterOperationBuilder> newWriterBuilder() {
-    return (NeedsFile<OpenWriterOperationBuilder>) new OpenWriterOperation();
+  public NeedsFileOrOutputStream<OpenWriterOperationBuilder> newWriterBuilder() {
+    return new OpenWriterOperation();
   }
 
   /**
@@ -110,7 +111,7 @@ public abstract class FileOperations {
    * </pre>
    */
   public NeedsFile<OpenIndexOperationBuilder> newIndexReaderBuilder() {
-    return (NeedsFile<OpenIndexOperationBuilder>) new OpenIndexOperation();
+    return new OpenIndexOperation();
   }
 
   /**
@@ -150,7 +151,7 @@ public abstract class FileOperations {
    * </pre>
    */
   public NeedsFile<OpenReaderOperationBuilder> newReaderBuilder() {
-    return (NeedsFile<OpenReaderOperationBuilder>) new OpenReaderOperation();
+    return new OpenReaderOperation();
   }
 
   //
@@ -203,6 +204,10 @@ public abstract class FileOperations {
       return (SubclassType) this;
     }
 
+    protected void setFilename(String filename) {
+      this.filename = filename;
+    }
+
     public String getFilename() {
       return filename;
     }
@@ -211,6 +216,10 @@ public abstract class FileOperations {
       return fs;
     }
 
+    protected void setConfiguration(Configuration fsConf) {
+      this.fsConf = fsConf;
+    }
+
     public Configuration getConfiguration() {
       return fsConf;
     }
@@ -239,6 +248,7 @@ public abstract class FileOperations {
    */
   protected class GetFileSizeOperation extends FileAccessOperation<GetFileSizeOperation> implements GetFileSizeOperationBuilder {
     /** Return the size of the file. */
+    @Override
     public long execute() throws IOException {
       validate();
       return getFileSize(this);
@@ -278,9 +288,20 @@ public abstract class FileOperations {
   /**
    * Operation object for constructing a writer.
    */
-  protected class OpenWriterOperation extends FileIOOperation<OpenWriterOperation> implements OpenWriterOperationBuilder {
+  protected class OpenWriterOperation extends FileIOOperation<OpenWriterOperation> implements OpenWriterOperationBuilder,
+      NeedsFileOrOutputStream<OpenWriterOperationBuilder> {
     private String compression;
+    private FSDataOutputStream outputStream;
 
+    @Override
+    public NeedsTableConfiguration<OpenWriterOperationBuilder> forOutputStream(String extension, FSDataOutputStream outputStream, Configuration fsConf) {
+      this.outputStream = outputStream;
+      setConfiguration(fsConf);
+      setFilename("foo" + extenstion);
+      return this;
+    }
+
+    @Override
     public OpenWriterOperation withCompression(String compression) {
       this.compression = compression;
       return this;
@@ -290,6 +311,21 @@ public abstract class FileOperations {
       return compression;
     }
 
+    public FSDataOutputStream getOutputStream() {
+      return outputStream;
+    }
+
+    @Override
+    protected void validate() {
+      if (outputStream == null) {
+        super.validate();
+      } else {
+        Objects.requireNonNull(getConfiguration());
+        Objects.requireNonNull(getTableConfiguration());
+      }
+    }
+
+    @Override
     public FileSKVWriter build() throws IOException {
       validate();
       return openWriter(this);
@@ -359,6 +395,7 @@ public abstract class FileOperations {
    * Operation object for opening an index.
    */
   protected class OpenIndexOperation extends FileReaderOperation<OpenIndexOperation> implements OpenIndexOperationBuilder {
+    @Override
     public FileSKVIterator build() throws IOException {
       validate();
       return openIndex(this);
@@ -378,6 +415,7 @@ public abstract class FileOperations {
     private boolean inclusive;
 
     /** Set the range over which the constructed iterator will search. */
+    @Override
     public OpenScanReaderOperation overRange(Range range, Set<ByteSequence> columnFamilies, boolean inclusive) {
       this.range = range;
       this.columnFamilies = columnFamilies;
@@ -407,6 +445,7 @@ public abstract class FileOperations {
     }
 
     /** Execute the operation, constructing a scan iterator. */
+    @Override
     public FileSKVIterator build() throws IOException {
       validate();
       return openScanReader(this);
@@ -427,11 +466,13 @@ public abstract class FileOperations {
     /**
      * Seek the constructed iterator to the beginning of its domain before returning. Equivalent to {@code seekToBeginning(true)}.
      */
+    @Override
     public OpenReaderOperation seekToBeginning() {
       return seekToBeginning(true);
     }
 
     /** If true, seek the constructed iterator to the beginning of its domain before returning. */
+    @Override
     public OpenReaderOperation seekToBeginning(boolean seekToBeginning) {
       this.seekToBeginning = seekToBeginning;
       return this;
@@ -442,6 +483,7 @@ public abstract class FileOperations {
     }
 
     /** Execute the operation, constructing the specified file reader. */
+    @Override
     public FileSKVIterator build() throws IOException {
       validate();
       return openReader(this);
@@ -473,6 +515,11 @@ public abstract class FileOperations {
     public NeedsFileSystem<ReturnType> forFile(String filename);
   }
 
+  public static interface NeedsFileOrOutputStream<ReturnType> extends NeedsFile<ReturnType> {
+    /** Specify the output stream this operation should write to. */
+    public NeedsTableConfiguration<ReturnType> forOutputStream(String extension, FSDataOutputStream out, Configuration fsConf);
+  }
+
   /**
    * Type wrapper to ensure that {@code inFileSystem(...)} is called before other methods.
    */

http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index 981a2e6..f6269e7 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -412,7 +412,7 @@ public class RFile {
     public void append(Key key, Value value) throws IOException {
 
       if (key.compareTo(prevKey) < 0) {
-        throw new IllegalStateException("Keys appended out-of-order.  New key " + key + ", previous key " + prevKey);
+        throw new IllegalArgumentException("Keys appended out-of-order.  New key " + key + ", previous key " + prevKey);
       }
 
       currentLocalityGroup.updateColumnCount(key);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index 5d15973..96d31ce 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -34,6 +34,7 @@ import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.sample.impl.SamplerFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -77,21 +78,8 @@ public class RFileOperations extends FileOperations {
 
   @Override
   protected FileSKVWriter openWriter(OpenWriterOperation options) throws IOException {
-    Configuration conf = options.getConfiguration();
-    AccumuloConfiguration acuconf = options.getTableConfiguration();
 
-    int hrep = conf.getInt("dfs.replication", -1);
-    int trep = acuconf.getCount(Property.TABLE_FILE_REPLICATION);
-    int rep = hrep;
-    if (trep > 0 && trep != hrep) {
-      rep = trep;
-    }
-    long hblock = conf.getLong("dfs.block.size", 1 << 26);
-    long tblock = acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE);
-    long block = hblock;
-    if (tblock > 0)
-      block = tblock;
-    int bufferSize = conf.getInt("io.file.buffer.size", 4096);
+    AccumuloConfiguration acuconf = options.getTableConfiguration();
 
     long blockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
     long indexBlockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX);
@@ -106,11 +94,32 @@ public class RFileOperations extends FileOperations {
     String compression = options.getCompression();
     compression = compression == null ? options.getTableConfiguration().get(Property.TABLE_FILE_COMPRESSION_TYPE) : compression;
 
-    String file = options.getFilename();
-    FileSystem fs = options.getFileSystem();
+    FSDataOutputStream outputStream = options.getOutputStream();
+
+    Configuration conf = options.getConfiguration();
+
+    if (outputStream == null) {
+      int hrep = conf.getInt("dfs.replication", -1);
+      int trep = acuconf.getCount(Property.TABLE_FILE_REPLICATION);
+      int rep = hrep;
+      if (trep > 0 && trep != hrep) {
+        rep = trep;
+      }
+      long hblock = conf.getLong("dfs.block.size", 1 << 26);
+      long tblock = acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE);
+      long block = hblock;
+      if (tblock > 0)
+        block = tblock;
+      int bufferSize = conf.getInt("io.file.buffer.size", 4096);
+
+      String file = options.getFilename();
+      FileSystem fs = options.getFileSystem();
+
+      outputStream = fs.create(new Path(file), false, bufferSize, (short) rep, block);
+    }
 
-    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(new RateLimitedOutputStream(fs.create(new Path(file), false, bufferSize, (short) rep, block),
-        options.getRateLimiter()), compression, conf, acuconf);
+    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(new RateLimitedOutputStream(outputStream, options.getRateLimiter()), compression, conf,
+        acuconf);
 
     RFile.Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize, samplerConfig, sampler);
     return writer;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
index 97e4f5c..8188ba3 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
@@ -35,12 +35,19 @@ import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.constraints.DefaultKeySizeConstraint;
+import org.apache.accumulo.core.data.Column;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
+import org.apache.accumulo.core.iterators.system.DeletingIterator;
 import org.apache.accumulo.core.iterators.system.SynchronizedIterator;
+import org.apache.accumulo.core.iterators.system.VisibilityFilter;
 import org.apache.accumulo.core.iterators.user.VersioningIterator;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
 import org.apache.accumulo.core.tabletserver.thrift.TIteratorSetting;
 import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
@@ -384,4 +391,12 @@ public class IteratorUtil {
     }
     return toIteratorSettings(ic);
   }
+
+  public static SortedKeyValueIterator<Key,Value> setupSystemScanIterators(SortedKeyValueIterator<Key,Value> source, Set<Column> cols, Authorizations auths,
+      byte[] defaultVisibility) throws IOException {
+    DeletingIterator delIter = new DeletingIterator(source, false);
+    ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
+    ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, cols);
+    return new VisibilityFilter(colFilter, auths, defaultVisibility);
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java b/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java
index f0bd528..8abf4e7 100644
--- a/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.accumulo.core.sample.impl;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -169,6 +171,16 @@ public class SamplerConfigurationImpl implements Writable {
     return className + " " + options;
   }
 
+  public static void checkDisjoint(Map<String,String> props, SamplerConfiguration samplerConfiguration) {
+    if (props.isEmpty() || samplerConfiguration == null) {
+      return;
+    }
+
+    Map<String,String> sampleProps = new SamplerConfigurationImpl(samplerConfiguration).toTablePropertiesMap();
+
+    checkArgument(Collections.disjoint(props.keySet(), sampleProps.keySet()), "Properties and derived sampler properties are not disjoint");
+  }
+
   public static TSamplerConfiguration toThrift(SamplerConfiguration samplerConfig) {
     if (samplerConfig == null)
       return null;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java b/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
index 07757a6..a5255c7 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
@@ -49,6 +49,8 @@ public class LocalityGroupUtil {
   public static final Set<ByteSequence> EMPTY_CF_SET = Collections.emptySet();
 
   public static Set<ByteSequence> families(Collection<Column> columns) {
+    if (columns.isEmpty())
+      return EMPTY_CF_SET;
     Set<ByteSequence> result = new HashSet<ByteSequence>(columns.size());
     for (Column col : columns) {
       result.add(new ArrayByteSequence(col.getColumnFamily()));