Posted to commits@accumulo.apache.org by el...@apache.org on 2014/09/12 09:46:38 UTC

[09/10] git commit: Merge branch '1.5.2-SNAPSHOT' into 1.6.1-SNAPSHOT

Merge branch '1.5.2-SNAPSHOT' into 1.6.1-SNAPSHOT

Conflicts:
	server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
	server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d54e0fd8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d54e0fd8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d54e0fd8

Branch: refs/heads/1.6.1-SNAPSHOT
Commit: d54e0fd8636405b39a982a6fad5a3fca1593d6cf
Parents: 0d76cd5 7699e1f
Author: Josh Elser <el...@apache.org>
Authored: Thu Sep 11 17:42:01 2014 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Thu Sep 11 17:42:01 2014 -0700

----------------------------------------------------------------------
 .../system/SourceSwitchingIterator.java         | 20 ++++------
 .../system/SourceSwitchingIteratorTest.java     | 38 +++++++++++++++++-
 .../apache/accumulo/tserver/FileManager.java    | 13 +++++++
 .../apache/accumulo/tserver/InMemoryMap.java    | 21 +++++++---
 .../org/apache/accumulo/tserver/Tablet.java     |  5 +++
 .../accumulo/tserver/InMemoryMapTest.java       | 41 ++++++++++++++++----
 6 files changed, 112 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
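
This merge brings the scan interrupt-flag propagation work from 1.5.2 into
1.6.1: FileManager.FileDataSource and InMemoryMap.MemoryDataSource now
remember a scan's AtomicBoolean interrupt flag and re-apply it to whatever
iterator they currently wrap, so a scan stays interruptible even after its
source is switched (detach/reattach of files, or an in-memory map being
dumped to an RFile). The standalone sketch below shows the underlying
pattern; InterruptibleSource is an illustrative name, not an Accumulo class,
and the real iterators signal interruption with Accumulo's own exception
type rather than a bare RuntimeException.

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch (not Accumulo code): a data source that remembers an
// interrupt flag and applies it to every iterator it hands out, so callers
// can abort a long scan.
class InterruptibleSource {
  private final Iterable<String> backing;
  private AtomicBoolean iflag; // null until a caller asks to be interruptible

  InterruptibleSource(Iterable<String> backing) {
    this.backing = backing;
  }

  // Mirrors DataSource.setInterruptFlag: store the flag, apply it lazily.
  void setInterruptFlag(AtomicBoolean flag) {
    this.iflag = flag;
  }

  Iterator<String> iterator() {
    final Iterator<String> delegate = backing.iterator();
    final AtomicBoolean flag = iflag;
    return new Iterator<String>() {
      public boolean hasNext() {
        if (flag != null && flag.get())
          throw new RuntimeException("scan interrupted");
        return delegate.hasNext();
      }

      public String next() {
        return delegate.next();
      }

      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }

  public static void main(String[] args) {
    InterruptibleSource src = new InterruptibleSource(Arrays.asList("a", "b", "c"));
    AtomicBoolean flag = new AtomicBoolean(false);
    src.setInterruptFlag(flag);
    Iterator<String> scan = src.iterator();
    System.out.println(scan.next()); // prints "a"
    flag.set(true);                  // e.g. the tablet is unloading
    scan.hasNext();                  // throws RuntimeException: scan interrupted
  }
}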


http://git-wip-us.apache.org/repos/asf/accumulo/blob/d54e0fd8/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
index 8bf2517,0000000..b82b9cc
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
@@@ -1,562 -1,0 +1,575 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.LinkedList;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.concurrent.Semaphore;
++import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.file.FileOperations;
 +import org.apache.accumulo.core.file.FileSKVIterator;
 +import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 +import org.apache.accumulo.core.iterators.IteratorEnvironment;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
 +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
 +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
 +import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
 +import org.apache.accumulo.core.metadata.schema.DataFileValue;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.fs.FileRef;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.problems.ProblemReport;
 +import org.apache.accumulo.server.problems.ProblemReportingIterator;
 +import org.apache.accumulo.server.problems.ProblemReports;
 +import org.apache.accumulo.server.problems.ProblemType;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +
 +public class FileManager {
 +  
 +  private static final Logger log = Logger.getLogger(FileManager.class);
 +  
 +  int maxOpen;
 +  
 +  private static class OpenReader implements Comparable<OpenReader> {
 +    long releaseTime;
 +    FileSKVIterator reader;
 +    String fileName;
 +    
 +    public OpenReader(String fileName, FileSKVIterator reader) {
 +      this.fileName = fileName;
 +      this.reader = reader;
 +      this.releaseTime = System.currentTimeMillis();
 +    }
 +    
 +    @Override
 +    public int compareTo(OpenReader o) {
 +      if (releaseTime < o.releaseTime) {
 +        return -1;
 +      } else if (releaseTime > o.releaseTime) {
 +        return 1;
 +      } else {
 +        return 0;
 +      }
 +    }
 +    
 +    @Override
 +    public boolean equals(Object obj) {
 +      if (obj instanceof OpenReader) {
 +        return compareTo((OpenReader) obj) == 0;
 +      }
 +      return false;
 +    }
 +    
 +    @Override
 +    public int hashCode() {
 +      return fileName.hashCode();
 +    }
 +  }
 +  
 +  private Map<String,List<OpenReader>> openFiles;
 +  private HashMap<FileSKVIterator,String> reservedReaders;
 +  
 +  private Semaphore filePermits;
 +  
 +  private VolumeManager fs;
 +  
 +  // the data cache and index cache are allocated in
 +  // TabletResourceManager and passed through the file opener to
 +  // CachableBlockFile which can handle the caches being
 +  // null if unallocated
 +  private BlockCache dataCache = null;
 +  private BlockCache indexCache = null;
 +  
 +  private long maxIdleTime;
 +  
 +  private final ServerConfiguration conf;
 +  
 +  private class IdleFileCloser implements Runnable {
 +    
 +    @Override
 +    public void run() {
 +      
 +      long curTime = System.currentTimeMillis();
 +      
 +      ArrayList<FileSKVIterator> filesToClose = new ArrayList<FileSKVIterator>();
 +      
 +      // determine which files to close in a sync block, and then close the
 +      // files outside of the sync block
 +      synchronized (FileManager.this) {
 +        Iterator<Entry<String,List<OpenReader>>> iter = openFiles.entrySet().iterator();
 +        while (iter.hasNext()) {
 +          Entry<String,List<OpenReader>> entry = iter.next();
 +          List<OpenReader> ofl = entry.getValue();
 +          
 +          for (Iterator<OpenReader> oflIter = ofl.iterator(); oflIter.hasNext();) {
 +            OpenReader openReader = oflIter.next();
 +            
 +            if (curTime - openReader.releaseTime > maxIdleTime) {
 +              
 +              filesToClose.add(openReader.reader);
 +              oflIter.remove();
 +            }
 +          }
 +          
 +          if (ofl.size() == 0) {
 +            iter.remove();
 +          }
 +        }
 +      }
 +      
 +      closeReaders(filesToClose);
 +      
 +    }
 +    
 +  }
 +  
 +  /**
 +   * 
 +   * @param dataCache
 +   *          the underlying file can and should be able to handle a null cache
 +   * @param indexCache
 +   *          the underlying file can and should be able to handle a null cache
 +   */
 +  FileManager(ServerConfiguration conf, VolumeManager fs, int maxOpen, BlockCache dataCache, BlockCache indexCache) {
 +    
 +    if (maxOpen <= 0)
 +      throw new IllegalArgumentException("maxOpen <= 0");
 +    this.conf = conf;
 +    this.dataCache = dataCache;
 +    this.indexCache = indexCache;
 +    
 +    this.filePermits = new Semaphore(maxOpen, true);
 +    this.maxOpen = maxOpen;
 +    this.fs = fs;
 +    
 +    this.openFiles = new HashMap<String,List<OpenReader>>();
 +    this.reservedReaders = new HashMap<FileSKVIterator,String>();
 +    
 +    this.maxIdleTime = conf.getConfiguration().getTimeInMillis(Property.TSERV_MAX_IDLE);
 +    SimpleTimer.getInstance().schedule(new IdleFileCloser(), maxIdleTime, maxIdleTime / 2);
 +    
 +  }
 +  
 +  private static int countReaders(Map<String,List<OpenReader>> files) {
 +    int count = 0;
 +    
 +    for (List<OpenReader> list : files.values()) {
 +      count += list.size();
 +    }
 +    
 +    return count;
 +  }
 +  
 +  private List<FileSKVIterator> takeLRUOpenFiles(int numToTake) {
 +    
 +    ArrayList<OpenReader> openReaders = new ArrayList<OpenReader>();
 +    
 +    for (Entry<String,List<OpenReader>> entry : openFiles.entrySet()) {
 +      openReaders.addAll(entry.getValue());
 +    }
 +    
 +    Collections.sort(openReaders);
 +    
 +    ArrayList<FileSKVIterator> ret = new ArrayList<FileSKVIterator>();
 +    
 +    for (int i = 0; i < numToTake; i++) {
 +      OpenReader or = openReaders.get(i);
 +      
 +      List<OpenReader> ofl = openFiles.get(or.fileName);
 +      if (!ofl.remove(or)) {
 +        throw new RuntimeException("Failed to remove open reader that should have been there");
 +      }
 +      
 +      if (ofl.size() == 0) {
 +        openFiles.remove(or.fileName);
 +      }
 +      
 +      ret.add(or.reader);
 +    }
 +    
 +    return ret;
 +  }
 +  
 +  private static <T> List<T> getFileList(String file, Map<String,List<T>> files) {
 +    List<T> ofl = files.get(file);
 +    if (ofl == null) {
 +      ofl = new ArrayList<T>();
 +      files.put(file, ofl);
 +    }
 +    
 +    return ofl;
 +  }
 +  
 +  private void closeReaders(List<FileSKVIterator> filesToClose) {
 +    for (FileSKVIterator reader : filesToClose) {
 +      try {
 +        reader.close();
 +      } catch (Exception e) {
 +        log.error("Failed to close file " + e.getMessage(), e);
 +      }
 +    }
 +  }
 +  
 +  private List<String> takeOpenFiles(Collection<String> files, List<FileSKVIterator> reservedFiles, Map<FileSKVIterator,String> readersReserved) {
 +    List<String> filesToOpen = new LinkedList<String>(files);
 +    for (Iterator<String> iterator = filesToOpen.iterator(); iterator.hasNext();) {
 +      String file = iterator.next();
 +      
 +      List<OpenReader> ofl = openFiles.get(file);
 +      if (ofl != null && ofl.size() > 0) {
 +        OpenReader openReader = ofl.remove(ofl.size() - 1);
 +        reservedFiles.add(openReader.reader);
 +        readersReserved.put(openReader.reader, file);
 +        if (ofl.size() == 0) {
 +          openFiles.remove(file);
 +        }
 +        iterator.remove();
 +      }
 +      
 +    }
 +    return filesToOpen;
 +  }
 +  
 +  private synchronized String getReservedReadeFilename(FileSKVIterator reader) {
 +    return reservedReaders.get(reader);
 +  }
 +  
 +  private List<FileSKVIterator> reserveReaders(Text table, Collection<String> files, boolean continueOnFailure) throws IOException {
 +    
 +    if (files.size() >= maxOpen) {
 +      throw new IllegalArgumentException("requested files exceeds max open");
 +    }
 +    
 +    if (files.size() == 0) {
 +      return Collections.emptyList();
 +    }
 +    
 +    List<String> filesToOpen = null;
 +    List<FileSKVIterator> filesToClose = Collections.emptyList();
 +    List<FileSKVIterator> reservedFiles = new ArrayList<FileSKVIterator>();
 +    Map<FileSKVIterator,String> readersReserved = new HashMap<FileSKVIterator,String>();
 +    
 +    filePermits.acquireUninterruptibly(files.size());
 +    
 +    // now that we are past the semaphore, we have the authority
 +    // to open files.size() files
 +    
 +    // determine what work needs to be done in sync block
 +    // but do the work of opening and closing files outside
 +    // a synch block
 +    synchronized (this) {
 +      
 +      filesToOpen = takeOpenFiles(files, reservedFiles, readersReserved);
 +      
 +      int numOpen = countReaders(openFiles);
 +      
 +      if (filesToOpen.size() + numOpen + reservedReaders.size() > maxOpen) {
 +        filesToClose = takeLRUOpenFiles((filesToOpen.size() + numOpen + reservedReaders.size()) - maxOpen);
 +      }
 +    }
 +    
 +    // close files before opening files to ensure we stay under resource
 +    // limitations
 +    closeReaders(filesToClose);
 +    
 +    // open any files that need to be opened
 +    for (String file : filesToOpen) {
 +      try {
 +        if (!file.contains(":"))
 +          throw new IllegalArgumentException("Expected uri, got : " + file);
 +        Path path = new Path(file);
 +        FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
 +        //log.debug("Opening "+file + " path " + path);
 +        FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), false, ns, ns.getConf(), conf.getTableConfiguration(table.toString()),
 +            dataCache, indexCache);
 +        reservedFiles.add(reader);
 +        readersReserved.put(reader, file);
 +      } catch (Exception e) {
 +        
 +        ProblemReports.getInstance().report(new ProblemReport(table.toString(), ProblemType.FILE_READ, file, e));
 +        
 +        if (continueOnFailure) {
 +          // release the permit for the file that failed to open
 +          filePermits.release(1);
 +          log.warn("Failed to open file " + file + " " + e.getMessage() + " continuing...");
 +        } else {
 +          // close whatever files were opened
 +          closeReaders(reservedFiles);
 +          
 +          filePermits.release(files.size());
 +          
 +          log.error("Failed to open file " + file + " " + e.getMessage());
 +          throw new IOException("Failed to open " + file, e);
 +        }
 +      }
 +    }
 +    
 +    synchronized (this) {
 +      // update set of reserved readers
 +      reservedReaders.putAll(readersReserved);
 +    }
 +    
 +    return reservedFiles;
 +  }
 +  
 +  private void releaseReaders(List<FileSKVIterator> readers, boolean sawIOException) {
 +    // put files in openFiles
 +    
 +    synchronized (this) {
 +      
 +      // check that readers were actually reserved ... want to make sure a thread does
 +      // not try to release readers they never reserved
 +      if (!reservedReaders.keySet().containsAll(readers)) {
 +        throw new IllegalArgumentException("Asked to release readers that were never reserved ");
 +      }
 +      
 +      for (FileSKVIterator reader : readers) {
 +        try {
 +          reader.closeDeepCopies();
 +        } catch (IOException e) {
 +          log.warn(e, e);
 +          sawIOException = true;
 +        }
 +      }
 +      
 +      for (FileSKVIterator reader : readers) {
 +        String fileName = reservedReaders.remove(reader);
 +        if (!sawIOException)
 +          getFileList(fileName, openFiles).add(new OpenReader(fileName, reader));
 +      }
 +    }
 +    
 +    if (sawIOException)
 +      closeReaders(readers);
 +    
 +    // decrement the semaphore
 +    filePermits.release(readers.size());
 +    
 +  }
 +  
 +  static class FileDataSource implements DataSource {
 +    
 +    private SortedKeyValueIterator<Key,Value> iter;
 +    private ArrayList<FileDataSource> deepCopies;
 +    private boolean current = true;
 +    private IteratorEnvironment env;
 +    private String file;
++    private AtomicBoolean iflag;
 +    
 +    FileDataSource(String file, SortedKeyValueIterator<Key,Value> iter) {
 +      this.file = file;
 +      this.iter = iter;
 +      this.deepCopies = new ArrayList<FileManager.FileDataSource>();
 +    }
 +    
 +    public FileDataSource(IteratorEnvironment env, SortedKeyValueIterator<Key,Value> deepCopy, ArrayList<FileDataSource> deepCopies) {
 +      this.iter = deepCopy;
 +      this.env = env;
 +      this.deepCopies = deepCopies;
 +      deepCopies.add(this);
 +    }
 +    
 +    @Override
 +    public boolean isCurrent() {
 +      return current;
 +    }
 +    
 +    @Override
 +    public DataSource getNewDataSource() {
 +      current = true;
 +      return this;
 +    }
 +    
 +    @Override
 +    public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
 +      return new FileDataSource(env, iter.deepCopy(env), deepCopies);
 +    }
 +    
 +    @Override
 +    public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
++      if (iflag != null)
++        ((InterruptibleIterator) this.iter).setInterruptFlag(iflag);
 +      return iter;
 +    }
 +    
 +    void unsetIterator() {
 +      current = false;
 +      iter = null;
 +      for (FileDataSource fds : deepCopies) {
 +        fds.current = false;
 +        fds.iter = null;
 +      }
 +    }
 +    
 +    void setIterator(SortedKeyValueIterator<Key,Value> iter) {
 +      current = false;
 +      this.iter = iter;
++
++      if (iflag != null)
++        ((InterruptibleIterator) this.iter).setInterruptFlag(iflag);
++
 +      for (FileDataSource fds : deepCopies) {
 +        fds.current = false;
 +        fds.iter = iter.deepCopy(fds.env);
 +      }
 +    }
++
++    @Override
++    public void setInterruptFlag(AtomicBoolean flag) {
++      this.iflag = flag;
++    }
 +    
 +  }
 +  
 +  public class ScanFileManager {
 +    
 +    private ArrayList<FileDataSource> dataSources;
 +    private ArrayList<FileSKVIterator> tabletReservedReaders;
 +    private KeyExtent tablet;
 +    private boolean continueOnFailure;
 +    
 +    ScanFileManager(KeyExtent tablet) {
 +      tabletReservedReaders = new ArrayList<FileSKVIterator>();
 +      dataSources = new ArrayList<FileDataSource>();
 +      this.tablet = tablet;
 +      
 +      continueOnFailure = conf.getTableConfiguration(tablet).getBoolean(Property.TABLE_FAILURES_IGNORE);
 +      
 +      if (tablet.isMeta()) {
 +        continueOnFailure = false;
 +      }
 +    }
 +    
 +    private List<FileSKVIterator> openFileRefs(Collection<FileRef> files) throws TooManyFilesException, IOException {
 +      List<String> strings = new ArrayList<String>(files.size());
 +      for (FileRef ref : files)
 +        strings.add(ref.path().toString());
 +      return openFiles(strings);
 +    }
 +    
 +    private List<FileSKVIterator> openFiles(Collection<String> files) throws TooManyFilesException, IOException {
 +      // one tablet cannot open more than maxOpen files; otherwise it could get stuck
 +      // forever waiting on itself to release files
 +      
 +      if (tabletReservedReaders.size() + files.size() >= maxOpen) {
 +        throw new TooManyFilesException("Request to open files would exceed max open files reservedReaders.size()=" + tabletReservedReaders.size()
 +            + " files.size()=" + files.size() + " maxOpen=" + maxOpen + " tablet = " + tablet);
 +      }
 +      
 +      List<FileSKVIterator> newlyReservedReaders = reserveReaders(tablet.getTableId(), files, continueOnFailure);
 +      
 +      tabletReservedReaders.addAll(newlyReservedReaders);
 +      return newlyReservedReaders;
 +    }
 +    
 +    synchronized List<InterruptibleIterator> openFiles(Map<FileRef,DataFileValue> files, boolean detachable) throws IOException {
 +      
 +      List<FileSKVIterator> newlyReservedReaders = openFileRefs(files.keySet());
 +      
 +      ArrayList<InterruptibleIterator> iters = new ArrayList<InterruptibleIterator>();
 +      
 +      for (FileSKVIterator reader : newlyReservedReaders) {
 +        String filename = getReservedReadeFilename(reader);
 +        InterruptibleIterator iter;
 +        if (detachable) {
 +          FileDataSource fds = new FileDataSource(filename, reader);
 +          dataSources.add(fds);
 +          SourceSwitchingIterator ssi = new SourceSwitchingIterator(fds);
 +          iter = new ProblemReportingIterator(tablet.getTableId().toString(), filename, continueOnFailure, ssi);
 +        } else {
 +          iter = new ProblemReportingIterator(tablet.getTableId().toString(), filename, continueOnFailure, reader);
 +        }
 +        DataFileValue value = files.get(new FileRef(filename));
 +        if (value.isTimeSet()) {
 +          iter = new TimeSettingIterator(iter, value.getTime());
 +        }
 +        
 +        iters.add(iter);
 +      }
 +      
 +      return iters;
 +    }
 +    
 +    synchronized void detach() {
 +      
 +      releaseReaders(tabletReservedReaders, false);
 +      tabletReservedReaders.clear();
 +      
 +      for (FileDataSource fds : dataSources)
 +        fds.unsetIterator();
 +    }
 +    
 +    synchronized void reattach() throws IOException {
 +      if (tabletReservedReaders.size() != 0)
 +        throw new IllegalStateException();
 +      
 +      Collection<String> files = new ArrayList<String>();
 +      for (FileDataSource fds : dataSources)
 +        files.add(fds.file);
 +      
 +      List<FileSKVIterator> newlyReservedReaders = openFiles(files);
 +      Map<String,List<FileSKVIterator>> map = new HashMap<String,List<FileSKVIterator>>();
 +      for (FileSKVIterator reader : newlyReservedReaders) {
 +        String fileName = getReservedReadeFilename(reader);
 +        List<FileSKVIterator> list = map.get(fileName);
 +        if (list == null) {
 +          list = new LinkedList<FileSKVIterator>();
 +          map.put(fileName, list);
 +        }
 +        
 +        list.add(reader);
 +      }
 +      
 +      for (FileDataSource fds : dataSources) {
 +        FileSKVIterator reader = map.get(fds.file).remove(0);
 +        fds.setIterator(reader);
 +      }
 +    }
 +    
 +    synchronized void releaseOpenFiles(boolean sawIOException) {
 +      releaseReaders(tabletReservedReaders, sawIOException);
 +      tabletReservedReaders.clear();
 +      dataSources.clear();
 +    }
 +    
 +    synchronized int getNumOpenFiles() {
 +      return tabletReservedReaders.size();
 +    }
 +  }
 +  
 +  public ScanFileManager newScanFileManager(KeyExtent tablet) {
 +    return new ScanFileManager(tablet);
 +  }
 +}
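
A note on the FileDataSource change above: setInterruptFlag() only records
the AtomicBoolean; the cast to InterruptibleIterator and the actual call
happen later, in iterator() and setIterator(), because the wrapped reader
can be replaced when a scan is detached and reattached. A freshly opened
reader knows nothing about the flag, so the stored flag must be pushed down
again on every swap. A minimal standalone sketch of that store-then-reapply
pattern follows; Interruptible and SwappableSource are illustrative names,
not Accumulo types.

import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the lazy re-application pattern: remember the flag once, then
// push it onto every new delegate.
interface Interruptible {
  void setInterruptFlag(AtomicBoolean flag);
}

class SwappableSource {
  private Interruptible delegate;
  private AtomicBoolean iflag;

  void setInterruptFlag(AtomicBoolean flag) {
    this.iflag = flag; // remember it; the delegate may not exist yet
  }

  // Called on reattach with a freshly opened reader; without re-applying
  // the stored flag, the new reader would ignore interrupts.
  void setDelegate(Interruptible newDelegate) {
    this.delegate = newDelegate;
    if (iflag != null)
      delegate.setInterruptFlag(iflag);
  }
}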

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d54e0fd8/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
index 5f6d9ce,0000000..2e15767
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
@@@ -1,772 -1,0 +1,783 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver;
 +
 +import java.io.IOException;
 +import java.io.Serializable;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Comparator;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedMap;
 +import java.util.UUID;
 +import java.util.concurrent.ConcurrentSkipListMap;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.ByteSequence;
 +import org.apache.accumulo.core.data.ColumnUpdate;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.file.FileSKVIterator;
 +import org.apache.accumulo.core.file.FileSKVWriter;
 +import org.apache.accumulo.core.file.rfile.RFile;
 +import org.apache.accumulo.core.file.rfile.RFileOperations;
 +import org.apache.accumulo.core.iterators.IteratorEnvironment;
 +import org.apache.accumulo.core.iterators.SkippingIterator;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.iterators.SortedMapIterator;
 +import org.apache.accumulo.core.iterators.WrappingIterator;
 +import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
 +import org.apache.accumulo.core.iterators.system.LocalityGroupIterator;
 +import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroup;
 +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
 +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.core.util.LocalityGroupUtil;
 +import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
 +import org.apache.accumulo.core.util.LocalityGroupUtil.Partitioner;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.trace.TraceFileSystem;
 +import org.apache.commons.lang.mutable.MutableLong;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.log4j.Logger;
 +
 +class MemKeyComparator implements Comparator<Key>, Serializable {
 +  
 +  private static final long serialVersionUID = 1L;
 +
 +  @Override
 +  public int compare(Key k1, Key k2) {
 +    int cmp = k1.compareTo(k2);
 +    
 +    if (cmp == 0) {
 +      if (k1 instanceof MemKey)
 +        if (k2 instanceof MemKey)
 +          cmp = ((MemKey) k2).kvCount - ((MemKey) k1).kvCount;
 +        else
 +          cmp = 1;
 +      else if (k2 instanceof MemKey)
 +        cmp = -1;
 +    }
 +    
 +    return cmp;
 +  }
 +}
 +
 +class PartialMutationSkippingIterator extends SkippingIterator implements InterruptibleIterator {
 +  
 +  int kvCount;
 +  
 +  public PartialMutationSkippingIterator(SortedKeyValueIterator<Key,Value> source, int maxKVCount) {
 +    setSource(source);
 +    this.kvCount = maxKVCount;
 +  }
 +  
 +  @Override
 +  protected void consume() throws IOException {
 +    while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).kvCount > kvCount)
 +      getSource().next();
 +  }
 +  
 +  @Override
 +  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
 +    return new PartialMutationSkippingIterator(getSource().deepCopy(env), kvCount);
 +  }
 +  
 +  @Override
 +  public void setInterruptFlag(AtomicBoolean flag) {
 +    ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
 +  }
 +  
 +}
 +
 +class MemKeyConversionIterator extends WrappingIterator implements InterruptibleIterator {
 +  MemKey currKey = null;
 +  Value currVal = null;
 +
 +  public MemKeyConversionIterator(SortedKeyValueIterator<Key,Value> source) {
 +    super();
 +    setSource(source);
 +  }
 +
 +  @Override
 +  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
 +    return new MemKeyConversionIterator(getSource().deepCopy(env));
 +  }
 +  
 +  @Override
 +  public Key getTopKey() {
 +    return currKey;
 +  }
 +  
 +  @Override
 +  public Value getTopValue() {
 +    return currVal;
 +  }
 +  
 +  private void getTopKeyVal() {
 +    Key k = super.getTopKey();
 +    Value v = super.getTopValue();
 +    if (k instanceof MemKey || k == null) {
 +      currKey = (MemKey) k;
 +      currVal = v;
 +      return;
 +    }
 +    currVal = new Value(v);
 +    int mc = MemValue.splitKVCount(currVal);
 +    currKey = new MemKey(k, mc);
 +
 +  }
 +  
 +  public void next() throws IOException {
 +    super.next();
 +    if (hasTop())
 +      getTopKeyVal();
 +  }
 +
 +  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
 +    super.seek(range, columnFamilies, inclusive);
 +    
 +    if (hasTop())
 +      getTopKeyVal();
 +
 +    Key k = range.getStartKey();
 +    if (k instanceof MemKey && hasTop()) {
 +      while (hasTop() && currKey.compareTo(k) < 0)
 +        next();
 +    }
 +  }
 +
 +  @Override
 +  public void setInterruptFlag(AtomicBoolean flag) {
 +    ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
 +  }
 +
 +}
 +
 +public class InMemoryMap {
 +  private SimpleMap map = null;
 +  
 +  private static final Logger log = Logger.getLogger(InMemoryMap.class);
 +  
 +  private volatile String memDumpFile = null;
 +  private final String memDumpDir;
 +
 +  private Map<String,Set<ByteSequence>> lggroups;
 +  
 +  public InMemoryMap(boolean useNativeMap, String memDumpDir) {
 +    this(new HashMap<String,Set<ByteSequence>>(), useNativeMap, memDumpDir);
 +  }
 +
 +  public InMemoryMap(Map<String,Set<ByteSequence>> lggroups, boolean useNativeMap, String memDumpDir) {
 +    this.memDumpDir = memDumpDir;
 +    this.lggroups = lggroups;
 +    
 +    if (lggroups.size() == 0)
 +      map = newMap(useNativeMap);
 +    else
 +      map = new LocalityGroupMap(lggroups, useNativeMap);
 +  }
 +  
 +  public InMemoryMap(AccumuloConfiguration config) throws LocalityGroupConfigurationError {
 +    this(LocalityGroupUtil.getLocalityGroups(config), config.getBoolean(Property.TSERV_NATIVEMAP_ENABLED), config.get(Property.TSERV_MEMDUMP_DIR));
 +  }
 +  
 +  private static SimpleMap newMap(boolean useNativeMap) {
 +    if (useNativeMap && NativeMap.isLoaded()) {
 +      try {
 +        return new NativeMapWrapper();
 +      } catch (Throwable t) {
 +        log.error("Failed to create native map", t);
 +      }
 +    }
 +    
 +    return new DefaultMap();
 +  }
 +  
 +  private interface SimpleMap {
 +    Value get(Key key);
 +    
 +    Iterator<Entry<Key,Value>> iterator(Key startKey);
 +    
 +    int size();
 +    
 +    InterruptibleIterator skvIterator();
 +    
 +    void delete();
 +    
 +    long getMemoryUsed();
 +    
 +    void mutate(List<Mutation> mutations, int kvCount);
 +  }
 +  
 +  private static class LocalityGroupMap implements SimpleMap {
 +    
 +    private Map<ByteSequence,MutableLong> groupFams[];
 +    
 +    // the last map in the array is the default locality group
 +    private SimpleMap maps[];
 +    private Partitioner partitioner;
 +    private List<Mutation>[] partitioned;
 +    private Set<ByteSequence> nonDefaultColumnFamilies;
 +    
 +    @SuppressWarnings("unchecked")
 +    LocalityGroupMap(Map<String,Set<ByteSequence>> groups, boolean useNativeMap) {
 +      this.groupFams = new Map[groups.size()];
 +      this.maps = new SimpleMap[groups.size() + 1];
 +      this.partitioned = new List[groups.size() + 1];
 +      this.nonDefaultColumnFamilies = new HashSet<ByteSequence>();
 +      
 +      for (int i = 0; i < maps.length; i++) {
 +        maps[i] = newMap(useNativeMap);
 +      }
 +
 +      int count = 0;
 +      for (Set<ByteSequence> cfset : groups.values()) {
 +        HashMap<ByteSequence,MutableLong> map = new HashMap<ByteSequence,MutableLong>();
 +        for (ByteSequence bs : cfset)
 +          map.put(bs, new MutableLong(1));
 +        this.groupFams[count++] = map;
 +        nonDefaultColumnFamilies.addAll(cfset);
 +      }
 +      
 +      partitioner = new LocalityGroupUtil.Partitioner(this.groupFams);
 +      
 +      for (int i = 0; i < partitioned.length; i++) {
 +        partitioned[i] = new ArrayList<Mutation>();
 +      }
 +    }
 +
 +    @Override
 +    public Value get(Key key) {
 +      throw new UnsupportedOperationException();
 +    }
 +    
 +    @Override
 +    public Iterator<Entry<Key,Value>> iterator(Key startKey) {
 +      throw new UnsupportedOperationException();
 +    }
 +    
 +    @Override
 +    public int size() {
 +      int sum = 0;
 +      for (SimpleMap map : maps)
 +        sum += map.size();
 +      return sum;
 +    }
 +    
 +    @Override
 +    public InterruptibleIterator skvIterator() {
 +      LocalityGroup groups[] = new LocalityGroup[maps.length];
 +      for (int i = 0; i < groups.length; i++) {
 +        if (i < groupFams.length)
 +          groups[i] = new LocalityGroup(maps[i].skvIterator(), groupFams[i], false);
 +        else
 +          groups[i] = new LocalityGroup(maps[i].skvIterator(), null, true);
 +      }
 +
 +
 +      return new LocalityGroupIterator(groups, nonDefaultColumnFamilies);
 +    }
 +    
 +    @Override
 +    public void delete() {
 +      for (SimpleMap map : maps)
 +        map.delete();
 +    }
 +    
 +    @Override
 +    public long getMemoryUsed() {
 +      long sum = 0;
 +      for (SimpleMap map : maps)
 +        sum += map.getMemoryUsed();
 +      return sum;
 +    }
 +    
 +    @Override
 +    public synchronized void mutate(List<Mutation> mutations, int kvCount) {
 +      // this method is synchronized because it reuses objects to avoid allocation.
 +      // Currently, the method that calls this is synchronized, so there is no
 +      // loss in parallelism; synchronization was added here for future proofing.
 +      
 +      try{
 +        partitioner.partition(mutations, partitioned);
 +        
 +        for (int i = 0; i < partitioned.length; i++) {
 +          if (partitioned[i].size() > 0) {
 +            maps[i].mutate(partitioned[i], kvCount);
 +            for (Mutation m : partitioned[i])
 +              kvCount += m.getUpdates().size();
 +          }
 +        }
 +      } finally {
 +        // clear immediately so mutations can be garbage collected
 +        for (List<Mutation> list : partitioned) {
 +          list.clear();
 +        }
 +      }
 +    }
 +    
 +  }
 +
 +  private static class DefaultMap implements SimpleMap {
 +    private ConcurrentSkipListMap<Key,Value> map = new ConcurrentSkipListMap<Key,Value>(new MemKeyComparator());
 +    private AtomicLong bytesInMemory = new AtomicLong();
 +    private AtomicInteger size = new AtomicInteger();
 +    
 +    public void put(Key key, Value value) {
 +      // Always a MemKey, so account for the kvCount int
 +      bytesInMemory.addAndGet(key.getLength() + 4);
 +      bytesInMemory.addAndGet(value.getSize());
 +      if (map.put(key, value) == null)
 +        size.incrementAndGet();
 +    }
 +    
 +    public Value get(Key key) {
 +      return map.get(key);
 +    }
 +    
 +    public Iterator<Entry<Key,Value>> iterator(Key startKey) {
 +      Key lk = new Key(startKey);
 +      SortedMap<Key,Value> tm = map.tailMap(lk);
 +      return tm.entrySet().iterator();
 +    }
 +    
 +    public int size() {
 +      return size.get();
 +    }
 +    
 +    public synchronized InterruptibleIterator skvIterator() {
 +      if (map == null)
 +        throw new IllegalStateException();
 +      
 +      return new SortedMapIterator(map);
 +    }
 +    
 +    public synchronized void delete() {
 +      map = null;
 +    }
 +    
 +    public long getOverheadPerEntry() {
 +      // All of the Java objects that are used to hold the
 +      // data and make it searchable have overhead. This
 +      // overhead, in bytes, is estimated using test.EstimateInMemMapOverhead;
 +      // the estimates were obtained by running
 +      // Java 1.6.0_16 in 64-bit server mode.
 +      
 +      return 200;
 +    }
 +    
 +    @Override
 +    public void mutate(List<Mutation> mutations, int kvCount) {
 +      for (Mutation m : mutations) {
 +        for (ColumnUpdate cvp : m.getUpdates()) {
 +          Key newKey = new MemKey(m.getRow(), cvp.getColumnFamily(), cvp.getColumnQualifier(), cvp.getColumnVisibility(), cvp.getTimestamp(), cvp.isDeleted(),
 +              false, kvCount++);
 +          Value value = new Value(cvp.getValue());
 +          put(newKey, value);
 +        }
 +      }
 +    }
 +    
 +    @Override
 +    public long getMemoryUsed() {
 +      return bytesInMemory.get() + (size() * getOverheadPerEntry());
 +    }
 +  }
 +  
 +  private static class NativeMapWrapper implements SimpleMap {
 +    private NativeMap nativeMap;
 +    
 +    NativeMapWrapper() {
 +      nativeMap = new NativeMap();
 +    }
 +    
 +    public Value get(Key key) {
 +      return nativeMap.get(key);
 +    }
 +    
 +    public Iterator<Entry<Key,Value>> iterator(Key startKey) {
 +      return nativeMap.iterator(startKey);
 +    }
 +    
 +    public int size() {
 +      return nativeMap.size();
 +    }
 +    
 +    public InterruptibleIterator skvIterator() {
 +      return (InterruptibleIterator) nativeMap.skvIterator();
 +    }
 +    
 +    public void delete() {
 +      nativeMap.delete();
 +    }
 +    
 +    public long getMemoryUsed() {
 +      return nativeMap.getMemoryUsed();
 +    }
 +    
 +    @Override
 +    public void mutate(List<Mutation> mutations, int kvCount) {
 +      nativeMap.mutate(mutations, kvCount);
 +    }
 +  }
 +  
 +  private AtomicInteger nextKVCount = new AtomicInteger(1);
 +  private AtomicInteger kvCount = new AtomicInteger(0);
 +
 +  private Object writeSerializer = new Object();
 +  
 +  /**
 +   * Applies changes to a row in the InMemoryMap
 +   * 
 +   */
 +  public void mutate(List<Mutation> mutations) {
 +    int numKVs = 0;
 +    for (int i = 0; i < mutations.size(); i++)
 +      numKVs += mutations.get(i).size();
 +    
 +    // Cannot update mutationCount while writes that started before
 +    // are in progress; this would cause partial mutations to be seen.
 +    // Also, cannot continue until the mutation count is updated, because
 +    // a read may not see a successful write. Therefore writes must
 +    // wait for writes that started before to finish.
 +    //
 +    // Using a lock separate from this map allows reads and writes to proceed in parallel.
 +    synchronized (writeSerializer ) {
 +      int kv = nextKVCount.getAndAdd(numKVs);
 +      try {
 +        map.mutate(mutations, kv);
 +      } finally {
 +        kvCount.set(kv + numKVs - 1);
 +      }
 +    }
 +  }
 +  
 +  /**
 +   * Returns a long representing the size of the InMemoryMap
 +   * 
 +   * @return bytesInMemory
 +   */
 +  public synchronized long estimatedSizeInBytes() {
 +    if (map == null)
 +      return 0;
 +    
 +    return map.getMemoryUsed();
 +  }
 +  
 +  Iterator<Map.Entry<Key,Value>> iterator(Key startKey) {
 +    return map.iterator(startKey);
 +  }
 +  
 +  public long getNumEntries() {
 +    return map.size();
 +  }
 +  
 +  private final Set<MemoryIterator> activeIters = Collections.synchronizedSet(new HashSet<MemoryIterator>());
 +  
 +  class MemoryDataSource implements DataSource {
 +    
 +    boolean switched = false;
 +    private InterruptibleIterator iter;
 +    private FileSKVIterator reader;
 +    private MemoryDataSource parent;
 +    private IteratorEnvironment env;
++    private AtomicBoolean iflag;
 +    
 +    MemoryDataSource() {
-       this(null, false, null);
++      this(null, false, null, null);
 +    }
 +    
-     public MemoryDataSource(MemoryDataSource parent, boolean switched, IteratorEnvironment env) {
++    public MemoryDataSource(MemoryDataSource parent, boolean switched, IteratorEnvironment env, AtomicBoolean iflag) {
 +      this.parent = parent;
 +      this.switched = switched;
 +      this.env = env;
++      this.iflag = iflag;
 +    }
 +    
 +    @Override
 +    public boolean isCurrent() {
 +      if (switched)
 +        return true;
 +      else
 +        return memDumpFile == null;
 +    }
 +    
 +    @Override
 +    public DataSource getNewDataSource() {
 +      if (switched)
 +        throw new IllegalStateException();
 +      
 +      if (!isCurrent()) {
 +        switched = true;
 +        iter = null;
 +        try {
 +          // ensure files are referenced even if iterator was never seeked before
 +          iterator();
 +        } catch (IOException e) {
 +          throw new RuntimeException();
 +        }
 +      }
 +      
 +      return this;
 +    }
 +    
 +    private synchronized FileSKVIterator getReader() throws IOException {
 +      if (reader == null) {
 +        Configuration conf = CachedConfiguration.getInstance();
 +        FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
 +        
 +        reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, ServerConfiguration.getSiteConfiguration());
++        if (iflag != null)
++          reader.setInterruptFlag(iflag);
 +      }
 +
 +      return reader;
 +    }
 +
 +    @Override
 +    public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
 +      if (iter == null)
-         if (!switched)
++        if (!switched) {
 +          iter = map.skvIterator();
-         else {
++          if (iflag != null)
++            iter.setInterruptFlag(iflag);
++        } else {
 +          if (parent == null)
 +            iter = new MemKeyConversionIterator(getReader());
 +          else
 +            synchronized (parent) {
 +              // synchronize the deep copy operation on parent; this prevents multiple threads from deep copying the rfile shared from parent. It is possible that the
 +              // thread deleting an InMemoryMap and a scan thread could be switching different deep copies
 +              iter = new MemKeyConversionIterator(parent.getReader().deepCopy(env));
 +            }
 +        }
 +      
 +      return iter;
 +    }
 +    
 +    @Override
 +    public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
-       return new MemoryDataSource(parent == null ? this : parent, switched, env);
++      return new MemoryDataSource(parent == null ? this : parent, switched, env, iflag);
++    }
++
++    @Override
++    public void setInterruptFlag(AtomicBoolean flag) {
++      this.iflag = flag;
 +    }
 +    
 +  }
 +  
 +  class MemoryIterator extends WrappingIterator implements InterruptibleIterator {
 +    
 +    private AtomicBoolean closed;
 +    private SourceSwitchingIterator ssi;
 +    private MemoryDataSource mds;
 +    
 +    protected SortedKeyValueIterator<Key,Value> getSource() {
 +      if (closed.get())
 +        throw new IllegalStateException("Memory iterator is closed");
 +      return super.getSource();
 +    }
 +    
 +    private MemoryIterator(InterruptibleIterator source) {
 +      this(source, new AtomicBoolean(false));
 +    }
 +    
 +    private MemoryIterator(SortedKeyValueIterator<Key,Value> source, AtomicBoolean closed) {
 +      setSource(source);
 +      this.closed = closed;
 +    }
 +    
 +    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
 +      return new MemoryIterator(getSource().deepCopy(env), closed);
 +    }
 +    
 +    public void close() {
 +      
 +      synchronized (this) {
 +        if (closed.compareAndSet(false, true)) {
 +          try {
 +            if (mds.reader != null)
 +              mds.reader.close();
 +          } catch (IOException e) {
 +            log.warn(e, e);
 +          }
 +        }
 +      }
 +      
 +      // remove outside of sync to avoid deadlock
 +      activeIters.remove(this);
 +    }
 +    
 +    private synchronized boolean switchNow() throws IOException {
 +      if (closed.get())
 +        return false;
 +      
 +      ssi.switchNow();
 +      return true;
 +    }
 +    
 +    @Override
 +    public void setInterruptFlag(AtomicBoolean flag) {
 +      ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
 +    }
 +    
 +    private void setSSI(SourceSwitchingIterator ssi) {
 +      this.ssi = ssi;
 +    }
 +    
 +    public void setMDS(MemoryDataSource mds) {
 +      this.mds = mds;
 +    }
 +    
 +  }
 +  
 +  public synchronized MemoryIterator skvIterator() {
 +    if (map == null)
 +      throw new NullPointerException();
 +    
 +    if (deleted)
 +      throw new IllegalStateException("Can not obtain iterator after map deleted");
 +    
 +    int mc = kvCount.get();
 +    MemoryDataSource mds = new MemoryDataSource();
 +    SourceSwitchingIterator ssi = new SourceSwitchingIterator(new MemoryDataSource());
 +    MemoryIterator mi = new MemoryIterator(new PartialMutationSkippingIterator(ssi, mc));
 +    mi.setSSI(ssi);
 +    mi.setMDS(mds);
 +    activeIters.add(mi);
 +    return mi;
 +  }
 +  
 +  public SortedKeyValueIterator<Key,Value> compactionIterator() {
 +    
 +    if (nextKVCount.get() - 1 != kvCount.get())
 +      throw new IllegalStateException("Memory map in unexpected state : nextKVCount = " + nextKVCount.get() + " kvCount = "
 +          + kvCount.get());
 +    
 +    return map.skvIterator();
 +  }
 +  
 +  private boolean deleted = false;
 +  
 +  public void delete(long waitTime) {
 +    
 +    synchronized (this) {
 +      if (deleted)
 +        throw new IllegalStateException("Double delete");
 +      
 +      deleted = true;
 +    }
 +    
 +    long t1 = System.currentTimeMillis();
 +    
 +    while (activeIters.size() > 0 && System.currentTimeMillis() - t1 < waitTime) {
 +      UtilWaitThread.sleep(50);
 +    }
 +    
 +    if (activeIters.size() > 0) {
 +      // dump memmap exactly as is to a tmp file on disk, and switch scans to that temp file
 +      try {
 +        Configuration conf = CachedConfiguration.getInstance();
 +        FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
 +        
 +        String tmpFile = memDumpDir + "/memDump" + UUID.randomUUID() + "." + RFile.EXTENSION;
 +        
 +        Configuration newConf = new Configuration(conf);
 +        newConf.setInt("io.seqfile.compress.blocksize", 100000);
 +        
 +        FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, ServerConfiguration.getSiteConfiguration());
 +        
 +        InterruptibleIterator iter = map.skvIterator();
 +       
 +        HashSet<ByteSequence> allfams= new HashSet<ByteSequence>();
 +        
 +        for(Entry<String, Set<ByteSequence>> entry : lggroups.entrySet()){
 +          allfams.addAll(entry.getValue());
 +          out.startNewLocalityGroup(entry.getKey(), entry.getValue());
 +          iter.seek(new Range(), entry.getValue(), true);
 +          dumpLocalityGroup(out, iter);
 +        }
 +        
 +        out.startDefaultLocalityGroup();
 +        iter.seek(new Range(), allfams, false);
 +       
 +        dumpLocalityGroup(out, iter);
 +        
 +        out.close();
 +        
 +        log.debug("Created mem dump file " + tmpFile);
 +        
 +        memDumpFile = tmpFile;
 +        
 +        synchronized (activeIters) {
 +          for (MemoryIterator mi : activeIters) {
 +            mi.switchNow();
 +          }
 +        }
 +        
 +        // rely on unix behavior that file will be deleted when last
 +        // reader closes it
 +        fs.delete(new Path(memDumpFile), true);
 +        
 +      } catch (IOException ioe) {
 +        log.error("Failed to create mem dump file ", ioe);
 +        
 +        while (activeIters.size() > 0) {
 +          UtilWaitThread.sleep(100);
 +        }
 +      }
 +      
 +    }
 +    
 +    SimpleMap tmpMap = map;
 +    
 +    synchronized (this) {
 +      map = null;
 +    }
 +    
 +    tmpMap.delete();
 +  }
 +
 +  private void dumpLocalityGroup(FileSKVWriter out, InterruptibleIterator iter) throws IOException {
 +    while (iter.hasTop() && activeIters.size() > 0) {
 +      // RFile does not support MemKey, so we move the kv count into the value only for the RFile.
 +      // There is no need to change the MemKey to a normal key because the kvCount info gets lost when it is written
 +      Value newValue = new MemValue(iter.getTopValue(), ((MemKey) iter.getTopKey()).kvCount);
 +      out.append(iter.getTopKey(), newValue);
 +      iter.next();
 +
 +    }
 +  }
 +}
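
The MemoryDataSource change above threads the interrupt flag through the
constructor so that deep copies handed to concurrent scan threads inherit
the parent's flag, and iterator()/getReader() apply it to whichever source
is active: the live in-memory map or the RFile produced by delete()'s
memory dump. A standalone sketch of the inheritance part, under the
hypothetical name CopyableSource (not an Accumulo class):

import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: deep copies share one root parent and start with its flag, so a
// single cancellation signal reaches every copy of the scan.
class CopyableSource {
  private final CopyableSource parent;
  private AtomicBoolean iflag;

  CopyableSource() {
    this(null, null);
  }

  private CopyableSource(CopyableSource parent, AtomicBoolean iflag) {
    this.parent = parent;
    this.iflag = iflag; // a copy starts with whatever flag the parent had
  }

  void setInterruptFlag(AtomicBoolean flag) {
    this.iflag = flag;
  }

  // Mirrors getDeepCopyDataSource: copies always hang off the root parent.
  CopyableSource deepCopy() {
    return new CopyableSource(parent == null ? this : parent, iflag);
  }

  boolean interrupted() {
    return iflag != null && iflag.get();
  }

  public static void main(String[] args) {
    CopyableSource root = new CopyableSource();
    AtomicBoolean flag = new AtomicBoolean(false);
    root.setInterruptFlag(flag);
    CopyableSource copy = root.deepCopy();
    flag.set(true);
    System.out.println(copy.interrupted()); // true: the copy shares the flag
  }
}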