You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/07/31 21:03:39 UTC

git commit: ACCUMULO-112 added locality group support to in memory map

Updated Branches:
  refs/heads/master 4313860c4 -> 0608e32f8


ACCUMULO-112 added locality group support to in memory map


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0608e32f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0608e32f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0608e32f

Branch: refs/heads/master
Commit: 0608e32f8b09f926e40677d1e23ccedd0d6e088d
Parents: 4313860
Author: Keith Turner <kt...@apache.org>
Authored: Wed Jul 31 14:39:49 2013 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Wed Jul 31 14:54:28 2013 -0400

----------------------------------------------------------------------
 .../apache/accumulo/core/file/rfile/RFile.java  | 128 +++----------
 .../accumulo/core/file/rfile/RelativeKey.java   |  87 +++------
 .../iterators/system/LocalityGroupIterator.java | 177 +++++++++++++++++
 .../accumulo/core/util/LocalityGroupUtil.java   | 115 +++++++++++
 .../accumulo/core/util/MutableByteSequence.java |  46 +++++
 .../core/file/rfile/RelativeKeyTest.java        |   8 +-
 .../accumulo/core/util/PartitionerTest.java     | 124 ++++++++++++
 .../server/tabletserver/InMemoryMap.java        | 177 +++++++++++++++--
 .../accumulo/server/tabletserver/Tablet.java    |  13 +-
 .../server/tabletserver/InMemoryMapTest.java    | 144 ++++++++++++++
 .../apache/accumulo/test/IMMLGBenchmark.java    | 190 +++++++++++++++++++
 11 files changed, 1033 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0608e32f/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index fe21f02..d6a2532 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -53,13 +53,17 @@ import org.apache.accumulo.core.file.blockfile.BlockFileWriter;
 import org.apache.accumulo.core.file.rfile.BlockIndex.BlockIndexEntry;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
-import org.apache.accumulo.core.file.rfile.RelativeKey.MByteSequence;
 import org.apache.accumulo.core.file.rfile.RelativeKey.SkippR;
 import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
 import org.apache.accumulo.core.iterators.IterationInterruptedException;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.system.HeapIterator;
+import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
+import org.apache.accumulo.core.iterators.system.LocalityGroupIterator;
+import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroup;
+import org.apache.accumulo.core.util.MutableByteSequence;
+import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.hadoop.io.Writable;
 import org.apache.log4j.Logger;
 
@@ -77,24 +81,12 @@ public class RFile {
   // static final int RINDEX_VER_5 = 5; // unreleased
   static final int RINDEX_VER_4 = 4;
   static final int RINDEX_VER_3 = 3;
-  
-  private static class Count {
-    public Count(int i) {
-      this.count = i;
-    }
-    
-    public Count(long count) {
-      this.count = count;
-    }
     
-    long count;
-  }
-  
   private static class LocalityGroupMetadata implements Writable {
     
     private int startBlock;
     private Key firstKey;
-    private Map<ByteSequence,Count> columnFamilies;
+    private Map<ByteSequence,MutableLong> columnFamilies;
     
     private boolean isDefaultLG = false;
     private String name;
@@ -104,14 +96,14 @@ public class RFile {
     private MultiLevelIndex.Reader indexReader;
     
     public LocalityGroupMetadata(int version, BlockFileReader br) {
-      columnFamilies = new HashMap<ByteSequence,Count>();
+      columnFamilies = new HashMap<ByteSequence,MutableLong>();
       indexReader = new MultiLevelIndex.Reader(br, version);
     }
     
     public LocalityGroupMetadata(int nextBlock, Set<ByteSequence> pcf, int indexBlockSize, BlockFileWriter bfw) {
       this.startBlock = nextBlock;
       isDefaultLG = true;
-      columnFamilies = new HashMap<ByteSequence,Count>();
+      columnFamilies = new HashMap<ByteSequence,MutableLong>();
       previousColumnFamilies = pcf;
       
       indexWriter = new MultiLevelIndex.BufferedWriter(new MultiLevelIndex.Writer(bfw, indexBlockSize));
@@ -121,9 +113,9 @@ public class RFile {
       this.startBlock = nextBlock;
       this.name = name;
       isDefaultLG = false;
-      columnFamilies = new HashMap<ByteSequence,Count>();
+      columnFamilies = new HashMap<ByteSequence,MutableLong>();
       for (ByteSequence cf : cfset) {
-        columnFamilies.put(cf, new Count(0));
+        columnFamilies.put(cf, new MutableLong(0));
       }
       
       indexWriter = new MultiLevelIndex.BufferedWriter(new MultiLevelIndex.Writer(bfw, indexBlockSize));
@@ -155,7 +147,7 @@ public class RFile {
       }
       
       ByteSequence cf = key.getColumnFamilyData();
-      Count count = columnFamilies.get(cf);
+      MutableLong count = columnFamilies.get(cf);
       
       if (count == null) {
         if (!isDefaultLG) {
@@ -171,12 +163,12 @@ public class RFile {
           columnFamilies = null;
           return;
         }
-        count = new Count(0);
+        count = new MutableLong(0);
         columnFamilies.put(new ArrayByteSequence(cf.getBackingArray(), cf.offset(), cf.length()), count);
         
       }
       
-      count.count++;
+      count.increment();
       
     }
     
@@ -199,7 +191,7 @@ public class RFile {
         columnFamilies = null;
       } else {
         if (columnFamilies == null)
-          columnFamilies = new HashMap<ByteSequence,Count>();
+          columnFamilies = new HashMap<ByteSequence,MutableLong>();
         else
           columnFamilies.clear();
         
@@ -209,7 +201,7 @@ public class RFile {
           in.readFully(cf);
           long count = in.readLong();
           
-          columnFamilies.put(new ArrayByteSequence(cf), new Count(count));
+          columnFamilies.put(new ArrayByteSequence(cf), new MutableLong(count));
         }
       }
       
@@ -239,10 +231,10 @@ public class RFile {
       } else {
         out.writeInt(columnFamilies.size());
         
-        for (Entry<ByteSequence,Count> entry : columnFamilies.entrySet()) {
+        for (Entry<ByteSequence,MutableLong> entry : columnFamilies.entrySet()) {
           out.writeInt(entry.getKey().length());
           out.write(entry.getKey().getBackingArray(), entry.getKey().offset(), entry.getKey().length());
-          out.writeLong(entry.getValue().count);
+          out.writeLong(entry.getValue().longValue());
         }
       }
       
@@ -474,26 +466,23 @@ public class RFile {
     }
   }
   
-  private static class LocalityGroupReader implements FileSKVIterator {
+  private static class LocalityGroupReader extends LocalityGroup implements FileSKVIterator {
     
     private BlockFileReader reader;
     private MultiLevelIndex.Reader index;
     private int blockCount;
     private Key firstKey;
     private int startBlock;
-    private Map<ByteSequence,Count> columnFamilies;
-    private boolean isDefaultLocalityGroup;
     private boolean closed = false;
     private int version;
     private boolean checkRange = true;
     
     private LocalityGroupReader(BlockFileReader reader, LocalityGroupMetadata lgm, int version) throws IOException {
+      super(lgm.columnFamilies, lgm.isDefaultLG);
       this.firstKey = lgm.firstKey;
       this.index = lgm.indexReader;
       this.startBlock = lgm.startBlock;
       blockCount = index.size();
-      this.columnFamilies = lgm.columnFamilies;
-      this.isDefaultLocalityGroup = lgm.isDefaultLG;
       this.version = version;
       
       this.reader = reader;
@@ -501,12 +490,11 @@ public class RFile {
     }
     
     public LocalityGroupReader(LocalityGroupReader lgr) {
+      super(lgr.columnFamilies, lgr.isDefaultLocalityGroup);
       this.firstKey = lgr.firstKey;
       this.index = lgr.index;
       this.startBlock = lgr.startBlock;
       this.blockCount = lgr.blockCount;
-      this.columnFamilies = lgr.columnFamilies;
-      this.isDefaultLocalityGroup = lgr.isDefaultLocalityGroup;
       this.reader = lgr.reader;
       this.version = lgr.version;
     }
@@ -683,7 +671,7 @@ public class RFile {
           // causing the build of an index... doing this could slow down some use cases and
           // and speed up others.
 
-          MByteSequence valbs = new MByteSequence(new byte[64], 0, 0);
+          MutableByteSequence valbs = new MutableByteSequence(new byte[64], 0, 0);
           SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, getTopKey());
           if (skippr.skipped > 0) {
             entriesLeft -= skippr.skipped;
@@ -730,7 +718,7 @@ public class RFile {
           if (!checkRange)
             hasTop = true;
 
-          MByteSequence valbs = new MByteSequence(new byte[64], 0, 0);
+          MutableByteSequence valbs = new MutableByteSequence(new byte[64], 0, 0);
 
           Key currKey = null;
 
@@ -747,7 +735,7 @@ public class RFile {
                 val = new Value();
 
                 val.readFields(currBlock);
-                valbs = new MByteSequence(val.get(), 0, val.getSize());
+                valbs = new MutableByteSequence(val.get(), 0, val.getSize());
                 
                 // just consumed one key from the input stream, so subtract one from entries left
                 entriesLeft = bie.getEntriesLeft() - 1;
@@ -810,12 +798,15 @@ public class RFile {
     public void setInterruptFlag(AtomicBoolean flag) {
       this.interruptFlag = flag;
     }
+    
+    @Override
+    public InterruptibleIterator getIterator() {
+      return this;
+    }
   }
   
   public static class Reader extends HeapIterator implements FileSKVIterator {
-    
-    private static final Collection<ByteSequence> EMPTY_CF_SET = Collections.emptySet();
-    
+
     private BlockFileReader reader;
     
     private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<LocalityGroupMetadata>();
@@ -985,66 +976,7 @@ public class RFile {
     
     @Override
     public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
-      
-      clear();
-      
-      numLGSeeked = 0;
-      
-      Set<ByteSequence> cfSet;
-      if (columnFamilies.size() > 0)
-        if (columnFamilies instanceof Set<?>) {
-          cfSet = (Set<ByteSequence>) columnFamilies;
-        } else {
-          cfSet = new HashSet<ByteSequence>();
-          cfSet.addAll(columnFamilies);
-        }
-      else
-        cfSet = Collections.emptySet();
-      
-      for (LocalityGroupReader lgr : lgReaders) {
-        
-        // when include is set to true it means this locality groups contains
-        // wanted column families
-        boolean include = false;
-        
-        if (cfSet.size() == 0) {
-          include = !inclusive;
-        } else if (lgr.isDefaultLocalityGroup && lgr.columnFamilies == null) {
-          // do not know what column families are in the default locality group,
-          // only know what column families are not in it
-          
-          if (inclusive) {
-            if (!nonDefaultColumnFamilies.containsAll(cfSet)) {
-              // default LG may contain wanted and unwanted column families
-              include = true;
-            }// else - everything wanted is in other locality groups, so nothing to do
-          } else {
-            // must include, if all excluded column families are in other locality groups
-            // then there are not unwanted column families in default LG
-            include = true;
-          }
-        } else {
-          /*
-           * Need to consider the following cases for inclusive and exclusive (lgcf:locality group column family set, cf:column family set) lgcf and cf are
-           * disjoint lgcf and cf are the same cf contains lgcf lgcf contains cf lgccf and cf intersect but neither is a subset of the other
-           */
-          
-          for (Entry<ByteSequence,Count> entry : lgr.columnFamilies.entrySet())
-            if (entry.getValue().count > 0)
-              if (cfSet.contains(entry.getKey())) {
-                if (inclusive)
-                  include = true;
-              } else if (!inclusive) {
-                include = true;
-              }
-        }
-        
-        if (include) {
-          lgr.seek(range, EMPTY_CF_SET, false);
-          addSource(lgr);
-          numLGSeeked++;
-        }// every column family is excluded, zero count, or not present
-      }
+      numLGSeeked = LocalityGroupIterator.seek(this, lgReaders, nonDefaultColumnFamilies, range, columnFamilies, inclusive);
     }
     
     int getNumLocalityGroupsSeeked() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0608e32f/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
index 97001ee..07bf6d3 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
@@ -20,9 +20,9 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.util.MutableByteSequence;
 import org.apache.accumulo.core.util.UnsynchronizedBuffer;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -218,31 +218,6 @@ public class RelativeKey implements Writable {
     this.prevKey = this.key;
   }
   
-  static class MByteSequence extends ArrayByteSequence {
-    private static final long serialVersionUID = 1L;
-
-    MByteSequence(byte[] data, int offset, int length) {
-      super(data, offset, length);
-    }
-    
-    MByteSequence(ByteSequence bs) {
-      super(new byte[Math.max(64, bs.length())]);
-      System.arraycopy(bs.getBackingArray(), bs.offset(), data, 0, bs.length());
-      this.length = bs.length();
-      this.offset = 0;
-    }
-    
-    void setArray(byte[] data) {
-      this.data = data;
-      this.offset = 0;
-      this.length = 0;
-    }
-    
-    void setLength(int len) {
-      this.length = len;
-    }
-  }
-  
   public static class SkippR {
     RelativeKey rk;
     int skipped;
@@ -255,15 +230,15 @@ public class RelativeKey implements Writable {
     }
   }
   
-  public static SkippR fastSkip(DataInput in, Key seekKey, MByteSequence value, Key prevKey, Key currKey) throws IOException {
+  public static SkippR fastSkip(DataInput in, Key seekKey, MutableByteSequence value, Key prevKey, Key currKey) throws IOException {
     // this method assumes that fast skip is being called on a compressed block where the last key
     // in the compressed block is >= seekKey... therefore this method shouldn't go past the end of the
     // compressed block... if it does, there is probably an error in the caller's logic
     
     // this method mostly avoids object allocation and only does compares when the row changes
     
-    MByteSequence row, cf, cq, cv;
-    MByteSequence prow, pcf, pcq, pcv;
+    MutableByteSequence row, cf, cq, cv;
+    MutableByteSequence prow, pcf, pcq, pcv;
     
     ByteSequence stopRow = seekKey.getRowData();
     ByteSequence stopCF = seekKey.getColumnFamilyData();
@@ -277,16 +252,16 @@ public class RelativeKey implements Writable {
     
     if (currKey != null) {
       
-      prow = new MByteSequence(currKey.getRowData());
-      pcf = new MByteSequence(currKey.getColumnFamilyData());
-      pcq = new MByteSequence(currKey.getColumnQualifierData());
-      pcv = new MByteSequence(currKey.getColumnVisibilityData());
+      prow = new MutableByteSequence(currKey.getRowData());
+      pcf = new MutableByteSequence(currKey.getColumnFamilyData());
+      pcq = new MutableByteSequence(currKey.getColumnQualifierData());
+      pcv = new MutableByteSequence(currKey.getColumnVisibilityData());
       pts = currKey.getTimestamp();
       
-      row = new MByteSequence(currKey.getRowData());
-      cf = new MByteSequence(currKey.getColumnFamilyData());
-      cq = new MByteSequence(currKey.getColumnQualifierData());
-      cv = new MByteSequence(currKey.getColumnVisibilityData());
+      row = new MutableByteSequence(currKey.getRowData());
+      cf = new MutableByteSequence(currKey.getColumnFamilyData());
+      cq = new MutableByteSequence(currKey.getColumnQualifierData());
+      cv = new MutableByteSequence(currKey.getColumnVisibilityData());
       ts = currKey.getTimestamp();
       
       rowCmp = row.compareTo(stopRow);
@@ -316,15 +291,15 @@ public class RelativeKey implements Writable {
       }
       
     } else {
-      row = new MByteSequence(new byte[64], 0, 0);
-      cf = new MByteSequence(new byte[64], 0, 0);
-      cq = new MByteSequence(new byte[64], 0, 0);
-      cv = new MByteSequence(new byte[64], 0, 0);
+      row = new MutableByteSequence(new byte[64], 0, 0);
+      cf = new MutableByteSequence(new byte[64], 0, 0);
+      cq = new MutableByteSequence(new byte[64], 0, 0);
+      cv = new MutableByteSequence(new byte[64], 0, 0);
       
-      prow = new MByteSequence(new byte[64], 0, 0);
-      pcf = new MByteSequence(new byte[64], 0, 0);
-      pcq = new MByteSequence(new byte[64], 0, 0);
-      pcv = new MByteSequence(new byte[64], 0, 0);
+      prow = new MutableByteSequence(new byte[64], 0, 0);
+      pcf = new MutableByteSequence(new byte[64], 0, 0);
+      pcq = new MutableByteSequence(new byte[64], 0, 0);
+      pcv = new MutableByteSequence(new byte[64], 0, 0);
     }
     
     byte fieldsSame = -1;
@@ -346,7 +321,7 @@ public class RelativeKey implements Writable {
       
       if ((fieldsSame & ROW_SAME) != ROW_SAME) {
         
-        MByteSequence tmp = prow;
+        MutableByteSequence tmp = prow;
         prow = row;
         row = tmp;
         
@@ -362,7 +337,7 @@ public class RelativeKey implements Writable {
       
       if ((fieldsSame & CF_SAME) != CF_SAME) {
         
-        MByteSequence tmp = pcf;
+        MutableByteSequence tmp = pcf;
         pcf = cf;
         cf = tmp;
         
@@ -377,7 +352,7 @@ public class RelativeKey implements Writable {
       
       if ((fieldsSame & CQ_SAME) != CQ_SAME) {
         
-        MByteSequence tmp = pcq;
+        MutableByteSequence tmp = pcq;
         pcq = cq;
         cq = tmp;
         
@@ -392,7 +367,7 @@ public class RelativeKey implements Writable {
       
       if ((fieldsSame & CV_SAME) != CV_SAME) {
         
-        MByteSequence tmp = pcv;
+        MutableByteSequence tmp = pcv;
         pcv = cv;
         cv = tmp;
         
@@ -431,7 +406,7 @@ public class RelativeKey implements Writable {
     }
     
     if (count > 1) {
-      MByteSequence trow, tcf, tcq, tcv;
+      MutableByteSequence trow, tcf, tcq, tcv;
       long tts;
       
       // when the current keys field is same as the last, then
@@ -463,19 +438,19 @@ public class RelativeKey implements Writable {
     return new SkippR(result, count, newPrevKey);
   }
   
-  private static void read(DataInput in, MByteSequence mbseq) throws IOException {
+  private static void read(DataInput in, MutableByteSequence mbseq) throws IOException {
     int len = WritableUtils.readVInt(in);
     read(in, mbseq, len);
   }
   
-  private static void readValue(DataInput in, MByteSequence mbseq) throws IOException {
+  private static void readValue(DataInput in, MutableByteSequence mbseq) throws IOException {
     int len = in.readInt();
     read(in, mbseq, len);
   }
   
-  private static void read(DataInput in, MByteSequence mbseqDestination, int len) throws IOException {
+  private static void read(DataInput in, MutableByteSequence mbseqDestination, int len) throws IOException {
     if (mbseqDestination.getBackingArray().length < len) {
-      mbseqDestination.setArray(new byte[UnsynchronizedBuffer.nextArraySize(len)]);
+      mbseqDestination.setArray(new byte[UnsynchronizedBuffer.nextArraySize(len)], 0, 0);
     }
     
     in.readFully(mbseqDestination.getBackingArray(), 0, len);
@@ -497,12 +472,12 @@ public class RelativeKey implements Writable {
     return data;
     }
     
-  private static void readPrefix(DataInput in, MByteSequence dest, ByteSequence prefixSource) throws IOException {
+  private static void readPrefix(DataInput in, MutableByteSequence dest, ByteSequence prefixSource) throws IOException {
     int prefixLen = WritableUtils.readVInt(in);
     int remainingLen = WritableUtils.readVInt(in);
     int len = prefixLen + remainingLen;
     if (dest.getBackingArray().length < len) {
-      dest.setArray(new byte[UnsynchronizedBuffer.nextArraySize(len)]);
+      dest.setArray(new byte[UnsynchronizedBuffer.nextArraySize(len)], 0, 0);
     }
     if (prefixSource.isBackedByArray()) {
       System.arraycopy(prefixSource.getBackingArray(), prefixSource.offset(), dest.getBackingArray(), 0, prefixLen);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0608e32f/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java
new file mode 100644
index 0000000..c0045ac
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.system;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.commons.lang.mutable.MutableLong;
+
+/**
+ * 
+ */
+public class LocalityGroupIterator extends HeapIterator implements InterruptibleIterator {
+
+  private static final Collection<ByteSequence> EMPTY_CF_SET = Collections.emptySet();
+
+  public static class LocalityGroup {
+    /**
+     * @param localityGroup
+     * @param env
+     */
+    private LocalityGroup(LocalityGroup localityGroup, IteratorEnvironment env) {
+      this(localityGroup.columnFamilies, localityGroup.isDefaultLocalityGroup);
+      this.iterator = (InterruptibleIterator) localityGroup.iterator.deepCopy(env);
+    }
+    
+    public LocalityGroup(InterruptibleIterator iterator, Map<ByteSequence,MutableLong> columnFamilies, boolean isDefaultLocalityGroup) {
+      this(columnFamilies, isDefaultLocalityGroup);
+      this.iterator = iterator;
+    }
+    
+    public LocalityGroup(Map<ByteSequence,MutableLong> columnFamilies, boolean isDefaultLocalityGroup) {
+      this.isDefaultLocalityGroup = isDefaultLocalityGroup;
+      this.columnFamilies = columnFamilies;
+    }
+
+    public InterruptibleIterator getIterator() {
+      return iterator;
+    }
+
+    protected boolean isDefaultLocalityGroup;
+    protected Map<ByteSequence,MutableLong> columnFamilies;
+    private InterruptibleIterator iterator;
+  }
+  
+  private LocalityGroup groups[];
+  private Set<ByteSequence> nonDefaultColumnFamilies;
+  private AtomicBoolean interruptFlag;
+
+  public LocalityGroupIterator(LocalityGroup groups[], Set<ByteSequence> nonDefaultColumnFamilies) {
+    super(groups.length);
+    this.groups = groups;
+    this.nonDefaultColumnFamilies = nonDefaultColumnFamilies;
+  }
+
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+  
+  public static final int seek(HeapIterator hiter, LocalityGroup[] groups, Set<ByteSequence> nonDefaultColumnFamilies, Range range,
+      Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    hiter.clear();
+    
+    int numLGSeeked = 0;
+    
+    Set<ByteSequence> cfSet;
+    if (columnFamilies.size() > 0)
+      if (columnFamilies instanceof Set<?>) {
+        cfSet = (Set<ByteSequence>) columnFamilies;
+      } else {
+        cfSet = new HashSet<ByteSequence>();
+        cfSet.addAll(columnFamilies);
+      }
+    else
+      cfSet = Collections.emptySet();
+
+    for (LocalityGroup lgr : groups) {
+      // when include is set to true it means this locality groups contains
+      // wanted column families
+      boolean include = false;
+      
+      if (cfSet.size() == 0) {
+        include = !inclusive;
+      } else if (lgr.isDefaultLocalityGroup && lgr.columnFamilies == null) {
+        // do not know what column families are in the default locality group,
+        // only know what column families are not in it
+        
+        if (inclusive) {
+          if (!nonDefaultColumnFamilies.containsAll(cfSet)) {
+            // default LG may contain wanted and unwanted column families
+            include = true;
+          }// else - everything wanted is in other locality groups, so nothing to do
+        } else {
+          // must include, if all excluded column families are in other locality groups
+          // then there are not unwanted column families in default LG
+          include = true;
+        }
+      } else {
+        /*
+         * Need to consider the following cases for inclusive and exclusive (lgcf:locality group column family set, cf:column family set) lgcf and cf are
+         * disjoint lgcf and cf are the same cf contains lgcf lgcf contains cf lgccf and cf intersect but neither is a subset of the other
+         */
+        
+        for (Entry<ByteSequence,MutableLong> entry : lgr.columnFamilies.entrySet())
+          if (entry.getValue().longValue() > 0)
+            if (cfSet.contains(entry.getKey())) {
+              if (inclusive)
+                include = true;
+            } else if (!inclusive) {
+              include = true;
+            }
+      }
+
+      if (include) {
+        lgr.getIterator().seek(range, EMPTY_CF_SET, false);
+        hiter.addSource(lgr.getIterator());
+        numLGSeeked++;
+      }// every column family is excluded, zero count, or not present
+    }
+    
+    return numLGSeeked;
+  }
+
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    seek(this, groups, nonDefaultColumnFamilies, range, columnFamilies, inclusive);
+  }
+
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    LocalityGroup[] groupsCopy = new LocalityGroup[groups.length];
+    
+    for (int i = 0; i < groups.length; i++) {
+      groupsCopy[i] = new LocalityGroup(groups[i], env);
+      if (interruptFlag != null)
+        groupsCopy[i].getIterator().setInterruptFlag(interruptFlag);
+    }
+    
+    return new LocalityGroupIterator(groupsCopy, nonDefaultColumnFamilies);
+  }
+  
+  @Override
+  public void setInterruptFlag(AtomicBoolean flag) {
+    this.interruptFlag = flag;
+    for (LocalityGroup lgr : groups) {
+      lgr.getIterator().setInterruptFlag(flag);
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0608e32f/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java b/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
index 2dfbf86..a209a00 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
@@ -16,10 +16,12 @@
  */
 package org.apache.accumulo.core.util;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -32,6 +34,10 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.thrift.TMutation;
+import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.hadoop.io.Text;
 
 public class LocalityGroupUtil {
@@ -175,4 +181,113 @@ public class LocalityGroupUtil {
     return ecf;
   }
   
+  private static class PartitionedMutation extends Mutation {
+    private byte[] row;
+    private List<ColumnUpdate> updates;
+    
+    PartitionedMutation(byte[] row, List<ColumnUpdate> updates) {
+      this.row = row;
+      this.updates = updates;
+    }
+    
+    @Override
+    public byte[] getRow() {
+      return row;
+    }
+    
+    @Override
+    public List<ColumnUpdate> getUpdates() {
+      return updates;
+    }
+    
+    @Override
+    public TMutation toThrift() {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public int hashCode() {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public boolean equals(Object o) {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public boolean equals(Mutation m) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  public static class Partitioner {
+    
+    private Map<ByteSequence,Integer> colfamToLgidMap;
+    private Map<ByteSequence,MutableLong>[] groups;
+    
+    public Partitioner(Map<ByteSequence,MutableLong> groups[]) {
+      this.groups = groups;
+      this.colfamToLgidMap = new HashMap<ByteSequence,Integer>();
+      
+      for (int i = 0; i < groups.length; i++) {
+        for (ByteSequence cf : groups[i].keySet()) {
+          colfamToLgidMap.put(cf, i);
+        }
+      }
+    }
+    
+    public void partition(List<Mutation> mutations, List<Mutation> partitionedMutations[]) {
+
+      MutableByteSequence mbs = new MutableByteSequence(new byte[0], 0, 0);
+      
+      @SuppressWarnings("unchecked")
+      List<ColumnUpdate> parts[] = new List[groups.length + 1];
+      
+      for (Mutation mutation : mutations) {
+        if (mutation.getUpdates().size() == 1) {
+          int lgid = getLgid(mbs, mutation.getUpdates().get(0));
+          partitionedMutations[lgid].add(mutation);
+        } else {
+          for (int i = 0; i < parts.length; i++) {
+            parts[i] = null;
+          }
+          
+          int lgcount = 0;
+
+          for (ColumnUpdate cu : mutation.getUpdates()) {
+            int lgid = getLgid(mbs, cu);
+
+            if (parts[lgid] == null) {
+              parts[lgid] = new ArrayList<ColumnUpdate>();
+              lgcount++;
+            }
+            
+            parts[lgid].add(cu);
+          }
+          
+          if (lgcount == 1) {
+            for (int i = 0; i < parts.length; i++)
+              if (parts[i] != null) {
+                partitionedMutations[i].add(mutation);
+                break;
+              }
+          } else {
+            for (int i = 0; i < parts.length; i++)
+              if (parts[i] != null)
+                partitionedMutations[i].add(new PartitionedMutation(mutation.getRow(), parts[i]));
+          }
+        }
+      }
+    }
+    
+    private Integer getLgid(MutableByteSequence mbs, ColumnUpdate cu) {
+      mbs.setArray(cu.getColumnFamily(), 0, cu.getColumnFamily().length);
+      Integer lgid = colfamToLgidMap.get(mbs);
+      if (lgid == null)
+        lgid = groups.length;
+      return lgid;
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0608e32f/core/src/main/java/org/apache/accumulo/core/util/MutableByteSequence.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/MutableByteSequence.java b/core/src/main/java/org/apache/accumulo/core/util/MutableByteSequence.java
new file mode 100644
index 0000000..6db7170
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/MutableByteSequence.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+
+
+public class MutableByteSequence extends ArrayByteSequence {
+  private static final long serialVersionUID = 1L;
+
+  public MutableByteSequence(byte[] data, int offset, int length) {
+    super(data, offset, length);
+  }
+  
+  public MutableByteSequence(ByteSequence bs) {
+    super(new byte[Math.max(64, bs.length())]);
+    System.arraycopy(bs.getBackingArray(), bs.offset(), data, 0, bs.length());
+    this.length = bs.length();
+    this.offset = 0;
+  }
+  
+  public void setArray(byte[] data, int offset, int len) {
+    this.data = data;
+    this.offset = offset;
+    this.length = len;
+  }
+  
+  public void setLength(int len) {
+    this.length = len;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0608e32f/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
index 1608576..8c0e691 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
@@ -30,7 +30,7 @@ import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.rfile.RelativeKey.MByteSequence;
+import org.apache.accumulo.core.util.MutableByteSequence;
 import org.apache.accumulo.core.util.UnsynchronizedBuffer;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -176,7 +176,7 @@ public class RelativeKeyTest {
     Key seekKey = new Key();
     Key prevKey = new Key();
     Key currKey = null;
-    MByteSequence value = new MByteSequence(new byte[64], 0, 0);
+    MutableByteSequence value = new MutableByteSequence(new byte[64], 0, 0);
     
     RelativeKey.SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey);
     assertEquals(1, skippr.skipped);
@@ -207,7 +207,7 @@ public class RelativeKeyTest {
     Key seekKey = new Key("s", "t", "u", "v", 1);
     Key prevKey = new Key();
     Key currKey = null;
-    MByteSequence value = new MByteSequence(new byte[64], 0, 0);
+    MutableByteSequence value = new MutableByteSequence(new byte[64], 0, 0);
     
     RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey);
   }
@@ -218,7 +218,7 @@ public class RelativeKeyTest {
     Key seekKey = expectedKeys.get(seekIndex);
     Key prevKey = new Key();
     Key currKey = null;
-    MByteSequence value = new MByteSequence(new byte[64], 0, 0);
+    MutableByteSequence value = new MutableByteSequence(new byte[64], 0, 0);
     
     RelativeKey.SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey);
     

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0608e32f/core/src/test/java/org/apache/accumulo/core/util/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/util/PartitionerTest.java b/core/src/test/java/org/apache/accumulo/core/util/PartitionerTest.java
new file mode 100644
index 0000000..276720c
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/util/PartitionerTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.util.LocalityGroupUtil.Partitioner;
+import org.apache.commons.lang.mutable.MutableLong;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PartitionerTest {
+  @Test
+  public void test1() {
+    
+    @SuppressWarnings("unchecked")
+    Map<ByteSequence,MutableLong>[] groups = new Map[2];
+    
+    groups[0] = new HashMap<ByteSequence,MutableLong>();
+    groups[0].put(new ArrayByteSequence("cf1"), new MutableLong(1));
+    groups[0].put(new ArrayByteSequence("cf2"), new MutableLong(1));
+    
+    groups[1] = new HashMap<ByteSequence,MutableLong>();
+    groups[1].put(new ArrayByteSequence("cf3"), new MutableLong(1));
+    
+    Partitioner p1 = new Partitioner(groups);
+    
+    Mutation m1 = new Mutation("r1");
+    m1.put("cf1", "cq1", "v1");
+    
+    Mutation m2 = new Mutation("r2");
+    m2.put("cf1", "cq1", "v2");
+    m2.put("cf2", "cq2", "v3");
+    
+    Mutation m3 = new Mutation("r3");
+    m3.put("cf1", "cq1", "v4");
+    m3.put("cf3", "cq2", "v5");
+    
+    Mutation m4 = new Mutation("r4");
+    m4.put("cf1", "cq1", "v6");
+    m4.put("cf3", "cq2", "v7");
+    m4.put("cf5", "cq3", "v8");
+    
+    Mutation m5 = new Mutation("r5");
+    m5.put("cf5", "cq3", "v9");
+    
+    List<Mutation> mutations = Arrays.asList(m1, m2, m3, m4, m5);
+    @SuppressWarnings("unchecked")
+    List<Mutation>[] partitioned = new List[3];
+    
+    for (int i = 0; i < partitioned.length; i++) {
+      partitioned[i] = new ArrayList<Mutation>();
+    }
+    
+    p1.partition(mutations, partitioned);
+    
+    m1 = new Mutation("r1");
+    m1.put("cf1", "cq1", "v1");
+    
+    m2 = new Mutation("r2");
+    m2.put("cf1", "cq1", "v2");
+    m2.put("cf2", "cq2", "v3");
+    
+    m3 = new Mutation("r3");
+    m3.put("cf1", "cq1", "v4");
+    
+    m4 = new Mutation("r4");
+    m4.put("cf1", "cq1", "v6");
+    
+    Assert.assertEquals(toKeySet(m1,m2,m3,m4), toKeySet(partitioned[0]));
+    
+    m3 = new Mutation("r3");
+    m3.put("cf3", "cq2", "v5");
+    
+    m4 = new Mutation("r4");
+    m4.put("cf3", "cq2", "v7");
+    
+    Assert.assertEquals(toKeySet(m3,m4),  toKeySet(partitioned[1]));
+    
+    m4 = new Mutation("r4");
+    m4.put("cf5", "cq3", "v8");
+    
+    Assert.assertEquals(toKeySet(m4,m5),  toKeySet(partitioned[2]));
+    
+  }
+
+  private Set<Key> toKeySet(List<Mutation> mutations){
+    return toKeySet(mutations.toArray(new Mutation[0]));
+  }
+  
+  private Set<Key> toKeySet(Mutation ... expected) {
+    HashSet<Key> ret = new HashSet<Key>();
+    for (Mutation mutation : expected) 
+      for(ColumnUpdate cu : mutation.getUpdates())
+       ret.add(new Key(mutation.getRow(), cu.getColumnFamily(), cu.getColumnQualifier(), cu.getColumnVisibility(), cu.getTimestamp()));
+    
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0608e32f/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
index 57f36c3..a648366 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -52,13 +53,18 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.SortedMapIterator;
 import org.apache.accumulo.core.iterators.WrappingIterator;
 import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
+import org.apache.accumulo.core.iterators.system.LocalityGroupIterator;
+import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroup;
 import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
 import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
+import org.apache.accumulo.core.util.LocalityGroupUtil.Partitioner;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.trace.TraceFileSystem;
+import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -167,6 +173,7 @@ class MemKeyConversionIterator extends WrappingIterator implements Interruptible
 
   public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
     super.seek(range, columnFamilies, inclusive);
+    
     if (hasTop())
       getTopKeyVal();
 
@@ -193,24 +200,37 @@ public class InMemoryMap {
   
   private volatile String memDumpFile = null;
   private final String memDumpDir;
+
+  private Map<String,Set<ByteSequence>> lggroups;
   
   public InMemoryMap(boolean useNativeMap, String memDumpDir) {
+    this(new HashMap<String,Set<ByteSequence>>(), useNativeMap, memDumpDir);
+  }
+
+  public InMemoryMap(Map<String,Set<ByteSequence>> lggroups, boolean useNativeMap, String memDumpDir) {
     this.memDumpDir = memDumpDir;
+    this.lggroups = lggroups;
+    
+    if (lggroups.size() == 0)
+      map = newMap(useNativeMap);
+    else
+      map = new LocalityGroupMap(lggroups, useNativeMap);
+  }
+  
+  public InMemoryMap(AccumuloConfiguration config) throws LocalityGroupConfigurationError {
+    this(LocalityGroupUtil.getLocalityGroups(config), config.getBoolean(Property.TSERV_NATIVEMAP_ENABLED), config.get(Property.TSERV_MEMDUMP_DIR));
+  }
+  
+  private static SimpleMap newMap(boolean useNativeMap) {
     if (useNativeMap && NativeMap.loadedNativeLibraries()) {
       try {
-        map = new NativeMapWrapper();
+        return new NativeMapWrapper();
       } catch (Throwable t) {
         log.error("Failed to create native map", t);
       }
     }
     
-    if (map == null) {
-      map = new DefaultMap();
-    }
-  }
-  
-  public InMemoryMap(AccumuloConfiguration config) {
-    this(config.getBoolean(Property.TSERV_NATIVEMAP_ENABLED), config.get(Property.TSERV_MEMDUMP_DIR));
+    return new DefaultMap();
   }
   
   private interface SimpleMap {
@@ -229,6 +249,115 @@ public class InMemoryMap {
     public void mutate(List<Mutation> mutations, int kvCount);
   }
   
+  private static class LocalityGroupMap implements SimpleMap {
+    
+    private Map<ByteSequence,MutableLong> groupFams[];
+    
+    // the last map in the array is the default locality group
+    private SimpleMap maps[];
+    private Partitioner partitioner;
+    private List<Mutation>[] partitioned;
+    private Set<ByteSequence> nonDefaultColumnFamilies;
+    
+    @SuppressWarnings("unchecked")
+    LocalityGroupMap(Map<String,Set<ByteSequence>> groups, boolean useNativeMap) {
+      this.groupFams = new Map[groups.size()];
+      this.maps = new SimpleMap[groups.size() + 1];
+      this.partitioned = new List[groups.size() + 1];
+      this.nonDefaultColumnFamilies = new HashSet<ByteSequence>();
+      
+      for (int i = 0; i < maps.length; i++) {
+        maps[i] = newMap(useNativeMap);
+      }
+
+      int count = 0;
+      for (Set<ByteSequence> cfset : groups.values()) {
+        HashMap<ByteSequence,MutableLong> map = new HashMap<ByteSequence,MutableLong>();
+        for (ByteSequence bs : cfset)
+          map.put(bs, new MutableLong(1));
+        this.groupFams[count++] = map;
+        nonDefaultColumnFamilies.addAll(cfset);
+      }
+      
+      partitioner = new LocalityGroupUtil.Partitioner(this.groupFams);
+      
+      for (int i = 0; i < partitioned.length; i++) {
+        partitioned[i] = new ArrayList<Mutation>();
+      }
+    }
+
+    @Override
+    public Value get(Key key) {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public Iterator<Entry<Key,Value>> iterator(Key startKey) {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public int size() {
+      int sum = 0;
+      for (SimpleMap map : maps)
+        sum += map.size();
+      return sum;
+    }
+    
+    @Override
+    public InterruptibleIterator skvIterator() {
+      LocalityGroup groups[] = new LocalityGroup[maps.length];
+      for (int i = 0; i < groups.length; i++) {
+        if (i < groupFams.length)
+          groups[i] = new LocalityGroup(maps[i].skvIterator(), groupFams[i], false);
+        else
+          groups[i] = new LocalityGroup(maps[i].skvIterator(), null, true);
+      }
+
+
+      return new LocalityGroupIterator(groups, nonDefaultColumnFamilies);
+    }
+    
+    @Override
+    public void delete() {
+      for (SimpleMap map : maps)
+        map.delete();
+    }
+    
+    @Override
+    public long getMemoryUsed() {
+      long sum = 0;
+      for (SimpleMap map : maps)
+        sum += map.getMemoryUsed();
+      return sum;
+    }
+    
+    @Override
+    public synchronized void mutate(List<Mutation> mutations, int kvCount) {
+      // this method is synchronized because it reuses objects to avoid allocation,
+      // currently, the method that calls this is synchronized so there is no
+      // loss in parallelism.... synchronization was added here for future proofing
+      
+      try{
+        partitioner.partition(mutations, partitioned);
+        
+        for (int i = 0; i < partitioned.length; i++) {
+          if (partitioned[i].size() > 0) {
+            maps[i].mutate(partitioned[i], kvCount);
+            for (Mutation m : partitioned[i])
+              kvCount += m.getUpdates().size();
+          }
+        }
+      } finally {
+        // clear immediately so mutations can be garbage collected
+        for (List<Mutation> list : partitioned) {
+          list.clear();
+        }
+      }
+    }
+    
+  }
+
   private static class DefaultMap implements SimpleMap {
     private ConcurrentSkipListMap<Key,Value> map = new ConcurrentSkipListMap<Key,Value>(new MemKeyComparator());
     private AtomicLong bytesInMemory = new AtomicLong();
@@ -568,18 +697,23 @@ public class InMemoryMap {
         newConf.setInt("io.seqfile.compress.blocksize", 100000);
         
         FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, ServerConfiguration.getSiteConfiguration());
-        out.startDefaultLocalityGroup();
+        
         InterruptibleIterator iter = map.skvIterator();
-        iter.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+       
+        HashSet<ByteSequence> allfams= new HashSet<ByteSequence>();
         
-        while (iter.hasTop() && activeIters.size() > 0) {
-          // RFile does not support MemKey, so we move the kv count into the value only for the RFile.
-          // There is no need to change the MemKey to a normal key because the kvCount info gets lost when it is written
-          Value newValue = new MemValue(iter.getTopValue(), ((MemKey) iter.getTopKey()).kvCount);
-          out.append(iter.getTopKey(), newValue);
-          iter.next();
+        for(Entry<String, Set<ByteSequence>> entry : lggroups.entrySet()){
+          allfams.addAll(entry.getValue());
+          out.startNewLocalityGroup(entry.getKey(), entry.getValue());
+          iter.seek(new Range(), entry.getValue(), true);
+          dumpLocalityGroup(out, iter);
         }
         
+        out.startDefaultLocalityGroup();
+        iter.seek(new Range(), allfams, false);
+       
+        dumpLocalityGroup(out, iter);
+        
         out.close();
         
         log.debug("Created mem dump file " + tmpFile);
@@ -614,4 +748,15 @@ public class InMemoryMap {
     
     tmpMap.delete();
   }
+
+  private void dumpLocalityGroup(FileSKVWriter out, InterruptibleIterator iter) throws IOException {
+    while (iter.hasTop() && activeIters.size() > 0) {
+      // RFile does not support MemKey, so we move the kv count into the value only for the RFile.
+      // There is no need to change the MemKey to a normal key because the kvCount info gets lost when it is written
+      Value newValue = new MemValue(iter.getTopValue(), ((MemKey) iter.getTopKey()).kvCount);
+      out.append(iter.getTopKey(), newValue);
+      iter.next();
+
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0608e32f/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
index d985f4a..0272a2f 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
@@ -90,6 +90,7 @@ import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
@@ -259,7 +260,11 @@ public class Tablet {
     private CommitSession commitSession;
     
     TabletMemory() {
-      memTable = new InMemoryMap(tabletServer.getSystemConfiguration());
+      try {
+        memTable = new InMemoryMap(acuTableConf);
+      } catch (LocalityGroupConfigurationError e) {
+        throw new RuntimeException(e);
+      }
       commitSession = new CommitSession(nextSeq, memTable);
       nextSeq += 2;
     }
@@ -282,7 +287,11 @@ public class Tablet {
       }
       
       otherMemTable = memTable;
-      memTable = new InMemoryMap(tabletServer.getSystemConfiguration());
+      try {
+        memTable = new InMemoryMap(acuTableConf);
+      } catch (LocalityGroupConfigurationError e) {
+        throw new RuntimeException(e);
+      }
       
       CommitSession oldCommitSession = commitSession;
       commitSession = new CommitSession(nextSeq, memTable);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0608e32f/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java b/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
index 97c8eec..fd5e661 100644
--- a/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
+++ b/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
@@ -18,8 +18,13 @@ package org.apache.accumulo.server.tabletserver;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -82,6 +87,14 @@ public class InMemoryMapTest extends TestCase {
     
   }
   
+  static Set<ByteSequence> newCFSet(String... cfs) {
+    HashSet<ByteSequence> cfSet = new HashSet<ByteSequence>();
+    for (String cf : cfs) {
+      cfSet.add(new ArrayByteSequence(cf));
+    }
+    return cfSet;
+  }
+
   public void test2() throws Exception {
     InMemoryMap imm = new InMemoryMap(false, "/tmp");
     
@@ -345,4 +358,135 @@ public class InMemoryMapTest extends TestCase {
     }
   }
   
+  public void testLocalityGroups() throws Exception {
+    
+    Map<String,Set<ByteSequence>> lggroups1 = new HashMap<String,Set<ByteSequence>>();
+    lggroups1.put("lg1", newCFSet("cf1", "cf2"));
+    lggroups1.put("lg2", newCFSet("cf3", "cf4"));
+    
+    InMemoryMap imm = new InMemoryMap(lggroups1, false, "/tmp");
+    
+    Mutation m1 = new Mutation("r1");
+    m1.put("cf1", "x", 2, "1");
+    m1.put("cf1", "y", 2, "2");
+    m1.put("cf3", "z", 2, "3");
+    m1.put("foo", "b", 2, "9");
+    
+    Mutation m2 = new Mutation("r2");
+    m2.put("cf2", "x", 3, "5");
+    
+    Mutation m3 = new Mutation("r3");
+    m3.put("foo", "b", 4, "6");
+    
+    Mutation m4 = new Mutation("r4");
+    m4.put("foo", "b", 5, "7");
+    m4.put("cf4", "z", 5, "8");
+    
+    Mutation m5 = new Mutation("r5");
+    m5.put("cf3", "z", 6, "A");
+    m5.put("cf4", "z", 6, "B");
+    
+    imm.mutate(Arrays.asList(m1, m2, m3, m4, m5));
+    
+    MemoryIterator iter1 = imm.skvIterator();
+    
+    seekLocalityGroups(iter1);
+    SortedKeyValueIterator<Key,Value> dc1 = iter1.deepCopy(null);
+    seekLocalityGroups(dc1);
+    
+    assertTrue(imm.getNumEntries() == 10);
+    assertTrue(imm.estimatedSizeInBytes() > 0);
+
+    imm.delete(0);
+
+    seekLocalityGroups(iter1);
+    seekLocalityGroups(dc1);
+    // TODO uncomment following when ACCUMULO-1628 is fixed
+    // seekLocalityGroups(iter1.deepCopy(null));
+  }
+
+  private void seekLocalityGroups(SortedKeyValueIterator<Key,Value> iter1) throws IOException {
+    iter1.seek(new Range(), newCFSet("cf1"), true);
+    ae(iter1, "r1", "cf1:x", 2, "1");
+    ae(iter1, "r1", "cf1:y", 2, "2");
+    ae(iter1, "r2", "cf2:x", 3, "5");
+    assertFalse(iter1.hasTop());
+    
+    iter1.seek(new Range("r2", "r4"), newCFSet("cf1"), true);
+    ae(iter1, "r2", "cf2:x", 3, "5");
+    assertFalse(iter1.hasTop());
+
+    iter1.seek(new Range(), newCFSet("cf3"), true);
+    ae(iter1, "r1", "cf3:z", 2, "3");
+    ae(iter1, "r4", "cf4:z", 5, "8");
+    ae(iter1, "r5", "cf3:z", 6, "A");
+    ae(iter1, "r5", "cf4:z", 6, "B");
+    assertFalse(iter1.hasTop());
+    
+    iter1.seek(new Range(), newCFSet("foo"), true);
+    ae(iter1, "r1", "foo:b", 2, "9");
+    ae(iter1, "r3", "foo:b", 4, "6");
+    ae(iter1, "r4", "foo:b", 5, "7");
+    assertFalse(iter1.hasTop());
+    
+    iter1.seek(new Range(), newCFSet("cf1", "cf3"), true);
+    ae(iter1, "r1", "cf1:x", 2, "1");
+    ae(iter1, "r1", "cf1:y", 2, "2");
+    ae(iter1, "r1", "cf3:z", 2, "3");
+    ae(iter1, "r2", "cf2:x", 3, "5");
+    ae(iter1, "r4", "cf4:z", 5, "8");
+    ae(iter1, "r5", "cf3:z", 6, "A");
+    ae(iter1, "r5", "cf4:z", 6, "B");
+    assertFalse(iter1.hasTop());
+    
+    iter1.seek(new Range("r2", "r4"), newCFSet("cf1", "cf3"), true);
+    ae(iter1, "r2", "cf2:x", 3, "5");
+    ae(iter1, "r4", "cf4:z", 5, "8");
+    assertFalse(iter1.hasTop());
+
+    iter1.seek(new Range(), newCFSet("cf1", "cf3", "foo"), true);
+    assertAll(iter1);
+    
+    iter1.seek(new Range("r1", "r2"), newCFSet("cf1", "cf3", "foo"), true);
+    ae(iter1, "r1", "cf1:x", 2, "1");
+    ae(iter1, "r1", "cf1:y", 2, "2");
+    ae(iter1, "r1", "cf3:z", 2, "3");
+    ae(iter1, "r1", "foo:b", 2, "9");
+    ae(iter1, "r2", "cf2:x", 3, "5");
+    assertFalse(iter1.hasTop());
+
+    iter1.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+    assertAll(iter1);
+    
+    iter1.seek(new Range(), newCFSet("cf1"), false);
+    assertAll(iter1);
+    
+    iter1.seek(new Range(), newCFSet("cf1", "cf2"), false);
+    ae(iter1, "r1", "cf3:z", 2, "3");
+    ae(iter1, "r1", "foo:b", 2, "9");
+    ae(iter1, "r3", "foo:b", 4, "6");
+    ae(iter1, "r4", "cf4:z", 5, "8");
+    ae(iter1, "r4", "foo:b", 5, "7");
+    ae(iter1, "r5", "cf3:z", 6, "A");
+    ae(iter1, "r5", "cf4:z", 6, "B");
+    assertFalse(iter1.hasTop());
+
+    iter1.seek(new Range("r2"), newCFSet("cf1", "cf3", "foo"), true);
+    ae(iter1, "r2", "cf2:x", 3, "5");
+    assertFalse(iter1.hasTop());
+  }
+
+  private void assertAll(SortedKeyValueIterator<Key,Value> iter1) throws IOException {
+    ae(iter1, "r1", "cf1:x", 2, "1");
+    ae(iter1, "r1", "cf1:y", 2, "2");
+    ae(iter1, "r1", "cf3:z", 2, "3");
+    ae(iter1, "r1", "foo:b", 2, "9");
+    ae(iter1, "r2", "cf2:x", 3, "5");
+    ae(iter1, "r3", "foo:b", 4, "6");
+    ae(iter1, "r4", "cf4:z", 5, "8");
+    ae(iter1, "r4", "foo:b", 5, "7");
+    ae(iter1, "r5", "cf3:z", 6, "A");
+    ae(iter1, "r5", "cf4:z", 6, "B");
+    assertFalse(iter1.hasTop());
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0608e32f/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java b/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java
new file mode 100644
index 0000000..948a741
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.FastFormat;
+import org.apache.accumulo.core.util.Stat;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.hadoop.io.Text;
+
+/**
+ * 
+ */
+public class IMMLGBenchmark {
+  public static void main(String[] args) throws Exception {
+    ZooKeeperInstance zki = new ZooKeeperInstance("test16", "localhost");
+    Connector conn = zki.getConnector("root", new PasswordToken("secret"));
+    
+    int numlg = Integer.parseInt(args[0]);
+    
+    ArrayList<byte[]> cfset = new ArrayList<byte[]>();
+    
+    for (int i = 0; i < 32; i++) {
+      cfset.add(String.format("%04x", i).getBytes());
+    }
+    
+    Map<String,Stat> stats = new TreeMap<String,Stat>();
+
+    for (int i = 0; i < 5; i++) {
+      runTest(conn, numlg, cfset, i > 1 ? stats : null);
+      System.out.println();
+    }
+
+    for (Entry<String,Stat> entry : stats.entrySet()) {
+      System.out.printf("%20s : %6.2f\n", entry.getKey(), entry.getValue().getAverage());
+    }
+
+  }
+
+  private static void runTest(Connector conn, int numlg, ArrayList<byte[]> cfset, Map<String,Stat> stats) throws Exception {
+    String table = "immlgb";
+    
+    try {
+      conn.tableOperations().delete(table);
+    } catch (TableNotFoundException tnfe) {}
+
+    conn.tableOperations().create(table);
+    conn.tableOperations().setProperty(table, Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "snappy");
+    
+    setupLocalityGroups(conn, numlg, cfset, table);
+
+    addStat(stats, "write", write(conn, cfset, table));
+    addStat(stats, "scan cf", scan(conn, cfset, table, false));
+    addStat(stats, "scan cf:cq", scan(conn, cfset, table, true));
+    // TODO time reading all data
+
+    long t1 = System.currentTimeMillis();
+    conn.tableOperations().flush(table, null, null, true);
+    long t2 = System.currentTimeMillis();
+    
+    addStat(stats, "flush", t2 - t1);
+  }
+  
+  private static void addStat(Map<String,Stat> stats, String s, long wt) {
+    System.out.println(s + ":" + wt);
+    
+    if (stats == null)
+      return;
+
+    Stat stat = stats.get(s);
+    if (stat == null) {
+      stat = new Stat();
+      stats.put(s, stat);
+    }
+    stat.addStat(wt);
+  }
+  
+  private static long scan(Connector conn, ArrayList<byte[]> cfset, String table, boolean cq) throws TableNotFoundException {
+    Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
+    
+    if (!cq)
+      scanner.fetchColumnFamily(new Text(cfset.get(15)));
+    else
+      scanner.fetchColumn(new Text(cfset.get(15)), new Text(cfset.get(15)));
+
+    long t1 = System.currentTimeMillis();
+    
+    @SuppressWarnings("unused")
+    int count = 0;
+    for (@SuppressWarnings("unused") Entry<Key,Value> entry : scanner) {
+      count++;
+    }
+    
+    long t2 = System.currentTimeMillis();
+    
+    return t2 - t1;
+    
+  }
+
+  private static long write(Connector conn, ArrayList<byte[]> cfset, String table) throws TableNotFoundException, MutationsRejectedException {
+    Random rand = new Random();
+    
+    byte val[] = new byte[50];
+    
+    BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
+    
+    long t1 = System.currentTimeMillis();
+
+    for (int i = 0; i < 1 << 15; i++) {
+      byte[] row = FastFormat.toZeroPaddedString(Math.abs(rand.nextLong()), 16, 16, new byte[0]);
+      
+      Mutation m = new Mutation(row);
+      for (byte[] cf : cfset) {
+        byte[] cq = FastFormat.toZeroPaddedString(rand.nextInt(1 << 16), 4, 16, new byte[0]);
+        rand.nextBytes(val);
+        m.put(cf, cq, val);
+      }
+      
+      bw.addMutation(m);
+    }
+    
+    bw.close();
+    
+    long t2 = System.currentTimeMillis();
+    
+    return t2 - t1;
+  }
+
+  private static void setupLocalityGroups(Connector conn, int numlg, ArrayList<byte[]> cfset, String table) throws AccumuloException,
+      AccumuloSecurityException, TableNotFoundException {
+    if (numlg > 1) {
+      int numCF = cfset.size() / numlg;
+      int gNum = 0;
+      
+      Iterator<byte[]> cfiter = cfset.iterator();
+      Map<String,Set<Text>> groups = new HashMap<String,Set<Text>>();
+      while (cfiter.hasNext()) {
+        HashSet<Text> groupCols = new HashSet<Text>();
+        for (int i = 0; i < numCF && cfiter.hasNext(); i++) {
+          groupCols.add(new Text(cfiter.next()));
+        }
+        
+        groups.put("lg" + (gNum++), groupCols);
+      }
+      
+      conn.tableOperations().setLocalityGroups(table, groups);
+      conn.tableOperations().offline(table);
+      UtilWaitThread.sleep(1000);
+      conn.tableOperations().online(table);
+    }
+  }
+}