You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2015/01/09 03:44:49 UTC
[45/66] [abbrv] accumulo git commit: ACCUMULO-3451 Format master
branch (1.7.0-SNAPSHOT)
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java
index 1d49647..659d877 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java
@@ -52,23 +52,23 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
public class TabletLocatorImpl extends TabletLocator {
-
+
private static final Logger log = Logger.getLogger(TabletLocatorImpl.class);
-
+
// there seems to be a bug in TreeMap.tailMap related to
// putting null in the treemap.. therefore instead of
// putting null, put MAX_TEXT
static final Text MAX_TEXT = new Text();
-
+
private static class EndRowComparator implements Comparator<Text>, Serializable {
-
+
private static final long serialVersionUID = 1L;
@Override
public int compare(Text o1, Text o2) {
-
+
int ret;
-
+
if (o1 == MAX_TEXT)
if (o2 == MAX_TEXT)
ret = 0;
@@ -78,21 +78,21 @@ public class TabletLocatorImpl extends TabletLocator {
ret = -1;
else
ret = o1.compareTo(o2);
-
+
return ret;
}
-
+
}
-
+
static final EndRowComparator endRowComparator = new EndRowComparator();
-
+
protected Text tableId;
protected TabletLocator parent;
protected TreeMap<Text,TabletLocation> metaCache = new TreeMap<Text,TabletLocation>(endRowComparator);
protected TabletLocationObtainer locationObtainer;
private TabletServerLockChecker lockChecker;
protected Text lastTabletRow;
-
+
private TreeSet<KeyExtent> badExtents = new TreeSet<KeyExtent>();
private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock rLock = rwLock.readLock();
@@ -104,11 +104,11 @@ public class TabletLocatorImpl extends TabletLocator {
*/
TabletLocations lookupTablet(ClientContext context, TabletLocation src, Text row, Text stopRow, TabletLocator parent) throws AccumuloSecurityException,
AccumuloException;
-
+
List<TabletLocation> lookupTablets(ClientContext context, String tserver, Map<KeyExtent,List<Range>> map, TabletLocator parent)
throws AccumuloSecurityException, AccumuloException;
}
-
+
public static interface TabletServerLockChecker {
boolean isLockHeld(String tserver, String session);
@@ -116,36 +116,36 @@ public class TabletLocatorImpl extends TabletLocator {
}
private class LockCheckerSession {
-
+
private HashSet<Pair<String,String>> okLocks = new HashSet<Pair<String,String>>();
private HashSet<Pair<String,String>> invalidLocks = new HashSet<Pair<String,String>>();
-
+
private TabletLocation checkLock(TabletLocation tl) {
// the goal of this class is to minimize calls out to lockChecker under that assumption that its a resource synchronized among many threads... want to
// avoid fine grained synchronization when binning lots of mutations or ranges... remember decisions from the lockChecker in thread local unsynchronized
// memory
-
+
if (tl == null)
return null;
Pair<String,String> lock = new Pair<String,String>(tl.tablet_location, tl.tablet_session);
-
+
if (okLocks.contains(lock))
return tl;
-
+
if (invalidLocks.contains(lock))
return null;
-
+
if (lockChecker.isLockHeld(tl.tablet_location, tl.tablet_session)) {
okLocks.add(lock);
return tl;
}
-
+
if (log.isTraceEnabled())
log.trace("Tablet server " + tl.tablet_location + " " + tl.tablet_session + " no longer holds its lock");
-
+
invalidLocks.add(lock);
-
+
return null;
}
}
@@ -155,34 +155,34 @@ public class TabletLocatorImpl extends TabletLocator {
this.parent = parent;
this.locationObtainer = tlo;
this.lockChecker = tslc;
-
+
this.lastTabletRow = new Text(tableId);
lastTabletRow.append(new byte[] {'<'}, 0, 1);
}
-
+
@Override
public <T extends Mutation> void binMutations(ClientContext context, List<T> mutations, Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
-
+
OpTimer opTimer = null;
if (log.isTraceEnabled())
opTimer = new OpTimer(log, Level.TRACE).start("Binning " + mutations.size() + " mutations for table " + tableId);
-
+
ArrayList<T> notInCache = new ArrayList<T>();
Text row = new Text();
-
+
LockCheckerSession lcSession = new LockCheckerSession();
rLock.lock();
try {
processInvalidated(context, lcSession);
-
+
// for this to be efficient rows need to be in sorted order, but always sorting is slow... therefore only sort the
// stuff not in the cache.... it is most efficient to pass _locateTablet rows in sorted order
-
+
// For this to be efficient, need to avoid fine grained synchronization and fine grained logging.
// Therefore methods called by this are not synchronized and should not log.
-
+
for (T mutation : mutations) {
row.set(mutation.getRow());
TabletLocation tl = locateTabletInCache(row);
@@ -192,7 +192,7 @@ public class TabletLocatorImpl extends TabletLocator {
} finally {
rLock.unlock();
}
-
+
if (notInCache.size() > 0) {
Collections.sort(notInCache, new Comparator<Mutation>() {
@Override
@@ -200,7 +200,7 @@ public class TabletLocatorImpl extends TabletLocator {
return WritableComparator.compareBytes(o1.getRow(), 0, o1.getRow().length, o2.getRow(), 0, o2.getRow().length);
}
});
-
+
wLock.lock();
try {
boolean failed = false;
@@ -211,11 +211,11 @@ public class TabletLocatorImpl extends TabletLocator {
failures.add(mutation);
continue;
}
-
+
row.set(mutation.getRow());
-
+
TabletLocation tl = _locateTablet(context, row, false, false, false, lcSession);
-
+
if (tl == null || !addMutation(binnedMutations, mutation, tl, lcSession)) {
failures.add(mutation);
failed = true;
@@ -233,7 +233,7 @@ public class TabletLocatorImpl extends TabletLocator {
private <T extends Mutation> boolean addMutation(Map<String,TabletServerMutations<T>> binnedMutations, T mutation, TabletLocation tl,
LockCheckerSession lcSession) {
TabletServerMutations<T> tsm = binnedMutations.get(tl.tablet_location);
-
+
if (tsm == null) {
// do lock check once per tserver here to make binning faster
boolean lockHeld = lcSession.checkLock(tl) != null;
@@ -244,50 +244,50 @@ public class TabletLocatorImpl extends TabletLocator {
return false;
}
}
-
+
// its possible the same tserver could be listed with different sessions
if (tsm.getSession().equals(tl.tablet_session)) {
tsm.addMutation(tl.tablet_extent, mutation);
return true;
}
-
+
return false;
}
-
+
private List<Range> binRanges(ClientContext context, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges, boolean useCache,
LockCheckerSession lcSession) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
List<Range> failures = new ArrayList<Range>();
List<TabletLocation> tabletLocations = new ArrayList<TabletLocation>();
-
+
boolean lookupFailed = false;
-
+
l1: for (Range range : ranges) {
-
+
tabletLocations.clear();
-
+
Text startRow;
-
+
if (range.getStartKey() != null) {
startRow = range.getStartKey().getRow();
} else
startRow = new Text();
-
+
TabletLocation tl = null;
-
+
if (useCache)
tl = lcSession.checkLock(locateTabletInCache(startRow));
else if (!lookupFailed)
tl = _locateTablet(context, startRow, false, false, false, lcSession);
-
+
if (tl == null) {
failures.add(range);
if (!useCache)
lookupFailed = true;
continue;
}
-
+
tabletLocations.add(tl);
-
+
while (tl.tablet_extent.getEndRow() != null && !range.afterEndKey(new Key(tl.tablet_extent.getEndRow()).followingKey(PartialKey.ROW))) {
if (useCache) {
Text row = new Text(tl.tablet_extent.getEndRow());
@@ -296,7 +296,7 @@ public class TabletLocatorImpl extends TabletLocator {
} else {
tl = _locateTablet(context, tl.tablet_extent.getEndRow(), true, false, false, lcSession);
}
-
+
if (tl == null) {
failures.add(range);
if (!useCache)
@@ -309,46 +309,46 @@ public class TabletLocatorImpl extends TabletLocator {
for (TabletLocation tl2 : tabletLocations) {
TabletLocatorImpl.addRange(binnedRanges, tl2.tablet_location, tl2.tablet_extent, range);
}
-
+
}
-
+
return failures;
}
-
+
@Override
public List<Range> binRanges(ClientContext context, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException,
AccumuloSecurityException, TableNotFoundException {
-
+
/*
* For this to be efficient, need to avoid fine grained synchronization and fine grained logging. Therefore methods called by this are not synchronized and
* should not log.
*/
-
+
OpTimer opTimer = null;
if (log.isTraceEnabled())
opTimer = new OpTimer(log, Level.TRACE).start("Binning " + ranges.size() + " ranges for table " + tableId);
-
+
LockCheckerSession lcSession = new LockCheckerSession();
List<Range> failures;
rLock.lock();
try {
processInvalidated(context, lcSession);
-
+
// for this to be optimal, need to look ranges up in sorted order when
// ranges are not present in cache... however do not want to always
// sort ranges... therefore try binning ranges using only the cache
// and sort whatever fails and retry
-
+
failures = binRanges(context, ranges, binnedRanges, true, lcSession);
} finally {
rLock.unlock();
}
-
+
if (failures.size() > 0) {
// sort failures by range start key
Collections.sort(failures);
-
+
// try lookups again
wLock.lock();
try {
@@ -357,13 +357,13 @@ public class TabletLocatorImpl extends TabletLocator {
wLock.unlock();
}
}
-
+
if (opTimer != null)
opTimer.stop("Binned " + ranges.size() + " ranges for table " + tableId + " to " + binnedRanges.size() + " tservers in %DURATION%");
return failures;
}
-
+
@Override
public void invalidateCache(KeyExtent failedExtent) {
wLock.lock();
@@ -375,7 +375,7 @@ public class TabletLocatorImpl extends TabletLocator {
if (log.isTraceEnabled())
log.trace("Invalidated extent=" + failedExtent);
}
-
+
@Override
public void invalidateCache(Collection<KeyExtent> keySet) {
wLock.lock();
@@ -387,11 +387,11 @@ public class TabletLocatorImpl extends TabletLocator {
if (log.isTraceEnabled())
log.trace("Invalidated " + keySet.size() + " cache entries for table " + tableId);
}
-
+
@Override
public void invalidateCache(Instance instance, String server) {
int invalidatedCount = 0;
-
+
wLock.lock();
try {
for (TabletLocation cacheEntry : metaCache.values())
@@ -402,14 +402,14 @@ public class TabletLocatorImpl extends TabletLocator {
} finally {
wLock.unlock();
}
-
+
lockChecker.invalidateCache(server);
if (log.isTraceEnabled())
log.trace("invalidated " + invalidatedCount + " cache entries table=" + tableId + " server=" + server);
-
+
}
-
+
@Override
public void invalidateCache() {
int invalidatedCount;
@@ -423,18 +423,18 @@ public class TabletLocatorImpl extends TabletLocator {
if (log.isTraceEnabled())
log.trace("invalidated all " + invalidatedCount + " cache entries for table=" + tableId);
}
-
+
@Override
public TabletLocation locateTablet(ClientContext context, Text row, boolean skipRow, boolean retry) throws AccumuloException, AccumuloSecurityException,
TableNotFoundException {
-
+
OpTimer opTimer = null;
if (log.isTraceEnabled())
opTimer = new OpTimer(log, Level.TRACE).start("Locating tablet table=" + tableId + " row=" + TextUtil.truncate(row) + " skipRow=" + skipRow + " retry="
+ retry);
-
+
while (true) {
-
+
LockCheckerSession lcSession = new LockCheckerSession();
TabletLocation tl = _locateTablet(context, row, skipRow, retry, true, lcSession);
@@ -444,21 +444,21 @@ public class TabletLocatorImpl extends TabletLocator {
log.trace("Failed to locate tablet containing row " + TextUtil.truncate(row) + " in table " + tableId + ", will retry...");
continue;
}
-
+
if (opTimer != null)
opTimer.stop("Located tablet " + (tl == null ? null : tl.tablet_extent) + " at " + (tl == null ? null : tl.tablet_location) + " in %DURATION%");
-
+
return tl;
}
}
-
+
private void lookupTabletLocation(ClientContext context, Text row, boolean retry, LockCheckerSession lcSession) throws AccumuloException,
AccumuloSecurityException, TableNotFoundException {
Text metadataRow = new Text(tableId);
metadataRow.append(new byte[] {';'}, 0, 1);
metadataRow.append(row.getBytes(), 0, row.getLength());
TabletLocation ptl = parent.locateTablet(context, metadataRow, false, retry);
-
+
if (ptl != null) {
TabletLocations locations = locationObtainer.lookupTablet(context, ptl, metadataRow, lastTabletRow, parent);
while (locations != null && locations.getLocations().isEmpty() && locations.getLocationless().isEmpty()) {
@@ -475,19 +475,19 @@ public class TabletLocatorImpl extends TabletLocator {
break;
}
}
-
+
if (locations == null)
return;
-
+
// cannot assume the list contains contiguous key extents... so it is probably
// best to deal with each extent individually
-
+
Text lastEndRow = null;
for (TabletLocation tabletLocation : locations.getLocations()) {
-
+
KeyExtent ke = tabletLocation.tablet_extent;
TabletLocation locToCache;
-
+
// create new location if current prevEndRow == endRow
if ((lastEndRow != null) && (ke.getPrevEndRow() != null) && ke.getPrevEndRow().equals(lastEndRow)) {
locToCache = new TabletLocation(new KeyExtent(ke.getTableId(), ke.getEndRow(), lastEndRow), tabletLocation.tablet_location,
@@ -495,35 +495,35 @@ public class TabletLocatorImpl extends TabletLocator {
} else {
locToCache = tabletLocation;
}
-
+
// save endRow for next iteration
lastEndRow = locToCache.tablet_extent.getEndRow();
-
+
updateCache(locToCache, lcSession);
}
}
-
+
}
-
+
private void updateCache(TabletLocation tabletLocation, LockCheckerSession lcSession) {
if (!tabletLocation.tablet_extent.getTableId().equals(tableId)) {
// sanity check
throw new IllegalStateException("Unexpected extent returned " + tableId + " " + tabletLocation.tablet_extent);
}
-
+
if (tabletLocation.tablet_location == null) {
// sanity check
throw new IllegalStateException("Cannot add null locations to cache " + tableId + " " + tabletLocation.tablet_extent);
}
-
+
if (!tabletLocation.tablet_extent.getTableId().equals(tableId)) {
// sanity check
throw new IllegalStateException("Cannot add other table ids to locations cache " + tableId + " " + tabletLocation.tablet_extent);
}
-
+
// clear out any overlapping extents in cache
removeOverlapping(metaCache, tabletLocation.tablet_extent);
-
+
// do not add to cache unless lock is held
if (lcSession.checkLock(tabletLocation) == null)
return;
@@ -533,14 +533,14 @@ public class TabletLocatorImpl extends TabletLocator {
if (er == null)
er = MAX_TEXT;
metaCache.put(er, tabletLocation);
-
+
if (badExtents.size() > 0)
removeOverlapping(badExtents, tabletLocation.tablet_extent);
}
-
+
static void removeOverlapping(TreeMap<Text,TabletLocation> metaCache, KeyExtent nke) {
Iterator<Entry<Text,TabletLocation>> iter = null;
-
+
if (nke.getPrevEndRow() == null) {
iter = metaCache.entrySet().iterator();
} else {
@@ -548,30 +548,30 @@ public class TabletLocatorImpl extends TabletLocator {
SortedMap<Text,TabletLocation> tailMap = metaCache.tailMap(row);
iter = tailMap.entrySet().iterator();
}
-
+
while (iter.hasNext()) {
Entry<Text,TabletLocation> entry = iter.next();
-
+
KeyExtent ke = entry.getValue().tablet_extent;
-
+
if (stopRemoving(nke, ke)) {
break;
}
-
+
iter.remove();
}
}
-
+
private static boolean stopRemoving(KeyExtent nke, KeyExtent ke) {
return ke.getPrevEndRow() != null && nke.getEndRow() != null && ke.getPrevEndRow().compareTo(nke.getEndRow()) >= 0;
}
-
+
private static Text rowAfterPrevRow(KeyExtent nke) {
Text row = new Text(nke.getPrevEndRow());
row.append(new byte[] {0}, 0, 1);
return row;
}
-
+
static void removeOverlapping(TreeSet<KeyExtent> extents, KeyExtent nke) {
for (KeyExtent overlapping : KeyExtent.findOverlapping(nke, extents)) {
extents.remove(overlapping);
@@ -579,9 +579,9 @@ public class TabletLocatorImpl extends TabletLocator {
}
private TabletLocation locateTabletInCache(Text row) {
-
+
Entry<Text,TabletLocation> entry = metaCache.ceilingEntry(row);
-
+
if (entry != null) {
KeyExtent ke = entry.getValue().tablet_extent;
if (ke.getPrevEndRow() == null || ke.getPrevEndRow().compareTo(row) < 0) {
@@ -590,17 +590,17 @@ public class TabletLocatorImpl extends TabletLocator {
}
return null;
}
-
+
protected TabletLocation _locateTablet(ClientContext context, Text row, boolean skipRow, boolean retry, boolean lock, LockCheckerSession lcSession)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
-
+
if (skipRow) {
row = new Text(row);
row.append(new byte[] {0}, 0, 1);
}
-
+
TabletLocation tl;
-
+
if (lock)
rLock.lock();
try {
@@ -610,30 +610,30 @@ public class TabletLocatorImpl extends TabletLocator {
if (lock)
rLock.unlock();
}
-
+
if (tl == null) {
if (lock)
wLock.lock();
try {
// not in cache, so obtain info
lookupTabletLocation(context, row, retry, lcSession);
-
+
tl = lcSession.checkLock(locateTabletInCache(row));
} finally {
if (lock)
wLock.unlock();
}
}
-
+
return tl;
}
-
+
private void processInvalidated(ClientContext context, LockCheckerSession lcSession) throws AccumuloSecurityException, AccumuloException,
TableNotFoundException {
-
+
if (badExtents.size() == 0)
return;
-
+
final boolean writeLockHeld = rwLock.isWriteLockedByCurrentThread();
try {
if (!writeLockHeld) {
@@ -642,27 +642,27 @@ public class TabletLocatorImpl extends TabletLocator {
if (badExtents.size() == 0)
return;
}
-
+
List<Range> lookups = new ArrayList<Range>(badExtents.size());
-
+
for (KeyExtent be : badExtents) {
lookups.add(be.toMetadataRange());
removeOverlapping(metaCache, be);
}
-
+
lookups = Range.mergeOverlapping(lookups);
-
+
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-
+
parent.binRanges(context, lookups, binnedRanges);
-
+
// randomize server order
ArrayList<String> tabletServers = new ArrayList<String>(binnedRanges.keySet());
Collections.shuffle(tabletServers);
-
+
for (String tserver : tabletServers) {
List<TabletLocation> locations = locationObtainer.lookupTablets(context, tserver, binnedRanges.get(tserver), parent);
-
+
for (TabletLocation tabletLocation : locations) {
updateCache(tabletLocation, lcSession);
}
@@ -674,21 +674,21 @@ public class TabletLocatorImpl extends TabletLocator {
}
}
}
-
+
protected static void addRange(Map<String,Map<KeyExtent,List<Range>>> binnedRanges, String location, KeyExtent ke, Range range) {
Map<KeyExtent,List<Range>> tablets = binnedRanges.get(location);
if (tablets == null) {
tablets = new HashMap<KeyExtent,List<Range>>();
binnedRanges.put(location, tablets);
}
-
+
List<Range> tabletsRanges = tablets.get(ke);
if (tabletsRanges == null) {
tabletsRanges = new ArrayList<Range>();
tablets.put(ke, tabletsRanges);
}
-
+
tabletsRanges.add(range);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java
index b88571a..d3b26dc 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java
@@ -33,11 +33,11 @@ import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
public class TabletServerBatchDeleter extends TabletServerBatchReader implements BatchDeleter {
-
+
private final ClientContext context;
private String tableId;
private BatchWriterConfig bwConfig;
-
+
public TabletServerBatchDeleter(ClientContext context, String tableId, Authorizations authorizations, int numQueryThreads, BatchWriterConfig bwConfig)
throws TableNotFoundException {
super(context, tableId, authorizations, numQueryThreads);
@@ -46,7 +46,7 @@ public class TabletServerBatchDeleter extends TabletServerBatchReader implements
this.bwConfig = bwConfig;
super.addScanIterator(new IteratorSetting(Integer.MAX_VALUE, BatchDeleter.class.getName() + ".NOVALUE", SortedKeyIterator.class));
}
-
+
@Override
public void delete() throws MutationsRejectedException, TableNotFoundException {
BatchWriter bw = null;
@@ -65,5 +65,5 @@ public class TabletServerBatchDeleter extends TabletServerBatchReader implements
bw.close();
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java
index 2a79f05..a7422c2 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java
@@ -34,25 +34,25 @@ import org.apache.log4j.Logger;
public class TabletServerBatchReader extends ScannerOptions implements BatchScanner {
private static final Logger log = Logger.getLogger(TabletServerBatchReader.class);
-
+
private String table;
private int numThreads;
private ExecutorService queryThreadPool;
-
+
private final ClientContext context;
private ArrayList<Range> ranges;
-
+
private Authorizations authorizations = Authorizations.EMPTY;
private Throwable ex = null;
-
+
private static int nextBatchReaderInstance = 1;
-
+
private static synchronized int getNextBatchReaderInstance() {
return nextBatchReaderInstance++;
}
-
+
private final int batchReaderInstance = getNextBatchReaderInstance();
-
+
public TabletServerBatchReader(ClientContext context, String table, Authorizations authorizations, int numQueryThreads) {
checkArgument(context != null, "context is null");
checkArgument(table != null, "table is null");
@@ -61,18 +61,18 @@ public class TabletServerBatchReader extends ScannerOptions implements BatchScan
this.authorizations = authorizations;
this.table = table;
this.numThreads = numQueryThreads;
-
+
queryThreadPool = new SimpleThreadPool(numQueryThreads, "batch scanner " + batchReaderInstance + "-");
-
+
ranges = null;
ex = new Throwable();
}
-
+
@Override
public void close() {
queryThreadPool.shutdownNow();
}
-
+
/**
* Warning: do not rely upon finalize to close this class. Finalize is not guaranteed to be called.
*/
@@ -83,31 +83,31 @@ public class TabletServerBatchReader extends ScannerOptions implements BatchScan
close();
}
}
-
+
@Override
public void setRanges(Collection<Range> ranges) {
if (ranges == null || ranges.size() == 0) {
throw new IllegalArgumentException("ranges must be non null and contain at least 1 range");
}
-
+
if (queryThreadPool.isShutdown()) {
throw new IllegalStateException("batch reader closed");
}
-
+
this.ranges = new ArrayList<Range>(ranges);
-
+
}
-
+
@Override
public Iterator<Entry<Key,Value>> iterator() {
if (ranges == null) {
throw new IllegalStateException("ranges not set");
}
-
+
if (queryThreadPool.isShutdown()) {
throw new IllegalStateException("batch reader closed");
}
-
+
return new TabletServerBatchReaderIterator(context, table, authorizations, ranges, numThreads, queryThreadPool, this, timeOut);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
index eb80f8b..df330ed 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
@@ -73,9 +73,9 @@ import org.apache.thrift.transport.TTransportException;
import com.google.common.net.HostAndPort;
public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value>> {
-
+
private static final Logger log = Logger.getLogger(TabletServerBatchReaderIterator.class);
-
+
private final ClientContext context;
private final Instance instance;
private final String table;
@@ -83,30 +83,30 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
private final int numThreads;
private final ExecutorService queryThreadPool;
private final ScannerOptions options;
-
+
private ArrayBlockingQueue<List<Entry<Key,Value>>> resultsQueue;
private Iterator<Entry<Key,Value>> batchIterator;
private List<Entry<Key,Value>> batch;
private static final List<Entry<Key,Value>> LAST_BATCH = new ArrayList<Map.Entry<Key,Value>>();
private final Object nextLock = new Object();
-
+
private long failSleepTime = 100;
-
+
private volatile Throwable fatalException = null;
-
+
private Map<String,TimeoutTracker> timeoutTrackers;
private Set<String> timedoutServers;
private long timeout;
-
+
private TabletLocator locator;
-
+
public interface ResultReceiver {
void receive(List<Entry<Key,Value>> entries);
}
-
+
public TabletServerBatchReaderIterator(ClientContext context, String table, Authorizations authorizations, ArrayList<Range> ranges, int numThreads,
ExecutorService queryThreadPool, ScannerOptions scannerOptions, long timeout) {
-
+
this.context = context;
this.instance = context.getInstance();
this.table = table;
@@ -115,24 +115,24 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
this.queryThreadPool = queryThreadPool;
this.options = new ScannerOptions(scannerOptions);
resultsQueue = new ArrayBlockingQueue<List<Entry<Key,Value>>>(numThreads);
-
+
this.locator = new TimeoutTabletLocator(TabletLocator.getLocator(context, new Text(table)), timeout);
-
+
timeoutTrackers = Collections.synchronizedMap(new HashMap<String,TabletServerBatchReaderIterator.TimeoutTracker>());
timedoutServers = Collections.synchronizedSet(new HashSet<String>());
this.timeout = timeout;
-
+
if (options.fetchedColumns.size() > 0) {
ArrayList<Range> ranges2 = new ArrayList<Range>(ranges.size());
for (Range range : ranges) {
ranges2.add(range.bound(options.fetchedColumns.first(), options.fetchedColumns.last()));
}
-
+
ranges = ranges2;
}
-
+
ResultReceiver rr = new ResultReceiver() {
-
+
@Override
public void receive(List<Entry<Key,Value>> entries) {
try {
@@ -144,12 +144,12 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
log.warn("Failed to add Batch Scan result", e);
fatalException = e;
throw new RuntimeException(e);
-
+
}
}
-
+
};
-
+
try {
lookup(ranges, rr);
} catch (RuntimeException re) {
@@ -158,31 +158,31 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
throw new RuntimeException("Failed to create iterator", e);
}
}
-
+
@Override
public boolean hasNext() {
synchronized (nextLock) {
if (batch == LAST_BATCH)
return false;
-
+
if (batch != null && batchIterator.hasNext())
return true;
-
+
// don't have one cached, try to cache one and return success
try {
batch = null;
while (batch == null && fatalException == null && !queryThreadPool.isShutdown())
batch = resultsQueue.poll(1, TimeUnit.SECONDS);
-
+
if (fatalException != null)
if (fatalException instanceof RuntimeException)
throw (RuntimeException) fatalException;
else
throw new RuntimeException(fatalException);
-
+
if (queryThreadPool.isShutdown())
throw new RuntimeException("scanner closed");
-
+
batchIterator = batch.iterator();
return batch != LAST_BATCH;
} catch (InterruptedException e) {
@@ -190,7 +190,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
}
}
}
-
+
@Override
public Entry<Key,Value> next() {
// if there's one waiting, or hasNext() can get one, return it
@@ -201,33 +201,33 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
throw new NoSuchElementException();
}
}
-
+
@Override
public void remove() {
throw new UnsupportedOperationException();
}
-
+
private synchronized void lookup(List<Range> ranges, ResultReceiver receiver) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
List<Column> columns = new ArrayList<Column>(options.fetchedColumns);
ranges = Range.mergeOverlapping(ranges);
-
+
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-
+
binRanges(locator, ranges, binnedRanges);
-
+
doLookups(binnedRanges, receiver, columns);
}
-
+
private void binRanges(TabletLocator tabletLocator, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException,
AccumuloSecurityException, TableNotFoundException {
-
+
int lastFailureSize = Integer.MAX_VALUE;
-
+
while (true) {
-
+
binnedRanges.clear();
List<Range> failures = tabletLocator.binRanges(context, ranges, binnedRanges);
-
+
if (failures.size() > 0) {
// tried to only do table state checks when failures.size() == ranges.size(), however this did
// not work because nothing ever invalidated entries in the tabletLocator cache... so even though
@@ -238,9 +238,9 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
throw new TableDeletedException(table);
else if (Tables.getTableState(instance, table) == TableState.OFFLINE)
throw new TableOfflineException(instance, table);
-
+
lastFailureSize = failures.size();
-
+
if (log.isTraceEnabled())
log.trace("Failed to bin " + failures.size() + " ranges, tablet locations were null, retrying in 100ms");
try {
@@ -251,9 +251,9 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
} else {
break;
}
-
+
}
-
+
// truncate the ranges to within the tablets... this makes it easier to know what work
// needs to be redone when failures occurs and tablets have merged or split
Map<String,Map<KeyExtent,List<Range>>> binnedRanges2 = new HashMap<String,Map<KeyExtent,List<Range>>>();
@@ -268,47 +268,47 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
clippedRanges.add(tabletRange.clip(range));
}
}
-
+
binnedRanges.clear();
binnedRanges.putAll(binnedRanges2);
}
-
+
private void processFailures(Map<KeyExtent,List<Range>> failures, ResultReceiver receiver, List<Column> columns) throws AccumuloException,
AccumuloSecurityException, TableNotFoundException {
if (log.isTraceEnabled())
log.trace("Failed to execute multiscans against " + failures.size() + " tablets, retrying...");
-
+
try {
Thread.sleep(failSleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
-
+
// We were interrupted (close called on batchscanner) just exit
log.debug("Exiting failure processing on interrupt");
return;
}
failSleepTime = Math.min(5000, failSleepTime * 2);
-
+
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
List<Range> allRanges = new ArrayList<Range>();
-
+
for (List<Range> ranges : failures.values())
allRanges.addAll(ranges);
-
+
// since the first call to binRanges clipped the ranges to within a tablet, we should not get only
// bin to the set of failed tablets
binRanges(locator, allRanges, binnedRanges);
-
+
doLookups(binnedRanges, receiver, columns);
}
-
+
private String getTableInfo() {
return Tables.getPrintableTableInfoFromId(instance, table);
}
-
+
private class QueryTask implements Runnable {
-
+
private String tsLocation;
private Map<KeyExtent,List<Range>> tabletsRanges;
private ResultReceiver receiver;
@@ -316,7 +316,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
private final Map<KeyExtent,List<Range>> failures;
private List<Column> columns;
private int semaphoreSize;
-
+
QueryTask(String tsLocation, Map<KeyExtent,List<Range>> tabletsRanges, Map<KeyExtent,List<Range>> failures, ResultReceiver receiver, List<Column> columns) {
this.tsLocation = tsLocation;
this.tabletsRanges = tabletsRanges;
@@ -324,12 +324,12 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
this.columns = columns;
this.failures = failures;
}
-
+
void setSemaphore(Semaphore semaphore, int semaphoreSize) {
this.semaphore = semaphore;
this.semaphoreSize = semaphoreSize;
}
-
+
@Override
public void run() {
String threadName = Thread.currentThread().getName();
@@ -349,19 +349,19 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
failures.putAll(tsFailures);
}
}
-
+
} catch (IOException e) {
synchronized (failures) {
failures.putAll(tsFailures);
failures.putAll(unscanned);
}
-
+
locator.invalidateCache(context.getInstance(), tsLocation);
log.debug(e.getMessage(), e);
} catch (AccumuloSecurityException e) {
e.setTableInfo(getTableInfo());
log.debug(e.getMessage(), e);
-
+
Tables.clearCache(instance);
if (!Tables.exists(instance, table))
fatalException = new TableDeletedException(table);
@@ -396,7 +396,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
log.debug(t.getMessage(), t);
fatalException = t;
}
-
+
if (fatalException != null) {
// we are finished with this batch query
if (!resultsQueue.offer(LAST_BATCH)) {
@@ -423,16 +423,16 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
}
}
}
-
+
}
-
+
private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges, final ResultReceiver receiver, List<Column> columns) {
-
+
if (timedoutServers.containsAll(binnedRanges.keySet())) {
// all servers have timed out
throw new TimedOutException(timedoutServers);
}
-
+
// when there are lots of threads and a few tablet servers
// it is good to break up requests to tablet servers, the
// following code determines if this is the case
@@ -442,16 +442,16 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) {
totalNumberOfTablets += entry.getValue().size();
}
-
+
maxTabletsPerRequest = totalNumberOfTablets / numThreads;
if (maxTabletsPerRequest == 0) {
maxTabletsPerRequest = 1;
}
-
+
}
-
+
Map<KeyExtent,List<Range>> failures = new HashMap<KeyExtent,List<Range>>();
-
+
if (timedoutServers.size() > 0) {
// go ahead and fail any timed out servers
for (Iterator<Entry<String,Map<KeyExtent,List<Range>>>> iterator = binnedRanges.entrySet().iterator(); iterator.hasNext();) {
@@ -462,16 +462,16 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
}
}
}
-
+
// randomize tabletserver order... this will help when there are multiple
// batch readers and writers running against accumulo
List<String> locations = new ArrayList<String>(binnedRanges.keySet());
Collections.shuffle(locations);
-
+
List<QueryTask> queryTasks = new ArrayList<QueryTask>();
-
+
for (final String tsLocation : locations) {
-
+
final Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation);
if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 1) {
QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns);
@@ -486,44 +486,44 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
tabletSubset = new HashMap<KeyExtent,List<Range>>();
}
}
-
+
if (tabletSubset.size() > 0) {
QueryTask queryTask = new QueryTask(tsLocation, tabletSubset, failures, receiver, columns);
queryTasks.add(queryTask);
}
}
}
-
+
final Semaphore semaphore = new Semaphore(queryTasks.size());
semaphore.acquireUninterruptibly(queryTasks.size());
-
+
for (QueryTask queryTask : queryTasks) {
queryTask.setSemaphore(semaphore, queryTasks.size());
queryThreadPool.execute(new TraceRunnable(queryTask));
}
}
-
+
static void trackScanning(Map<KeyExtent,List<Range>> failures, Map<KeyExtent,List<Range>> unscanned, MultiScanResult scanResult) {
-
+
// translate returned failures, remove them from unscanned, and add them to failures
Map<KeyExtent,List<Range>> retFailures = Translator.translate(scanResult.failures, Translators.TKET, new Translator.ListTranslator<TRange,Range>(
Translators.TRT));
unscanned.keySet().removeAll(retFailures.keySet());
failures.putAll(retFailures);
-
+
// translate full scans and remove them from unscanned
HashSet<KeyExtent> fullScans = new HashSet<KeyExtent>(Translator.translate(scanResult.fullScans, Translators.TKET));
unscanned.keySet().removeAll(fullScans);
-
+
// remove partial scan from unscanned
if (scanResult.partScan != null) {
KeyExtent ke = new KeyExtent(scanResult.partScan);
Key nextKey = new Key(scanResult.partNextKey);
-
+
ListIterator<Range> iterator = unscanned.get(ke).listIterator();
while (iterator.hasNext()) {
Range range = iterator.next();
-
+
if (range.afterEndKey(nextKey) || (nextKey.equals(range.getEndKey()) && scanResult.partNextKeyInclusive != range.isEndKeyInclusive())) {
iterator.remove();
} else if (range.contains(nextKey)) {
@@ -534,41 +534,41 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
}
}
}
-
+
private static class TimeoutTracker {
-
+
String server;
Set<String> badServers;
long timeOut;
long activityTime;
Long firstErrorTime = null;
-
+
TimeoutTracker(String server, Set<String> badServers, long timeOut) {
this(timeOut);
this.server = server;
this.badServers = badServers;
}
-
+
TimeoutTracker(long timeOut) {
this.timeOut = timeOut;
}
-
+
void startingScan() {
activityTime = System.currentTimeMillis();
}
-
+
void check() throws IOException {
if (System.currentTimeMillis() - activityTime > timeOut) {
badServers.add(server);
throw new IOException("Time exceeded " + (System.currentTimeMillis() - activityTime) + " " + server);
}
}
-
+
void madeProgress() {
activityTime = System.currentTimeMillis();
firstErrorTime = null;
}
-
+
void errorOccured(Exception e) {
if (firstErrorTime == null) {
firstErrorTime = activityTime;
@@ -576,28 +576,28 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
badServers.add(server);
}
}
-
+
/**
*/
public long getTimeOut() {
return timeOut;
}
}
-
+
public static void doLookup(ClientContext context, String server, Map<KeyExtent,List<Range>> requested, Map<KeyExtent,List<Range>> failures,
Map<KeyExtent,List<Range>> unscanned, ResultReceiver receiver, List<Column> columns, ScannerOptions options, Authorizations authorizations)
throws IOException, AccumuloSecurityException, AccumuloServerException {
doLookup(context, server, requested, failures, unscanned, receiver, columns, options, authorizations, new TimeoutTracker(Long.MAX_VALUE));
}
-
+
static void doLookup(ClientContext context, String server, Map<KeyExtent,List<Range>> requested, Map<KeyExtent,List<Range>> failures,
Map<KeyExtent,List<Range>> unscanned, ResultReceiver receiver, List<Column> columns, ScannerOptions options, Authorizations authorizations,
TimeoutTracker timeoutTracker) throws IOException, AccumuloSecurityException, AccumuloServerException {
-
+
if (requested.size() == 0) {
return;
}
-
+
// copy requested to unscanned map. we will remove ranges as they are scanned in trackScanning()
for (Entry<KeyExtent,List<Range>> entry : requested.entrySet()) {
ArrayList<Range> ranges = new ArrayList<Range>();
@@ -606,7 +606,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
}
unscanned.put(new KeyExtent(entry.getKey()), ranges);
}
-
+
timeoutTracker.startingScan();
TTransport transport = null;
try {
@@ -618,13 +618,13 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
client = ThriftUtil.getTServerClient(parsedServer, context);
try {
-
+
OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Starting multi scan, tserver=" + server + " #tablets=" + requested.size() + " #ranges="
+ sumSizes(requested.values()) + " ssil=" + options.serverSideIteratorList + " ssio=" + options.serverSideIteratorOptions);
-
+
TabletType ttype = TabletType.type(requested.keySet());
boolean waitForWrites = !ThriftScanner.serversWaitedForWrites.get(ttype).contains(server);
-
+
Map<TKeyExtent,List<TRange>> thriftTabletRanges = Translator.translate(requested, Translators.KET, new Translator.ListTranslator<Range,TRange>(
Translators.RT));
InitialMultiScan imsr = client.startMultiScan(Tracer.traceInfo(), context.rpcCreds(), thriftTabletRanges,
@@ -634,48 +634,48 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
ThriftScanner.serversWaitedForWrites.get(ttype).add(server.toString());
MultiScanResult scanResult = imsr.result;
-
+
opTimer.stop("Got 1st multi scan results, #results=" + scanResult.results.size() + (scanResult.more ? " scanID=" + imsr.scanID : "")
+ " in %DURATION%");
-
+
ArrayList<Entry<Key,Value>> entries = new ArrayList<Map.Entry<Key,Value>>(scanResult.results.size());
for (TKeyValue kv : scanResult.results) {
entries.add(new SimpleImmutableEntry<Key,Value>(new Key(kv.key), new Value(kv.value)));
}
-
+
if (entries.size() > 0)
receiver.receive(entries);
-
+
if (entries.size() > 0 || scanResult.fullScans.size() > 0)
timeoutTracker.madeProgress();
-
+
trackScanning(failures, unscanned, scanResult);
-
+
while (scanResult.more) {
-
+
timeoutTracker.check();
-
+
opTimer.start("Continuing multi scan, scanid=" + imsr.scanID);
scanResult = client.continueMultiScan(Tracer.traceInfo(), imsr.scanID);
opTimer.stop("Got more multi scan results, #results=" + scanResult.results.size() + (scanResult.more ? " scanID=" + imsr.scanID : "")
+ " in %DURATION%");
-
+
entries = new ArrayList<Map.Entry<Key,Value>>(scanResult.results.size());
for (TKeyValue kv : scanResult.results) {
entries.add(new SimpleImmutableEntry<Key,Value>(new Key(kv.key), new Value(kv.value)));
}
-
+
if (entries.size() > 0)
receiver.receive(entries);
-
+
if (entries.size() > 0 || scanResult.fullScans.size() > 0)
timeoutTracker.madeProgress();
-
+
trackScanning(failures, unscanned, scanResult);
}
-
+
client.closeMultiScan(Tracer.traceInfo(), imsr.scanID);
-
+
} finally {
ThriftUtil.returnClient(client);
}
@@ -700,14 +700,14 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
ThriftTransportPool.getInstance().returnTransport(transport);
}
}
-
+
static int sumSizes(Collection<List<Range>> values) {
int sum = 0;
-
+
for (List<Range> list : values) {
sum += list.size();
}
-
+
return sum;
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
index c54c2f1..7abc826 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
@@ -76,45 +76,45 @@ import com.google.common.net.HostAndPort;
/*
* Differences from previous TabletServerBatchWriter
* + As background threads finish sending mutations to tablet servers they decrement memory usage
- * + Once the queue of unprocessed mutations reaches 50% it is always pushed to the background threads,
- * even if they are currently processing... new mutations are merged with mutations currently
+ * + Once the queue of unprocessed mutations reaches 50% it is always pushed to the background threads,
+ * even if they are currently processing... new mutations are merged with mutations currently
* processing in the background
* + Failed mutations are held for 1000ms and then re-added to the unprocessed queue
* + Flush holds adding of new mutations so it does not wait indefinitely
- *
+ *
* Considerations
* + All background threads must catch and note Throwable
- * + mutations for a single tablet server are only processed by one thread concurrently (if new mutations
- * come in for a tablet server while one thread is processing mutations for it, no other thread should
+ * + mutations for a single tablet server are only processed by one thread concurrently (if new mutations
+ * come in for a tablet server while one thread is processing mutations for it, no other thread should
* start processing those mutations)
- *
+ *
* Memory accounting
* + when a mutation enters the system memory is incremented
* + when a mutation successfully leaves the system memory is decremented
- *
- *
- *
+ *
+ *
+ *
*/
public class TabletServerBatchWriter {
-
+
private static final Logger log = Logger.getLogger(TabletServerBatchWriter.class);
-
+
// basic configuration
private final ClientContext context;
private final long maxMem;
private final long maxLatency;
private final long timeout;
private final Durability durability;
-
+
// state
private boolean flushing;
private boolean closed;
private MutationSet mutations;
-
+
// background writer
private final MutationWriter writer;
-
+
// latency timers
private final Timer jtimer = new Timer("BatchWriterLatencyTimer", true);
private final Map<String,TimeoutTracker> timeoutTrackers = Collections.synchronizedMap(new HashMap<String,TabletServerBatchWriter.TimeoutTracker>());
@@ -122,7 +122,7 @@ public class TabletServerBatchWriter {
// stats
private long totalMemUsed = 0;
private long lastProcessingStartTime;
-
+
private long totalAdded = 0;
private final AtomicLong totalSent = new AtomicLong(0);
private final AtomicLong totalBinned = new AtomicLong(0);
@@ -132,7 +132,7 @@ public class TabletServerBatchWriter {
private long initialGCTimes;
private long initialCompileTimes;
private double initialSystemLoad;
-
+
private int tabletServersBatchSum = 0;
private int tabletBatchSum = 0;
private int numBatches = 0;
@@ -140,7 +140,7 @@ public class TabletServerBatchWriter {
private int minTabletBatch = Integer.MAX_VALUE;
private int minTabletServersBatch = Integer.MAX_VALUE;
private int maxTabletServersBatch = Integer.MIN_VALUE;
-
+
// error handling
private final Violations violations = new Violations();
private final Map<KeyExtent,Set<SecurityErrorCode>> authorizationFailures = new HashMap<KeyExtent,Set<SecurityErrorCode>>();
@@ -156,21 +156,21 @@ public class TabletServerBatchWriter {
final long timeOut;
long activityTime;
Long firstErrorTime = null;
-
+
TimeoutTracker(String server, long timeOut) {
this.timeOut = timeOut;
this.server = server;
}
-
+
void startingWrite() {
activityTime = System.currentTimeMillis();
}
-
+
void madeProgress() {
activityTime = System.currentTimeMillis();
firstErrorTime = null;
}
-
+
void wroteNothing() {
if (firstErrorTime == null) {
firstErrorTime = activityTime;
@@ -178,16 +178,16 @@ public class TabletServerBatchWriter {
throw new TimedOutException(Collections.singleton(server));
}
}
-
+
void errorOccured(Exception e) {
wroteNothing();
}
-
+
public long getTimeOut() {
return timeOut;
}
}
-
+
public TabletServerBatchWriter(ClientContext context, BatchWriterConfig config) {
this.context = context;
this.maxMem = config.getMaxMemory();
@@ -196,9 +196,9 @@ public class TabletServerBatchWriter {
this.mutations = new MutationSet();
this.lastProcessingStartTime = System.currentTimeMillis();
this.durability = config.getDurability();
-
+
this.writer = new MutationWriter(config.getMaxWriteThreads());
-
+
if (this.maxLatency != Long.MAX_VALUE) {
jtimer.schedule(new TimerTask() {
@Override
@@ -215,7 +215,7 @@ public class TabletServerBatchWriter {
}, 0, this.maxLatency / 4);
}
}
-
+
private synchronized void startProcessing() {
if (mutations.getMemoryUsed() == 0)
return;
@@ -223,125 +223,125 @@ public class TabletServerBatchWriter {
writer.addMutations(mutations);
mutations = new MutationSet();
}
-
+
private synchronized void decrementMemUsed(long amount) {
totalMemUsed -= amount;
this.notifyAll();
}
-
+
public synchronized void addMutation(String table, Mutation m) throws MutationsRejectedException {
-
+
if (closed)
throw new IllegalStateException("Closed");
if (m.size() == 0)
throw new IllegalArgumentException("Can not add empty mutations");
-
+
checkForFailures();
-
+
while ((totalMemUsed > maxMem || flushing) && !somethingFailed) {
waitRTE();
}
-
+
// do checks again since things could have changed while waiting and not holding lock
if (closed)
throw new IllegalStateException("Closed");
checkForFailures();
-
+
if (startTime == 0) {
startTime = System.currentTimeMillis();
-
+
List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
for (GarbageCollectorMXBean garbageCollectorMXBean : gcmBeans) {
initialGCTimes += garbageCollectorMXBean.getCollectionTime();
}
-
+
CompilationMXBean compMxBean = ManagementFactory.getCompilationMXBean();
if (compMxBean.isCompilationTimeMonitoringSupported()) {
initialCompileTimes = compMxBean.getTotalCompilationTime();
}
-
+
initialSystemLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
}
-
+
// create a copy of mutation so that after this method returns the user
// is free to reuse the mutation object, like calling readFields... this
// is important for the case where a mutation is passed from map to reduce
// to batch writer... the map reduce code will keep passing the same mutation
// object into the reduce method
m = new Mutation(m);
-
+
totalMemUsed += m.estimatedMemoryUsed();
mutations.addMutation(table, m);
totalAdded++;
-
+
if (mutations.getMemoryUsed() >= maxMem / 2) {
startProcessing();
checkForFailures();
}
}
-
+
public void addMutation(String table, Iterator<Mutation> iterator) throws MutationsRejectedException {
while (iterator.hasNext()) {
addMutation(table, iterator.next());
}
}
-
+
public synchronized void flush() throws MutationsRejectedException {
-
+
if (closed)
throw new IllegalStateException("Closed");
-
+
Span span = Trace.start("flush");
-
+
try {
checkForFailures();
-
+
if (flushing) {
// some other thread is currently flushing, so wait
while (flushing && !somethingFailed)
waitRTE();
-
+
checkForFailures();
-
+
return;
}
-
+
flushing = true;
-
+
startProcessing();
checkForFailures();
-
+
while (totalMemUsed > 0 && !somethingFailed) {
waitRTE();
}
-
+
flushing = false;
this.notifyAll();
-
+
checkForFailures();
} finally {
span.stop();
// somethingFailed = false;
}
}
-
+
public synchronized void close() throws MutationsRejectedException {
-
+
if (closed)
return;
-
+
Span span = Trace.start("close");
try {
closed = true;
-
+
startProcessing();
-
+
while (totalMemUsed > 0 && !somethingFailed) {
waitRTE();
}
-
+
logStats();
-
+
checkForFailures();
} finally {
// make a best effort to release these resources
@@ -350,27 +350,27 @@ public class TabletServerBatchWriter {
span.stop();
}
}
-
+
private void logStats() {
long finishTime = System.currentTimeMillis();
-
+
long finalGCTimes = 0;
List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
for (GarbageCollectorMXBean garbageCollectorMXBean : gcmBeans) {
finalGCTimes += garbageCollectorMXBean.getCollectionTime();
}
-
+
CompilationMXBean compMxBean = ManagementFactory.getCompilationMXBean();
long finalCompileTimes = 0;
if (compMxBean.isCompilationTimeMonitoringSupported()) {
finalCompileTimes = compMxBean.getTotalCompilationTime();
}
-
+
double averageRate = totalSent.get() / (totalSendTime.get() / 1000.0);
double overallRate = totalAdded / ((finishTime - startTime) / 1000.0);
-
+
double finalSystemLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
-
+
if (log.isTraceEnabled()) {
log.trace("");
log.trace("TABLET SERVER BATCH WRITER STATISTICS");
@@ -400,39 +400,39 @@ public class TabletServerBatchWriter {
log.trace(String.format("System load average : initial=%6.2f final=%6.2f", initialSystemLoad, finalSystemLoad));
}
}
-
+
private void updateSendStats(long count, long time) {
totalSent.addAndGet(count);
totalSendTime.addAndGet(time);
}
-
+
public void updateBinningStats(int count, long time, Map<String,TabletServerMutations<Mutation>> binnedMutations) {
totalBinTime.addAndGet(time);
totalBinned.addAndGet(count);
updateBatchStats(binnedMutations);
}
-
+
private synchronized void updateBatchStats(Map<String,TabletServerMutations<Mutation>> binnedMutations) {
tabletServersBatchSum += binnedMutations.size();
-
+
minTabletServersBatch = Math.min(minTabletServersBatch, binnedMutations.size());
maxTabletServersBatch = Math.max(maxTabletServersBatch, binnedMutations.size());
-
+
int numTablets = 0;
-
+
for (Entry<String,TabletServerMutations<Mutation>> entry : binnedMutations.entrySet()) {
TabletServerMutations<Mutation> tsm = entry.getValue();
numTablets += tsm.getMutations().size();
}
-
+
tabletBatchSum += numTablets;
-
+
minTabletBatch = Math.min(minTabletBatch, numTablets);
maxTabletBatch = Math.max(maxTabletBatch, numTablets);
-
+
numBatches++;
}
-
+
private void waitRTE() {
try {
wait();
@@ -440,9 +440,9 @@ public class TabletServerBatchWriter {
throw new RuntimeException(e);
}
}
-
+
// BEGIN code for handling unrecoverable errors
-
+
private void updatedConstraintViolations(List<ConstraintViolationSummary> cvsList) {
if (cvsList.size() > 0) {
synchronized (this) {
@@ -452,28 +452,28 @@ public class TabletServerBatchWriter {
}
}
}
-
+
private void updateAuthorizationFailures(Set<KeyExtent> keySet, SecurityErrorCode code) {
HashMap<KeyExtent,SecurityErrorCode> map = new HashMap<KeyExtent,SecurityErrorCode>();
for (KeyExtent ke : keySet)
map.put(ke, code);
-
+
updateAuthorizationFailures(map);
}
-
+
private void updateAuthorizationFailures(Map<KeyExtent,SecurityErrorCode> authorizationFailures) {
if (authorizationFailures.size() > 0) {
-
+
// was a table deleted?
HashSet<String> tableIds = new HashSet<String>();
for (KeyExtent ke : authorizationFailures.keySet())
tableIds.add(ke.getTableId().toString());
-
+
Tables.clearCache(context.getInstance());
for (String tableId : tableIds)
if (!Tables.exists(context.getInstance(), tableId))
throw new TableDeletedException(tableId);
-
+
synchronized (this) {
somethingFailed = true;
mergeAuthorizationFailures(this.authorizationFailures, authorizationFailures);
@@ -481,7 +481,7 @@ public class TabletServerBatchWriter {
}
}
}
-
+
private void mergeAuthorizationFailures(Map<KeyExtent,Set<SecurityErrorCode>> source, Map<KeyExtent,SecurityErrorCode> addition) {
for (Entry<KeyExtent,SecurityErrorCode> entry : addition.entrySet()) {
Set<SecurityErrorCode> secs = source.get(entry.getKey());
@@ -492,14 +492,14 @@ public class TabletServerBatchWriter {
secs.add(entry.getValue());
}
}
-
+
private synchronized void updateServerErrors(String server, Exception e) {
somethingFailed = true;
this.serverSideErrors.add(server);
this.notifyAll();
log.error("Server side error on " + server + ": " + e);
}
-
+
private synchronized void updateUnknownErrors(String msg, Throwable t) {
somethingFailed = true;
unknownErrors++;
@@ -510,29 +510,29 @@ public class TabletServerBatchWriter {
else
log.error(msg, t);
}
-
+
private void checkForFailures() throws MutationsRejectedException {
if (somethingFailed) {
List<ConstraintViolationSummary> cvsList = violations.asList();
HashMap<KeyExtent,Set<org.apache.accumulo.core.client.security.SecurityErrorCode>> af = new HashMap<KeyExtent,Set<org.apache.accumulo.core.client.security.SecurityErrorCode>>();
for (Entry<KeyExtent,Set<SecurityErrorCode>> entry : authorizationFailures.entrySet()) {
HashSet<org.apache.accumulo.core.client.security.SecurityErrorCode> codes = new HashSet<org.apache.accumulo.core.client.security.SecurityErrorCode>();
-
+
for (SecurityErrorCode sce : entry.getValue()) {
codes.add(org.apache.accumulo.core.client.security.SecurityErrorCode.valueOf(sce.name()));
}
-
+
af.put(entry.getKey(), codes);
}
-
+
throw new MutationsRejectedException(context.getInstance(), cvsList, af, serverSideErrors, unknownErrors, lastUnknownError);
}
}
-
+
// END code for handling unrecoverable errors
-
+
// BEGIN code for handling failed mutations
-
+
/**
* Add mutations that previously failed back into the mix
*/
@@ -542,16 +542,16 @@ public class TabletServerBatchWriter {
startProcessing();
}
}
-
+
private class FailedMutations extends TimerTask {
-
+
private MutationSet recentFailures = null;
private long initTime;
-
+
FailedMutations() {
jtimer.schedule(this, 0, 500);
}
-
+
private MutationSet init() {
if (recentFailures == null) {
recentFailures = new MutationSet();
@@ -559,35 +559,35 @@ public class TabletServerBatchWriter {
}
return recentFailures;
}
-
+
synchronized void add(String table, ArrayList<Mutation> tableFailures) {
init().addAll(table, tableFailures);
}
-
+
synchronized void add(MutationSet failures) {
init().addAll(failures);
}
-
+
synchronized void add(String location, TabletServerMutations<Mutation> tsm) {
init();
for (Entry<KeyExtent,List<Mutation>> entry : tsm.getMutations().entrySet()) {
recentFailures.addAll(entry.getKey().getTableId().toString(), entry.getValue());
}
-
+
}
-
+
@Override
public void run() {
try {
MutationSet rf = null;
-
+
synchronized (this) {
if (recentFailures != null && System.currentTimeMillis() - initTime > 1000) {
rf = recentFailures;
recentFailures = null;
}
}
-
+
if (rf != null) {
if (log.isTraceEnabled())
log.trace("tid=" + Thread.currentThread().getId() + " Requeuing " + rf.size() + " failed mutations");
@@ -599,26 +599,26 @@ public class TabletServerBatchWriter {
}
}
}
-
+
// END code for handling failed mutations
-
+
// BEGIN code for sending mutations to tablet servers using background threads
-
+
private class MutationWriter {
-
+
private static final int MUTATION_BATCH_SIZE = 1 << 17;
private final ExecutorService sendThreadPool;
private final Map<String,TabletServerMutations<Mutation>> serversMutations;
private final Set<String> queued;
private final Map<String,TabletLocator> locators;
-
+
public MutationWriter(int numSendThreads) {
serversMutations = new HashMap<String,TabletServerMutations<Mutation>>();
queued = new HashSet<String>();
sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName());
locators = new HashMap<String,TabletLocator>();
}
-
+
private TabletLocator getLocator(String tableId) {
TabletLocator ret = locators.get(tableId);
if (ret == null) {
@@ -626,10 +626,10 @@ public class TabletServerBatchWriter {
ret = new TimeoutTabletLocator(ret, timeout);
locators.put(tableId, ret);
}
-
+
return ret;
}
-
+
private void binMutations(MutationSet mutationsToProcess, Map<String,TabletServerMutations<Mutation>> binnedMutations) {
String tableId = null;
try {
@@ -637,17 +637,17 @@ public class TabletServerBatchWriter {
for (Entry<String,List<Mutation>> entry : es) {
tableId = entry.getKey();
TabletLocator locator = getLocator(tableId);
-
+
String table = entry.getKey();
List<Mutation> tableMutations = entry.getValue();
-
+
if (tableMutations != null) {
ArrayList<Mutation> tableFailures = new ArrayList<Mutation>();
locator.binMutations(context, tableMutations, binnedMutations, tableFailures);
-
+
if (tableFailures.size() > 0) {
failedMutations.add(table, tableFailures);
-
+
if (tableFailures.size() == tableMutations.size())
if (!Tables.exists(context.getInstance(), entry.getKey()))
throw new TableDeletedException(entry.getKey());
@@ -655,7 +655,7 @@ public class TabletServerBatchWriter {
throw new TableOfflineException(context.getInstance(), entry.getKey());
}
}
-
+
}
return;
} catch (AccumuloServerException ase) {
@@ -673,12 +673,12 @@ public class TabletServerBatchWriter {
} catch (TableNotFoundException e) {
updateUnknownErrors(e.getMessage(), e);
}
-
+
// an error occurred
binnedMutations.clear();
-
+
}
-
+
void addMutations(MutationSet mutationsToSend) {
Map<String,TabletServerMutations<Mutation>> binnedMutations = new HashMap<String,TabletServerMutations<Mutation>>();
Span span = Trace.start("binMutations");
@@ -692,17 +692,17 @@ public class TabletServerBatchWriter {
}
addMutations(binnedMutations);
}
-
+
private synchronized void addMutations(Map<String,TabletServerMutations<Mutation>> binnedMutations) {
-
+
int count = 0;
-
+
// merge mutations into existing mutations for a tablet server
for (Entry<String,TabletServerMutations<Mutation>> entry : binnedMutations.entrySet()) {
String server = entry.getKey();
-
+
TabletServerMutations<Mutation> currentMutations = serversMutations.get(server);
-
+
if (currentMutations == null) {
serversMutations.put(server, entry.getValue());
} else {
@@ -712,136 +712,136 @@ public class TabletServerBatchWriter {
}
}
}
-
+
if (log.isTraceEnabled())
for (Entry<KeyExtent,List<Mutation>> entry2 : entry.getValue().getMutations().entrySet())
count += entry2.getValue().size();
-
+
}
-
+
if (count > 0 && log.isTraceEnabled())
log.trace(String.format("Started sending %,d mutations to %,d tablet servers", count, binnedMutations.keySet().size()));
-
+
// randomize order of servers
ArrayList<String> servers = new ArrayList<String>(binnedMutations.keySet());
Collections.shuffle(servers);
-
+
for (String server : servers)
if (!queued.contains(server)) {
sendThreadPool.submit(Trace.wrap(new SendTask(server)));
queued.add(server);
}
}
-
+
private synchronized TabletServerMutations<Mutation> getMutationsToSend(String server) {
TabletServerMutations<Mutation> tsmuts = serversMutations.remove(server);
if (tsmuts == null)
queued.remove(server);
-
+
return tsmuts;
}
-
+
class SendTask implements Runnable {
-
+
final private String location;
-
+
SendTask(String server) {
this.location = server;
}
-
+
@Override
public void run() {
try {
TabletServerMutations<Mutation> tsmuts = getMutationsToSend(location);
-
+
while (tsmuts != null) {
send(tsmuts);
tsmuts = getMutationsToSend(location);
}
-
+
return;
} catch (Throwable t) {
updateUnknownErrors("Failed to send tablet server " + location + " its batch : " + t.getMessage(), t);
}
}
-
+
public void send(TabletServerMutations<Mutation> tsm) throws AccumuloServerException, AccumuloSecurityException {
-
+
MutationSet failures = null;
-
+
String oldName = Thread.currentThread().getName();
-
+
Map<KeyExtent,List<Mutation>> mutationBatch = tsm.getMutations();
try {
-
+
long count = 0;
for (List<Mutation> list : mutationBatch.values()) {
count += list.size();
}
String msg = "sending " + String.format("%,d", count) + " mutations to " + String.format("%,d", mutationBatch.size()) + " tablets at " + location;
Thread.currentThread().setName(msg);
-
+
Span span = Trace.start("sendMutations");
try {
-
+
TimeoutTracker timeoutTracker = timeoutTrackers.get(location);
if (timeoutTracker == null) {
timeoutTracker = new TimeoutTracker(location, timeout);
timeoutTrackers.put(location, timeoutTracker);
}
-
+
long st1 = System.currentTimeMillis();
failures = sendMutationsToTabletServer(location, mutationBatch, timeoutTracker);
long st2 = System.currentTimeMillis();
if (log.isTraceEnabled())
log.trace("sent " + String.format("%,d", count) + " mutations to " + location + " in "
+ String.format("%.2f secs (%,.2f mutations/sec) with %,d failures", (st2 - st1) / 1000.0, count / ((st2 - st1) / 1000.0), failures.size()));
-
+
long successBytes = 0;
for (Entry<KeyExtent,List<Mutation>> entry : mutationBatch.entrySet()) {
for (Mutation mutation : entry.getValue()) {
successBytes += mutation.estimatedMemoryUsed();
}
}
-
+
if (failures.size() > 0) {
failedMutations.add(failures);
successBytes -= failures.getMemoryUsed();
}
-
+
updateSendStats(count, st2 - st1);
decrementMemUsed(successBytes);
-
+
} finally {
span.stop();
}
} catch (IOException e) {
if (log.isTraceEnabled())
log.trace("failed to send mutations to " + location + " : " + e.getMessage());
-
+
HashSet<String> tables = new HashSet<String>();
for (KeyExtent ke : mutationBatch.keySet())
tables.add(ke.getTableId().toString());
-
+
for (String table : tables)
TabletLocator.getLocator(context, new Text(table)).invalidateCache(context.getInstance(), location);
-
+
failedMutations.add(location, tsm);
} finally {
Thread.currentThread().setName(oldName);
}
}
}
-
+
private MutationSet sendMutationsToTabletServer(String location, Map<KeyExtent,List<Mutation>> tabMuts, TimeoutTracker timeoutTracker) throws IOException,
AccumuloSecurityException, AccumuloServerException {
if (tabMuts.size() == 0) {
return new MutationSet();
}
TInfo tinfo = Tracer.traceInfo();
-
+
timeoutTracker.startingWrite();
-
+
try {
final HostAndPort parsedServer = HostAndPort.fromString(location);
final TabletClientService.Iface client;
@@ -853,10 +853,10 @@ public class TabletServerBatchWriter {
try {
MutationSet allFailures = new MutationSet();
-
+
if (tabMuts.size() == 1 && tabMuts.values().iterator().next().size() == 1) {
Entry<KeyExtent,List<Mutation>> entry = tabMuts.entrySet().iterator().next();
-
+
try {
client.update(tinfo, context.rpcCreds(), entry.getKey().toThrift(), entry.getValue().get(0).toThrift(), DurabilityImpl.toThrift(durability));
} catch (NotServingTabletException e) {
@@ -867,9 +867,9 @@ public class TabletServerBatchWriter {
}
timeoutTracker.madeProgress();
} else {
-
+
long usid = client.startUpdate(tinfo, context.rpcCreds(), DurabilityImpl.toThrift(durability));
-
+
List<TMutation> updates = new ArrayList<TMutation>();
for (Entry<KeyExtent,List<Mutation>> entry : tabMuts.entrySet()) {
long size = 0;
@@ -880,34 +880,34 @@ public class TabletServerBatchWriter {
updates.add(mutation.toThrift());
size += mutation.numBytes();
}
-
+
client.applyUpdates(tinfo, usid, entry.getKey().toThrift(), updates);
updates.clear();
size = 0;
}
}
-
+
UpdateErrors updateErrors = client.closeUpdate(tinfo, usid);
-
+
Map<KeyExtent,Long> failures = Translator.translate(updateErrors.failedExtents, Translators.TKET);
updatedConstraintViolations(Translator.translate(updateErrors.violationSummaries, Translators.TCVST));
updateAuthorizationFailures(Translator.translate(updateErrors.authorizationFailures, Translators.TKET));
-
+
long totalCommitted = 0;
-
+
for (Entry<KeyExtent,Long> entry : failures.entrySet()) {
KeyExtent failedExtent = entry.getKey();
int numCommitted = (int) (long) entry.getValue();
totalCommitted += numCommitted;
-
+
String table = failedExtent.getTableId().toString();
-
+
TabletLocator.getLocator(context, new Text(table)).invalidateCache(failedExtent);
-
+
ArrayList<Mutation> mutations = (ArrayList<Mutation>) tabMuts.get(failedExtent);
allFailures.addAll(table, mutations.subList(numCommitted, mutations.size()));
}
-
+
if (failures.keySet().containsAll(tabMuts.keySet()) && totalCommitted == 0) {
// nothing was successfully written
timeoutTracker.wroteNothing();
@@ -936,34 +936,34 @@ public class TabletServerBatchWriter {
}
}
}
-
+
// END code for sending mutations to tablet servers using background threads
-
+
private static class MutationSet {
-
+
private final HashMap<String,List<Mutation>> mutations;
private int memoryUsed = 0;
-
+
MutationSet() {
mutations = new HashMap<String,List<Mutation>>();
}
-
+
void addMutation(String table, Mutation mutation) {
List<Mutation> tabMutList = mutations.get(table);
if (tabMutList == null) {
tabMutList = new ArrayList<Mutation>();
mutations.put(table, tabMutList);
}
-
+
tabMutList.add(mutation);
-
+
memoryUsed += mutation.estimatedMemoryUsed();
}
-
+
Map<String,List<Mutation>> getMutations() {
return mutations;
}
-
+
int size() {
int result = 0;
for (List<Mutation> perTable : mutations.values()) {
@@ -971,28 +971,28 @@ public class TabletServerBatchWriter {
}
return result;
}
-
+
public void addAll(MutationSet failures) {
Set<Entry<String,List<Mutation>>> es = failures.getMutations().entrySet();
-
+
for (Entry<String,List<Mutation>> entry : es) {
String table = entry.getKey();
-
+
for (Mutation mutation : entry.getValue()) {
addMutation(table, mutation);
}
}
}
-
+
public void addAll(String table, List<Mutation> mutations) {
for (Mutation mutation : mutations) {
addMutation(table, mutation);
}
}
-
+
public int getMemoryUsed() {
return memoryUsed;
}
-
+
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/TabletType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletType.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletType.java
index 3adcca9..d57bf94 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletType.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletType.java
@@ -22,7 +22,7 @@ import org.apache.accumulo.core.data.KeyExtent;
public enum TabletType {
ROOT, METADATA, USER;
-
+
public static TabletType type(KeyExtent ke) {
if (ke.isRootTablet())
return ROOT;
@@ -30,20 +30,20 @@ public enum TabletType {
return METADATA;
return USER;
}
-
+
public static TabletType type(Collection<KeyExtent> extents) {
if (extents.size() == 0)
throw new IllegalArgumentException();
-
+
TabletType ttype = null;
-
+
for (KeyExtent extent : extents) {
if (ttype == null)
ttype = type(extent);
else if (ttype != type(extent))
throw new IllegalArgumentException("multiple extent types not allowed " + ttype + " " + type(extent));
}
-
+
return ttype;
}
}