You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2015/02/05 18:42:32 UTC

accumulo git commit: ACCUMULO-3420 Metrics Gathering Object Added

Repository: accumulo
Updated Branches:
  refs/heads/master 6cbe886e3 -> 8c2294df8


ACCUMULO-3420 Metrics Gathering Object Added

Signed-off-by: Christopher Tubbs <ct...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8c2294df
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8c2294df
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8c2294df

Branch: refs/heads/master
Commit: 8c2294df8a3e602f1571861196ff92fddeecf350
Parents: 6cbe886
Author: Jenna Huston <je...@gmail.com>
Authored: Wed Dec 17 11:47:51 2014 -0500
Committer: Christopher Tubbs <ct...@apache.org>
Committed: Thu Feb 5 12:23:05 2015 -0500

----------------------------------------------------------------------
 .../core/file/rfile/MetricsGatherer.java        |  87 ++++
 .../accumulo/core/file/rfile/PrintInfo.java     |  51 +-
 .../apache/accumulo/core/file/rfile/RFile.java  |  50 +-
 .../core/file/rfile/VisMetricsGatherer.java     | 172 +++++++
 .../core/file/rfile/VisibilityMetric.java       |  73 +++
 .../core/file/rfile/RFileMetricsTest.java       | 515 +++++++++++++++++++
 .../accumulo/core/file/rfile/RFileTest.java     |  30 +-
 7 files changed, 953 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/8c2294df/core/src/main/java/org/apache/accumulo/core/file/rfile/MetricsGatherer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/MetricsGatherer.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/MetricsGatherer.java
new file mode 100644
index 0000000..bfda9aa
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/MetricsGatherer.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Interface used to gather metrics from RFiles.
+ *
+ * @param <T>
+ *          Type used to return metrics in getMetrics(). This does not affect how metrics are collected; it is only used as the return type of that method.
+ */
+public interface MetricsGatherer<T> {
+
+  /**
+   * Initialize the gatherer when it is registered with the RFile Reader.
+   *
+   * @param cf
+   *          Map of the LocalityGroup names to their column families
+   */
+  void init(Map<String,ArrayList<ByteSequence>> cf);
+
+  /**
+   * Start a new LocalityGroup. This method is used when the RFile seeks to the next LocalityGroup.
+   *
+   * @param cf
+   *          Text object of the column family of the first entry in the locality group
+   */
+  void startLocalityGroup(Text cf);
+
+  /**
+   * Collect and store metrics for the given entry.
+   *
+   * @param key
+   *          Key object of the entry you are collecting metrics from
+   *
+   * @param val
+   *          Value object of the entry you are collecting metrics from
+   *
+   */
+  void addMetric(Key key, Value val);
+
+  /**
+   * Start a new block within a LocalityGroup. This method is used when the RFile moves on to the next block in the LocalityGroup.
+   */
+  void startBlock();
+
+  /**
+   * Print the results of the metrics gathering by locality group in the format: metric name, number of keys, percentage of keys, number of blocks,
+   * percentage of blocks.
+   *
+   * @param hash
+   *          Boolean to determine whether the values being printed should be hashed
+   * @param metricWord
+   *          String of the name of the metric that was collected
+   * @param out
+   *          PrintStream of where the information should be written to
+   */
+  void printMetrics(boolean hash, String metricWord, PrintStream out);
+
+  /**
+   * @return the metrics gathered
+   */
+  T getMetrics();
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8c2294df/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
index 9ff1dd2..591d477 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
@@ -18,6 +18,8 @@ package org.apache.accumulo.core.file.rfile;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.accumulo.core.cli.Help;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -47,6 +49,10 @@ public class PrintInfo implements KeywordExecutable {
   static class Opts extends Help {
     @Parameter(names = {"-d", "--dump"}, description = "dump the key/value pairs")
     boolean dump = false;
+    @Parameter(names = {"-v", "--vis"}, description = "show visibility metrics")
+    boolean vis = false;
+    @Parameter(names = {"--visHash"}, description = "show visibilities as hashes, implies -v")
+    boolean hash = false;
     @Parameter(names = {"--histogram"}, description = "print a histogram of the key-value sizes")
     boolean histogram = false;
     @Parameter(description = " <file> { <file> ... }")
@@ -98,29 +104,46 @@ public class PrintInfo implements KeywordExecutable {
 
       CachableBlockFile.Reader _rdr = new CachableBlockFile.Reader(fs, path, conf, null, null, aconf);
       Reader iter = new RFile.Reader(_rdr);
+      MetricsGatherer<Map<String, ArrayList<VisibilityMetric>>> vmg = new VisMetricsGatherer();
+
+      if (opts.vis || opts.hash)
+        iter.registerMetrics(vmg);
 
       iter.printInfo();
       System.out.println();
       org.apache.accumulo.core.file.rfile.bcfile.PrintInfo.main(new String[] {arg});
 
-      if (opts.histogram || opts.dump) {
-        iter.seek(new Range((Key) null, (Key) null), new ArrayList<ByteSequence>(), false);
-        while (iter.hasTop()) {
-          Key key = iter.getTopKey();
-          Value value = iter.getTopValue();
-          if (opts.dump)
-            System.out.println(key + " -> " + value);
-          if (opts.histogram) {
-            long size = key.getSize() + value.getSize();
-            int bucket = (int) Math.log10(size);
-            countBuckets[bucket]++;
-            sizeBuckets[bucket] += size;
-            totalSize += size;
+      Map<String, ArrayList<ByteSequence>> localityGroupCF = null;
+
+      if (opts.histogram || opts.dump || opts.vis || opts.hash) {
+        localityGroupCF = iter.getLocalityGroupCF();
+
+        for (Entry<String,ArrayList<ByteSequence>> cf : localityGroupCF.entrySet()) {
+
+          iter.seek(new Range((Key) null, (Key) null), cf.getValue(), true);
+          while (iter.hasTop()) {
+            Key key = iter.getTopKey();
+            Value value = iter.getTopValue();
+            if (opts.dump)
+              System.out.println(key + " -> " + value);
+            if (opts.histogram) {
+              long size = key.getSize() + value.getSize();
+              int bucket = (int) Math.log10(size);
+              countBuckets[bucket]++;
+              sizeBuckets[bucket] += size;
+              totalSize += size;
+            }
+            iter.next();
           }
-          iter.next();
         }
       }
+      System.out.println();
+
       iter.close();
+
+      if (opts.vis || opts.hash)
+        vmg.printMetrics(opts.hash, "Visibility", System.out);
+
       if (opts.histogram) {
         System.out.println("Up to size      count      %-age");
         for (int i = 1; i < countBuckets.length; i++) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8c2294df/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index 0b464d8..888924d 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -554,6 +554,8 @@ public class RFile {
 
       if (entriesLeft == 0) {
         currBlock.close();
+        if (metricsGatherer != null)
+          metricsGatherer.startBlock();
 
         if (iiter.hasNext()) {
           IndexEntry indexEntry = iiter.next();
@@ -561,7 +563,7 @@ public class RFile {
           currBlock = getDataBlock(indexEntry);
 
           checkRange = range.afterEndKey(indexEntry.getKey());
-          if (!checkRange)
+    if (!checkRange)
             hasTop = true;
 
         } else {
@@ -575,6 +577,10 @@ public class RFile {
       prevKey = rk.getKey();
       rk.readFields(currBlock);
       val.readFields(currBlock);
+
+      if (metricsGatherer != null)
+        metricsGatherer.addMetric(rk.getKey(), val);
+
       entriesLeft--;
       if (checkRange)
         hasTop = !range.afterEndKey(rk.getKey());
@@ -760,6 +766,11 @@ public class RFile {
       while (hasTop() && range.beforeStartKey(getTopKey())) {
         next();
       }
+
+      if (metricsGatherer != null) {
+        metricsGatherer.startLocalityGroup(rk.getKey().getColumnFamily());
+        metricsGatherer.addMetric(rk.getKey(), val);
+      }
     }
 
     @Override
@@ -803,6 +814,12 @@ public class RFile {
     public InterruptibleIterator getIterator() {
       return this;
     }
+
+    private MetricsGatherer<?> metricsGatherer;
+
+    public void registerMetrics(MetricsGatherer<?> vmg) {
+      metricsGatherer = vmg; // overwrites any gatherer set earlier; the read path skips metrics while this is null
+    }
   }
 
   public static class Reader extends HeapIterator implements FileSKVIterator {
@@ -973,8 +990,38 @@ public class RFile {
 
     }
 
+    public Map<String,ArrayList<ByteSequence>> getLocalityGroupCF() {
+      Map<String,ArrayList<ByteSequence>> cf = new HashMap<>(); // locality group name -> its column families; rebuilt on every call
+
+      for (LocalityGroupMetadata lcg : localityGroups) {
+        ArrayList<ByteSequence> setCF = new ArrayList<ByteSequence>();
+
+        for (Entry<ByteSequence,MutableLong> entry : lcg.columnFamilies.entrySet()) {
+          setCF.add(entry.getKey()); // keep only the family; the MutableLong value is not needed here
+        }
+
+        cf.put(lcg.name, setCF); // NOTE(review): name is presumably null for the default group - confirm
+      }
+
+      return cf;
+    }
+
     private int numLGSeeked = 0;
 
+    /**
+     * Registers the given MetricsGatherer. Only one can be registered at a time; a later call replaces the previous one. The MetricsGatherer should be
+     * registered before iterating through the LocalityGroups.
+     *
+     * @param vmg
+     *          MetricsGatherer to be registered with the LocalityGroupReaders
+     */
+    public void registerMetrics(MetricsGatherer<?> vmg) {
+      vmg.init(getLocalityGroupCF()); // give the gatherer the locality group -> column family mapping before any reader uses it
+      for (LocalityGroupReader lgr : lgReaders) {
+        lgr.registerMetrics(vmg);
+      }
+    }
+
     @Override
     public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
       numLGSeeked = LocalityGroupIterator.seek(this, lgReaders, nonDefaultColumnFamilies, range, columnFamilies, inclusive);
@@ -999,7 +1046,6 @@ public class RFile {
       for (LocalityGroupMetadata lgm : localityGroups) {
         lgm.printInfo();
       }
-
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8c2294df/core/src/main/java/org/apache/accumulo/core/file/rfile/VisMetricsGatherer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/VisMetricsGatherer.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/VisMetricsGatherer.java
new file mode 100644
index 0000000..6050e41
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/VisMetricsGatherer.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.base.Charsets;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import com.google.common.util.concurrent.AtomicLongMap;
+
+/**
+ * This class provides visibility metrics per locality group. The Map in getMetrics() maps the locality group name to an ArrayList of VisibilityMetric objects.
+ * These contain the components of a visibility metric: the visibility as a String, the number of times it is seen in a locality group, the percentage of keys
+ * that contain that visibility in the locality group, the number of blocks in the locality group that contain the visibility, and the percentage of blocks in
+ * the locality group that contain the visibility.
+ */
+public class VisMetricsGatherer implements MetricsGatherer<Map<String,ArrayList<VisibilityMetric>>> {
+
+  protected Map<String,AtomicLongMap<String>> metric; // locality group name -> (visibility -> number of keys seen with it)
+  protected Map<String,AtomicLongMap<String>> blocks; // locality group name -> (visibility -> number of blocks it was seen in)
+  protected ArrayList<Long> numEntries; // entries counted per locality group, in the order the groups were started
+  protected ArrayList<Integer> numBlocks; // blocks counted per locality group, same ordering as numEntries
+  private ArrayList<String> inBlock; // visibilities already counted for the current block
+  protected ArrayList<String> localityGroups; // names of the started locality groups; a null name is printed as <DEFAULT>
+  private int numLG; // number of locality groups started so far; numLG - 1 indexes the current one
+  private Map<String,ArrayList<ByteSequence>> localityGroupCF; // supplied through init(); used to map a column family to its group name
+
+  public VisMetricsGatherer() {
+    metric = new HashMap<>();
+    blocks = new HashMap<>();
+    numEntries = new ArrayList<>();
+    numBlocks = new ArrayList<>();
+    inBlock = new ArrayList<>();
+    localityGroups = new ArrayList<>();
+    numLG = 0;
+  }
+
+  @Override
+  public void init(Map<String,ArrayList<ByteSequence>> cf) {
+    localityGroupCF = cf;
+  }
+
+  @Override
+  public void startLocalityGroup(Text oneCF) {
+    String name = null; // stays null when no locality group lists this column family
+    ByteSequence cf = new ArrayByteSequence(oneCF.toString());
+    for (Entry<String,ArrayList<ByteSequence>> entry : localityGroupCF.entrySet()) {
+      if (entry.getValue().contains(cf)) {
+        if (entry.getKey() == null)
+          name = null;
+        else
+          name = entry.getKey().toString();
+        break;
+      }
+    }
+    localityGroups.add(name);
+    metric.put(name, AtomicLongMap.create(new HashMap<String,Long>())); // NOTE(review): a name that was already started gets its counts replaced here
+    blocks.put(name, AtomicLongMap.create(new HashMap<String,Long>()));
+    numLG++;
+    numEntries.add((long) 0);
+    numBlocks.add(0);
+  }
+
+  @Override
+  public void addMetric(Key key, Value val) {
+    String myMetric = key.getColumnVisibility().toString();
+    String currLG = localityGroups.get(numLG - 1); // entries are attributed to the most recently started locality group
+    if (metric.get(currLG).containsKey(myMetric)) {
+      metric.get(currLG).getAndIncrement(myMetric);
+    } else
+      metric.get(currLG).put(myMetric, 1);
+
+    numEntries.set(numLG - 1, numEntries.get(numLG - 1) + 1);
+
+    // each visibility is counted at most once per block; inBlock remembers what this block already contributed
+    if (!inBlock.contains(myMetric) && blocks.get(currLG).containsKey(myMetric)) {
+      blocks.get(currLG).incrementAndGet(myMetric);
+      inBlock.add(myMetric);
+    } else if (!inBlock.contains(myMetric) && !blocks.get(currLG).containsKey(myMetric)) {
+      blocks.get(currLG).put(myMetric, 1);
+      inBlock.add(myMetric);
+    }
+
+  }
+
+  @Override
+  public void startBlock() {
+    inBlock.clear(); // forget which visibilities were seen so the next block counts them afresh
+    numBlocks.set(numLG - 1, numBlocks.get(numLG - 1) + 1); // one more block for the current locality group
+  }
+
+  @Override
+  public void printMetrics(boolean hash, String metricWord, PrintStream out) {
+    for (int i = 0; i < numLG; i++) {
+      String lGName = localityGroups.get(i);
+      out.print("Locality Group: ");
+      if (lGName == null)
+        out.println("<DEFAULT>");
+      else
+        out.println(localityGroups.get(i));
+      out.printf("%-27s", metricWord);
+      out.println("Number of keys" + "\t   " + "Percent of keys" + "\t" + "Number of blocks" + "\t" + "Percent of blocks");
+      for (Entry<String,Long> entry : metric.get(lGName).asMap().entrySet()) {
+        HashFunction hf = Hashing.md5(); // NOTE(review): the hash is computed for every row, even when hash is false
+        HashCode hc = hf.newHasher().putString(entry.getKey(), Charsets.UTF_8).hash();
+        if (hash)
+          out.printf("%-20s", hc.toString().substring(0, 8)); // only the first 8 characters of the hash string are shown
+        else
+          out.printf("%-20s", entry.getKey());
+        out.print("\t\t" + entry.getValue() + "\t\t\t");
+        out.printf("%.2f", ((double) entry.getValue() / numEntries.get(i)) * 100);
+        out.print("%\t\t\t");
+
+        long blocksIn = blocks.get(lGName).get(entry.getKey());
+
+        out.print(blocksIn + "\t\t   ");
+        out.printf("%.2f", ((double) blocksIn / numBlocks.get(i)) * 100);
+        out.print("%");
+
+        out.println("");
+      }
+      out.println("Number of keys: " + numEntries.get(i));
+      out.println();
+    }
+  }
+
+  @Override
+  public Map<String,ArrayList<VisibilityMetric>> getMetrics() {
+    Map<String,ArrayList<VisibilityMetric>> getMetrics = new HashMap<>();
+    for (int i = 0; i < numLG; i++) {
+      String lGName = localityGroups.get(i);
+      ArrayList<VisibilityMetric> rows = new ArrayList<>();
+      for (Entry<String,Long> entry : metric.get(lGName).asMap().entrySet()) {
+        long vis = entry.getValue();
+        double visPer = ((double) entry.getValue() / numEntries.get(i)) * 100; // percentages are expressed on a 0-100 scale
+
+        long blocksIn = blocks.get(lGName).get(entry.getKey());
+        double blocksPer = ((double) blocksIn / numBlocks.get(i)) * 100;
+
+        rows.add(new VisibilityMetric(entry.getKey(), vis, visPer, blocksIn, blocksPer));
+      }
+      getMetrics.put(lGName, rows);
+    }
+    return getMetrics;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8c2294df/core/src/main/java/org/apache/accumulo/core/file/rfile/VisibilityMetric.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/VisibilityMetric.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/VisibilityMetric.java
new file mode 100644
index 0000000..ab7b1d7
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/VisibilityMetric.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+/**
+ * Class that holds the components of a visibility metric: the String visibility, the number of times it is seen in a locality group, the percentage of keys
+ * that contain that visibility in the locality group, the number of blocks in the locality group that contain the visibility, and the percentage of blocks in
+ * the locality group that contain the visibility.
+ */
+public class VisibilityMetric {
+
+  private long visLG, visBlock;
+  private double visLGPer, visBlockPer;
+  private String visibility;
+
+  public VisibilityMetric(String visibility, long visLG, double visLGPer, long visBlock, double visBlockPer) {
+    this.visibility = visibility;
+    this.visLG = visLG;
+    this.visLGPer = visLGPer;
+    this.visBlock = visBlock;
+    this.visBlockPer = visBlockPer;
+  }
+
+  /**
+   * @return the visibility as a String
+   */
+  public String getVisibility() {
+    return visibility;
+  }
+
+  /**
+   * @return the number of times the visibility is seen in the locality group
+   */
+  public long getVisLG() {
+    return visLG;
+  }
+
+  /**
+   * @return the number of blocks in the locality group that contain the visibility
+   */
+  public long getVisBlock() {
+    return visBlock;
+  }
+
+  /**
+   * @return the percentage of keys in the locality group that contain the visibility
+   */
+  public double getVisLGPer() {
+    return visLGPer;
+  }
+
+  /**
+   * @return the percentage of blocks in the locality group that contain the visibility
+   */
+  public double getVisBlockPer() {
+    return visBlockPer;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8c2294df/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java
new file mode 100644
index 0000000..e66210b
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import com.google.common.util.concurrent.AtomicLongMap;
+
+/**
+ *
+ */
+public class RFileMetricsTest {
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+  static {
+    Logger.getLogger(org.apache.hadoop.io.compress.CodecPool.class).setLevel(Level.OFF);
+    Logger.getLogger(org.apache.hadoop.util.NativeCodeLoader.class).setLevel(Level.OFF);
+  }
+
+  public static class TestRFile extends RFileTest.TestRFile {
+
+    public TestRFile(AccumuloConfiguration accumuloConfiguration) {
+      super(accumuloConfiguration);
+    }
+
+    public VisMetricsGatherer gatherMetrics() throws IOException {
+      VisMetricsGatherer vmg = new VisMetricsGatherer();
+      reader.registerMetrics(vmg);
+      Map<String,ArrayList<ByteSequence>> localityGroupCF = reader.getLocalityGroupCF();
+
+      for (Entry<String,ArrayList<ByteSequence>> cf : localityGroupCF.entrySet()) {
+
+        reader.seek(new Range((Key) null, (Key) null), cf.getValue(), true);
+        while (reader.hasTop()) {
+          reader.next();
+        }
+      }
+      return vmg;
+    }
+  }
+
+  public AccumuloConfiguration conf = null;
+
+  @Test
+  public void emptyFile() throws IOException {
+
+    // test an empty file
+
+    TestRFile trf = new TestRFile(conf);
+
+    trf.openWriter();
+    trf.closeWriter();
+
+    trf.openReader(false);
+
+    VisMetricsGatherer vmg = trf.gatherMetrics();
+
+    Map<String,AtomicLongMap<String>> metrics = vmg.metric;
+    Map<String,AtomicLongMap<String>> blocks = vmg.blocks;
+    assertEquals(0, metrics.size());
+
+    assertEquals(0, blocks.size());
+
+    trf.closeReader();
+  }
+
+  @Test
+  public void oneEntryDefaultLocGroup() throws IOException {
+
+    // test an rfile with one entry in the default locality group
+
+    TestRFile trf = new TestRFile(conf);
+
+    trf.openWriter();
+    trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo"));
+    trf.closeWriter();
+
+    trf.openReader(false);
+
+    VisMetricsGatherer vmg = trf.gatherMetrics();
+
+    AtomicLongMap<String> metrics = vmg.metric.get(null);
+    AtomicLongMap<String> blocks = vmg.blocks.get(null);
+    assertEquals(1, metrics.get("L1"));
+
+    assertEquals(1, blocks.get("L1"));
+
+    assertEquals(1, vmg.numEntries.get(vmg.localityGroups.indexOf(null)).longValue());
+    assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf(null)).longValue());
+
+    trf.closeReader();
+  }
+
+  @Test
+  public void twoEntriesDefaultLocGroup() throws IOException {
+
+    // test an rfile with two entries in the default locality group
+
+    TestRFile trf = new TestRFile(conf);
+
+    trf.openWriter();
+    trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L2", 55), RFileTest.nv("foo"));
+    trf.closeWriter();
+
+    trf.openReader(false);
+
+    VisMetricsGatherer vmg = trf.gatherMetrics();
+
+    AtomicLongMap<String> metrics = vmg.metric.get(null);
+    AtomicLongMap<String> blocks = vmg.blocks.get(null);
+    assertEquals(1, metrics.get("L1"));
+    assertEquals(1, metrics.get("L2"));
+
+    assertEquals(1, blocks.get("L1"));
+    assertEquals(1, blocks.get("L2"));
+
+    assertEquals(2, vmg.numEntries.get(vmg.localityGroups.indexOf(null)).longValue());
+    assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf(null)).longValue());
+
+    trf.closeReader();
+
+  }
+
+  @Test
+  public void oneEntryNonDefaultLocGroup() throws IOException {
+
+    // test an rfile with one entry in a non-default locality group
+
+    TestRFile trf = new TestRFile(conf);
+
+    trf.openWriter(false);
+    Set<ByteSequence> lg1 = new HashSet<>();
+    lg1.add(new ArrayByteSequence("cf1"));
+
+    trf.writer.startNewLocalityGroup("lg1", lg1);
+    trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo"));
+    trf.closeWriter();
+
+    trf.openReader(false);
+
+    VisMetricsGatherer vmg = trf.gatherMetrics();
+
+    AtomicLongMap<String> metrics = vmg.metric.get("lg1");
+    AtomicLongMap<String> blocks = vmg.blocks.get("lg1");
+    assertEquals(1, metrics.get("L1"));
+
+    assertEquals(1, blocks.get("L1"));
+
+    assertEquals(1, vmg.numEntries.get(vmg.localityGroups.indexOf("lg1")).longValue());
+    assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg1")).longValue());
+
+    trf.closeReader();
+
+  }
+
+  @Test
+  public void twoEntryNonDefaultLocGroup() throws IOException {
+
+    // test an rfile with two entries in a non-default locality group
+
+    TestRFile trf = new TestRFile(conf);
+
+    trf.openWriter(false);
+    Set<ByteSequence> lg1 = new HashSet<>();
+    lg1.add(new ArrayByteSequence("cf1"));
+
+    trf.writer.startNewLocalityGroup("lg1", lg1);
+    trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L2", 55), RFileTest.nv("foo"));
+    trf.closeWriter();
+
+    trf.openReader(false);
+
+    VisMetricsGatherer vmg = trf.gatherMetrics();
+
+    AtomicLongMap<String> metrics = vmg.metric.get("lg1");
+    AtomicLongMap<String> blocks = vmg.blocks.get("lg1");
+    assertEquals(1, metrics.get("L1"));
+    assertEquals(1, metrics.get("L2"));
+
+    assertEquals(1, blocks.get("L1"));
+    assertEquals(1, blocks.get("L2"));
+
+    assertEquals(2, vmg.numEntries.get(vmg.localityGroups.indexOf("lg1")).longValue());
+    assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg1")).longValue());
+
+    trf.closeReader();
+
+  }
+
+  @Test
+  public void twoNonDefaultLocGroups() throws IOException {
+
+    // test an rfile with two entries in 2 non-default locality groups
+
+    TestRFile trf = new TestRFile(conf);
+
+    trf.openWriter(false);
+    Set<ByteSequence> lg1 = new HashSet<>();
+    lg1.add(new ArrayByteSequence("cf1"));
+
+    trf.writer.startNewLocalityGroup("lg1", lg1);
+    trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L2", 55), RFileTest.nv("foo"));
+
+    Set<ByteSequence> lg2 = new HashSet<>();
+    lg2.add(new ArrayByteSequence("cf2"));
+
+    trf.writer.startNewLocalityGroup("lg2", lg2);
+    trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "L1", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "L2", 55), RFileTest.nv("foo"));
+
+    trf.closeWriter();
+
+    trf.openReader(false);
+
+    VisMetricsGatherer vmg = trf.gatherMetrics();
+
+    AtomicLongMap<String> metrics = vmg.metric.get("lg1");
+    AtomicLongMap<String> blocks = vmg.blocks.get("lg1");
+    assertEquals(1, metrics.get("L1"));
+    assertEquals(1, metrics.get("L2"));
+
+    assertEquals(1, blocks.get("L1"));
+    assertEquals(1, blocks.get("L2"));
+
+    assertEquals(2, vmg.numEntries.get(vmg.localityGroups.indexOf("lg1")).longValue());
+    assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg1")).longValue());
+
+    metrics = vmg.metric.get("lg2");
+    blocks = vmg.blocks.get("lg2");
+    assertEquals(1, metrics.get("L1"));
+    assertEquals(1, metrics.get("L2"));
+
+    assertEquals(1, blocks.get("L1"));
+    assertEquals(1, blocks.get("L2"));
+
+    assertEquals(2, vmg.numEntries.get(vmg.localityGroups.indexOf("lg2")).longValue());
+    assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg2")).longValue());
+
+    trf.closeReader();
+
+  }
+
+  @Test
+  public void nonDefaultAndDefaultLocGroup() throws IOException {
+
+    // test an rfile with 3 entries in a non-default locality group and the default locality group
+
+    TestRFile trf = new TestRFile(conf);
+
+    trf.openWriter(false);
+    Set<ByteSequence> lg1 = new HashSet<>();
+    lg1.add(new ArrayByteSequence("cf1"));
+
+    trf.writer.startNewLocalityGroup("lg1", lg1);
+    trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf1", "cq2", "L1", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf1", "cq2", "L2", 55), RFileTest.nv("foo"));
+
+    trf.writer.startDefaultLocalityGroup();
+    trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "A", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "B", 55), RFileTest.nv("foo"));
+
+    trf.closeWriter();
+
+    trf.openReader(false);
+
+    VisMetricsGatherer vmg = trf.gatherMetrics();
+
+    AtomicLongMap<String> metrics = vmg.metric.get("lg1");
+    AtomicLongMap<String> blocks = vmg.blocks.get("lg1");
+    assertEquals(2, metrics.get("L1"));
+    assertEquals(1, metrics.get("L2"));
+
+    assertEquals(1, blocks.get("L1"));
+    assertEquals(1, blocks.get("L2"));
+
+    assertEquals(3, vmg.numEntries.get(vmg.localityGroups.indexOf("lg1")).longValue());
+    assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg1")).longValue());
+
+    metrics = vmg.metric.get(null);
+    blocks = vmg.blocks.get(null);
+    assertEquals(1, metrics.get("A"));
+    assertEquals(1, metrics.get("B"));
+
+    assertEquals(1, blocks.get("A"));
+    assertEquals(1, blocks.get("B"));
+
+    assertEquals(2, vmg.numEntries.get(vmg.localityGroups.indexOf(null)).longValue());
+    assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf(null)).longValue());
+
+    trf.closeReader();
+
+  }
+
+  @Test
+  public void multiCFNonDefaultAndDefaultLocGroup() throws IOException {
+
+    // test an rfile with multiple column families in a non-default locality group and the default locality group
+
+    TestRFile trf = new TestRFile(conf);
+
+    trf.openWriter(false);
+    Set<ByteSequence> lg1 = new HashSet<>();
+    lg1.add(new ArrayByteSequence("cf1"));
+    lg1.add(new ArrayByteSequence("cf3"));
+
+    trf.writer.startNewLocalityGroup("lg1", lg1);
+    trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf1", "cq2", "L1", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf3", "cq1", "L1", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf3", "cq2", "L2", 55), RFileTest.nv("foo"));
+
+    trf.writer.startDefaultLocalityGroup();
+    trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "A", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "B", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf4", "cq1", "A", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf4", "cq1", "B", 55), RFileTest.nv("foo"));
+
+    trf.closeWriter();
+
+    trf.openReader(false);
+
+    VisMetricsGatherer vmg = trf.gatherMetrics();
+
+    AtomicLongMap<String> metrics = vmg.metric.get("lg1");
+    AtomicLongMap<String> blocks = vmg.blocks.get("lg1");
+    assertEquals(3, metrics.get("L1"));
+    assertEquals(1, metrics.get("L2"));
+
+    assertEquals(1, blocks.get("L1"));
+    assertEquals(1, blocks.get("L2"));
+
+    assertEquals(4, vmg.numEntries.get(vmg.localityGroups.indexOf("lg1")).longValue());
+    assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg1")).longValue());
+
+    metrics = vmg.metric.get(null);
+    blocks = vmg.blocks.get(null);
+    assertEquals(2, metrics.get("A"));
+    assertEquals(2, metrics.get("B"));
+
+    assertEquals(1, blocks.get("A"));
+    assertEquals(1, blocks.get("B"));
+
+    assertEquals(4, vmg.numEntries.get(vmg.localityGroups.indexOf(null)).longValue());
+    assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf(null)).longValue());
+
+    trf.closeReader();
+
+  }
+
+  @Test
+  public void multiBlockDefaultLocGroup() throws IOException {
+
+    // test an rfile with four blocks in the default locality group
+
+    TestRFile trf = new TestRFile(conf);
+
+    trf.openWriter(20);// Each entry is a block
+    trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf1", "cq2", "L1", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf3", "cq1", "L1", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf3", "cq2", "L2", 55), RFileTest.nv("foo"));
+    trf.closeWriter();
+
+    trf.openReader(false);
+
+    VisMetricsGatherer vmg = trf.gatherMetrics();
+
+    AtomicLongMap<String> metrics = vmg.metric.get(null);
+    AtomicLongMap<String> blocks = vmg.blocks.get(null);
+    assertEquals(3, metrics.get("L1"));
+    assertEquals(1, metrics.get("L2"));
+
+    assertEquals(3, blocks.get("L1"));
+    assertEquals(1, blocks.get("L2"));
+
+    assertEquals(4, vmg.numEntries.get(vmg.localityGroups.indexOf(null)).longValue());
+    assertEquals(4, vmg.numBlocks.get(vmg.localityGroups.indexOf(null)).longValue());
+
+    trf.closeReader();
+
+  }
+
+  @Test
+  public void multiBlockNonDefaultLocGroup() throws IOException {
+
+    // test an rfile with four blocks in a non-default locality group
+
+    TestRFile trf = new TestRFile(conf);
+
+    trf.openWriter(false, 20);// Each entry is a block
+    Set<ByteSequence> lg1 = new HashSet<>();
+    lg1.add(new ArrayByteSequence("cf1"));
+    lg1.add(new ArrayByteSequence("cf3"));
+
+    trf.writer.startNewLocalityGroup("lg1", lg1);
+    trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf1", "cq2", "L1", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf3", "cq1", "L1", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf3", "cq2", "L2", 55), RFileTest.nv("foo"));
+    trf.closeWriter();
+
+    trf.openReader(false);
+
+    VisMetricsGatherer vmg = trf.gatherMetrics();
+
+    AtomicLongMap<String> metrics = vmg.metric.get("lg1");
+    AtomicLongMap<String> blocks = vmg.blocks.get("lg1");
+    assertEquals(3, metrics.get("L1"));
+    assertEquals(1, metrics.get("L2"));
+
+    assertEquals(3, blocks.get("L1"));
+    assertEquals(1, blocks.get("L2"));
+
+    assertEquals(4, vmg.numEntries.get(vmg.localityGroups.indexOf("lg1")).longValue());
+    assertEquals(4, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg1")).longValue());
+
+    trf.closeReader();
+
+  }
+
+  @Test
+  public void multiBlockMultiCFNonDefaultAndDefaultLocGroup() throws IOException {
+
+    // test an rfile with multiple column families and multiple blocks in a non-default locality group and the default locality group
+
+    TestRFile trf = new TestRFile(conf);
+
+    trf.openWriter(false, 20);// Each entry is a block
+    Set<ByteSequence> lg1 = new HashSet<>();
+    lg1.add(new ArrayByteSequence("cf1"));
+    lg1.add(new ArrayByteSequence("cf3"));
+
+    trf.writer.startNewLocalityGroup("lg1", lg1);
+    trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf1", "cq2", "L1", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf3", "cq1", "L1", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf3", "cq2", "L2", 55), RFileTest.nv("foo"));
+
+    trf.writer.startDefaultLocalityGroup();
+    trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "A", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "B", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf4", "cq1", "A", 55), RFileTest.nv("foo"));
+    trf.writer.append(RFileTest.nk("r1", "cf4", "cq1", "B", 55), RFileTest.nv("foo"));
+
+    trf.closeWriter();
+
+    trf.openReader(false);
+
+    VisMetricsGatherer vmg = trf.gatherMetrics();
+
+    AtomicLongMap<String> metrics = vmg.metric.get("lg1");
+    AtomicLongMap<String> blocks = vmg.blocks.get("lg1");
+    assertEquals(3, metrics.get("L1"));
+    assertEquals(1, metrics.get("L2"));
+
+    assertEquals(3, blocks.get("L1"));
+    assertEquals(1, blocks.get("L2"));
+
+    assertEquals(4, vmg.numEntries.get(vmg.localityGroups.indexOf("lg1")).longValue());
+    assertEquals(4, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg1")).longValue());
+
+    metrics = vmg.metric.get(null);
+    blocks = vmg.blocks.get(null);
+    assertEquals(2, metrics.get("A"));
+    assertEquals(2, metrics.get("B"));
+
+    assertEquals(2, blocks.get("A"));
+    assertEquals(2, blocks.get("B"));
+
+    assertEquals(4, vmg.numEntries.get(vmg.localityGroups.indexOf(null)).longValue());
+    assertEquals(4, vmg.numBlocks.get(vmg.localityGroups.indexOf(null)).longValue());
+
+    trf.closeReader();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8c2294df/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index 1a83f33..eafadc0 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -169,13 +169,13 @@ public class RFileTest {
 
   public static class TestRFile {
 
-    private Configuration conf = CachedConfiguration.getInstance();
+    protected Configuration conf = CachedConfiguration.getInstance();
     public RFile.Writer writer;
-    private ByteArrayOutputStream baos;
-    private FSDataOutputStream dos;
-    private SeekableByteArrayInputStream bais;
-    private FSDataInputStream in;
-    private AccumuloConfiguration accumuloConfiguration;
+    protected ByteArrayOutputStream baos;
+    protected FSDataOutputStream dos;
+    protected SeekableByteArrayInputStream bais;
+    protected FSDataInputStream in;
+    protected AccumuloConfiguration accumuloConfiguration;
     public Reader reader;
     public SortedKeyValueIterator<Key,Value> iter;
 
@@ -186,18 +186,25 @@ public class RFileTest {
     }
 
     public void openWriter(boolean startDLG) throws IOException {
+      openWriter(startDLG, 1000);
+    }
 
+    public void openWriter(boolean startDLG, int blockSize) throws IOException {
       baos = new ByteArrayOutputStream();
       dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a"));
       CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(dos, "gz", conf, accumuloConfiguration);
-      writer = new RFile.Writer(_cbw, 1000, 1000);
+      writer = new RFile.Writer(_cbw, blockSize, 1000);
 
       if (startDLG)
         writer.startDefaultLocalityGroup();
     }
 
     public void openWriter() throws IOException {
-      openWriter(true);
+      openWriter(true, 1000);
+    }
+
+    public void openWriter(int blockSize) throws IOException {
+      openWriter(true, blockSize);
     }
 
     public void closeWriter() throws IOException {
@@ -210,6 +217,10 @@ public class RFileTest {
     }
 
     public void openReader() throws IOException {
+      openReader(true);
+    }
+
+    public void openReader(boolean cfsi) throws IOException {
 
       int fileLength = 0;
       byte[] data = null;
@@ -224,7 +235,8 @@ public class RFileTest {
 
       CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, fileLength, conf, dataCache, indexCache, AccumuloConfiguration.getDefaultConfiguration());
       reader = new RFile.Reader(_cbr);
-      iter = new ColumnFamilySkippingIterator(reader);
+      if (cfsi)
+        iter = new ColumnFamilySkippingIterator(reader);
 
       checkIndex(reader);
     }