You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2015/01/09 03:44:17 UTC
[13/66] [abbrv] accumulo git commit: ACCUMULO-3451 Format master branch (1.7.0-SNAPSHOT)
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
index 7378348..47936b6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
@@ -71,13 +71,13 @@ import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
class MemKeyComparator implements Comparator<Key>, Serializable {
-
+
private static final long serialVersionUID = 1L;
@Override
public int compare(Key k1, Key k2) {
int cmp = k1.compareTo(k2);
-
+
if (cmp == 0) {
if (k1 instanceof MemKey)
if (k2 instanceof MemKey)
@@ -87,36 +87,36 @@ class MemKeyComparator implements Comparator<Key>, Serializable {
else if (k2 instanceof MemKey)
cmp = -1;
}
-
+
return cmp;
}
}
class PartialMutationSkippingIterator extends SkippingIterator implements InterruptibleIterator {
-
+
int kvCount;
-
+
public PartialMutationSkippingIterator(SortedKeyValueIterator<Key,Value> source, int maxKVCount) {
setSource(source);
this.kvCount = maxKVCount;
}
-
+
@Override
protected void consume() throws IOException {
while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).kvCount > kvCount)
getSource().next();
}
-
+
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
return new PartialMutationSkippingIterator(getSource().deepCopy(env), kvCount);
}
-
+
@Override
public void setInterruptFlag(AtomicBoolean flag) {
((InterruptibleIterator) getSource()).setInterruptFlag(flag);
}
-
+
}
class MemKeyConversionIterator extends WrappingIterator implements InterruptibleIterator {
@@ -132,17 +132,17 @@ class MemKeyConversionIterator extends WrappingIterator implements Interruptible
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
return new MemKeyConversionIterator(getSource().deepCopy(env));
}
-
+
@Override
public Key getTopKey() {
return currKey;
}
-
+
@Override
public Value getTopValue() {
return currVal;
}
-
+
private void getTopKeyVal() {
Key k = super.getTopKey();
Value v = super.getTopValue();
@@ -156,7 +156,7 @@ class MemKeyConversionIterator extends WrappingIterator implements Interruptible
currKey = new MemKey(k, mc);
}
-
+
public void next() throws IOException {
super.next();
if (hasTop())
@@ -165,7 +165,7 @@ class MemKeyConversionIterator extends WrappingIterator implements Interruptible
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
super.seek(range, columnFamilies, inclusive);
-
+
if (hasTop())
getTopKeyVal();
@@ -185,14 +185,14 @@ class MemKeyConversionIterator extends WrappingIterator implements Interruptible
public class InMemoryMap {
private SimpleMap map = null;
-
+
private static final Logger log = Logger.getLogger(InMemoryMap.class);
-
+
private volatile String memDumpFile = null;
private final String memDumpDir;
private Map<String,Set<ByteSequence>> lggroups;
-
+
public InMemoryMap(boolean useNativeMap, String memDumpDir) {
this(new HashMap<String,Set<ByteSequence>>(), useNativeMap, memDumpDir);
}
@@ -200,17 +200,17 @@ public class InMemoryMap {
public InMemoryMap(Map<String,Set<ByteSequence>> lggroups, boolean useNativeMap, String memDumpDir) {
this.memDumpDir = memDumpDir;
this.lggroups = lggroups;
-
+
if (lggroups.size() == 0)
map = newMap(useNativeMap);
else
map = new LocalityGroupMap(lggroups, useNativeMap);
}
-
+
public InMemoryMap(AccumuloConfiguration config) throws LocalityGroupConfigurationError {
this(LocalityGroupUtil.getLocalityGroups(config), config.getBoolean(Property.TSERV_NATIVEMAP_ENABLED), config.get(Property.TSERV_MEMDUMP_DIR));
}
-
+
private static SimpleMap newMap(boolean useNativeMap) {
if (useNativeMap && NativeMap.isLoaded()) {
try {
@@ -219,43 +219,43 @@ public class InMemoryMap {
log.error("Failed to create native map", t);
}
}
-
+
return new DefaultMap();
}
-
+
private interface SimpleMap {
Value get(Key key);
-
+
Iterator<Entry<Key,Value>> iterator(Key startKey);
-
+
int size();
-
+
InterruptibleIterator skvIterator();
-
+
void delete();
-
+
long getMemoryUsed();
-
+
void mutate(List<Mutation> mutations, int kvCount);
}
-
+
private static class LocalityGroupMap implements SimpleMap {
-
+
private Map<ByteSequence,MutableLong> groupFams[];
-
+
// the last map in the array is the default locality group
private SimpleMap maps[];
private Partitioner partitioner;
private List<Mutation>[] partitioned;
private Set<ByteSequence> nonDefaultColumnFamilies;
-
+
@SuppressWarnings("unchecked")
LocalityGroupMap(Map<String,Set<ByteSequence>> groups, boolean useNativeMap) {
this.groupFams = new Map[groups.size()];
this.maps = new SimpleMap[groups.size() + 1];
this.partitioned = new List[groups.size() + 1];
this.nonDefaultColumnFamilies = new HashSet<ByteSequence>();
-
+
for (int i = 0; i < maps.length; i++) {
maps[i] = newMap(useNativeMap);
}
@@ -268,9 +268,9 @@ public class InMemoryMap {
this.groupFams[count++] = map;
nonDefaultColumnFamilies.addAll(cfset);
}
-
+
partitioner = new LocalityGroupUtil.Partitioner(this.groupFams);
-
+
for (int i = 0; i < partitioned.length; i++) {
partitioned[i] = new ArrayList<Mutation>();
}
@@ -280,12 +280,12 @@ public class InMemoryMap {
public Value get(Key key) {
throw new UnsupportedOperationException();
}
-
+
@Override
public Iterator<Entry<Key,Value>> iterator(Key startKey) {
throw new UnsupportedOperationException();
}
-
+
@Override
public int size() {
int sum = 0;
@@ -293,7 +293,7 @@ public class InMemoryMap {
sum += map.size();
return sum;
}
-
+
@Override
public InterruptibleIterator skvIterator() {
LocalityGroup groups[] = new LocalityGroup[maps.length];
@@ -304,16 +304,15 @@ public class InMemoryMap {
groups[i] = new LocalityGroup(maps[i].skvIterator(), null, true);
}
-
return new LocalityGroupIterator(groups, nonDefaultColumnFamilies);
}
-
+
@Override
public void delete() {
for (SimpleMap map : maps)
map.delete();
}
-
+
@Override
public long getMemoryUsed() {
long sum = 0;
@@ -321,16 +320,16 @@ public class InMemoryMap {
sum += map.getMemoryUsed();
return sum;
}
-
+
@Override
public synchronized void mutate(List<Mutation> mutations, int kvCount) {
// this method is synchronized because it reuses objects to avoid allocation,
// currently, the method that calls this is synchronized so there is no
// loss in parallelism.... synchronization was added here for future proofing
-
- try{
+
+ try {
partitioner.partition(mutations, partitioned);
-
+
for (int i = 0; i < partitioned.length; i++) {
if (partitioned[i].size() > 0) {
maps[i].mutate(partitioned[i], kvCount);
@@ -345,14 +344,14 @@ public class InMemoryMap {
}
}
}
-
+
}
private static class DefaultMap implements SimpleMap {
private ConcurrentSkipListMap<Key,Value> map = new ConcurrentSkipListMap<Key,Value>(new MemKeyComparator());
private AtomicLong bytesInMemory = new AtomicLong();
private AtomicInteger size = new AtomicInteger();
-
+
public void put(Key key, Value value) {
// Always a MemKey, so account for the kvCount int
bytesInMemory.addAndGet(key.getLength() + 4);
@@ -360,42 +359,42 @@ public class InMemoryMap {
if (map.put(key, value) == null)
size.incrementAndGet();
}
-
+
public Value get(Key key) {
return map.get(key);
}
-
+
public Iterator<Entry<Key,Value>> iterator(Key startKey) {
Key lk = new Key(startKey);
SortedMap<Key,Value> tm = map.tailMap(lk);
return tm.entrySet().iterator();
}
-
+
public int size() {
return size.get();
}
-
+
public synchronized InterruptibleIterator skvIterator() {
if (map == null)
throw new IllegalStateException();
-
+
return new SortedMapIterator(map);
}
-
+
public synchronized void delete() {
map = null;
}
-
+
public long getOverheadPerEntry() {
// all of the java objects that are used to hold the
// data and make it searchable have overhead... this
// overhead is estimated using test.EstimateInMemMapOverhead
// and is in bytes.. the estimates were obtained by running
// java 6_16 in 64 bit server mode
-
+
return 200;
}
-
+
@Override
public void mutate(List<Mutation> mutations, int kvCount) {
for (Mutation m : mutations) {
@@ -407,64 +406,64 @@ public class InMemoryMap {
}
}
}
-
+
@Override
public long getMemoryUsed() {
return bytesInMemory.get() + (size() * getOverheadPerEntry());
}
}
-
+
private static class NativeMapWrapper implements SimpleMap {
private NativeMap nativeMap;
-
+
NativeMapWrapper() {
nativeMap = new NativeMap();
}
-
+
public Value get(Key key) {
return nativeMap.get(key);
}
-
+
public Iterator<Entry<Key,Value>> iterator(Key startKey) {
return nativeMap.iterator(startKey);
}
-
+
public int size() {
return nativeMap.size();
}
-
+
public InterruptibleIterator skvIterator() {
return (InterruptibleIterator) nativeMap.skvIterator();
}
-
+
public void delete() {
nativeMap.delete();
}
-
+
public long getMemoryUsed() {
return nativeMap.getMemoryUsed();
}
-
+
@Override
public void mutate(List<Mutation> mutations, int kvCount) {
nativeMap.mutate(mutations, kvCount);
}
}
-
+
private AtomicInteger nextKVCount = new AtomicInteger(1);
private AtomicInteger kvCount = new AtomicInteger(0);
private Object writeSerializer = new Object();
-
+
/**
* Applies changes to a row in the InMemoryMap
- *
+ *
*/
public void mutate(List<Mutation> mutations) {
int numKVs = 0;
for (int i = 0; i < mutations.size(); i++)
numKVs += mutations.get(i).size();
-
+
// Can not update mutationCount while writes that started before
// are in progress, this would cause partial mutations to be seen.
// Also, can not continue until mutation count is updated, because
@@ -472,7 +471,7 @@ public class InMemoryMap {
// wait for writes that started before to finish.
//
// using separate lock from this map, to allow read/write in parallel
- synchronized (writeSerializer ) {
+ synchronized (writeSerializer) {
int kv = nextKVCount.getAndAdd(numKVs);
try {
map.mutate(mutations, kv);
@@ -481,51 +480,51 @@ public class InMemoryMap {
}
}
}
-
+
/**
* Returns a long representing the size of the InMemoryMap
- *
+ *
* @return bytesInMemory
*/
public synchronized long estimatedSizeInBytes() {
if (map == null)
return 0;
-
+
return map.getMemoryUsed();
}
-
+
Iterator<Map.Entry<Key,Value>> iterator(Key startKey) {
return map.iterator(startKey);
}
-
+
public synchronized long getNumEntries() {
if (map == null)
return 0;
return map.size();
}
-
+
private final Set<MemoryIterator> activeIters = Collections.synchronizedSet(new HashSet<MemoryIterator>());
-
+
class MemoryDataSource implements DataSource {
-
+
boolean switched = false;
private InterruptibleIterator iter;
private FileSKVIterator reader;
private MemoryDataSource parent;
private IteratorEnvironment env;
private AtomicBoolean iflag;
-
+
MemoryDataSource() {
this(null, false, null, null);
}
-
+
public MemoryDataSource(MemoryDataSource parent, boolean switched, IteratorEnvironment env, AtomicBoolean iflag) {
this.parent = parent;
this.switched = switched;
this.env = env;
this.iflag = iflag;
}
-
+
@Override
public boolean isCurrent() {
if (switched)
@@ -533,12 +532,12 @@ public class InMemoryMap {
else
return memDumpFile == null;
}
-
+
@Override
public DataSource getNewDataSource() {
if (switched)
throw new IllegalStateException();
-
+
if (!isCurrent()) {
switched = true;
iter = null;
@@ -549,15 +548,15 @@ public class InMemoryMap {
throw new RuntimeException();
}
}
-
+
return this;
}
-
+
private synchronized FileSKVIterator getReader() throws IOException {
if (reader == null) {
Configuration conf = CachedConfiguration.getInstance();
FileSystem fs = FileSystem.getLocal(conf);
-
+
reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, SiteConfiguration.getInstance());
if (iflag != null)
reader.setInterruptFlag(iflag);
@@ -583,10 +582,10 @@ public class InMemoryMap {
iter = new MemKeyConversionIterator(parent.getReader().deepCopy(env));
}
}
-
+
return iter;
}
-
+
@Override
public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
return new MemoryDataSource(parent == null ? this : parent, switched, env, iflag);
@@ -596,36 +595,36 @@ public class InMemoryMap {
public void setInterruptFlag(AtomicBoolean flag) {
this.iflag = flag;
}
-
+
}
-
+
public class MemoryIterator extends WrappingIterator implements InterruptibleIterator {
-
+
private AtomicBoolean closed;
private SourceSwitchingIterator ssi;
private MemoryDataSource mds;
-
+
protected SortedKeyValueIterator<Key,Value> getSource() {
if (closed.get())
throw new IllegalStateException("Memory iterator is closed");
return super.getSource();
}
-
+
private MemoryIterator(InterruptibleIterator source) {
this(source, new AtomicBoolean(false));
}
-
+
private MemoryIterator(SortedKeyValueIterator<Key,Value> source, AtomicBoolean closed) {
setSource(source);
this.closed = closed;
}
-
+
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
return new MemoryIterator(getSource().deepCopy(env), closed);
}
-
+
public void close() {
-
+
synchronized (this) {
if (closed.compareAndSet(false, true)) {
try {
@@ -636,41 +635,41 @@ public class InMemoryMap {
}
}
}
-
+
// remove outside of sync to avoid deadlock
activeIters.remove(this);
}
-
+
private synchronized boolean switchNow() throws IOException {
if (closed.get())
return false;
-
+
ssi.switchNow();
return true;
}
-
+
@Override
public void setInterruptFlag(AtomicBoolean flag) {
((InterruptibleIterator) getSource()).setInterruptFlag(flag);
}
-
+
private void setSSI(SourceSwitchingIterator ssi) {
this.ssi = ssi;
}
-
+
public void setMDS(MemoryDataSource mds) {
this.mds = mds;
}
-
+
}
-
+
public synchronized MemoryIterator skvIterator() {
if (map == null)
throw new NullPointerException();
-
+
if (deleted)
throw new IllegalStateException("Can not obtain iterator after map deleted");
-
+
int mc = kvCount.get();
MemoryDataSource mds = new MemoryDataSource();
SourceSwitchingIterator ssi = new SourceSwitchingIterator(new MemoryDataSource());
@@ -680,94 +679,93 @@ public class InMemoryMap {
activeIters.add(mi);
return mi;
}
-
+
public SortedKeyValueIterator<Key,Value> compactionIterator() {
-
+
if (nextKVCount.get() - 1 != kvCount.get())
- throw new IllegalStateException("Memory map in unexpected state : nextKVCount = " + nextKVCount.get() + " kvCount = "
- + kvCount.get());
-
+ throw new IllegalStateException("Memory map in unexpected state : nextKVCount = " + nextKVCount.get() + " kvCount = " + kvCount.get());
+
return map.skvIterator();
}
-
+
private boolean deleted = false;
-
+
public void delete(long waitTime) {
-
+
synchronized (this) {
if (deleted)
throw new IllegalStateException("Double delete");
-
+
deleted = true;
}
-
+
long t1 = System.currentTimeMillis();
-
+
while (activeIters.size() > 0 && System.currentTimeMillis() - t1 < waitTime) {
UtilWaitThread.sleep(50);
}
-
+
if (activeIters.size() > 0) {
// dump memmap exactly as is to a tmp file on disk, and switch scans to that temp file
try {
Configuration conf = CachedConfiguration.getInstance();
FileSystem fs = FileSystem.getLocal(conf);
-
+
String tmpFile = memDumpDir + "/memDump" + UUID.randomUUID() + "." + RFile.EXTENSION;
-
+
Configuration newConf = new Configuration(conf);
newConf.setInt("io.seqfile.compress.blocksize", 100000);
-
+
FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, SiteConfiguration.getInstance());
-
+
InterruptibleIterator iter = map.skvIterator();
-
- HashSet<ByteSequence> allfams= new HashSet<ByteSequence>();
-
- for(Entry<String, Set<ByteSequence>> entry : lggroups.entrySet()){
+
+ HashSet<ByteSequence> allfams = new HashSet<ByteSequence>();
+
+ for (Entry<String,Set<ByteSequence>> entry : lggroups.entrySet()) {
allfams.addAll(entry.getValue());
out.startNewLocalityGroup(entry.getKey(), entry.getValue());
iter.seek(new Range(), entry.getValue(), true);
dumpLocalityGroup(out, iter);
}
-
+
out.startDefaultLocalityGroup();
iter.seek(new Range(), allfams, false);
-
+
dumpLocalityGroup(out, iter);
-
+
out.close();
-
+
log.debug("Created mem dump file " + tmpFile);
-
+
memDumpFile = tmpFile;
-
+
synchronized (activeIters) {
for (MemoryIterator mi : activeIters) {
mi.switchNow();
}
}
-
+
// rely on unix behavior that file will be deleted when last
// reader closes it
fs.delete(new Path(memDumpFile), true);
-
+
} catch (IOException ioe) {
log.error("Failed to create mem dump file ", ioe);
-
+
while (activeIters.size() > 0) {
UtilWaitThread.sleep(100);
}
}
-
+
}
-
+
SimpleMap tmpMap = map;
-
+
synchronized (this) {
map = null;
}
-
+
tmpMap.delete();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java
index 4bc8891..443ffb2 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java
@@ -23,55 +23,55 @@ import java.io.IOException;
import org.apache.accumulo.core.data.Key;
class MemKey extends Key {
-
+
int kvCount;
-
+
public MemKey(byte[] row, byte[] cf, byte[] cq, byte[] cv, long ts, boolean del, boolean copy, int mc) {
super(row, cf, cq, cv, ts, del, copy);
this.kvCount = mc;
}
-
+
public MemKey() {
super();
this.kvCount = Integer.MAX_VALUE;
}
-
+
public MemKey(Key key, int mc) {
super(key);
this.kvCount = mc;
}
-
+
public String toString() {
return super.toString() + " mc=" + kvCount;
}
-
+
@Override
public Object clone() throws CloneNotSupportedException {
return super.clone();
}
-
+
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeInt(kvCount);
}
-
+
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
kvCount = in.readInt();
}
-
+
@Override
public int compareTo(Key k) {
-
+
int cmp = super.compareTo(k);
-
+
if (cmp == 0 && k instanceof MemKey) {
cmp = ((MemKey) k).kvCount - kvCount;
}
-
+
return cmp;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java
index f1fdde4..0ce3b9e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java
@@ -22,12 +22,12 @@ import java.io.IOException;
import org.apache.accumulo.core.data.Value;
/**
- *
+ *
*/
public class MemValue extends Value {
int kvCount;
boolean merged = false;
-
+
/**
* @param value
* Value
@@ -38,17 +38,17 @@ public class MemValue extends Value {
super(value);
this.kvCount = kv;
}
-
+
public MemValue() {
super();
this.kvCount = Integer.MAX_VALUE;
}
-
+
public MemValue(Value value, int kv) {
super(value);
this.kvCount = kv;
}
-
+
// Override
@Override
public void write(final DataOutput out) throws IOException {
@@ -64,7 +64,7 @@ public class MemValue extends Value {
}
super.write(out);
}
-
+
@Override
public void set(final byte[] b) {
super.set(b);
@@ -76,16 +76,16 @@ public class MemValue extends Value {
super.copy(b);
merged = false;
}
-
+
/**
* Takes a Value and will take out the embedded kvCount, and then return that value while replacing the Value with the original unembedded version
- *
+ *
* @return The kvCount embedded in v.
*/
public static int splitKVCount(Value v) {
if (v instanceof MemValue)
return ((MemValue) v).kvCount;
-
+
byte[] originalBytes = new byte[v.getSize() - 4];
byte[] combined = v.get();
System.arraycopy(combined, 4, originalBytes, 0, originalBytes.length);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactionReason.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactionReason.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactionReason.java
index 82c791c..62c6ec6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactionReason.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactionReason.java
@@ -18,4 +18,4 @@ package org.apache.accumulo.tserver;
public enum MinorCompactionReason {
USER, SYSTEM, CLOSE, RECOVERY
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/Mutations.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Mutations.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Mutations.java
index 5ee1952..76061e6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Mutations.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Mutations.java
@@ -18,8 +18,8 @@ package org.apache.accumulo.tserver;
import java.util.List;
-import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.client.Durability;
+import org.apache.accumulo.core.data.Mutation;
public class Mutations {
private final Durability durability;
@@ -29,10 +29,12 @@ public class Mutations {
this.durability = durability;
this.mutations = mutations;
}
+
public Durability getDurability() {
return durability;
}
+
public List<Mutation> getMutations() {
return mutations;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
index e22a54f..b330d1e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
@@ -48,11 +48,11 @@ import org.apache.log4j.Logger;
/**
* This class stores data in a C++ map. Doing this allows us to store more in memory and avoid pauses caused by Java GC.
- *
+ *
* The strategy for dealing with native memory allocated for the native map is that java code using the native map should call delete() as soon as it is
* finished using the native map. When the NativeMap object is garbage collected its native resources will be released if needed. However waiting for java GC
* would be a mistake for long lived NativeMaps. Long lived objects are not garbage collected quickly, therefore a process could easily use too much memory.
- *
+ *
*/
public class NativeMap implements Iterable<Map.Entry<Key,Value>> {
@@ -92,7 +92,7 @@ public class NativeMap implements Iterable<Map.Entry<Key,Value>> {
* If native libraries are not loaded, the specified search path will be used to attempt to load them. Directories will be searched by using the
* system-specific library naming conventions. A path directly to a file can also be provided. Loading will continue until the search path is exhausted, or
* until the native libraries are found and successfully loaded, whichever occurs first.
- *
+ *
* @param searchPath
* a list of files and directories to search
*/
@@ -116,7 +116,7 @@ public class NativeMap implements Iterable<Map.Entry<Key,Value>> {
/**
* Check if native libraries are loaded.
- *
+ *
* @return true if they are loaded; false otherwise
*/
public static boolean isLoaded() {
@@ -360,10 +360,10 @@ public class NativeMap implements Iterable<Map.Entry<Key,Value>> {
/**
* The strategy for dealing with native memory allocated for iterators is to simply delete that memory when this Java Object is garbage collected.
- *
+ *
* These iterators are likely short lived object and therefore will be quickly garbage collected. Even if the objects are long lived and therefore more
* slowly garbage collected they only hold a small amount of native memory.
- *
+ *
*/
private long nmiPointer;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/RowLocks.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/RowLocks.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/RowLocks.java
index 1b22f05..babd629 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/RowLocks.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/RowLocks.java
@@ -30,60 +30,60 @@ import org.apache.accumulo.tserver.ConditionalMutationSet.DeferFilter;
import org.apache.accumulo.tserver.data.ServerConditionalMutation;
/**
- *
+ *
*/
class RowLocks {
-
+
private Map<ByteSequence,RowLock> rowLocks = new HashMap<ByteSequence,RowLock>();
-
+
static class RowLock {
ReentrantLock rlock;
int count;
ByteSequence rowSeq;
-
+
RowLock(ReentrantLock rlock, ByteSequence rowSeq) {
this.rlock = rlock;
this.count = 0;
this.rowSeq = rowSeq;
}
-
+
public boolean tryLock() {
return rlock.tryLock();
}
-
+
public void lock() {
rlock.lock();
}
-
+
public void unlock() {
rlock.unlock();
}
}
-
+
private RowLock getRowLock(ArrayByteSequence rowSeq) {
- RowLock lock = rowLocks.get(rowSeq);
- if (lock == null) {
- lock = new RowLock(new ReentrantLock(), rowSeq);
- rowLocks.put(rowSeq, lock);
- }
-
- lock.count++;
- return lock;
+ RowLock lock = rowLocks.get(rowSeq);
+ if (lock == null) {
+ lock = new RowLock(new ReentrantLock(), rowSeq);
+ rowLocks.put(rowSeq, lock);
+ }
+
+ lock.count++;
+ return lock;
}
-
+
private void returnRowLock(RowLock lock) {
- if (lock.count == 0)
- throw new IllegalStateException();
- lock.count--;
-
- if (lock.count == 0) {
- rowLocks.remove(lock.rowSeq);
- }
+ if (lock.count == 0)
+ throw new IllegalStateException();
+ lock.count--;
+
+ if (lock.count == 0) {
+ rowLocks.remove(lock.rowSeq);
+ }
}
-
+
List<RowLock> acquireRowlocks(Map<KeyExtent,List<ServerConditionalMutation>> updates, Map<KeyExtent,List<ServerConditionalMutation>> deferred) {
ArrayList<RowLock> locks = new ArrayList<RowLock>();
-
+
// assume that mutations are in sorted order to avoid deadlock
synchronized (rowLocks) {
for (List<ServerConditionalMutation> scml : updates.values()) {
@@ -92,7 +92,7 @@ class RowLocks {
}
}
}
-
+
HashSet<ByteSequence> rowsNotLocked = null;
// acquire as many locks as possible, not blocking on rows that are already locked
@@ -108,9 +108,9 @@ class RowLocks {
// if there is only one lock, then wait for it
locks.get(0).lock();
}
-
+
if (rowsNotLocked != null) {
-
+
final HashSet<ByteSequence> rnlf = rowsNotLocked;
// assume will get locks needed, do something expensive otherwise
ConditionalMutationSet.defer(updates, deferred, new DeferFilter() {
@@ -121,11 +121,11 @@ class RowLocks {
deferred.add(scm);
else
okMutations.add(scm);
-
+
}
}
});
-
+
ArrayList<RowLock> filteredLocks = new ArrayList<RowLock>();
ArrayList<RowLock> locksToReturn = new ArrayList<RowLock>();
for (RowLock rowLock : locks) {
@@ -135,7 +135,7 @@ class RowLocks {
filteredLocks.add(rowLock);
}
}
-
+
synchronized (rowLocks) {
for (RowLock rowLock : locksToReturn) {
returnRowLock(rowLock);
@@ -146,12 +146,12 @@ class RowLocks {
}
return locks;
}
-
+
void releaseRowLocks(List<RowLock> locks) {
for (RowLock rowLock : locks) {
rowLock.unlock();
}
-
+
synchronized (rowLocks) {
for (RowLock rowLock : locks) {
returnRowLock(rowLock);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/TConstraintViolationException.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TConstraintViolationException.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TConstraintViolationException.java
index 83fc43e..5f121cc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TConstraintViolationException.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TConstraintViolationException.java
@@ -51,4 +51,4 @@ public class TConstraintViolationException extends Exception {
CommitSession getCommitSession() {
return commitSession;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/TLevel.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TLevel.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TLevel.java
index 8bdf08b..5705c9e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TLevel.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TLevel.java
@@ -19,18 +19,18 @@ package org.apache.accumulo.tserver;
import org.apache.log4j.Level;
public class TLevel extends Level {
-
+
private static final long serialVersionUID = 1L;
public final static Level TABLET_HIST = new TLevel();
-
+
protected TLevel() {
super(Level.DEBUG_INT + 100, "TABLET_HIST", Level.DEBUG_INT + 100);
}
-
+
static public Level toLevel(int val) {
if (val == Level.DEBUG_INT + 100)
return Level.DEBUG;
return Level.toLevel(val);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java
index d1fece5..e7477b9 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java
@@ -34,73 +34,73 @@ import org.apache.accumulo.tserver.FileManager.ScanFileManager;
import org.apache.hadoop.fs.Path;
public class TabletIteratorEnvironment implements IteratorEnvironment {
-
+
private final ScanFileManager trm;
private final IteratorScope scope;
private final boolean fullMajorCompaction;
private final AccumuloConfiguration config;
private final ArrayList<SortedKeyValueIterator<Key,Value>> topLevelIterators = new ArrayList<SortedKeyValueIterator<Key,Value>>();
private Map<FileRef,DataFileValue> files;
-
+
public TabletIteratorEnvironment(IteratorScope scope, AccumuloConfiguration config) {
if (scope == IteratorScope.majc)
throw new IllegalArgumentException("must set if compaction is full");
-
+
this.scope = scope;
this.trm = null;
this.config = config;
this.fullMajorCompaction = false;
}
-
+
public TabletIteratorEnvironment(IteratorScope scope, AccumuloConfiguration config, ScanFileManager trm, Map<FileRef,DataFileValue> files) {
if (scope == IteratorScope.majc)
throw new IllegalArgumentException("must set if compaction is full");
-
+
this.scope = scope;
this.trm = trm;
this.config = config;
this.fullMajorCompaction = false;
this.files = files;
}
-
+
public TabletIteratorEnvironment(IteratorScope scope, boolean fullMajC, AccumuloConfiguration config) {
if (scope != IteratorScope.majc)
throw new IllegalArgumentException("Tried to set maj compaction type when scope was " + scope);
-
+
this.scope = scope;
this.trm = null;
this.config = config;
this.fullMajorCompaction = fullMajC;
}
-
+
@Override
public AccumuloConfiguration getConfig() {
return config;
}
-
+
@Override
public IteratorScope getIteratorScope() {
return scope;
}
-
+
@Override
public boolean isFullMajorCompaction() {
if (scope != IteratorScope.majc)
throw new IllegalStateException("Asked about major compaction type when scope is " + scope);
return fullMajorCompaction;
}
-
+
@Override
public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName) throws IOException {
FileRef ref = new FileRef(mapFileName, new Path(mapFileName));
return trm.openFiles(Collections.singletonMap(ref, files.get(ref)), false).get(0);
}
-
+
@Override
public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {
topLevelIterators.add(iter);
}
-
+
public SortedKeyValueIterator<Key,Value> getTopLevelIterator(SortedKeyValueIterator<Key,Value> iter) {
if (topLevelIterators.isEmpty())
return iter;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
index 9cc07dc..21734f9 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
@@ -18,8 +18,8 @@ package org.apache.accumulo.tserver;
import java.util.List;
-import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.client.Durability;
+import org.apache.accumulo.core.data.Mutation;
public class TabletMutations {
private final int tid;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index d5c1d2f..2bfa5a0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -349,7 +349,6 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
private final SessionManager sessionManager;
-
private final WriteTracker writeTracker = new WriteTracker();
private final RowLocks rowLocks = new RowLocks();
@@ -2122,16 +2121,17 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
locationToOpen = VolumeUtil.switchRootTabletVolume(extent, locationToOpen);
tablet = new Tablet(TabletServer.this, extent, locationToOpen, trm, tabletsKeyValues);
- /* @formatter:off
- * If a minor compaction starts after a tablet opens, this indicates a log recovery occurred. This recovered data must be minor compacted.
+ /*
+ * @formatter:off If a minor compaction starts after a tablet opens, this indicates a log recovery occurred. This recovered data must be minor
+ * compacted.
*
* There are three reasons to wait for this minor compaction to finish before placing the tablet in online tablets.
*
- * 1) The log recovery code does not handle data written to the tablet on multiple tablet servers.
- * 2) The log recovery code does not block if memory is full. Therefore recovering lots of tablets that use a lot of memory could run out of memory.
- * 3) The minor compaction finish event did not make it to the logs (the file will be in metadata, preventing replay of compacted data)...
- * but do not want a majc to wipe the file out from metadata and then have another process failure...
- * this could cause duplicate data to replay.
+ * 1) The log recovery code does not handle data written to the tablet on multiple tablet servers. 2) The log recovery code does not block if memory is
+ * full. Therefore recovering lots of tablets that use a lot of memory could run out of memory. 3) The minor compaction finish event did not make it to
+ * the logs (the file will be in metadata, preventing replay of compacted data)... but do not want a majc to wipe the file out from metadata and then
+ * have another process failure... this could cause duplicate data to replay.
+ *
* @formatter:on
*/
if (tablet.getNumEntriesInMemory() > 0 && !tablet.minorCompactNow(MinorCompactionReason.RECOVERY)) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index bb9d427..351d526 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -420,7 +420,7 @@ public class TabletServerResourceManager {
if (!tablet.initiateMinorCompaction(MinorCompactionReason.SYSTEM)) {
if (tablet.isClosed()) {
// attempt to remove it from the current reports if still there
- synchronized(tabletReports) {
+ synchronized (tabletReports) {
TabletStateImpl latestReport = tabletReports.remove(keyExtent);
if (latestReport != null) {
if (latestReport.getTablet() != tablet) {
@@ -644,8 +644,6 @@ public class TabletServerResourceManager {
}
}
-
-
// END methods that Tablets call to make decisions about major compaction
// tablets call this method to run minor compactions,
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java
index 40906df..1e2cdf4 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java
@@ -21,18 +21,18 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.accumulo.server.util.ActionStatsUpdator;
public class TabletStatsKeeper {
-
+
// suspect we need more synchronization in this class
private ActionStats major = new ActionStats();
private ActionStats minor = new ActionStats();
private ActionStats split = new ActionStats();
-
+
public enum Operation {
MAJOR, SPLIT, MINOR
}
-
+
private ActionStats[] map = new ActionStats[] {major, split, minor};
-
+
public void updateTime(Operation operation, long queued, long start, long count, boolean failed) {
try {
ActionStats data = map[operation.ordinal()];
@@ -42,7 +42,7 @@ public class TabletStatsKeeper {
} else {
double t = (System.currentTimeMillis() - start) / 1000.0;
double q = (start - queued) / 1000.0;
-
+
data.status--;
data.count += count;
data.num++;
@@ -56,9 +56,9 @@ public class TabletStatsKeeper {
} catch (Exception E) {
resetTimes();
}
-
+
}
-
+
public void updateTime(Operation operation, long start, long count, boolean failed) {
try {
ActionStats data = map[operation.ordinal()];
@@ -67,52 +67,52 @@ public class TabletStatsKeeper {
data.status--;
} else {
double t = (System.currentTimeMillis() - start) / 1000.0;
-
+
data.status--;
data.num++;
data.elapsed += t;
data.sumDev += t * t;
-
+
if (data.elapsed < 0 || data.sumDev < 0 || data.queueSumDev < 0 || data.queueTime < 0)
resetTimes();
}
} catch (Exception E) {
resetTimes();
}
-
+
}
-
+
public void saveMajorMinorTimes(TabletStats t) {
ActionStatsUpdator.update(minor, t.minors);
ActionStatsUpdator.update(major, t.majors);
}
-
+
public void saveMinorTimes(TabletStatsKeeper t) {
ActionStatsUpdator.update(minor, t.minor);
}
-
+
public void saveMajorTimes(TabletStatsKeeper t) {
ActionStatsUpdator.update(major, t.major);
}
-
+
public void resetTimes() {
major = new ActionStats();
split = new ActionStats();
minor = new ActionStats();
}
-
+
public void incrementStatusMinor() {
minor.status++;
}
-
+
public void incrementStatusMajor() {
major.status++;
}
-
+
public void incrementStatusSplit() {
split.status++;
}
-
+
public TabletStats getTabletStats() {
return new TabletStats(null, major, minor, split, 0, 0, 0, 0);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java
index 98c7a02..026f7e2 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java
@@ -19,9 +19,9 @@ package org.apache.accumulo.tserver;
import java.io.IOException;
public class TooManyFilesException extends IOException {
-
+
private static final long serialVersionUID = 1L;
-
+
public TooManyFilesException(String msg) {
super(msg);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/TservConstraintEnv.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TservConstraintEnv.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TservConstraintEnv.java
index dbb67a9..edaca31 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TservConstraintEnv.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TservConstraintEnv.java
@@ -80,4 +80,4 @@ public class TservConstraintEnv implements Environment {
}
};
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/WriteTracker.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/WriteTracker.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/WriteTracker.java
index 84b5cd0..9bf80b4 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/WriteTracker.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/WriteTracker.java
@@ -30,13 +30,13 @@ import org.apache.accumulo.tserver.tablet.Tablet;
import org.apache.log4j.Logger;
/**
- * This little class keeps track of writes in progress and allows readers to wait for writes that started before the read. It assumes that the operation ids
- * are monotonically increasing.
+ * This little class keeps track of writes in progress and allows readers to wait for writes that started before the read. It assumes that the operation ids are
+ * monotonically increasing.
*
*/
class WriteTracker {
private static final Logger log = Logger.getLogger(WriteTracker.class);
-
+
private static final AtomicLong operationCounter = new AtomicLong(1);
private final Map<TabletType,TreeSet<Long>> inProgressWrites = new EnumMap<TabletType,TreeSet<Long>>(TabletType.class);
@@ -93,4 +93,4 @@ class WriteTracker {
return startWrite(TabletType.type(extents));
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionPlan.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionPlan.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionPlan.java
index 75c6bd8..8f98761 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionPlan.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionPlan.java
@@ -22,9 +22,10 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import com.google.common.collect.Sets;
import org.apache.accumulo.server.fs.FileRef;
+import com.google.common.collect.Sets;
+
/**
* A plan for a compaction: the input files, the files that are *not* inputs to a compaction that should simply be deleted, and the optional parameters used to
* create the resulting output file.
@@ -59,7 +60,7 @@ public class CompactionPlan {
/**
* Validate compaction plan.
- *
+ *
* @param allFiles
* All possible files
* @throws IllegalStateException
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
index 2d94884..40cb604 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
@@ -42,10 +42,10 @@ public abstract class CompactionStrategy {
* Determine if this tablet is eligible for a major compaction. It's ok if it later determines (through {@link #gatherInformation(MajorCompactionRequest)} and
* {@link #getCompactionPlan(MajorCompactionRequest)}) that it does not need to. Any state stored during shouldCompact will no longer exist when
* {@link #gatherInformation(MajorCompactionRequest)} and {@link #getCompactionPlan(MajorCompactionRequest)} are called.
- *
+ *
* <P>
* Called while holding the tablet lock, so it should not be doing any blocking.
- *
+ *
* <P>
* Since no blocking should be done in this method, then its unexpected that this method will throw IOException. However since its in the API, it can not be
* easily removed.
@@ -55,7 +55,7 @@ public abstract class CompactionStrategy {
/**
* Called prior to obtaining the tablet lock, useful for examining metadata or indexes. State collected during this method will be available during the call
* the {@link #getCompactionPlan(MajorCompactionRequest)}.
- *
+ *
* @param request
* basic details about the tablet
*/
@@ -63,11 +63,11 @@ public abstract class CompactionStrategy {
/**
* Get the plan for compacting a tablets files. Called while holding the tablet lock, so it should not be doing any blocking.
- *
+ *
* <P>
* Since no blocking should be done in this method, then its unexpected that this method will throw IOException. However since its in the API, it can not be
* easily removed.
- *
+ *
* @param request
* basic details about the tablet
* @return the plan for a major compaction, or null to cancel the compaction.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java
index 6d4dc79..6cc9025 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java
@@ -26,7 +26,7 @@ import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.server.fs.FileRef;
/**
- *
+ *
*/
public class SizeLimitCompactionStrategy extends DefaultCompactionStrategy {
public static final String SIZE_LIMIT_OPT = "sizeLimit";
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/ConstraintChecker.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/ConstraintChecker.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/ConstraintChecker.java
index 7ea1388..fd9e521 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/ConstraintChecker.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/ConstraintChecker.java
@@ -36,10 +36,10 @@ import org.apache.log4j.Logger;
import com.google.common.annotations.VisibleForTesting;
public class ConstraintChecker {
-
+
private ArrayList<Constraint> constrains;
private static final Logger log = Logger.getLogger(ConstraintChecker.class);
-
+
private ClassLoader loader;
private TableConfiguration conf;
@@ -47,7 +47,7 @@ public class ConstraintChecker {
public ConstraintChecker(TableConfiguration conf) {
constrains = new ArrayList<Constraint>();
-
+
this.conf = conf;
try {
@@ -58,7 +58,7 @@ public class ConstraintChecker {
} else {
loader = AccumuloVFSClassLoader.getClassLoader();
}
-
+
for (Entry<String,String> entry : conf) {
if (entry.getKey().startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey())) {
String className = entry.getValue();
@@ -67,7 +67,7 @@ public class ConstraintChecker {
constrains.add(clazz.newInstance());
}
}
-
+
lastCheck.set(System.currentTimeMillis());
} catch (Throwable e) {
@@ -84,21 +84,21 @@ public class ConstraintChecker {
}
public boolean classLoaderChanged() {
-
+
if (constrains.size() == 0)
return false;
try {
String context = conf.get(Property.TABLE_CLASSPATH);
-
+
ClassLoader currentLoader;
-
+
if (context != null && !context.equals("")) {
currentLoader = AccumuloVFSClassLoader.getContextManager().getClassLoader(context);
} else {
currentLoader = AccumuloVFSClassLoader.getClassLoader();
}
-
+
return currentLoader != loader;
} catch (Exception e) {
log.debug("Failed to check " + e.getMessage());
@@ -117,10 +117,10 @@ public class ConstraintChecker {
public Violations check(Environment env, Mutation m) {
if (!env.getExtent().contains(new ComparableBytes(m.getRow()))) {
Violations violations = new Violations();
-
+
ConstraintViolationSummary cvs = new ConstraintViolationSummary(SystemConstraint.class.getName(), (short) -1, "Mutation outside of tablet extent", 1);
violations.add(cvs);
-
+
// do not bother with further checks since this mutation does not go with this tablet
return violations;
}
@@ -133,8 +133,7 @@ public class ConstraintChecker {
if (violationCodes != null) {
String className = constraint.getClass().getName();
for (Short vcode : violationCodes) {
- violations = addViolation(violations, new ConstraintViolationSummary(
- className, vcode, constraint.getViolationDescription(vcode), 1));
+ violations = addViolation(violations, new ConstraintViolationSummary(className, vcode, constraint.getViolationDescription(vcode), 1));
}
}
} catch (Throwable throwable) {
@@ -161,8 +160,7 @@ public class ConstraintChecker {
msg = "threw some Exception";
}
- violations = addViolation(violations, new ConstraintViolationSummary(
- constraint.getClass().getName(), vcode, "CONSTRAINT FAILED : " + msg, 1));
+ violations = addViolation(violations, new ConstraintViolationSummary(constraint.getClass().getName(), vcode, "CONSTRAINT FAILED : " + msg, 1));
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/UnsatisfiableConstraint.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/UnsatisfiableConstraint.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/UnsatisfiableConstraint.java
index cf0c176..64bc2cd 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/UnsatisfiableConstraint.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/UnsatisfiableConstraint.java
@@ -23,21 +23,21 @@ import org.apache.accumulo.core.constraints.Constraint;
import org.apache.accumulo.core.data.Mutation;
public class UnsatisfiableConstraint implements Constraint {
-
+
private List<Short> violations;
private String vDesc;
-
+
public UnsatisfiableConstraint(short vcode, String violationDescription) {
this.violations = Collections.unmodifiableList(Collections.singletonList(vcode));
this.vDesc = violationDescription;
}
-
+
public List<Short> check(Environment env, Mutation mutation) {
return violations;
}
-
+
public String getViolationDescription(short violationCode) {
return vDesc;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/data/ServerConditionalMutation.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/data/ServerConditionalMutation.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/data/ServerConditionalMutation.java
index 975300b..84137cc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/data/ServerConditionalMutation.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/data/ServerConditionalMutation.java
@@ -24,22 +24,22 @@ import org.apache.accumulo.core.data.thrift.TConditionalMutation;
import org.apache.accumulo.server.data.ServerMutation;
/**
- *
+ *
*/
public class ServerConditionalMutation extends ServerMutation {
-
+
public static class TCMTranslator extends Translator<TConditionalMutation,ServerConditionalMutation> {
@Override
public ServerConditionalMutation translate(TConditionalMutation input) {
return new ServerConditionalMutation(input);
}
}
-
+
public static final TCMTranslator TCMT = new TCMTranslator();
private long cmid;
private List<TCondition> conditions;
-
+
public ServerConditionalMutation(TConditionalMutation input) {
super(input.mutation);
@@ -50,10 +50,9 @@ public class ServerConditionalMutation extends ServerMutation {
public long getID() {
return cmid;
}
-
+
public List<TCondition> getConditions() {
return conditions;
}
-
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 18aa192..6f9be7d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -152,7 +152,7 @@ public class DfsLogger {
private static final LogFileValue EMPTY = new LogFileValue();
private boolean closed = false;
-
+
private class LogSyncingTask implements Runnable {
@Override
@@ -170,8 +170,7 @@ public class DfsLogger {
workQueue.drainTo(work);
Method durabilityMethod = null;
- loop:
- for (LogWork logWork : work) {
+ loop: for (LogWork logWork : work) {
switch (logWork.durability) {
case DEFAULT:
case NONE:
@@ -287,7 +286,9 @@ public class DfsLogger {
/**
* Reference a pre-existing log file.
- * @param meta the cq for the "log" entry in +r/!0
+ *
+ * @param meta
+ * the cq for the "log" entry in +r/!0
*/
public DfsLogger(ServerResources conf, String filename, String meta) throws IOException {
this.conf = conf;
@@ -387,7 +388,8 @@ public class DfsLogger {
log.debug("DfsLogger.open() begin");
VolumeManager fs = conf.getFileSystem();
- logPath = fs.choose(Optional.<String> absent(), ServerConstants.getBaseUris()) + Path.SEPARATOR + ServerConstants.WAL_DIR + Path.SEPARATOR + logger + Path.SEPARATOR + filename;
+ logPath = fs.choose(Optional.<String> absent(), ServerConstants.getBaseUris()) + Path.SEPARATOR + ServerConstants.WAL_DIR + Path.SEPARATOR + logger
+ + Path.SEPARATOR + filename;
metaReference = toString();
try {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LocalWALRecovery.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LocalWALRecovery.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LocalWALRecovery.java
index 10bc903..2658c1f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LocalWALRecovery.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LocalWALRecovery.java
@@ -35,8 +35,8 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.WritableName;
import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.WritableName;
import org.apache.log4j.Logger;
import com.beust.jcommander.JCommander;
@@ -50,10 +50,10 @@ import com.google.common.annotations.VisibleForTesting;
@SuppressWarnings("deprecation")
public class LocalWALRecovery implements Runnable {
private static final Logger log = Logger.getLogger(LocalWALRecovery.class);
-
- static {
- WritableName.addName(LogFileKey.class, org.apache.accumulo.server.logger.LogFileKey.class.getName());
- WritableName.addName(LogFileValue.class, org.apache.accumulo.server.logger.LogFileValue.class.getName());
+
+ static {
+ WritableName.addName(LogFileKey.class, org.apache.accumulo.server.logger.LogFileKey.class.getName());
+ WritableName.addName(LogFileValue.class, org.apache.accumulo.server.logger.LogFileValue.class.getName());
}
public static void main(String[] args) throws IOException {
@@ -150,7 +150,7 @@ public class LocalWALRecovery implements Runnable {
Path localWal = new Path(file.toURI());
FileSystem localFs = FileSystem.getLocal(fs.getConf());
-
+
Reader reader = new SequenceFile.Reader(localFs, localWal, localFs.getConf());
// Reader reader = new SequenceFile.Reader(localFs.getConf(), SequenceFile.Reader.file(localWal));
Path tmp = new Path(options.destination + "/" + name + ".copy");
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
index 2c6d415..405ec70 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
@@ -39,21 +39,25 @@ import org.apache.log4j.Logger;
/**
* Extract Mutations for a tablet from a set of logs that have been sorted by operation and tablet.
- *
+ *
*/
public class SortedLogRecovery {
private static final Logger log = Logger.getLogger(SortedLogRecovery.class);
-
+
static class EmptyMapFileException extends Exception {
private static final long serialVersionUID = 1L;
- public EmptyMapFileException() { super(); }
+ public EmptyMapFileException() {
+ super();
+ }
}
static class UnusedException extends Exception {
private static final long serialVersionUID = 1L;
- public UnusedException() { super(); }
+ public UnusedException() {
+ super();
+ }
}
private VolumeManager fs;
@@ -61,28 +65,28 @@ public class SortedLogRecovery {
public SortedLogRecovery(VolumeManager fs) {
this.fs = fs;
}
-
+
private enum Status {
INITIAL, LOOKING_FOR_FINISH, COMPLETE
};
-
+
private static class LastStartToFinish {
long lastStart = -1;
long seq = -1;
long lastFinish = -1;
Status compactionStatus = Status.INITIAL;
String tserverSession = "";
-
+
private void update(long newFinish) {
this.seq = this.lastStart;
if (newFinish != -1)
lastFinish = newFinish;
}
-
+
private void update(int newStartFile, long newStart) {
this.lastStart = newStart;
}
-
+
private void update(String newSession) {
this.lastStart = -1;
this.lastFinish = -1;
@@ -90,7 +94,7 @@ public class SortedLogRecovery {
this.tserverSession = newSession;
}
}
-
+
public void recover(KeyExtent extent, List<Path> recoveryLogs, Set<String> tabletFiles, MutationReceiver mr) throws IOException {
int[] tids = new int[recoveryLogs.size()];
LastStartToFinish lastStartToFinish = new LastStartToFinish();
@@ -115,12 +119,12 @@ public class SortedLogRecovery {
log.warn("Ignoring error closing file");
}
}
-
+
}
-
+
if (lastStartToFinish.compactionStatus == Status.LOOKING_FOR_FINISH)
throw new RuntimeException("COMPACTION_FINISH (without preceding COMPACTION_START) not followed by successful minor compaction");
-
+
for (int i = 0; i < recoveryLogs.size(); i++) {
Path logfile = recoveryLogs.get(i);
MultiReader reader = new MultiReader(fs, logfile);
@@ -136,7 +140,7 @@ public class SortedLogRecovery {
log.info("Recovery complete for " + extent + " using " + logfile);
}
}
-
+
private String getPathSuffix(String pathString) {
Path path = new Path(pathString);
if (path.depth() < 2)
@@ -144,7 +148,8 @@ public class SortedLogRecovery {
return path.getParent().getName() + "/" + path.getName();
}
- int findLastStartToFinish(MultiReader reader, int fileno, KeyExtent extent, Set<String> tabletFiles, LastStartToFinish lastStartToFinish) throws IOException, EmptyMapFileException, UnusedException {
+ int findLastStartToFinish(MultiReader reader, int fileno, KeyExtent extent, Set<String> tabletFiles, LastStartToFinish lastStartToFinish) throws IOException,
+ EmptyMapFileException, UnusedException {
HashSet<String> suffixes = new HashSet<String>();
for (String path : tabletFiles)
@@ -158,7 +163,7 @@ public class SortedLogRecovery {
throw new EmptyMapFileException();
if (key.event != OPEN)
throw new RuntimeException("First log entry value is not OPEN");
-
+
if (key.tserverSession.compareTo(lastStartToFinish.tserverSession) != 0) {
if (lastStartToFinish.compactionStatus == Status.LOOKING_FOR_FINISH)
throw new RuntimeException("COMPACTION_FINISH (without preceding COMPACTION_START) is not followed by a successful minor compaction.");
@@ -168,9 +173,9 @@ public class SortedLogRecovery {
if (extent.isRootTablet()) {
alternative = RootTable.OLD_EXTENT;
}
-
+
LogFileKey defineKey = null;
-
+
// find the maximum tablet id... because a tablet may leave a tserver and then come back, in which case it would have a different tablet id
// for the maximum tablet id, find the minimum sequence #... may be ok to find the max seq, but just want to make the code behave like it used to
while (reader.next(key, value)) {
@@ -188,9 +193,9 @@ public class SortedLogRecovery {
if (tid < 0) {
throw new UnusedException();
}
-
+
log.debug("Found tid, seq " + tid + " " + defineKey.seq);
-
+
// Scan start/stop events for this tablet
key = defineKey;
key.event = COMPACTION_START;
@@ -205,7 +210,7 @@ public class SortedLogRecovery {
if (key.seq <= lastStartToFinish.lastStart)
throw new RuntimeException("Sequence numbers are not increasing for start/stop events.");
lastStartToFinish.update(fileno, key.seq);
-
+
// Tablet server finished the minor compaction, but didn't remove the entry from the METADATA table.
log.debug("minor compaction into " + key.filename + " finished, but was still in the METADATA");
if (suffixes.contains(getPathSuffix(key.filename)))
@@ -225,11 +230,11 @@ public class SortedLogRecovery {
}
return tid;
}
-
+
private void playbackMutations(MultiReader reader, int tid, LastStartToFinish lastStartToFinish, MutationReceiver mr) throws IOException {
LogFileKey key = new LogFileKey();
LogFileValue value = new LogFileValue();
-
+
// Playback mutations after the last stop to finish
log.info("Scanning for mutations starting at sequence number " + lastStartToFinish.seq + " for tid " + tid);
key.event = MUTATION;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index ceb76da..5c3fc2d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -90,7 +90,7 @@ public class TabletServerLogger {
private final AtomicLong syncCounter;
private final AtomicLong flushCounter;
-
+
private final static int HALT_AFTER_ERROR_COUNT = 5;
// Die if we get 5 WAL creation errors in 10 seconds
private final Cache<Long,Object> walErrors = CacheBuilder.newBuilder().maximumSize(HALT_AFTER_ERROR_COUNT).expireAfterWrite(10, TimeUnit.SECONDS).build();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
index 3a20e8d..829cf2f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
@@ -29,7 +29,7 @@ import org.apache.accumulo.core.data.KeyExtent;
import org.apache.hadoop.io.WritableComparable;
public class LogFileKey implements WritableComparable<LogFileKey> {
-
+
public LogEvents event;
public String filename = null;
public KeyExtent tablet = null;
@@ -37,7 +37,7 @@ public class LogFileKey implements WritableComparable<LogFileKey> {
public int tid = -1;
public static final int VERSION = 2;
public String tserverSession;
-
+
@Override
public void readFields(DataInput in) throws IOException {
int value = in.readByte();
@@ -79,9 +79,9 @@ public class LogFileKey implements WritableComparable<LogFileKey> {
default:
throw new RuntimeException("Unknown log event type: " + event);
}
-
+
}
-
+
@Override
public void write(DataOutput out) throws IOException {
out.writeByte(event.ordinal());
@@ -119,7 +119,7 @@ public class LogFileKey implements WritableComparable<LogFileKey> {
throw new IllegalArgumentException("Bad value for LogFileEntry type");
}
}
-
+
static int eventType(LogEvents event) {
// Order logs by START, TABLET_DEFINITIONS, COMPACTIONS and then MUTATIONS
if (event == MUTATION || event == MANY_MUTATIONS) {
@@ -133,7 +133,7 @@ public class LogFileKey implements WritableComparable<LogFileKey> {
}
return 2;
}
-
+
private static int sign(long l) {
if (l < 0)
return -1;
@@ -141,7 +141,7 @@ public class LogFileKey implements WritableComparable<LogFileKey> {
return 1;
return 0;
}
-
+
@Override
public int compareTo(LogFileKey o) {
if (eventType(this.event) != eventType(o.event)) {
@@ -154,7 +154,7 @@ public class LogFileKey implements WritableComparable<LogFileKey> {
}
return sign(this.seq - o.seq);
}
-
+
@Override
public boolean equals(Object obj) {
if (obj instanceof LogFileKey) {
@@ -162,16 +162,16 @@ public class LogFileKey implements WritableComparable<LogFileKey> {
}
return false;
}
-
+
@Override
public int hashCode() {
return (int) seq;
}
-
+
public static void printEntry(LogFileKey entry) {
System.out.println(entry.toString());
}
-
+
@Override
public String toString() {
switch (event) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
index 81cd593..9ca0f38 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
@@ -31,11 +31,11 @@ import org.apache.accumulo.server.data.ServerMutation;
import org.apache.hadoop.io.Writable;
public class LogFileValue implements Writable {
-
+
private static final List<Mutation> empty = Collections.emptyList();
-
+
public List<Mutation> mutations = empty;
-
+
@Override
public void readFields(DataInput in) throws IOException {
int count = in.readInt();
@@ -46,7 +46,7 @@ public class LogFileValue implements Writable {
mutations.add(mutation);
}
}
-
+
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(mutations.size());
@@ -54,18 +54,18 @@ public class LogFileValue implements Writable {
m.write(out);
}
}
-
+
public static void print(LogFileValue value) {
System.out.println(value.toString());
}
-
+
private static String displayLabels(byte[] labels) {
String s = new String(labels, UTF_8);
s = s.replace("&", " & ");
s = s.replace("|", " | ");
return s;
}
-
+
public static String format(LogFileValue lfv, int maxMutations) {
if (lfv.mutations.size() == 0)
return "";
@@ -80,18 +80,17 @@ public class LogFileValue implements Writable {
builder.append(" ").append(new String(m.getRow(), UTF_8)).append("\n");
for (ColumnUpdate update : m.getUpdates()) {
String value = new String(update.getValue());
- builder.append(" ").append(new String(update.getColumnFamily(), UTF_8)).append(":")
- .append(new String(update.getColumnQualifier(), UTF_8)).append(" ").append(update.hasTimestamp() ? "[user]:" : "[system]:")
- .append(update.getTimestamp()).append(" [").append(displayLabels(update.getColumnVisibility())).append("] ")
- .append(update.isDeleted() ? "<deleted>" : value).append("\n");
+ builder.append(" ").append(new String(update.getColumnFamily(), UTF_8)).append(":").append(new String(update.getColumnQualifier(), UTF_8))
+ .append(" ").append(update.hasTimestamp() ? "[user]:" : "[system]:").append(update.getTimestamp()).append(" [")
+ .append(displayLabels(update.getColumnVisibility())).append("] ").append(update.isDeleted() ? "<deleted>" : value).append("\n");
}
}
return builder.toString();
}
-
+
@Override
public String toString() {
return format(this, 5);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/MasterMessage.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/MasterMessage.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/MasterMessage.java
index ecca3ce..ab71794 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/MasterMessage.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/MasterMessage.java
@@ -22,7 +22,7 @@ import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.thrift.TException;
public interface MasterMessage {
-
+
void send(TCredentials info, String serverName, MasterClientService.Iface client) throws TException, ThriftSecurityException;
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/SplitReportMessage.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/SplitReportMessage.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/SplitReportMessage.java
index 5d61c9c..0c93a86 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/SplitReportMessage.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/SplitReportMessage.java
@@ -20,8 +20,8 @@ import java.util.Map;
import java.util.TreeMap;
import org.apache.accumulo.core.client.impl.Translator;
-import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.client.impl.Translators;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.master.thrift.MasterClientService;
import org.apache.accumulo.core.master.thrift.TabletSplit;
@@ -33,24 +33,24 @@ import org.apache.thrift.TException;
public class SplitReportMessage implements MasterMessage {
Map<KeyExtent,Text> extents;
KeyExtent old_extent;
-
+
public SplitReportMessage(KeyExtent old_extent, Map<KeyExtent,Text> newExtents) {
this.old_extent = old_extent;
extents = new TreeMap<KeyExtent,Text>(newExtents);
}
-
+
public SplitReportMessage(KeyExtent old_extent, KeyExtent ne1, Text np1, KeyExtent ne2, Text np2) {
this.old_extent = old_extent;
extents = new TreeMap<KeyExtent,Text>();
extents.put(ne1, np1);
extents.put(ne2, np2);
}
-
+
public void send(TCredentials credentials, String serverName, MasterClientService.Iface client) throws TException, ThriftSecurityException {
TabletSplit split = new TabletSplit();
split.oldTablet = old_extent.toThrift();
split.newTablets = Translator.translate(extents.keySet(), Translators.KET);
client.reportSplitExtent(Tracer.traceInfo(), credentials, serverName, split);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/TabletStatusMessage.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/TabletStatusMessage.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/TabletStatusMessage.java
index 655414d..25db744 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/TabletStatusMessage.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/TabletStatusMessage.java
@@ -18,22 +18,22 @@ package org.apache.accumulo.tserver.mastermessage;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.master.thrift.TabletLoadState;
import org.apache.accumulo.core.master.thrift.MasterClientService.Iface;
+import org.apache.accumulo.core.master.thrift.TabletLoadState;
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.core.trace.Tracer;
import org.apache.thrift.TException;
public class TabletStatusMessage implements MasterMessage {
-
+
private KeyExtent extent;
private TabletLoadState status;
-
+
public TabletStatusMessage(TabletLoadState status, KeyExtent extent) {
this.extent = extent;
this.status = status;
}
-
+
public void send(TCredentials auth, String serverName, Iface client) throws TException, ThriftSecurityException {
client.reportTabletStatus(Tracer.traceInfo(), auth, serverName, status, extent.toThrift());
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMBean.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMBean.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMBean.java
index 3b7a637..dc35c28 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMBean.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMBean.java
@@ -17,34 +17,34 @@
package org.apache.accumulo.tserver.metrics;
public interface TabletServerMBean {
-
+
int getOnlineCount();
-
+
int getOpeningCount();
-
+
int getUnopenedCount();
-
+
int getMajorCompactions();
-
+
int getMajorCompactionsQueued();
-
+
int getMinorCompactions();
-
+
int getMinorCompactionsQueued();
-
+
long getEntries();
-
+
long getEntriesInMemory();
-
+
long getQueries();
-
+
long getIngest();
-
+
long getTotalMinorCompactions();
-
+
double getHoldTime();
-
+
String getName();
-
+
double getAverageFilesPerTablet();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index 7d6c59e..a07f354 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -234,9 +234,9 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
}
}
- protected Status replicateRFiles(ClientContext peerContext, final HostAndPort peerTserver, final ReplicationTarget target,
- final Path p, final Status status, final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper)
- throws TTransportException, AccumuloException, AccumuloSecurityException {
+ protected Status replicateRFiles(ClientContext peerContext, final HostAndPort peerTserver, final ReplicationTarget target, final Path p, final Status status,
+ final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper) throws TTransportException,
+ AccumuloException, AccumuloSecurityException {
DataInputStream input;
try {
input = getRFileInputStream(p);
@@ -280,9 +280,9 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
}
}
- protected Status replicateLogs(ClientContext peerContext, final HostAndPort peerTserver, final ReplicationTarget target,
- final Path p, final Status status, final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper)
- throws TTransportException, AccumuloException, AccumuloSecurityException {
+ protected Status replicateLogs(ClientContext peerContext, final HostAndPort peerTserver, final ReplicationTarget target, final Path p, final Status status,
+ final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper) throws TTransportException,
+ AccumuloException, AccumuloSecurityException {
final Set<Integer> tids;
final DataInputStream input;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
index e2af4df..cc79f31 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
@@ -94,8 +94,7 @@ public class ReplicationServicerHandler implements Iface {
replayer = clz.newInstance();
} catch (InstantiationException | IllegalAccessException e1) {
log.error("Could not instantiate replayer class {}", clz.getName());
- throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_INSTANTIATE_REPLAYER, "Could not instantiate replayer class"
- + clz.getName());
+ throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_INSTANTIATE_REPLAYER, "Could not instantiate replayer class" + clz.getName());
}
long entriesReplicated;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
index 1d20e2b..bd6bcd3 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
@@ -32,7 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- *
+ *
*/
public class ReplicationWorker implements Runnable {
private static final Logger log = LoggerFactory.getLogger(ReplicationWorker.class);