You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/12/03 22:16:21 UTC
[2/5] ACCUMULO-1961 Fix warnings
http://git-wip-us.apache.org/repos/asf/accumulo/blob/a3a88955/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index 058973b..33082ea 100644
--- a/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -213,20 +213,20 @@ enum ScanRunState {
public class TabletServer extends AbstractMetricsImpl implements org.apache.accumulo.server.tabletserver.metrics.TabletServerMBean {
private static final Logger log = Logger.getLogger(TabletServer.class);
-
+
private static HashMap<String,Long> prevGcTime = new HashMap<String,Long>();
private static long lastMemorySize = 0;
private static long gcTimeIncreasedCount;
private static AtomicLong scanCount = new AtomicLong();
private static final Class<? extends LoggerStrategy> DEFAULT_LOGGER_STRATEGY = RoundRobinLoggerStrategy.class;
-
+
private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
-
+
private TabletServerLogger logger;
private LoggerStrategy loggerStrategy;
-
+
protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
-
+
public TabletServer() {
super();
watcher = new TransactionWatcher();
@@ -246,36 +246,36 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}, 5000, 5000);
}
-
+
private synchronized static void logGCInfo() {
List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
Runtime rt = Runtime.getRuntime();
-
+
StringBuilder sb = new StringBuilder("gc");
-
+
boolean sawChange = false;
-
+
long maxIncreaseInCollectionTime = 0;
-
+
for (GarbageCollectorMXBean gcBean : gcmBeans) {
Long prevTime = prevGcTime.get(gcBean.getName());
long pt = 0;
if (prevTime != null) {
pt = prevTime;
}
-
+
long time = gcBean.getCollectionTime();
-
+
if (time - pt != 0) {
sawChange = true;
}
-
+
long increaseInCollectionTime = time - pt;
sb.append(String.format(" %s=%,.2f(+%,.2f) secs", gcBean.getName(), time / 1000.0, increaseInCollectionTime / 1000.0));
maxIncreaseInCollectionTime = Math.max(increaseInCollectionTime, maxIncreaseInCollectionTime);
prevGcTime.put(gcBean.getName(), time);
}
-
+
long mem = rt.freeMemory();
if (maxIncreaseInCollectionTime == 0) {
gcTimeIncreasedCount = 0;
@@ -286,84 +286,85 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
gcTimeIncreasedCount = 0;
}
}
-
+
if (mem > lastMemorySize) {
sawChange = true;
}
-
+
String sign = "+";
if (mem - lastMemorySize <= 0) {
sign = "";
}
-
+
sb.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", mem, sign, (mem - lastMemorySize), rt.totalMemory()));
-
+
if (sawChange) {
log.debug(sb.toString());
}
-
+
final long keepAliveTimeout = ServerConfiguration.getSystemConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
if (maxIncreaseInCollectionTime > keepAliveTimeout) {
Halt.halt("Garbage collection may be interfering with lock keep-alive. Halting.", -1);
}
-
+
lastMemorySize = mem;
}
-
+
private TabletStatsKeeper statsKeeper;
-
+
private static class Session {
long lastAccessTime;
long startTime;
String user;
String client = TServerUtils.clientAddress.get();
public boolean reserved;
-
+
public void cleanup() {}
}
-
+
private static class SessionManager {
-
+
SecureRandom random;
Map<Long,Session> sessions;
-
+
SessionManager() {
random = new SecureRandom();
sessions = new HashMap<Long,Session>();
-
+
final long maxIdle = ServerConfiguration.getSystemConfiguration().getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
-
+
TimerTask r = new TimerTask() {
+ @Override
public void run() {
sweep(maxIdle);
}
};
-
+
SimpleTimer.getInstance().schedule(r, 0, Math.max(maxIdle / 2, 1000));
}
-
+
synchronized long createSession(Session session, boolean reserve) {
long sid = random.nextLong();
-
+
while (sessions.containsKey(sid)) {
sid = random.nextLong();
}
-
+
sessions.put(sid, session);
-
+
session.reserved = reserve;
-
+
session.startTime = session.lastAccessTime = System.currentTimeMillis();
-
+
return sid;
}
-
+
/**
* while a session is reserved, it cannot be canceled or removed
*
* @param sessionId
*/
-
+
synchronized Session reserveSession(long sessionId) {
Session session = sessions.get(sessionId);
if (session != null) {
@@ -371,44 +372,44 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
throw new IllegalStateException();
session.reserved = true;
}
-
+
return session;
-
+
}
-
+
synchronized void unreserveSession(Session session) {
if (!session.reserved)
throw new IllegalStateException();
session.reserved = false;
session.lastAccessTime = System.currentTimeMillis();
}
-
+
synchronized void unreserveSession(long sessionId) {
Session session = getSession(sessionId);
if (session != null)
unreserveSession(session);
}
-
+
synchronized Session getSession(long sessionId) {
Session session = sessions.get(sessionId);
if (session != null)
session.lastAccessTime = System.currentTimeMillis();
return session;
}
-
+
Session removeSession(long sessionId) {
Session session = null;
synchronized (this) {
session = sessions.remove(sessionId);
}
-
+
// do clean up out side of lock..
if (session != null)
session.cleanup();
-
+
return session;
}
-
+
private void sweep(long maxIdle) {
ArrayList<Session> sessionsToCleanup = new ArrayList<Session>();
synchronized (this) {
@@ -422,18 +423,19 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
// do clean up outside of lock
for (Session session : sessionsToCleanup) {
session.cleanup();
}
}
-
+
synchronized void removeIfNotAccessed(final long sessionId, long delay) {
Session session = sessions.get(sessionId);
if (session != null) {
final long removeTime = session.lastAccessTime;
TimerTask r = new TimerTask() {
+ @Override
public void run() {
Session sessionToCleanup = null;
synchronized (SessionManager.this) {
@@ -443,26 +445,26 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
sessionToCleanup = session2;
}
}
-
+
// call clean up outside of lock
if (sessionToCleanup != null)
sessionToCleanup.cleanup();
}
};
-
+
SimpleTimer.getInstance().schedule(r, delay);
}
}
-
+
public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
for (Entry<Long,Session> entry : sessions.entrySet()) {
-
+
Session session = entry.getValue();
@SuppressWarnings("rawtypes")
ScanTask nbt = null;
String tableID = null;
-
+
if (session instanceof ScanSession) {
ScanSession ss = (ScanSession) session;
nbt = ss.nextBatchTask;
@@ -472,40 +474,40 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
nbt = mss.lookupTask;
tableID = mss.threadPoolExtent.getTableId().toString();
}
-
+
if (nbt == null)
continue;
-
+
ScanRunState srs = nbt.getScanRunState();
-
+
if (nbt == null || srs == ScanRunState.FINISHED)
continue;
-
+
MapCounter<ScanRunState> stateCounts = counts.get(tableID);
if (stateCounts == null) {
stateCounts = new MapCounter<ScanRunState>();
counts.put(tableID, stateCounts);
}
-
+
stateCounts.increment(srs, 1);
}
-
+
return counts;
}
-
+
public synchronized List<ActiveScan> getActiveScans() {
-
+
ArrayList<ActiveScan> activeScans = new ArrayList<ActiveScan>();
-
+
long ct = System.currentTimeMillis();
-
+
for (Entry<Long,Session> entry : sessions.entrySet()) {
Session session = entry.getValue();
if (session instanceof ScanSession) {
ScanSession ss = (ScanSession) session;
-
+
ScanState state = ScanState.RUNNING;
-
+
ScanTask<ScanBatch> nbt = ss.nextBatchTask;
if (nbt == null) {
state = ScanState.IDLE;
@@ -517,17 +519,19 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
case FINISHED:
state = ScanState.IDLE;
break;
+ default:
+ break;
}
}
-
+
activeScans.add(new ActiveScan(ss.client, ss.user, ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translator.CT), ss.ssiList, ss.ssio));
-
+
} else if (session instanceof MultiScanSession) {
MultiScanSession mss = (MultiScanSession) session;
-
+
ScanState state = ScanState.RUNNING;
-
+
ScanTask<MultiScanResult> nbt = mss.lookupTask;
if (nbt == null) {
state = ScanState.IDLE;
@@ -539,44 +543,46 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
case FINISHED:
state = ScanState.IDLE;
break;
+ default:
+ break;
}
}
-
+
activeScans.add(new ActiveScan(mss.client, mss.user, mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime,
ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translator.CT), mss.ssiList, mss.ssio));
}
}
-
+
return activeScans;
}
}
-
+
static class TservConstraintEnv implements Environment {
-
+
private AuthInfo credentials;
private Authenticator authenticator;
private Authorizations auths;
private KeyExtent ke;
-
+
TservConstraintEnv(Authenticator authenticator, AuthInfo credentials) {
this.authenticator = authenticator;
this.credentials = credentials;
}
-
+
void setExtent(KeyExtent ke) {
this.ke = ke;
}
-
+
@Override
public KeyExtent getExtent() {
return ke;
}
-
+
@Override
public String getUser() {
return credentials.user;
}
-
+
@Override
public Authorizations getAuthorizations() {
if (auths == null)
@@ -587,107 +593,107 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
return auths;
}
-
+
}
-
+
private abstract class ScanTask<T> implements RunnableFuture<T> {
-
+
protected AtomicBoolean interruptFlag;
protected ArrayBlockingQueue<Object> resultQueue;
protected AtomicInteger state;
protected AtomicReference<ScanRunState> runState;
-
+
private static final int INITIAL = 1;
private static final int ADDED = 2;
private static final int CANCELED = 3;
-
+
ScanTask() {
interruptFlag = new AtomicBoolean(false);
runState = new AtomicReference<ScanRunState>(ScanRunState.QUEUED);
state = new AtomicInteger(INITIAL);
resultQueue = new ArrayBlockingQueue<Object>(1);
}
-
+
protected void addResult(Object o) {
if (state.compareAndSet(INITIAL, ADDED))
resultQueue.add(o);
else if (state.get() == ADDED)
throw new IllegalStateException("Tried to add more than one result");
}
-
+
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (!mayInterruptIfRunning)
throw new IllegalArgumentException("Cancel will always attempt to interupt running next batch task");
-
+
if (state.get() == CANCELED)
return true;
-
+
if (state.compareAndSet(INITIAL, CANCELED)) {
interruptFlag.set(true);
resultQueue = null;
return true;
}
-
+
return false;
}
-
+
@Override
public T get() throws InterruptedException, ExecutionException {
throw new UnsupportedOperationException();
}
-
+
@SuppressWarnings("unchecked")
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-
+
ArrayBlockingQueue<Object> localRQ = resultQueue;
-
+
if (state.get() == CANCELED)
throw new CancellationException();
-
+
if (localRQ == null && state.get() == ADDED)
throw new IllegalStateException("Tried to get result twice");
-
+
Object r = localRQ.poll(timeout, unit);
-
+
// could have been canceled while waiting
if (state.get() == CANCELED) {
if (r != null)
throw new IllegalStateException("Nothing should have been added when in canceled state");
-
+
throw new CancellationException();
}
-
+
if (r == null)
throw new TimeoutException();
-
+
// make this method stop working now that something is being
// returned
resultQueue = null;
-
+
if (r instanceof Throwable)
throw new ExecutionException((Throwable) r);
-
+
return (T) r;
}
-
+
@Override
public boolean isCancelled() {
return state.get() == CANCELED;
}
-
+
@Override
public boolean isDone() {
return runState.get().equals(ScanRunState.FINISHED);
}
-
+
public ScanRunState getScanRunState() {
return runState.get();
}
-
+
}
-
+
private static class UpdateSession extends Session {
public Tablet currentTablet;
public MapCounter<Tablet> successfulCommits = new MapCounter<Tablet>();
@@ -705,7 +711,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
public long queuedMutationSize = 0;
TservConstraintEnv cenv = null;
}
-
+
private static class ScanSession extends Session {
public KeyExtent extent;
public HashSet<Column> columnSet;
@@ -717,7 +723,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
public volatile ScanTask<ScanBatch> nextBatchTask;
public AtomicBoolean interruptFlag;
public Scanner scanner;
-
+
@Override
public void cleanup() {
try {
@@ -728,32 +734,32 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
scanner.close();
}
}
-
+
}
-
+
private static class MultiScanSession extends Session {
HashSet<Column> columnSet;
Map<KeyExtent,List<Range>> queries;
public List<IterInfo> ssiList;
public Map<String,Map<String,String>> ssio;
public Authorizations auths;
-
+
// stats
int numRanges;
int numTablets;
int numEntries;
long totalLookupTime;
-
+
public volatile ScanTask<MultiScanResult> lookupTask;
public KeyExtent threadPoolExtent;
-
+
@Override
public void cleanup() {
if (lookupTask != null)
lookupTask.cancel(true);
}
}
-
+
/**
* This little class keeps track of writes in progress and allows readers to wait for writes that started before the read. It assumes that the operation ids
* are monotonically increasing.
@@ -762,38 +768,38 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
static class WriteTracker {
private static AtomicLong operationCounter = new AtomicLong(1);
private Map<TabletType,TreeSet<Long>> inProgressWrites = new EnumMap<TabletType,TreeSet<Long>>(TabletType.class);
-
+
WriteTracker() {
for (TabletType ttype : TabletType.values()) {
inProgressWrites.put(ttype, new TreeSet<Long>());
}
}
-
+
synchronized long startWrite(TabletType ttype) {
long operationId = operationCounter.getAndIncrement();
inProgressWrites.get(ttype).add(operationId);
return operationId;
}
-
+
synchronized void finishWrite(long operationId) {
if (operationId == -1)
return;
-
+
boolean removed = false;
-
+
for (TabletType ttype : TabletType.values()) {
removed = inProgressWrites.get(ttype).remove(operationId);
if (removed)
break;
}
-
+
if (!removed) {
throw new IllegalArgumentException("Attempted to finish write not in progress, operationId " + operationId);
}
-
+
this.notifyAll();
}
-
+
synchronized void waitForWrites(TabletType ttype) {
long operationId = operationCounter.getAndIncrement();
while (inProgressWrites.get(ttype).floor(operationId) != null) {
@@ -804,34 +810,34 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
public long startWrite(Set<Tablet> keySet) {
if (keySet.size() == 0)
return -1;
-
+
ArrayList<KeyExtent> extents = new ArrayList<KeyExtent>(keySet.size());
-
+
for (Tablet tablet : keySet)
extents.add(tablet.getExtent());
-
+
return startWrite(TabletType.type(extents));
}
}
-
+
TransactionWatcher watcher;
-
+
private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {
-
+
SessionManager sessionManager;
-
+
AccumuloConfiguration acuConf = ServerConfiguration.getSystemConfiguration();
-
+
TabletServerUpdateMetrics updateMetrics = new TabletServerUpdateMetrics();
-
+
TabletServerScanMetrics scanMetrics = new TabletServerScanMetrics();
-
+
WriteTracker writeTracker = new WriteTracker();
-
+
ThriftClientHandler() {
super(watcher);
log.debug(ThriftClientHandler.class.getName() + " created");
@@ -844,26 +850,26 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error("Exception registering MBean with MBean Server", e);
}
}
-
+
@Override
public List<TKeyExtent> bulkImport(TInfo tinfo, AuthInfo credentials, long tid, Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime)
throws ThriftSecurityException {
-
+
try {
if (!authenticator.hasSystemPermission(credentials, credentials.user, SystemPermission.SYSTEM))
throw new ThriftSecurityException(credentials.user, SecurityErrorCode.PERMISSION_DENIED);
} catch (AccumuloSecurityException e) {
throw e.asThriftException();
}
-
+
List<TKeyExtent> failures = new ArrayList<TKeyExtent>();
-
+
for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
TKeyExtent tke = entry.getKey();
Map<String,MapFileInfo> fileMap = entry.getValue();
-
+
Tablet importTablet = onlineTablets.get(new KeyExtent(tke));
-
+
if (importTablet == null) {
failures.add(tke);
} else {
@@ -877,46 +883,46 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
return failures;
}
-
+
private class NextBatchTask extends ScanTask<ScanBatch> {
-
+
private long scanID;
-
+
NextBatchTask(long scanID, AtomicBoolean interruptFlag) {
this.scanID = scanID;
this.interruptFlag = interruptFlag;
-
+
if (interruptFlag.get())
cancel(true);
}
-
+
@Override
public void run() {
-
+
ScanSession scanSession = (ScanSession) sessionManager.getSession(scanID);
String oldThreadName = Thread.currentThread().getName();
try {
if (isCancelled() || scanSession == null)
return;
-
+
Thread.currentThread().setName(
"User: " + scanSession.user + " Start: " + scanSession.startTime + " Client: " + scanSession.client + " Tablet: " + scanSession.extent);
runState.set(ScanRunState.RUNNING);
-
+
Tablet tablet = onlineTablets.get(scanSession.extent);
-
+
if (tablet == null) {
addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift()));
return;
}
-
+
long t1 = System.currentTimeMillis();
ScanBatch batch = scanSession.scanner.read();
long t2 = System.currentTimeMillis();
scanSession.nbTimes.addStat(t2 - t1);
-
+
// there should only be one thing on the queue at a time, so
// it should be ok to call add()
// instead of put()... if add() fails because queue is at
@@ -939,23 +945,23 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
runState.set(ScanRunState.FINISHED);
Thread.currentThread().setName(oldThreadName);
}
-
+
}
}
-
+
private class LookupTask extends ScanTask<MultiScanResult> {
-
+
private long scanID;
-
+
LookupTask(long scanID) {
this.scanID = scanID;
}
-
+
@Override
public void run() {
MultiScanSession session = (MultiScanSession) sessionManager.getSession(scanID);
String oldThreadName = Thread.currentThread().getName();
-
+
try {
if (isCancelled() || session == null)
return;
@@ -966,24 +972,24 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
long maxResultsSize = acuConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
long bytesAdded = 0;
long maxScanTime = 4000;
-
+
long startTime = System.currentTimeMillis();
-
+
ArrayList<KVEntry> results = new ArrayList<KVEntry>();
Map<KeyExtent,List<Range>> failures = new HashMap<KeyExtent,List<Range>>();
ArrayList<KeyExtent> fullScans = new ArrayList<KeyExtent>();
KeyExtent partScan = null;
Key partNextKey = null;
boolean partNextKeyInclusive = false;
-
+
Iterator<Entry<KeyExtent,List<Range>>> iter = session.queries.entrySet().iterator();
-
+
// check the time so that the read ahead thread is not monopolized
while (iter.hasNext() && bytesAdded < maxResultsSize && (System.currentTimeMillis() - startTime) < maxScanTime) {
Entry<KeyExtent,List<Range>> entry = iter.next();
-
+
iter.remove();
-
+
// check that tablet server is serving requested tablet
Tablet tablet = onlineTablets.get(entry.getKey());
if (tablet == null) {
@@ -992,32 +998,32 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
Thread.currentThread().setName(
"Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Tablet: " + entry.getKey().toString());
-
+
LookupResult lookupResult;
try {
-
+
// do the following check to avoid a race condition
// between setting false below and the task being
// canceled
if (isCancelled())
interruptFlag.set(true);
-
+
lookupResult = tablet.lookup(entry.getValue(), session.columnSet, session.auths, results, maxResultsSize - bytesAdded, session.ssiList,
session.ssio, interruptFlag);
-
+
// if the tablet was closed it it possible that the
// interrupt flag was set.... do not want it set for
// the next
// lookup
interruptFlag.set(false);
-
+
} catch (IOException e) {
log.warn("lookup failed for tablet " + entry.getKey(), e);
throw new RuntimeException(e);
}
-
+
bytesAdded += lookupResult.bytesAdded;
-
+
if (lookupResult.unfinishedRanges.size() > 0) {
if (lookupResult.closed) {
failures.put(entry.getKey(), lookupResult.unfinishedRanges);
@@ -1031,11 +1037,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
fullScans.add(entry.getKey());
}
}
-
+
long finishTime = System.currentTimeMillis();
session.totalLookupTime += (finishTime - startTime);
session.numEntries += results.size();
-
+
// convert everything to thrift before adding result
List<TKeyValue> retResults = new ArrayList<TKeyValue>();
for (KVEntry entry : results)
@@ -1064,18 +1070,18 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
@Override
public InitialScan startScan(TInfo tinfo, AuthInfo credentials, TKeyExtent textent, TRange range, List<TColumn> columns, int batchSize,
List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated)
throws NotServingTabletException, ThriftSecurityException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
-
+
Authorizations userauths = null;
-
+
try {
if (!authenticator.hasTablePermission(credentials, credentials.user, new String(textent.getTable()), TablePermission.READ))
throw new ThriftSecurityException(credentials.user, SecurityErrorCode.PERMISSION_DENIED);
-
+
userauths = authenticator.getUserAuthorizations(credentials, credentials.user);
for (ByteBuffer auth : authorizations)
if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
@@ -1083,11 +1089,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} catch (AccumuloSecurityException e) {
throw e.asThriftException();
}
-
+
scanCount.addAndGet(1);
-
+
KeyExtent extent = new KeyExtent(textent);
-
+
// wait for any writes that are in flight.. this done to ensure
// consistency across client restarts... assume a client writes
// to accumulo and dies while waiting for a confirmation from
@@ -1100,11 +1106,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
// !METADATA table
if (waitForWrites)
writeTracker.waitForWrites(TabletType.type(extent));
-
+
Tablet tablet = onlineTablets.get(extent);
if (tablet == null)
throw new NotServingTabletException(textent);
-
+
ScanSession scanSession = new ScanSession();
scanSession.user = credentials.user;
scanSession.extent = new KeyExtent(extent);
@@ -1112,16 +1118,16 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
scanSession.ssiList = ssiList;
scanSession.ssio = ssio;
scanSession.interruptFlag = new AtomicBoolean();
-
+
for (TColumn tcolumn : columns) {
scanSession.columnSet.add(new Column(tcolumn));
}
-
+
scanSession.scanner = tablet.createScanner(new Range(range), batchSize, scanSession.columnSet, new Authorizations(authorizations), ssiList, ssio,
isolated, scanSession.interruptFlag);
-
+
long sid = sessionManager.createSession(scanSession, true);
-
+
ScanResult scanResult;
try {
scanResult = continueScan(tinfo, sid, scanSession);
@@ -1131,10 +1137,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} finally {
sessionManager.unreserveSession(sid);
}
-
+
return new InitialScan(sid, scanResult);
}
-
+
@Override
public ScanResult continueScan(TInfo tinfo, long scanID) throws NoSuchScanIDException, NotServingTabletException,
org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
@@ -1142,22 +1148,22 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
if (scanSession == null) {
throw new NoSuchScanIDException();
}
-
+
try {
return continueScan(tinfo, scanID, scanSession);
} finally {
sessionManager.unreserveSession(scanSession);
}
}
-
+
private ScanResult continueScan(TInfo tinfo, long scanID, ScanSession scanSession) throws NoSuchScanIDException, NotServingTabletException,
org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
-
+
if (scanSession.nextBatchTask == null) {
scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag);
resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
}
-
+
ScanBatch bresult;
try {
bresult = scanSession.nextBatchTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
@@ -1187,32 +1193,32 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.warn("Failed to get next batch", t);
throw new RuntimeException(t);
}
-
+
ScanResult scanResult = new ScanResult(Key.compress(bresult.results), bresult.more);
-
+
scanSession.entriesReturned += scanResult.results.size();
-
+
scanSession.batchCount++;
-
+
if (scanResult.more && scanSession.batchCount > 3) {
// start reading next batch while current batch is transmitted
// to client
scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag);
resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
}
-
+
if (!scanResult.more)
closeScan(tinfo, scanID);
-
+
return scanResult;
}
-
+
@Override
public void closeScan(TInfo tinfo, long scanID) {
ScanSession ss = (ScanSession) sessionManager.removeSession(scanID);
if (ss != null) {
long t2 = System.currentTimeMillis();
-
+
log.debug(String.format("ScanSess tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ", TServerUtils.clientAddress.get(), ss.extent.getTableId()
.toString(), ss.entriesReturned, (t2 - ss.startTime) / 1000.0, ss.nbTimes.toString()));
if (scanMetrics.isEnabled()) {
@@ -1221,7 +1227,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
@Override
public InitialMultiScan startMultiScan(TInfo tinfo, AuthInfo credentials, Map<TKeyExtent,List<TRange>> tbatch, List<TColumn> tcolumns,
List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites) throws ThriftSecurityException {
@@ -1230,14 +1236,14 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
for (TKeyExtent keyExtent : tbatch.keySet()) {
tables.add(new String(keyExtent.getTable()));
}
-
+
// check if user has permission to the tables
Authorizations userauths = null;
try {
for (String table : tables)
if (!authenticator.hasTablePermission(credentials, credentials.user, table, TablePermission.READ))
throw new ThriftSecurityException(credentials.user, SecurityErrorCode.PERMISSION_DENIED);
-
+
userauths = authenticator.getUserAuthorizations(credentials, credentials.user);
for (ByteBuffer auth : authorizations)
if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
@@ -1245,11 +1251,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} catch (AccumuloSecurityException e) {
throw e.asThriftException();
}
-
+
KeyExtent threadPoolExtent = null;
-
+
Map<KeyExtent,List<Range>> batch = Translator.translate(tbatch, Translator.TKET, new Translator.ListTranslator<TRange,Range>(Translator.TRT));
-
+
for (KeyExtent keyExtent : batch.keySet()) {
if (threadPoolExtent == null) {
threadPoolExtent = keyExtent;
@@ -1259,12 +1265,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
&& !threadPoolExtent.getTableId().toString().equals(Constants.METADATA_TABLE_ID)) {
throw new IllegalArgumentException("Cannot batch query !METADATA and non !METADATA tablets " + threadPoolExtent + " " + keyExtent);
}
-
+
}
-
+
if (waitForWrites)
writeTracker.waitForWrites(TabletType.type(batch.keySet()));
-
+
MultiScanSession mss = new MultiScanSession();
mss.user = credentials.user;
mss.queries = batch;
@@ -1272,19 +1278,19 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
mss.ssiList = ssiList;
mss.ssio = ssio;
mss.auths = new Authorizations(authorizations);
-
+
mss.numTablets = batch.size();
for (List<Range> ranges : batch.values()) {
mss.numRanges += ranges.size();
}
-
+
for (TColumn tcolumn : tcolumns)
mss.columnSet.add(new Column(tcolumn));
-
+
mss.threadPoolExtent = threadPoolExtent;
-
+
long sid = sessionManager.createSession(mss, true);
-
+
MultiScanResult result;
try {
result = continueMultiScan(tinfo, sid, mss);
@@ -1294,34 +1300,34 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} finally {
sessionManager.unreserveSession(sid);
}
-
+
scanCount.addAndGet(batch.size());
return new InitialMultiScan(sid, result);
}
-
+
@Override
public MultiScanResult continueMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
-
+
MultiScanSession session = (MultiScanSession) sessionManager.reserveSession(scanID);
-
+
if (session == null) {
throw new NoSuchScanIDException();
}
-
+
try {
return continueMultiScan(tinfo, scanID, session);
} finally {
sessionManager.unreserveSession(session);
}
}
-
+
private MultiScanResult continueMultiScan(TInfo tinfo, long scanID, MultiScanSession session) throws NoSuchScanIDException {
-
+
if (session.lookupTask == null) {
session.lookupTask = new LookupTask(scanID);
resourceManager.executeReadAhead(session.threadPoolExtent, session.lookupTask);
}
-
+
try {
MultiScanResult scanResult = session.lookupTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
session.lookupTask = null;
@@ -1339,19 +1345,19 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
throw new RuntimeException(t);
}
}
-
+
@Override
public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
MultiScanSession session = (MultiScanSession) sessionManager.removeSession(scanID);
if (session == null) {
throw new NoSuchScanIDException();
}
-
+
long t2 = System.currentTimeMillis();
log.debug(String.format("MultiScanSess %s %,d entries in %.2f secs (lookup_time:%.2f secs tablets:%,d ranges:%,d) ", TServerUtils.clientAddress.get(),
session.numEntries, (t2 - session.startTime) / 1000.0, session.totalLookupTime / 1000.0, session.numTablets, session.numRanges));
}
-
+
@Override
public long startUpdate(TInfo tinfo, AuthInfo credentials) throws ThriftSecurityException {
// Make sure user is real
@@ -1364,17 +1370,17 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} catch (AccumuloSecurityException e) {
throw e.asThriftException();
}
-
+
UpdateSession us = new UpdateSession();
us.violations = new Violations();
us.credentials = credentials;
us.cenv = new TservConstraintEnv(authenticator, credentials);
-
+
long sid = sessionManager.createSession(us, false);
-
+
return sid;
}
-
+
private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
long t1 = System.currentTimeMillis();
if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent))
@@ -1384,7 +1390,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
// if there were previous failures, then do not accept additional writes
return;
}
-
+
try {
// if user has no permission to write to this table, add it to
// the failures list
@@ -1425,18 +1431,18 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
return;
}
}
-
+
@Override
public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent, List<TMutation> tmutations) {
UpdateSession us = (UpdateSession) sessionManager.reserveSession(updateID);
if (us == null) {
throw new RuntimeException("No Such SessionID");
}
-
+
try {
KeyExtent keyExtent = new KeyExtent(tkeyExtent);
setUpdateTablet(us, keyExtent);
-
+
if (us.currentTablet != null) {
List<Mutation> mutations = us.queuedMutations.get(us.currentTablet);
for (TMutation tmutation : tmutations) {
@@ -1451,33 +1457,33 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
sessionManager.unreserveSession(us);
}
}
-
+
private void flush(UpdateSession us) {
-
+
int mutationCount = 0;
Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
Throwable error = null;
-
+
long pt1 = System.currentTimeMillis();
-
+
boolean containsMetadataTablet = false;
for (Tablet tablet : us.queuedMutations.keySet())
if (tablet.getExtent().getTableId().toString().equals(Constants.METADATA_TABLE_ID))
containsMetadataTablet = true;
-
+
if (!containsMetadataTablet && us.queuedMutations.size() > 0)
TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
-
+
Span prep = Trace.start("prep");
for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) {
-
+
Tablet tablet = entry.getKey();
List<Mutation> mutations = entry.getValue();
if (mutations.size() > 0) {
try {
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.mutationArraySize, mutations.size());
-
+
CommitSession commitSession = tablet.prepareMutationsForCommit(us.cenv, mutations);
if (commitSession == null) {
if (us.currentTablet == tablet) {
@@ -1488,12 +1494,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
sendables.put(commitSession, mutations);
mutationCount += mutations.size();
}
-
+
} catch (TConstraintViolationException e) {
us.violations.add(e.getViolations());
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.constraintViolations, 0);
-
+
if (e.getNonViolators().size() > 0) {
// only log and commit mutations if there were some
// that did not
@@ -1502,9 +1508,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
// expects
sendables.put(e.getCommitSession(), e.getNonViolators());
}
-
+
mutationCount += mutations.size();
-
+
} catch (HoldTimeoutException t) {
error = t;
log.debug("Giving up on mutations due to a long memory hold time");
@@ -1517,14 +1523,14 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
prep.stop();
-
+
Span wal = Trace.start("wal");
long pt2 = System.currentTimeMillis();
long avgPrepareTime = (long) ((pt2 - pt1) / (double) us.queuedMutations.size());
us.prepareTimes.addStat(pt2 - pt1);
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.commitPrep, (avgPrepareTime));
-
+
if (error != null) {
for (Entry<CommitSession,List<Mutation>> e : sendables.entrySet()) {
e.getKey().abortCommit(e.getValue());
@@ -1535,14 +1541,14 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
while (true) {
try {
long t1 = System.currentTimeMillis();
-
+
logger.logManyTablets(sendables);
-
+
long t2 = System.currentTimeMillis();
us.walogTimes.addStat(t2 - t1);
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.waLogWriteTime, (t2 - t1));
-
+
break;
} catch (IOException ex) {
log.warn("logging mutations failed, retrying");
@@ -1551,19 +1557,19 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
throw new RuntimeException(t);
}
}
-
+
wal.stop();
-
+
Span commit = Trace.start("commit");
long t1 = System.currentTimeMillis();
for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) {
CommitSession commitSession = entry.getKey();
List<Mutation> mutations = entry.getValue();
-
+
commitSession.commit(mutations);
-
+
Tablet tablet = commitSession.getTablet();
-
+
if (tablet == us.currentTablet) {
// because constraint violations may filter out some
// mutations, for proper
@@ -1575,12 +1581,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
long t2 = System.currentTimeMillis();
-
+
long avgCommitTime = (long) ((t2 - t1) / (double) sendables.size());
-
+
us.flushTime += (t2 - pt1);
us.commitTimes.addStat(t2 - t1);
-
+
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.commitTime, avgCommitTime);
commit.stop();
@@ -1593,25 +1599,25 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
us.totalUpdates += mutationCount;
}
-
+
@Override
public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDException {
UpdateSession us = (UpdateSession) sessionManager.removeSession(updateID);
if (us == null) {
throw new NoSuchScanIDException();
}
-
+
// clients may or may not see data from an update session while
// it is in progress, however when the update session is closed
// want to ensure that reads wait for the write to finish
long opid = writeTracker.startWrite(us.queuedMutations.keySet());
-
+
try {
flush(us);
} finally {
writeTracker.finishWrite(opid);
}
-
+
log.debug(String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)", TServerUtils.clientAddress.get(), us.totalUpdates,
(System.currentTimeMillis() - us.startTime) / 1000.0, us.authTimes.toString(), us.flushTime / 1000.0, us.prepareTimes.getSum() / 1000.0,
us.walogTimes.getSum() / 1000.0, us.commitTimes.getSum() / 1000.0));
@@ -1628,11 +1634,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
KeyExtent first = us.authFailures.iterator().next();
log.debug(String.format("Authentication Failures: %d, first %s", us.authFailures.size(), first.toString()));
}
-
+
return new UpdateErrors(Translator.translate(us.failures, Translator.KET), Translator.translate(violations, Translator.CVST), Translator.translate(
us.authFailures, Translator.KET));
}
-
+
@Override
public void update(TInfo tinfo, AuthInfo credentials, TKeyExtent tkeyExtent, TMutation tmutation) throws NotServingTabletException,
ConstraintViolationException, ThriftSecurityException {
@@ -1642,29 +1648,29 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} catch (AccumuloSecurityException e) {
throw e.asThriftException();
}
-
+
KeyExtent keyExtent = new KeyExtent(tkeyExtent);
Tablet tablet = onlineTablets.get(new KeyExtent(keyExtent));
if (tablet == null) {
throw new NotServingTabletException(tkeyExtent);
}
-
+
if (!keyExtent.getTableId().toString().equals(Constants.METADATA_TABLE_ID))
TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
-
+
long opid = writeTracker.startWrite(TabletType.type(keyExtent));
-
+
try {
Mutation mutation = new Mutation(tmutation);
List<Mutation> mutations = Collections.singletonList(mutation);
-
+
Span prep = Trace.start("prep");
CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(authenticator, credentials), mutations);
prep.stop();
if (cs == null) {
throw new NotServingTabletException(tkeyExtent);
}
-
+
while (true) {
try {
Span wal = Trace.start("wal");
@@ -1675,7 +1681,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.warn(ex, ex);
}
}
-
+
Span commit = Trace.start("commit");
cs.commit(mutations);
commit.stop();
@@ -1685,7 +1691,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
writeTracker.finishWrite(opid);
}
}
-
+
@Override
public void splitTablet(TInfo tinfo, AuthInfo credentials, TKeyExtent tkeyExtent, ByteBuffer splitPoint) throws NotServingTabletException,
ThriftSecurityException {
@@ -1698,14 +1704,14 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} catch (AccumuloSecurityException e) {
throw e.asThriftException();
}
-
+
KeyExtent keyExtent = new KeyExtent(tkeyExtent);
-
+
Tablet tablet = onlineTablets.get(keyExtent);
if (tablet == null) {
throw new NotServingTabletException(tkeyExtent);
}
-
+
if (keyExtent.getEndRow() == null || !keyExtent.getEndRow().equals(ByteBufferUtil.toText(splitPoint))) {
try {
if (TabletServer.this.splitTablet(tablet, ByteBufferUtil.toBytes(splitPoint)) == null) {
@@ -1717,12 +1723,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
@Override
public TabletServerStatus getTabletServerStatus(TInfo tinfo, AuthInfo credentials) throws ThriftSecurityException, TException {
return getStats(sessionManager.getActiveScansPerTable());
}
-
+
@Override
public List<TabletStats> getTabletStats(TInfo tinfo, AuthInfo credentials, String tableId) throws ThriftSecurityException, TException {
TreeMap<KeyExtent,Tablet> onlineTabletsCopy;
@@ -1747,9 +1753,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
return result;
}
-
+
private ZooCache masterLockCache = new ZooCache();
-
+
private void checkPermission(AuthInfo credentials, String lock, boolean requiresSystemPermission, final String request) throws ThriftSecurityException {
if (requiresSystemPermission) {
boolean fatal = false;
@@ -1769,6 +1775,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} finally {
if (fatal) {
Halt.halt(1, new Runnable() {
+ @Override
public void run() {
logGCInfo();
}
@@ -1776,23 +1783,24 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
if (tabletServerLock == null || !tabletServerLock.wasLockAcquired()) {
log.warn("Got " + request + " message from master before lock acquired, ignoring...");
throw new RuntimeException("Lock not acquired");
}
-
+
if (tabletServerLock != null && tabletServerLock.wasLockAcquired() && !tabletServerLock.isLocked()) {
Halt.halt(1, new Runnable() {
+ @Override
public void run() {
log.info("Tablet server no longer holds lock during checkPermission() : " + request + ", exiting");
logGCInfo();
}
});
}
-
+
ZooUtil.LockID lid = new ZooUtil.LockID(ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZMASTER_LOCK, lock);
-
+
try {
if (!ZooLock.isLockHeld(masterLockCache, lid)) {
// maybe the cache is out of date and a new master holds the
@@ -1807,7 +1815,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
throw new RuntimeException("bad master lock", e);
}
}
-
+
@Override
public void loadTablet(TInfo tinfo, AuthInfo credentials, String lock, final TKeyExtent textent) {
try {
@@ -1816,17 +1824,17 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error(e, e);
throw new RuntimeException(e);
}
-
+
final KeyExtent extent = new KeyExtent(textent);
-
+
synchronized (unopenedTablets) {
synchronized (openingTablets) {
synchronized (onlineTablets) {
-
+
// checking if this exact tablet is in any of the sets
// below is not a strong enough check
// when splits and fix splits occurring
-
+
Set<KeyExtent> unopenedOverlapping = KeyExtent.findOverlapping(extent, unopenedTablets);
Set<KeyExtent> openingOverlapping = KeyExtent.findOverlapping(extent, openingTablets);
Set<KeyExtent> onlineOverlapping = KeyExtent.findOverlapping(extent, onlineTablets);
@@ -1834,26 +1842,27 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
all.addAll(unopenedOverlapping);
all.addAll(openingOverlapping);
all.addAll(onlineOverlapping);
-
+
if (!all.isEmpty()) {
if (all.size() != 1 || !all.contains(extent)) {
log.error("Tablet " + extent + " overlaps previously assigned " + unopenedOverlapping + " " + openingOverlapping + " " + onlineOverlapping);
}
return;
}
-
+
unopenedTablets.add(extent);
}
}
}
-
+
// add the assignment job to the appropriate queue
log.info("Loading tablet " + extent);
-
+
final Runnable ah = new LoggingRunnable(log, new AssignmentHandler(extent));
// Root tablet assignment must take place immediately
if (extent.compareTo(Constants.ROOT_TABLET_EXTENT) == 0) {
new Thread("Root Tablet Assignment") {
+ @Override
public void run() {
ah.run();
if (onlineTablets.containsKey(extent)) {
@@ -1861,7 +1870,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} else {
log.info("Root tablet failed to load");
}
-
+
}
}.start();
} else {
@@ -1872,7 +1881,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
@Override
public void unloadTablet(TInfo tinfo, AuthInfo credentials, String lock, TKeyExtent textent, boolean save) {
try {
@@ -1881,12 +1890,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error(e, e);
throw new RuntimeException(e);
}
-
+
KeyExtent extent = new KeyExtent(textent);
-
+
resourceManager.addMigration(extent, new LoggingRunnable(log, new UnloadTabletHandler(extent, save)));
}
-
+
@Override
public void flush(TInfo tinfo, AuthInfo credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) {
try {
@@ -1895,19 +1904,19 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error(e, e);
throw new RuntimeException(e);
}
-
+
ArrayList<Tablet> tabletsToFlush = new ArrayList<Tablet>();
-
+
KeyExtent ke = new KeyExtent(new Text(tableId), ByteBufferUtil.toText(endRow), ByteBufferUtil.toText(startRow));
-
+
synchronized (onlineTablets) {
for (Tablet tablet : onlineTablets.values())
if (ke.overlaps(tablet.getExtent()))
tabletsToFlush.add(tablet);
}
-
+
Long flushID = null;
-
+
for (Tablet tablet : tabletsToFlush) {
if (flushID == null) {
// read the flush id once from zookeeper instead of reading
@@ -1923,7 +1932,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
tablet.flush(flushID);
}
}
-
+
@Override
public void flushTablet(TInfo tinfo, AuthInfo credentials, String lock, TKeyExtent textent) throws TException {
try {
@@ -1942,13 +1951,14 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
@Override
public void halt(TInfo tinfo, AuthInfo credentials, String lock) throws ThriftSecurityException {
-
+
checkPermission(credentials, lock, true, "halt");
-
+
Halt.halt(0, new Runnable() {
+ @Override
public void run() {
log.info("Master requested tablet server halt");
logGCInfo();
@@ -1961,7 +1971,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
});
}
-
+
@Override
public void fastHalt(TInfo info, AuthInfo credentials, String lock) {
try {
@@ -1970,30 +1980,30 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.warn("Error halting", e);
}
}
-
+
@Override
public TabletStats getHistoricalStats(TInfo tinfo, AuthInfo credentials) throws ThriftSecurityException, TException {
return statsKeeper.getTabletStats();
}
-
+
@Override
public void useLoggers(TInfo tinfo, AuthInfo credentials, Set<String> loggers) throws TException {
loggerStrategy.preferLoggers(loggers);
}
-
+
@Override
public List<ActiveScan> getActiveScans(TInfo tinfo, AuthInfo credentials) throws ThriftSecurityException, TException {
-
+
try {
if (!authenticator.hasSystemPermission(credentials, credentials.user, SystemPermission.SYSTEM))
throw new ThriftSecurityException(credentials.user, SecurityErrorCode.PERMISSION_DENIED);
} catch (AccumuloSecurityException e) {
throw e.asThriftException();
}
-
+
return sessionManager.getActiveScans();
}
-
+
@Override
public void chop(TInfo tinfo, AuthInfo credentials, String lock, TKeyExtent textent) throws TException {
try {
@@ -2003,13 +2013,13 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
throw new RuntimeException(e);
}
KeyExtent ke = new KeyExtent(textent);
-
+
Tablet tablet = onlineTablets.get(ke);
if (tablet != null) {
tablet.chopFiles();
}
}
-
+
@Override
public void compact(TInfo tinfo, AuthInfo credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) throws TException {
try {
@@ -2018,18 +2028,18 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} catch (Exception e) {
throw new RuntimeException(e);
}
-
+
KeyExtent ke = new KeyExtent(new Text(tableId), ByteBufferUtil.toText(endRow), ByteBufferUtil.toText(startRow));
-
+
ArrayList<Tablet> tabletsToCompact = new ArrayList<Tablet>();
synchronized (onlineTablets) {
for (Tablet tablet : onlineTablets.values())
if (ke.overlaps(tablet.getExtent()))
tabletsToCompact.add(tablet);
}
-
+
Long compactionId = null;
-
+
for (Tablet tablet : tabletsToCompact) {
// all for the same table id, so only need to read
// compaction id once
@@ -2042,18 +2052,18 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
tablet.compactAll(compactionId);
}
-
+
}
-
+
}
-
+
private class SplitRunner implements Runnable {
private Tablet tablet;
-
+
public SplitRunner(Tablet tablet) {
this.tablet = tablet;
}
-
+
@Override
public void run() {
if (majorCompactorDisabled) {
@@ -2061,33 +2071,34 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
// initiated exit
return;
}
-
+
splitTablet(tablet);
}
}
-
+
boolean isMajorCompactionDisabled() {
return majorCompactorDisabled;
}
-
+
private class MajorCompactor implements Runnable {
private AccumuloConfiguration acuConf = ServerConfiguration.getSystemConfiguration();
-
+
+ @Override
public void run() {
while (!majorCompactorDisabled) {
try {
UtilWaitThread.sleep(acuConf.getTimeInMillis(Property.TSERV_MAJC_DELAY));
-
+
TreeMap<KeyExtent,Tablet> copyOnlineTablets = new TreeMap<KeyExtent,Tablet>();
-
+
synchronized (onlineTablets) {
copyOnlineTablets.putAll(onlineTablets); // avoid
// concurrent
// modification
}
-
+
int numMajorCompactionsInProgress = 0;
-
+
Iterator<Entry<KeyExtent,Tablet>> iter = copyOnlineTablets.entrySet().iterator();
while (iter.hasNext() && !majorCompactorDisabled) { // bail
// early
@@ -2096,26 +2107,26 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
// we're
// shutting
// down
-
+
Entry<KeyExtent,Tablet> entry = iter.next();
-
+
Tablet tablet = entry.getValue();
-
+
// if we need to split AND compact, we need a good way
// to decide what to do
if (tablet.needsSplit()) {
resourceManager.executeSplit(tablet.getExtent(), new LoggingRunnable(log, new SplitRunner(tablet)));
continue;
}
-
+
int maxLogEntriesPerTablet = ServerConfiguration.getTableConfiguration(tablet.getExtent().getTableId().toString()).getCount(
Property.TABLE_MINC_LOGS_MAX);
-
+
if (tablet.getLogCount() >= maxLogEntriesPerTablet) {
log.debug("Initiating minor compaction for " + tablet.getExtent() + " because it has " + tablet.getLogCount() + " write ahead logs");
tablet.initiateMinorCompaction();
}
-
+
synchronized (tablet) {
if (tablet.initiateMajorCompaction(MajorCompactionReason.NORMAL) || tablet.majorCompactionQueued() || tablet.majorCompactionRunning()) {
numMajorCompactionsInProgress++;
@@ -2123,18 +2134,18 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
int idleCompactionsToStart = Math.max(1, acuConf.getCount(Property.TSERV_MAJC_MAXCONCURRENT) / 2);
-
+
if (numMajorCompactionsInProgress < idleCompactionsToStart) {
// system is not major compacting, can schedule some
// idle compactions
iter = copyOnlineTablets.entrySet().iterator();
-
+
while (iter.hasNext() && !majorCompactorDisabled && numMajorCompactionsInProgress < idleCompactionsToStart) {
Entry<KeyExtent,Tablet> entry = iter.next();
Tablet tablet = entry.getValue();
-
+
if (tablet.initiateMajorCompaction(MajorCompactionReason.IDLE)) {
numMajorCompactionsInProgress++;
}
@@ -2147,10 +2158,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
private void splitTablet(Tablet tablet) {
try {
-
+
TreeMap<KeyExtent,SplitInfo> tabletInfo = splitTablet(tablet, null);
if (tabletInfo == null) {
// either split or compact not both
@@ -2166,34 +2177,34 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error("Unknown error on split: " + e, e);
}
}
-
+
private TreeMap<KeyExtent,SplitInfo> splitTablet(Tablet tablet, byte[] splitPoint) throws IOException {
long t1 = System.currentTimeMillis();
-
+
TreeMap<KeyExtent,SplitInfo> tabletInfo = tablet.split(splitPoint);
if (tabletInfo == null) {
return null;
}
-
+
log.info("Starting split: " + tablet.getExtent());
statsKeeper.incrementStatusSplit();
long start = System.currentTimeMillis();
-
+
Tablet[] newTablets = new Tablet[2];
-
+
Entry<KeyExtent,SplitInfo> first = tabletInfo.firstEntry();
newTablets[0] = new Tablet(TabletServer.this, new Text(first.getValue().dir), first.getKey(), resourceManager.createTabletResourceManager(),
first.getValue().datafiles, first.getValue().time, first.getValue().initFlushID, first.getValue().initCompactID);
-
+
Entry<KeyExtent,SplitInfo> last = tabletInfo.lastEntry();
newTablets[1] = new Tablet(TabletServer.this, new Text(last.getValue().dir), last.getKey(), resourceManager.createTabletResourceManager(),
last.getValue().datafiles, last.getValue().time, last.getValue().initFlushID, last.getValue().initCompactID);
-
+
// roll tablet stats over into tablet server's statsKeeper object as
// historical data
statsKeeper.saveMinorTimes(tablet.timer);
statsKeeper.saveMajorTimes(tablet.timer);
-
+
// lose the reference to the old tablet and open two new ones
synchronized (onlineTablets) {
onlineTablets.remove(tablet.getExtent());
@@ -2203,39 +2214,40 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
// tell the master
enqueueMasterMessage(new SplitReportMessage(tablet.getExtent(), newTablets[0].getExtent(), new Text("/" + newTablets[0].getLocation().getName()),
newTablets[1].getExtent(), new Text("/" + newTablets[1].getLocation().getName())));
-
+
statsKeeper.updateTime(Operation.SPLIT, start, 0, false);
long t2 = System.currentTimeMillis();
log.info("Tablet split: " + tablet.getExtent() + " size0 " + newTablets[0].estimateTabletSize() + " size1 " + newTablets[1].estimateTabletSize() + " time "
+ (t2 - t1) + "ms");
-
+
return tabletInfo;
}
-
+
public long lastPingTime = System.currentTimeMillis();
public Socket currentMaster;
-
+
// a queue to hold messages that are to be sent back to the master
private BlockingDeque<MasterMessage> masterMessages = new LinkedBlockingDeque<MasterMessage>();
-
+
// add a message for the main thread to send back to the master
void enqueueMasterMessage(MasterMessage m) {
masterMessages.addLast(m);
}
-
+
private class UnloadTabletHandler implements Runnable {
private KeyExtent extent;
private boolean saveState;
-
+
public UnloadTabletHandler(KeyExtent extent, boolean saveState) {
this.extent = extent;
this.saveState = saveState;
}
-
+
+ @Override
public void run() {
-
+
Tablet t = null;
-
+
synchronized (unopenedTablets) {
if (unopenedTablets.contains(extent)) {
unopenedTablets.remove(extent);
@@ -2255,7 +2267,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
t = onlineTablets.get(extent);
}
}
-
+
if (t == null) {
// Tablet has probably been recently unloaded: repeated master
// unload request is crossing the successful unloaded message
@@ -2265,26 +2277,26 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
return;
}
-
+
try {
t.close(saveState);
} catch (Throwable e) {
-
+
if ((t.isClosing() || t.isClosed()) && e instanceof IllegalStateException) {
log.debug("Failed to unload tablet " + extent + "... it was alread closing or closed : " + e.getMessage());
} else {
log.error("Failed to close tablet " + extent + "... Aborting migration", e);
}
-
+
enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.UNLOAD_ERROR, extent));
return;
}
-
+
// stop serving tablet - client will get not serving tablet
// exceptions
recentlyUnloadedCache.put(extent, System.currentTimeMillis());
onlineTablets.remove(extent);
-
+
try {
TServerInstance instance = new TServerInstance(clientAddress, getLock().getSessionId());
TabletLocationState tls = new TabletLocationState(extent, null, instance, null, null, false);
@@ -2297,38 +2309,39 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} catch (InterruptedException e) {
log.warn("Interrupted while getting our zookeeper session information", e);
}
-
+
// tell the master how it went
enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.UNLOADED, extent));
-
+
// roll tablet stats over into tablet server's statsKeeper object as
// historical data
statsKeeper.saveMinorTimes(t.timer);
statsKeeper.saveMajorTimes(t.timer);
-
+
log.info("unloaded " + extent);
-
+
}
}
-
+
private class AssignmentHandler implements Runnable {
private KeyExtent extent;
private int retryAttempt = 0;
-
+
public AssignmentHandler(KeyExtent extent) {
this.extent = extent;
}
-
+
public AssignmentHandler(KeyExtent extent, int retryAttempt) {
this(extent);
this.retryAttempt = retryAttempt;
}
-
+
+ @Override
public void run() {
log.info(clientAddress + ": got assignment from master: " + extent);
-
+
final boolean isMetaDataTablet = extent.getTableId().toString().compareTo(Constants.METADATA_TABLE_ID) == 0;
-
+
synchronized (unopenedTablets) {
synchronized (openingTablets) {
synchronized (onlineTablets) {
@@ -2337,23 +2350,23 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
Set<KeyExtent> unopenedOverlapping = KeyExtent.findOverlapping(extent, unopenedTablets);
Set<KeyExtent> openingOverlapping = KeyExtent.findOverlapping(extent, openingTablets);
Set<KeyExtent> onlineOverlapping = KeyExtent.findOverlapping(extent, onlineTablets);
-
+
if (openingOverlapping.contains(extent) || onlineOverlapping.contains(extent))
return;
-
+
if (!unopenedTablets.contains(extent) || unopenedOverlapping.size() != 1 || openingOverlapping.size() > 0 || onlineOverlapping.size() > 0) {
throw new IllegalStateException("overlaps assigned " + extent + " " + !unopenedTablets.contains(extent) + " " + unopenedOverlapping + " "
+ openingOverlapping + " " + onlineOverlapping);
}
}
-
+
unopenedTablets.remove(extent);
openingTablets.add(extent);
}
}
-
+
log.debug("Loading extent: " + extent);
-
+
// check Metadata table before accepting assignment
SortedMap<KeyExtent,Text> tabletsInRange = null;
SortedMap<Key,Value> tabletsKeyValues = new TreeMap<Key,Value>();
@@ -2367,7 +2380,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.warn("Failed to verify tablet " + extent, e);
throw new RuntimeException(e);
}
-
+
if (tabletsInRange == null) {
log.info("Reporting tablet " + extent + " assignment failure: unable to verify Tablet Information");
enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
@@ -2379,9 +2392,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
// If extent given is not the one to be opened, update
if (tabletsInRange.size() != 1 || !tabletsInRange.containsKey(extent)) {
-
+
tabletsKeyValues.clear();
-
+
synchronized (openingTablets) {
openingTablets.remove(extent);
openingTablets.notifyAll();
@@ -2403,15 +2416,15 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.debug("Master didn't know " + extent + " was split, letting it know about " + tabletsInRange.keySet());
enqueueMasterMessage(new SplitReportMessage(extent, tabletsInRange));
}
-
+
// create the tablet object
for (Entry<KeyExtent,Text> entry : tabletsInRange.entrySet()) {
Tablet tablet = null;
boolean successful = false;
-
+
final KeyExtent extentToOpen = entry.getKey();
Text locationToOpen = entry.getValue();
-
+
if (onlineTablets.containsKey(extentToOpen)) {
// know this was from fixing a split, because initial check
// would have caught original extent
@@ -2422,10 +2435,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
continue;
}
-
+
try {
TabletResourceManager trm = resourceManager.createTabletResourceManager();
-
+
// this opens the tablet file and fills in the endKey in the
// extent
tablet = new Tablet(TabletServer.this, locationToOpen, extentToOpen, trm, tabletsKeyValues);
@@ -2442,7 +2455,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
if (tablet.getNumEntriesInMemory() > 0 && !tablet.minorCompactNow()) {
throw new RuntimeException("Minor compaction after recovery fails for " + extentToOpen);
}
-
+
Assignment assignment = new Assignment(extentToOpen, getTabletSession());
TabletStateStore.setLocation(assignment);
@@ -2463,7 +2476,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
String table = extent.getTableId().toString();
ProblemReports.getInstance().report(new ProblemReport(table, TABLET_LOAD, extentToOpen.getUUID().toString(), getClientAddressString(), e));
}
-
+
if (!successful) {
synchronized (unopenedTablets) {
synchronized (openingTablets) {
@@ -2498,44 +2511,44 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
private FileSystem fs;
private Configuration conf;
private ZooCache cache;
-
+
private SortedMap<KeyExtent,Tablet> onlineTablets = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,Tablet>());
private SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
private SortedSet<KeyExtent> openingTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
@SuppressWarnings("unchecked")
- private Map<KeyExtent,Long> recentlyUnloadedCache = (Map<KeyExtent, Long>)Collections.synchronizedMap(new LRUMap(1000));
-
+ private Map<KeyExtent,Long> recentlyUnloadedCache = Collections.synchronizedMap(new LRUMap(1000));
+
private Thread majorCompactorThread;
-
+
// used for stopping the server and MasterListener thread
private volatile boolean serverStopRequested = false;
-
+
private InetSocketAddress clientAddress;
-
+
private TabletServerResourceManager resourceManager;
private Authenticator authenticator;
private volatile boolean majorCompactorDisabled = false;
-
+
private volatile boolean shutdownComplete = false;
-
+
private ZooLock tabletServerLock;
-
+
private TServer server;
-
+
private DistributedWorkQueue bulkFailedCopyQ;
-
+
private static final String METRICS_PREFIX = "tserver";
-
+
private static ObjectName OBJECT_NAME = null;
-
+
public TabletStatsKeeper getStatsKeeper() {
return statsKeeper;
}
-
+
public Set<String> getLoggers() throws TException, MasterNotRunningException, ThriftSecurityException {
Set<String> allLoggers = new HashSet<String>();
String dir = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZLOGGERS;
@@ -2556,7 +2569,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.warn("strategy returned no useful loggers");
return result;
}
-
+
public void addLoggersToMetadata(List<RemoteLogger> logs, KeyExtent extent, int id) {
log.info("Adding " + logs.size() + " logs for extent " + extent + " as alias " + id);
if (!this.onlineTablets.containsKey(extent)) {
@@ -2564,7 +2577,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
// if it doesn't, we'll need to do the same recovery with the old files.
return;
}
-
+
List<MetadataTable.LogEntry> entries = new ArrayList<MetadataTable.LogEntry>();
long now = RelativeTime.currentTimeMillis();
List<String> logSet = new ArrayList<String>();
@@ -2582,14 +2595,14 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
MetadataTable.addLogEntries(SecurityConstants.getSystemCredentials(), entries, getLock());
}
-
+
private int startServer(Property portHint, TProcessor processor, String threadName) throws UnknownHostException {
ServerPort sp = TServerUtils.startServer(portHint, processor, this.getClass().getSimpleName(), threadName, Property.TSERV_PORTSEARCH,
Property.TSERV_MINTHREADS, Property.TSERV_THREADCHECK);
this.server = sp.server;
return sp.port;
}
-
+
private String getMasterAddress() {
try {
List<String> locations = HdfsZooInstance.getInstance().getMasterLocations();
@@ -2599,10 +2612,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} catch (Exception e) {
log.warn("Failed to obtain master host " + e);
}
-
+
return null;
}
-
+
// Connect to the master for posting asynchronous results
private MasterClientService.Iface masterConnection(String address) {
try {
@@ -2618,11 +2631,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
return null;
}
-
+
private void returnMasterConnection(MasterClientService.Iface client) {
ThriftUtil.returnClient(client);
}
-
+
private int startTabletClientService() throws UnknownHostException {
// start listening for client connection last
TabletClientService.Iface tch = TraceWrap.service(new ThriftClientHandler());
@@ -2631,25 +2644,26 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.info("port = " + port);
return port;
}
-
+
ZooLock getLock() {
return tabletServerLock;
}
-
+
private void announceExistence() {
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
try {
String zPath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZTSERVERS + "/" + getClientAddressString();
-
+
zoo.putPersistentData(zPath, new byte[] {}, NodeExistsPolicy.SKIP);
-
+
tabletServerLock = new ZooLock(zPath);
-
+
LockWatcher lw = new LockWatcher() {
-
+
@Override
public void lostLock(final LockLossReason reason) {
Halt.halt(0, new Runnable() {
+ @Override
public void run() {
if (!serverStopRequested)
log.fatal("Lost tablet server lock (reason = " + reason + "), exiting.");
@@ -2657,7 +2671,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
});
}
-
+
@Override
public void unableToMonitorLockNode(final Throwable e) {
Halt.halt(0, new Runnable() {
@@ -2666,14 +2680,14 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.fatal("Lost ability to monitor tablet server lock, exiting.", e);
}
});
-
+
}
};
-
+
byte[] lockContent = new ServerServices(getClientAddressString(), Service.TSERV_CLIENT).toString().getBytes();
for (int i = 0; i < 120 / 5; i++) {
zoo.putPersistentData(zPath, new byte[0], NodeExistsPolicy.SKIP);
-
+
if (tabletServerLock.tryLock(lw, lockContent)) {
log.debug("Obtained tablet server lock " + tabletServerLock.getLockPath());
return;
@@ -2689,7 +2703,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
throw new RuntimeException(e);
}
}
-
+
// main loop listens for client requests
public void run() {
SecurityUtil.serverLogin();
@@ -2706,10 +2720,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
clientAddress = new InetSocketAddress(clientAddress.getAddress(), clientPort);
announceExistence();
-
+
ThreadPoolExecutor distWorkQThreadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
- ServerConfiguration.getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS),
- new NamingThreadFactory("distributed work queue"));
+ ServerConfiguration.getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS), new NamingThreadFactory("distributed work queue"));
bulkFailedCopyQ = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZBULK_FAILED_COPYQ);
try {
@@ -2727,27 +2740,27 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} catch (Exception e) {
log.error("Error registering with JMX", e);
}
-
+
String masterHost;
while (!serverStopRequested) {
// send all of the pending messages
try {
MasterMessage mm = null;
MasterClientService.Iface iface = null;
-
+
try {
// wait until a message is ready to send, or a sever stop
// was requested
while (mm == null && !serverStopRequested) {
mm = masterMessages.poll(1000, TimeUnit.MILLISECONDS);
}
-
+
// have a message to send to the master, so grab a
// connection
masterHost = getMasterAddress();
iface = masterConnection(masterHost);
TServiceClient client = (TServiceClient) iface;
-
+
// if while loop does not execute at all and mm != null,
// then
// finally block should place mm back on queue
@@ -2762,25 +2775,25 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
mm = null;
throw ex;
}
-
+
// if any messages are immediately available grab em and
// send them
mm = masterMessages.poll();
}
-
+
} finally {
-
+
if (mm != null) {
masterMessages.putFirst(mm);
}
returnMasterConnection(iface);
-
+
UtilWaitThread.sleep(1000);
}
} catch (InterruptedException e) {
log.info("Interrupt Exception received, shutting down");
serverStopRequested = true;
-
+
} catch (Exception e) {
// may have lost connection with master
// loop back to the beginning and wait for a new one
@@ -2788,7 +2801,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error(getClientAddressString() + ": TServerInfo: Exception. Master down?", e);
}
}
-
+
// wait for shutdown
// if the main thread exits oldServer the master listener, the JVM will
// kill the
@@ -2808,27 +2821,27 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
log.debug("Stopping Thrift Servers");
TServerUtils.stopTServer(server);
-
+
try {
log.debug("Closing filesystem");
fs.close();
} catch (IOException e) {
log.warn("Failed to close filesystem : " + e.getMessage(), e);
}
-
+
logGCInfo();
-
+
log.info("TServerInfo: stop requested. exiting ... ");
-
+
try {
tabletServerLock.unlock();
} catch (Exception e) {
log.warn("Failed to release tablet server lock", e);
}
}
-
+
private long totalMinorCompactions;
-
+
public static SortedMap<KeyExtent,Text> verifyTabletInformation(KeyExtent extent, TServerInstance instance, SortedMap<Key,Value> tabletsKeyValues,
String clientAddress, ZooLock lock) throws AccumuloSecurityException, DistributedStoreException {
for (int tries = 0; tries < 3; tries++) {
@@ -2849,29 +2862,29 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
set.put(extent, new Text(Constants.ZROOT_TABLET));
return set;
}
-
+
List<ColumnFQ> columnsToFetch = Arrays.asList(new ColumnFQ[] {Constants.METADATA_DIRECTORY_COLUMN, Constants.METADATA_PREV_ROW_COLUMN,
Constants.METADATA_SPLIT_RATIO_COLUMN, Constants.METADATA_OLD_PREV_ROW_COLUMN, Constants.METADATA_TIME_COLUMN});
-
+
if (tabletsKeyValues == null) {
tabletsKeyValues = new TreeMap<Key,Value>();
}
MetadataTable.getTabletAndPrevTabletKeyValues(tabletsKeyValues, extent, null, SecurityConstants.getSystemCredentials());
-
+
SortedMap<Text,SortedMap<ColumnFQ,Value>> tabletEntries;
tabletEntries = MetadataTable.getTabletEntries(tabletsKeyValues, columnsToFetch);
-
+
if (tabletEntries.size() == 0) {
log.warn("Failed to find any metadata entries for " + extent);
return null;
}
-
+
// ensure lst key in map is same as extent that was passed in
if (!tabletEntries.lastKey().equals(extent.getMetadataEntry())) {
log.warn("Failed to find metadata entry for " + extent + " found " + tabletEntries.lastKey());
return null;
}
-
+
TServerInstance future = null;
Text metadataEntry = extent.getMetadataEntry();
for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
@@ -2894,18 +2907,18 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.warn("Table " + extent + " has been assigned to " + future + " which is not " + instance);
return null;
}
-
+
// look for incomplete splits
int splitsFixed = 0;
for (Entry<Text,SortedMap<ColumnFQ,Value>> entry : tabletEntries.entrySet()) {
-
+
if (extent.getPrevEndRow() != null) {
Text prevRowMetadataEntry = new Text(KeyExtent.getMetadataEntry(extent.getTableId(), extent.getPrevEndRow()));
if (entry.getKey().compareTo(prevRowMetadataEntry) <= 0) {
continue;
}
}
-
+
if (entry.getValue().containsKey(Constants.METADATA_OLD_PREV_ROW_COLUMN)) {
KeyExtent fixedke = MetadataTable.fixSplit(entry.getKey(), entry.getValue(), instance, SecurityConstants.getSystemCredentials(), lock);
if (fixedke != null) {
@@ -2917,48 +2930,48 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
if (splitsFixed > 0) {
// reread and reverify metadata entries now that metadata
// entries were fixed
tabletsKeyValues.clear();
return verifyTabletInformation(extent, instance, null, clientAddress, lock);
}
-
+
SortedMap<KeyExtent,Text> children = new TreeMap<KeyExtent,Text>();
-
+
for (Entry<Text,SortedMap<ColumnFQ,Value>> entry : tabletEntries.entrySet()) {
if (extent.getPrevEndRow() != null) {
Text prevRowMetadataEntry = new Text(KeyExtent.getMetadataEntry(extent.getTableId(), extent.getPrevEndRow()));
-
+
if (entry.getKey().compareTo(prevRowMetadataEntry) <= 0) {
continue;
}
}
-
+
Value prevEndRowIBW = entry.getValue().get(Constants.METADATA_PREV_ROW_COLUMN);
if (prevEndRowIBW == null) {
log.warn("Metadata entry does not have prev row (" + entry.getKey() + ")");
return null;
}
-
+
Value dirIBW = entry.getValue().get(Constants.METADATA_DIRECTORY_COLUMN);
if (dirIBW == null) {
log.warn("Metadata entry does not have directory (" + entry.getKey() + ")");
return null;
}
-
+
Text dir = new Text(dirIBW.get());
-
+
KeyExtent child = new KeyExtent(entry.getKey(), prevEndRowIBW);
children.put(child, dir);
}
-
+
if (!MetadataTable.isContiguousRange(extent, new TreeSet<KeyExtent>(children.keySet()))) {
<TRUNCATED>