You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2017/03/07 22:09:04 UTC
accumulo git commit: ACCUMULO-4597 fixed bug in rfile-info
Repository: accumulo
Updated Branches:
refs/heads/1.7 c3a0d1d5e -> de80cf53b
ACCUMULO-4597 fixed bug in rfile-info
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/de80cf53
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/de80cf53
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/de80cf53
Branch: refs/heads/1.7
Commit: de80cf53b66f58a9a99d8a9dd54b5a716fa8df44
Parents: c3a0d1d
Author: Keith Turner <kt...@apache.org>
Authored: Mon Mar 6 18:48:29 2017 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Mar 6 18:48:29 2017 -0500
----------------------------------------------------------------------
.../accumulo/core/file/rfile/PrintInfo.java | 7 +-
.../apache/accumulo/core/file/rfile/RFile.java | 18 +++++-
.../accumulo/core/util/LocalityGroupUtil.java | 34 ++++++++++
.../core/file/rfile/RFileMetricsTest.java | 68 ++++++++++++++++++--
4 files changed, 116 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de80cf53/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
index 0f2a935..366e4a8 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
@@ -19,7 +19,6 @@ package org.apache.accumulo.core.file.rfile;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import org.apache.accumulo.core.cli.Help;
import org.apache.accumulo.core.conf.DefaultConfiguration;
@@ -30,6 +29,7 @@ import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
import org.apache.accumulo.core.file.rfile.RFile.Reader;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
import org.apache.accumulo.start.spi.KeywordExecutable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -119,9 +119,8 @@ public class PrintInfo implements KeywordExecutable {
if (opts.histogram || opts.dump || opts.vis || opts.hash) {
localityGroupCF = iter.getLocalityGroupCF();
- for (Entry<String,ArrayList<ByteSequence>> cf : localityGroupCF.entrySet()) {
-
- iter.seek(new Range((Key) null, (Key) null), cf.getValue(), true);
+ for (String lgName : localityGroupCF.keySet()) {
+ LocalityGroupUtil.seek(iter, new Range(), lgName, localityGroupCF);
while (iter.hasTop()) {
Key key = iter.getTopKey();
Value value = iter.getTopValue();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de80cf53/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index 851226c..bab2266 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -62,6 +62,7 @@ import org.apache.accumulo.core.iterators.system.HeapIterator;
import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
import org.apache.accumulo.core.iterators.system.LocalityGroupIterator;
import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroup;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
import org.apache.accumulo.core.util.MutableByteSequence;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.commons.math.stat.descriptive.SummaryStatistics;
@@ -69,6 +70,8 @@ import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
public class RFile {
public static final String EXTENSION = "rf";
@@ -1015,14 +1018,23 @@ public class RFile {
}
+ /**
+ * @return map of locality group names to column families. The default locality group will have {@code null} for a name. RFile will only track up to
+ * {@value Writer#MAX_CF_IN_DLG} families for the default locality group. After this it will stop tracking. For the case where the default group has
+ * more than {@value Writer#MAX_CF_IN_DLG} families an empty list of families is returned.
+ * @see LocalityGroupUtil#seek(Reader, Range, String, Map)
+ */
public Map<String,ArrayList<ByteSequence>> getLocalityGroupCF() {
Map<String,ArrayList<ByteSequence>> cf = new HashMap<>();
for (LocalityGroupMetadata lcg : localityGroups) {
- ArrayList<ByteSequence> setCF = new ArrayList<>();
+ ArrayList<ByteSequence> setCF;
- for (Entry<ByteSequence,MutableLong> entry : lcg.columnFamilies.entrySet()) {
- setCF.add(entry.getKey());
+ if (lcg.columnFamilies == null) {
+ Preconditions.checkState(lcg.isDefaultLG, " Group %s has null families. Only expect default locality group to have null families.", lcg.name);
+ setCF = new ArrayList<>();
+ } else {
+ setCF = new ArrayList<>(lcg.columnFamilies.keySet());
}
cf.put(lcg.name, setCF);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de80cf53/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java b/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
index 5d06e69..a99fe01 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
@@ -16,6 +16,7 @@
*/
package org.apache.accumulo.core.util;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -36,7 +37,9 @@ import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.thrift.TMutation;
+import org.apache.accumulo.core.file.rfile.RFile.Reader;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.hadoop.io.Text;
@@ -294,4 +297,35 @@ public class LocalityGroupUtil {
}
}
+ /**
+ * This method was created to help seek an rfile for a locality group obtained from {@link Reader#getLocalityGroupCF()}. That method can possibly return an empty
+ * list for the default locality group. When this happens the default locality group needs to be seeked differently. This method helps do that.
+ *
+ * <p>
+ * For the default locality group, this method will seek using the families of all other locality groups, non-inclusively.
+ *
+ * @see Reader#getLocalityGroupCF()
+ */
+ public static void seek(Reader reader, Range range, String lgName, Map<String,ArrayList<ByteSequence>> localityGroupCF) throws IOException {
+
+ Collection<ByteSequence> families;
+ boolean inclusive;
+ if (lgName == null) {
+ // this is the default locality group, create a set of all families not in the default group
+ Set<ByteSequence> nonDefaultFamilies = new HashSet<>();
+ for (Entry<String,ArrayList<ByteSequence>> entry : localityGroupCF.entrySet()) {
+ if (entry.getKey() != null) {
+ nonDefaultFamilies.addAll(entry.getValue());
+ }
+ }
+
+ families = nonDefaultFamilies;
+ inclusive = false;
+ } else {
+ families = localityGroupCF.get(lgName);
+ inclusive = true;
+ }
+
+ reader.seek(range, families, inclusive);
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de80cf53/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java
index 89a63d1..7cb0ed6 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java
@@ -21,15 +21,17 @@ import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.After;
@@ -93,9 +95,8 @@ public class RFileMetricsTest {
reader.registerMetrics(vmg);
Map<String,ArrayList<ByteSequence>> localityGroupCF = reader.getLocalityGroupCF();
- for (Entry<String,ArrayList<ByteSequence>> cf : localityGroupCF.entrySet()) {
-
- reader.seek(new Range((Key) null, (Key) null), cf.getValue(), true);
+ for (String lgName : localityGroupCF.keySet()) {
+ LocalityGroupUtil.seek(reader, new Range(), lgName, localityGroupCF);
while (reader.hasTop()) {
reader.next();
}
@@ -507,4 +508,63 @@ public class RFileMetricsTest {
trf.closeReader();
}
+ @Test
+ public void testManyFamiliesInDefaultLocGroup() throws IOException {
+ trf.openWriter(false, 1024);
+
+ String fam1 = String.format("%06x", 9000);
+ String fam2 = String.format("%06x", 9001);
+
+ Set<ByteSequence> lg1 = new HashSet<>();
+ lg1.add(new ArrayByteSequence(fam1));
+ lg1.add(new ArrayByteSequence(fam2));
+
+ trf.writer.startNewLocalityGroup("lg1", lg1);
+
+ for (int row = 0; row < 1100; row++) {
+ String rs = String.format("%06x", row);
+ trf.writer.append(new Key(rs, fam1, "q4", "A", 42l), new Value("v".getBytes()));
+ trf.writer.append(new Key(rs, fam2, "q4", "A|B", 42l), new Value("v".getBytes()));
+ }
+
+ trf.writer.startDefaultLocalityGroup();
+
+ String vis[] = new String[] {"A", "A&B", "A|C", "B&C", "Boo"};
+
+ int fam = 0;
+ for (int row = 0; row < 1000; row++) {
+ String rs = String.format("%06x", row);
+ for (int v = 0; v < 5; v++) {
+ String fs = String.format("%06x", fam++);
+ trf.writer.append(new Key(rs, fs, "q4", vis[v], 42l), new Value("v".getBytes()));
+ }
+ }
+
+ trf.closeWriter();
+
+ trf.openReader(false);
+
+ VisMetricsGatherer vmg = trf.gatherMetrics();
+
+ Map<String,Long> expected = new HashMap<>();
+ Map<String,Long> expectedBlocks = new HashMap<>();
+ for (String v : vis) {
+ expected.put(v, 1000l);
+ expectedBlocks.put(v, 67l);
+ }
+ assertEquals(expected, vmg.metric.get(null).asMap());
+ assertEquals(expectedBlocks, vmg.blocks.get(null).asMap());
+
+ expected.clear();
+ expectedBlocks.clear();
+ expected.put("A", 1100l);
+ expected.put("A|B", 1100l);
+ expectedBlocks.put("A", 32l);
+ expectedBlocks.put("A|B", 32l);
+ assertEquals(expected, vmg.metric.get("lg1").asMap());
+ assertEquals(expectedBlocks, vmg.blocks.get("lg1").asMap());
+
+ assertEquals(2, vmg.metric.keySet().size());
+ assertEquals(2, vmg.blocks.keySet().size());
+ }
}