You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2015/02/05 18:42:32 UTC
accumulo git commit: ACCUMULO-3420 Metrics Gathering Object Added
Repository: accumulo
Updated Branches:
refs/heads/master 6cbe886e3 -> 8c2294df8
ACCUMULO-3420 Metrics Gathering Object Added
Signed-off-by: Christopher Tubbs <ct...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8c2294df
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8c2294df
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8c2294df
Branch: refs/heads/master
Commit: 8c2294df8a3e602f1571861196ff92fddeecf350
Parents: 6cbe886
Author: Jenna Huston <je...@gmail.com>
Authored: Wed Dec 17 11:47:51 2014 -0500
Committer: Christopher Tubbs <ct...@apache.org>
Committed: Thu Feb 5 12:23:05 2015 -0500
----------------------------------------------------------------------
.../core/file/rfile/MetricsGatherer.java | 87 ++++
.../accumulo/core/file/rfile/PrintInfo.java | 51 +-
.../apache/accumulo/core/file/rfile/RFile.java | 50 +-
.../core/file/rfile/VisMetricsGatherer.java | 172 +++++++
.../core/file/rfile/VisibilityMetric.java | 73 +++
.../core/file/rfile/RFileMetricsTest.java | 515 +++++++++++++++++++
.../accumulo/core/file/rfile/RFileTest.java | 30 +-
7 files changed, 953 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8c2294df/core/src/main/java/org/apache/accumulo/core/file/rfile/MetricsGatherer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/MetricsGatherer.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/MetricsGatherer.java
new file mode 100644
index 0000000..bfda9aa
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/MetricsGatherer.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Interface used to gather metrics from RFiles.
+ *
+ * <p>
+ * The RFile reader drives a registered gatherer as follows: {@link #init} is called once when the gatherer is registered, {@link #startLocalityGroup} after a
+ * locality group has been seeked, {@link #addMetric} for every entry read, and {@link #startBlock} whenever the reader has consumed all entries of a data block.
+ *
+ * @param <T>
+ *          Type used to return metrics in getMetrics(). It does not affect how metrics are collected; it is only used by that method.
+ */
+public interface MetricsGatherer<T> {
+
+  /**
+   * Initialize the gatherer when it is registered with the RFile Reader.
+   *
+   * @param cf
+   *          Map of the LocalityGroup names to their column families
+   */
+  void init(Map<String,ArrayList<ByteSequence>> cf);
+
+  /**
+   * Start a new LocalityGroup. The RFile reader calls this when it seeks to the next LocalityGroup.
+   *
+   * @param cf
+   *          Text object of the column family of the first entry in the locality group
+   */
+  void startLocalityGroup(Text cf);
+
+  /**
+   * Collect and store metrics for the given entry.
+   *
+   * @param key
+   *          Key object of the entry you are collecting metrics from
+   * @param val
+   *          Value object of the entry you are collecting metrics from
+   */
+  void addMetric(Key key, Value val);
+
+  /**
+   * Start a new block within a LocalityGroup. The RFile reader calls this when it has exhausted the current block. The call happens before the reader checks
+   * whether another block follows, so it is also made after the last block of a group.
+   */
+  void startBlock();
+
+  /**
+   * Print the results of the metrics gathering, one table per locality group, with the columns: metric name, number of keys, percentage of keys, number of
+   * blocks, and percentage of blocks.
+   *
+   * @param hash
+   *          whether the metric values should be printed as hashes instead of verbatim
+   * @param metricWord
+   *          name of the metric that was collected, used as the first column heading
+   * @param out
+   *          PrintStream the information is written to
+   */
+  void printMetrics(boolean hash, String metricWord, PrintStream out);
+
+  /**
+   * @return the metrics gathered
+   */
+  T getMetrics();
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8c2294df/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
index 9ff1dd2..591d477 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
@@ -18,6 +18,8 @@ package org.apache.accumulo.core.file.rfile;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import org.apache.accumulo.core.cli.Help;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -47,6 +49,10 @@ public class PrintInfo implements KeywordExecutable {
static class Opts extends Help {
@Parameter(names = {"-d", "--dump"}, description = "dump the key/value pairs")
boolean dump = false;
+ @Parameter(names = {"-v", "--vis"}, description = "show visibility metrics")
+ boolean vis = false;
+ @Parameter(names = {"--visHash"}, description = "show visibilities as hashes, implies -v")
+ boolean hash = false;
@Parameter(names = {"--histogram"}, description = "print a histogram of the key-value sizes")
boolean histogram = false;
@Parameter(description = " <file> { <file> ... }")
@@ -98,29 +104,46 @@ public class PrintInfo implements KeywordExecutable {
CachableBlockFile.Reader _rdr = new CachableBlockFile.Reader(fs, path, conf, null, null, aconf);
Reader iter = new RFile.Reader(_rdr);
+ MetricsGatherer<Map<String, ArrayList<VisibilityMetric>>> vmg = new VisMetricsGatherer();
+
+ if (opts.vis || opts.hash)
+ iter.registerMetrics(vmg);
iter.printInfo();
System.out.println();
org.apache.accumulo.core.file.rfile.bcfile.PrintInfo.main(new String[] {arg});
- if (opts.histogram || opts.dump) {
- iter.seek(new Range((Key) null, (Key) null), new ArrayList<ByteSequence>(), false);
- while (iter.hasTop()) {
- Key key = iter.getTopKey();
- Value value = iter.getTopValue();
- if (opts.dump)
- System.out.println(key + " -> " + value);
- if (opts.histogram) {
- long size = key.getSize() + value.getSize();
- int bucket = (int) Math.log10(size);
- countBuckets[bucket]++;
- sizeBuckets[bucket] += size;
- totalSize += size;
+ Map<String, ArrayList<ByteSequence>> localityGroupCF = null;
+
+ if (opts.histogram || opts.dump || opts.vis || opts.hash) {
+ localityGroupCF = iter.getLocalityGroupCF();
+
+ for (Entry<String,ArrayList<ByteSequence>> cf : localityGroupCF.entrySet()) {
+
+ iter.seek(new Range((Key) null, (Key) null), cf.getValue(), true);
+ while (iter.hasTop()) {
+ Key key = iter.getTopKey();
+ Value value = iter.getTopValue();
+ if (opts.dump)
+ System.out.println(key + " -> " + value);
+ if (opts.histogram) {
+ long size = key.getSize() + value.getSize();
+ int bucket = (int) Math.log10(size);
+ countBuckets[bucket]++;
+ sizeBuckets[bucket] += size;
+ totalSize += size;
+ }
+ iter.next();
}
- iter.next();
}
}
+ System.out.println();
+
iter.close();
+
+ if (opts.vis || opts.hash)
+ vmg.printMetrics(opts.hash, "Visibility", System.out);
+
if (opts.histogram) {
System.out.println("Up to size count %-age");
for (int i = 1; i < countBuckets.length; i++) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8c2294df/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index 0b464d8..888924d 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -554,6 +554,8 @@ public class RFile {
if (entriesLeft == 0) {
currBlock.close();
+ if (metricsGatherer != null)
+ metricsGatherer.startBlock();
if (iiter.hasNext()) {
IndexEntry indexEntry = iiter.next();
@@ -561,7 +563,7 @@ public class RFile {
currBlock = getDataBlock(indexEntry);
checkRange = range.afterEndKey(indexEntry.getKey());
- if (!checkRange)
+ if (!checkRange)
hasTop = true;
} else {
@@ -575,6 +577,10 @@ public class RFile {
prevKey = rk.getKey();
rk.readFields(currBlock);
val.readFields(currBlock);
+
+ if (metricsGatherer != null)
+ metricsGatherer.addMetric(rk.getKey(), val);
+
entriesLeft--;
if (checkRange)
hasTop = !range.afterEndKey(rk.getKey());
@@ -760,6 +766,11 @@ public class RFile {
while (hasTop() && range.beforeStartKey(getTopKey())) {
next();
}
+
+ if (metricsGatherer != null) {
+ metricsGatherer.startLocalityGroup(rk.getKey().getColumnFamily());
+ metricsGatherer.addMetric(rk.getKey(), val);
+ }
}
@Override
@@ -803,6 +814,12 @@ public class RFile {
public InterruptibleIterator getIterator() {
return this;
}
+
+    /** Gatherer notified of block, locality-group and entry events; stays null (and is skipped) when no metrics are collected. */
+    private MetricsGatherer<?> metricsGatherer;
+
+    /**
+     * Sets the gatherer this locality group reader reports to, replacing any previously set one.
+     *
+     * @param vmg
+     *          the gatherer to notify while reading
+     */
+    public void registerMetrics(MetricsGatherer<?> vmg) {
+      this.metricsGatherer = vmg;
+    }
}
public static class Reader extends HeapIterator implements FileSKVIterator {
@@ -973,8 +990,38 @@ public class RFile {
}
+    /**
+     * Returns the column families recorded in this file's metadata for each of its locality groups.
+     *
+     * @return a new map from locality group name to a new list of that group's column families. A group whose metadata holds no column family map is mapped
+     *         to an empty list instead of causing a NullPointerException.
+     */
+    public Map<String,ArrayList<ByteSequence>> getLocalityGroupCF() {
+      Map<String,ArrayList<ByteSequence>> cf = new HashMap<>();
+
+      for (LocalityGroupMetadata lcg : localityGroups) {
+        ArrayList<ByteSequence> setCF;
+        if (lcg.columnFamilies == null) {
+          // NOTE(review): assumes columnFamilies can be null (e.g. a default group that does not track its families) -- confirm against
+          // LocalityGroupMetadata, and check how callers seeking with this empty list should treat such a group.
+          setCF = new ArrayList<>();
+        } else {
+          setCF = new ArrayList<>(lcg.columnFamilies.keySet());
+        }
+        cf.put(lcg.name, setCF);
+      }
+
+      return cf;
+    }
+
private int numLGSeeked = 0;
+ /**
+ * Method that registers the given MetricsGatherer. You can only register one as it will clobber any previously set. The MetricsGatherer should be
+ * registered before iterating through the LocalityGroups.
+ *
+ * @param vmg
+ * MetricsGatherer to be registered with the LocalityGroupReaders
+ */
+ public void registerMetrics(MetricsGatherer<?> vmg) {
+ vmg.init(getLocalityGroupCF());
+ for (LocalityGroupReader lgr : lgReaders) {
+ lgr.registerMetrics(vmg);
+ }
+ }
+
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
numLGSeeked = LocalityGroupIterator.seek(this, lgReaders, nonDefaultColumnFamilies, range, columnFamilies, inclusive);
@@ -999,7 +1046,6 @@ public class RFile {
for (LocalityGroupMetadata lgm : localityGroups) {
lgm.printInfo();
}
-
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8c2294df/core/src/main/java/org/apache/accumulo/core/file/rfile/VisMetricsGatherer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/VisMetricsGatherer.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/VisMetricsGatherer.java
new file mode 100644
index 0000000..6050e41
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/VisMetricsGatherer.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.base.Charsets;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import com.google.common.util.concurrent.AtomicLongMap;
+
+/**
+ * This class provides visibility metrics per locality group. The Map in getMetrics() maps the locality group name to an ArrayList of VisibilityMetric objects.
+ * These contain the components of a visibility metric; the visibility as a String, the number of times that is seen in a locality group, the percentage of keys
+ * that contain that visibility in the locality group, the number of blocks in the locality group that contain the visibility, and the percentage of blocks in
+ * the locality group that contain the visibility.
+ */
+public class VisMetricsGatherer implements MetricsGatherer<Map<String,ArrayList<VisibilityMetric>>> {
+
+ protected Map<String,AtomicLongMap<String>> metric;
+ protected Map<String,AtomicLongMap<String>> blocks;
+ protected ArrayList<Long> numEntries;
+ protected ArrayList<Integer> numBlocks;
+ private ArrayList<String> inBlock;
+ protected ArrayList<String> localityGroups;
+ private int numLG;
+ private Map<String,ArrayList<ByteSequence>> localityGroupCF;
+
+ public VisMetricsGatherer() {
+ metric = new HashMap<>();
+ blocks = new HashMap<>();
+ numEntries = new ArrayList<>();
+ numBlocks = new ArrayList<>();
+ inBlock = new ArrayList<>();
+ localityGroups = new ArrayList<>();
+ numLG = 0;
+ }
+
+ @Override
+ public void init(Map<String,ArrayList<ByteSequence>> cf) {
+ localityGroupCF = cf;
+ }
+
+ @Override
+ public void startLocalityGroup(Text oneCF) {
+ String name = null;
+ ByteSequence cf = new ArrayByteSequence(oneCF.toString());
+ for (Entry<String,ArrayList<ByteSequence>> entry : localityGroupCF.entrySet()) {
+ if (entry.getValue().contains(cf)) {
+ if (entry.getKey() == null)
+ name = null;
+ else
+ name = entry.getKey().toString();
+ break;
+ }
+ }
+ localityGroups.add(name);
+ metric.put(name, AtomicLongMap.create(new HashMap<String,Long>()));
+ blocks.put(name, AtomicLongMap.create(new HashMap<String,Long>()));
+ numLG++;
+ numEntries.add((long) 0);
+ numBlocks.add(0);
+ }
+
+ @Override
+ public void addMetric(Key key, Value val) {
+ String myMetric = key.getColumnVisibility().toString();
+ String currLG = localityGroups.get(numLG - 1);
+ if (metric.get(currLG).containsKey(myMetric)) {
+ metric.get(currLG).getAndIncrement(myMetric);
+ } else
+ metric.get(currLG).put(myMetric, 1);
+
+ numEntries.set(numLG - 1, numEntries.get(numLG - 1) + 1);
+
+ if (!inBlock.contains(myMetric) && blocks.get(currLG).containsKey(myMetric)) {
+ blocks.get(currLG).incrementAndGet(myMetric);
+ inBlock.add(myMetric);
+ } else if (!inBlock.contains(myMetric) && !blocks.get(currLG).containsKey(myMetric)) {
+ blocks.get(currLG).put(myMetric, 1);
+ inBlock.add(myMetric);
+ }
+
+ }
+
+ @Override
+ public void startBlock() {
+ inBlock.clear();
+ numBlocks.set(numLG - 1, numBlocks.get(numLG - 1) + 1);
+ }
+
+ @Override
+ public void printMetrics(boolean hash, String metricWord, PrintStream out) {
+ for (int i = 0; i < numLG; i++) {
+ String lGName = localityGroups.get(i);
+ out.print("Locality Group: ");
+ if (lGName == null)
+ out.println("<DEFAULT>");
+ else
+ out.println(localityGroups.get(i));
+ out.printf("%-27s", metricWord);
+ out.println("Number of keys" + "\t " + "Percent of keys" + "\t" + "Number of blocks" + "\t" + "Percent of blocks");
+ for (Entry<String,Long> entry : metric.get(lGName).asMap().entrySet()) {
+ HashFunction hf = Hashing.md5();
+ HashCode hc = hf.newHasher().putString(entry.getKey(), Charsets.UTF_8).hash();
+ if (hash)
+ out.printf("%-20s", hc.toString().substring(0, 8));
+ else
+ out.printf("%-20s", entry.getKey());
+ out.print("\t\t" + entry.getValue() + "\t\t\t");
+ out.printf("%.2f", ((double) entry.getValue() / numEntries.get(i)) * 100);
+ out.print("%\t\t\t");
+
+ long blocksIn = blocks.get(lGName).get(entry.getKey());
+
+ out.print(blocksIn + "\t\t ");
+ out.printf("%.2f", ((double) blocksIn / numBlocks.get(i)) * 100);
+ out.print("%");
+
+ out.println("");
+ }
+ out.println("Number of keys: " + numEntries.get(i));
+ out.println();
+ }
+ }
+
+ @Override
+ public Map<String,ArrayList<VisibilityMetric>> getMetrics() {
+ Map<String,ArrayList<VisibilityMetric>> getMetrics = new HashMap<>();
+ for (int i = 0; i < numLG; i++) {
+ String lGName = localityGroups.get(i);
+ ArrayList<VisibilityMetric> rows = new ArrayList<>();
+ for (Entry<String,Long> entry : metric.get(lGName).asMap().entrySet()) {
+ long vis = entry.getValue();
+ double visPer = ((double) entry.getValue() / numEntries.get(i)) * 100;
+
+ long blocksIn = blocks.get(lGName).get(entry.getKey());
+ double blocksPer = ((double) blocksIn / numBlocks.get(i)) * 100;
+
+ rows.add(new VisibilityMetric(entry.getKey(), vis, visPer, blocksIn, blocksPer));
+ }
+ getMetrics.put(lGName, rows);
+ }
+ return getMetrics;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8c2294df/core/src/main/java/org/apache/accumulo/core/file/rfile/VisibilityMetric.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/VisibilityMetric.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/VisibilityMetric.java
new file mode 100644
index 0000000..ab7b1d7
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/VisibilityMetric.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+/**
+ * Class that holds the components of a visibility metric: the visibility as a String, the number of times it is seen in a locality group, the percentage of
+ * keys that contain that visibility in the locality group, the number of blocks in the locality group that contain the visibility, and the percentage of blocks
+ * in the locality group that contain the visibility. All values are fixed at construction.
+ */
+public class VisibilityMetric {
+
+  private final long visLG, visBlock;
+  private final double visLGPer, visBlockPer;
+  private final String visibility;
+
+  /**
+   * Creates the metric for one visibility within one locality group. Percentages are stored exactly as given (VisMetricsGatherer passes values scaled by 100).
+   *
+   * @param visibility
+   *          the column visibility, as a String
+   * @param visLG
+   *          number of keys in the locality group that carry this visibility
+   * @param visLGPer
+   *          percentage of the locality group's keys that carry this visibility
+   * @param visBlock
+   *          number of blocks in the locality group that contain this visibility
+   * @param visBlockPer
+   *          percentage of the locality group's blocks that contain this visibility
+   */
+  public VisibilityMetric(String visibility, long visLG, double visLGPer, long visBlock, double visBlockPer) {
+    this.visibility = visibility;
+    this.visLG = visLG;
+    this.visLGPer = visLGPer;
+    this.visBlock = visBlock;
+    this.visBlockPer = visBlockPer;
+  }
+
+  /**
+   * @return the column visibility this metric describes
+   */
+  public String getVisibility() {
+    return visibility;
+  }
+
+  /**
+   * @return the number of keys in the locality group carrying this visibility
+   */
+  public long getVisLG() {
+    return visLG;
+  }
+
+  /**
+   * @return the number of blocks in the locality group containing this visibility
+   */
+  public long getVisBlock() {
+    return visBlock;
+  }
+
+  /**
+   * @return the percentage of the locality group's keys carrying this visibility
+   */
+  public double getVisLGPer() {
+    return visLGPer;
+  }
+
+  /**
+   * @return the percentage of the locality group's blocks containing this visibility
+   */
+  public double getVisBlockPer() {
+    return visBlockPer;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8c2294df/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java
new file mode 100644
index 0000000..e66210b
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import com.google.common.util.concurrent.AtomicLongMap;
+
+/**
+ *
+ */
+public class RFileMetricsTest {
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+  // turn off the Hadoop codec-pool and native-code-loader loggers for these tests
+  static {
+    Logger.getLogger(org.apache.hadoop.io.compress.CodecPool.class).setLevel(Level.OFF);
+    Logger.getLogger(org.apache.hadoop.util.NativeCodeLoader.class).setLevel(Level.OFF);
+  }
+
+  /**
+   * RFile test harness that can also run a VisMetricsGatherer over the whole file.
+   */
+  public static class TestRFile extends RFileTest.TestRFile {
+
+    public TestRFile(AccumuloConfiguration accumuloConfiguration) {
+      super(accumuloConfiguration);
+    }
+
+    /**
+     * Registers a fresh VisMetricsGatherer with the open reader, then seeks each locality group over the full key range and reads it to the end.
+     *
+     * @return the gatherer holding the metrics for every entry read
+     */
+    public VisMetricsGatherer gatherMetrics() throws IOException {
+      VisMetricsGatherer gatherer = new VisMetricsGatherer();
+      reader.registerMetrics(gatherer);
+
+      for (Entry<String,ArrayList<ByteSequence>> group : reader.getLocalityGroupCF().entrySet()) {
+        // inclusive seek restricted to this group's column families
+        reader.seek(new Range((Key) null, (Key) null), group.getValue(), true);
+        while (reader.hasTop()) {
+          reader.next();
+        }
+      }
+      return gatherer;
+    }
+  }
+
+  public AccumuloConfiguration conf = null;
+
+  @Test
+  public void emptyFile() throws IOException {
+    // an empty file starts no locality group, so both maps must stay empty
+    TestRFile trf = new TestRFile(conf);
+
+    trf.openWriter();
+    trf.closeWriter();
+
+    trf.openReader(false);
+    try {
+      VisMetricsGatherer vmg = trf.gatherMetrics();
+
+      Map<String,AtomicLongMap<String>> metrics = vmg.metric;
+      Map<String,AtomicLongMap<String>> blocks = vmg.blocks;
+      assertEquals(0, metrics.size());
+      assertEquals(0, blocks.size());
+    } finally {
+      // close even when an assertion fails so the file handle is not leaked
+      trf.closeReader();
+    }
+  }
+
+ @Test
+ public void oneEntryDefaultLocGroup() throws IOException {
+
+ // test an rfile with one entry in the default locality group
+
+ TestRFile trf = new TestRFile(conf);
+
+ trf.openWriter();
+ trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo"));
+ trf.closeWriter();
+
+ trf.openReader(false);
+
+ VisMetricsGatherer vmg = trf.gatherMetrics();
+
+ AtomicLongMap<String> metrics = vmg.metric.get(null);
+ AtomicLongMap<String> blocks = vmg.blocks.get(null);
+ assertEquals(1, metrics.get("L1"));
+
+ assertEquals(1, blocks.get("L1"));
+
+ assertEquals(1, vmg.numEntries.get(vmg.localityGroups.indexOf(null)).longValue());
+ assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf(null)).longValue());
+
+ trf.closeReader();
+ }
+
+ @Test
+ public void twoEntriesDefaultLocGroup() throws IOException {
+
+ // test an rfile with two entries in the default locality group
+
+ TestRFile trf = new TestRFile(conf);
+
+ trf.openWriter();
+ trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L2", 55), RFileTest.nv("foo"));
+ trf.closeWriter();
+
+ trf.openReader(false);
+
+ VisMetricsGatherer vmg = trf.gatherMetrics();
+
+ AtomicLongMap<String> metrics = vmg.metric.get(null);
+ AtomicLongMap<String> blocks = vmg.blocks.get(null);
+ assertEquals(1, metrics.get("L1"));
+ assertEquals(1, metrics.get("L2"));
+
+ assertEquals(1, blocks.get("L1"));
+ assertEquals(1, blocks.get("L2"));
+
+ assertEquals(2, vmg.numEntries.get(vmg.localityGroups.indexOf(null)).longValue());
+ assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf(null)).longValue());
+
+ trf.closeReader();
+
+ }
+
+ @Test
+ public void oneEntryNonDefaultLocGroup() throws IOException {
+
+ // test an rfile with two entries in a non-default locality group
+
+ TestRFile trf = new TestRFile(conf);
+
+ trf.openWriter(false);
+ Set<ByteSequence> lg1 = new HashSet<>();
+ lg1.add(new ArrayByteSequence("cf1"));
+
+ trf.writer.startNewLocalityGroup("lg1", lg1);
+ trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo"));
+ trf.closeWriter();
+
+ trf.openReader(false);
+
+ VisMetricsGatherer vmg = trf.gatherMetrics();
+
+ AtomicLongMap<String> metrics = vmg.metric.get("lg1");
+ AtomicLongMap<String> blocks = vmg.blocks.get("lg1");
+ assertEquals(1, metrics.get("L1"));
+
+ assertEquals(1, blocks.get("L1"));
+
+ assertEquals(1, vmg.numEntries.get(vmg.localityGroups.indexOf("lg1")).longValue());
+ assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg1")).longValue());
+
+ trf.closeReader();
+
+ }
+
+ @Test
+ public void twoEntryNonDefaultLocGroup() throws IOException {
+
+ // test an rfile with two entries in a non-default locality group
+
+ TestRFile trf = new TestRFile(conf);
+
+ trf.openWriter(false);
+ Set<ByteSequence> lg1 = new HashSet<>();
+ lg1.add(new ArrayByteSequence("cf1"));
+
+ trf.writer.startNewLocalityGroup("lg1", lg1);
+ trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L2", 55), RFileTest.nv("foo"));
+ trf.closeWriter();
+
+ trf.openReader(false);
+
+ VisMetricsGatherer vmg = trf.gatherMetrics();
+
+ AtomicLongMap<String> metrics = vmg.metric.get("lg1");
+ AtomicLongMap<String> blocks = vmg.blocks.get("lg1");
+ assertEquals(1, metrics.get("L1"));
+ assertEquals(1, metrics.get("L2"));
+
+ assertEquals(1, blocks.get("L1"));
+ assertEquals(1, blocks.get("L2"));
+
+ assertEquals(2, vmg.numEntries.get(vmg.localityGroups.indexOf("lg1")).longValue());
+ assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg1")).longValue());
+
+ trf.closeReader();
+
+ }
+
+ @Test
+ public void twoNonDefaultLocGroups() throws IOException {
+
+ // test an rfile with two entries in 2 non-default locality groups
+
+ TestRFile trf = new TestRFile(conf);
+
+ trf.openWriter(false);
+ Set<ByteSequence> lg1 = new HashSet<>();
+ lg1.add(new ArrayByteSequence("cf1"));
+
+ trf.writer.startNewLocalityGroup("lg1", lg1);
+ trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L2", 55), RFileTest.nv("foo"));
+
+ Set<ByteSequence> lg2 = new HashSet<>();
+ lg2.add(new ArrayByteSequence("cf2"));
+
+ trf.writer.startNewLocalityGroup("lg2", lg2);
+ trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "L1", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "L2", 55), RFileTest.nv("foo"));
+
+ trf.closeWriter();
+
+ trf.openReader(false);
+
+ VisMetricsGatherer vmg = trf.gatherMetrics();
+
+ AtomicLongMap<String> metrics = vmg.metric.get("lg1");
+ AtomicLongMap<String> blocks = vmg.blocks.get("lg1");
+ assertEquals(1, metrics.get("L1"));
+ assertEquals(1, metrics.get("L2"));
+
+ assertEquals(1, blocks.get("L1"));
+ assertEquals(1, blocks.get("L2"));
+
+ assertEquals(2, vmg.numEntries.get(vmg.localityGroups.indexOf("lg1")).longValue());
+ assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg1")).longValue());
+
+ metrics = vmg.metric.get("lg2");
+ blocks = vmg.blocks.get("lg2");
+ assertEquals(1, metrics.get("L1"));
+ assertEquals(1, metrics.get("L2"));
+
+ assertEquals(1, blocks.get("L1"));
+ assertEquals(1, blocks.get("L2"));
+
+ assertEquals(2, vmg.numEntries.get(vmg.localityGroups.indexOf("lg2")).longValue());
+ assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg2")).longValue());
+
+ trf.closeReader();
+
+ }
+
+ @Test
+ public void nonDefaultAndDefaultLocGroup() throws IOException {
+
+ // test an rfile with 3 entries in a non-default locality group and the default locality group
+
+ TestRFile trf = new TestRFile(conf);
+
+ trf.openWriter(false);
+ Set<ByteSequence> lg1 = new HashSet<>();
+ lg1.add(new ArrayByteSequence("cf1"));
+
+ trf.writer.startNewLocalityGroup("lg1", lg1);
+ trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf1", "cq2", "L1", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf1", "cq2", "L2", 55), RFileTest.nv("foo"));
+
+ trf.writer.startDefaultLocalityGroup();
+ trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "A", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "B", 55), RFileTest.nv("foo"));
+
+ trf.closeWriter();
+
+ trf.openReader(false);
+
+ VisMetricsGatherer vmg = trf.gatherMetrics();
+
+ AtomicLongMap<String> metrics = vmg.metric.get("lg1");
+ AtomicLongMap<String> blocks = vmg.blocks.get("lg1");
+ assertEquals(2, metrics.get("L1"));
+ assertEquals(1, metrics.get("L2"));
+
+ assertEquals(1, blocks.get("L1"));
+ assertEquals(1, blocks.get("L2"));
+
+ assertEquals(3, vmg.numEntries.get(vmg.localityGroups.indexOf("lg1")).longValue());
+ assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg1")).longValue());
+
+ metrics = vmg.metric.get(null);
+ blocks = vmg.blocks.get(null);
+ assertEquals(1, metrics.get("A"));
+ assertEquals(1, metrics.get("B"));
+
+ assertEquals(1, blocks.get("A"));
+ assertEquals(1, blocks.get("B"));
+
+ assertEquals(2, vmg.numEntries.get(vmg.localityGroups.indexOf(null)).longValue());
+ assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf(null)).longValue());
+
+ trf.closeReader();
+
+ }
+
+ @Test
+ public void multiCFNonDefaultAndDefaultLocGroup() throws IOException {
+
+ // test an rfile with multiple column families in a non-default locality group and the default locality group
+
+ TestRFile trf = new TestRFile(conf);
+
+ trf.openWriter(false);
+ Set<ByteSequence> lg1 = new HashSet<>();
+ lg1.add(new ArrayByteSequence("cf1"));
+ lg1.add(new ArrayByteSequence("cf3"));
+
+ trf.writer.startNewLocalityGroup("lg1", lg1);
+ trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf1", "cq2", "L1", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf3", "cq1", "L1", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf3", "cq2", "L2", 55), RFileTest.nv("foo"));
+
+ trf.writer.startDefaultLocalityGroup();
+ trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "A", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "B", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf4", "cq1", "A", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf4", "cq1", "B", 55), RFileTest.nv("foo"));
+
+ trf.closeWriter();
+
+ trf.openReader(false);
+
+ VisMetricsGatherer vmg = trf.gatherMetrics();
+
+ AtomicLongMap<String> metrics = vmg.metric.get("lg1");
+ AtomicLongMap<String> blocks = vmg.blocks.get("lg1");
+ assertEquals(3, metrics.get("L1"));
+ assertEquals(1, metrics.get("L2"));
+
+ assertEquals(1, blocks.get("L1"));
+ assertEquals(1, blocks.get("L2"));
+
+ assertEquals(4, vmg.numEntries.get(vmg.localityGroups.indexOf("lg1")).longValue());
+ assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg1")).longValue());
+
+ metrics = vmg.metric.get(null);
+ blocks = vmg.blocks.get(null);
+ assertEquals(2, metrics.get("A"));
+ assertEquals(2, metrics.get("B"));
+
+ assertEquals(1, blocks.get("A"));
+ assertEquals(1, blocks.get("B"));
+
+ assertEquals(4, vmg.numEntries.get(vmg.localityGroups.indexOf(null)).longValue());
+ assertEquals(1, vmg.numBlocks.get(vmg.localityGroups.indexOf(null)).longValue());
+
+ trf.closeReader();
+
+ }
+
+ @Test
+ public void multiBlockDefaultLocGroup() throws IOException {
+
+ // test an rfile with four blocks in the default locality group
+
+ TestRFile trf = new TestRFile(conf);
+
+ trf.openWriter(20);// Each entry is a block
+ trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf1", "cq2", "L1", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf3", "cq1", "L1", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf3", "cq2", "L2", 55), RFileTest.nv("foo"));
+ trf.closeWriter();
+
+ trf.openReader(false);
+
+ VisMetricsGatherer vmg = trf.gatherMetrics();
+
+ AtomicLongMap<String> metrics = vmg.metric.get(null);
+ AtomicLongMap<String> blocks = vmg.blocks.get(null);
+ assertEquals(3, metrics.get("L1"));
+ assertEquals(1, metrics.get("L2"));
+
+ assertEquals(3, blocks.get("L1"));
+ assertEquals(1, blocks.get("L2"));
+
+ assertEquals(4, vmg.numEntries.get(vmg.localityGroups.indexOf(null)).longValue());
+ assertEquals(4, vmg.numBlocks.get(vmg.localityGroups.indexOf(null)).longValue());
+
+ trf.closeReader();
+
+ }
+
+ @Test
+ public void multiBlockNonDefaultLocGroup() throws IOException {
+
+ // test an rfile with four blocks in a non-default locality group
+
+ TestRFile trf = new TestRFile(conf);
+
+ trf.openWriter(false, 20);// Each entry is a block
+ Set<ByteSequence> lg1 = new HashSet<>();
+ lg1.add(new ArrayByteSequence("cf1"));
+ lg1.add(new ArrayByteSequence("cf3"));
+
+ trf.writer.startNewLocalityGroup("lg1", lg1);
+ trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf1", "cq2", "L1", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf3", "cq1", "L1", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf3", "cq2", "L2", 55), RFileTest.nv("foo"));
+ trf.closeWriter();
+
+ trf.openReader(false);
+
+ VisMetricsGatherer vmg = trf.gatherMetrics();
+
+ AtomicLongMap<String> metrics = vmg.metric.get("lg1");
+ AtomicLongMap<String> blocks = vmg.blocks.get("lg1");
+ assertEquals(3, metrics.get("L1"));
+ assertEquals(1, metrics.get("L2"));
+
+ assertEquals(3, blocks.get("L1"));
+ assertEquals(1, blocks.get("L2"));
+
+ assertEquals(4, vmg.numEntries.get(vmg.localityGroups.indexOf("lg1")).longValue());
+ assertEquals(4, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg1")).longValue());
+
+ trf.closeReader();
+
+ }
+
+ @Test
+ public void multiBlockMultiCFNonDefaultAndDefaultLocGroup() throws IOException {
+
+ // test an rfile with multiple column families and multiple blocks in a non-default locality group and the default locality group
+
+ TestRFile trf = new TestRFile(conf);
+
+ trf.openWriter(false, 20);// Each entry is a block
+ Set<ByteSequence> lg1 = new HashSet<>();
+ lg1.add(new ArrayByteSequence("cf1"));
+ lg1.add(new ArrayByteSequence("cf3"));
+
+ trf.writer.startNewLocalityGroup("lg1", lg1);
+ trf.writer.append(RFileTest.nk("r1", "cf1", "cq1", "L1", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf1", "cq2", "L1", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf3", "cq1", "L1", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf3", "cq2", "L2", 55), RFileTest.nv("foo"));
+
+ trf.writer.startDefaultLocalityGroup();
+ trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "A", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf2", "cq1", "B", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf4", "cq1", "A", 55), RFileTest.nv("foo"));
+ trf.writer.append(RFileTest.nk("r1", "cf4", "cq1", "B", 55), RFileTest.nv("foo"));
+
+ trf.closeWriter();
+
+ trf.openReader(false);
+
+ VisMetricsGatherer vmg = trf.gatherMetrics();
+
+ AtomicLongMap<String> metrics = vmg.metric.get("lg1");
+ AtomicLongMap<String> blocks = vmg.blocks.get("lg1");
+ assertEquals(3, metrics.get("L1"));
+ assertEquals(1, metrics.get("L2"));
+
+ assertEquals(3, blocks.get("L1"));
+ assertEquals(1, blocks.get("L2"));
+
+ assertEquals(4, vmg.numEntries.get(vmg.localityGroups.indexOf("lg1")).longValue());
+ assertEquals(4, vmg.numBlocks.get(vmg.localityGroups.indexOf("lg1")).longValue());
+
+ metrics = vmg.metric.get(null);
+ blocks = vmg.blocks.get(null);
+ assertEquals(2, metrics.get("A"));
+ assertEquals(2, metrics.get("B"));
+
+ assertEquals(2, blocks.get("A"));
+ assertEquals(2, blocks.get("B"));
+
+ assertEquals(4, vmg.numEntries.get(vmg.localityGroups.indexOf(null)).longValue());
+ assertEquals(4, vmg.numBlocks.get(vmg.localityGroups.indexOf(null)).longValue());
+
+ trf.closeReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8c2294df/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index 1a83f33..eafadc0 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -169,13 +169,13 @@ public class RFileTest {
public static class TestRFile {
- private Configuration conf = CachedConfiguration.getInstance();
+ protected Configuration conf = CachedConfiguration.getInstance();
public RFile.Writer writer;
- private ByteArrayOutputStream baos;
- private FSDataOutputStream dos;
- private SeekableByteArrayInputStream bais;
- private FSDataInputStream in;
- private AccumuloConfiguration accumuloConfiguration;
+ protected ByteArrayOutputStream baos;
+ protected FSDataOutputStream dos;
+ protected SeekableByteArrayInputStream bais;
+ protected FSDataInputStream in;
+ protected AccumuloConfiguration accumuloConfiguration;
public Reader reader;
public SortedKeyValueIterator<Key,Value> iter;
@@ -186,18 +186,25 @@ public class RFileTest {
}
public void openWriter(boolean startDLG) throws IOException {
+ openWriter(startDLG, 1000);
+ }
+ public void openWriter(boolean startDLG, int blockSize) throws IOException {
baos = new ByteArrayOutputStream();
dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a"));
CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(dos, "gz", conf, accumuloConfiguration);
- writer = new RFile.Writer(_cbw, 1000, 1000);
+ writer = new RFile.Writer(_cbw, blockSize, 1000);
if (startDLG)
writer.startDefaultLocalityGroup();
}
public void openWriter() throws IOException {
- openWriter(true);
+ openWriter(true, 1000);
+ }
+
+ public void openWriter(int blockSize) throws IOException {
+ openWriter(true, blockSize);
}
public void closeWriter() throws IOException {
@@ -210,6 +217,10 @@ public class RFileTest {
}
public void openReader() throws IOException {
+ openReader(true);
+ }
+
+ public void openReader(boolean cfsi) throws IOException {
int fileLength = 0;
byte[] data = null;
@@ -224,7 +235,8 @@ public class RFileTest {
CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, fileLength, conf, dataCache, indexCache, AccumuloConfiguration.getDefaultConfiguration());
reader = new RFile.Reader(_cbr);
- iter = new ColumnFamilySkippingIterator(reader);
+ if (cfsi)
+ iter = new ColumnFamilySkippingIterator(reader);
checkIndex(reader);
}