You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/11/01 01:55:45 UTC
[06/54] [partial] ACCUMULO-658,
ACCUMULO-656 Split server into separate modules
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
deleted file mode 100644
index 85636c5..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
+++ /dev/null
@@ -1,760 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.ByteSequence;
-import org.apache.accumulo.core.data.ColumnUpdate;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.FileSKVIterator;
-import org.apache.accumulo.core.file.FileSKVWriter;
-import org.apache.accumulo.core.file.rfile.RFile;
-import org.apache.accumulo.core.file.rfile.RFileOperations;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
-import org.apache.accumulo.core.iterators.SkippingIterator;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.iterators.SortedMapIterator;
-import org.apache.accumulo.core.iterators.WrappingIterator;
-import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
-import org.apache.accumulo.core.iterators.system.LocalityGroupIterator;
-import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroup;
-import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
-import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.util.LocalityGroupUtil;
-import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
-import org.apache.accumulo.core.util.LocalityGroupUtil.Partitioner;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.trace.TraceFileSystem;
-import org.apache.commons.lang.mutable.MutableLong;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-class MemKeyComparator implements Comparator<Key> {
-
- @Override
- public int compare(Key k1, Key k2) {
- int cmp = k1.compareTo(k2);
-
- if (cmp == 0) {
- if (k1 instanceof MemKey)
- if (k2 instanceof MemKey)
- cmp = ((MemKey) k2).kvCount - ((MemKey) k1).kvCount;
- else
- cmp = 1;
- else if (k2 instanceof MemKey)
- cmp = -1;
- }
-
- return cmp;
- }
-}
-
-class PartialMutationSkippingIterator extends SkippingIterator implements InterruptibleIterator {
-
- int kvCount;
-
- public PartialMutationSkippingIterator(SortedKeyValueIterator<Key,Value> source, int maxKVCount) {
- setSource(source);
- this.kvCount = maxKVCount;
- }
-
- @Override
- protected void consume() throws IOException {
- while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).kvCount > kvCount)
- getSource().next();
- }
-
- @Override
- public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
- return new PartialMutationSkippingIterator(getSource().deepCopy(env), kvCount);
- }
-
- @Override
- public void setInterruptFlag(AtomicBoolean flag) {
- ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
- }
-
-}
-
-class MemKeyConversionIterator extends WrappingIterator implements InterruptibleIterator {
- MemKey currKey = null;
- Value currVal = null;
-
- public MemKeyConversionIterator(SortedKeyValueIterator<Key,Value> source) {
- super();
- setSource(source);
- }
-
- public MemKeyConversionIterator(SortedKeyValueIterator<Key,Value> source, MemKey startKey) {
- this(source);
- try {
- if (currKey != null)
- currKey = (MemKey) startKey.clone();
- } catch (CloneNotSupportedException e) {
- // MemKey is supported
- }
- }
-
- @Override
- public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
- return new MemKeyConversionIterator(getSource().deepCopy(env), currKey);
- }
-
- @Override
- public Key getTopKey() {
- return currKey;
- }
-
- @Override
- public Value getTopValue() {
- return currVal;
- }
-
- private void getTopKeyVal() {
- Key k = super.getTopKey();
- Value v = super.getTopValue();
- if (k instanceof MemKey || k == null) {
- currKey = (MemKey) k;
- currVal = v;
- return;
- }
- currVal = new Value(v);
- int mc = MemValue.splitKVCount(currVal);
- currKey = new MemKey(k, mc);
-
- }
-
- public void next() throws IOException {
- super.next();
- if (hasTop())
- getTopKeyVal();
- }
-
- public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
- super.seek(range, columnFamilies, inclusive);
-
- if (hasTop())
- getTopKeyVal();
-
- Key k = range.getStartKey();
- if (k instanceof MemKey && hasTop()) {
- while (hasTop() && currKey.compareTo(k) < 0)
- next();
- }
- }
-
- @Override
- public void setInterruptFlag(AtomicBoolean flag) {
- ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
- }
-
-}
-
-public class InMemoryMap {
- private SimpleMap map = null;
-
- private static final Logger log = Logger.getLogger(InMemoryMap.class);
-
- private volatile String memDumpFile = null;
- private final String memDumpDir;
-
- private Map<String,Set<ByteSequence>> lggroups;
-
- public InMemoryMap(boolean useNativeMap, String memDumpDir) {
- this(new HashMap<String,Set<ByteSequence>>(), useNativeMap, memDumpDir);
- }
-
- public InMemoryMap(Map<String,Set<ByteSequence>> lggroups, boolean useNativeMap, String memDumpDir) {
- this.memDumpDir = memDumpDir;
- this.lggroups = lggroups;
-
- if (lggroups.size() == 0)
- map = newMap(useNativeMap);
- else
- map = new LocalityGroupMap(lggroups, useNativeMap);
- }
-
- public InMemoryMap(AccumuloConfiguration config) throws LocalityGroupConfigurationError {
- this(LocalityGroupUtil.getLocalityGroups(config), config.getBoolean(Property.TSERV_NATIVEMAP_ENABLED), config.get(Property.TSERV_MEMDUMP_DIR));
- }
-
- private static SimpleMap newMap(boolean useNativeMap) {
- if (useNativeMap && NativeMap.loadedNativeLibraries()) {
- try {
- return new NativeMapWrapper();
- } catch (Throwable t) {
- log.error("Failed to create native map", t);
- }
- }
-
- return new DefaultMap();
- }
-
- private interface SimpleMap {
- public Value get(Key key);
-
- public Iterator<Entry<Key,Value>> iterator(Key startKey);
-
- public int size();
-
- public InterruptibleIterator skvIterator();
-
- public void delete();
-
- public long getMemoryUsed();
-
- public void mutate(List<Mutation> mutations, int kvCount);
- }
-
- private static class LocalityGroupMap implements SimpleMap {
-
- private Map<ByteSequence,MutableLong> groupFams[];
-
- // the last map in the array is the default locality group
- private SimpleMap maps[];
- private Partitioner partitioner;
- private List<Mutation>[] partitioned;
- private Set<ByteSequence> nonDefaultColumnFamilies;
-
- @SuppressWarnings("unchecked")
- LocalityGroupMap(Map<String,Set<ByteSequence>> groups, boolean useNativeMap) {
- this.groupFams = new Map[groups.size()];
- this.maps = new SimpleMap[groups.size() + 1];
- this.partitioned = new List[groups.size() + 1];
- this.nonDefaultColumnFamilies = new HashSet<ByteSequence>();
-
- for (int i = 0; i < maps.length; i++) {
- maps[i] = newMap(useNativeMap);
- }
-
- int count = 0;
- for (Set<ByteSequence> cfset : groups.values()) {
- HashMap<ByteSequence,MutableLong> map = new HashMap<ByteSequence,MutableLong>();
- for (ByteSequence bs : cfset)
- map.put(bs, new MutableLong(1));
- this.groupFams[count++] = map;
- nonDefaultColumnFamilies.addAll(cfset);
- }
-
- partitioner = new LocalityGroupUtil.Partitioner(this.groupFams);
-
- for (int i = 0; i < partitioned.length; i++) {
- partitioned[i] = new ArrayList<Mutation>();
- }
- }
-
- @Override
- public Value get(Key key) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Iterator<Entry<Key,Value>> iterator(Key startKey) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int size() {
- int sum = 0;
- for (SimpleMap map : maps)
- sum += map.size();
- return sum;
- }
-
- @Override
- public InterruptibleIterator skvIterator() {
- LocalityGroup groups[] = new LocalityGroup[maps.length];
- for (int i = 0; i < groups.length; i++) {
- if (i < groupFams.length)
- groups[i] = new LocalityGroup(maps[i].skvIterator(), groupFams[i], false);
- else
- groups[i] = new LocalityGroup(maps[i].skvIterator(), null, true);
- }
-
-
- return new LocalityGroupIterator(groups, nonDefaultColumnFamilies);
- }
-
- @Override
- public void delete() {
- for (SimpleMap map : maps)
- map.delete();
- }
-
- @Override
- public long getMemoryUsed() {
- long sum = 0;
- for (SimpleMap map : maps)
- sum += map.getMemoryUsed();
- return sum;
- }
-
- @Override
- public synchronized void mutate(List<Mutation> mutations, int kvCount) {
- // this method is synchronized because it reuses objects to avoid allocation,
- // currently, the method that calls this is synchronized so there is no
- // loss in parallelism.... synchronization was added here for future proofing
-
- try{
- partitioner.partition(mutations, partitioned);
-
- for (int i = 0; i < partitioned.length; i++) {
- if (partitioned[i].size() > 0) {
- maps[i].mutate(partitioned[i], kvCount);
- for (Mutation m : partitioned[i])
- kvCount += m.getUpdates().size();
- }
- }
- } finally {
- // clear immediately so mutations can be garbage collected
- for (List<Mutation> list : partitioned) {
- list.clear();
- }
- }
- }
-
- }
-
- private static class DefaultMap implements SimpleMap {
- private ConcurrentSkipListMap<Key,Value> map = new ConcurrentSkipListMap<Key,Value>(new MemKeyComparator());
- private AtomicLong bytesInMemory = new AtomicLong();
- private AtomicInteger size = new AtomicInteger();
-
- public void put(Key key, Value value) {
- // Always a MemKey, so account for the kvCount int
- bytesInMemory.addAndGet(key.getLength() + 4);
- bytesInMemory.addAndGet(value.getSize());
- if (map.put(key, value) == null)
- size.incrementAndGet();
- }
-
- public Value get(Key key) {
- return map.get(key);
- }
-
- public Iterator<Entry<Key,Value>> iterator(Key startKey) {
- Key lk = new Key(startKey);
- SortedMap<Key,Value> tm = map.tailMap(lk);
- return tm.entrySet().iterator();
- }
-
- public int size() {
- return size.get();
- }
-
- public synchronized InterruptibleIterator skvIterator() {
- if (map == null)
- throw new IllegalStateException();
-
- return new SortedMapIterator(map);
- }
-
- public synchronized void delete() {
- map = null;
- }
-
- public long getOverheadPerEntry() {
- // all of the java objects that are used to hold the
- // data and make it searchable have overhead... this
- // overhead is estimated using test.EstimateInMemMapOverhead
- // and is in bytes.. the estimates were obtained by running
- // java 6_16 in 64 bit server mode
-
- return 200;
- }
-
- @Override
- public void mutate(List<Mutation> mutations, int kvCount) {
- for (Mutation m : mutations) {
- for (ColumnUpdate cvp : m.getUpdates()) {
- Key newKey = new MemKey(m.getRow(), cvp.getColumnFamily(), cvp.getColumnQualifier(), cvp.getColumnVisibility(), cvp.getTimestamp(), cvp.isDeleted(),
- false, kvCount++);
- Value value = new Value(cvp.getValue());
- put(newKey, value);
- }
- }
- }
-
- @Override
- public long getMemoryUsed() {
- return bytesInMemory.get() + (size() * getOverheadPerEntry());
- }
- }
-
- private static class NativeMapWrapper implements SimpleMap {
- private NativeMap nativeMap;
-
- NativeMapWrapper() {
- nativeMap = new NativeMap();
- }
-
- public Value get(Key key) {
- return nativeMap.get(key);
- }
-
- public Iterator<Entry<Key,Value>> iterator(Key startKey) {
- return nativeMap.iterator(startKey);
- }
-
- public int size() {
- return nativeMap.size();
- }
-
- public InterruptibleIterator skvIterator() {
- return (InterruptibleIterator) nativeMap.skvIterator();
- }
-
- public void delete() {
- nativeMap.delete();
- }
-
- public long getMemoryUsed() {
- return nativeMap.getMemoryUsed();
- }
-
- @Override
- public void mutate(List<Mutation> mutations, int kvCount) {
- nativeMap.mutate(mutations, kvCount);
- }
- }
-
- private AtomicInteger nextKVCount = new AtomicInteger(1);
- private AtomicInteger kvCount = new AtomicInteger(0);
-
- private Object writeSerializer = new Object();
-
- /**
- * Applies changes to a row in the InMemoryMap
- *
- */
- public void mutate(List<Mutation> mutations) {
- int numKVs = 0;
- for (int i = 0; i < mutations.size(); i++)
- numKVs += mutations.get(i).size();
-
- // Can not update mutationCount while writes that started before
- // are in progress, this would cause partial mutations to be seen.
- // Also, can not continue until mutation count is updated, because
- // a read may not see a successful write. Therefore writes must
- // wait for writes that started before to finish.
- //
- // using separate lock from this map, to allow read/write in parallel
- synchronized (writeSerializer ) {
- int kv = nextKVCount.getAndAdd(numKVs);
- try {
- map.mutate(mutations, kv);
- } finally {
- kvCount.set(kv + numKVs - 1);
- }
- }
- }
-
- /**
- * Returns a long representing the size of the InMemoryMap
- *
- * @return bytesInMemory
- */
- public synchronized long estimatedSizeInBytes() {
- if (map == null)
- return 0;
-
- return map.getMemoryUsed();
- }
-
- Iterator<Map.Entry<Key,Value>> iterator(Key startKey) {
- return map.iterator(startKey);
- }
-
- public long getNumEntries() {
- return map.size();
- }
-
- private final Set<MemoryIterator> activeIters = Collections.synchronizedSet(new HashSet<MemoryIterator>());
-
- class MemoryDataSource implements DataSource {
-
- boolean switched = false;
- private InterruptibleIterator iter;
- private List<FileSKVIterator> readers;
-
- MemoryDataSource() {
- this(new ArrayList<FileSKVIterator>());
- }
-
- public MemoryDataSource(List<FileSKVIterator> readers) {
- this.readers = readers;
- }
-
- @Override
- public boolean isCurrent() {
- if (switched)
- return true;
- else
- return memDumpFile == null;
- }
-
- @Override
- public DataSource getNewDataSource() {
- if (switched)
- throw new IllegalStateException();
-
- if (!isCurrent()) {
- switched = true;
- iter = null;
- }
-
- return this;
- }
-
- @Override
- public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
- if (iter == null)
- if (!switched)
- iter = map.skvIterator();
- else {
-
- Configuration conf = CachedConfiguration.getInstance();
- FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
-
- FileSKVIterator reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, ServerConfiguration.getSiteConfiguration());
-
- readers.add(reader);
-
- iter = new MemKeyConversionIterator(reader);
- }
-
- return iter;
- }
-
- @Override
- public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
- return new MemoryDataSource(readers);
- }
-
- }
-
- class MemoryIterator extends WrappingIterator implements InterruptibleIterator {
-
- private AtomicBoolean closed;
- private SourceSwitchingIterator ssi;
- private MemoryDataSource mds;
-
- protected SortedKeyValueIterator<Key,Value> getSource() {
- if (closed.get())
- throw new IllegalStateException("Memory iterator is closed");
- return super.getSource();
- }
-
- private MemoryIterator(InterruptibleIterator source) {
- this(source, new AtomicBoolean(false));
- }
-
- private MemoryIterator(SortedKeyValueIterator<Key,Value> source, AtomicBoolean closed) {
- setSource(source);
- this.closed = closed;
- }
-
- public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
- return new MemoryIterator(getSource().deepCopy(env), closed);
- }
-
- public void close() {
-
- synchronized (this) {
- if (closed.compareAndSet(false, true)) {
-
- for (FileSKVIterator reader : mds.readers)
- try {
- reader.close();
- } catch (IOException e) {
- log.warn(e, e);
- }
- }
- }
-
- // remove outside of sync to avoid deadlock
- activeIters.remove(this);
- }
-
- private synchronized boolean switchNow() throws IOException {
- if (closed.get())
- return false;
-
- ssi.switchNow();
- return true;
- }
-
- @Override
- public void setInterruptFlag(AtomicBoolean flag) {
- ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
- }
-
- private void setSSI(SourceSwitchingIterator ssi) {
- this.ssi = ssi;
- }
-
- public void setMDS(MemoryDataSource mds) {
- this.mds = mds;
- }
-
- }
-
- public synchronized MemoryIterator skvIterator() {
- if (map == null)
- throw new NullPointerException();
-
- if (deleted)
- throw new IllegalStateException("Can not obtain iterator after map deleted");
-
- int mc = kvCount.get();
- MemoryDataSource mds = new MemoryDataSource();
- SourceSwitchingIterator ssi = new SourceSwitchingIterator(new MemoryDataSource());
- MemoryIterator mi = new MemoryIterator(new PartialMutationSkippingIterator(ssi, mc));
- mi.setSSI(ssi);
- mi.setMDS(mds);
- activeIters.add(mi);
- return mi;
- }
-
- public SortedKeyValueIterator<Key,Value> compactionIterator() {
-
- if (nextKVCount.get() - 1 != kvCount.get())
- throw new IllegalStateException("Memory map in unexpected state : nextKVCount = " + nextKVCount.get() + " kvCount = "
- + kvCount.get());
-
- return map.skvIterator();
- }
-
- private boolean deleted = false;
-
- public void delete(long waitTime) {
-
- synchronized (this) {
- if (deleted)
- throw new IllegalStateException("Double delete");
-
- deleted = true;
- }
-
- long t1 = System.currentTimeMillis();
-
- while (activeIters.size() > 0 && System.currentTimeMillis() - t1 < waitTime) {
- UtilWaitThread.sleep(50);
- }
-
- if (activeIters.size() > 0) {
- // dump memmap exactly as is to a tmp file on disk, and switch scans to that temp file
- try {
- Configuration conf = CachedConfiguration.getInstance();
- FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
-
- String tmpFile = memDumpDir + "/memDump" + UUID.randomUUID() + "." + RFile.EXTENSION;
-
- Configuration newConf = new Configuration(conf);
- newConf.setInt("io.seqfile.compress.blocksize", 100000);
-
- FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, ServerConfiguration.getSiteConfiguration());
-
- InterruptibleIterator iter = map.skvIterator();
-
- HashSet<ByteSequence> allfams= new HashSet<ByteSequence>();
-
- for(Entry<String, Set<ByteSequence>> entry : lggroups.entrySet()){
- allfams.addAll(entry.getValue());
- out.startNewLocalityGroup(entry.getKey(), entry.getValue());
- iter.seek(new Range(), entry.getValue(), true);
- dumpLocalityGroup(out, iter);
- }
-
- out.startDefaultLocalityGroup();
- iter.seek(new Range(), allfams, false);
-
- dumpLocalityGroup(out, iter);
-
- out.close();
-
- log.debug("Created mem dump file " + tmpFile);
-
- memDumpFile = tmpFile;
-
- synchronized (activeIters) {
- for (MemoryIterator mi : activeIters) {
- mi.switchNow();
- }
- }
-
- // rely on unix behavior that file will be deleted when last
- // reader closes it
- fs.delete(new Path(memDumpFile), true);
-
- } catch (IOException ioe) {
- log.error("Failed to create mem dump file ", ioe);
-
- while (activeIters.size() > 0) {
- UtilWaitThread.sleep(100);
- }
- }
-
- }
-
- SimpleMap tmpMap = map;
-
- synchronized (this) {
- map = null;
- }
-
- tmpMap.delete();
- }
-
- private void dumpLocalityGroup(FileSKVWriter out, InterruptibleIterator iter) throws IOException {
- while (iter.hasTop() && activeIters.size() > 0) {
- // RFile does not support MemKey, so we move the kv count into the value only for the RFile.
- // There is no need to change the MemKey to a normal key because the kvCount info gets lost when it is written
- Value newValue = new MemValue(iter.getTopValue(), ((MemKey) iter.getTopKey()).kvCount);
- out.append(iter.getTopKey(), newValue);
- iter.next();
-
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java
deleted file mode 100644
index dd1a6ef..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-
-public class LargestFirstMemoryManager implements MemoryManager {
-
- private static final Logger log = Logger.getLogger(LargestFirstMemoryManager.class);
- private static final int TSERV_MINC_MAXCONCURRENT_NUMWAITING_MULTIPLIER = 2;
-
- private long maxMemory = -1;
- private int maxConcurrentMincs;
- private int numWaitingMultiplier;
- private long prevIngestMemory;
- private double compactionThreshold;
- private long maxObserved;
- private HashMap<Text,Long> mincIdleThresholds;
- private static final long zerotime = System.currentTimeMillis();
- private ServerConfiguration config = null;
-
- LargestFirstMemoryManager(long maxMemory, int maxConcurrentMincs, int numWaitingMultiplier) {
- this();
- this.maxMemory = maxMemory;
- this.maxConcurrentMincs = maxConcurrentMincs;
- this.numWaitingMultiplier = numWaitingMultiplier;
- }
-
- @Override
- public void init(ServerConfiguration conf) {
- this.config = conf;
- maxMemory = conf.getConfiguration().getMemoryInBytes(Property.TSERV_MAXMEM);
- maxConcurrentMincs = conf.getConfiguration().getCount(Property.TSERV_MINC_MAXCONCURRENT);
- numWaitingMultiplier = TSERV_MINC_MAXCONCURRENT_NUMWAITING_MULTIPLIER;
- }
-
- public LargestFirstMemoryManager() {
- prevIngestMemory = 0;
- compactionThreshold = 0.5;
- maxObserved = 0;
- mincIdleThresholds = new HashMap<Text,Long>();
- }
-
- @Override
- public MemoryManagementActions getMemoryManagementActions(List<TabletState> tablets) {
- if (maxMemory < 0)
- throw new IllegalStateException("need to initialize Largst");
- mincIdleThresholds.clear();
- long ingestMemory = 0;
- long compactionMemory = 0;
- KeyExtent largestMemTablet = null;
- long largestMemTableLoad = 0;
- KeyExtent largestIdleMemTablet = null;
- long largestIdleMemTableLoad = 0;
- long mts;
- long mcmts;
- int numWaitingMincs = 0;
- long idleTime;
- long tml;
- long ct = System.currentTimeMillis();
-
- long largestMemTableIdleTime = -1, largestMemTableSize = -1;
- long largestIdleMemTableIdleTime = -1, largestIdleMemTableSize = -1;
-
- for (TabletState ts : tablets) {
- mts = ts.getMemTableSize();
- mcmts = ts.getMinorCompactingMemTableSize();
- if (ts.getLastCommitTime() > 0)
- idleTime = ct - ts.getLastCommitTime();
- else
- idleTime = ct - zerotime;
- ingestMemory += mts;
- tml = timeMemoryLoad(mts, idleTime);
- if (mcmts == 0 && mts > 0) {
- if (tml > largestMemTableLoad) {
- largestMemTableLoad = tml;
- largestMemTablet = ts.getExtent();
- largestMemTableSize = mts;
- largestMemTableIdleTime = idleTime;
- }
- Text tableId = ts.getExtent().getTableId();
- if (!mincIdleThresholds.containsKey(tableId))
- mincIdleThresholds.put(tableId, config.getTableConfiguration(tableId.toString()).getTimeInMillis(Property.TABLE_MINC_COMPACT_IDLETIME));
- if (idleTime > mincIdleThresholds.get(tableId) && tml > largestIdleMemTableLoad) {
- largestIdleMemTableLoad = tml;
- largestIdleMemTablet = ts.getExtent();
- largestIdleMemTableSize = mts;
- largestIdleMemTableIdleTime = idleTime;
- }
- // log.debug("extent: "+ts.getExtent()+" idle threshold: "+mincIdleThresholds.get(tableId)+" idle time: "+idleTime+" memtable: "+mts+" compacting: "+mcmts);
- }
- // else {
- // log.debug("skipping extent "+ts.getExtent()+", nothing in memory");
- // }
-
- compactionMemory += mcmts;
- if (mcmts > 0)
- numWaitingMincs++;
- }
-
- if (ingestMemory + compactionMemory > maxObserved) {
- maxObserved = ingestMemory + compactionMemory;
- }
-
- long memoryChange = ingestMemory - prevIngestMemory;
- prevIngestMemory = ingestMemory;
-
- MemoryManagementActions mma = new MemoryManagementActions();
- mma.tabletsToMinorCompact = new ArrayList<KeyExtent>();
-
- boolean startMinC = false;
-
- if (numWaitingMincs < maxConcurrentMincs * numWaitingMultiplier) {
- // based on previous ingest memory increase, if we think that the next increase will
- // take us over the threshold for non-compacting memory, then start a minor compaction
- // or if the idle time of the chosen tablet is greater than the threshold, start a minor compaction
- if (memoryChange >= 0 && ingestMemory + memoryChange > compactionThreshold * maxMemory) {
- startMinC = true;
- } else if (largestIdleMemTablet != null) {
- startMinC = true;
- // switch largestMemTablet to largestIdleMemTablet
- largestMemTablet = largestIdleMemTablet;
- largestMemTableLoad = largestIdleMemTableLoad;
- largestMemTableSize = largestIdleMemTableSize;
- largestMemTableIdleTime = largestIdleMemTableIdleTime;
- log.debug("IDLE minor compaction chosen");
- }
- }
-
- if (startMinC && largestMemTablet != null) {
- mma.tabletsToMinorCompact.add(largestMemTablet);
- log.debug(String.format("COMPACTING %s total = %,d ingestMemory = %,d", largestMemTablet.toString(), (ingestMemory + compactionMemory), ingestMemory));
- log.debug(String.format("chosenMem = %,d chosenIT = %.2f load %,d", largestMemTableSize, largestMemTableIdleTime / 1000.0, largestMemTableLoad));
- } else if (memoryChange < 0) {
- // before idle mincs, starting a minor compaction meant that memoryChange >= 0.
- // we thought we might want to remove the "else" if that changed,
- // however it seems performing idle compactions shouldn't make the threshold
- // change more often, so it is staying for now.
- // also, now we have the case where memoryChange < 0 due to an idle compaction, yet
- // we are still adjusting the threshold. should this be tracked and prevented?
-
- // memory change < 0 means a minor compaction occurred
- // we want to see how full the memory got during the compaction
- // (the goal is for it to have between 80% and 90% memory utilization)
- // and adjust the compactionThreshold accordingly
-
- log.debug(String.format("BEFORE compactionThreshold = %.3f maxObserved = %,d", compactionThreshold, maxObserved));
-
- if (compactionThreshold < 0.82 && maxObserved < 0.8 * maxMemory) {
- // 0.82 * 1.1 is about 0.9, which is our desired max threshold
- compactionThreshold *= 1.1;
- } else if (compactionThreshold > 0.056 && maxObserved > 0.9 * maxMemory) {
- // 0.056 * 0.9 is about 0.05, which is our desired min threshold
- compactionThreshold *= 0.9;
- }
- maxObserved = 0;
-
- log.debug(String.format("AFTER compactionThreshold = %.3f", compactionThreshold));
- }
-
- return mma;
- }
-
- @Override
- public void tabletClosed(KeyExtent extent) {}
-
- static long timeMemoryLoad(long mem, long time) {
- double minutesIdle = time / 60000.0;
-
- return (long) (mem * Math.pow(2, minutesIdle / 15.0));
- }
-
- public static void main(String[] args) {
- for (int i = 0; i < 62; i++) {
- System.out.printf("%d\t%d%n", i, timeMemoryLoad(1, i * 60000l));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/MemKey.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/MemKey.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/MemKey.java
deleted file mode 100644
index a7e1660..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/MemKey.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.accumulo.core.data.Key;
-
-class MemKey extends Key {
-
- int kvCount;
-
- public MemKey(byte[] row, byte[] cf, byte[] cq, byte[] cv, long ts, boolean del, boolean copy, int mc) {
- super(row, cf, cq, cv, ts, del, copy);
- this.kvCount = mc;
- }
-
- public MemKey() {
- super();
- this.kvCount = Integer.MAX_VALUE;
- }
-
- public MemKey(Key key, int mc) {
- super(key);
- this.kvCount = mc;
- }
-
- public String toString() {
- return super.toString() + " mc=" + kvCount;
- }
-
- @Override
- public Object clone() throws CloneNotSupportedException {
- return super.clone();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- out.writeInt(kvCount);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- kvCount = in.readInt();
- }
-
- @Override
- public int compareTo(Key k) {
-
- int cmp = super.compareTo(k);
-
- if (cmp == 0 && k instanceof MemKey) {
- cmp = ((MemKey) k).kvCount - kvCount;
- }
-
- return cmp;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/MemValue.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/MemValue.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/MemValue.java
deleted file mode 100644
index 735bf20..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/MemValue.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.accumulo.core.data.Value;
-
-/**
- *
- */
-public class MemValue extends Value {
- int kvCount;
- boolean merged = false;
-
- /**
- * @param value
- * Value
- * @param kv
- * kv count
- */
- public MemValue(byte[] value, int kv) {
- super(value);
- this.kvCount = kv;
- }
-
- public MemValue() {
- super();
- this.kvCount = Integer.MAX_VALUE;
- }
-
- public MemValue(Value value, int kv) {
- super(value);
- this.kvCount = kv;
- }
-
- // Override
- public void write(final DataOutput out) throws IOException {
- if (!merged) {
- byte[] combinedBytes = new byte[getSize() + 4];
- System.arraycopy(value, 0, combinedBytes, 4, getSize());
- combinedBytes[0] = (byte) (kvCount >>> 24);
- combinedBytes[1] = (byte) (kvCount >>> 16);
- combinedBytes[2] = (byte) (kvCount >>> 8);
- combinedBytes[3] = (byte) (kvCount);
- value = combinedBytes;
- merged = true;
- }
- super.write(out);
- }
-
- public void set(final byte[] b) {
- super.set(b);
- merged = false;
- }
-
- public void copy(byte[] b) {
- super.copy(b);
- merged = false;
- }
-
- /**
- * Takes a Value and will take out the embedded kvCount, and then return that value while replacing the Value with the original unembedded version
- *
- * @param v
- * @return The kvCount embedded in v.
- */
- public static int splitKVCount(Value v) {
- if (v instanceof MemValue)
- return ((MemValue) v).kvCount;
-
- byte[] originalBytes = new byte[v.getSize() - 4];
- byte[] combined = v.get();
- System.arraycopy(combined, 4, originalBytes, 0, originalBytes.length);
- v.set(originalBytes);
- return (combined[0] << 24) + ((combined[1] & 0xFF) << 16) + ((combined[2] & 0xFF) << 8) + (combined[3] & 0xFF);
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/MemoryManagementActions.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/MemoryManagementActions.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/MemoryManagementActions.java
deleted file mode 100644
index 3cbe25d..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/MemoryManagementActions.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.util.List;
-
-import org.apache.accumulo.core.data.KeyExtent;
-
-public class MemoryManagementActions {
- public List<KeyExtent> tabletsToMinorCompact;
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/MemoryManager.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/MemoryManager.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/MemoryManager.java
deleted file mode 100644
index f03b04b..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/MemoryManager.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.util.List;
-
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.server.conf.ServerConfiguration;
-
-/**
- * A MemoryManager in accumulo currently determines when minor compactions should occur and when ingest should be put on hold. The goal of a memory manager
- * implementation is to maximize ingest throughput and minimize the number of minor compactions.
- *
- *
- *
- */
-
-public interface MemoryManager {
-
- /**
- * Initialize the memory manager.
- *
- * @param conf
- */
- void init(ServerConfiguration conf);
-
- /**
- * An implementation of this function will be called periodically by accumulo and should return a list of tablets to minor compact.
- *
- * Instructing a tablet that is already minor compacting (this can be inferred from the TabletState) to minor compact has no effect.
- *
- * Holding all ingest does not affect metadata tablets.
- */
-
- MemoryManagementActions getMemoryManagementActions(List<TabletState> tablets);
-
- /**
- * This method is called when a tablet is closed. A memory manger can clean up any per tablet state it is keeping when this is called.
- */
- void tabletClosed(KeyExtent extent);
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
deleted file mode 100644
index 4478f8e..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.accumulo.core.client.impl.Tables;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.conf.TableConfiguration;
-import org.apache.accumulo.server.fs.FileRef;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.problems.ProblemReport;
-import org.apache.accumulo.server.problems.ProblemReports;
-import org.apache.accumulo.server.problems.ProblemType;
-import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-public class MinorCompactor extends Compactor {
-
- private static final Logger log = Logger.getLogger(MinorCompactor.class);
-
- private static final Map<FileRef,DataFileValue> EMPTY_MAP = Collections.emptyMap();
-
- private static Map<FileRef,DataFileValue> toFileMap(FileRef mergeFile, DataFileValue dfv) {
- if (mergeFile == null)
- return EMPTY_MAP;
-
- return Collections.singletonMap(mergeFile, dfv);
- }
-
- MinorCompactor(Configuration conf, VolumeManager fs, InMemoryMap imm, FileRef mergeFile, DataFileValue dfv, FileRef outputFile, TableConfiguration acuTableConf,
- KeyExtent extent, MinorCompactionReason mincReason) {
- super(conf, fs, toFileMap(mergeFile, dfv), imm, outputFile, true, acuTableConf, extent, new CompactionEnv() {
-
- @Override
- public boolean isCompactionEnabled() {
- return true;
- }
-
- @Override
- public IteratorScope getIteratorScope() {
- return IteratorScope.minc;
- }
- });
-
- super.mincReason = mincReason;
- }
-
- private boolean isTableDeleting() {
- try {
- return Tables.getTableState(HdfsZooInstance.getInstance(), extent.getTableId().toString()) == TableState.DELETING;
- } catch (Exception e) {
- log.warn("Failed to determine if table " + extent.getTableId() + " was deleting ", e);
- return false; // can not get positive confirmation that its deleting.
- }
- }
-
- @Override
- public CompactionStats call() {
- log.debug("Begin minor compaction " + getOutputFile() + " " + getExtent());
-
- // output to new MapFile with a temporary name
- int sleepTime = 100;
- double growthFactor = 4;
- int maxSleepTime = 1000 * 60 * 3; // 3 minutes
- boolean reportedProblem = false;
-
- runningCompactions.add(this);
- try {
- do {
- try {
- CompactionStats ret = super.call();
-
- // log.debug(String.format("MinC %,d recs in | %,d recs out | %,d recs/sec | %6.3f secs | %,d bytes ",map.size(), entriesCompacted,
- // (int)(map.size()/((t2 - t1)/1000.0)), (t2 - t1)/1000.0, estimatedSizeInBytes()));
-
- if (reportedProblem) {
- ProblemReports.getInstance().deleteProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile());
- }
-
- return ret;
- } catch (IOException e) {
- log.warn("MinC failed (" + e.getMessage() + ") to create " + getOutputFile() + " retrying ...");
- ProblemReports.getInstance().report(new ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile(), e));
- reportedProblem = true;
- } catch (RuntimeException e) {
- // if this is coming from a user iterator, it is possible that the user could change the iterator config and that the
- // minor compaction would succeed
- log.warn("MinC failed (" + e.getMessage() + ") to create " + getOutputFile() + " retrying ...", e);
- ProblemReports.getInstance().report(new ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile(), e));
- reportedProblem = true;
- } catch (CompactionCanceledException e) {
- throw new IllegalStateException(e);
- }
-
- Random random = new Random();
-
- int sleep = sleepTime + random.nextInt(sleepTime);
- log.debug("MinC failed sleeping " + sleep + " ms before retrying");
- UtilWaitThread.sleep(sleep);
- sleepTime = (int) Math.round(Math.min(maxSleepTime, sleepTime * growthFactor));
-
- // clean up
- try {
- if (getFileSystem().exists(new Path(getOutputFile()))) {
- getFileSystem().deleteRecursively(new Path(getOutputFile()));
- }
- } catch (IOException e) {
- log.warn("Failed to delete failed MinC file " + getOutputFile() + " " + e.getMessage());
- }
-
- if (isTableDeleting())
- return new CompactionStats(0, 0);
-
- } while (true);
- } finally {
- thread = null;
- runningCompactions.remove(this);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/NativeMap.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/NativeMap.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/NativeMap.java
deleted file mode 100644
index 7242c4a..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/NativeMap.java
+++ /dev/null
@@ -1,717 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.ConcurrentModificationException;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.accumulo.core.data.ByteSequence;
-import org.apache.accumulo.core.data.ColumnUpdate;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.IterationInterruptedException;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
-import org.apache.accumulo.start.Platform;
-import org.apache.log4j.Logger;
-
-/**
- * This class stores data in a C++ map. Doing this allows us to store more in memory and avoid pauses caused by Java GC.
- *
- * The strategy for dealing with native memory allocated for the native map is that java code using the native map should call delete() as soon as it is
- * finished using the native map. When the NativeMap object is garbage collected its native resources will be released if needed. However waiting for java GC
- * would be a mistake for long lived NativeMaps. Long lived objects are not garbage collected quickly, therefore a process could easily use too much memory.
- *
- */
-
-public class NativeMap implements Iterable<Map.Entry<Key,Value>> {
-
- private static final Logger log = Logger.getLogger(NativeMap.class);
-
- private long nmPointer;
-
- private final ReadWriteLock rwLock;
- private final Lock rlock;
- private final Lock wlock;
-
- int modCount = 0;
-
- private static native long createNM();
-
- // private static native void putNM(long nmPointer, byte[] kd, int cfo, int cqo, int cvo, int tl, long ts, boolean del, byte[] value);
-
- private static native void singleUpdate(long nmPointer, byte[] row, byte cf[], byte cq[], byte cv[], long ts, boolean del, byte[] value, int mutationCount);
-
- private static native long startUpdate(long nmPointer, byte[] row);
-
- private static native void update(long nmPointer, long updateID, byte cf[], byte cq[], byte cv[], long ts, boolean del, byte[] value, int mutationCount);
-
- private static native int sizeNM(long nmPointer);
-
- private static native long memoryUsedNM(long nmPointer);
-
- private static native long deleteNM(long nmPointer);
-
- private static boolean init = false;
- private static long totalAllocations;
- private static HashSet<Long> allocatedNativeMaps;
-
- private static synchronized long createNativeMap() {
-
- if (!init) {
- allocatedNativeMaps = new HashSet<Long>();
-
- Runnable r = new Runnable() {
- @Override
- public void run() {
- if (allocatedNativeMaps.size() > 0) {
- // print to system err in case log4j is shutdown...
- try {
- log.warn("There are " + allocatedNativeMaps.size() + " allocated native maps");
- } catch (Throwable t) {
- log.error("There are " + allocatedNativeMaps.size() + " allocated native maps");
- }
- }
-
- log.debug(totalAllocations + " native maps were allocated");
- }
- };
-
- Runtime.getRuntime().addShutdownHook(new Thread(r));
-
- init = true;
- }
-
- long nmPtr = createNM();
-
- if (allocatedNativeMaps.contains(nmPtr)) {
- // something is really screwy, this should not happen
- throw new RuntimeException(String.format("Duplicate native map pointer 0x%016x ", nmPtr));
- }
-
- totalAllocations++;
- allocatedNativeMaps.add(nmPtr);
-
- return nmPtr;
- }
-
- private static synchronized void deleteNativeMap(long nmPtr) {
- if (allocatedNativeMaps.contains(nmPtr)) {
- deleteNM(nmPtr);
- allocatedNativeMaps.remove(nmPtr);
- } else {
- throw new RuntimeException(String.format("Attempt to delete native map that is not allocated 0x%016x ", nmPtr));
- }
- }
-
- private static boolean loadedNativeLibraries = false;
-
- public static String getNativeLibPath() {
- return "lib/native/map/" + System.mapLibraryName("NativeMap-" + Platform.getPlatform());
- }
-
- public static void loadNativeLib(String nativeLib) {
- try {
- System.load(nativeLib);
- log.info("Loaded native map shared library " + nativeLib);
- loadedNativeLibraries = true;
- } catch (Throwable t) {
- log.error("Failed to load native map library " + nativeLib, t);
- }
- }
-
- static {
- String aHome = System.getenv("ACCUMULO_HOME");
- if (aHome != null) {
- String nativeLib = aHome + "/" + getNativeLibPath();
- loadNativeLib(new File(nativeLib).getAbsolutePath());
- }
- }
-
- public static boolean loadedNativeLibraries() {
- return loadedNativeLibraries;
- }
-
- private static native long createNMI(long nmp, int fieldLens[]);
-
- private static native long createNMI(long nmp, byte[] row, byte cf[], byte cq[], byte cv[], long ts, boolean del, int fieldLens[]);
-
- private static native boolean nmiNext(long nmiPointer, int fieldLens[]);
-
- private static native void nmiGetData(long nmiPointer, byte[] row, byte cf[], byte cq[], byte cv[], byte[] valData);
-
- private static native long nmiGetTS(long nmiPointer);
-
- private static native void deleteNMI(long nmiPointer);
-
- private class ConcurrentIterator implements Iterator<Map.Entry<Key,Value>> {
-
- // in order to get good performance when there are multiple threads reading, need to read a lot while the
- // the read lock is held..... lots of threads trying to get the read lock very often causes serious slow
- // downs.... also reading a lot of entries at once lessens the impact of concurrent writes... if only
- // one entry were read at a time and there were concurrent writes, then iteration could be n*log(n)
-
- // increasing this number has a positive effect on concurrent read performance, but negatively effects
- // concurrent writers
- private static final int MAX_READ_AHEAD_ENTRIES = 16;
- private static final int READ_AHEAD_BYTES = 4096;
-
- private NMIterator source;
-
- private Entry<Key,Value> nextEntries[];
- private int index;
- private int end;
-
- ConcurrentIterator() {
- this(new MemKey());
- }
-
- @SuppressWarnings("unchecked")
- ConcurrentIterator(Key key) {
- // start off with a small read ahead
- nextEntries = new Entry[1];
-
- rlock.lock();
- try {
- source = new NMIterator(key);
- fill();
- } finally {
- rlock.unlock();
- }
- }
-
- // it is assumed the read lock is held when this method is called
- @SuppressWarnings("unchecked")
- private void fill() {
- end = 0;
- index = 0;
-
- if (source.hasNext())
- source.doNextPreCheck();
-
- int amountRead = 0;
-
- // as we keep filling, increase the read ahead buffer
- if (nextEntries.length < MAX_READ_AHEAD_ENTRIES)
- nextEntries = new Entry[Math.min(nextEntries.length * 2, MAX_READ_AHEAD_ENTRIES)];
-
- while (source.hasNext() && end < nextEntries.length) {
- Entry<Key,Value> ne = source.next();
- nextEntries[end++] = ne;
- amountRead += ne.getKey().getSize() + ne.getValue().getSize();
-
- if (amountRead > READ_AHEAD_BYTES)
- break;
- }
- }
-
- @Override
- public boolean hasNext() {
- return end != 0;
- }
-
- @Override
- public Entry<Key,Value> next() {
- if (end == 0) {
- throw new NoSuchElementException();
- }
-
- Entry<Key,Value> ret = nextEntries[index++];
-
- if (index == end) {
- rlock.lock();
- try {
- fill();
- } catch (ConcurrentModificationException cme) {
- source.delete();
- source = new NMIterator(ret.getKey());
- fill();
- if (0 < end && nextEntries[0].getKey().equals(ret.getKey())) {
- index++;
- if (index == end) {
- fill();
- }
- }
- } finally {
- rlock.unlock();
- }
-
- }
-
- return ret;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- public void delete() {
- source.delete();
- }
- }
-
- private class NMIterator implements Iterator<Map.Entry<Key,Value>> {
-
- /**
- * The strategy for dealing with native memory allocated for iterators is to simply delete that memory when this Java Object is garbage collected.
- *
- * These iterators are likely short lived object and therefore will be quickly garbage collected. Even if the objects are long lived and therefore more
- * slowly garbage collected they only hold a small amount of native memory.
- *
- */
-
- private long nmiPointer;
- private boolean hasNext;
- private int expectedModCount;
- private int[] fieldsLens = new int[7];
- private byte lastRow[];
-
- // it is assumed the read lock is held when this method is called
- NMIterator(Key key) {
-
- if (nmPointer == 0) {
- throw new IllegalStateException();
- }
-
- expectedModCount = modCount;
-
- nmiPointer = createNMI(nmPointer, key.getRowData().toArray(), key.getColumnFamilyData().toArray(), key.getColumnQualifierData().toArray(), key
- .getColumnVisibilityData().toArray(), key.getTimestamp(), key.isDeleted(), fieldsLens);
-
- hasNext = nmiPointer != 0;
- }
-
- // delete is synchronized on a per iterator basis want to ensure only one
- // thread deletes an iterator w/o acquiring the global write lock...
- // there is no contention among concurrent readers for deleting their iterators
- public synchronized void delete() {
- if (nmiPointer == 0) {
- return;
- }
-
- // log.debug("Deleting native map iterator pointer");
-
- deleteNMI(nmiPointer);
- nmiPointer = 0;
- }
-
- @Override
- public boolean hasNext() {
- return hasNext;
- }
-
- // it is assumed the read lock is held when this method is called
- // this method only needs to be called once per read lock acquisition
- private void doNextPreCheck() {
- if (nmPointer == 0) {
- throw new IllegalStateException();
- }
-
- if (modCount != expectedModCount) {
- throw new ConcurrentModificationException();
- }
- }
-
- @Override
- // It is assumed that this method is called w/ the read lock held and
- // that doNextPreCheck() is called prior to calling this method
- // also this method is synchronized to ensure that a deleted iterator
- // is not used
- public synchronized Entry<Key,Value> next() {
- if (!hasNext) {
- throw new NoSuchElementException();
- }
-
- if (nmiPointer == 0) {
- throw new IllegalStateException("Native Map Iterator Deleted");
- }
-
- byte[] row = null;
- if (fieldsLens[0] >= 0) {
- row = new byte[fieldsLens[0]];
- lastRow = row;
- }
-
- byte cf[] = new byte[fieldsLens[1]];
- byte cq[] = new byte[fieldsLens[2]];
- byte cv[] = new byte[fieldsLens[3]];
- boolean deleted = fieldsLens[4] == 0 ? false : true;
- byte val[] = new byte[fieldsLens[5]];
-
- nmiGetData(nmiPointer, row, cf, cq, cv, val);
- long ts = nmiGetTS(nmiPointer);
-
- Key k = new MemKey(lastRow, cf, cq, cv, ts, deleted, false, fieldsLens[6]);
- Value v = new Value(val, false);
-
- hasNext = nmiNext(nmiPointer, fieldsLens);
-
- return new NMEntry(k, v);
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected void finalize() throws Throwable {
- super.finalize();
- if (nmiPointer != 0) {
- // log.debug("Deleting native map iterator pointer in finalize");
- deleteNMI(nmiPointer);
- }
- }
-
- }
-
- private static class NMEntry implements Map.Entry<Key,Value> {
-
- private Key key;
- private Value val;
-
- NMEntry(Key k, Value v) {
- this.key = k;
- this.val = v;
- }
-
- @Override
- public Key getKey() {
- return key;
- }
-
- @Override
- public Value getValue() {
- return val;
- }
-
- @Override
- public Value setValue(Value value) {
- throw new UnsupportedOperationException();
- }
-
- public String toString() {
- return key + "=" + val;
- }
- }
-
- public NativeMap() {
- nmPointer = createNativeMap();
- rwLock = new ReentrantReadWriteLock();
- rlock = rwLock.readLock();
- wlock = rwLock.writeLock();
- log.debug(String.format("Allocated native map 0x%016x", nmPointer));
- }
-
- @Override
- protected void finalize() throws Throwable {
- super.finalize();
- if (nmPointer != 0) {
- log.warn(String.format("Deallocating native map 0x%016x in finalize", nmPointer));
- deleteNativeMap(nmPointer);
- }
- }
-
- private void _mutate(Mutation mutation, int mutationCount) {
-
- List<ColumnUpdate> updates = mutation.getUpdates();
- if (updates.size() == 1) {
- ColumnUpdate update = updates.get(0);
- singleUpdate(nmPointer, mutation.getRow(), update.getColumnFamily(), update.getColumnQualifier(), update.getColumnVisibility(), update.getTimestamp(),
- update.isDeleted(), update.getValue(), mutationCount);
- } else if (updates.size() > 1) {
- long uid = startUpdate(nmPointer, mutation.getRow());
- for (ColumnUpdate update : updates) {
- update(nmPointer, uid, update.getColumnFamily(), update.getColumnQualifier(), update.getColumnVisibility(), update.getTimestamp(), update.isDeleted(),
- update.getValue(), mutationCount);
- }
-
- }
- }
-
- public void mutate(Mutation mutation, int mutationCount) {
- wlock.lock();
- try {
- if (nmPointer == 0) {
- throw new IllegalStateException("Native Map Deleted");
- }
-
- modCount++;
-
- _mutate(mutation, mutationCount);
- } finally {
- wlock.unlock();
- }
- }
-
- public void mutate(List<Mutation> mutations, int mutationCount) {
- Iterator<Mutation> iter = mutations.iterator();
-
- while (iter.hasNext()) {
-
- wlock.lock();
- try {
- if (nmPointer == 0) {
- throw new IllegalStateException("Native Map Deleted");
- }
-
- modCount++;
-
- int count = 0;
- while (iter.hasNext() && count < 10) {
- Mutation mutation = iter.next();
- _mutate(mutation, mutationCount);
- mutationCount++;
- count += mutation.size();
- }
- } finally {
- wlock.unlock();
- }
- }
- }
-
- public void put(Key key, Value value) {
- wlock.lock();
- try {
- if (nmPointer == 0) {
- throw new IllegalStateException("Native Map Deleted");
- }
-
- modCount++;
-
- singleUpdate(nmPointer, key.getRowData().toArray(), key.getColumnFamilyData().toArray(), key.getColumnQualifierData().toArray(), key
- .getColumnVisibilityData().toArray(), key.getTimestamp(), key.isDeleted(), value.get(), 0);
- } finally {
- wlock.unlock();
- }
- }
-
- public Value get(Key key) {
- rlock.lock();
- try {
- Value ret = null;
- NMIterator nmi = new NMIterator(key);
- if (nmi.hasNext()) {
- Entry<Key,Value> entry = nmi.next();
- if (entry.getKey().equals(key)) {
- ret = entry.getValue();
- }
- }
-
- nmi.delete();
-
- return ret;
- } finally {
- rlock.unlock();
- }
- }
-
- public int size() {
- rlock.lock();
- try {
- if (nmPointer == 0) {
- throw new IllegalStateException("Native Map Deleted");
- }
-
- return sizeNM(nmPointer);
- } finally {
- rlock.unlock();
- }
- }
-
- public long getMemoryUsed() {
- rlock.lock();
- try {
- if (nmPointer == 0) {
- throw new IllegalStateException("Native Map Deleted");
- }
-
- return memoryUsedNM(nmPointer);
- } finally {
- rlock.unlock();
- }
- }
-
- public Iterator<Map.Entry<Key,Value>> iterator() {
- rlock.lock();
- try {
- if (nmPointer == 0) {
- throw new IllegalStateException("Native Map Deleted");
- }
-
- return new ConcurrentIterator();
- } finally {
- rlock.unlock();
- }
- }
-
- public Iterator<Map.Entry<Key,Value>> iterator(Key startKey) {
- rlock.lock();
- try {
-
- if (nmPointer == 0) {
- throw new IllegalStateException("Native Map Deleted");
- }
-
- return new ConcurrentIterator(startKey);
- } finally {
- rlock.unlock();
- }
- }
-
- public void delete() {
- wlock.lock();
- try {
- if (nmPointer == 0) {
- throw new IllegalStateException("Native Map Deleted");
- }
-
- log.debug(String.format("Deallocating native map 0x%016x", nmPointer));
- deleteNativeMap(nmPointer);
- nmPointer = 0;
- } finally {
- wlock.unlock();
- }
- }
-
- private static class NMSKVIter implements InterruptibleIterator {
-
- private ConcurrentIterator iter;
- private Entry<Key,Value> entry;
-
- private NativeMap map;
- private Range range;
- private AtomicBoolean interruptFlag;
- private int interruptCheckCount = 0;
-
- private NMSKVIter(NativeMap map, AtomicBoolean interruptFlag) {
- this.map = map;
- this.range = new Range();
- iter = map.new ConcurrentIterator();
- if (iter.hasNext())
- entry = iter.next();
- else
- entry = null;
-
- this.interruptFlag = interruptFlag;
- }
-
- public NMSKVIter(NativeMap map) {
- this(map, null);
- }
-
- @Override
- public Key getTopKey() {
- return entry.getKey();
- }
-
- @Override
- public Value getTopValue() {
- return entry.getValue();
- }
-
- @Override
- public boolean hasTop() {
- return entry != null;
- }
-
- @Override
- public void next() throws IOException {
-
- if (entry == null)
- throw new IllegalStateException();
-
- // checking the interrupt flag for every call to next had bad a bad performance impact
- // so check it every 100th time
- if (interruptFlag != null && interruptCheckCount++ % 100 == 0 && interruptFlag.get())
- throw new IterationInterruptedException();
-
- if (iter.hasNext()) {
- entry = iter.next();
- if (range.afterEndKey(entry.getKey())) {
- entry = null;
- }
- } else
- entry = null;
-
- }
-
- @Override
- public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
-
- if (interruptFlag != null && interruptFlag.get())
- throw new IterationInterruptedException();
-
- iter.delete();
-
- this.range = range;
-
- Key key = range.getStartKey();
- if (key == null) {
- key = new MemKey();
- }
-
- iter = map.new ConcurrentIterator(key);
- if (iter.hasNext()) {
- entry = iter.next();
- if (range.afterEndKey(entry.getKey())) {
- entry = null;
- }
- } else
- entry = null;
-
- while (hasTop() && range.beforeStartKey(getTopKey())) {
- next();
- }
- }
-
- @Override
- public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
- return new NMSKVIter(map, interruptFlag);
- }
-
- @Override
- public void setInterruptFlag(AtomicBoolean flag) {
- this.interruptFlag = flag;
- }
- }
-
- public SortedKeyValueIterator<Key,Value> skvIterator() {
- return new NMSKVIter(this);
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/Rate.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Rate.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Rate.java
deleted file mode 100644
index 1d1123b..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Rate.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-public class Rate {
- private long lastCounter = -1;
- private long lastTime = -1;
- private double current = 0.0;
- final double ratio;
-
- /**
- * Turn a counter into an exponentially smoothed rate over time.
- *
- * @param ratio
- * the rate at which each update influences the curve; must be (0., 1.0)
- */
- public Rate(double ratio) {
- if (ratio <= 0. || ratio >= 1.0)
- throw new IllegalArgumentException("ratio must be > 0. and < 1.0");
- this.ratio = ratio;
- }
-
- public double update(long counter) {
- return update(System.currentTimeMillis(), counter);
- }
-
- synchronized public double update(long when, long counter) {
- if (lastCounter < 0) {
- lastTime = when;
- lastCounter = counter;
- return current;
- }
- if (lastTime == when) {
- throw new IllegalArgumentException("update time < last value");
- }
- double keep = 1. - ratio;
- current = (keep * current + ratio * ((counter - lastCounter)) * 1000. / (when - lastTime));
- lastTime = when;
- lastCounter = counter;
- return current;
- }
-
- synchronized public double rate() {
- return this.current;
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/RowLocks.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/RowLocks.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/RowLocks.java
deleted file mode 100644
index f057ca3..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/RowLocks.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.accumulo.core.data.ArrayByteSequence;
-import org.apache.accumulo.core.data.ByteSequence;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.server.data.ServerConditionalMutation;
-import org.apache.accumulo.server.tabletserver.ConditionalMutationSet.DeferFilter;
-
-/**
- *
- */
-class RowLocks {
-
- private Map<ByteSequence,RowLock> rowLocks = new HashMap<ByteSequence,RowLock>();
-
- static class RowLock {
- ReentrantLock rlock;
- int count;
- ByteSequence rowSeq;
-
- RowLock(ReentrantLock rlock, ByteSequence rowSeq) {
- this.rlock = rlock;
- this.count = 0;
- this.rowSeq = rowSeq;
- }
-
- public boolean tryLock() {
- return rlock.tryLock();
- }
-
- public void lock() {
- rlock.lock();
- }
-
- public void unlock() {
- rlock.unlock();
- }
- }
-
- private RowLock getRowLock(ArrayByteSequence rowSeq) {
- RowLock lock = rowLocks.get(rowSeq);
- if (lock == null) {
- lock = new RowLock(new ReentrantLock(), rowSeq);
- rowLocks.put(rowSeq, lock);
- }
-
- lock.count++;
- return lock;
- }
-
- private void returnRowLock(RowLock lock) {
- if (lock.count == 0)
- throw new IllegalStateException();
- lock.count--;
-
- if (lock.count == 0) {
- rowLocks.remove(lock.rowSeq);
- }
- }
-
- List<RowLock> acquireRowlocks(Map<KeyExtent,List<ServerConditionalMutation>> updates, Map<KeyExtent,List<ServerConditionalMutation>> deferred) {
- ArrayList<RowLock> locks = new ArrayList<RowLock>();
-
- // assume that mutations are in sorted order to avoid deadlock
- synchronized (rowLocks) {
- for (List<ServerConditionalMutation> scml : updates.values()) {
- for (ServerConditionalMutation scm : scml) {
- locks.add(getRowLock(new ArrayByteSequence(scm.getRow())));
- }
- }
- }
-
- HashSet<ByteSequence> rowsNotLocked = null;
-
- // acquire as many locks as possible, not blocking on rows that are already locked
- if (locks.size() > 1) {
- for (RowLock rowLock : locks) {
- if (!rowLock.tryLock()) {
- if (rowsNotLocked == null)
- rowsNotLocked = new HashSet<ByteSequence>();
- rowsNotLocked.add(rowLock.rowSeq);
- }
- }
- } else {
- // if there is only one lock, then wait for it
- locks.get(0).lock();
- }
-
- if (rowsNotLocked != null) {
-
- final HashSet<ByteSequence> rnlf = rowsNotLocked;
- // assume will get locks needed, do something expensive otherwise
- ConditionalMutationSet.defer(updates, deferred, new DeferFilter() {
- @Override
- public void defer(List<ServerConditionalMutation> scml, List<ServerConditionalMutation> okMutations, List<ServerConditionalMutation> deferred) {
- for (ServerConditionalMutation scm : scml) {
- if (rnlf.contains(new ArrayByteSequence(scm.getRow())))
- deferred.add(scm);
- else
- okMutations.add(scm);
-
- }
- }
- });
-
- ArrayList<RowLock> filteredLocks = new ArrayList<RowLock>();
- ArrayList<RowLock> locksToReturn = new ArrayList<RowLock>();
- for (RowLock rowLock : locks) {
- if (rowsNotLocked.contains(rowLock.rowSeq)) {
- locksToReturn.add(rowLock);
- } else {
- filteredLocks.add(rowLock);
- }
- }
-
- synchronized (rowLocks) {
- for (RowLock rowLock : locksToReturn) {
- returnRowLock(rowLock);
- }
- }
-
- locks = filteredLocks;
- }
- return locks;
- }
-
- void releaseRowLocks(List<RowLock> locks) {
- for (RowLock rowLock : locks) {
- rowLock.unlock();
- }
-
- synchronized (rowLocks) {
- for (RowLock rowLock : locks) {
- returnRowLock(rowLock);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/TLevel.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TLevel.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TLevel.java
deleted file mode 100644
index 7c49ee1..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TLevel.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import org.apache.log4j.Level;
-
-public class TLevel extends Level {
-
- private static final long serialVersionUID = 1L;
- public final static Level TABLET_HIST = new TLevel();
-
- protected TLevel() {
- super(Level.DEBUG_INT + 100, "TABLET_HIST", Level.DEBUG_INT + 100);
- }
-
- static public Level toLevel(int val) {
- if (val == Level.DEBUG_INT + 100)
- return Level.DEBUG;
- return Level.toLevel(val);
- }
-
-}