You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/07/31 21:03:39 UTC
git commit: ACCUMULO-112 added locality group support to in memory map
Updated Branches:
refs/heads/master 4313860c4 -> 0608e32f8
ACCUMULO-112 added locality group support to in memory map
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0608e32f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0608e32f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0608e32f
Branch: refs/heads/master
Commit: 0608e32f8b09f926e40677d1e23ccedd0d6e088d
Parents: 4313860
Author: Keith Turner <kt...@apache.org>
Authored: Wed Jul 31 14:39:49 2013 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Wed Jul 31 14:54:28 2013 -0400
----------------------------------------------------------------------
.../apache/accumulo/core/file/rfile/RFile.java | 128 +++----------
.../accumulo/core/file/rfile/RelativeKey.java | 87 +++------
.../iterators/system/LocalityGroupIterator.java | 177 +++++++++++++++++
.../accumulo/core/util/LocalityGroupUtil.java | 115 +++++++++++
.../accumulo/core/util/MutableByteSequence.java | 46 +++++
.../core/file/rfile/RelativeKeyTest.java | 8 +-
.../accumulo/core/util/PartitionerTest.java | 124 ++++++++++++
.../server/tabletserver/InMemoryMap.java | 177 +++++++++++++++--
.../accumulo/server/tabletserver/Tablet.java | 13 +-
.../server/tabletserver/InMemoryMapTest.java | 144 ++++++++++++++
.../apache/accumulo/test/IMMLGBenchmark.java | 190 +++++++++++++++++++
11 files changed, 1033 insertions(+), 176 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0608e32f/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index fe21f02..d6a2532 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -53,13 +53,17 @@ import org.apache.accumulo.core.file.blockfile.BlockFileWriter;
import org.apache.accumulo.core.file.rfile.BlockIndex.BlockIndexEntry;
import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
-import org.apache.accumulo.core.file.rfile.RelativeKey.MByteSequence;
import org.apache.accumulo.core.file.rfile.RelativeKey.SkippR;
import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
import org.apache.accumulo.core.iterators.IterationInterruptedException;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.system.HeapIterator;
+import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
+import org.apache.accumulo.core.iterators.system.LocalityGroupIterator;
+import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroup;
+import org.apache.accumulo.core.util.MutableByteSequence;
+import org.apache.commons.lang.mutable.MutableLong;
import org.apache.hadoop.io.Writable;
import org.apache.log4j.Logger;
@@ -77,24 +81,12 @@ public class RFile {
// static final int RINDEX_VER_5 = 5; // unreleased
static final int RINDEX_VER_4 = 4;
static final int RINDEX_VER_3 = 3;
-
- private static class Count {
- public Count(int i) {
- this.count = i;
- }
-
- public Count(long count) {
- this.count = count;
- }
- long count;
- }
-
private static class LocalityGroupMetadata implements Writable {
private int startBlock;
private Key firstKey;
- private Map<ByteSequence,Count> columnFamilies;
+ private Map<ByteSequence,MutableLong> columnFamilies;
private boolean isDefaultLG = false;
private String name;
@@ -104,14 +96,14 @@ public class RFile {
private MultiLevelIndex.Reader indexReader;
public LocalityGroupMetadata(int version, BlockFileReader br) {
- columnFamilies = new HashMap<ByteSequence,Count>();
+ columnFamilies = new HashMap<ByteSequence,MutableLong>();
indexReader = new MultiLevelIndex.Reader(br, version);
}
public LocalityGroupMetadata(int nextBlock, Set<ByteSequence> pcf, int indexBlockSize, BlockFileWriter bfw) {
this.startBlock = nextBlock;
isDefaultLG = true;
- columnFamilies = new HashMap<ByteSequence,Count>();
+ columnFamilies = new HashMap<ByteSequence,MutableLong>();
previousColumnFamilies = pcf;
indexWriter = new MultiLevelIndex.BufferedWriter(new MultiLevelIndex.Writer(bfw, indexBlockSize));
@@ -121,9 +113,9 @@ public class RFile {
this.startBlock = nextBlock;
this.name = name;
isDefaultLG = false;
- columnFamilies = new HashMap<ByteSequence,Count>();
+ columnFamilies = new HashMap<ByteSequence,MutableLong>();
for (ByteSequence cf : cfset) {
- columnFamilies.put(cf, new Count(0));
+ columnFamilies.put(cf, new MutableLong(0));
}
indexWriter = new MultiLevelIndex.BufferedWriter(new MultiLevelIndex.Writer(bfw, indexBlockSize));
@@ -155,7 +147,7 @@ public class RFile {
}
ByteSequence cf = key.getColumnFamilyData();
- Count count = columnFamilies.get(cf);
+ MutableLong count = columnFamilies.get(cf);
if (count == null) {
if (!isDefaultLG) {
@@ -171,12 +163,12 @@ public class RFile {
columnFamilies = null;
return;
}
- count = new Count(0);
+ count = new MutableLong(0);
columnFamilies.put(new ArrayByteSequence(cf.getBackingArray(), cf.offset(), cf.length()), count);
}
- count.count++;
+ count.increment();
}
@@ -199,7 +191,7 @@ public class RFile {
columnFamilies = null;
} else {
if (columnFamilies == null)
- columnFamilies = new HashMap<ByteSequence,Count>();
+ columnFamilies = new HashMap<ByteSequence,MutableLong>();
else
columnFamilies.clear();
@@ -209,7 +201,7 @@ public class RFile {
in.readFully(cf);
long count = in.readLong();
- columnFamilies.put(new ArrayByteSequence(cf), new Count(count));
+ columnFamilies.put(new ArrayByteSequence(cf), new MutableLong(count));
}
}
@@ -239,10 +231,10 @@ public class RFile {
} else {
out.writeInt(columnFamilies.size());
- for (Entry<ByteSequence,Count> entry : columnFamilies.entrySet()) {
+ for (Entry<ByteSequence,MutableLong> entry : columnFamilies.entrySet()) {
out.writeInt(entry.getKey().length());
out.write(entry.getKey().getBackingArray(), entry.getKey().offset(), entry.getKey().length());
- out.writeLong(entry.getValue().count);
+ out.writeLong(entry.getValue().longValue());
}
}
@@ -474,26 +466,23 @@ public class RFile {
}
}
- private static class LocalityGroupReader implements FileSKVIterator {
+ private static class LocalityGroupReader extends LocalityGroup implements FileSKVIterator {
private BlockFileReader reader;
private MultiLevelIndex.Reader index;
private int blockCount;
private Key firstKey;
private int startBlock;
- private Map<ByteSequence,Count> columnFamilies;
- private boolean isDefaultLocalityGroup;
private boolean closed = false;
private int version;
private boolean checkRange = true;
private LocalityGroupReader(BlockFileReader reader, LocalityGroupMetadata lgm, int version) throws IOException {
+ super(lgm.columnFamilies, lgm.isDefaultLG);
this.firstKey = lgm.firstKey;
this.index = lgm.indexReader;
this.startBlock = lgm.startBlock;
blockCount = index.size();
- this.columnFamilies = lgm.columnFamilies;
- this.isDefaultLocalityGroup = lgm.isDefaultLG;
this.version = version;
this.reader = reader;
@@ -501,12 +490,11 @@ public class RFile {
}
public LocalityGroupReader(LocalityGroupReader lgr) {
+ super(lgr.columnFamilies, lgr.isDefaultLocalityGroup);
this.firstKey = lgr.firstKey;
this.index = lgr.index;
this.startBlock = lgr.startBlock;
this.blockCount = lgr.blockCount;
- this.columnFamilies = lgr.columnFamilies;
- this.isDefaultLocalityGroup = lgr.isDefaultLocalityGroup;
this.reader = lgr.reader;
this.version = lgr.version;
}
@@ -683,7 +671,7 @@ public class RFile {
// causing the build of an index... doing this could slow down some use cases and
// and speed up others.
- MByteSequence valbs = new MByteSequence(new byte[64], 0, 0);
+ MutableByteSequence valbs = new MutableByteSequence(new byte[64], 0, 0);
SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, getTopKey());
if (skippr.skipped > 0) {
entriesLeft -= skippr.skipped;
@@ -730,7 +718,7 @@ public class RFile {
if (!checkRange)
hasTop = true;
- MByteSequence valbs = new MByteSequence(new byte[64], 0, 0);
+ MutableByteSequence valbs = new MutableByteSequence(new byte[64], 0, 0);
Key currKey = null;
@@ -747,7 +735,7 @@ public class RFile {
val = new Value();
val.readFields(currBlock);
- valbs = new MByteSequence(val.get(), 0, val.getSize());
+ valbs = new MutableByteSequence(val.get(), 0, val.getSize());
// just consumed one key from the input stream, so subtract one from entries left
entriesLeft = bie.getEntriesLeft() - 1;
@@ -810,12 +798,15 @@ public class RFile {
public void setInterruptFlag(AtomicBoolean flag) {
this.interruptFlag = flag;
}
+
+ @Override
+ public InterruptibleIterator getIterator() {
+ return this;
+ }
}
public static class Reader extends HeapIterator implements FileSKVIterator {
-
- private static final Collection<ByteSequence> EMPTY_CF_SET = Collections.emptySet();
-
+
private BlockFileReader reader;
private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<LocalityGroupMetadata>();
@@ -985,66 +976,7 @@ public class RFile {
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
-
- clear();
-
- numLGSeeked = 0;
-
- Set<ByteSequence> cfSet;
- if (columnFamilies.size() > 0)
- if (columnFamilies instanceof Set<?>) {
- cfSet = (Set<ByteSequence>) columnFamilies;
- } else {
- cfSet = new HashSet<ByteSequence>();
- cfSet.addAll(columnFamilies);
- }
- else
- cfSet = Collections.emptySet();
-
- for (LocalityGroupReader lgr : lgReaders) {
-
- // when include is set to true it means this locality groups contains
- // wanted column families
- boolean include = false;
-
- if (cfSet.size() == 0) {
- include = !inclusive;
- } else if (lgr.isDefaultLocalityGroup && lgr.columnFamilies == null) {
- // do not know what column families are in the default locality group,
- // only know what column families are not in it
-
- if (inclusive) {
- if (!nonDefaultColumnFamilies.containsAll(cfSet)) {
- // default LG may contain wanted and unwanted column families
- include = true;
- }// else - everything wanted is in other locality groups, so nothing to do
- } else {
- // must include, if all excluded column families are in other locality groups
- // then there are not unwanted column families in default LG
- include = true;
- }
- } else {
- /*
- * Need to consider the following cases for inclusive and exclusive (lgcf:locality group column family set, cf:column family set) lgcf and cf are
- * disjoint lgcf and cf are the same cf contains lgcf lgcf contains cf lgccf and cf intersect but neither is a subset of the other
- */
-
- for (Entry<ByteSequence,Count> entry : lgr.columnFamilies.entrySet())
- if (entry.getValue().count > 0)
- if (cfSet.contains(entry.getKey())) {
- if (inclusive)
- include = true;
- } else if (!inclusive) {
- include = true;
- }
- }
-
- if (include) {
- lgr.seek(range, EMPTY_CF_SET, false);
- addSource(lgr);
- numLGSeeked++;
- }// every column family is excluded, zero count, or not present
- }
+ numLGSeeked = LocalityGroupIterator.seek(this, lgReaders, nonDefaultColumnFamilies, range, columnFamilies, inclusive);
}
int getNumLocalityGroupsSeeked() {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0608e32f/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
index 97001ee..07bf6d3 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
@@ -20,9 +20,9 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.util.MutableByteSequence;
import org.apache.accumulo.core.util.UnsynchronizedBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
@@ -218,31 +218,6 @@ public class RelativeKey implements Writable {
this.prevKey = this.key;
}
- static class MByteSequence extends ArrayByteSequence {
- private static final long serialVersionUID = 1L;
-
- MByteSequence(byte[] data, int offset, int length) {
- super(data, offset, length);
- }
-
- MByteSequence(ByteSequence bs) {
- super(new byte[Math.max(64, bs.length())]);
- System.arraycopy(bs.getBackingArray(), bs.offset(), data, 0, bs.length());
- this.length = bs.length();
- this.offset = 0;
- }
-
- void setArray(byte[] data) {
- this.data = data;
- this.offset = 0;
- this.length = 0;
- }
-
- void setLength(int len) {
- this.length = len;
- }
- }
-
public static class SkippR {
RelativeKey rk;
int skipped;
@@ -255,15 +230,15 @@ public class RelativeKey implements Writable {
}
}
- public static SkippR fastSkip(DataInput in, Key seekKey, MByteSequence value, Key prevKey, Key currKey) throws IOException {
+ public static SkippR fastSkip(DataInput in, Key seekKey, MutableByteSequence value, Key prevKey, Key currKey) throws IOException {
// this method assumes that fast skip is being called on a compressed block where the last key
// in the compressed block is >= seekKey... therefore this method shouldn't go past the end of the
// compressed block... if it does, there is probably an error in the caller's logic
// this method mostly avoids object allocation and only does compares when the row changes
- MByteSequence row, cf, cq, cv;
- MByteSequence prow, pcf, pcq, pcv;
+ MutableByteSequence row, cf, cq, cv;
+ MutableByteSequence prow, pcf, pcq, pcv;
ByteSequence stopRow = seekKey.getRowData();
ByteSequence stopCF = seekKey.getColumnFamilyData();
@@ -277,16 +252,16 @@ public class RelativeKey implements Writable {
if (currKey != null) {
- prow = new MByteSequence(currKey.getRowData());
- pcf = new MByteSequence(currKey.getColumnFamilyData());
- pcq = new MByteSequence(currKey.getColumnQualifierData());
- pcv = new MByteSequence(currKey.getColumnVisibilityData());
+ prow = new MutableByteSequence(currKey.getRowData());
+ pcf = new MutableByteSequence(currKey.getColumnFamilyData());
+ pcq = new MutableByteSequence(currKey.getColumnQualifierData());
+ pcv = new MutableByteSequence(currKey.getColumnVisibilityData());
pts = currKey.getTimestamp();
- row = new MByteSequence(currKey.getRowData());
- cf = new MByteSequence(currKey.getColumnFamilyData());
- cq = new MByteSequence(currKey.getColumnQualifierData());
- cv = new MByteSequence(currKey.getColumnVisibilityData());
+ row = new MutableByteSequence(currKey.getRowData());
+ cf = new MutableByteSequence(currKey.getColumnFamilyData());
+ cq = new MutableByteSequence(currKey.getColumnQualifierData());
+ cv = new MutableByteSequence(currKey.getColumnVisibilityData());
ts = currKey.getTimestamp();
rowCmp = row.compareTo(stopRow);
@@ -316,15 +291,15 @@ public class RelativeKey implements Writable {
}
} else {
- row = new MByteSequence(new byte[64], 0, 0);
- cf = new MByteSequence(new byte[64], 0, 0);
- cq = new MByteSequence(new byte[64], 0, 0);
- cv = new MByteSequence(new byte[64], 0, 0);
+ row = new MutableByteSequence(new byte[64], 0, 0);
+ cf = new MutableByteSequence(new byte[64], 0, 0);
+ cq = new MutableByteSequence(new byte[64], 0, 0);
+ cv = new MutableByteSequence(new byte[64], 0, 0);
- prow = new MByteSequence(new byte[64], 0, 0);
- pcf = new MByteSequence(new byte[64], 0, 0);
- pcq = new MByteSequence(new byte[64], 0, 0);
- pcv = new MByteSequence(new byte[64], 0, 0);
+ prow = new MutableByteSequence(new byte[64], 0, 0);
+ pcf = new MutableByteSequence(new byte[64], 0, 0);
+ pcq = new MutableByteSequence(new byte[64], 0, 0);
+ pcv = new MutableByteSequence(new byte[64], 0, 0);
}
byte fieldsSame = -1;
@@ -346,7 +321,7 @@ public class RelativeKey implements Writable {
if ((fieldsSame & ROW_SAME) != ROW_SAME) {
- MByteSequence tmp = prow;
+ MutableByteSequence tmp = prow;
prow = row;
row = tmp;
@@ -362,7 +337,7 @@ public class RelativeKey implements Writable {
if ((fieldsSame & CF_SAME) != CF_SAME) {
- MByteSequence tmp = pcf;
+ MutableByteSequence tmp = pcf;
pcf = cf;
cf = tmp;
@@ -377,7 +352,7 @@ public class RelativeKey implements Writable {
if ((fieldsSame & CQ_SAME) != CQ_SAME) {
- MByteSequence tmp = pcq;
+ MutableByteSequence tmp = pcq;
pcq = cq;
cq = tmp;
@@ -392,7 +367,7 @@ public class RelativeKey implements Writable {
if ((fieldsSame & CV_SAME) != CV_SAME) {
- MByteSequence tmp = pcv;
+ MutableByteSequence tmp = pcv;
pcv = cv;
cv = tmp;
@@ -431,7 +406,7 @@ public class RelativeKey implements Writable {
}
if (count > 1) {
- MByteSequence trow, tcf, tcq, tcv;
+ MutableByteSequence trow, tcf, tcq, tcv;
long tts;
// when the current keys field is same as the last, then
@@ -463,19 +438,19 @@ public class RelativeKey implements Writable {
return new SkippR(result, count, newPrevKey);
}
- private static void read(DataInput in, MByteSequence mbseq) throws IOException {
+ private static void read(DataInput in, MutableByteSequence mbseq) throws IOException {
int len = WritableUtils.readVInt(in);
read(in, mbseq, len);
}
- private static void readValue(DataInput in, MByteSequence mbseq) throws IOException {
+ private static void readValue(DataInput in, MutableByteSequence mbseq) throws IOException {
int len = in.readInt();
read(in, mbseq, len);
}
- private static void read(DataInput in, MByteSequence mbseqDestination, int len) throws IOException {
+ private static void read(DataInput in, MutableByteSequence mbseqDestination, int len) throws IOException {
if (mbseqDestination.getBackingArray().length < len) {
- mbseqDestination.setArray(new byte[UnsynchronizedBuffer.nextArraySize(len)]);
+ mbseqDestination.setArray(new byte[UnsynchronizedBuffer.nextArraySize(len)], 0, 0);
}
in.readFully(mbseqDestination.getBackingArray(), 0, len);
@@ -497,12 +472,12 @@ public class RelativeKey implements Writable {
return data;
}
- private static void readPrefix(DataInput in, MByteSequence dest, ByteSequence prefixSource) throws IOException {
+ private static void readPrefix(DataInput in, MutableByteSequence dest, ByteSequence prefixSource) throws IOException {
int prefixLen = WritableUtils.readVInt(in);
int remainingLen = WritableUtils.readVInt(in);
int len = prefixLen + remainingLen;
if (dest.getBackingArray().length < len) {
- dest.setArray(new byte[UnsynchronizedBuffer.nextArraySize(len)]);
+ dest.setArray(new byte[UnsynchronizedBuffer.nextArraySize(len)], 0, 0);
}
if (prefixSource.isBackedByArray()) {
System.arraycopy(prefixSource.getBackingArray(), prefixSource.offset(), dest.getBackingArray(), 0, prefixLen);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0608e32f/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java
new file mode 100644
index 0000000..c0045ac
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.system;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.commons.lang.mutable.MutableLong;
+
+/**
+ * HeapIterator that merges locality groups, seeking only those that may hold the column families a seek selects.
+ */
+public class LocalityGroupIterator extends HeapIterator implements InterruptibleIterator {
+
+ private static final Collection<ByteSequence> EMPTY_CF_SET = Collections.emptySet();
+
+ public static class LocalityGroup { // a group's column family counts plus the iterator over its data
+ /** Copy constructor used by deepCopy(); the column family map is shared, not copied.
+ * @param localityGroup group to copy; its iterator (obtained via getIterator()) is deep copied
+ * @param env environment handed to the iterator's deepCopy
+ */
+ private LocalityGroup(LocalityGroup localityGroup, IteratorEnvironment env) {
+ this(localityGroup.columnFamilies, localityGroup.isDefaultLocalityGroup);
+ this.iterator = (InterruptibleIterator) localityGroup.getIterator().deepCopy(env); // getIterator(): subclasses may override it and leave the field null
+ }
+
+ public LocalityGroup(InterruptibleIterator iterator, Map<ByteSequence,MutableLong> columnFamilies, boolean isDefaultLocalityGroup) {
+ this(columnFamilies, isDefaultLocalityGroup);
+ this.iterator = iterator;
+ }
+
+ public LocalityGroup(Map<ByteSequence,MutableLong> columnFamilies, boolean isDefaultLocalityGroup) { // leaves iterator null; use only when a subclass overrides getIterator()
+ this.isDefaultLocalityGroup = isDefaultLocalityGroup;
+ this.columnFamilies = columnFamilies;
+ }
+
+ public InterruptibleIterator getIterator() {
+ return iterator;
+ }
+
+ protected boolean isDefaultLocalityGroup;
+ protected Map<ByteSequence,MutableLong> columnFamilies; // may be null for a default group whose families are unknown
+ private InterruptibleIterator iterator;
+ }
+
+ private LocalityGroup groups[];
+ private Set<ByteSequence> nonDefaultColumnFamilies;
+ private AtomicBoolean interruptFlag;
+
+ public LocalityGroupIterator(LocalityGroup groups[], Set<ByteSequence> nonDefaultColumnFamilies) {
+ super(groups.length);
+ this.groups = groups;
+ this.nonDefaultColumnFamilies = nonDefaultColumnFamilies;
+ }
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ public static final int seek(HeapIterator hiter, LocalityGroup[] groups, Set<ByteSequence> nonDefaultColumnFamilies, Range range,
+ Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { // clears hiter, then seeks and adds each matching group; returns the number added
+ hiter.clear();
+
+ int numLGSeeked = 0;
+
+ Set<ByteSequence> cfSet;
+ if (columnFamilies.size() > 0)
+ if (columnFamilies instanceof Set<?>) {
+ cfSet = (Set<ByteSequence>) columnFamilies;
+ } else {
+ cfSet = new HashSet<ByteSequence>();
+ cfSet.addAll(columnFamilies);
+ }
+ else
+ cfSet = Collections.emptySet();
+
+ for (LocalityGroup lgr : groups) {
+ // include == true means this locality group contains
+ // wanted column families
+ boolean include = false;
+
+ if (cfSet.size() == 0) {
+ include = !inclusive;
+ } else if (lgr.isDefaultLocalityGroup && lgr.columnFamilies == null) {
+ // do not know what column families are in the default locality group,
+ // only know what column families are not in it
+
+ if (inclusive) {
+ if (!nonDefaultColumnFamilies.containsAll(cfSet)) {
+ // default LG may contain wanted and unwanted column families
+ include = true;
+ }// else - everything wanted is in other locality groups, so nothing to do
+ } else {
+ // must include, if all excluded column families are in other locality groups
+ // then there are no unwanted column families in default LG
+ include = true;
+ }
+ } else {
+ /*
+ * Cases to consider for inclusive and exclusive seeks (lgcf: locality group column family set, cf: column family set): lgcf and cf are
+ * disjoint; lgcf and cf are the same; cf contains lgcf; lgcf contains cf; lgcf and cf intersect but neither is a subset of the other.
+ */
+
+ for (Entry<ByteSequence,MutableLong> entry : lgr.columnFamilies.entrySet())
+ if (entry.getValue().longValue() > 0)
+ if (cfSet.contains(entry.getKey())) {
+ if (inclusive)
+ include = true;
+ } else if (!inclusive) {
+ include = true;
+ }
+ }
+
+ if (include) {
+ lgr.getIterator().seek(range, EMPTY_CF_SET, false);
+ hiter.addSource(lgr.getIterator());
+ numLGSeeked++;
+ }// every column family is excluded, zero count, or not present
+ }
+
+ return numLGSeeked;
+ }
+
+ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+ seek(this, groups, nonDefaultColumnFamilies, range, columnFamilies, inclusive);
+ }
+
+ @Override
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ LocalityGroup[] groupsCopy = new LocalityGroup[groups.length];
+ for (int i = 0; i < groups.length; i++) {
+ groupsCopy[i] = new LocalityGroup(groups[i], env);
+ if (interruptFlag != null)
+ groupsCopy[i].getIterator().setInterruptFlag(interruptFlag);
+ }
+ LocalityGroupIterator copy = new LocalityGroupIterator(groupsCopy, nonDefaultColumnFamilies);
+ copy.interruptFlag = interruptFlag; // lets further deep copies of the copy inherit the flag
+ return copy;
+ }
+
+ @Override
+ public void setInterruptFlag(AtomicBoolean flag) {
+ this.interruptFlag = flag;
+ for (LocalityGroup lgr : groups) {
+ lgr.getIterator().setInterruptFlag(flag);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0608e32f/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java b/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
index 2dfbf86..a209a00 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
@@ -16,10 +16,12 @@
*/
package org.apache.accumulo.core.util;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -32,6 +34,10 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.thrift.TMutation;
+import org.apache.commons.lang.mutable.MutableLong;
import org.apache.hadoop.io.Text;
public class LocalityGroupUtil {
@@ -175,4 +181,113 @@ public class LocalityGroupUtil {
return ecf;
}
+ private static class PartitionedMutation extends Mutation { // the subset of one mutation's updates that fall in a single locality group
+ private byte[] row;
+ private List<ColumnUpdate> updates;
+
+ PartitionedMutation(byte[] row, List<ColumnUpdate> updates) {
+ this.row = row;
+ this.updates = updates;
+ }
+
+ @Override
+ public byte[] getRow() {
+ return row;
+ }
+
+ @Override
+ public List<ColumnUpdate> getUpdates() {
+ return updates;
+ }
+
+ @Override
+ public TMutation toThrift() { // unsupported for partitions
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int hashCode() { // NOTE(review): throwing here breaks use as a key/element in hash-based collections
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean equals(Object o) { // NOTE(review): throws, so even List.contains/indexOf over these fails
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean equals(Mutation m) {
+ throw new UnsupportedOperationException();
+ }
+ }
+ /** Splits mutations by locality group using each group's column family map; families in no group map to index groups.length. */
+ public static class Partitioner {
+
+ private Map<ByteSequence,Integer> colfamToLgidMap;
+ private Map<ByteSequence,MutableLong>[] groups;
+
+ public Partitioner(Map<ByteSequence,MutableLong> groups[]) { // if a column family appears in several groups, the later group wins
+ this.groups = groups;
+ this.colfamToLgidMap = new HashMap<ByteSequence,Integer>();
+
+ for (int i = 0; i < groups.length; i++) {
+ for (ByteSequence cf : groups[i].keySet()) {
+ colfamToLgidMap.put(cf, i);
+ }
+ }
+ }
+
+ public void partition(List<Mutation> mutations, List<Mutation> partitionedMutations[]) { // partitionedMutations needs groups.length + 1 lists; the last one collects unmapped families
+
+ MutableByteSequence mbs = new MutableByteSequence(new byte[0], 0, 0); // reused lookup key so no key object is allocated per update
+
+ @SuppressWarnings("unchecked")
+ List<ColumnUpdate> parts[] = new List[groups.length + 1];
+
+ for (Mutation mutation : mutations) {
+ if (mutation.getUpdates().size() == 1) { // fast path: a single update belongs to exactly one group
+ int lgid = getLgid(mbs, mutation.getUpdates().get(0));
+ partitionedMutations[lgid].add(mutation);
+ } else {
+ for (int i = 0; i < parts.length; i++) {
+ parts[i] = null;
+ }
+
+ int lgcount = 0;
+
+ for (ColumnUpdate cu : mutation.getUpdates()) {
+ int lgid = getLgid(mbs, cu);
+
+ if (parts[lgid] == null) {
+ parts[lgid] = new ArrayList<ColumnUpdate>();
+ lgcount++;
+ }
+
+ parts[lgid].add(cu);
+ }
+
+ if (lgcount == 1) { // every update landed in one group: reuse the original mutation
+ for (int i = 0; i < parts.length; i++)
+ if (parts[i] != null) {
+ partitionedMutations[i].add(mutation);
+ break;
+ }
+ } else { // NOTE(review): a mutation with no updates also reaches here (lgcount == 0) and is dropped
+ for (int i = 0; i < parts.length; i++)
+ if (parts[i] != null)
+ partitionedMutations[i].add(new PartitionedMutation(mutation.getRow(), parts[i]));
+ }
+ }
+ }
+ }
+
+ private Integer getLgid(MutableByteSequence mbs, ColumnUpdate cu) { // returns groups.length for a column family not in any group
+ mbs.setArray(cu.getColumnFamily(), 0, cu.getColumnFamily().length);
+ Integer lgid = colfamToLgidMap.get(mbs);
+ if (lgid == null)
+ lgid = groups.length;
+ return lgid;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0608e32f/core/src/main/java/org/apache/accumulo/core/util/MutableByteSequence.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/MutableByteSequence.java b/core/src/main/java/org/apache/accumulo/core/util/MutableByteSequence.java
new file mode 100644
index 0000000..6db7170
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/MutableByteSequence.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+
+/** An ArrayByteSequence whose backing array, offset, and length can be reassigned after construction. */
+public class MutableByteSequence extends ArrayByteSequence {
+ private static final long serialVersionUID = 1L;
+
+ public MutableByteSequence(byte[] data, int offset, int length) {
+ super(data, offset, length);
+ }
+
+ public MutableByteSequence(ByteSequence bs) { // copies bs into a new array of at least 64 bytes
+ super(new byte[Math.max(64, bs.length())]);
+ System.arraycopy(bs.getBackingArray(), bs.offset(), data, 0, bs.length()); // NOTE(review): reads bs.getBackingArray(); confirm callers pass only array-backed sequences
+ this.length = bs.length();
+ this.offset = 0;
+ }
+
+ public void setArray(byte[] data, int offset, int len) { // aliases data (no copy); offset/len are not validated
+ this.data = data;
+ this.offset = offset;
+ this.length = len;
+ }
+
+ public void setLength(int len) { // does not check len against the backing array
+ this.length = len;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0608e32f/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
index 1608576..8c0e691 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
@@ -30,7 +30,7 @@ import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.rfile.RelativeKey.MByteSequence;
+import org.apache.accumulo.core.util.MutableByteSequence;
import org.apache.accumulo.core.util.UnsynchronizedBuffer;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -176,7 +176,7 @@ public class RelativeKeyTest {
Key seekKey = new Key();
Key prevKey = new Key();
Key currKey = null;
- MByteSequence value = new MByteSequence(new byte[64], 0, 0);
+ MutableByteSequence value = new MutableByteSequence(new byte[64], 0, 0);
RelativeKey.SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey);
assertEquals(1, skippr.skipped);
@@ -207,7 +207,7 @@ public class RelativeKeyTest {
Key seekKey = new Key("s", "t", "u", "v", 1);
Key prevKey = new Key();
Key currKey = null;
- MByteSequence value = new MByteSequence(new byte[64], 0, 0);
+ MutableByteSequence value = new MutableByteSequence(new byte[64], 0, 0);
RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey);
}
@@ -218,7 +218,7 @@ public class RelativeKeyTest {
Key seekKey = expectedKeys.get(seekIndex);
Key prevKey = new Key();
Key currKey = null;
- MByteSequence value = new MByteSequence(new byte[64], 0, 0);
+ MutableByteSequence value = new MutableByteSequence(new byte[64], 0, 0);
RelativeKey.SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0608e32f/core/src/test/java/org/apache/accumulo/core/util/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/util/PartitionerTest.java b/core/src/test/java/org/apache/accumulo/core/util/PartitionerTest.java
new file mode 100644
index 0000000..276720c
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/util/PartitionerTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.util.LocalityGroupUtil.Partitioner;
+import org.apache.commons.lang.mutable.MutableLong;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PartitionerTest {
+ /**
+ * Partitions five mutations across two locality groups (cf1+cf2 and cf3) and the default group, then checks
+ * the keys that ended up in each partition.
+ */
+ @Test
+ public void test1() {
+ @SuppressWarnings("unchecked")
+ Map<ByteSequence,MutableLong>[] groups = new Map[] {cfCounts("cf1", "cf2"), cfCounts("cf3")};
+ Partitioner partitioner = new Partitioner(groups);
+
+ Mutation r1 = new Mutation("r1");
+ r1.put("cf1", "cq1", "v1");
+
+ Mutation r2 = new Mutation("r2");
+ r2.put("cf1", "cq1", "v2");
+ r2.put("cf2", "cq2", "v3");
+
+ Mutation r3 = new Mutation("r3");
+ r3.put("cf1", "cq1", "v4");
+ r3.put("cf3", "cq2", "v5");
+
+ Mutation r4 = new Mutation("r4");
+ r4.put("cf1", "cq1", "v6");
+ r4.put("cf3", "cq2", "v7");
+ r4.put("cf5", "cq3", "v8");
+
+ Mutation r5 = new Mutation("r5");
+ r5.put("cf5", "cq3", "v9");
+
+ // one output list per locality group, plus a final list for the default group
+ @SuppressWarnings("unchecked")
+ List<Mutation>[] partitioned = new List[groups.length + 1];
+ for (int i = 0; i < partitioned.length; i++)
+ partitioned[i] = new ArrayList<Mutation>();
+
+ partitioner.partition(Arrays.asList(r1, r2, r3, r4, r5), partitioned);
+
+ // group 0 (cf1, cf2): r1 and r2 unchanged, plus only the cf1 parts of r3 and r4
+ Mutation r3Lg0 = new Mutation("r3");
+ r3Lg0.put("cf1", "cq1", "v4");
+
+ Mutation r4Lg0 = new Mutation("r4");
+ r4Lg0.put("cf1", "cq1", "v6");
+ Assert.assertEquals(toKeySet(r1, r2, r3Lg0, r4Lg0), toKeySet(partitioned[0]));
+
+ // group 1 (cf3): the cf3 parts of r3 and r4
+ Mutation r3Lg1 = new Mutation("r3");
+ r3Lg1.put("cf3", "cq2", "v5");
+
+ Mutation r4Lg1 = new Mutation("r4");
+ r4Lg1.put("cf3", "cq2", "v7");
+ Assert.assertEquals(toKeySet(r3Lg1, r4Lg1), toKeySet(partitioned[1]));
+
+ // default group: cf5 is named by no locality group
+ Mutation r4Default = new Mutation("r4");
+ r4Default.put("cf5", "cq3", "v8");
+ Assert.assertEquals(toKeySet(r4Default, r5), toKeySet(partitioned[2]));
+ }
+
+ /** Builds the column family map for one locality group, with a count of 1 for each family. */
+ private static Map<ByteSequence,MutableLong> cfCounts(String... families) {
+ Map<ByteSequence,MutableLong> counts = new HashMap<ByteSequence,MutableLong>();
+ for (String family : families) {
+ counts.put(new ArrayByteSequence(family), new MutableLong(1));
+ }
+ return counts;
+ }
+
+ /** Collects the keys of every update in {@code mutations}. */
+ private Set<Key> toKeySet(List<Mutation> mutations) {
+ return toKeySet(mutations.toArray(new Mutation[mutations.size()]));
+ }
+
+ /**
+ * Collects one key (row, family, qualifier, visibility, timestamp) per update. Values are not part of the key, so
+ * two mutations that differ only in values produce the same set.
+ */
+ private Set<Key> toKeySet(Mutation... mutations) {
+ Set<Key> keys = new HashSet<Key>();
+ for (Mutation mutation : mutations) {
+ for (ColumnUpdate cu : mutation.getUpdates()) {
+ keys.add(new Key(mutation.getRow(), cu.getColumnFamily(), cu.getColumnQualifier(), cu.getColumnVisibility(), cu.getTimestamp()));
+ }
+ }
+ return keys;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0608e32f/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
index 57f36c3..a648366 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -52,13 +53,18 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.SortedMapIterator;
import org.apache.accumulo.core.iterators.WrappingIterator;
import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
+import org.apache.accumulo.core.iterators.system.LocalityGroupIterator;
+import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroup;
import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
+import org.apache.accumulo.core.util.LocalityGroupUtil.Partitioner;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.trace.TraceFileSystem;
+import org.apache.commons.lang.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -167,6 +173,7 @@ class MemKeyConversionIterator extends WrappingIterator implements Interruptible
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
super.seek(range, columnFamilies, inclusive);
+
if (hasTop())
getTopKeyVal();
@@ -193,24 +200,37 @@ public class InMemoryMap {
private volatile String memDumpFile = null;
private final String memDumpDir;
+
+ private Map<String,Set<ByteSequence>> lggroups;
public InMemoryMap(boolean useNativeMap, String memDumpDir) {
+ this(new HashMap<String,Set<ByteSequence>>(), useNativeMap, memDumpDir);
+ }
+
+ public InMemoryMap(Map<String,Set<ByteSequence>> lggroups, boolean useNativeMap, String memDumpDir) {
this.memDumpDir = memDumpDir;
+ this.lggroups = lggroups;
+
+ if (lggroups.size() == 0)
+ map = newMap(useNativeMap);
+ else
+ map = new LocalityGroupMap(lggroups, useNativeMap);
+ }
+
+ public InMemoryMap(AccumuloConfiguration config) throws LocalityGroupConfigurationError {
+ this(LocalityGroupUtil.getLocalityGroups(config), config.getBoolean(Property.TSERV_NATIVEMAP_ENABLED), config.get(Property.TSERV_MEMDUMP_DIR));
+ }
+
+ private static SimpleMap newMap(boolean useNativeMap) {
if (useNativeMap && NativeMap.loadedNativeLibraries()) {
try {
- map = new NativeMapWrapper();
+ return new NativeMapWrapper();
} catch (Throwable t) {
log.error("Failed to create native map", t);
}
}
- if (map == null) {
- map = new DefaultMap();
- }
- }
-
- public InMemoryMap(AccumuloConfiguration config) {
- this(config.getBoolean(Property.TSERV_NATIVEMAP_ENABLED), config.get(Property.TSERV_MEMDUMP_DIR));
+ return new DefaultMap();
}
private interface SimpleMap {
@@ -229,6 +249,115 @@ public class InMemoryMap {
public void mutate(List<Mutation> mutations, int kvCount);
}
+ /** A {@link SimpleMap} with one underlying map per configured locality group plus a trailing map for the default
+ * group. Writes are split by column family; reads go through a {@link LocalityGroupIterator}. */
+ private static class LocalityGroupMap implements SimpleMap {
+ // column family -> count map for each configured group, in groups.values() iteration order
+ private final Map<ByteSequence,MutableLong>[] groupFams;
+ // parallel to groupFams plus one extra trailing slot: the last map is the default locality group
+ private final SimpleMap[] maps;
+ private final Partitioner partitioner;
+ // scratch output of the partitioner, reused (and cleared) by every mutate() call
+ private final List<Mutation>[] partitioned;
+ private final Set<ByteSequence> nonDefaultColumnFamilies;
+
+ @SuppressWarnings("unchecked")
+ LocalityGroupMap(Map<String,Set<ByteSequence>> groups, boolean useNativeMap) {
+ int numGroups = groups.size();
+ this.groupFams = new Map[numGroups];
+ this.maps = new SimpleMap[numGroups + 1];
+ this.partitioned = new List[numGroups + 1];
+ this.nonDefaultColumnFamilies = new HashSet<ByteSequence>();
+
+ for (int i = 0; i <= numGroups; i++)
+ maps[i] = newMap(useNativeMap);
+
+ int g = 0;
+ for (Set<ByteSequence> families : groups.values()) {
+ Map<ByteSequence,MutableLong> famCounts = new HashMap<ByteSequence,MutableLong>();
+ for (ByteSequence family : families)
+ famCounts.put(family, new MutableLong(1));
+ this.groupFams[g++] = famCounts;
+ nonDefaultColumnFamilies.addAll(families);
+ }
+
+ this.partitioner = new LocalityGroupUtil.Partitioner(this.groupFams);
+
+ for (int i = 0; i <= numGroups; i++)
+ partitioned[i] = new ArrayList<Mutation>();
+ }
+
+ @Override
+ public Value get(Key key) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator<Entry<Key,Value>> iterator(Key startKey) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Sum of the entry counts of every underlying map. */
+ @Override
+ public int size() {
+ int total = 0;
+ for (int i = 0; i < maps.length; i++)
+ total += maps[i].size();
+ return total;
+ }
+
+ /** Wraps one iterator per locality group (default group last) in a {@link LocalityGroupIterator}. */
+ @Override
+ public InterruptibleIterator skvIterator() {
+ LocalityGroup[] lgs = new LocalityGroup[maps.length];
+ for (int i = 0; i < groupFams.length; i++)
+ lgs[i] = new LocalityGroup(maps[i].skvIterator(), groupFams[i], false);
+ // the default group has no explicit column family map
+ int last = maps.length - 1;
+ lgs[last] = new LocalityGroup(maps[last].skvIterator(), null, true);
+ return new LocalityGroupIterator(lgs, nonDefaultColumnFamilies);
+ }
+
+ /** Deletes every underlying map. */
+ @Override
+ public void delete() {
+ for (int i = 0; i < maps.length; i++)
+ maps[i].delete();
+ }
+
+ /** Sum of the memory used by every underlying map. */
+ @Override
+ public long getMemoryUsed() {
+ long total = 0;
+ for (int i = 0; i < maps.length; i++)
+ total += maps[i].getMemoryUsed();
+ return total;
+ }
+
+ @Override
+ public synchronized void mutate(List<Mutation> mutations, int kvCount) {
+ // Synchronized because the partitioned lists are reused across calls to avoid allocation. The caller is
+ // currently synchronized too, so no parallelism is lost; the lock is here for future proofing.
+ int nextKvCount = kvCount;
+ try {
+ partitioner.partition(mutations, partitioned);
+ for (int i = 0; i < partitioned.length; i++) {
+ List<Mutation> part = partitioned[i];
+ if (part.isEmpty())
+ continue;
+ maps[i].mutate(part, nextKvCount);
+ // NOTE(review): assumes each update consumes one kv count, giving each group a consecutive range
+ for (Mutation m : part)
+ nextKvCount += m.getUpdates().size();
+ }
+ } finally {
+ // clear immediately so the mutations can be garbage collected
+ for (List<Mutation> part : partitioned)
+ part.clear();
+ }
+ }
+ }
+
private static class DefaultMap implements SimpleMap {
private ConcurrentSkipListMap<Key,Value> map = new ConcurrentSkipListMap<Key,Value>(new MemKeyComparator());
private AtomicLong bytesInMemory = new AtomicLong();
@@ -568,18 +697,23 @@ public class InMemoryMap {
newConf.setInt("io.seqfile.compress.blocksize", 100000);
FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, ServerConfiguration.getSiteConfiguration());
- out.startDefaultLocalityGroup();
+
InterruptibleIterator iter = map.skvIterator();
- iter.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+
+ HashSet<ByteSequence> allfams= new HashSet<ByteSequence>();
- while (iter.hasTop() && activeIters.size() > 0) {
- // RFile does not support MemKey, so we move the kv count into the value only for the RFile.
- // There is no need to change the MemKey to a normal key because the kvCount info gets lost when it is written
- Value newValue = new MemValue(iter.getTopValue(), ((MemKey) iter.getTopKey()).kvCount);
- out.append(iter.getTopKey(), newValue);
- iter.next();
+ for(Entry<String, Set<ByteSequence>> entry : lggroups.entrySet()){
+ allfams.addAll(entry.getValue());
+ out.startNewLocalityGroup(entry.getKey(), entry.getValue());
+ iter.seek(new Range(), entry.getValue(), true);
+ dumpLocalityGroup(out, iter);
}
+ out.startDefaultLocalityGroup();
+ iter.seek(new Range(), allfams, false);
+
+ dumpLocalityGroup(out, iter);
+
out.close();
log.debug("Created mem dump file " + tmpFile);
@@ -614,4 +748,15 @@ public class InMemoryMap {
tmpMap.delete();
}
+
+ /** Writes entries from {@code iter} to {@code out} until {@code iter} is exhausted or {@code activeIters} is empty. */
+ private void dumpLocalityGroup(FileSKVWriter out, InterruptibleIterator iter) throws IOException {
+ for (; iter.hasTop() && activeIters.size() > 0; iter.next()) {
+ // RFile does not support MemKey, so the kv count is moved into the value, only for the RFile. There is
+ // no need to convert the MemKey to a plain Key because the kvCount info is lost when it is written.
+ MemKey memKey = (MemKey) iter.getTopKey();
+ Value memValue = new MemValue(iter.getTopValue(), memKey.kvCount);
+ out.append(memKey, memValue);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0608e32f/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
index d985f4a..0272a2f 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
@@ -90,6 +90,7 @@ import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.security.Credentials;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
@@ -259,7 +260,11 @@ public class Tablet {
private CommitSession commitSession;
TabletMemory() {
- memTable = new InMemoryMap(tabletServer.getSystemConfiguration());
+ try {
+ memTable = new InMemoryMap(acuTableConf);
+ } catch (LocalityGroupConfigurationError e) {
+ throw new RuntimeException(e);
+ }
commitSession = new CommitSession(nextSeq, memTable);
nextSeq += 2;
}
@@ -282,7 +287,11 @@ public class Tablet {
}
otherMemTable = memTable;
- memTable = new InMemoryMap(tabletServer.getSystemConfiguration());
+ try {
+ memTable = new InMemoryMap(acuTableConf);
+ } catch (LocalityGroupConfigurationError e) {
+ throw new RuntimeException(e);
+ }
CommitSession oldCommitSession = commitSession;
commitSession = new CommitSession(nextSeq, memTable);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0608e32f/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java b/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
index 97c8eec..fd5e661 100644
--- a/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
+++ b/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
@@ -18,8 +18,13 @@ package org.apache.accumulo.server.tabletserver;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -82,6 +87,14 @@ public class InMemoryMapTest extends TestCase {
}
+ /** Returns a new mutable set holding one {@link ArrayByteSequence} per given column family name. */
+ static Set<ByteSequence> newCFSet(String... cfs) {
+ Set<ByteSequence> result = new HashSet<ByteSequence>();
+ for (String cf : cfs)
+ result.add(new ArrayByteSequence(cf));
+ return result;
+ }
+
public void test2() throws Exception {
InMemoryMap imm = new InMemoryMap(false, "/tmp");
@@ -345,4 +358,135 @@ public class InMemoryMapTest extends TestCase {
}
}
+ public void testLocalityGroups() throws Exception {
+ // two configured locality groups; families in neither (here "foo") go to the default group
+ Map<String,Set<ByteSequence>> lggroups1 = new HashMap<String,Set<ByteSequence>>();
+ lggroups1.put("lg1", newCFSet("cf1", "cf2"));
+ lggroups1.put("lg2", newCFSet("cf3", "cf4"));
+ // useNativeMap = false, memDumpDir = "/tmp"
+ InMemoryMap imm = new InMemoryMap(lggroups1, false, "/tmp");
+
+ Mutation m1 = new Mutation("r1");
+ m1.put("cf1", "x", 2, "1");
+ m1.put("cf1", "y", 2, "2");
+ m1.put("cf3", "z", 2, "3");
+ m1.put("foo", "b", 2, "9");
+
+ Mutation m2 = new Mutation("r2");
+ m2.put("cf2", "x", 3, "5");
+
+ Mutation m3 = new Mutation("r3");
+ m3.put("foo", "b", 4, "6");
+
+ Mutation m4 = new Mutation("r4");
+ m4.put("foo", "b", 5, "7");
+ m4.put("cf4", "z", 5, "8");
+
+ Mutation m5 = new Mutation("r5");
+ m5.put("cf3", "z", 6, "A");
+ m5.put("cf4", "z", 6, "B");
+ // 10 key/values in total across the five mutations (4 + 1 + 1 + 2 + 2)
+ imm.mutate(Arrays.asList(m1, m2, m3, m4, m5));
+
+ MemoryIterator iter1 = imm.skvIterator();
+ // the same seek checks must pass on the iterator and on an independent deep copy
+ seekLocalityGroups(iter1);
+ SortedKeyValueIterator<Key,Value> dc1 = iter1.deepCopy(null);
+ seekLocalityGroups(dc1);
+
+ assertEquals(10, imm.getNumEntries());
+ assertTrue(imm.estimatedSizeInBytes() > 0);
+ // iterators created before delete(0) are expected to keep returning the same data afterwards
+ imm.delete(0);
+
+ seekLocalityGroups(iter1);
+ seekLocalityGroups(dc1);
+ // TODO uncomment following when ACCUMULO-1628 is fixed
+ // seekLocalityGroups(iter1.deepCopy(null));
+ }
+ /** Seeks {@code iter1} with a series of ranges and column family sets, checking the exact entries returned via ae. */
+ private void seekLocalityGroups(SortedKeyValueIterator<Key,Value> iter1) throws IOException {
+ iter1.seek(new Range(), newCFSet("cf1"), true);
+ ae(iter1, "r1", "cf1:x", 2, "1");
+ ae(iter1, "r1", "cf1:y", 2, "2");
+ ae(iter1, "r2", "cf2:x", 3, "5");
+ assertFalse(iter1.hasTop());
+ // a row range limits the result to the matching rows of that group
+ iter1.seek(new Range("r2", "r4"), newCFSet("cf1"), true);
+ ae(iter1, "r2", "cf2:x", 3, "5");
+ assertFalse(iter1.hasTop());
+ // cf3 belongs to lg2, so both cf3 and cf4 entries are returned
+ iter1.seek(new Range(), newCFSet("cf3"), true);
+ ae(iter1, "r1", "cf3:z", 2, "3");
+ ae(iter1, "r4", "cf4:z", 5, "8");
+ ae(iter1, "r5", "cf3:z", 6, "A");
+ ae(iter1, "r5", "cf4:z", 6, "B");
+ assertFalse(iter1.hasTop());
+ // foo is in no configured group, so only default-group entries are returned
+ iter1.seek(new Range(), newCFSet("foo"), true);
+ ae(iter1, "r1", "foo:b", 2, "9");
+ ae(iter1, "r3", "foo:b", 4, "6");
+ ae(iter1, "r4", "foo:b", 5, "7");
+ assertFalse(iter1.hasTop());
+ // families from lg1 and lg2: both groups are returned, merged in key order; the default group is skipped
+ iter1.seek(new Range(), newCFSet("cf1", "cf3"), true);
+ ae(iter1, "r1", "cf1:x", 2, "1");
+ ae(iter1, "r1", "cf1:y", 2, "2");
+ ae(iter1, "r1", "cf3:z", 2, "3");
+ ae(iter1, "r2", "cf2:x", 3, "5");
+ ae(iter1, "r4", "cf4:z", 5, "8");
+ ae(iter1, "r5", "cf3:z", 6, "A");
+ ae(iter1, "r5", "cf4:z", 6, "B");
+ assertFalse(iter1.hasTop());
+ // the same two groups restricted to rows r2 through r4
+ iter1.seek(new Range("r2", "r4"), newCFSet("cf1", "cf3"), true);
+ ae(iter1, "r2", "cf2:x", 3, "5");
+ ae(iter1, "r4", "cf4:z", 5, "8");
+ assertFalse(iter1.hasTop());
+ // one family from every group selects all three groups, i.e. every entry
+ iter1.seek(new Range(), newCFSet("cf1", "cf3", "foo"), true);
+ assertAll(iter1);
+ // all groups, restricted to rows r1 through r2
+ iter1.seek(new Range("r1", "r2"), newCFSet("cf1", "cf3", "foo"), true);
+ ae(iter1, "r1", "cf1:x", 2, "1");
+ ae(iter1, "r1", "cf1:y", 2, "2");
+ ae(iter1, "r1", "cf3:z", 2, "3");
+ ae(iter1, "r1", "foo:b", 2, "9");
+ ae(iter1, "r2", "cf2:x", 3, "5");
+ assertFalse(iter1.hasTop());
+ // inclusive=false treats the set as families to exclude; excluding nothing returns everything
+ iter1.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+ assertAll(iter1);
+ // excluding only cf1 still returns every entry, cf1's included (presumably because lg1 also holds cf2)
+ iter1.seek(new Range(), newCFSet("cf1"), false);
+ assertAll(iter1);
+ // excluding all of lg1's families (cf1, cf2) skips that group entirely
+ iter1.seek(new Range(), newCFSet("cf1", "cf2"), false);
+ ae(iter1, "r1", "cf3:z", 2, "3");
+ ae(iter1, "r1", "foo:b", 2, "9");
+ ae(iter1, "r3", "foo:b", 4, "6");
+ ae(iter1, "r4", "cf4:z", 5, "8");
+ ae(iter1, "r4", "foo:b", 5, "7");
+ ae(iter1, "r5", "cf3:z", 6, "A");
+ ae(iter1, "r5", "cf4:z", 6, "B");
+ assertFalse(iter1.hasTop());
+ // a single-row range
+ iter1.seek(new Range("r2"), newCFSet("cf1", "cf3", "foo"), true);
+ ae(iter1, "r2", "cf2:x", 3, "5");
+ assertFalse(iter1.hasTop());
+ }
+ /** Asserts that {@code iter1} returns exactly the ten entries written by testLocalityGroups, in this order, then has no top. */
+ private void assertAll(SortedKeyValueIterator<Key,Value> iter1) throws IOException {
+ ae(iter1, "r1", "cf1:x", 2, "1");
+ ae(iter1, "r1", "cf1:y", 2, "2");
+ ae(iter1, "r1", "cf3:z", 2, "3");
+ ae(iter1, "r1", "foo:b", 2, "9");
+ ae(iter1, "r2", "cf2:x", 3, "5");
+ ae(iter1, "r3", "foo:b", 4, "6");
+ ae(iter1, "r4", "cf4:z", 5, "8");
+ ae(iter1, "r4", "foo:b", 5, "7");
+ ae(iter1, "r5", "cf3:z", 6, "A");
+ ae(iter1, "r5", "cf4:z", 6, "B");
+ assertFalse(iter1.hasTop());
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0608e32f/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java b/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java
new file mode 100644
index 0000000..948a741
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.FastFormat;
+import org.apache.accumulo.core.util.Stat;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Benchmarks writing, scanning and flushing a table whose 32 column families are split across a configurable
+ * number of locality groups. Usage: {@code IMMLGBenchmark <numLocalityGroups> [instance [zookeepers [user [password]]]]}
+ */
+public class IMMLGBenchmark {
+ public static void main(String[] args) throws Exception {
+ if (args.length < 1) {
+ System.err.println("Usage: IMMLGBenchmark <numLocalityGroups> [instance [zookeepers [user [password]]]]");
+ System.exit(1);
+ }
+ int numlg = Integer.parseInt(args[0]);
+
+ // connection settings are optional and default to the values this benchmark previously hard-coded
+ String instance = args.length > 1 ? args[1] : "test16";
+ String zookeepers = args.length > 2 ? args[2] : "localhost";
+ String user = args.length > 3 ? args[3] : "root";
+ String password = args.length > 4 ? args[4] : "secret";
+ ZooKeeperInstance zki = new ZooKeeperInstance(instance, zookeepers);
+ Connector conn = zki.getConnector(user, new PasswordToken(password));
+
+ ArrayList<byte[]> cfset = new ArrayList<byte[]>();
+ for (int i = 0; i < 32; i++) {
+ cfset.add(String.format("%04x", i).getBytes());
+ }
+
+ Map<String,Stat> stats = new TreeMap<String,Stat>();
+ // five runs; the first two are warm-up and are left out of the averages
+ for (int i = 0; i < 5; i++) {
+ runTest(conn, numlg, cfset, i > 1 ? stats : null);
+ System.out.println();
+ }
+
+ for (Entry<String,Stat> entry : stats.entrySet()) {
+ System.out.printf("%20s : %6.2f\n", entry.getKey(), entry.getValue().getAverage());
+ }
+ }
+
+ /** Recreates the table, then times a write, two scans and a flush; timings are printed and recorded in {@code stats} when it is non-null. */
+ private static void runTest(Connector conn, int numlg, ArrayList<byte[]> cfset, Map<String,Stat> stats) throws Exception {
+ String table = "immlgb";
+
+ try {
+ conn.tableOperations().delete(table);
+ } catch (TableNotFoundException tnfe) {
+ // the table does not exist yet, so there is nothing to delete
+ }
+ conn.tableOperations().create(table);
+ conn.tableOperations().setProperty(table, Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "snappy");
+ setupLocalityGroups(conn, numlg, cfset, table);
+
+ addStat(stats, "write", write(conn, cfset, table));
+ addStat(stats, "scan cf", scan(conn, cfset, table, false));
+ addStat(stats, "scan cf:cq", scan(conn, cfset, table, true));
+ // TODO time reading all data
+
+ long t1 = System.currentTimeMillis();
+ conn.tableOperations().flush(table, null, null, true);
+ long t2 = System.currentTimeMillis();
+ addStat(stats, "flush", t2 - t1);
+ }
+
+ /** Prints {@code s:wt} and, when {@code stats} is non-null, adds {@code wt} to the statistic kept under {@code s}. */
+ private static void addStat(Map<String,Stat> stats, String s, long wt) {
+ System.out.println(s + ":" + wt);
+ if (stats == null)
+ return;
+ Stat stat = stats.get(s);
+ if (stat == null) {
+ stat = new Stat();
+ stats.put(s, stat);
+ }
+ stat.addStat(wt);
+ }
+
+ /** Times a scan of the column family at index 15, or of the column (cf 15, cq 15) when {@code cq} is true. */
+ private static long scan(Connector conn, ArrayList<byte[]> cfset, String table, boolean cq) throws TableNotFoundException {
+ Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
+ if (!cq)
+ scanner.fetchColumnFamily(new Text(cfset.get(15)));
+ else
+ scanner.fetchColumn(new Text(cfset.get(15)), new Text(cfset.get(15)));
+ long t1 = System.currentTimeMillis();
+ @SuppressWarnings("unused")
+ int count = 0;
+ for (@SuppressWarnings("unused") Entry<Key,Value> entry : scanner) {
+ count++;
+ }
+ long t2 = System.currentTimeMillis();
+ return t2 - t1;
+ }
+
+ /** Times writing 2^15 random rows, each with one random 50-byte value per column family. */
+ private static long write(Connector conn, ArrayList<byte[]> cfset, String table) throws TableNotFoundException, MutationsRejectedException {
+ Random rand = new Random();
+ byte val[] = new byte[50];
+ BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
+
+ long t1 = System.currentTimeMillis();
+ for (int i = 0; i < 1 << 15; i++) {
+ // mask off the sign bit: Math.abs(Long.MIN_VALUE) is still negative
+ byte[] row = FastFormat.toZeroPaddedString(rand.nextLong() & Long.MAX_VALUE, 16, 16, new byte[0]);
+ Mutation m = new Mutation(row);
+ for (byte[] cf : cfset) {
+ byte[] cq = FastFormat.toZeroPaddedString(rand.nextInt(1 << 16), 4, 16, new byte[0]);
+ rand.nextBytes(val);
+ m.put(cf, cq, val);
+ }
+ bw.addMutation(m);
+ }
+ bw.close();
+ long t2 = System.currentTimeMillis();
+ return t2 - t1;
+ }
+
+ /**
+ * Splits the column families into at most {@code numlg} locality groups of {@code ceil(cfset.size() / numlg)} families
+ * each (the last may be smaller), then takes the table offline and back online. Does nothing when {@code numlg <= 1}.
+ */
+ private static void setupLocalityGroups(Connector conn, int numlg, ArrayList<byte[]> cfset, String table) throws AccumuloException,
+ AccumuloSecurityException, TableNotFoundException {
+ if (numlg > 1) {
+ // ceiling division: floor division gave 0 when numlg > cfset.size(), so the loop below never ended,
+ // and produced more than numlg groups whenever numlg did not divide cfset.size()
+ int numCF = (cfset.size() + numlg - 1) / numlg;
+ int gNum = 0;
+ Iterator<byte[]> cfiter = cfset.iterator();
+ Map<String,Set<Text>> groups = new HashMap<String,Set<Text>>();
+ while (cfiter.hasNext()) {
+ HashSet<Text> groupCols = new HashSet<Text>();
+ for (int i = 0; i < numCF && cfiter.hasNext(); i++) {
+ groupCols.add(new Text(cfiter.next()));
+ }
+ groups.put("lg" + (gNum++), groupCols);
+ }
+
+ conn.tableOperations().setLocalityGroups(table, groups);
+ conn.tableOperations().offline(table);
+ UtilWaitThread.sleep(1000);
+ conn.tableOperations().online(table);
+ }
+ }
+}