You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2015/09/21 15:51:31 UTC

[6/7] accumulo git commit: ACCUMULO-3913 Added per table sampling

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/compaction/NullType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/NullType.java b/core/src/main/java/org/apache/accumulo/core/compaction/NullType.java
new file mode 100644
index 0000000..fb4c452
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/compaction/NullType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.compaction;
+
+import com.google.common.base.Preconditions;
+
+public class NullType implements Type {
+  @Override
+  public String convert(String str) {
+    Preconditions.checkArgument(str == null);
+    return "";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 5bd5c8a..400577c 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -514,6 +514,16 @@ public enum Property {
   @Experimental
   TABLE_VOLUME_CHOOSER("table.volume.chooser", "org.apache.accumulo.server.fs.RandomVolumeChooser", PropertyType.CLASSNAME,
       "The class that will be used to select which volume will be used to create new files for this table."),
+  TABLE_SAMPLER(
+      "table.sampler",
+      "",
+      PropertyType.CLASSNAME,
+      "The name of a class that implements org.apache.accumulo.core.Sampler.  Setting this option enables storing a sample of data which can be scanned."
+          + "  Always having a current sample can useful for query optimization and data comprehension.   After enabling sampling for an existing table, a compaction "
+          + "is needed to compute the sample for existing data.  The compact command in the shell has an option to only compact files without sample data."),
+  TABLE_SAMPLER_OPTS("table.sampler.opt.", null, PropertyType.PREFIX,
+      "The property is used to set options for a sampler.  If a sample had two options like hasher and modulous, then the two properties "
+          + "table.sampler.opt.hasher=${hash algorithm} and table.sampler.opt.modulous=${mod} would be set."),
 
   // VFS ClassLoader properties
   VFS_CLASSLOADER_SYSTEM_CLASSPATH_PROPERTY(AccumuloVFSClassLoader.VFS_CLASSLOADER_SYSTEM_CLASSPATH_PROPERTY, "", PropertyType.STRING,
@@ -776,7 +786,7 @@ public enum Property {
     return validTableProperties.contains(key) || key.startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey())
         || key.startsWith(Property.TABLE_ITERATOR_PREFIX.getKey()) || key.startsWith(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey())
         || key.startsWith(Property.TABLE_COMPACTION_STRATEGY_PREFIX.getKey()) || key.startsWith(Property.TABLE_REPLICATION_TARGET.getKey())
-        || key.startsWith(Property.TABLE_ARBITRARY_PROP_PREFIX.getKey());
+        || key.startsWith(Property.TABLE_ARBITRARY_PROP_PREFIX.getKey()) || key.startsWith(TABLE_SAMPLER_OPTS.getKey());
   }
 
   private static final EnumSet<Property> fixedProperties = EnumSet.of(Property.TSERV_CLIENTPORT, Property.TSERV_NATIVEMAP_ENABLED,

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
index a5bea83..758df12 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
@@ -50,6 +50,7 @@ import org.apache.accumulo.core.file.keyfunctor.KeyFunctor;
 import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.fate.util.LoggingRunnable;
@@ -424,6 +425,11 @@ public class BloomFilterLayer {
       reader.setInterruptFlag(flag);
     }
 
+    @Override
+    public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
+      return new BloomFilterLayer.Reader(reader.getSample(sampleConfig), bfl);
+    }
+
   }
 
   public static void main(String[] args) throws IOException {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java b/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java
index 60970e2..3713453 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java
@@ -21,14 +21,17 @@ import java.io.IOException;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 
-public interface FileSKVIterator extends InterruptibleIterator {
+public interface FileSKVIterator extends InterruptibleIterator, AutoCloseable {
   Key getFirstKey() throws IOException;
 
   Key getLastKey() throws IOException;
 
   DataInputStream getMetaStore(String name) throws IOException, NoSuchMetaStoreException;
 
+  FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig);
+
   void closeDeepCopies() throws IOException;
 
   void close() throws IOException;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
index fb2762f..75cfa7e 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
@@ -37,6 +37,7 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.system.MapFileIterator;
 import org.apache.accumulo.core.iterators.system.SequenceFileIterator;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -132,6 +133,11 @@ public class MapFileOperations extends FileOperations {
     public void setInterruptFlag(AtomicBoolean flag) {
       ((FileSKVIterator) reader).setInterruptFlag(flag);
     }
+
+    @Override
+    public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
+      return ((FileSKVIterator) reader).getSample(sampleConfig);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java
index f220a58..01af184 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java
@@ -33,6 +33,7 @@ import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.system.HeapIterator;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 
 class MultiIndexIterator extends HeapIterator implements FileSKVIterator {
 
@@ -93,4 +94,9 @@ class MultiIndexIterator extends HeapIterator implements FileSKVIterator {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
+    throw new UnsupportedOperationException();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
index 2109478..1a383e4 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
@@ -265,7 +265,7 @@ public class MultiLevelIndex {
 
     public void readFields(DataInput in, int version) throws IOException {
 
-      if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7) {
+      if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7 || version == RFile.RINDEX_VER_8) {
         level = in.readInt();
         offset = in.readInt();
         hasNext = in.readBoolean();
@@ -736,7 +736,7 @@ public class MultiLevelIndex {
 
       size = 0;
 
-      if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7) {
+      if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7 || version == RFile.RINDEX_VER_8) {
         size = in.readInt();
       }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
index 5a3e911..4631a4d 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
@@ -28,6 +28,7 @@ import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.RFile.Reader;
 import org.apache.accumulo.start.spi.KeywordExecutable;
@@ -54,6 +55,8 @@ public class PrintInfo implements KeywordExecutable {
     boolean hash = false;
     @Parameter(names = {"--histogram"}, description = "print a histogram of the key-value sizes")
     boolean histogram = false;
+    @Parameter(names = {"--useSample"}, description = "Use sample data for --dump, --vis, --histogram options")
+    boolean useSample = false;
     @Parameter(description = " <file> { <file> ... }")
     List<String> files = new ArrayList<String>();
     @Parameter(names = {"-c", "--config"}, variableArity = true, description = "Comma-separated Hadoop configuration files")
@@ -119,14 +122,27 @@ public class PrintInfo implements KeywordExecutable {
       if (opts.histogram || opts.dump || opts.vis || opts.hash) {
         localityGroupCF = iter.getLocalityGroupCF();
 
+        FileSKVIterator dataIter = iter;
+        if (opts.useSample) {
+          dataIter = iter.getSample();
+
+          if (dataIter == null) {
+            System.out.println("ERROR : This rfile has no sample data");
+            return;
+          }
+        }
+
         for (Entry<String,ArrayList<ByteSequence>> cf : localityGroupCF.entrySet()) {
 
-          iter.seek(new Range((Key) null, (Key) null), cf.getValue(), true);
-          while (iter.hasTop()) {
-            Key key = iter.getTopKey();
-            Value value = iter.getTopValue();
-            if (opts.dump)
+          dataIter.seek(new Range((Key) null, (Key) null), cf.getValue(), true);
+          while (dataIter.hasTop()) {
+            Key key = dataIter.getTopKey();
+            Value value = dataIter.getTopValue();
+            if (opts.dump) {
               System.out.println(key + " -> " + value);
+              if (System.out.checkError())
+                return;
+            }
             if (opts.histogram) {
               long size = key.getSize() + value.getSize();
               int bucket = (int) Math.log10(size);
@@ -134,7 +150,7 @@ public class PrintInfo implements KeywordExecutable {
               sizeBuckets[bucket] += size;
               totalSize += size;
             }
-            iter.next();
+            dataIter.next();
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index 54b01b4..9564f0b 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -36,6 +36,8 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.accumulo.core.client.SampleNotPresentException;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.ArrayByteSequence;
@@ -62,12 +64,17 @@ import org.apache.accumulo.core.iterators.system.HeapIterator;
 import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
 import org.apache.accumulo.core.iterators.system.LocalityGroupIterator;
 import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroup;
+import org.apache.accumulo.core.sample.Sampler;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.util.MutableByteSequence;
 import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.hadoop.io.Writable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
 public class RFile {
 
   public static final String EXTENSION = "rf";
@@ -77,15 +84,38 @@ public class RFile {
   private RFile() {}
 
   private static final int RINDEX_MAGIC = 0x20637474;
-  static final int RINDEX_VER_7 = 7;
-  static final int RINDEX_VER_6 = 6;
+
+  static final int RINDEX_VER_8 = 8; // Added sample storage. There is a sample locality group for each locality group. Sample are built using a Sampler and
+                                     // sampler configuration. The Sampler and its configuration are stored in RFile. Persisting the method of producing the
+                                     // sample allows a user of RFile to determine if the sample is useful.
+  static final int RINDEX_VER_7 = 7; // Added support for prefix encoding and encryption. Before this change only exact matches within a key field were deduped
+                                     // for consecutive keys. After this change, if consecutive key fields have the same prefix then the prefix is only stored
+                                     // once.
+  static final int RINDEX_VER_6 = 6; // Added support for multilevel indexes. Before this the index was one list with an entry for each data block. For large
+                                     // files, a large index needed to be read into memory before any seek could be done. After this change the index is a fat
+                                     // tree, and opening a large rfile is much faster. Like the previous version of Rfile, each index node in the tree is kept
+                                     // in memory serialized and used in its serialized form.
   // static final int RINDEX_VER_5 = 5; // unreleased
-  static final int RINDEX_VER_4 = 4;
-  static final int RINDEX_VER_3 = 3;
+  static final int RINDEX_VER_4 = 4; // Added support for seeking using serialized indexes. After this change index is no longer deserialized when rfile opened.
+                                     // Entire serialized index is read into memory as single byte array. For seeks, serialized index is used to find blocks
+                                     // (the binary search deserializes the specific entries its needs). This resulted in less memory usage (no object overhead)
+                                     // and faster open times for RFiles.
+  static final int RINDEX_VER_3 = 3; // Initial released version of RFile. R is for relative encoding. A keys is encoded relative to the previous key. The
+                                     // initial version deduped key fields that were the same for consecutive keys. For sorted data this is a common occurrence.
+                                     // This version supports locality groups. Each locality group has an index pointing to set of data blocks. Each data block
+                                     // contains relatively encoded keys and values.
+
+  // Buffer sample data so that many sample data blocks are stored contiguously.
+  private static int sampleBufferSize = 10000000;
+
+  @VisibleForTesting
+  public static void setSampleBufferSize(int bufferSize) {
+    sampleBufferSize = bufferSize;
+  }
 
   private static class LocalityGroupMetadata implements Writable {
 
-    private int startBlock;
+    private int startBlock = -1;
     private Key firstKey;
     private Map<ByteSequence,MutableLong> columnFamilies;
 
@@ -95,14 +125,15 @@ public class RFile {
 
     private MultiLevelIndex.BufferedWriter indexWriter;
     private MultiLevelIndex.Reader indexReader;
+    private int version;
 
     public LocalityGroupMetadata(int version, BlockFileReader br) {
       columnFamilies = new HashMap<ByteSequence,MutableLong>();
       indexReader = new MultiLevelIndex.Reader(br, version);
+      this.version = version;
     }
 
-    public LocalityGroupMetadata(int nextBlock, Set<ByteSequence> pcf, int indexBlockSize, BlockFileWriter bfw) {
-      this.startBlock = nextBlock;
+    public LocalityGroupMetadata(Set<ByteSequence> pcf, int indexBlockSize, BlockFileWriter bfw) {
       isDefaultLG = true;
       columnFamilies = new HashMap<ByteSequence,MutableLong>();
       previousColumnFamilies = pcf;
@@ -110,8 +141,7 @@ public class RFile {
       indexWriter = new MultiLevelIndex.BufferedWriter(new MultiLevelIndex.Writer(bfw, indexBlockSize));
     }
 
-    public LocalityGroupMetadata(String name, Set<ByteSequence> cfset, int nextBlock, int indexBlockSize, BlockFileWriter bfw) {
-      this.startBlock = nextBlock;
+    public LocalityGroupMetadata(String name, Set<ByteSequence> cfset, int indexBlockSize, BlockFileWriter bfw) {
       this.name = name;
       isDefaultLG = false;
       columnFamilies = new HashMap<ByteSequence,MutableLong>();
@@ -181,7 +211,9 @@ public class RFile {
         name = in.readUTF();
       }
 
-      startBlock = in.readInt();
+      if (version == RINDEX_VER_3 || version == RINDEX_VER_4 || version == RINDEX_VER_6 || version == RINDEX_VER_7) {
+        startBlock = in.readInt();
+      }
 
       int size = in.readInt();
 
@@ -224,8 +256,6 @@ public class RFile {
         out.writeUTF(name);
       }
 
-      out.writeInt(startBlock);
-
       if (isDefaultLG && columnFamilies == null) {
         // only expect null when default LG, otherwise let a NPE occur
         out.writeInt(-1);
@@ -246,26 +276,27 @@ public class RFile {
       indexWriter.close(out);
     }
 
-    public void printInfo() throws IOException {
+    public void printInfo(boolean isSample) throws IOException {
       PrintStream out = System.out;
-      out.println("Locality group         : " + (isDefaultLG ? "<DEFAULT>" : name));
-      out.println("\tStart block          : " + startBlock);
-      out.println("\tNum   blocks         : " + String.format("%,d", indexReader.size()));
+      out.printf("%-24s : %s\n", (isSample ? "Sample " : "") + "Locality group ", (isDefaultLG ? "<DEFAULT>" : name));
+      if (version == RINDEX_VER_3 || version == RINDEX_VER_4 || version == RINDEX_VER_6 || version == RINDEX_VER_7) {
+        out.printf("\t%-22s : %d\n", "Start block", startBlock);
+      }
+      out.printf("\t%-22s : %,d\n", "Num   blocks", indexReader.size());
       TreeMap<Integer,Long> sizesByLevel = new TreeMap<Integer,Long>();
       TreeMap<Integer,Long> countsByLevel = new TreeMap<Integer,Long>();
       indexReader.getIndexInfo(sizesByLevel, countsByLevel);
       for (Entry<Integer,Long> entry : sizesByLevel.descendingMap().entrySet()) {
-        out.println("\tIndex level " + entry.getKey() + "        : "
-            + String.format("%,d bytes  %,d blocks", entry.getValue(), countsByLevel.get(entry.getKey())));
+        out.printf("\t%-22s : %,d bytes  %,d blocks\n", "Index level " + entry.getKey(), entry.getValue(), countsByLevel.get(entry.getKey()));
       }
-      out.println("\tFirst key            : " + firstKey);
+      out.printf("\t%-22s : %s\n", "First key", firstKey);
 
       Key lastKey = null;
       if (indexReader.size() > 0) {
         lastKey = indexReader.getLastKey();
       }
 
-      out.println("\tLast key             : " + lastKey);
+      out.printf("\t%-22s : %s\n", "Last key", lastKey);
 
       long numKeys = 0;
       IndexIterator countIter = indexReader.lookup(new Key());
@@ -273,48 +304,193 @@ public class RFile {
         numKeys += countIter.next().getNumEntries();
       }
 
-      out.println("\tNum entries          : " + String.format("%,d", numKeys));
-      out.println("\tColumn families      : " + (isDefaultLG && columnFamilies == null ? "<UNKNOWN>" : columnFamilies.keySet()));
+      out.printf("\t%-22s : %,d\n", "Num entries", numKeys);
+      out.printf("\t%-22s : %s\n", "Column families", (isDefaultLG && columnFamilies == null ? "<UNKNOWN>" : columnFamilies.keySet()));
     }
 
   }
 
-  public static class Writer implements FileSKVWriter {
+  private static class SampleEntry {
+    Key key;
+    Value val;
 
-    public static final int MAX_CF_IN_DLG = 1000;
+    SampleEntry(Key key, Value val) {
+      this.key = new Key(key);
+      this.val = new Value(val);
+    }
+  }
+
+  private static class SampleLocalityGroupWriter {
+
+    private Sampler sampler;
+
+    private List<SampleEntry> entries = new ArrayList<>();
+    private long dataSize = 0;
+
+    private LocalityGroupWriter lgr;
+
+    public SampleLocalityGroupWriter(LocalityGroupWriter lgr, Sampler sampler) {
+      this.lgr = lgr;
+      this.sampler = sampler;
+    }
+
+    public void append(Key key, Value value) throws IOException {
+      if (sampler.accept(key)) {
+        entries.add(new SampleEntry(key, value));
+        dataSize += key.getSize() + value.getSize();
+      }
+    }
+
+    public void close() throws IOException {
+      for (SampleEntry se : entries) {
+        lgr.append(se.key, se.val);
+      }
+
+      lgr.close();
+    }
+
+    public void flushIfNeeded() throws IOException {
+      if (dataSize > sampleBufferSize) {
+        // the reason to write out all but one key is so that closeBlock() can always eventually be called with true
+        List<SampleEntry> subList = entries.subList(0, entries.size() - 1);
+
+        if (subList.size() > 0) {
+          for (SampleEntry se : subList) {
+            lgr.append(se.key, se.val);
+          }
+
+          lgr.closeBlock(subList.get(subList.size() - 1).key, false);
+
+          subList.clear();
+          dataSize = 0;
+        }
+      }
+    }
+  }
+
+  private static class LocalityGroupWriter {
 
     private BlockFileWriter fileWriter;
     private ABlockWriter blockWriter;
 
     // private BlockAppender blockAppender;
     private long blockSize = 100000;
-    private int indexBlockSize;
     private int entries = 0;
 
-    private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<LocalityGroupMetadata>();
     private LocalityGroupMetadata currentLocalityGroup = null;
-    private int nextBlock = 0;
 
     private Key lastKeyInBlock = null;
 
+    private Key prevKey = new Key();
+
+    private SampleLocalityGroupWriter sample;
+
+    LocalityGroupWriter(BlockFileWriter fileWriter, long blockSize, LocalityGroupMetadata currentLocalityGroup, SampleLocalityGroupWriter sample) {
+      this.fileWriter = fileWriter;
+      this.blockSize = blockSize;
+      this.currentLocalityGroup = currentLocalityGroup;
+      this.sample = sample;
+    }
+
+    public void append(Key key, Value value) throws IOException {
+
+      if (key.compareTo(prevKey) < 0) {
+        throw new IllegalStateException("Keys appended out-of-order.  New key " + key + ", previous key " + prevKey);
+      }
+
+      currentLocalityGroup.updateColumnCount(key);
+
+      if (currentLocalityGroup.getFirstKey() == null) {
+        currentLocalityGroup.setFirstKey(key);
+      }
+
+      if (sample != null) {
+        sample.append(key, value);
+      }
+
+      if (blockWriter == null) {
+        blockWriter = fileWriter.prepareDataBlock();
+      } else if (blockWriter.getRawSize() > blockSize) {
+        closeBlock(prevKey, false);
+        blockWriter = fileWriter.prepareDataBlock();
+      }
+
+      RelativeKey rk = new RelativeKey(lastKeyInBlock, key);
+
+      rk.write(blockWriter);
+      value.write(blockWriter);
+      entries++;
+
+      prevKey = new Key(key);
+      lastKeyInBlock = prevKey;
+
+    }
+
+    private void closeBlock(Key key, boolean lastBlock) throws IOException {
+      blockWriter.close();
+
+      if (lastBlock)
+        currentLocalityGroup.indexWriter.addLast(key, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize());
+      else
+        currentLocalityGroup.indexWriter.add(key, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize());
+
+      if (sample != null)
+        sample.flushIfNeeded();
+
+      blockWriter = null;
+      lastKeyInBlock = null;
+      entries = 0;
+    }
+
+    public void close() throws IOException {
+      if (blockWriter != null) {
+        closeBlock(lastKeyInBlock, true);
+      }
+
+      if (sample != null) {
+        sample.close();
+      }
+    }
+  }
+
+  public static class Writer implements FileSKVWriter {
+
+    public static final int MAX_CF_IN_DLG = 1000;
+
+    private BlockFileWriter fileWriter;
+
+    // private BlockAppender blockAppender;
+    private long blockSize = 100000;
+    private int indexBlockSize;
+
+    private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<LocalityGroupMetadata>();
+    private ArrayList<LocalityGroupMetadata> sampleGroups = new ArrayList<LocalityGroupMetadata>();
+    private LocalityGroupMetadata currentLocalityGroup = null;
+    private LocalityGroupMetadata sampleLocalityGroup = null;
+
     private boolean dataClosed = false;
     private boolean closed = false;
-    private Key prevKey = new Key();
     private boolean startedDefaultLocalityGroup = false;
 
     private HashSet<ByteSequence> previousColumnFamilies;
     private long length = -1;
 
+    private LocalityGroupWriter lgWriter;
+
+    private SamplerConfigurationImpl samplerConfig;
+    private Sampler sampler;
+
     public Writer(BlockFileWriter bfw, int blockSize) throws IOException {
-      this(bfw, blockSize, (int) AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
+      this(bfw, blockSize, (int) AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX), null, null);
     }
 
-    public Writer(BlockFileWriter bfw, int blockSize, int indexBlockSize) throws IOException {
+    public Writer(BlockFileWriter bfw, int blockSize, int indexBlockSize, SamplerConfigurationImpl samplerConfig, Sampler sampler) throws IOException {
       this.blockSize = blockSize;
       this.indexBlockSize = indexBlockSize;
       this.fileWriter = bfw;
-      this.blockWriter = null;
       previousColumnFamilies = new HashSet<ByteSequence>();
+      this.samplerConfig = samplerConfig;
+      this.sampler = sampler;
     }
 
     @Override
@@ -329,10 +505,12 @@ public class RFile {
       ABlockWriter mba = fileWriter.prepareMetaBlock("RFile.index");
 
       mba.writeInt(RINDEX_MAGIC);
-      mba.writeInt(RINDEX_VER_7);
+      mba.writeInt(RINDEX_VER_8);
 
-      if (currentLocalityGroup != null)
+      if (currentLocalityGroup != null) {
         localityGroups.add(currentLocalityGroup);
+        sampleGroups.add(sampleLocalityGroup);
+      }
 
       mba.writeInt(localityGroups.size());
 
@@ -340,6 +518,18 @@ public class RFile {
         lc.write(mba);
       }
 
+      if (samplerConfig == null) {
+        mba.writeBoolean(false);
+      } else {
+        mba.writeBoolean(true);
+
+        for (LocalityGroupMetadata lc : sampleGroups) {
+          lc.write(mba);
+        }
+
+        samplerConfig.write(mba);
+      }
+
       mba.close();
       fileWriter.close();
       length = fileWriter.getLength();
@@ -355,8 +545,8 @@ public class RFile {
 
       dataClosed = true;
 
-      if (blockWriter != null) {
-        closeBlock(lastKeyInBlock, true);
+      if (lgWriter != null) {
+        lgWriter.close();
       }
     }
 
@@ -367,46 +557,7 @@ public class RFile {
         throw new IllegalStateException("Cannont append, data closed");
       }
 
-      if (key.compareTo(prevKey) < 0) {
-        throw new IllegalStateException("Keys appended out-of-order.  New key " + key + ", previous key " + prevKey);
-      }
-
-      currentLocalityGroup.updateColumnCount(key);
-
-      if (currentLocalityGroup.getFirstKey() == null) {
-        currentLocalityGroup.setFirstKey(key);
-      }
-
-      if (blockWriter == null) {
-        blockWriter = fileWriter.prepareDataBlock();
-      } else if (blockWriter.getRawSize() > blockSize) {
-        closeBlock(prevKey, false);
-        blockWriter = fileWriter.prepareDataBlock();
-      }
-
-      RelativeKey rk = new RelativeKey(lastKeyInBlock, key);
-
-      rk.write(blockWriter);
-      value.write(blockWriter);
-      entries++;
-
-      prevKey = new Key(key);
-      lastKeyInBlock = prevKey;
-
-    }
-
-    private void closeBlock(Key key, boolean lastBlock) throws IOException {
-      blockWriter.close();
-
-      if (lastBlock)
-        currentLocalityGroup.indexWriter.addLast(key, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize());
-      else
-        currentLocalityGroup.indexWriter.add(key, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize());
-
-      blockWriter = null;
-      lastKeyInBlock = null;
-      entries = 0;
-      nextBlock++;
+      lgWriter.append(key, value);
     }
 
     @Override
@@ -425,28 +576,35 @@ public class RFile {
         throw new IllegalStateException("Can not start anymore new locality groups after default locality group started");
       }
 
-      if (blockWriter != null) {
-        closeBlock(lastKeyInBlock, true);
+      if (lgWriter != null) {
+        lgWriter.close();
       }
 
       if (currentLocalityGroup != null) {
         localityGroups.add(currentLocalityGroup);
+        sampleGroups.add(sampleLocalityGroup);
       }
 
       if (columnFamilies == null) {
         startedDefaultLocalityGroup = true;
-        currentLocalityGroup = new LocalityGroupMetadata(nextBlock, previousColumnFamilies, indexBlockSize, fileWriter);
+        currentLocalityGroup = new LocalityGroupMetadata(previousColumnFamilies, indexBlockSize, fileWriter);
+        sampleLocalityGroup = new LocalityGroupMetadata(previousColumnFamilies, indexBlockSize, fileWriter);
       } else {
         if (!Collections.disjoint(columnFamilies, previousColumnFamilies)) {
           HashSet<ByteSequence> overlap = new HashSet<ByteSequence>(columnFamilies);
           overlap.retainAll(previousColumnFamilies);
           throw new IllegalArgumentException("Column families over lap with previous locality group : " + overlap);
         }
-        currentLocalityGroup = new LocalityGroupMetadata(name, columnFamilies, nextBlock, indexBlockSize, fileWriter);
+        currentLocalityGroup = new LocalityGroupMetadata(name, columnFamilies, indexBlockSize, fileWriter);
+        sampleLocalityGroup = new LocalityGroupMetadata(name, columnFamilies, indexBlockSize, fileWriter);
         previousColumnFamilies.addAll(columnFamilies);
       }
 
-      prevKey = new Key();
+      SampleLocalityGroupWriter sampleWriter = null;
+      if (sampler != null) {
+        sampleWriter = new SampleLocalityGroupWriter(new LocalityGroupWriter(fileWriter, blockSize, sampleLocalityGroup, null), sampler);
+      }
+      lgWriter = new LocalityGroupWriter(fileWriter, blockSize, currentLocalityGroup, sampleWriter);
     }
 
     @Override
@@ -616,8 +774,9 @@ public class RFile {
       if (columnFamilies.size() != 0 || inclusive)
         throw new IllegalArgumentException("I do not know how to filter column families");
 
-      if (interruptFlag != null && interruptFlag.get())
+      if (interruptFlag != null && interruptFlag.get()) {
         throw new IterationInterruptedException();
+      }
 
       try {
         _seek(range);
@@ -830,6 +989,11 @@ public class RFile {
     public void registerMetrics(MetricsGatherer<?> vmg) {
       metricsGatherer = vmg;
     }
+
+    @Override
+    public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
+      throw new UnsupportedOperationException();
+    }
   }
 
   public static class Reader extends HeapIterator implements FileSKVIterator {
@@ -837,8 +1001,12 @@ public class RFile {
     private BlockFileReader reader;
 
     private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<LocalityGroupMetadata>();
+    private ArrayList<LocalityGroupMetadata> sampleGroups = new ArrayList<LocalityGroupMetadata>();
+
+    private LocalityGroupReader currentReaders[];
+    private LocalityGroupReader readers[];
+    private LocalityGroupReader sampleReaders[];
 
-    private LocalityGroupReader lgReaders[];
     private HashSet<ByteSequence> nonDefaultColumnFamilies;
 
     private List<Reader> deepCopies;
@@ -846,6 +1014,10 @@ public class RFile {
 
     private AtomicBoolean interruptFlag;
 
+    private SamplerConfigurationImpl samplerConfig = null;
+
+    private int rfileVersion;
+
     public Reader(BlockFileReader rdr) throws IOException {
       this.reader = rdr;
 
@@ -853,14 +1025,15 @@ public class RFile {
       try {
         int magic = mb.readInt();
         int ver = mb.readInt();
+        rfileVersion = ver;
 
         if (magic != RINDEX_MAGIC)
           throw new IOException("Did not see expected magic number, saw " + magic);
-        if (ver != RINDEX_VER_7 && ver != RINDEX_VER_6 && ver != RINDEX_VER_4 && ver != RINDEX_VER_3)
+        if (ver != RINDEX_VER_8 && ver != RINDEX_VER_7 && ver != RINDEX_VER_6 && ver != RINDEX_VER_4 && ver != RINDEX_VER_3)
           throw new IOException("Did not see expected version, saw " + ver);
 
         int size = mb.readInt();
-        lgReaders = new LocalityGroupReader[size];
+        currentReaders = new LocalityGroupReader[size];
 
         deepCopies = new LinkedList<Reader>();
 
@@ -869,8 +1042,28 @@ public class RFile {
           lgm.readFields(mb);
           localityGroups.add(lgm);
 
-          lgReaders[i] = new LocalityGroupReader(reader, lgm, ver);
+          currentReaders[i] = new LocalityGroupReader(reader, lgm, ver);
+        }
+
+        readers = currentReaders;
+
+        if (ver == RINDEX_VER_8 && mb.readBoolean()) {
+          sampleReaders = new LocalityGroupReader[size];
+
+          for (int i = 0; i < size; i++) {
+            LocalityGroupMetadata lgm = new LocalityGroupMetadata(ver, rdr);
+            lgm.readFields(mb);
+            sampleGroups.add(lgm);
+
+            sampleReaders[i] = new LocalityGroupReader(reader, lgm, ver);
+          }
+
+          samplerConfig = new SamplerConfigurationImpl(mb);
+        } else {
+          sampleReaders = null;
+          samplerConfig = null;
         }
+
       } finally {
         mb.close();
       }
@@ -881,24 +1074,53 @@ public class RFile {
           nonDefaultColumnFamilies.addAll(lgm.columnFamilies.keySet());
       }
 
-      createHeap(lgReaders.length);
+      createHeap(currentReaders.length);
+    }
+
+    private Reader(Reader r, LocalityGroupReader sampleReaders[]) {
+      super(sampleReaders.length);
+      this.reader = r.reader;
+      this.nonDefaultColumnFamilies = r.nonDefaultColumnFamilies;
+      this.currentReaders = new LocalityGroupReader[sampleReaders.length];
+      this.deepCopies = r.deepCopies;
+      this.deepCopy = false;
+      this.readers = r.readers;
+      this.sampleReaders = r.sampleReaders;
+      this.samplerConfig = r.samplerConfig;
+      this.rfileVersion = r.rfileVersion;
+      for (int i = 0; i < sampleReaders.length; i++) {
+        this.currentReaders[i] = sampleReaders[i];
+        this.currentReaders[i].setInterruptFlag(r.interruptFlag);
+      }
     }
 
-    private Reader(Reader r) {
-      super(r.lgReaders.length);
+    private Reader(Reader r, boolean useSample) {
+      super(r.currentReaders.length);
       this.reader = r.reader;
       this.nonDefaultColumnFamilies = r.nonDefaultColumnFamilies;
-      this.lgReaders = new LocalityGroupReader[r.lgReaders.length];
+      this.currentReaders = new LocalityGroupReader[r.currentReaders.length];
       this.deepCopies = r.deepCopies;
       this.deepCopy = true;
-      for (int i = 0; i < lgReaders.length; i++) {
-        this.lgReaders[i] = new LocalityGroupReader(r.lgReaders[i]);
-        this.lgReaders[i].setInterruptFlag(r.interruptFlag);
+      this.samplerConfig = r.samplerConfig;
+      this.rfileVersion = r.rfileVersion;
+      this.readers = r.readers;
+      this.sampleReaders = r.sampleReaders;
+
+      for (int i = 0; i < r.readers.length; i++) {
+        if (useSample) {
+          this.currentReaders[i] = new LocalityGroupReader(r.sampleReaders[i]);
+          this.currentReaders[i].setInterruptFlag(r.interruptFlag);
+        } else {
+          this.currentReaders[i] = new LocalityGroupReader(r.readers[i]);
+          this.currentReaders[i].setInterruptFlag(r.interruptFlag);
+        }
+
       }
+
     }
 
     private void closeLocalityGroupReaders() {
-      for (LocalityGroupReader lgr : lgReaders) {
+      for (LocalityGroupReader lgr : currentReaders) {
         try {
           lgr.close();
         } catch (IOException e) {
@@ -926,6 +1148,16 @@ public class RFile {
       closeDeepCopies();
       closeLocalityGroupReaders();
 
+      if (sampleReaders != null) {
+        for (LocalityGroupReader lgr : sampleReaders) {
+          try {
+            lgr.close();
+          } catch (IOException e) {
+            log.warn("Errored out attempting to close LocalityGroupReader.", e);
+          }
+        }
+      }
+
       try {
         reader.close();
       } finally {
@@ -937,17 +1169,17 @@ public class RFile {
 
     @Override
     public Key getFirstKey() throws IOException {
-      if (lgReaders.length == 0) {
+      if (currentReaders.length == 0) {
         return null;
       }
 
       Key minKey = null;
 
-      for (int i = 0; i < lgReaders.length; i++) {
+      for (int i = 0; i < currentReaders.length; i++) {
         if (minKey == null) {
-          minKey = lgReaders[i].getFirstKey();
+          minKey = currentReaders[i].getFirstKey();
         } else {
-          Key firstKey = lgReaders[i].getFirstKey();
+          Key firstKey = currentReaders[i].getFirstKey();
           if (firstKey != null && firstKey.compareTo(minKey) < 0)
             minKey = firstKey;
         }
@@ -958,17 +1190,17 @@ public class RFile {
 
     @Override
     public Key getLastKey() throws IOException {
-      if (lgReaders.length == 0) {
+      if (currentReaders.length == 0) {
         return null;
       }
 
       Key maxKey = null;
 
-      for (int i = 0; i < lgReaders.length; i++) {
+      for (int i = 0; i < currentReaders.length; i++) {
         if (maxKey == null) {
-          maxKey = lgReaders[i].getLastKey();
+          maxKey = currentReaders[i].getLastKey();
         } else {
-          Key lastKey = lgReaders[i].getLastKey();
+          Key lastKey = currentReaders[i].getLastKey();
           if (lastKey != null && lastKey.compareTo(maxKey) > 0)
             maxKey = lastKey;
         }
@@ -988,10 +1220,26 @@ public class RFile {
 
     @Override
     public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
-      Reader copy = new Reader(this);
-      copy.setInterruptFlagInternal(interruptFlag);
-      deepCopies.add(copy);
-      return copy;
+      if (env != null && env.isSamplingEnabled()) {
+        SamplerConfiguration sc = env.getSamplerConfiguration();
+        if (sc == null) {
+          throw new SampleNotPresentException();
+        }
+
+        if (this.samplerConfig != null && this.samplerConfig.equals(new SamplerConfigurationImpl(sc))) {
+          Reader copy = new Reader(this, true);
+          copy.setInterruptFlagInternal(interruptFlag);
+          deepCopies.add(copy);
+          return copy;
+        } else {
+          throw new SampleNotPresentException();
+        }
+      } else {
+        Reader copy = new Reader(this, false);
+        copy.setInterruptFlagInternal(interruptFlag);
+        deepCopies.add(copy);
+        return copy;
+      }
     }
 
     @Override
@@ -1027,14 +1275,20 @@ public class RFile {
      */
     public void registerMetrics(MetricsGatherer<?> vmg) {
       vmg.init(getLocalityGroupCF());
-      for (LocalityGroupReader lgr : lgReaders) {
+      for (LocalityGroupReader lgr : currentReaders) {
         lgr.registerMetrics(vmg);
       }
+
+      if (sampleReaders != null) {
+        for (LocalityGroupReader lgr : sampleReaders) {
+          lgr.registerMetrics(vmg);
+        }
+      }
     }
 
     @Override
     public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
-      numLGSeeked = LocalityGroupIterator.seek(this, lgReaders, nonDefaultColumnFamilies, range, columnFamilies, inclusive);
+      numLGSeeked = LocalityGroupIterator.seek(this, currentReaders, nonDefaultColumnFamilies, range, columnFamilies, inclusive);
     }
 
     int getNumLocalityGroupsSeeked() {
@@ -1045,16 +1299,53 @@ public class RFile {
 
       ArrayList<Iterator<IndexEntry>> indexes = new ArrayList<Iterator<IndexEntry>>();
 
-      for (LocalityGroupReader lgr : lgReaders) {
+      for (LocalityGroupReader lgr : currentReaders) {
         indexes.add(lgr.getIndex());
       }
 
       return new MultiIndexIterator(this, indexes);
     }
 
+    @Override
+    public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
+      Preconditions.checkNotNull(sampleConfig);
+
+      if (this.samplerConfig != null && this.samplerConfig.equals(sampleConfig)) {
+        Reader copy = new Reader(this, sampleReaders);
+        copy.setInterruptFlagInternal(interruptFlag);
+        return copy;
+      }
+
+      return null;
+    }
+
+    // only visible for printinfo
+    FileSKVIterator getSample() {
+      if (samplerConfig == null)
+        return null;
+      return getSample(this.samplerConfig);
+    }
+
     public void printInfo() throws IOException {
+
+      System.out.printf("%-24s : %d\n", "RFile Version", rfileVersion);
+      System.out.println();
+
       for (LocalityGroupMetadata lgm : localityGroups) {
-        lgm.printInfo();
+        lgm.printInfo(false);
+      }
+
+      if (sampleGroups.size() > 0) {
+
+        System.out.println();
+        System.out.printf("%-24s :\n", "Sample Configuration");
+        System.out.printf("\t%-22s : %s\n", "Sampler class ", samplerConfig.getClassName());
+        System.out.printf("\t%-22s : %s\n", "Sampler options ", samplerConfig.getOptions());
+        System.out.println();
+
+        for (LocalityGroupMetadata lgm : sampleGroups) {
+          lgm.printInfo(true);
+        }
       }
     }
 
@@ -1071,7 +1362,7 @@ public class RFile {
 
     private void setInterruptFlagInternal(AtomicBoolean flag) {
       this.interruptFlag = flag;
-      for (LocalityGroupReader lgr : lgReaders) {
+      for (LocalityGroupReader lgr : currentReaders) {
         lgr.setInterruptFlag(interruptFlag);
       }
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index 088abfe..17e8e96 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -33,6 +33,9 @@ import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.RFile.Reader;
 import org.apache.accumulo.core.file.rfile.RFile.Writer;
+import org.apache.accumulo.core.sample.Sampler;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.sample.impl.SamplerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -123,8 +126,15 @@ public class RFileOperations extends FileOperations {
     long blockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
     long indexBlockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX);
 
+    SamplerConfigurationImpl samplerConfig = SamplerConfigurationImpl.newSamplerConfig(acuconf);
+    Sampler sampler = null;
+
+    if (samplerConfig != null) {
+      sampler = SamplerFactory.newSampler(samplerConfig, acuconf);
+    }
+
     CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs.create(new Path(file), false, bufferSize, (short) rep, block), compression, conf, acuconf);
-    Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize);
+    Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize, samplerConfig, sampler);
     return writer;
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java
index 5a53e93..5dbafa6 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java
@@ -18,6 +18,8 @@ package org.apache.accumulo.core.iterators;
 
 import java.io.IOException;
 
+import org.apache.accumulo.core.client.SampleNotPresentException;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
@@ -37,4 +39,52 @@ public interface IteratorEnvironment {
   void registerSideChannel(SortedKeyValueIterator<Key,Value> iter);
 
   Authorizations getAuthorizations();
+
+  /**
+   * Returns a new iterator environment object that can be used to create deep copies over sample data. The new object created will use the current sampling
+   * configuration for the table. The existing iterator environment object will not be modified.
+   *
+   * <p>
+   * Since sample data could be created in many different ways, a good practice for an iterator is to verify the sampling configuration is as expected.
+   *
+   * <p>
+   *
+   * <pre>
+   * <code>
+   *   class MyIter implements SortedKeyValueIterator&lt;Key,Value&gt; {
+   *     SortedKeyValueIterator&lt;Key,Value&gt; source;
+   *     SortedKeyValueIterator&lt;Key,Value&gt; sampleIter;
+   *     &#64;Override
+   *     void init(SortedKeyValueIterator&lt;Key,Value&gt; source, Map&lt;String,String&gt; options, IteratorEnvironment env) {
+   *       IteratorEnvironment sampleEnv = env.cloneWithSamplingEnabled();
+   *       //do some sanity checks on sampling config
+   *       validateSamplingConfiguration(sampleEnv.getSamplerConfiguration());
+   *       sampleIter = source.deepCopy(sampleEnv);
+   *       this.source = source;
+   *     }
+   *   }
+   * </code>
+   * </pre>
+   *
+   * @throws SampleNotPresentException
+   *           when sampling is not configured for table.
+   * @since 1.8.0
+   */
+  IteratorEnvironment cloneWithSamplingEnabled();
+
+  /**
+   * There are at least two conditions under which sampling will be enabled for an environment. One condition is when sampling is enabled for the scan that
+   * starts everything. Another possibility is for a deep copy created with an environment created by calling {@link #cloneWithSamplingEnabled()}
+   *
+   * @return true if sampling is enabled for this environment.
+   * @since 1.8.0
+   */
+  boolean isSamplingEnabled();
+
+  /**
+   *
+   * @return sampling configuration is sampling is enabled for environment, otherwise returns null.
+   * @since 1.8.0
+   */
+  SamplerConfiguration getSamplerConfiguration();
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/iterators/SortedMapIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/SortedMapIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/SortedMapIterator.java
index 3999b6f..25c010d 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/SortedMapIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/SortedMapIterator.java
@@ -24,6 +24,7 @@ import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.accumulo.core.client.SampleNotPresentException;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
@@ -53,6 +54,9 @@ public class SortedMapIterator implements InterruptibleIterator {
 
   @Override
   public SortedMapIterator deepCopy(IteratorEnvironment env) {
+    if (env != null && env.isSamplingEnabled()) {
+      throw new SampleNotPresentException();
+    }
     return new SortedMapIterator(map, interruptFlag);
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java
index 7723ef1..5b37b30 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java
@@ -56,8 +56,6 @@ public abstract class WrappingIterator implements SortedKeyValueIterator<Key,Val
 
   @Override
   public Key getTopKey() {
-    if (source == null)
-      throw new IllegalStateException("no source set");
     if (seenSeek == false)
       throw new IllegalStateException("never been seeked");
     return getSource().getTopKey();
@@ -65,8 +63,6 @@ public abstract class WrappingIterator implements SortedKeyValueIterator<Key,Val
 
   @Override
   public Value getTopValue() {
-    if (source == null)
-      throw new IllegalStateException("no source set");
     if (seenSeek == false)
       throw new IllegalStateException("never been seeked");
     return getSource().getTopValue();
@@ -74,8 +70,6 @@ public abstract class WrappingIterator implements SortedKeyValueIterator<Key,Val
 
   @Override
   public boolean hasTop() {
-    if (source == null)
-      throw new IllegalStateException("no source set");
     if (seenSeek == false)
       throw new IllegalStateException("never been seeked");
     return getSource().hasTop();
@@ -89,8 +83,6 @@ public abstract class WrappingIterator implements SortedKeyValueIterator<Key,Val
 
   @Override
   public void next() throws IOException {
-    if (source == null)
-      throw new IllegalStateException("no source set");
     if (seenSeek == false)
       throw new IllegalStateException("never been seeked");
     getSource().next();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/iterators/system/EmptyIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/EmptyIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/EmptyIterator.java
new file mode 100644
index 0000000..b791eb1
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/EmptyIterator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.iterators.system;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
+public class EmptyIterator implements InterruptibleIterator {
+
+  public static final EmptyIterator EMPTY_ITERATOR = new EmptyIterator();
+
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {}
+
+  @Override
+  public boolean hasTop() {
+    return false;
+  }
+
+  @Override
+  public void next() throws IOException {
+    // nothing should call this since hasTop always returns false
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {}
+
+  @Override
+  public Key getTopKey() {
+    // nothing should call this since hasTop always returns false
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Value getTopValue() {
+    // nothing should call this since hasTop always returns false
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    return EMPTY_ITERATOR;
+  }
+
+  @Override
+  public void setInterruptFlag(AtomicBoolean flag) {}
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/iterators/system/MapFileIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/MapFileIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/MapFileIterator.java
index 9d59570..f9f0600 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/MapFileIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/MapFileIterator.java
@@ -33,6 +33,7 @@ import org.apache.accumulo.core.file.map.MapFileUtil;
 import org.apache.accumulo.core.iterators.IterationInterruptedException;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -154,4 +155,9 @@ public class MapFileIterator implements FileSKVIterator {
   public void close() throws IOException {
     reader.close();
   }
+
+  @Override
+  public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/iterators/system/SampleIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/SampleIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/SampleIterator.java
new file mode 100644
index 0000000..aedcdba
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/SampleIterator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.iterators.system;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filter;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.sample.RowSampler;
+import org.apache.accumulo.core.sample.Sampler;
+
+public class SampleIterator extends Filter {
+
+  private Sampler sampler = new RowSampler();
+
+  public SampleIterator(SortedKeyValueIterator<Key,Value> iter, Sampler sampler) {
+    setSource(iter);
+    this.sampler = sampler;
+  }
+
+  @Override
+  public boolean accept(Key k, Value v) {
+    return sampler.accept(k);
+  }
+
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    return new SampleIterator(getSource().deepCopy(env), sampler);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/iterators/system/SequenceFileIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/SequenceFileIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/SequenceFileIterator.java
index 8710acd..8ea3800 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/SequenceFileIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/SequenceFileIterator.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Reader;
 
@@ -126,4 +127,9 @@ public class SequenceFileIterator implements FileSKVIterator {
   public void setInterruptFlag(AtomicBoolean flag) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
+    throw new UnsupportedOperationException();
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/sample/AbstractHashSampler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/sample/AbstractHashSampler.java b/core/src/main/java/org/apache/accumulo/core/sample/AbstractHashSampler.java
new file mode 100644
index 0000000..ae2b951
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/sample/AbstractHashSampler.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.sample;
+
+import java.util.Set;
+
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
+import org.apache.accumulo.core.data.Key;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+/**
+ * A base class that can be used to create Samplers based on hashing. This class offers consistent options for configuring the hash function. The subclass
+ * decides which parts of the key to hash.
+ *
+ * <p>
+ * This class support two options passed into {@link #init(SamplerConfiguration)}. One option is {@code hasher} which specifies a hashing algorithm. Valid
+ * values for this option are {@code md5}, {@code sha1}, and {@code murmur3_32}. If you are not sure, then choose {@code murmur3_32}.
+ *
+ * <p>
+ * The second option is {@code modulus} which can have any positive integer as a value.
+ *
+ * <p>
+ * Any data where {@code hash(data) % modulus == 0} will be selected for the sample.
+ *
+ * @since 1.8.0
+ */
+
+public abstract class AbstractHashSampler implements Sampler {
+
+  private HashFunction hashFunction;
+  private int modulus;
+
+  private static final Set<String> VALID_OPTIONS = ImmutableSet.of("hasher", "modulus");
+
+  /**
+   * Subclasses with options should override this method and return true if the option is valid for the subclass or if {@code super.isValidOption(opt)} returns
+   * true.
+   */
+
+  protected boolean isValidOption(String option) {
+    return VALID_OPTIONS.contains(option);
+  }
+
+  /**
+   * Subclasses with options should override this method and call {@code super.init(config)}.
+   */
+
+  @Override
+  public void init(SamplerConfiguration config) {
+    String hasherOpt = config.getOptions().get("hasher");
+    String modulusOpt = config.getOptions().get("modulus");
+
+    Preconditions.checkNotNull(hasherOpt, "Hasher not specified");
+    Preconditions.checkNotNull(modulusOpt, "Modulus not specified");
+
+    for (String option : config.getOptions().keySet()) {
+      Preconditions.checkArgument(isValidOption(option), "Unknown option : %s", option);
+    }
+
+    switch (hasherOpt) {
+      case "murmur3_32":
+        hashFunction = Hashing.murmur3_32();
+        break;
+      case "md5":
+        hashFunction = Hashing.md5();
+        break;
+      case "sha1":
+        hashFunction = Hashing.sha1();
+        break;
+      default:
+        throw new IllegalArgumentException("Uknown hahser " + hasherOpt);
+    }
+
+    modulus = Integer.parseInt(modulusOpt);
+  }
+
+  /**
+   * Subclass must override this method and hash some portion of the key.
+   */
+  protected abstract HashCode hash(HashFunction hashFunction, Key k);
+
+  @Override
+  public boolean accept(Key k) {
+    return hash(hashFunction, k).asInt() % modulus == 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/sample/RowColumnSampler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/sample/RowColumnSampler.java b/core/src/main/java/org/apache/accumulo/core/sample/RowColumnSampler.java
new file mode 100644
index 0000000..ad68cf6
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/sample/RowColumnSampler.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.sample;
+
+import java.util.Set;
+
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+
+/**
+ * This sampler can hash any subset of a Key's fields. The fields that hashed for the sample are determined by the configuration options passed in
+ * {@link #init(SamplerConfiguration)}. The following key values are valid options.
+ *
+ * <UL>
+ * <li>row=true|false
+ * <li>family=true|false
+ * <li>qualifier=true|false
+ * <li>visibility=true|false
+ * </UL>
+ *
+ * <p>
+ * If not specified in the options, fields default to false.
+ *
+ * <p>
+ * To determine what options are valid for hashing see {@link AbstractHashSampler}
+ *
+ * <p>
+ * To configure Accumulo to generate sample data on one thousandth of the column qualifiers, the following SamplerConfiguration could be created and used to
+ * configure a table.
+ *
+ * <p>
+ * {@code new SamplerConfiguration(RowColumnSampler.class.getName()).setOptions(ImmutableMap.of("hasher","murmur3_32","modulus","1009","qualifier","true"))}
+ *
+ * <p>
+ * With this configuration, if a column qualifier is selected then all key values contains that column qualifier will end up in the sample data.
+ *
+ * @since 1.8.0
+ */
+
+public class RowColumnSampler extends AbstractHashSampler {
+
+  private boolean row = true;
+  private boolean family = true;
+  private boolean qualifier = true;
+  private boolean visibility = true;
+
+  private static final Set<String> VALID_OPTIONS = ImmutableSet.of("row", "family", "qualifier", "visibility");
+
+  private boolean hashField(SamplerConfiguration config, String field) {
+    String optValue = config.getOptions().get(field);
+    if (optValue != null) {
+      return Boolean.parseBoolean(optValue);
+    }
+
+    return false;
+  }
+
+  @Override
+  protected boolean isValidOption(String option) {
+    return super.isValidOption(option) || VALID_OPTIONS.contains(option);
+  }
+
+  @Override
+  public void init(SamplerConfiguration config) {
+    super.init(config);
+
+    row = hashField(config, "row");
+    family = hashField(config, "family");
+    qualifier = hashField(config, "qualifier");
+    visibility = hashField(config, "visibility");
+
+    if (!row && !family && !qualifier && !visibility) {
+      throw new IllegalStateException("Must hash at least one key field");
+    }
+  }
+
+  private void putByteSquence(ByteSequence data, Hasher hasher) {
+    hasher.putBytes(data.getBackingArray(), data.offset(), data.length());
+  }
+
+  @Override
+  protected HashCode hash(HashFunction hashFunction, Key k) {
+    Hasher hasher = hashFunction.newHasher();
+
+    if (row) {
+      putByteSquence(k.getRowData(), hasher);
+    }
+
+    if (family) {
+      putByteSquence(k.getColumnFamilyData(), hasher);
+    }
+
+    if (qualifier) {
+      putByteSquence(k.getColumnQualifierData(), hasher);
+    }
+
+    if (visibility) {
+      putByteSquence(k.getColumnVisibilityData(), hasher);
+    }
+
+    return hasher.hash();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/sample/RowSampler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/sample/RowSampler.java b/core/src/main/java/org/apache/accumulo/core/sample/RowSampler.java
new file mode 100644
index 0000000..8690a1c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/sample/RowSampler.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.sample;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+
+import com.google.common.hash.HashCode;
+import com.google.common.hash.HashFunction;
+
+/**
+ * Builds a sample based on entire rows. If a row is selected for the sample, then all of its columns will be included.
+ *
+ * <p>
+ * To determine what options are valid for hashing see {@link AbstractHashSampler}. This class offers no addition options, it always hashes on the row.
+ *
+ * <p>
+ * To configure Accumulo to generate sample data on one thousandth of the rows, the following SamplerConfiguration could be created and used to configure a
+ * table.
+ *
+ * <p>
+ * {@code new SamplerConfiguration(RowSampler.class.getName()).setOptions(ImmutableMap.of("hasher","murmur3_32","modulus","1009"))}
+ *
+ * @since 1.8.0
+ */
+
+public class RowSampler extends AbstractHashSampler {
+
+  @Override
+  protected HashCode hash(HashFunction hashFunction, Key k) {
+    ByteSequence row = k.getRowData();
+    return hashFunction.hashBytes(row.getBackingArray(), row.offset(), row.length());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/sample/Sampler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/sample/Sampler.java b/core/src/main/java/org/apache/accumulo/core/sample/Sampler.java
new file mode 100644
index 0000000..64adeec
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/sample/Sampler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.sample;
+
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
+import org.apache.accumulo.core.data.Key;
+
+/**
+ * A function that decides which key values are stored in a tables sample. As Accumuo compacts data and creates rfiles it uses a Sampler to decided what to
+ * store in the rfiles sample section. The class name of the Sampler and the Samplers configuration are stored in each rfile. A scan of a tables sample will
+ * only succeed if all rfiles were created with the same sampler and sampler configuration.
+ *
+ * <p>
+ * Since the decisions that Sampler makes are persisted, the behavior of a Sampler for a given configuration should always be the same. One way to offer a new
+ * behavior is to offer new options, while still supporting old behavior with a Samplers existing options.
+ *
+ * <p>
+ * Ideally a sampler that selects a Key k1 would also select updates for k1. For example if a Sampler selects :
+ * {@code row='000989' family='name' qualifier='last' visibility='ADMIN' time=9 value='Doe'}, it would be nice if it also selected :
+ * {@code row='000989' family='name' qualifier='last' visibility='ADMIN' time=20 value='Dough'}. Using hash and modulo on the key fields is a good way to
+ * accomplish this and {@link AbstractHashSampler} provides a good basis for implementation.
+ *
+ * @since 1.8.0
+ */
+
+public interface Sampler {
+
+  /**
+   * An implementation of Sampler must have a noarg constructor. After construction this method is called once to initialize a sampler before it is used.
+   *
+   * @param config
+   *          Configuration options for a sampler.
+   */
+  void init(SamplerConfiguration config);
+
+  /**
+   * @param k
+   *          A key that was written to a rfile.
+   * @return True if the key (and its associtated value) should be stored in the rfile's sample. Return false if it should not be included.
+   */
+  boolean accept(Key k);
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java b/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java
new file mode 100644
index 0000000..348def4
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.sample.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.hadoop.io.Writable;
+
+public class SamplerConfigurationImpl implements Writable {
+  private String className;
+  private Map<String,String> options;
+
+  public SamplerConfigurationImpl(DataInput in) throws IOException {
+    readFields(in);
+  }
+
+  public SamplerConfigurationImpl(SamplerConfiguration sc) {
+    this.className = sc.getSamplerClassName();
+    this.options = new HashMap<>(sc.getOptions());
+  }
+
+  public SamplerConfigurationImpl(String className, Map<String,String> options) {
+    this.className = className;
+    this.options = options;
+  }
+
+  public SamplerConfigurationImpl() {}
+
+  public String getClassName() {
+    return className;
+  }
+
+  public Map<String,String> getOptions() {
+    return Collections.unmodifiableMap(options);
+  }
+
+  @Override
+  public int hashCode() {
+    return 31 * className.hashCode() + options.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof SamplerConfigurationImpl) {
+      SamplerConfigurationImpl osc = (SamplerConfigurationImpl) o;
+
+      return className.equals(osc.className) && options.equals(osc.options);
+    }
+
+    return false;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // The Writable serialization methods for this class are called by RFile and therefore must be very stable. An alternative way to serialize this class is to
+    // use Thrift. That was not used here inorder to avoid making RFile depend on Thrift.
+
+    // versioning info
+    out.write(1);
+
+    out.writeUTF(className);
+
+    out.writeInt(options.size());
+
+    for (Entry<String,String> entry : options.entrySet()) {
+      out.writeUTF(entry.getKey());
+      out.writeUTF(entry.getValue());
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int version = in.readByte();
+
+    if (version != 1) {
+      throw new IllegalArgumentException("Unexpected version " + version);
+    }
+
+    className = in.readUTF();
+
+    options = new HashMap<String,String>();
+
+    int num = in.readInt();
+
+    for (int i = 0; i < num; i++) {
+      String key = in.readUTF();
+      String val = in.readUTF();
+      options.put(key, val);
+    }
+  }
+
+  public SamplerConfiguration toSamplerConfiguration() {
+    SamplerConfiguration sc = new SamplerConfiguration(className);
+    sc.setOptions(options);
+    return sc;
+  }
+
+  public List<Pair<String,String>> toTableProperties() {
+    ArrayList<Pair<String,String>> props = new ArrayList<>();
+
+    for (Entry<String,String> entry : options.entrySet()) {
+      props.add(new Pair<String,String>(Property.TABLE_SAMPLER_OPTS.getKey() + entry.getKey(), entry.getValue()));
+    }
+
+    // intentionally added last, so its set last
+    props.add(new Pair<String,String>(Property.TABLE_SAMPLER.getKey(), className));
+
+    return props;
+  }
+
+  public Map<String,String> toTablePropertiesMap() {
+    LinkedHashMap<String,String> propsMap = new LinkedHashMap<>();
+    for (Pair<String,String> pair : toTableProperties()) {
+      propsMap.put(pair.getFirst(), pair.getSecond());
+    }
+
+    return propsMap;
+  }
+
+  public static SamplerConfigurationImpl newSamplerConfig(AccumuloConfiguration acuconf) {
+    String className = acuconf.get(Property.TABLE_SAMPLER);
+
+    if (className == null || className.equals("")) {
+      return null;
+    }
+
+    Map<String,String> rawOptions = acuconf.getAllPropertiesWithPrefix(Property.TABLE_SAMPLER_OPTS);
+    Map<String,String> options = new HashMap<>();
+
+    for (Entry<String,String> entry : rawOptions.entrySet()) {
+      String key = entry.getKey().substring(Property.TABLE_SAMPLER_OPTS.getKey().length());
+      options.put(key, entry.getValue());
+    }
+
+    return new SamplerConfigurationImpl(className, options);
+  }
+
+  @Override
+  public String toString() {
+    return className + " " + options;
+  }
+
+  public static TSamplerConfiguration toThrift(SamplerConfiguration samplerConfig) {
+    if (samplerConfig == null)
+      return null;
+    return new TSamplerConfiguration(samplerConfig.getSamplerClassName(), samplerConfig.getOptions());
+  }
+
+  public static SamplerConfiguration fromThrift(TSamplerConfiguration tsc) {
+    if (tsc == null)
+      return null;
+    return new SamplerConfiguration(tsc.getClassName()).setOptions(tsc.getOptions());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerFactory.java b/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerFactory.java
new file mode 100644
index 0000000..3f11fbe
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.sample.impl;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.sample.Sampler;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+
+public class SamplerFactory {
+  public static Sampler newSampler(SamplerConfigurationImpl config, AccumuloConfiguration acuconf) throws IOException {
+    String context = acuconf.get(Property.TABLE_CLASSPATH);
+
+    Class<? extends Sampler> clazz;
+    try {
+      if (context != null && !context.equals(""))
+        clazz = AccumuloVFSClassLoader.getContextManager().loadClass(context, config.getClassName(), Sampler.class);
+      else
+        clazz = AccumuloVFSClassLoader.loadClass(config.getClassName(), Sampler.class);
+
+      Sampler sampler = clazz.newInstance();
+
+      sampler.init(config.toSamplerConfiguration());
+
+      return sampler;
+
+    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}