You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by vi...@apache.org on 2013/10/29 00:10:42 UTC
[2/3] ACCUMULO-1681 - Rolling in match (after merging)
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9a2041d5/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index c1df5d3..b3e0f39 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -77,6 +77,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.constraints.Constraint.Environment;
import org.apache.accumulo.core.constraints.Violations;
+import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.ConstraintViolationSummary;
import org.apache.accumulo.core.data.Key;
@@ -111,6 +112,7 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.security.AuthorizationContainer;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.SecurityUtil;
import org.apache.accumulo.core.security.thrift.TCredentials;
@@ -233,21 +235,21 @@ enum ScanRunState {
public class TabletServer extends AbstractMetricsImpl implements org.apache.accumulo.server.tabletserver.metrics.TabletServerMBean {
private static final Logger log = Logger.getLogger(TabletServer.class);
-
+
private static HashMap<String,Long> prevGcTime = new HashMap<String,Long>();
private static long lastMemorySize = 0;
private static long gcTimeIncreasedCount;
-
+
private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
-
+
private TabletServerLogger logger;
-
+
protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
-
+
private ServerConfiguration serverConfig;
private LogSorter logSorter = null;
-
+
public TabletServer(ServerConfiguration conf, VolumeManager fs) {
super();
this.serverConfig = conf;
@@ -269,36 +271,36 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}, 5000, 5000);
}
-
+
private synchronized static void logGCInfo(AccumuloConfiguration conf) {
List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
Runtime rt = Runtime.getRuntime();
-
+
StringBuilder sb = new StringBuilder("gc");
-
+
boolean sawChange = false;
-
+
long maxIncreaseInCollectionTime = 0;
-
+
for (GarbageCollectorMXBean gcBean : gcmBeans) {
Long prevTime = prevGcTime.get(gcBean.getName());
long pt = 0;
if (prevTime != null) {
pt = prevTime;
}
-
+
long time = gcBean.getCollectionTime();
-
+
if (time - pt != 0) {
sawChange = true;
}
-
+
long increaseInCollectionTime = time - pt;
sb.append(String.format(" %s=%,.2f(+%,.2f) secs", gcBean.getName(), time / 1000.0, increaseInCollectionTime / 1000.0));
maxIncreaseInCollectionTime = Math.max(increaseInCollectionTime, maxIncreaseInCollectionTime);
prevGcTime.put(gcBean.getName(), time);
}
-
+
long mem = rt.freeMemory();
if (maxIncreaseInCollectionTime == 0) {
gcTimeIncreasedCount = 0;
@@ -309,90 +311,90 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
gcTimeIncreasedCount = 0;
}
}
-
+
if (mem > lastMemorySize) {
sawChange = true;
}
-
+
String sign = "+";
if (mem - lastMemorySize <= 0) {
sign = "";
}
-
+
sb.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", mem, sign, (mem - lastMemorySize), rt.totalMemory()));
-
+
if (sawChange) {
log.debug(sb.toString());
}
-
+
final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
if (maxIncreaseInCollectionTime > keepAliveTimeout) {
Halt.halt("Garbage collection may be interfering with lock keep-alive. Halting.", -1);
}
-
+
lastMemorySize = mem;
}
-
+
private TabletStatsKeeper statsKeeper;
-
+
private static class Session {
long lastAccessTime;
long startTime;
String user;
String client = TServerUtils.clientAddress.get();
public boolean reserved;
-
+
public void cleanup() {}
}
-
+
private static class SessionManager {
-
+
SecureRandom random;
Map<Long,Session> sessions;
long maxIdle;
-
+
SessionManager(AccumuloConfiguration conf) {
random = new SecureRandom();
sessions = new HashMap<Long,Session>();
-
+
maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
-
+
Runnable r = new Runnable() {
@Override
public void run() {
sweep(maxIdle);
}
};
-
+
SimpleTimer.getInstance().schedule(r, 0, Math.max(maxIdle / 2, 1000));
}
-
+
synchronized long createSession(Session session, boolean reserve) {
long sid = random.nextLong();
-
+
while (sessions.containsKey(sid)) {
sid = random.nextLong();
}
-
+
sessions.put(sid, session);
-
+
session.reserved = reserve;
-
+
session.startTime = session.lastAccessTime = System.currentTimeMillis();
-
+
return sid;
}
-
+
long getMaxIdleTime() {
return maxIdle;
}
-
+
/**
* while a session is reserved, it cannot be canceled or removed
*
* @param sessionId
*/
-
+
synchronized Session reserveSession(long sessionId) {
Session session = sessions.get(sessionId);
if (session != null) {
@@ -400,11 +402,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
throw new IllegalStateException();
session.reserved = true;
}
-
+
return session;
-
+
}
-
+
synchronized Session reserveSession(long sessionId, boolean wait) {
Session session = sessions.get(sessionId);
if (session != null) {
@@ -415,16 +417,16 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
throw new RuntimeException();
}
}
-
+
if (session.reserved)
throw new IllegalStateException();
session.reserved = true;
}
-
+
return session;
-
+
}
-
+
synchronized void unreserveSession(Session session) {
if (!session.reserved)
throw new IllegalStateException();
@@ -432,24 +434,24 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
session.reserved = false;
session.lastAccessTime = System.currentTimeMillis();
}
-
+
synchronized void unreserveSession(long sessionId) {
Session session = getSession(sessionId);
if (session != null)
unreserveSession(session);
}
-
+
synchronized Session getSession(long sessionId) {
Session session = sessions.get(sessionId);
if (session != null)
session.lastAccessTime = System.currentTimeMillis();
return session;
}
-
+
Session removeSession(long sessionId) {
return removeSession(sessionId, false);
}
-
+
Session removeSession(long sessionId, boolean unreserve) {
Session session = null;
synchronized (this) {
@@ -457,14 +459,14 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
if (unreserve && session != null)
unreserveSession(session);
}
-
+
// do clean up out side of lock..
if (session != null)
session.cleanup();
-
+
return session;
}
-
+
private void sweep(long maxIdle) {
ArrayList<Session> sessionsToCleanup = new ArrayList<Session>();
synchronized (this) {
@@ -478,13 +480,13 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
// do clean up outside of lock
for (Session session : sessionsToCleanup) {
session.cleanup();
}
}
-
+
synchronized void removeIfNotAccessed(final long sessionId, long delay) {
Session session = sessions.get(sessionId);
if (session != null) {
@@ -500,26 +502,26 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
sessionToCleanup = session2;
}
}
-
+
// call clean up outside of lock
if (sessionToCleanup != null)
sessionToCleanup.cleanup();
}
};
-
+
SimpleTimer.getInstance().schedule(r, delay);
}
}
-
+
public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
for (Entry<Long,Session> entry : sessions.entrySet()) {
-
+
Session session = entry.getValue();
@SuppressWarnings("rawtypes")
ScanTask nbt = null;
String tableID = null;
-
+
if (session instanceof ScanSession) {
ScanSession ss = (ScanSession) session;
nbt = ss.nextBatchTask;
@@ -529,40 +531,40 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
nbt = mss.lookupTask;
tableID = mss.threadPoolExtent.getTableId().toString();
}
-
+
if (nbt == null)
continue;
-
+
ScanRunState srs = nbt.getScanRunState();
-
+
if (nbt == null || srs == ScanRunState.FINISHED)
continue;
-
+
MapCounter<ScanRunState> stateCounts = counts.get(tableID);
if (stateCounts == null) {
stateCounts = new MapCounter<ScanRunState>();
counts.put(tableID, stateCounts);
}
-
+
stateCounts.increment(srs, 1);
}
-
+
return counts;
}
-
+
public synchronized List<ActiveScan> getActiveScans() {
-
+
ArrayList<ActiveScan> activeScans = new ArrayList<ActiveScan>();
-
+
long ct = System.currentTimeMillis();
-
+
for (Entry<Long,Session> entry : sessions.entrySet()) {
Session session = entry.getValue();
if (session instanceof ScanSession) {
ScanSession ss = (ScanSession) session;
-
+
ScanState state = ScanState.RUNNING;
-
+
ScanTask<ScanBatch> nbt = ss.nextBatchTask;
if (nbt == null) {
state = ScanState.IDLE;
@@ -580,15 +582,15 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
break;
}
}
-
+
activeScans.add(new ActiveScan(ss.client, ss.user, ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translator.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB()));
-
+
} else if (session instanceof MultiScanSession) {
MultiScanSession mss = (MultiScanSession) session;
-
+
ScanState state = ScanState.RUNNING;
-
+
ScanTask<MultiScanResult> nbt = mss.lookupTask;
if (nbt == null) {
state = ScanState.IDLE;
@@ -606,44 +608,45 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
break;
}
}
-
+
activeScans.add(new ActiveScan(mss.client, mss.user, mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime,
ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translator.CT), mss.ssiList, mss.ssio, mss.auths
.getAuthorizationsBB()));
}
}
-
+
return activeScans;
}
}
-
+
static class TservConstraintEnv implements Environment {
-
+
private TCredentials credentials;
private SecurityOperation security;
private Authorizations auths;
private KeyExtent ke;
-
+
TservConstraintEnv(SecurityOperation secOp, TCredentials credentials) {
this.security = secOp;
this.credentials = credentials;
}
-
+
void setExtent(KeyExtent ke) {
this.ke = ke;
}
-
+
@Override
public KeyExtent getExtent() {
return ke;
}
-
+
@Override
public String getUser() {
return credentials.getPrincipal();
}
-
+
@Override
+ @Deprecated
public Authorizations getAuthorizations() {
if (auths == null)
try {
@@ -653,119 +656,132 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
return auths;
}
-
+
+    /**
+     * Returns an authorization container that answers each membership check by asking the security operation whether this environment's credentials hold
+     * the given authorization, one authorization per call.
+     */
+    @Override
+    public AuthorizationContainer getAuthorizationsContainer() {
+      return new AuthorizationContainer() {
+        /**
+         * @param auth
+         *          the authorization to look up
+         * @return the result of {@code security.userHasAuthorizations} for this single authorization
+         * @throws RuntimeException
+         *           wrapping any ThriftSecurityException raised by the check
+         */
+        @Override
+        public boolean contains(ByteSequence auth) {
+          try {
+            // auth may be a slice of a larger buffer (non-zero offset, shorter length), so send only its own bytes; wrapping getBackingArray() would
+            // submit the entire underlying array as the authorization.
+            ByteBuffer authBytes = ByteBuffer.wrap(auth.toArray());
+            return security.userHasAuthorizations(credentials, Collections.<ByteBuffer> singletonList(authBytes));
+          } catch (ThriftSecurityException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      };
+    }
}
-
+
private abstract class ScanTask<T> implements RunnableFuture<T> {
-
+
protected AtomicBoolean interruptFlag;
protected ArrayBlockingQueue<Object> resultQueue;
protected AtomicInteger state;
protected AtomicReference<ScanRunState> runState;
-
+
private static final int INITIAL = 1;
private static final int ADDED = 2;
private static final int CANCELED = 3;
-
+
ScanTask() {
interruptFlag = new AtomicBoolean(false);
runState = new AtomicReference<ScanRunState>(ScanRunState.QUEUED);
state = new AtomicInteger(INITIAL);
resultQueue = new ArrayBlockingQueue<Object>(1);
}
-
+
protected void addResult(Object o) {
if (state.compareAndSet(INITIAL, ADDED))
resultQueue.add(o);
else if (state.get() == ADDED)
throw new IllegalStateException("Tried to add more than one result");
}
-
+
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (!mayInterruptIfRunning)
throw new IllegalArgumentException("Cancel will always attempt to interupt running next batch task");
-
+
if (state.get() == CANCELED)
return true;
-
+
if (state.compareAndSet(INITIAL, CANCELED)) {
interruptFlag.set(true);
resultQueue = null;
return true;
}
-
+
return false;
}
-
+
@Override
public T get() throws InterruptedException, ExecutionException {
throw new UnsupportedOperationException();
}
-
+
@SuppressWarnings("unchecked")
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-
+
ArrayBlockingQueue<Object> localRQ = resultQueue;
-
+
if (state.get() == CANCELED)
throw new CancellationException();
-
+
if (localRQ == null && state.get() == ADDED)
throw new IllegalStateException("Tried to get result twice");
-
+
Object r = localRQ.poll(timeout, unit);
-
+
// could have been canceled while waiting
if (state.get() == CANCELED) {
if (r != null)
throw new IllegalStateException("Nothing should have been added when in canceled state");
-
+
throw new CancellationException();
}
-
+
if (r == null)
throw new TimeoutException();
-
+
// make this method stop working now that something is being
// returned
resultQueue = null;
-
+
if (r instanceof Throwable)
throw new ExecutionException((Throwable) r);
-
+
return (T) r;
}
-
+
@Override
public boolean isCancelled() {
return state.get() == CANCELED;
}
-
+
@Override
public boolean isDone() {
return runState.get().equals(ScanRunState.FINISHED);
}
-
+
public ScanRunState getScanRunState() {
return runState.get();
}
-
+
}
-
+
private static class ConditionalSession extends Session {
public TCredentials credentials;
public Authorizations auths;
public String tableId;
public AtomicBoolean interruptFlag;
-
+
@Override
public void cleanup() {
interruptFlag.set(true);
}
}
-
+
private static class UpdateSession extends Session {
public Tablet currentTablet;
public MapCounter<Tablet> successfulCommits = new MapCounter<Tablet>();
@@ -783,7 +799,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
public long queuedMutationSize = 0;
TservConstraintEnv cenv = null;
}
-
+
private static class ScanSession extends Session {
public KeyExtent extent;
public HashSet<Column> columnSet;
@@ -797,7 +813,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
public AtomicBoolean interruptFlag;
public Scanner scanner;
public long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD;
-
+
@Override
public void cleanup() {
try {
@@ -808,32 +824,32 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
scanner.close();
}
}
-
+
}
-
+
private static class MultiScanSession extends Session {
HashSet<Column> columnSet;
Map<KeyExtent,List<Range>> queries;
public List<IterInfo> ssiList;
public Map<String,Map<String,String>> ssio;
public Authorizations auths;
-
+
// stats
int numRanges;
int numTablets;
int numEntries;
long totalLookupTime;
-
+
public volatile ScanTask<MultiScanResult> lookupTask;
public KeyExtent threadPoolExtent;
-
+
@Override
public void cleanup() {
if (lookupTask != null)
lookupTask.cancel(true);
}
}
-
+
/**
* This little class keeps track of writes in progress and allows readers to wait for writes that started before the read. It assumes that the operation ids
* are monotonically increasing.
@@ -842,38 +858,38 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
static class WriteTracker {
private static AtomicLong operationCounter = new AtomicLong(1);
private Map<TabletType,TreeSet<Long>> inProgressWrites = new EnumMap<TabletType,TreeSet<Long>>(TabletType.class);
-
+
WriteTracker() {
for (TabletType ttype : TabletType.values()) {
inProgressWrites.put(ttype, new TreeSet<Long>());
}
}
-
+
synchronized long startWrite(TabletType ttype) {
long operationId = operationCounter.getAndIncrement();
inProgressWrites.get(ttype).add(operationId);
return operationId;
}
-
+
synchronized void finishWrite(long operationId) {
if (operationId == -1)
return;
-
+
boolean removed = false;
-
+
for (TabletType ttype : TabletType.values()) {
removed = inProgressWrites.get(ttype).remove(operationId);
if (removed)
break;
}
-
+
if (!removed) {
throw new IllegalArgumentException("Attempted to finish write not in progress, operationId " + operationId);
}
-
+
this.notifyAll();
}
-
+
synchronized void waitForWrites(TabletType ttype) {
long operationId = operationCounter.getAndIncrement();
while (inProgressWrites.get(ttype).floor(operationId) != null) {
@@ -884,40 +900,40 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
public long startWrite(Set<Tablet> keySet) {
if (keySet.size() == 0)
return -1;
-
+
ArrayList<KeyExtent> extents = new ArrayList<KeyExtent>(keySet.size());
-
+
for (Tablet tablet : keySet)
extents.add(tablet.getExtent());
-
+
return startWrite(TabletType.type(extents));
}
}
-
+
public AccumuloConfiguration getSystemConfiguration() {
return serverConfig.getConfiguration();
}
-
+
TransactionWatcher watcher = new TransactionWatcher();
-
+
private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {
-
+
SessionManager sessionManager;
-
+
AccumuloConfiguration acuConf = getSystemConfiguration();
-
+
TabletServerUpdateMetrics updateMetrics = new TabletServerUpdateMetrics();
-
+
TabletServerScanMetrics scanMetrics = new TabletServerScanMetrics();
-
+
WriteTracker writeTracker = new WriteTracker();
-
+
private RowLocks rowLocks = new RowLocks();
-
+
ThriftClientHandler() {
super(instance, watcher);
log.debug(ThriftClientHandler.class.getName() + " created");
@@ -930,16 +946,16 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error("Exception registering MBean with MBean Server", e);
}
}
-
+
@Override
public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, long tid, Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime)
throws ThriftSecurityException {
-
+
if (!security.canPerformSystemActions(credentials))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-
+
List<TKeyExtent> failures = new ArrayList<TKeyExtent>();
-
+
for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
TKeyExtent tke = entry.getKey();
Map<String,MapFileInfo> fileMap = entry.getValue();
@@ -950,9 +966,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
path = ns.makeQualified(path);
fileRefMap.put(new FileRef(path.toString(), path), mapping.getValue());
}
-
+
Tablet importTablet = onlineTablets.get(new KeyExtent(tke));
-
+
if (importTablet == null) {
failures.add(tke);
} else {
@@ -966,46 +982,46 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
return failures;
}
-
+
private class NextBatchTask extends ScanTask<ScanBatch> {
-
+
private long scanID;
-
+
NextBatchTask(long scanID, AtomicBoolean interruptFlag) {
this.scanID = scanID;
this.interruptFlag = interruptFlag;
-
+
if (interruptFlag.get())
cancel(true);
}
-
+
@Override
public void run() {
-
+
final ScanSession scanSession = (ScanSession) sessionManager.getSession(scanID);
String oldThreadName = Thread.currentThread().getName();
-
+
try {
if (isCancelled() || scanSession == null)
return;
-
+
runState.set(ScanRunState.RUNNING);
-
+
Thread.currentThread().setName(
"User: " + scanSession.user + " Start: " + scanSession.startTime + " Client: " + scanSession.client + " Tablet: " + scanSession.extent);
-
+
Tablet tablet = onlineTablets.get(scanSession.extent);
-
+
if (tablet == null) {
addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift()));
return;
}
-
+
long t1 = System.currentTimeMillis();
ScanBatch batch = scanSession.scanner.read();
long t2 = System.currentTimeMillis();
scanSession.nbTimes.addStat(t2 - t1);
-
+
// there should only be one thing on the queue at a time, so
// it should be ok to call add()
// instead of put()... if add() fails because queue is at
@@ -1028,53 +1044,53 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
runState.set(ScanRunState.FINISHED);
Thread.currentThread().setName(oldThreadName);
}
-
+
}
}
-
+
private class LookupTask extends ScanTask<MultiScanResult> {
-
+
private long scanID;
-
+
LookupTask(long scanID) {
this.scanID = scanID;
}
-
+
@Override
public void run() {
MultiScanSession session = (MultiScanSession) sessionManager.getSession(scanID);
String oldThreadName = Thread.currentThread().getName();
-
+
try {
if (isCancelled() || session == null)
return;
-
+
TableConfiguration acuTableConf = ServerConfiguration.getTableConfiguration(instance, session.threadPoolExtent.getTableId().toString());
long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
-
+
runState.set(ScanRunState.RUNNING);
Thread.currentThread().setName("Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Table: ");
-
+
long bytesAdded = 0;
long maxScanTime = 4000;
-
+
long startTime = System.currentTimeMillis();
-
+
ArrayList<KVEntry> results = new ArrayList<KVEntry>();
Map<KeyExtent,List<Range>> failures = new HashMap<KeyExtent,List<Range>>();
ArrayList<KeyExtent> fullScans = new ArrayList<KeyExtent>();
KeyExtent partScan = null;
Key partNextKey = null;
boolean partNextKeyInclusive = false;
-
+
Iterator<Entry<KeyExtent,List<Range>>> iter = session.queries.entrySet().iterator();
-
+
// check the time so that the read ahead thread is not monopolized
while (iter.hasNext() && bytesAdded < maxResultsSize && (System.currentTimeMillis() - startTime) < maxScanTime) {
Entry<KeyExtent,List<Range>> entry = iter.next();
-
+
iter.remove();
-
+
// check that tablet server is serving requested tablet
Tablet tablet = onlineTablets.get(entry.getKey());
if (tablet == null) {
@@ -1083,32 +1099,32 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
Thread.currentThread().setName(
"Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Tablet: " + entry.getKey().toString());
-
+
LookupResult lookupResult;
try {
-
+
// do the following check to avoid a race condition
// between setting false below and the task being
// canceled
if (isCancelled())
interruptFlag.set(true);
-
+
lookupResult = tablet.lookup(entry.getValue(), session.columnSet, session.auths, results, maxResultsSize - bytesAdded, session.ssiList,
session.ssio, interruptFlag);
-
+
// if the tablet was closed it it possible that the
// interrupt flag was set.... do not want it set for
// the next
// lookup
interruptFlag.set(false);
-
+
} catch (IOException e) {
log.warn("lookup failed for tablet " + entry.getKey(), e);
throw new RuntimeException(e);
}
-
+
bytesAdded += lookupResult.bytesAdded;
-
+
if (lookupResult.unfinishedRanges.size() > 0) {
if (lookupResult.closed) {
failures.put(entry.getKey(), lookupResult.unfinishedRanges);
@@ -1122,11 +1138,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
fullScans.add(entry.getKey());
}
}
-
+
long finishTime = System.currentTimeMillis();
session.totalLookupTime += (finishTime - startTime);
session.numEntries += results.size();
-
+
// convert everything to thrift before adding result
List<TKeyValue> retResults = new ArrayList<TKeyValue>();
for (KVEntry entry : results)
@@ -1155,23 +1171,20 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
@Override
public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent, TRange range, List<TColumn> columns, int batchSize,
List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated,
long readaheadThreshold) throws NotServingTabletException, ThriftSecurityException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
-
- Authorizations userauths = null;
+
if (!security.canScan(credentials, new String(textent.getTable()), range, columns, ssiList, ssio, authorizations))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-
- userauths = security.getUserAuthorizations(credentials);
- for (ByteBuffer auth : authorizations)
- if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
- throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
-
+
+ if (!security.userHasAuthorizations(credentials, authorizations))
+ throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
+
KeyExtent extent = new KeyExtent(textent);
-
+
// wait for any writes that are in flight.. this done to ensure
// consistency across client restarts... assume a client writes
// to accumulo and dies while waiting for a confirmation from
@@ -1184,11 +1197,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
// !METADATA table
if (waitForWrites)
writeTracker.waitForWrites(TabletType.type(extent));
-
+
Tablet tablet = onlineTablets.get(extent);
if (tablet == null)
throw new NotServingTabletException(textent);
-
+
ScanSession scanSession = new ScanSession();
scanSession.user = credentials.getPrincipal();
scanSession.extent = new KeyExtent(extent);
@@ -1198,16 +1211,16 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
scanSession.auths = new Authorizations(authorizations);
scanSession.interruptFlag = new AtomicBoolean();
scanSession.readaheadThreshold = readaheadThreshold;
-
+
for (TColumn tcolumn : columns) {
scanSession.columnSet.add(new Column(tcolumn));
}
-
+
scanSession.scanner = tablet.createScanner(new Range(range), batchSize, scanSession.columnSet, scanSession.auths, ssiList, ssio, isolated,
scanSession.interruptFlag);
-
+
long sid = sessionManager.createSession(scanSession, true);
-
+
ScanResult scanResult;
try {
scanResult = continueScan(tinfo, sid, scanSession);
@@ -1217,10 +1230,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} finally {
sessionManager.unreserveSession(sid);
}
-
+
return new InitialScan(sid, scanResult);
}
-
+
@Override
public ScanResult continueScan(TInfo tinfo, long scanID) throws NoSuchScanIDException, NotServingTabletException,
org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
@@ -1228,22 +1241,22 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
if (scanSession == null) {
throw new NoSuchScanIDException();
}
-
+
try {
return continueScan(tinfo, scanID, scanSession);
} finally {
sessionManager.unreserveSession(scanSession);
}
}
-
+
private ScanResult continueScan(TInfo tinfo, long scanID, ScanSession scanSession) throws NoSuchScanIDException, NotServingTabletException,
org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
-
+
if (scanSession.nextBatchTask == null) {
scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag);
resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
}
-
+
ScanBatch bresult;
try {
bresult = scanSession.nextBatchTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
@@ -1273,32 +1286,32 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.warn("Failed to get next batch", t);
throw new RuntimeException(t);
}
-
+
ScanResult scanResult = new ScanResult(Key.compress(bresult.results), bresult.more);
-
+
scanSession.entriesReturned += scanResult.results.size();
-
+
scanSession.batchCount++;
-
+
if (scanResult.more && scanSession.batchCount > scanSession.readaheadThreshold) {
// start reading next batch while current batch is transmitted
// to client
scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag);
resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
}
-
+
if (!scanResult.more)
closeScan(tinfo, scanID);
-
+
return scanResult;
}
-
+
@Override
public void closeScan(TInfo tinfo, long scanID) {
ScanSession ss = (ScanSession) sessionManager.removeSession(scanID);
if (ss != null) {
long t2 = System.currentTimeMillis();
-
+
log.debug(String.format("ScanSess tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ", TServerUtils.clientAddress.get(), ss.extent.getTableId()
.toString(), ss.entriesReturned, (t2 - ss.startTime) / 1000.0, ss.nbTimes.toString()));
if (scanMetrics.isEnabled()) {
@@ -1307,7 +1320,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
@Override
public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, Map<TKeyExtent,List<TRange>> tbatch, List<TColumn> tcolumns,
List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites) throws ThriftSecurityException {
@@ -1316,30 +1329,31 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
for (TKeyExtent keyExtent : tbatch.keySet()) {
tables.add(new String(keyExtent.getTable()));
}
-
+
if (tables.size() != 1)
throw new IllegalArgumentException("Cannot batch scan over multiple tables");
-
+
// check if user has permission to the tables
- Authorizations userauths = null;
for (String table : tables)
if (!security.canScan(credentials, table, tbatch, tcolumns, ssiList, ssio, authorizations))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-
- userauths = security.getUserAuthorizations(credentials);
- for (ByteBuffer auth : authorizations)
- if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
+
+ try {
+ if (!security.userHasAuthorizations(credentials, authorizations))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
-
+ } catch (ThriftSecurityException tse) {
+ log.error(tse, tse);
+ throw tse;
+ }
Map<KeyExtent,List<Range>> batch = Translator.translate(tbatch, new TKeyExtentTranslator(), new Translator.ListTranslator<TRange,Range>(
new TRangeTranslator()));
-
+
// This is used to determine which thread pool to use
KeyExtent threadPoolExtent = batch.keySet().iterator().next();
-
+
if (waitForWrites)
writeTracker.waitForWrites(TabletType.type(batch.keySet()));
-
+
MultiScanSession mss = new MultiScanSession();
mss.user = credentials.getPrincipal();
mss.queries = batch;
@@ -1347,19 +1361,19 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
mss.ssiList = ssiList;
mss.ssio = ssio;
mss.auths = new Authorizations(authorizations);
-
+
mss.numTablets = batch.size();
for (List<Range> ranges : batch.values()) {
mss.numRanges += ranges.size();
}
-
+
for (TColumn tcolumn : tcolumns)
mss.columnSet.add(new Column(tcolumn));
-
+
mss.threadPoolExtent = threadPoolExtent;
-
+
long sid = sessionManager.createSession(mss, true);
-
+
MultiScanResult result;
try {
result = continueMultiScan(tinfo, sid, mss);
@@ -1369,33 +1383,33 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} finally {
sessionManager.unreserveSession(sid);
}
-
+
return new InitialMultiScan(sid, result);
}
-
+
/**
 * Continues the multi-scan session identified by {@code scanID}.
 *
 * The session is reserved through the session manager, handed to the private
 * three-argument overload, and unreserved again in a {@code finally} block so it
 * is released whether the delegate returns or throws.
 *
 * @param tinfo
 *          trace information, passed through unchanged to the delegate
 * @param scanID
 *          id of the session to continue
 * @return whatever the three-argument {@code continueMultiScan} overload returns
 * @throws NoSuchScanIDException
 *           if {@code sessionManager.reserveSession(scanID)} returns {@code null}
 */
@Override
public MultiScanResult continueMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
-
+
MultiScanSession session = (MultiScanSession) sessionManager.reserveSession(scanID);
-
+
if (session == null) {
throw new NoSuchScanIDException();
}
-
+
try {
return continueMultiScan(tinfo, scanID, session);
} finally {
// always release the reservation taken above
sessionManager.unreserveSession(session);
}
}
-
+
private MultiScanResult continueMultiScan(TInfo tinfo, long scanID, MultiScanSession session) throws NoSuchScanIDException {
-
+
if (session.lookupTask == null) {
session.lookupTask = new LookupTask(scanID);
resourceManager.executeReadAhead(session.threadPoolExtent, session.lookupTask);
}
-
+
try {
MultiScanResult scanResult = session.lookupTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
session.lookupTask = null;
@@ -1413,37 +1427,37 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
throw new RuntimeException(t);
}
}
-
+
/**
 * Removes the multi-scan session identified by {@code scanID} and logs a debug
 * summary of it: client address, entry count, elapsed seconds since
 * {@code session.startTime}, total lookup time, tablet count and range count.
 *
 * @param tinfo
 *          trace information; not used by this method
 * @param scanID
 *          id of the session to close
 * @throws NoSuchScanIDException
 *           if {@code sessionManager.removeSession(scanID)} returns {@code null}
 */
@Override
public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
MultiScanSession session = (MultiScanSession) sessionManager.removeSession(scanID);
if (session == null) {
throw new NoSuchScanIDException();
}
-
+
// close timestamp; only used to compute the elapsed time reported in the log line
long t2 = System.currentTimeMillis();
log.debug(String.format("MultiScanSess %s %,d entries in %.2f secs (lookup_time:%.2f secs tablets:%,d ranges:%,d) ", TServerUtils.clientAddress.get(),
session.numEntries, (t2 - session.startTime) / 1000.0, session.totalLookupTime / 1000.0, session.numTablets, session.numRanges));
}
-
+
/**
 * Starts a new update session for the caller and returns its session id.
 *
 * The caller is authenticated first; any {@link ThriftSecurityException} raised by
 * {@code security.authenticateUser} propagates to the caller. A fresh
 * {@code UpdateSession} is then populated with an empty {@code Violations}
 * collector, the caller's credentials and a constraint environment built from
 * those credentials, and registered with the session manager.
 *
 * @param tinfo
 *          trace information; not used by this method
 * @param credentials
 *          credentials of the user starting the update
 * @return the id assigned by {@code sessionManager.createSession}
 * @throws ThriftSecurityException
 *           declared for the authentication call
 */
@Override
public long startUpdate(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException {
// Make sure user is real
-
+
security.authenticateUser(credentials, credentials);
// adds a 0-valued sample to the permissionErrors metric when metrics are enabled
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0);
-
+
UpdateSession us = new UpdateSession();
us.violations = new Violations();
us.credentials = credentials;
us.cenv = new TservConstraintEnv(security, us.credentials);
-
+
// NOTE(review): the second argument is false here, while the multi-scan path passes true and
// later unreserves the session; presumably it controls whether the session starts out reserved - confirm
long sid = sessionManager.createSession(us, false);
-
+
return sid;
}
-
+
private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
long t1 = System.currentTimeMillis();
if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent))
@@ -1452,7 +1466,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
// if there were previous failures, then do not accept additional writes
return;
}
-
+
try {
// if user has no permission to write to this table, add it to
// the failures list
@@ -1491,18 +1505,18 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
return;
}
}
-
+
@Override
public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent, List<TMutation> tmutations) {
UpdateSession us = (UpdateSession) sessionManager.reserveSession(updateID);
if (us == null) {
throw new RuntimeException("No Such SessionID");
}
-
+
try {
KeyExtent keyExtent = new KeyExtent(tkeyExtent);
setUpdateTablet(us, keyExtent);
-
+
if (us.currentTablet != null) {
List<Mutation> mutations = us.queuedMutations.get(us.currentTablet);
for (TMutation tmutation : tmutations) {
@@ -1517,34 +1531,34 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
sessionManager.unreserveSession(us);
}
}
-
+
private void flush(UpdateSession us) {
-
+
int mutationCount = 0;
Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
Throwable error = null;
-
+
long pt1 = System.currentTimeMillis();
-
+
boolean containsMetadataTablet = false;
for (Tablet tablet : us.queuedMutations.keySet())
if (tablet.getExtent().isMeta())
containsMetadataTablet = true;
-
+
if (!containsMetadataTablet && us.queuedMutations.size() > 0)
TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
-
+
Span prep = Trace.start("prep");
try {
for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) {
-
+
Tablet tablet = entry.getKey();
List<Mutation> mutations = entry.getValue();
if (mutations.size() > 0) {
try {
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.mutationArraySize, mutations.size());
-
+
CommitSession commitSession = tablet.prepareMutationsForCommit(us.cenv, mutations);
if (commitSession == null) {
if (us.currentTablet == tablet) {
@@ -1555,12 +1569,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
sendables.put(commitSession, mutations);
mutationCount += mutations.size();
}
-
+
} catch (TConstraintViolationException e) {
us.violations.add(e.getViolations());
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.constraintViolations, 0);
-
+
if (e.getNonViolators().size() > 0) {
// only log and commit mutations if there were some
// that did not
@@ -1569,9 +1583,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
// expects
sendables.put(e.getCommitSession(), e.getNonViolators());
}
-
+
mutationCount += mutations.size();
-
+
} catch (HoldTimeoutException t) {
error = t;
log.debug("Giving up on mutations due to a long memory hold time");
@@ -1586,11 +1600,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} finally {
prep.stop();
}
-
+
long pt2 = System.currentTimeMillis();
us.prepareTimes.addStat(pt2 - pt1);
updateAvgPrepTime(pt2 - pt1, us.queuedMutations.size());
-
+
if (error != null) {
for (Entry<CommitSession,List<Mutation>> e : sendables.entrySet()) {
e.getKey().abortCommit(e.getValue());
@@ -1603,9 +1617,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
while (true) {
try {
long t1 = System.currentTimeMillis();
-
+
logger.logManyTablets(sendables);
-
+
long t2 = System.currentTimeMillis();
us.walogTimes.addStat(t2 - t1);
updateWalogWriteTime((t2 - t1));
@@ -1622,18 +1636,18 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} finally {
wal.stop();
}
-
+
Span commit = Trace.start("commit");
try {
long t1 = System.currentTimeMillis();
for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) {
CommitSession commitSession = entry.getKey();
List<Mutation> mutations = entry.getValue();
-
+
commitSession.commit(mutations);
-
+
Tablet tablet = commitSession.getTablet();
-
+
if (tablet == us.currentTablet) {
// because constraint violations may filter out some
// mutations, for proper
@@ -1645,10 +1659,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
long t2 = System.currentTimeMillis();
-
+
us.flushTime += (t2 - pt1);
us.commitTimes.addStat(t2 - t1);
-
+
updateAvgCommitTime(t2 - t1, sendables.size());
} finally {
commit.stop();
@@ -1662,40 +1676,40 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
us.totalUpdates += mutationCount;
}
-
+
/**
 * Adds {@code time} to the write-ahead-log write time metric, but only when update
 * metrics are enabled.
 *
 * @param time
 *          the measured write time; callers in this file pass a difference of
 *          {@code System.currentTimeMillis()} readings
 */
private void updateWalogWriteTime(long time) {
  if (!updateMetrics.isEnabled()) {
    return;
  }
  updateMetrics.add(TabletServerUpdateMetrics.waLogWriteTime, time);
}
-
+
/**
 * Records the average commit time per item, {@code time / size}, in the commit time
 * metric when update metrics are enabled.
 *
 * Nothing is recorded when {@code size} is not positive. Dividing by a zero
 * {@code size} in floating point yields Infinity or NaN, which the cast to
 * {@code long} would turn into {@code Long.MAX_VALUE} or {@code 0} and feed into
 * the metric as a bogus sample. Callers pass a collection size, which can be zero
 * when nothing was committed.
 *
 * @param time
 *          total time spent committing
 * @param size
 *          number of items the time is averaged over
 */
private void updateAvgCommitTime(long time, int size) {
  if (size > 0 && updateMetrics.isEnabled())
    updateMetrics.add(TabletServerUpdateMetrics.commitTime, (long) (time / (double) size));
}
-
+
/**
 * Records the average commit preparation time per item, {@code time / size}, in
 * the commit prep metric when update metrics are enabled.
 *
 * Nothing is recorded when {@code size} is not positive. Dividing by a zero
 * {@code size} in floating point yields Infinity or NaN, which the cast to
 * {@code long} would turn into {@code Long.MAX_VALUE} or {@code 0} and feed into
 * the metric as a bogus sample. Callers pass a collection size, which can be zero
 * when a session is flushed with nothing queued.
 *
 * @param time
 *          total time spent preparing mutations for commit
 * @param size
 *          number of items the time is averaged over
 */
private void updateAvgPrepTime(long time, int size) {
  if (size > 0 && updateMetrics.isEnabled())
    updateMetrics.add(TabletServerUpdateMetrics.commitPrep, (long) (time / (double) size));
}
-
+
@Override
public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDException {
UpdateSession us = (UpdateSession) sessionManager.removeSession(updateID);
if (us == null) {
throw new NoSuchScanIDException();
}
-
+
// clients may or may not see data from an update session while
// it is in progress, however when the update session is closed
// want to ensure that reads wait for the write to finish
long opid = writeTracker.startWrite(us.queuedMutations.keySet());
-
+
try {
flush(us);
} finally {
writeTracker.finishWrite(opid);
}
-
+
log.debug(String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)", TServerUtils.clientAddress.get(), us.totalUpdates,
(System.currentTimeMillis() - us.startTime) / 1000.0, us.authTimes.toString(), us.flushTime / 1000.0, us.prepareTimes.getSum() / 1000.0,
us.walogTimes.getSum() / 1000.0, us.commitTimes.getSum() / 1000.0));
@@ -1712,15 +1726,15 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
KeyExtent first = us.authFailures.keySet().iterator().next();
log.debug(String.format("Authentication Failures: %d, first %s", us.authFailures.size(), first.toString()));
}
-
+
return new UpdateErrors(Translator.translate(us.failures, Translator.KET), Translator.translate(violations, Translator.CVST), Translator.translate(
us.authFailures, Translator.KET));
}
-
+
@Override
public void update(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, TMutation tmutation) throws NotServingTabletException,
ConstraintViolationException, ThriftSecurityException {
-
+
if (!security.canWrite(credentials, new String(tkeyExtent.getTable())))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
KeyExtent keyExtent = new KeyExtent(tkeyExtent);
@@ -1728,16 +1742,16 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
if (tablet == null) {
throw new NotServingTabletException(tkeyExtent);
}
-
+
if (!keyExtent.isMeta())
TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
-
+
long opid = writeTracker.startWrite(TabletType.type(keyExtent));
-
+
try {
Mutation mutation = new ServerMutation(tmutation);
List<Mutation> mutations = Collections.singletonList(mutation);
-
+
Span prep = Trace.start("prep");
CommitSession cs;
try {
@@ -1748,7 +1762,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
if (cs == null) {
throw new NotServingTabletException(tkeyExtent);
}
-
+
while (true) {
try {
Span wal = Trace.start("wal");
@@ -1762,7 +1776,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.warn(ex, ex);
}
}
-
+
Span commit = Trace.start("commit");
try {
cs.commit(mutations);
@@ -1775,69 +1789,69 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
writeTracker.finishWrite(opid);
}
}
-
+
/**
 * Evaluates the conditions of every conditional mutation in {@code updates} and
 * narrows the map, in place, to the mutations whose conditions passed.
 *
 * For each extent: if no tablet is online for it, or the tablet is closed, every
 * mutation for that extent gets an {@code IGNORED} result and the extent is removed
 * from {@code updates}. Otherwise each mutation is passed to {@code checkCondition}
 * and the entry's value is replaced with the list of mutations for which it
 * returned {@code true}.
 *
 * @param updates
 *          conditional mutations grouped by extent; modified by this method
 * @param results
 *          receives the results for mutations that will not be applied
 * @param cs
 *          the conditional session, passed through to {@code checkCondition}
 * @param symbols
 *          symbol table used to construct the {@code CompressedIterators} helper
 * @throws IOException
 *           propagated from {@code checkCondition}
 */
private void checkConditions(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession cs,
List<String> symbols) throws IOException {
Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> iter = updates.entrySet().iterator();
-
+
CompressedIterators compressedIters = new CompressedIterators(symbols);
-
+
while (iter.hasNext()) {
Entry<KeyExtent,List<ServerConditionalMutation>> entry = iter.next();
Tablet tablet = onlineTablets.get(entry.getKey());
-
+
if (tablet == null || tablet.isClosed()) {
for (ServerConditionalMutation scm : entry.getValue())
results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
// removal goes through the iterator so the map can be modified while iterating
iter.remove();
} else {
List<ServerConditionalMutation> okMutations = new ArrayList<ServerConditionalMutation>(entry.getValue().size());
-
+
for (ServerConditionalMutation scm : entry.getValue()) {
if (checkCondition(results, cs, compressedIters, tablet, scm))
okMutations.add(scm);
}
-
+
// keep only the mutations whose conditions held
entry.setValue(okMutations);
}
-
+
}
}
-
+
boolean checkCondition(ArrayList<TCMResult> results, ConditionalSession cs, CompressedIterators compressedIters, Tablet tablet,
ServerConditionalMutation scm) throws IOException {
boolean add = true;
-
+
Set<Column> emptyCols = Collections.emptySet();
-
+
for (TCondition tc : scm.getConditions()) {
-
+
Range range;
if (tc.hasTimestamp)
range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs());
else
range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()));
-
+
IterConfig ic = compressedIters.decompress(tc.iterators);
-
+
Scanner scanner = tablet.createScanner(range, 1, emptyCols, cs.auths, ic.ssiList, ic.ssio, false, cs.interruptFlag);
-
+
try {
ScanBatch batch = scanner.read();
-
+
Value val = null;
-
+
for (KVEntry entry2 : batch.results) {
val = entry2.getValue();
break;
}
-
+
if ((val == null ^ tc.getVal() == null) || (val != null && !Arrays.equals(tc.getVal(), val.get()))) {
results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED));
add = false;
break;
}
-
+
} catch (TabletClosedException e) {
results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
add = false;
@@ -1854,14 +1868,14 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
return add;
}
-
+
private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession sess) {
Set<Entry<KeyExtent,List<ServerConditionalMutation>>> es = updates.entrySet();
-
+
Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
-
+
boolean sessionCanceled = sess.interruptFlag.get();
-
+
Span prepSpan = Trace.start("prep");
try {
long t1 = System.currentTimeMillis();
@@ -1872,13 +1886,13 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
} else {
try {
-
+
@SuppressWarnings("unchecked")
List<Mutation> mutations = (List<Mutation>) (List<? extends Mutation>) entry.getValue();
if (mutations.size() > 0) {
-
+
CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, sess.credentials), mutations);
-
+
if (cs == null) {
for (ServerConditionalMutation scm : entry.getValue())
results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
@@ -1894,19 +1908,19 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
for (Mutation m : e.getNonViolators())
results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.ACCEPTED));
}
-
+
for (Mutation m : e.getViolators())
results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.VIOLATED));
}
}
}
-
+
long t2 = System.currentTimeMillis();
updateAvgPrepTime(t2 - t1, es.size());
} finally {
prepSpan.stop();
}
-
+
Span walSpan = Trace.start("wal");
try {
while (true && sendables.size() > 0) {
@@ -1928,14 +1942,14 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} finally {
walSpan.stop();
}
-
+
Span commitSpan = Trace.start("commit");
try {
long t1 = System.currentTimeMillis();
for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) {
CommitSession commitSession = entry.getKey();
List<Mutation> mutations = entry.getValue();
-
+
commitSession.commit(mutations);
}
long t2 = System.currentTimeMillis();
@@ -1943,19 +1957,19 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} finally {
commitSpan.stop();
}
-
+
}
-
+
private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(ConditionalSession cs, Map<KeyExtent,List<ServerConditionalMutation>> updates,
ArrayList<TCMResult> results, List<String> symbols) throws IOException {
// sort each list of mutations, this is done to avoid deadlock and doing seeks in order is more efficient and detect duplicate rows.
ConditionalMutationSet.sortConditionalMutations(updates);
-
+
Map<KeyExtent,List<ServerConditionalMutation>> deferred = new HashMap<KeyExtent,List<ServerConditionalMutation>>();
-
+
// can not process two mutations for the same row, because one will not see what the other writes
ConditionalMutationSet.deferDuplicatesRows(updates, deferred);
-
+
// get as many locks as possible w/o blocking... defer any rows that are locked
List<RowLock> locks = rowLocks.acquireRowlocks(updates, deferred);
try {
@@ -1965,7 +1979,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} finally {
checkSpan.stop();
}
-
+
Span updateSpan = Trace.start("apply conditional mutations");
try {
writeConditionalMutations(updates, results, cs);
@@ -1977,61 +1991,61 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
return deferred;
}
-
+
/**
 * Starts a conditional update session against {@code tableID} and returns its handle.
 *
 * Two checks run before the session is created: the caller must pass
 * {@code security.canConditionallyUpdate}, otherwise {@code PERMISSION_DENIED} is
 * thrown; and every requested authorization must be contained in the authorizations
 * returned by {@code security.getUserAuthorizations}, otherwise
 * {@code BAD_AUTHORIZATIONS} is thrown.
 *
 * @param tinfo
 *          trace information; not used by this method
 * @param credentials
 *          credentials of the caller
 * @param authorizations
 *          authorizations the session should use
 * @param tableID
 *          id of the table the session is bound to
 * @return a {@code TConditionalSession} built from the new session id, {@code lockID}
 *         and the session manager's maximum idle time
 * @throws ThriftSecurityException
 *           if either check described above fails
 * @throws TException
 *           declared by the interface; not thrown directly by the visible code
 */
@Override
public TConditionalSession startConditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations, String tableID)
throws ThriftSecurityException, TException {
-
+
Authorizations userauths = null;
if (!security.canConditionallyUpdate(credentials, tableID, authorizations))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-
+
// NOTE(review): the batch-scan path in this same change replaces this per-authorization loop with
// security.userHasAuthorizations(credentials, authorizations); confirm whether this method should do the same
userauths = security.getUserAuthorizations(credentials);
for (ByteBuffer auth : authorizations)
if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
-
+
ConditionalSession cs = new ConditionalSession();
cs.auths = new Authorizations(authorizations);
cs.credentials = credentials;
cs.tableId = tableID;
// starts out false; set to true by invalidateConditionalUpdate
cs.interruptFlag = new AtomicBoolean();
-
+
long sid = sessionManager.createSession(cs, false);
return new TConditionalSession(sid, lockID, sessionManager.getMaxIdleTime());
}
-
+
@Override
public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID, Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
throws NoSuchScanIDException, TException {
-
+
ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID);
-
+
if (cs == null || cs.interruptFlag.get())
throw new NoSuchScanIDException();
-
+
if (!cs.tableId.equals(MetadataTable.ID) && !cs.tableId.equals(RootTable.ID))
TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
-
+
Text tid = new Text(cs.tableId);
long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null)));
-
+
try {
Map<KeyExtent,List<ServerConditionalMutation>> updates = Translator.translate(mutations, Translator.TKET,
new Translator.ListTranslator<TConditionalMutation,ServerConditionalMutation>(ServerConditionalMutation.TCMT));
-
+
for (KeyExtent ke : updates.keySet())
if (!ke.getTableId().equals(tid))
throw new IllegalArgumentException("Unexpected table id " + tid + " != " + ke.getTableId());
-
+
ArrayList<TCMResult> results = new ArrayList<TCMResult>();
-
+
Map<KeyExtent,List<ServerConditionalMutation>> deferred = conditionalUpdate(cs, updates, results, symbols);
-
+
while (deferred.size() > 0) {
deferred = conditionalUpdate(cs, deferred, results, symbols);
}
-
+
return results;
} catch (IOException ioe) {
throw new TException(ioe);
@@ -2040,41 +2054,41 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
sessionManager.unreserveSession(sessID);
}
}
-
+
/**
 * Invalidates the conditional update session identified by {@code sessID}.
 *
 * The session's interrupt flag is set first, which makes {@code conditionalUpdate}
 * reject the session with {@code NoSuchScanIDException}. The session is then
 * reserved and, if it still exists, removed. Unknown session ids are ignored.
 *
 * @param tinfo
 *          trace information; not used by this method
 * @param sessID
 *          id of the conditional session to invalidate
 * @throws TException
 *           declared by the interface; not thrown directly by the visible code
 */
@Override
public void invalidateConditionalUpdate(TInfo tinfo, long sessID) throws TException {
// this method should wait for any running conditional update to complete
// after this method returns a conditional update should not be able to start
-
+
// flag the session before reserving it so that no new conditional update is accepted for it
ConditionalSession cs = (ConditionalSession) sessionManager.getSession(sessID);
if (cs != null)
cs.interruptFlag.set(true);
-
+
// NOTE(review): the 'true' arguments presumably make these calls wait for a concurrent
// reservation to be released, which would provide the waiting described above - confirm
cs = (ConditionalSession) sessionManager.reserveSession(sessID, true);
if (cs != null)
sessionManager.removeSession(sessID, true);
}
-
+
/**
 * Closes the conditional update session identified by {@code sessID} by removing it
 * from the session manager. The value returned by {@code removeSession} is ignored,
 * so an unknown id is not reported to the caller.
 *
 * @param tinfo
 *          trace information; not used by this method
 * @param sessID
 *          id of the conditional session to close
 * @throws TException
 *           declared by the interface; not thrown directly by the visible code
 */
@Override
public void closeConditionalUpdate(TInfo tinfo, long sessID) throws TException {
// NOTE(review): invalidateConditionalUpdate passes true as the second argument; presumably
// false means "do not wait for a reserved session" - confirm
sessionManager.removeSession(sessID, false);
}
-
+
@Override
public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, ByteBuffer splitPoint) throws NotServingTabletException,
ThriftSecurityException {
-
+
String tableId = new String(ByteBufferUtil.toBytes(tkeyExtent.table));
if (!security.canSplitTablet(credentials, tableId))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-
+
KeyExtent keyExtent = new KeyExtent(tkeyExtent);
-
+
Tablet tablet = onlineTablets.get(keyExtent);
if (tablet == null) {
throw new NotServingTabletException(tkeyExtent);
}
-
+
if (keyExtent.getEndRow() == null || !keyExtent.getEndRow().equals(ByteBufferUtil.toText(splitPoint))) {
try {
if (TabletServer.this.splitTablet(tablet, ByteBufferUtil.toBytes(splitPoint)) == null) {
@@ -2086,12 +2100,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
/**
 * Returns the tablet server status computed by {@code getStats} from the session
 * manager's per-table active scan information.
 *
 * @param tinfo
 *          trace information; not used by this method
 * @param credentials
 *          caller credentials
 * @return the status produced by {@code getStats}
 * @throws ThriftSecurityException
 *           declared by the interface; not thrown directly by the visible code
 * @throws TException
 *           declared by the interface; not thrown directly by the visible code
 */
@Override
public TabletServerStatus getTabletServerStatus(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
// NOTE(review): credentials is not consulted here - confirm that no permission check is intended
return getStats(sessionManager.getActiveScansPerTable());
}
-
+
@Override
public List<TabletStats> getTabletStats(TInfo tinfo, TCredentials credentials, String tableId) throws ThriftSecurityException, TException {
TreeMap<KeyExtent,Tablet> onlineTabletsCopy;
@@ -2116,9 +2130,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
return result;
}
-
+
// ZooKeeper cache that checkPermission passes to ZooLock.isLockHeld when checking the master lock
private ZooCache masterLockCache = new ZooCache();
-
+
private void checkPermission(TCredentials credentials, String lock, final String request) throws ThriftSecurityException {
boolean fatal = false;
try {
@@ -2144,12 +2158,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
});
}
}
-
+
if (tabletServerLock == null || !tabletServerLock.wasLockAcquired()) {
log.warn("Got " + request + " message from master before lock acquired, ignoring...");
throw new RuntimeException("Lock not acquired");
}
-
+
if (tabletServerLock != null && tabletServerLock.wasLockAcquired() && !tabletServerLock.isLocked()) {
Halt.halt(1, new Runnable() {
@Override
@@ -2159,10 +2173,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
});
}
-
+
if (lock != null) {
ZooUtil.LockID lid = new ZooUtil.LockID(ZooUtil.getRoot(instance) + Constants.ZMASTER_LOCK, lock);
-
+
try {
if (!ZooLock.isLockHeld(masterLockCache, lid)) {
// maybe the cache is out of date and a new master holds the
@@ -2178,38 +2192,38 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
@Override
public void loadTablet(TInfo tinfo, TCredentials credentials, String lock, final TKeyExtent textent) {
-
+
try {
checkPermission(credentials, lock, "loadTablet");
} catch (ThriftSecurityException e) {
log.error(e, e);
throw new RuntimeException(e);
}
-
+
final KeyExtent extent = new KeyExtent(textent);
-
+
synchronized (unopenedTablets) {
synchronized (openingTablets) {
synchronized (onlineTablets) {
-
+
// checking if this exact tablet is in any of the sets
// below is not a strong enough check
// when splits and fix splits occurring
-
+
Set<KeyExtent> unopenedOverlapping = KeyExtent.findOverlapping(extent, unopenedTablets);
Set<KeyExtent> openingOverlapping = KeyExtent.findOverlapping(extent, openingTablets);
Set<KeyExtent> onlineOverlapping = KeyExtent.findOverlapping(extent, onlineTablets);
-
+
Set<KeyExtent> all = new HashSet<KeyExtent>();
all.addAll(unopenedOverlapping);
all.addAll(openingOverlapping);
all.addAll(onlineOverlapping);
-
+
if (!all.isEmpty()) {
-
+
// ignore any tablets that have recently split, for error logging
for (KeyExtent e2 : onlineOverlapping) {
Tablet tablet = onlineTablets.get(e2);
@@ -2217,25 +2231,25 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
all.remove(e2);
}
}
-
+
// ignore self, for error logging
all.remove(extent);
-
+
if (all.size() > 0) {
log.error("Tablet " + extent + " overlaps previously assigned " + unopenedOverlapping + " " + openingOverlapping + " " + onlineOverlapping
+ " " + all);
}
return;
}
-
+
unopenedTablets.add(extent);
}
}
}
-
+
// add the assignment job to the appropriate queue
log.info("Loading tablet " + extent);
-
+
final Runnable ah = new LoggingRunnable(log, new AssignmentHandler(extent));
// Root tablet assignment must take place immediately
if (extent.isRootTablet()) {
@@ -2248,7 +2262,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} else {
log.info("Root tablet failed to load");
}
-
+
}
}.start();
} else {
@@ -2259,7 +2273,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
@Override
public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent, boolean save) {
try {
@@ -2268,12 +2282,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error(e, e);
throw new RuntimeException(e);
}
-
+
KeyExtent extent = new KeyExtent(textent);
-
+
resourceManager.addMigration(extent, new LoggingRunnable(log, new UnloadTabletHandler(extent, save)));
}
-
+
@Override
public void flush(TInfo tinfo, TCredentials credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) {
try {
@@ -2282,19 +2296,19 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error(e, e);
throw new RuntimeException(e);
}
-
+
ArrayList<Tablet> tabletsToFlush = new ArrayList<Tablet>();
-
+
KeyExtent ke = new KeyExtent(new Text(tableId), ByteBufferUtil.toText(endRow), ByteBufferUtil.toText(startRow));
-
+
synchronized (onlineTablets) {
for (Tablet tablet : onlineTablets.values())
if (ke.overlaps(tablet.getExtent()))
tabletsToFlush.add(tablet);
}
-
+
Long flushID = null;
-
+
for (Tablet tablet : tabletsToFlush) {
if (flushID == null) {
// read the flush id once from zookeeper instead of reading
@@ -2310,7 +2324,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
tablet.flush(flushID);
}
}
-
+
@Override
public void flushTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent) throws TException {
try {
@@ -2319,7 +2333,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error(e, e);
throw new RuntimeException(e);
}
-
+
Tablet tablet = onlineTablets.get(new KeyExtent(textent));
if (tablet != null) {
log.info("Flushing " + tablet.getExtent());
@@ -2330,12 +2344,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
@Override
public void halt(TInfo tinfo, TCredentials credentials, String lock) throws ThriftSecurityException {
-
+
checkPermission(credentials, lock, "halt");
-
+
Halt.halt(0, new Runnable() {
@Override
public void run() {
@@ -2350,7 +2364,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
});
}
-
+
@Override
public void fastHalt(TInfo info, TCredentials credentials, String lock) {
try {
@@ -2359,12 +2373,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.warn("Error halting", e);
}
}
-
+
/**
 * Returns the tablet statistics currently held by {@code statsKeeper}.
 *
 * @param tinfo
 *          trace information; not used by this method
 * @param credentials
 *          caller credentials; not used by this method
 * @return the value of {@code statsKeeper.getTabletStats()}
 * @throws ThriftSecurityException
 *           declared by the interface; not thrown directly by this method
 * @throws TException
 *           declared by the interface; not thrown directly by this method
 */
@Override
public TabletStats getHistoricalStats(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
  final TabletStats historical = statsKeeper.getTabletStats();
  return historical;
}
-
+
@Override
public List<ActiveScan> getActiveScans(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
try {
@@ -2373,10 +2387,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error(e, e);
throw new RuntimeException(e);
}
-
+
return sessionManager.getActiveScans();
}
-
+
@Override
public void chop(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent) throws TException {
try {
@@ -2385,15 +2399,15 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error(e, e);
throw new RuntimeException(e);
}
-
+
KeyExtent ke = new KeyExtent(textent);
-
+
Tablet tablet = onlineTablets.get(ke);
if (tablet != null) {
tablet.chopFiles();
}
}
-
+
@Override
public void compact(TInfo tinfo, TCredentials credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) throws TException {
try {
@@ -2402,18 +2416,18 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error(e, e);
throw new RuntimeException(e);
}
-
+
KeyExtent ke = new KeyExtent(new Text(tableId), ByteBufferUtil.toText(endRow), ByteBufferUtil.toText(startRow));
-
+
ArrayList<Tablet> tabletsToCompact = new ArrayList<Tablet>();
synchronized (onlineTablets) {
for (Tablet tablet : onlineTablets.values())
if (ke.overlaps(tablet.getExtent()))
tabletsToCompact.add(tablet);
}
-
+
Long compactionId = null;
-
+
for (Tablet tablet : tabletsToCompact) {
// all for the same table id, so only need to read
// compaction id once
@@ -2426,9 +2440,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
tablet.compactAll(compactionId);
}
-
+
}
-
+
@Override
public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {
String myname = getClientAddressString();
@@ -2453,7 +2467,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
try {
Path source = new Path(filename);
if (acuConf.getBoolean(Property.TSERV_ARCHIVE_WALOGS)) {
@@ -2483,7 +2497,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
@Override
public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
try {
@@ -2492,25 +2506,25 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error(e, e);
throw new RuntimeException(e);
}
-
+
List<CompactionInfo> compactions = Compactor.getRunningCompactions();
List<ActiveCompaction> ret = new ArrayList<ActiveCompaction>(compactions.size());
-
+
for (CompactionInfo compactionInfo : compactions) {
ret.add(compactionInfo.toThrift());
}
-
+
return ret;
}
}
-
+
private class SplitRunner implements Runnable {
private Tablet tablet;
-
+
public SplitRunner(Tablet tablet) {
this.tablet = tablet;
}
-
+
@Override
public void run() {
if (majorCompactorDisabled) {
@@ -2518,64 +2532,64 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
// initiated exit
return;
}
-
+
splitTablet(tablet);
}
}
-
+
/**
 * Reports the current value of the {@code majorCompactorDisabled} flag. Within this
 * file the same flag makes {@code SplitRunner.run()} return early and ends the
 * {@code MajorCompactor} loop.
 *
 * @return {@code true} if the flag is set
 */
boolean isMajorCompactionDisabled() {
return majorCompactorDisabled;
}
-
+
/**
 * Hands a split of {@code tablet} to the resource manager. The work is a
 * {@code SplitRunner} wrapped in a {@code LoggingRunnable} bound to this class's
 * logger, submitted under the tablet's extent.
 *
 * @param tablet
 *          the tablet to split
 */
void executeSplit(Tablet tablet) {
  final KeyExtent extent = tablet.getExtent();
  final LoggingRunnable splitTask = new LoggingRunnable(log, new SplitRunner(tablet));
  resourceManager.executeSplit(extent, splitTask);
}
-
+
private class MajorCompactor implements Runnable {
-
+
public MajorCompactor(AccumuloConfiguration config) {
CompactionWatcher.startWatching(config);
}
-
+
@Override
public void run() {
while (!majorCompactorDisabled) {
try {
UtilWaitThread.sleep(getSystemConfiguration().getTimeInMillis(Property.TSERV_MAJC_DELAY));
-
+
TreeMap<KeyExtent,Tablet> copyOnlineTablets = new TreeMap<KeyExtent,Tablet>();
-
+
synchronized (onlineTablets) {
copyOnlineTablets.putAll(onlineTablets); // avoid
// concurrent
// modification
}
-
+
int numMajorCompactionsInProgress = 0;
-
+
Iterator<Entry<KeyExtent,Tablet>> iter = copyOnlineTablets.entrySet().iterator();
-
+
// bail early now if we're shutting down
while (iter.hasNext() && !majorCompactorDisabled) {
-
+
Entry<KeyExtent,Tablet> entry = iter.next();
-
+
Tablet tablet = entry.getValue();
-
+
// if we need to split AND compact, we need a good way
// to decide what to do
if (tablet.needsSplit()) {
executeSplit(tablet);
continue;
}
-
+
int maxLogEntriesPerTablet = getTableConfiguration(tablet.getExtent()).getCount(Property.TABLE_MINC_LOGS_MAX);
-
+
if (tablet.getLogCount() >= maxLogEntriesPerTablet) {
log.debug("Initiating minor compaction for " + tablet.getExtent() + " because it has " + tablet.getLogCount() + " write ahead logs");
tablet.initiateMinorCompaction(MinorCompactionReason.SYSTEM);
}
-
+
synchronized (tablet) {
if (tablet.initiateMajorCompaction(MajorCompactionReason.NORMAL) || tablet.majorCompactionQueued() || tablet.majorCompactionRunning()) {
numMajorCompactionsInProgress++;
@@ -2583,18 +2597,18 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
int idleCompactionsToStart = Math.max(1, getSystemConfiguration().getCount(Property.TSERV_MAJC_MAXCONCURRENT) / 2);
-
+
if (numMajorCompactionsInProgress < idleCompactionsToStart) {
// system is not major compacting, can schedule some
// idle compactions
iter = copyOnlineTablets.entrySet().iterator();
-
+
while (iter.hasNext() && !majorCompactorDisabled && numMajorCompactionsInProgress < idleCompactionsToStart) {
Entry<KeyExtent,Tablet> entry = iter.next();
Tablet tablet = entry.getValue();
-
+
if (tablet.initiateMajorCompaction(MajorCompactionReason.IDLE)) {
numMajorCompactionsInProgress++;
}
@@ -2607,10 +2621,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
private void splitTablet(Tablet tablet) {
try {
-
+
TreeMap<KeyExtent,SplitInfo> tabletInfo = splitTablet(tablet, null);
if (tabletInfo == null) {
// either split or compact not both
@@ -2626,34 +2640,34 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error("Unknown error on split: " + e, e);
}
}
-
+
private TreeMap<KeyExtent,SplitInfo> splitTablet(Tablet tablet, byte[] splitPoint) throws IOException {
long t1 = System.currentTimeMillis();
-
+
TreeMap<KeyExtent,SplitInfo> tabletInfo = tablet.split(splitPoint);
if (tabletInfo == null) {
return null;
}
-
+
log.info("Starting split: " + tablet.getExtent());
statsKeeper.incrementStatusSplit();
long start = System.currentTimeMillis();
-
+
Tablet[] newTablets = new Tablet[2];
-
+
Entry<KeyExtent,SplitInfo> first = tabletInfo.firstEntry();
newTablets[0] = new Tablet(TabletServer.this, new Text(first.getValue().dir), first.getKey(), resourceManager.createTabletResourceManager(),
first.getValue().datafiles, first.getValue().time, first.getValue().initFlushID, first.getValue().initCompactID);
-
+
Entry<KeyExtent,SplitInfo> last = tabletInfo.lastEntry();
newTablets[1] = new Tablet(TabletServer.this, new Text(last.getValue().dir), last.getKey(), resourceManager.createTabletResourceManager(),
last.getValue().datafiles, last.getValue().time, last.getValue().initFlushID, last.getValue().initCompactID);
-
+
// roll tablet stats over into tablet server's statsKeeper object as
// historical data
statsKeeper.saveMinorTimes(tablet.timer);
statsKeeper.saveMajorTimes(tablet.timer);
-
+
// lose the reference to the old tablet and open two new ones
synchronized (onlineTablets) {
onlineTablets.remove(tablet.getExtent());
@@ -2663,40 +2677,40 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
// tell the master
enqueueMasterMessage(new SplitReportMessage(tablet.getExtent(), newTablets[0].getExtent(), new Text("/" + newTablets[0].getLocation().getName()),
newTablets[1].getExtent(), new Text("/" + newTablets[1].getLocation().getName())));
-
+
statsKeeper.updateTime(Operation.SPLIT, start, 0, false);
long t2 = System.currentTimeMillis();
log.info("Tablet split: " + tablet.getExtent() + " size0 " + newTablets[0].estimateTabletSize() + " size1 " + newTablets[1].estimateTabletSize() + " time "
+ (t2 - t1) + "ms");
-
+
return tabletInfo;
}
-
+
// Initialised to the wall-clock time at which this object's fields are initialised.
// NOTE(review): the code that updates this value is not in this section - confirm who writes it and from which thread.
public long lastPingTime = System.currentTimeMillis();
// NOTE(review): presumably the socket of the master currently talking to this server; it is not assigned in this section - confirm.
public Socket currentMaster;
-
+
// a queue to hold messages that are to be sent back to the master
private BlockingDeque<MasterMessage> masterMessages = new LinkedBlockingDeque<MasterMessage>();
-
+
/**
 * Adds a message for the main thread to send back to the master. The message is appended to the tail of {@code masterMessages}.
 * 
 * @param m
 *          the message to queue
 */
void enqueueMasterMessage(MasterMessage m) {
masterMessages.addLast(m);
}
-
+
private class UnloadTabletHandler implements Runnable {
private KeyExtent extent;
private boolean saveState;
-
+
/**
 * Creates a handler that unloads a single tablet when run.
 * 
 * @param extent
 *          extent of the tablet to unload
 * @param saveState
 *          value passed to {@code Tablet.close(boolean)} when the tablet is closed by {@code run()}
 */
public UnloadTabletHandler(KeyExtent extent, boolean saveState) {
this.extent = extent;
this.saveState = saveState;
}
-
+
@Override
public void run() {
-
+
Tablet t = null;
-
+
synchronized (unopenedTablets) {
if (unopenedTablets.contains(extent)) {
unopenedTablets.remove(extent);
@@ -2716,7 +2730,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
t = onlineTablets.get(extent);
}
}
-
+
if (t == null) {
// Tablet has probably been recently unloaded: repeated master
// unload request is crossing the successful unloaded message
@@ -2726,11 +2740,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
return;
}
-
+
try {
t.close(saveState);
} catch (Throwable e) {
-
+
if ((t.isClosing() || t.isClosed()) && e instanceof IllegalStateException) {
log.debug("Failed to unload tablet " + extent + "... it was alread closing or closed : " + e.getMessage());
} else {
@@ -2739,12 +2753,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
return;
}
-
+
// stop serving tablet - client will get not serving tablet
// exceptions
recentlyUnloadedCache.put(extent, System.currentTimeMillis());
onlineTablets.remove(extent);
-
+
try {
TServerInstance instance = new TServerInstance(clientAddress, getLock().getSessionId());
TabletLocationState tls = null;
@@ -2762,37 +2776,37 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} catch (InterruptedException e) {
log.warn("Interrupted while getting our zookeeper session information", e);
}
-
+
// tell the master how it went
enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.UNLOADED, extent));
-
+
// roll tablet stats over into tablet server's statsKeeper object as
// historical data
statsKeeper.saveMinorTimes(t.timer);
statsKeeper.saveMajorTimes(t.timer);
-
+
log.info("unloaded " + extent);
-
+
}
}
-
+
private class AssignmentHandler implements Runnable {
private KeyExtent extent;
private int retryAttempt = 0;
-
+
/**
 * Creates a handler for a first assignment attempt; {@code retryAttempt} keeps its initial value of 0.
 * 
 * @param extent
 *          extent of the tablet to load
 */
public AssignmentHandler(KeyExtent extent) {
this.extent = extent;
}
-
+
/**
 * Creates a handler for the given extent and records which retry attempt this is.
 * 
 * @param extent
 *          extent of the tablet to load
 * @param retryAttempt
 *          attempt counter stored on the handler (NOTE(review): presumably the number of earlier failed load attempts - confirm against the retry scheduling
 *          code)
 */
public AssignmentHandler(KeyExtent extent, int retryAttempt) {
this(extent);
this.retryAttempt = retryAttempt;
}
-
+
@Override
public void run() {
log.info(clientAddress + ": got assignment from master: " + extent);
-
+
synchronized (unopenedTablets) {
synchronized (openingTablets) {
synchronized (onlineTablets) {
@@ -2801,23 +2815,23 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
Set<KeyExtent> unopenedOverlapping = KeyExtent.findOverlapping(extent, unopenedTablets);
Set<KeyExtent> openingOverlapping = KeyExtent.findOverlapping(extent, openingTablets);
Set<KeyExtent> onlineOverlapping = KeyExtent.findOverlapping(extent, onlineTablets);
-
+
if (openingOverlapping.contains(extent) || onlineOverlapping.contains(extent))
return;
-
+
if (!unopenedTablets.contains(extent) || unopenedOverlapping.size() != 1 || openingOverlapping.size() > 0 || onlineOverlapping.size() > 0) {
throw new IllegalStateException("overlaps assigned " + extent + " " + !unopenedTablets.contains(extent) + " " + unopenedOverlapping + " "
+ openingOverlapping + " " + onlineOverlapping);
}
}
-
+
unopenedTablets.remove(extent);
openingTablets.add(extent);
}
}
-
+
log.debug("Loading extent: " + extent);
-
+
// check Metadata table before accepting assignment
Text locationToOpen = null;
SortedMap<Key,Value> tabletsKeyValues = new TreeMap<Key,Value>();
@@ -2847,7 +2861,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
throw new RuntimeException(e);
}
-
+
if (locationToOpen == null) {
log.debug("Reporting tablet " + extent + " assignment failure: unable to verify Tablet Information");
synchronized (openingTablets) {
@@ -2857,13 +2871,13 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
return;
}
-
+
Tablet tablet = null;
boolean successful = false;
-
+
try {
TabletResourceManager trm = resourceManager.createTabletResourceManager();
-
+
// this opens the tablet file and fills in the endKey in the
// extent
tablet = new Tablet(TabletServer.this, locationToOpen, extent, trm, tabletsKeyValues);
@@ -2880,10 +2894,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
if (tablet.getNumEntriesInMemory() > 0 && !tablet.minorCompactNow(MinorCompactionReason.SYSTEM)) {
throw new RuntimeException("Minor compaction after recovery fails for " + extent);
}
-
+
Assignment assignment = new Assignment(extent, getTabletSession());
TabletStateStore.setLocation(assignment);
-
+
synchronized (openingTablets) {
synchronized (onlineTablets) {
openingTablets.remove(extent);
@@ -2901,7 +2915,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
String table = extent.getTableId().toString();
ProblemReports.getInstance().report(new ProblemReport(table, TABLET_LOAD, extent.getUUID().toString(), getClientAddressString(), e));
}
-
+
if (!successful) {
synchronized (unopenedTablets) {
synchronized (openingTablets) {
@@ -2935,47 +2949,47 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
private VolumeManager fs;
private Instance instance;
-
+
// Tablet bookkeeping. Each collection is a Collections.synchronized* wrapper, and code in this class also
// synchronizes on the collection object itself for compound operations (copying, check-then-modify).
// When several of these monitors are held at once they are acquired in the order
// unopenedTablets -> openingTablets -> onlineTablets; keep that order to avoid deadlock.

// extent -> Tablet for tablets currently held online by this server
private final SortedMap<KeyExtent,Tablet> onlineTablets = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,Tablet>());
// extents that have not started opening yet; AssignmentHandler moves an extent from here to openingTablets
private final SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
// extents that are in the process of being opened
private final SortedSet<KeyExtent> openingTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
// extent -> System.currentTimeMillis() at the moment the tablet was unloaded; LRUMap is a raw type, hence the
// suppression, and is created with a size of 1000
@SuppressWarnings("unchecked")
private final Map<KeyExtent,Long> recentlyUnloadedCache = Collections.synchronizedMap(new LRUMap(1000));
-
+
private Thread majorCompactorThread;
-
+
// used for stopping the server and MasterListener thread
private volatile boolean serverStopRequested = false;
-
+
private HostAndPort clientAddress;
-
+
private TabletServerResourceManager resourceManager;
private SecurityOperation security;
// Checked by MajorCompactor.run() in its outer loop and between tablets; once true, the compactor stops
// iterating and its loop exits.
private volatile boolean majorCompactorDisabled = false;
-
+
private volatile boolean shutdownComplete = false;
-
+
private ZooLock tabletServerLock;
-
+
private TServer server;
-
+
private DistributedWorkQueue bulkFailedCopyQ;
-
+
private String lockID;
-
+
private static final String METRICS_PREFIX = "tserver";
-
+
<TRUNCATED>