You are viewing a plain text version of this content; the hyperlink to the canonical HTML version was lost in the plain-text conversion.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/09/12 09:46:35 UTC
[06/10] git commit: Merge branch '1.5.2-SNAPSHOT' into 1.6.1-SNAPSHOT
Merge branch '1.5.2-SNAPSHOT' into 1.6.1-SNAPSHOT
Conflicts:
server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d54e0fd8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d54e0fd8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d54e0fd8
Branch: refs/heads/master
Commit: d54e0fd8636405b39a982a6fad5a3fca1593d6cf
Parents: 0d76cd5 7699e1f
Author: Josh Elser <el...@apache.org>
Authored: Thu Sep 11 17:42:01 2014 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Thu Sep 11 17:42:01 2014 -0700
----------------------------------------------------------------------
.../system/SourceSwitchingIterator.java | 20 ++++------
.../system/SourceSwitchingIteratorTest.java | 38 +++++++++++++++++-
.../apache/accumulo/tserver/FileManager.java | 13 +++++++
.../apache/accumulo/tserver/InMemoryMap.java | 21 +++++++---
.../org/apache/accumulo/tserver/Tablet.java | 5 +++
.../accumulo/tserver/InMemoryMapTest.java | 41 ++++++++++++++++----
6 files changed, 112 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d54e0fd8/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
index 8bf2517,0000000..b82b9cc
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
@@@ -1,562 -1,0 +1,575 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Semaphore;
++import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
+import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
+import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
+import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.problems.ProblemReport;
+import org.apache.accumulo.server.problems.ProblemReportingIterator;
+import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.problems.ProblemType;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+public class FileManager {
+
+ private static final Logger log = Logger.getLogger(FileManager.class);
+
+ int maxOpen;
+
+ private static class OpenReader implements Comparable<OpenReader> {
+ long releaseTime;
+ FileSKVIterator reader;
+ String fileName;
+
+ public OpenReader(String fileName, FileSKVIterator reader) {
+ this.fileName = fileName;
+ this.reader = reader;
+ this.releaseTime = System.currentTimeMillis();
+ }
+
+ @Override
+ public int compareTo(OpenReader o) {
+ if (releaseTime < o.releaseTime) {
+ return -1;
+ } else if (releaseTime > o.releaseTime) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof OpenReader) {
+ return compareTo((OpenReader) obj) == 0;
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return fileName.hashCode();
+ }
+ }
+
+ private Map<String,List<OpenReader>> openFiles;
+ private HashMap<FileSKVIterator,String> reservedReaders;
+
+ private Semaphore filePermits;
+
+ private VolumeManager fs;
+
+ // the data cache and index cache are allocated in
+ // TabletResourceManager and passed through the file opener to
+ // CachableBlockFile which can handle the caches being
+ // null if unallocated
+ private BlockCache dataCache = null;
+ private BlockCache indexCache = null;
+
+ private long maxIdleTime;
+
+ private final ServerConfiguration conf;
+
+ private class IdleFileCloser implements Runnable {
+
+ @Override
+ public void run() {
+
+ long curTime = System.currentTimeMillis();
+
+ ArrayList<FileSKVIterator> filesToClose = new ArrayList<FileSKVIterator>();
+
+ // determine which files to close in a sync block, and then close the
+ // files outside of the sync block
+ synchronized (FileManager.this) {
+ Iterator<Entry<String,List<OpenReader>>> iter = openFiles.entrySet().iterator();
+ while (iter.hasNext()) {
+ Entry<String,List<OpenReader>> entry = iter.next();
+ List<OpenReader> ofl = entry.getValue();
+
+ for (Iterator<OpenReader> oflIter = ofl.iterator(); oflIter.hasNext();) {
+ OpenReader openReader = oflIter.next();
+
+ if (curTime - openReader.releaseTime > maxIdleTime) {
+
+ filesToClose.add(openReader.reader);
+ oflIter.remove();
+ }
+ }
+
+ if (ofl.size() == 0) {
+ iter.remove();
+ }
+ }
+ }
+
+ closeReaders(filesToClose);
+
+ }
+
+ }
+
+ /**
+ *
+ * @param dataCache
+ * : underlying file can and should be able to handle a null cache
+ * @param indexCache
+ * : underlying file can and should be able to handle a null cache
+ */
+ FileManager(ServerConfiguration conf, VolumeManager fs, int maxOpen, BlockCache dataCache, BlockCache indexCache) {
+
+ if (maxOpen <= 0)
+ throw new IllegalArgumentException("maxOpen <= 0");
+ this.conf = conf;
+ this.dataCache = dataCache;
+ this.indexCache = indexCache;
+
+ this.filePermits = new Semaphore(maxOpen, true);
+ this.maxOpen = maxOpen;
+ this.fs = fs;
+
+ this.openFiles = new HashMap<String,List<OpenReader>>();
+ this.reservedReaders = new HashMap<FileSKVIterator,String>();
+
+ this.maxIdleTime = conf.getConfiguration().getTimeInMillis(Property.TSERV_MAX_IDLE);
+ SimpleTimer.getInstance().schedule(new IdleFileCloser(), maxIdleTime, maxIdleTime / 2);
+
+ }
+
+ private static int countReaders(Map<String,List<OpenReader>> files) {
+ int count = 0;
+
+ for (List<OpenReader> list : files.values()) {
+ count += list.size();
+ }
+
+ return count;
+ }
+
+ private List<FileSKVIterator> takeLRUOpenFiles(int numToTake) {
+
+ ArrayList<OpenReader> openReaders = new ArrayList<OpenReader>();
+
+ for (Entry<String,List<OpenReader>> entry : openFiles.entrySet()) {
+ openReaders.addAll(entry.getValue());
+ }
+
+ Collections.sort(openReaders);
+
+ ArrayList<FileSKVIterator> ret = new ArrayList<FileSKVIterator>();
+
+ for (int i = 0; i < numToTake; i++) {
+ OpenReader or = openReaders.get(i);
+
+ List<OpenReader> ofl = openFiles.get(or.fileName);
+ if (!ofl.remove(or)) {
+ throw new RuntimeException("Failed to remove open reader that should have been there");
+ }
+
+ if (ofl.size() == 0) {
+ openFiles.remove(or.fileName);
+ }
+
+ ret.add(or.reader);
+ }
+
+ return ret;
+ }
+
+ private static <T> List<T> getFileList(String file, Map<String,List<T>> files) {
+ List<T> ofl = files.get(file);
+ if (ofl == null) {
+ ofl = new ArrayList<T>();
+ files.put(file, ofl);
+ }
+
+ return ofl;
+ }
+
+ private void closeReaders(List<FileSKVIterator> filesToClose) {
+ for (FileSKVIterator reader : filesToClose) {
+ try {
+ reader.close();
+ } catch (Exception e) {
+ log.error("Failed to close file " + e.getMessage(), e);
+ }
+ }
+ }
+
+ private List<String> takeOpenFiles(Collection<String> files, List<FileSKVIterator> reservedFiles, Map<FileSKVIterator,String> readersReserved) {
+ List<String> filesToOpen = new LinkedList<String>(files);
+ for (Iterator<String> iterator = filesToOpen.iterator(); iterator.hasNext();) {
+ String file = iterator.next();
+
+ List<OpenReader> ofl = openFiles.get(file);
+ if (ofl != null && ofl.size() > 0) {
+ OpenReader openReader = ofl.remove(ofl.size() - 1);
+ reservedFiles.add(openReader.reader);
+ readersReserved.put(openReader.reader, file);
+ if (ofl.size() == 0) {
+ openFiles.remove(file);
+ }
+ iterator.remove();
+ }
+
+ }
+ return filesToOpen;
+ }
+
+ private synchronized String getReservedReadeFilename(FileSKVIterator reader) {
+ return reservedReaders.get(reader);
+ }
+
+ private List<FileSKVIterator> reserveReaders(Text table, Collection<String> files, boolean continueOnFailure) throws IOException {
+
+ if (files.size() >= maxOpen) {
+ throw new IllegalArgumentException("requested files exceeds max open");
+ }
+
+ if (files.size() == 0) {
+ return Collections.emptyList();
+ }
+
+ List<String> filesToOpen = null;
+ List<FileSKVIterator> filesToClose = Collections.emptyList();
+ List<FileSKVIterator> reservedFiles = new ArrayList<FileSKVIterator>();
+ Map<FileSKVIterator,String> readersReserved = new HashMap<FileSKVIterator,String>();
+
+ filePermits.acquireUninterruptibly(files.size());
+
+ // now that the we are past the semaphore, we have the authority
+ // to open files.size() files
+
+ // determine what work needs to be done in sync block
+ // but do the work of opening and closing files outside
+ // a synch block
+ synchronized (this) {
+
+ filesToOpen = takeOpenFiles(files, reservedFiles, readersReserved);
+
+ int numOpen = countReaders(openFiles);
+
+ if (filesToOpen.size() + numOpen + reservedReaders.size() > maxOpen) {
+ filesToClose = takeLRUOpenFiles((filesToOpen.size() + numOpen + reservedReaders.size()) - maxOpen);
+ }
+ }
+
+ // close files before opening files to ensure we stay under resource
+ // limitations
+ closeReaders(filesToClose);
+
+ // open any files that need to be opened
+ for (String file : filesToOpen) {
+ try {
+ if (!file.contains(":"))
+ throw new IllegalArgumentException("Expected uri, got : " + file);
+ Path path = new Path(file);
+ FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
+ //log.debug("Opening "+file + " path " + path);
+ FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), false, ns, ns.getConf(), conf.getTableConfiguration(table.toString()),
+ dataCache, indexCache);
+ reservedFiles.add(reader);
+ readersReserved.put(reader, file);
+ } catch (Exception e) {
+
+ ProblemReports.getInstance().report(new ProblemReport(table.toString(), ProblemType.FILE_READ, file, e));
+
+ if (continueOnFailure) {
+ // release the permit for the file that failed to open
+ filePermits.release(1);
+ log.warn("Failed to open file " + file + " " + e.getMessage() + " continuing...");
+ } else {
+ // close whatever files were opened
+ closeReaders(reservedFiles);
+
+ filePermits.release(files.size());
+
+ log.error("Failed to open file " + file + " " + e.getMessage());
+ throw new IOException("Failed to open " + file, e);
+ }
+ }
+ }
+
+ synchronized (this) {
+ // update set of reserved readers
+ reservedReaders.putAll(readersReserved);
+ }
+
+ return reservedFiles;
+ }
+
+ private void releaseReaders(List<FileSKVIterator> readers, boolean sawIOException) {
+ // put files in openFiles
+
+ synchronized (this) {
+
+ // check that readers were actually reserved ... want to make sure a thread does
+ // not try to release readers they never reserved
+ if (!reservedReaders.keySet().containsAll(readers)) {
+ throw new IllegalArgumentException("Asked to release readers that were never reserved ");
+ }
+
+ for (FileSKVIterator reader : readers) {
+ try {
+ reader.closeDeepCopies();
+ } catch (IOException e) {
+ log.warn(e, e);
+ sawIOException = true;
+ }
+ }
+
+ for (FileSKVIterator reader : readers) {
+ String fileName = reservedReaders.remove(reader);
+ if (!sawIOException)
+ getFileList(fileName, openFiles).add(new OpenReader(fileName, reader));
+ }
+ }
+
+ if (sawIOException)
+ closeReaders(readers);
+
+ // decrement the semaphore
+ filePermits.release(readers.size());
+
+ }
+
+ static class FileDataSource implements DataSource {
+
+ private SortedKeyValueIterator<Key,Value> iter;
+ private ArrayList<FileDataSource> deepCopies;
+ private boolean current = true;
+ private IteratorEnvironment env;
+ private String file;
++ private AtomicBoolean iflag;
+
+ FileDataSource(String file, SortedKeyValueIterator<Key,Value> iter) {
+ this.file = file;
+ this.iter = iter;
+ this.deepCopies = new ArrayList<FileManager.FileDataSource>();
+ }
+
+ public FileDataSource(IteratorEnvironment env, SortedKeyValueIterator<Key,Value> deepCopy, ArrayList<FileDataSource> deepCopies) {
+ this.iter = deepCopy;
+ this.env = env;
+ this.deepCopies = deepCopies;
+ deepCopies.add(this);
+ }
+
+ @Override
+ public boolean isCurrent() {
+ return current;
+ }
+
+ @Override
+ public DataSource getNewDataSource() {
+ current = true;
+ return this;
+ }
+
+ @Override
+ public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
+ return new FileDataSource(env, iter.deepCopy(env), deepCopies);
+ }
+
+ @Override
+ public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
++ if (iflag != null)
++ ((InterruptibleIterator) this.iter).setInterruptFlag(iflag);
+ return iter;
+ }
+
+ void unsetIterator() {
+ current = false;
+ iter = null;
+ for (FileDataSource fds : deepCopies) {
+ fds.current = false;
+ fds.iter = null;
+ }
+ }
+
+ void setIterator(SortedKeyValueIterator<Key,Value> iter) {
+ current = false;
+ this.iter = iter;
++
++ if (iflag != null)
++ ((InterruptibleIterator) this.iter).setInterruptFlag(iflag);
++
+ for (FileDataSource fds : deepCopies) {
+ fds.current = false;
+ fds.iter = iter.deepCopy(fds.env);
+ }
+ }
++
++ @Override
++ public void setInterruptFlag(AtomicBoolean flag) {
++ this.iflag = flag;
++ }
+
+ }
+
+ public class ScanFileManager {
+
+ private ArrayList<FileDataSource> dataSources;
+ private ArrayList<FileSKVIterator> tabletReservedReaders;
+ private KeyExtent tablet;
+ private boolean continueOnFailure;
+
+ ScanFileManager(KeyExtent tablet) {
+ tabletReservedReaders = new ArrayList<FileSKVIterator>();
+ dataSources = new ArrayList<FileDataSource>();
+ this.tablet = tablet;
+
+ continueOnFailure = conf.getTableConfiguration(tablet).getBoolean(Property.TABLE_FAILURES_IGNORE);
+
+ if (tablet.isMeta()) {
+ continueOnFailure = false;
+ }
+ }
+
+ private List<FileSKVIterator> openFileRefs(Collection<FileRef> files) throws TooManyFilesException, IOException {
+ List<String> strings = new ArrayList<String>(files.size());
+ for (FileRef ref : files)
+ strings.add(ref.path().toString());
+ return openFiles(strings);
+ }
+
+ private List<FileSKVIterator> openFiles(Collection<String> files) throws TooManyFilesException, IOException {
+ // one tablet can not open more than maxOpen files, otherwise it could get stuck
+ // forever waiting on itself to release files
+
+ if (tabletReservedReaders.size() + files.size() >= maxOpen) {
+ throw new TooManyFilesException("Request to open files would exceed max open files reservedReaders.size()=" + tabletReservedReaders.size()
+ + " files.size()=" + files.size() + " maxOpen=" + maxOpen + " tablet = " + tablet);
+ }
+
+ List<FileSKVIterator> newlyReservedReaders = reserveReaders(tablet.getTableId(), files, continueOnFailure);
+
+ tabletReservedReaders.addAll(newlyReservedReaders);
+ return newlyReservedReaders;
+ }
+
+ synchronized List<InterruptibleIterator> openFiles(Map<FileRef,DataFileValue> files, boolean detachable) throws IOException {
+
+ List<FileSKVIterator> newlyReservedReaders = openFileRefs(files.keySet());
+
+ ArrayList<InterruptibleIterator> iters = new ArrayList<InterruptibleIterator>();
+
+ for (FileSKVIterator reader : newlyReservedReaders) {
+ String filename = getReservedReadeFilename(reader);
+ InterruptibleIterator iter;
+ if (detachable) {
+ FileDataSource fds = new FileDataSource(filename, reader);
+ dataSources.add(fds);
+ SourceSwitchingIterator ssi = new SourceSwitchingIterator(fds);
+ iter = new ProblemReportingIterator(tablet.getTableId().toString(), filename, continueOnFailure, ssi);
+ } else {
+ iter = new ProblemReportingIterator(tablet.getTableId().toString(), filename, continueOnFailure, reader);
+ }
+ DataFileValue value = files.get(new FileRef(filename));
+ if (value.isTimeSet()) {
+ iter = new TimeSettingIterator(iter, value.getTime());
+ }
+
+ iters.add(iter);
+ }
+
+ return iters;
+ }
+
+ synchronized void detach() {
+
+ releaseReaders(tabletReservedReaders, false);
+ tabletReservedReaders.clear();
+
+ for (FileDataSource fds : dataSources)
+ fds.unsetIterator();
+ }
+
+ synchronized void reattach() throws IOException {
+ if (tabletReservedReaders.size() != 0)
+ throw new IllegalStateException();
+
+ Collection<String> files = new ArrayList<String>();
+ for (FileDataSource fds : dataSources)
+ files.add(fds.file);
+
+ List<FileSKVIterator> newlyReservedReaders = openFiles(files);
+ Map<String,List<FileSKVIterator>> map = new HashMap<String,List<FileSKVIterator>>();
+ for (FileSKVIterator reader : newlyReservedReaders) {
+ String fileName = getReservedReadeFilename(reader);
+ List<FileSKVIterator> list = map.get(fileName);
+ if (list == null) {
+ list = new LinkedList<FileSKVIterator>();
+ map.put(fileName, list);
+ }
+
+ list.add(reader);
+ }
+
+ for (FileDataSource fds : dataSources) {
+ FileSKVIterator reader = map.get(fds.file).remove(0);
+ fds.setIterator(reader);
+ }
+ }
+
+ synchronized void releaseOpenFiles(boolean sawIOException) {
+ releaseReaders(tabletReservedReaders, sawIOException);
+ tabletReservedReaders.clear();
+ dataSources.clear();
+ }
+
+ synchronized int getNumOpenFiles() {
+ return tabletReservedReaders.size();
+ }
+ }
+
+ public ScanFileManager newScanFileManager(KeyExtent tablet) {
+ return new ScanFileManager(tablet);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d54e0fd8/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
index 5f6d9ce,0000000..2e15767
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
@@@ -1,772 -1,0 +1,783 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SkippingIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.SortedMapIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
+import org.apache.accumulo.core.iterators.system.LocalityGroupIterator;
+import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroup;
+import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
+import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
+import org.apache.accumulo.core.util.LocalityGroupUtil.Partitioner;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.trace.TraceFileSystem;
+import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+// Orders Keys for the in-memory map. Ties on the normal Key order are broken by
+// kvCount so that, for identical keys, the MemKey with the HIGHER kvCount (the
+// more recently written update) sorts first; a plain Key sorts before a MemKey
+// with the same key fields.
+class MemKeyComparator implements Comparator<Key>, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public int compare(Key k1, Key k2) {
+ int cmp = k1.compareTo(k2);
+
+ if (cmp == 0) {
+ if (k1 instanceof MemKey)
+ if (k2 instanceof MemKey)
+ // NOTE(review): int subtraction can overflow for extreme kvCounts;
+ // presumably kvCounts are small non-negative counters — confirm
+ cmp = ((MemKey) k2).kvCount - ((MemKey) k1).kvCount;
+ else
+ cmp = 1;
+ else if (k2 instanceof MemKey)
+ cmp = -1;
+ }
+
+ return cmp;
+ }
+}
+
+// Hides entries from mutations that were not fully written when a snapshot was
+// taken: skips any MemKey whose kvCount is greater than the given maximum, so
+// readers only see complete mutations.
+class PartialMutationSkippingIterator extends SkippingIterator implements InterruptibleIterator {
+
+ // highest kvCount that is visible through this iterator
+ int kvCount;
+
+ public PartialMutationSkippingIterator(SortedKeyValueIterator<Key,Value> source, int maxKVCount) {
+ setSource(source);
+ this.kvCount = maxKVCount;
+ }
+
+ // advance past entries written after the snapshot point
+ @Override
+ protected void consume() throws IOException {
+ while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).kvCount > kvCount)
+ getSource().next();
+ }
+
+ @Override
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ return new PartialMutationSkippingIterator(getSource().deepCopy(env), kvCount);
+ }
+
+ // delegates interruption to the source, which must itself be interruptible
+ @Override
+ public void setInterruptFlag(AtomicBoolean flag) {
+ ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
+ }
+
+}
+
+// Converts plain Key/Value pairs (as read back from a dumped map file, where the
+// kvCount was folded into the Value by MemValue) into MemKey/Value pairs, so
+// downstream iterators such as PartialMutationSkippingIterator can see kvCounts.
+class MemKeyConversionIterator extends WrappingIterator implements InterruptibleIterator {
+ // current converted top entry; null when the source key was null
+ MemKey currKey = null;
+ Value currVal = null;
+
+ public MemKeyConversionIterator(SortedKeyValueIterator<Key,Value> source) {
+ super();
+ setSource(source);
+ }
+
+ @Override
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ return new MemKeyConversionIterator(getSource().deepCopy(env));
+ }
+
+ @Override
+ public Key getTopKey() {
+ return currKey;
+ }
+
+ @Override
+ public Value getTopValue() {
+ return currVal;
+ }
+
+ // refresh currKey/currVal from the source top; if the source already yields a
+ // MemKey (or null) pass it through, otherwise split the kvCount back out of a
+ // copy of the value and rebuild the MemKey
+ private void getTopKeyVal() {
+ Key k = super.getTopKey();
+ Value v = super.getTopValue();
+ if (k instanceof MemKey || k == null) {
+ currKey = (MemKey) k;
+ currVal = v;
+ return;
+ }
+ currVal = new Value(v);
+ int mc = MemValue.splitKVCount(currVal);
+ currKey = new MemKey(k, mc);
+
+ }
+
+ public void next() throws IOException {
+ super.next();
+ if (hasTop())
+ getTopKeyVal();
+ }
+
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+ super.seek(range, columnFamilies, inclusive);
+
+ if (hasTop())
+ getTopKeyVal();
+
+ // if the seek start is itself a MemKey, step forward until the converted
+ // keys (which include kvCount in their order) reach the start position
+ Key k = range.getStartKey();
+ if (k instanceof MemKey && hasTop()) {
+ while (hasTop() && currKey.compareTo(k) < 0)
+ next();
+ }
+ }
+
+ // delegates interruption to the source, which must itself be interruptible
+ @Override
+ public void setInterruptFlag(AtomicBoolean flag) {
+ ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
+ }
+
+}
+
+public class InMemoryMap {
+ private SimpleMap map = null;
+
+ private static final Logger log = Logger.getLogger(InMemoryMap.class);
+
+ private volatile String memDumpFile = null;
+ private final String memDumpDir;
+
+ private Map<String,Set<ByteSequence>> lggroups;
+
+ public InMemoryMap(boolean useNativeMap, String memDumpDir) {
+ this(new HashMap<String,Set<ByteSequence>>(), useNativeMap, memDumpDir);
+ }
+
+ public InMemoryMap(Map<String,Set<ByteSequence>> lggroups, boolean useNativeMap, String memDumpDir) {
+ this.memDumpDir = memDumpDir;
+ this.lggroups = lggroups;
+
+ if (lggroups.size() == 0)
+ map = newMap(useNativeMap);
+ else
+ map = new LocalityGroupMap(lggroups, useNativeMap);
+ }
+
+ public InMemoryMap(AccumuloConfiguration config) throws LocalityGroupConfigurationError {
+ this(LocalityGroupUtil.getLocalityGroups(config), config.getBoolean(Property.TSERV_NATIVEMAP_ENABLED), config.get(Property.TSERV_MEMDUMP_DIR));
+ }
+
+ private static SimpleMap newMap(boolean useNativeMap) {
+ if (useNativeMap && NativeMap.isLoaded()) {
+ try {
+ return new NativeMapWrapper();
+ } catch (Throwable t) {
+ log.error("Failed to create native map", t);
+ }
+ }
+
+ return new DefaultMap();
+ }
+
+ private interface SimpleMap {
+ Value get(Key key);
+
+ Iterator<Entry<Key,Value>> iterator(Key startKey);
+
+ int size();
+
+ InterruptibleIterator skvIterator();
+
+ void delete();
+
+ long getMemoryUsed();
+
+ void mutate(List<Mutation> mutations, int kvCount);
+ }
+
+ private static class LocalityGroupMap implements SimpleMap {
+
+ private Map<ByteSequence,MutableLong> groupFams[];
+
+ // the last map in the array is the default locality group
+ private SimpleMap maps[];
+ private Partitioner partitioner;
+ private List<Mutation>[] partitioned;
+ private Set<ByteSequence> nonDefaultColumnFamilies;
+
+ @SuppressWarnings("unchecked")
+ LocalityGroupMap(Map<String,Set<ByteSequence>> groups, boolean useNativeMap) {
+ this.groupFams = new Map[groups.size()];
+ this.maps = new SimpleMap[groups.size() + 1];
+ this.partitioned = new List[groups.size() + 1];
+ this.nonDefaultColumnFamilies = new HashSet<ByteSequence>();
+
+ for (int i = 0; i < maps.length; i++) {
+ maps[i] = newMap(useNativeMap);
+ }
+
+ int count = 0;
+ for (Set<ByteSequence> cfset : groups.values()) {
+ HashMap<ByteSequence,MutableLong> map = new HashMap<ByteSequence,MutableLong>();
+ for (ByteSequence bs : cfset)
+ map.put(bs, new MutableLong(1));
+ this.groupFams[count++] = map;
+ nonDefaultColumnFamilies.addAll(cfset);
+ }
+
+ partitioner = new LocalityGroupUtil.Partitioner(this.groupFams);
+
+ for (int i = 0; i < partitioned.length; i++) {
+ partitioned[i] = new ArrayList<Mutation>();
+ }
+ }
+
+ @Override
+ public Value get(Key key) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator<Entry<Key,Value>> iterator(Key startKey) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int size() {
+ int sum = 0;
+ for (SimpleMap map : maps)
+ sum += map.size();
+ return sum;
+ }
+
+ @Override
+ public InterruptibleIterator skvIterator() {
+ LocalityGroup groups[] = new LocalityGroup[maps.length];
+ for (int i = 0; i < groups.length; i++) {
+ if (i < groupFams.length)
+ groups[i] = new LocalityGroup(maps[i].skvIterator(), groupFams[i], false);
+ else
+ groups[i] = new LocalityGroup(maps[i].skvIterator(), null, true);
+ }
+
+
+ return new LocalityGroupIterator(groups, nonDefaultColumnFamilies);
+ }
+
+ @Override
+ public void delete() {
+ for (SimpleMap map : maps)
+ map.delete();
+ }
+
+ @Override
+ public long getMemoryUsed() {
+ long sum = 0;
+ for (SimpleMap map : maps)
+ sum += map.getMemoryUsed();
+ return sum;
+ }
+
+ @Override
+ public synchronized void mutate(List<Mutation> mutations, int kvCount) {
+ // this method is synchronized because it reuses objects to avoid allocation,
+ // currently, the method that calls this is synchronized so there is no
+ // loss in parallelism.... synchronization was added here for future proofing
+
+ try{
+ partitioner.partition(mutations, partitioned);
+
+ for (int i = 0; i < partitioned.length; i++) {
+ if (partitioned[i].size() > 0) {
+ maps[i].mutate(partitioned[i], kvCount);
+ for (Mutation m : partitioned[i])
+ kvCount += m.getUpdates().size();
+ }
+ }
+ } finally {
+ // clear immediately so mutations can be garbage collected
+ for (List<Mutation> list : partitioned) {
+ list.clear();
+ }
+ }
+ }
+
+ }
+
+ private static class DefaultMap implements SimpleMap {
+ private ConcurrentSkipListMap<Key,Value> map = new ConcurrentSkipListMap<Key,Value>(new MemKeyComparator());
+ private AtomicLong bytesInMemory = new AtomicLong();
+ private AtomicInteger size = new AtomicInteger();
+
+ public void put(Key key, Value value) {
+ // Always a MemKey, so account for the kvCount int
+ bytesInMemory.addAndGet(key.getLength() + 4);
+ bytesInMemory.addAndGet(value.getSize());
+ if (map.put(key, value) == null)
+ size.incrementAndGet();
+ }
+
+ public Value get(Key key) {
+ return map.get(key);
+ }
+
+ public Iterator<Entry<Key,Value>> iterator(Key startKey) {
+ Key lk = new Key(startKey);
+ SortedMap<Key,Value> tm = map.tailMap(lk);
+ return tm.entrySet().iterator();
+ }
+
+ public int size() {
+ return size.get();
+ }
+
+ public synchronized InterruptibleIterator skvIterator() {
+ if (map == null)
+ throw new IllegalStateException();
+
+ return new SortedMapIterator(map);
+ }
+
+ public synchronized void delete() {
+ map = null;
+ }
+
+ public long getOverheadPerEntry() {
+ // all of the java objects that are used to hold the
+ // data and make it searchable have overhead... this
+ // overhead is estimated using test.EstimateInMemMapOverhead
+ // and is in bytes.. the estimates were obtained by running
+ // java 6_16 in 64 bit server mode
+
+ return 200;
+ }
+
+ @Override
+ public void mutate(List<Mutation> mutations, int kvCount) {
+ for (Mutation m : mutations) {
+ for (ColumnUpdate cvp : m.getUpdates()) {
+ Key newKey = new MemKey(m.getRow(), cvp.getColumnFamily(), cvp.getColumnQualifier(), cvp.getColumnVisibility(), cvp.getTimestamp(), cvp.isDeleted(),
+ false, kvCount++);
+ Value value = new Value(cvp.getValue());
+ put(newKey, value);
+ }
+ }
+ }
+
+ @Override
+ public long getMemoryUsed() {
+ return bytesInMemory.get() + (size() * getOverheadPerEntry());
+ }
+ }
+
+ private static class NativeMapWrapper implements SimpleMap {
+ private NativeMap nativeMap;
+
+ NativeMapWrapper() {
+ nativeMap = new NativeMap();
+ }
+
+ public Value get(Key key) {
+ return nativeMap.get(key);
+ }
+
+ public Iterator<Entry<Key,Value>> iterator(Key startKey) {
+ return nativeMap.iterator(startKey);
+ }
+
+ public int size() {
+ return nativeMap.size();
+ }
+
+ public InterruptibleIterator skvIterator() {
+ return (InterruptibleIterator) nativeMap.skvIterator();
+ }
+
+ public void delete() {
+ nativeMap.delete();
+ }
+
+ public long getMemoryUsed() {
+ return nativeMap.getMemoryUsed();
+ }
+
+ @Override
+ public void mutate(List<Mutation> mutations, int kvCount) {
+ nativeMap.mutate(mutations, kvCount);
+ }
+ }
+
+ private AtomicInteger nextKVCount = new AtomicInteger(1);
+ private AtomicInteger kvCount = new AtomicInteger(0);
+
+ private Object writeSerializer = new Object();
+
+ /**
+ * Applies changes to a row in the InMemoryMap
+ *
+ */
+ public void mutate(List<Mutation> mutations) {
+ int numKVs = 0;
+ for (int i = 0; i < mutations.size(); i++)
+ numKVs += mutations.get(i).size();
+
+ // Can not update mutationCount while writes that started before
+ // are in progress, this would cause partial mutations to be seen.
+ // Also, can not continue until mutation count is updated, because
+ // a read may not see a successful write. Therefore writes must
+ // wait for writes that started before to finish.
+ //
+ // using separate lock from this map, to allow read/write in parallel
+ synchronized (writeSerializer ) {
+ int kv = nextKVCount.getAndAdd(numKVs);
+ try {
+ map.mutate(mutations, kv);
+ } finally {
+ kvCount.set(kv + numKVs - 1);
+ }
+ }
+ }
+
+ /**
+ * Returns a long representing the size of the InMemoryMap
+ *
+ * @return bytesInMemory
+ */
+ public synchronized long estimatedSizeInBytes() {
+ if (map == null)
+ return 0;
+
+ return map.getMemoryUsed();
+ }
+
+ Iterator<Map.Entry<Key,Value>> iterator(Key startKey) {
+ return map.iterator(startKey);
+ }
+
+ public long getNumEntries() {
+ return map.size();
+ }
+
+ private final Set<MemoryIterator> activeIters = Collections.synchronizedSet(new HashSet<MemoryIterator>());
+
+ class MemoryDataSource implements DataSource {
+
+ boolean switched = false;
+ private InterruptibleIterator iter;
+ private FileSKVIterator reader;
+ private MemoryDataSource parent;
+ private IteratorEnvironment env;
++ private AtomicBoolean iflag;
+
+ MemoryDataSource() {
- this(null, false, null);
++ this(null, false, null, null);
+ }
+
- public MemoryDataSource(MemoryDataSource parent, boolean switched, IteratorEnvironment env) {
++ public MemoryDataSource(MemoryDataSource parent, boolean switched, IteratorEnvironment env, AtomicBoolean iflag) {
+ this.parent = parent;
+ this.switched = switched;
+ this.env = env;
++ this.iflag = iflag;
+ }
+
+ @Override
+ public boolean isCurrent() {
+ if (switched)
+ return true;
+ else
+ return memDumpFile == null;
+ }
+
+ @Override
+ public DataSource getNewDataSource() {
+ if (switched)
+ throw new IllegalStateException();
+
+ if (!isCurrent()) {
+ switched = true;
+ iter = null;
+ try {
+ // ensure files are referenced even if iterator was never seeked before
+ iterator();
+ } catch (IOException e) {
+ throw new RuntimeException();
+ }
+ }
+
+ return this;
+ }
+
+ private synchronized FileSKVIterator getReader() throws IOException {
+ if (reader == null) {
+ Configuration conf = CachedConfiguration.getInstance();
+ FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
+
+ reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, ServerConfiguration.getSiteConfiguration());
++ if (iflag != null)
++ reader.setInterruptFlag(iflag);
+ }
+
+ return reader;
+ }
+
+ @Override
+ public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
+ if (iter == null)
- if (!switched)
++ if (!switched) {
+ iter = map.skvIterator();
- else {
++ if (iflag != null)
++ iter.setInterruptFlag(iflag);
++ } else {
+ if (parent == null)
+ iter = new MemKeyConversionIterator(getReader());
+ else
+ synchronized (parent) {
+ // synchronize deep copy operation on parent, this prevents multiple threads from deep copying the rfile shared from parent its possible that the
+ // thread deleting an InMemoryMap and scan thread could be switching different deep copies
+ iter = new MemKeyConversionIterator(parent.getReader().deepCopy(env));
+ }
+ }
+
+ return iter;
+ }
+
+ @Override
+ public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
- return new MemoryDataSource(parent == null ? this : parent, switched, env);
++ return new MemoryDataSource(parent == null ? this : parent, switched, env, iflag);
++ }
++
++ @Override
++ public void setInterruptFlag(AtomicBoolean flag) {
++ this.iflag = flag;
+ }
+
+ }
+
+ class MemoryIterator extends WrappingIterator implements InterruptibleIterator {
+
+ private AtomicBoolean closed;
+ private SourceSwitchingIterator ssi;
+ private MemoryDataSource mds;
+
+ protected SortedKeyValueIterator<Key,Value> getSource() {
+ if (closed.get())
+ throw new IllegalStateException("Memory iterator is closed");
+ return super.getSource();
+ }
+
+ private MemoryIterator(InterruptibleIterator source) {
+ this(source, new AtomicBoolean(false));
+ }
+
+ private MemoryIterator(SortedKeyValueIterator<Key,Value> source, AtomicBoolean closed) {
+ setSource(source);
+ this.closed = closed;
+ }
+
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ return new MemoryIterator(getSource().deepCopy(env), closed);
+ }
+
+ public void close() {
+
+ synchronized (this) {
+ if (closed.compareAndSet(false, true)) {
+ try {
+ if (mds.reader != null)
+ mds.reader.close();
+ } catch (IOException e) {
+ log.warn(e, e);
+ }
+ }
+ }
+
+ // remove outside of sync to avoid deadlock
+ activeIters.remove(this);
+ }
+
+ private synchronized boolean switchNow() throws IOException {
+ if (closed.get())
+ return false;
+
+ ssi.switchNow();
+ return true;
+ }
+
+ @Override
+ public void setInterruptFlag(AtomicBoolean flag) {
+ ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
+ }
+
+ private void setSSI(SourceSwitchingIterator ssi) {
+ this.ssi = ssi;
+ }
+
+ public void setMDS(MemoryDataSource mds) {
+ this.mds = mds;
+ }
+
+ }
+
+ public synchronized MemoryIterator skvIterator() {
+ if (map == null)
+ throw new NullPointerException();
+
+ if (deleted)
+ throw new IllegalStateException("Can not obtain iterator after map deleted");
+
+ int mc = kvCount.get();
+ MemoryDataSource mds = new MemoryDataSource();
+ SourceSwitchingIterator ssi = new SourceSwitchingIterator(new MemoryDataSource());
+ MemoryIterator mi = new MemoryIterator(new PartialMutationSkippingIterator(ssi, mc));
+ mi.setSSI(ssi);
+ mi.setMDS(mds);
+ activeIters.add(mi);
+ return mi;
+ }
+
+ public SortedKeyValueIterator<Key,Value> compactionIterator() {
+
+ if (nextKVCount.get() - 1 != kvCount.get())
+ throw new IllegalStateException("Memory map in unexpected state : nextKVCount = " + nextKVCount.get() + " kvCount = "
+ + kvCount.get());
+
+ return map.skvIterator();
+ }
+
+ private boolean deleted = false;
+
+ public void delete(long waitTime) {
+
+ synchronized (this) {
+ if (deleted)
+ throw new IllegalStateException("Double delete");
+
+ deleted = true;
+ }
+
+ long t1 = System.currentTimeMillis();
+
+ while (activeIters.size() > 0 && System.currentTimeMillis() - t1 < waitTime) {
+ UtilWaitThread.sleep(50);
+ }
+
+ if (activeIters.size() > 0) {
+ // dump memmap exactly as is to a tmp file on disk, and switch scans to that temp file
+ try {
+ Configuration conf = CachedConfiguration.getInstance();
+ FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
+
+ String tmpFile = memDumpDir + "/memDump" + UUID.randomUUID() + "." + RFile.EXTENSION;
+
+ Configuration newConf = new Configuration(conf);
+ newConf.setInt("io.seqfile.compress.blocksize", 100000);
+
+ FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, ServerConfiguration.getSiteConfiguration());
+
+ InterruptibleIterator iter = map.skvIterator();
+
+ HashSet<ByteSequence> allfams= new HashSet<ByteSequence>();
+
+ for(Entry<String, Set<ByteSequence>> entry : lggroups.entrySet()){
+ allfams.addAll(entry.getValue());
+ out.startNewLocalityGroup(entry.getKey(), entry.getValue());
+ iter.seek(new Range(), entry.getValue(), true);
+ dumpLocalityGroup(out, iter);
+ }
+
+ out.startDefaultLocalityGroup();
+ iter.seek(new Range(), allfams, false);
+
+ dumpLocalityGroup(out, iter);
+
+ out.close();
+
+ log.debug("Created mem dump file " + tmpFile);
+
+ memDumpFile = tmpFile;
+
+ synchronized (activeIters) {
+ for (MemoryIterator mi : activeIters) {
+ mi.switchNow();
+ }
+ }
+
+ // rely on unix behavior that file will be deleted when last
+ // reader closes it
+ fs.delete(new Path(memDumpFile), true);
+
+ } catch (IOException ioe) {
+ log.error("Failed to create mem dump file ", ioe);
+
+ while (activeIters.size() > 0) {
+ UtilWaitThread.sleep(100);
+ }
+ }
+
+ }
+
+ SimpleMap tmpMap = map;
+
+ synchronized (this) {
+ map = null;
+ }
+
+ tmpMap.delete();
+ }
+
+ private void dumpLocalityGroup(FileSKVWriter out, InterruptibleIterator iter) throws IOException {
+ while (iter.hasTop() && activeIters.size() > 0) {
+ // RFile does not support MemKey, so we move the kv count into the value only for the RFile.
+ // There is no need to change the MemKey to a normal key because the kvCount info gets lost when it is written
+ Value newValue = new MemValue(iter.getTopValue(), ((MemKey) iter.getTopKey()).kvCount);
+ out.append(iter.getTopKey(), newValue);
+ iter.next();
+
+ }
+ }
+}