You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2014/01/10 16:15:37 UTC
[2/7] ACCUMULO-2160 back-port real bugs found by findbugs
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cb50a743/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index aa52834,0000000..c5695bc
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@@ -1,3602 -1,0 +1,3602 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver;
+
+import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TimerTask;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.ScannerImpl;
+import org.apache.accumulo.core.client.impl.TabletType;
+import org.apache.accumulo.core.client.impl.Translator;
+import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.constraints.Constraint.Environment;
+import org.apache.accumulo.core.constraints.Violations;
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.ConstraintViolationSummary;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.thrift.InitialMultiScan;
+import org.apache.accumulo.core.data.thrift.InitialScan;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.data.thrift.MapFileInfo;
+import org.apache.accumulo.core.data.thrift.MultiScanResult;
+import org.apache.accumulo.core.data.thrift.ScanResult;
+import org.apache.accumulo.core.data.thrift.TColumn;
+import org.apache.accumulo.core.data.thrift.TKey;
+import org.apache.accumulo.core.data.thrift.TKeyExtent;
+import org.apache.accumulo.core.data.thrift.TKeyValue;
+import org.apache.accumulo.core.data.thrift.TMutation;
+import org.apache.accumulo.core.data.thrift.TRange;
+import org.apache.accumulo.core.data.thrift.UpdateErrors;
+import org.apache.accumulo.core.file.FileUtil;
+import org.apache.accumulo.core.iterators.IterationInterruptedException;
+import org.apache.accumulo.core.master.thrift.Compacting;
+import org.apache.accumulo.core.master.thrift.MasterClientService;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletLoadState;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.SecurityUtil;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.ScanState;
+import org.apache.accumulo.core.tabletserver.thrift.ScanType;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.ByteBufferUtil;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.ServerServices;
+import org.apache.accumulo.core.util.ServerServices.Service;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.Stat;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.Accumulo;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.ClientServiceHandler;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.data.ServerMutation;
+import org.apache.accumulo.server.logger.LogFileKey;
+import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.accumulo.server.master.state.Assignment;
+import org.apache.accumulo.server.master.state.DistributedStoreException;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
+import org.apache.accumulo.server.master.state.TabletStateStore;
+import org.apache.accumulo.server.master.state.ZooTabletStateStore;
+import org.apache.accumulo.server.metrics.AbstractMetricsImpl;
+import org.apache.accumulo.server.problems.ProblemReport;
+import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.security.AuditedSecurityOperation;
+import org.apache.accumulo.server.security.SecurityConstants;
+import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.accumulo.server.tabletserver.Compactor.CompactionInfo;
+import org.apache.accumulo.server.tabletserver.Tablet.CommitSession;
+import org.apache.accumulo.server.tabletserver.Tablet.KVEntry;
+import org.apache.accumulo.server.tabletserver.Tablet.LookupResult;
+import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
+import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
+import org.apache.accumulo.server.tabletserver.Tablet.ScanBatch;
+import org.apache.accumulo.server.tabletserver.Tablet.Scanner;
+import org.apache.accumulo.server.tabletserver.Tablet.SplitInfo;
+import org.apache.accumulo.server.tabletserver.Tablet.TConstraintViolationException;
+import org.apache.accumulo.server.tabletserver.Tablet.TabletClosedException;
+import org.apache.accumulo.server.tabletserver.TabletServerResourceManager.TabletResourceManager;
+import org.apache.accumulo.server.tabletserver.TabletStatsKeeper.Operation;
+import org.apache.accumulo.server.tabletserver.log.DfsLogger;
+import org.apache.accumulo.server.tabletserver.log.LogSorter;
+import org.apache.accumulo.server.tabletserver.log.MutationReceiver;
+import org.apache.accumulo.server.tabletserver.log.TabletServerLogger;
+import org.apache.accumulo.server.tabletserver.mastermessage.MasterMessage;
+import org.apache.accumulo.server.tabletserver.mastermessage.SplitReportMessage;
+import org.apache.accumulo.server.tabletserver.mastermessage.TabletStatusMessage;
+import org.apache.accumulo.server.tabletserver.metrics.TabletServerMBean;
+import org.apache.accumulo.server.tabletserver.metrics.TabletServerMinCMetrics;
+import org.apache.accumulo.server.tabletserver.metrics.TabletServerScanMetrics;
+import org.apache.accumulo.server.tabletserver.metrics.TabletServerUpdateMetrics;
+import org.apache.accumulo.server.trace.TraceFileSystem;
+import org.apache.accumulo.server.util.FileSystemMonitor;
+import org.apache.accumulo.server.util.Halt;
+import org.apache.accumulo.server.util.MapCounter;
+import org.apache.accumulo.server.util.MetadataTable;
+import org.apache.accumulo.server.util.MetadataTable.LogEntry;
+import org.apache.accumulo.server.util.TServerUtils;
+import org.apache.accumulo.server.util.TServerUtils.ServerPort;
+import org.apache.accumulo.server.util.time.RelativeTime;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.start.Platform;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+import org.apache.accumulo.start.classloader.vfs.ContextManager;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
+import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
+import org.apache.accumulo.trace.thrift.TInfo;
+import org.apache.commons.collections.map.LRUMap;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TServiceClient;
+import org.apache.thrift.server.TServer;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+
+/**
+ * Execution state of a scan task: a task starts out {@code QUEUED}, is marked {@code RUNNING} once its {@code run()} method begins real work, and is
+ * marked {@code FINISHED} when {@code run()} exits.
+ */
+enum ScanRunState {
+  QUEUED,
+  RUNNING,
+  FINISHED
+}
+
+public class TabletServer extends AbstractMetricsImpl implements org.apache.accumulo.server.tabletserver.metrics.TabletServerMBean {
+ private static final Logger log = Logger.getLogger(TabletServer.class);
+
+ // Collection time last observed for each garbage collector, keyed by collector name; read and updated by logGCInfo
+ private static HashMap<String,Long> prevGcTime = new HashMap<String,Long>();
+ // Free memory recorded at the end of the previous logGCInfo call
+ private static long lastMemorySize = 0;
+ // Count of consecutive logGCInfo calls that saw collection time increase; reset when none increased or after a low-memory warning
+ private static long gcTimeIncreasedCount;
+
+ // NOTE(review): not referenced in the visible part of the file; presumably bounds how long a scan call waits for a result -- confirm
+ private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
+
+ private TabletServerLogger logger;
+
+ protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
+
+ // Configuration handed to the constructor; getSystemConfiguration() reads from it
+ private ServerConfiguration serverConfig;
+ // Replaced in the constructor with a LogSorter built from the instance, file system and system configuration
+ private LogSorter logSorter = null;
+
+ public TabletServer(ServerConfiguration conf, FileSystem fs) {
+ super();
+ this.serverConfig = conf;
+ this.instance = conf.getInstance();
+ this.fs = TraceFileSystem.wrap(fs);
+ this.logSorter = new LogSorter(instance, fs, getSystemConfiguration());
+ SimpleTimer.getInstance().schedule(new Runnable() {
+ @Override
+ public void run() {
+ synchronized (onlineTablets) {
+ long now = System.currentTimeMillis();
+ for (Tablet tablet : onlineTablets.values())
+ try {
+ tablet.updateRates(now);
+ } catch (Exception ex) {
+ log.error(ex, ex);
+ }
+ }
+ }
+ }, 5000, 5000);
+ }
+
+ /**
+  * Logs garbage collection and free memory statistics at debug level when they changed since the previous call, warns when memory appears to be running
+  * low, and halts the process when a single collector spent longer than the ZooKeeper timeout collecting since the previous call.
+  *
+  * @param conf
+  *          configuration supplying {@link Property#INSTANCE_ZK_TIMEOUT}
+  */
+ private synchronized static void logGCInfo(AccumuloConfiguration conf) {
+   final Runtime runtime = Runtime.getRuntime();
+   final StringBuilder message = new StringBuilder("gc");
+
+   boolean changed = false;
+   long largestIncrease = 0;
+
+   for (GarbageCollectorMXBean collector : ManagementFactory.getGarbageCollectorMXBeans()) {
+     final String collectorName = collector.getName();
+     final Long recorded = prevGcTime.get(collectorName);
+     final long before = (recorded == null) ? 0 : recorded.longValue();
+
+     final long total = collector.getCollectionTime();
+     final long increase = total - before;
+     if (increase != 0) {
+       changed = true;
+     }
+
+     message.append(String.format(" %s=%,.2f(+%,.2f) secs", collectorName, total / 1000.0, increase / 1000.0));
+     largestIncrease = Math.max(increase, largestIncrease);
+     prevGcTime.put(collectorName, total);
+   }
+
+   final long freeMem = runtime.freeMemory();
+   if (largestIncrease == 0) {
+     gcTimeIncreasedCount = 0;
+   } else if (++gcTimeIncreasedCount > 3 && freeMem < runtime.maxMemory() * 0.05) {
+     // collection time kept growing for several calls in a row while under 5% of max memory is free
+     log.warn("Running low on memory");
+     gcTimeIncreasedCount = 0;
+   }
+
+   final long memDelta = freeMem - lastMemorySize;
+   final boolean memGrew = freeMem > lastMemorySize;
+   if (memGrew) {
+     changed = true;
+   }
+
+   // negative deltas already print their own minus sign
+   final String sign = memGrew ? "+" : "";
+   message.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", freeMem, sign, memDelta, runtime.totalMemory()));
+
+   if (changed) {
+     log.debug(message.toString());
+   }
+
+   final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
+   if (largestIncrease > keepAliveTimeout) {
+     Halt.halt("Garbage collection may be interfering with lock keep-alive. Halting.", -1);
+   }
+
+   lastMemorySize = freeMem;
+ }
+
+ private TabletStatsKeeper statsKeeper;
+
+ /**
+  * Base state kept for every client session tracked by the session manager.
+  */
+ private static class Session {
+   // client address taken from TServerUtils.clientAddress when the session object is constructed
+   String client = TServerUtils.clientAddress.get();
+   String user;
+
+   // timestamps in milliseconds, maintained by the session manager
+   long startTime;
+   long lastAccessTime;
+
+   // while true the session manager will not sweep or reserve the session
+   public boolean reserved;
+
+   /**
+    * Releases resources held by the session; this base implementation does nothing.
+    */
+   public void cleanup() {}
+ }
+
+ /**
+  * Registry of client sessions keyed by a random session id. Idle, unreserved sessions are swept on a timer; session cleanup is always performed outside
+  * of this object's lock.
+  */
+ private static class SessionManager {
+
+   SecureRandom random;
+   Map<Long,Session> sessions;
+
+   /**
+    * Creates the manager and schedules the periodic idle sweep.
+    *
+    * @param conf
+    *          configuration supplying {@link Property#TSERV_SESSION_MAXIDLE}
+    */
+   SessionManager(AccumuloConfiguration conf) {
+     random = new SecureRandom();
+     sessions = new HashMap<Long,Session>();
+
+     final long maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
+
+     Runnable r = new Runnable() {
+       @Override
+       public void run() {
+         sweep(maxIdle);
+       }
+     };
+
+     // sweep at half the idle limit, but never more often than once a second
+     SimpleTimer.getInstance().schedule(r, 0, Math.max(maxIdle / 2, 1000));
+   }
+
+   /**
+    * Registers a session under a fresh random id and stamps its start and last access time.
+    *
+    * @param session
+    *          session to register
+    * @param reserve
+    *          initial value of the session's reserved flag
+    * @return the id assigned to the session
+    */
+   synchronized long createSession(Session session, boolean reserve) {
+     long sid = random.nextLong();
+
+     while (sessions.containsKey(sid)) {
+       sid = random.nextLong();
+     }
+
+     sessions.put(sid, session);
+
+     session.reserved = reserve;
+
+     session.startTime = session.lastAccessTime = System.currentTimeMillis();
+
+     return sid;
+   }
+
+   /**
+    * Marks a session as reserved. While a session is reserved it is skipped by the idle sweep and by {@link #removeIfNotAccessed(long, long)}.
+    *
+    * @param sessionId
+    *          id of the session to reserve
+    * @return the reserved session, or null when no session has that id
+    * @throws IllegalStateException
+    *           if the session is already reserved
+    */
+   synchronized Session reserveSession(long sessionId) {
+     Session session = sessions.get(sessionId);
+     if (session != null) {
+       if (session.reserved)
+         throw new IllegalStateException();
+       session.reserved = true;
+     }
+
+     return session;
+   }
+
+   /**
+    * Clears the reserved flag of a session and refreshes its last access time.
+    *
+    * @throws IllegalStateException
+    *           if the session is not reserved
+    */
+   synchronized void unreserveSession(Session session) {
+     if (!session.reserved)
+       throw new IllegalStateException();
+     session.reserved = false;
+     session.lastAccessTime = System.currentTimeMillis();
+   }
+
+   /**
+    * Unreserves the session with the given id; does nothing when no such session exists.
+    */
+   synchronized void unreserveSession(long sessionId) {
+     Session session = getSession(sessionId);
+     if (session != null)
+       unreserveSession(session);
+   }
+
+   /**
+    * Looks up a session and, when found, refreshes its last access time.
+    *
+    * @return the session, or null when no session has that id
+    */
+   synchronized Session getSession(long sessionId) {
+     Session session = sessions.get(sessionId);
+     if (session != null)
+       session.lastAccessTime = System.currentTimeMillis();
+     return session;
+   }
+
+   /**
+    * Removes a session and runs its cleanup.
+    *
+    * @return the removed session, or null when no session has that id
+    */
+   Session removeSession(long sessionId) {
+     Session session = null;
+     synchronized (this) {
+       session = sessions.remove(sessionId);
+     }
+
+     // do clean up outside of lock
+     if (session != null)
+       session.cleanup();
+
+     return session;
+   }
+
+   /**
+    * Removes and cleans up every unreserved session that has been idle for longer than {@code maxIdle} milliseconds.
+    */
+   private void sweep(long maxIdle) {
+     ArrayList<Session> sessionsToCleanup = new ArrayList<Session>();
+     synchronized (this) {
+       Iterator<Session> iter = sessions.values().iterator();
+       while (iter.hasNext()) {
+         Session session = iter.next();
+         long idleTime = System.currentTimeMillis() - session.lastAccessTime;
+         if (idleTime > maxIdle && !session.reserved) {
+           iter.remove();
+           sessionsToCleanup.add(session);
+         }
+       }
+     }
+
+     // do clean up outside of lock
+     for (Session session : sessionsToCleanup) {
+       session.cleanup();
+     }
+   }
+
+   /**
+    * Schedules removal of a session after {@code delay}; the removal only happens if, when the timer fires, the session is unreserved and its last access
+    * time is unchanged from the time of this call.
+    */
+   synchronized void removeIfNotAccessed(final long sessionId, long delay) {
+     Session session = sessions.get(sessionId);
+     if (session != null) {
+       final long removeTime = session.lastAccessTime;
+       TimerTask r = new TimerTask() {
+         @Override
+         public void run() {
+           Session sessionToCleanup = null;
+           synchronized (SessionManager.this) {
+             Session session2 = sessions.get(sessionId);
+             if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) {
+               sessions.remove(sessionId);
+               sessionToCleanup = session2;
+             }
+           }
+
+           // call clean up outside of lock
+           if (sessionToCleanup != null)
+             sessionToCleanup.cleanup();
+         }
+       };
+
+       SimpleTimer.getInstance().schedule(r, delay);
+     }
+   }
+
+   /**
+    * Counts, per table id, the scan tasks of all scan and multi-scan sessions grouped by run state. Sessions without a task and tasks that already
+    * finished are not counted.
+    */
+   public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
+     Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
+     for (Session session : sessions.values()) {
+       ScanTask<?> nbt = null;
+       String tableID = null;
+
+       if (session instanceof ScanSession) {
+         ScanSession ss = (ScanSession) session;
+         nbt = ss.nextBatchTask;
+         tableID = ss.extent.getTableId().toString();
+       } else if (session instanceof MultiScanSession) {
+         MultiScanSession mss = (MultiScanSession) session;
+         nbt = mss.lookupTask;
+         tableID = mss.threadPoolExtent.getTableId().toString();
+       }
+
+       // the task must be checked for null before it is asked for its state
+       if (nbt == null)
+         continue;
+
+       ScanRunState srs = nbt.getScanRunState();
+
+       if (srs == ScanRunState.FINISHED)
+         continue;
+
+       MapCounter<ScanRunState> stateCounts = counts.get(tableID);
+       if (stateCounts == null) {
+         stateCounts = new MapCounter<ScanRunState>();
+         counts.put(tableID, stateCounts);
+       }
+
+       stateCounts.increment(srs, 1);
+     }
+
+     return counts;
+   }
+
+   /**
+    * Maps the run state of a scan task to the state reported to clients: no task or a finished task is {@code IDLE}, a queued task is {@code QUEUED}, and
+    * anything else is {@code RUNNING}.
+    */
+   private static ScanState toScanState(ScanTask<?> task) {
+     if (task == null)
+       return ScanState.IDLE;
+
+     switch (task.getScanRunState()) {
+       case QUEUED:
+         return ScanState.QUEUED;
+       case FINISHED:
+         return ScanState.IDLE;
+       case RUNNING:
+       default:
+         return ScanState.RUNNING;
+     }
+   }
+
+   /**
+    * Builds a description of every scan and multi-scan session currently registered.
+    */
+   public synchronized List<ActiveScan> getActiveScans() {
+
+     ArrayList<ActiveScan> activeScans = new ArrayList<ActiveScan>();
+
+     long ct = System.currentTimeMillis();
+
+     for (Session session : sessions.values()) {
+       if (session instanceof ScanSession) {
+         ScanSession ss = (ScanSession) session;
+
+         ScanState state = toScanState(ss.nextBatchTask);
+
+         activeScans.add(new ActiveScan(ss.client, ss.user, ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
+             state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translator.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB()));
+
+       } else if (session instanceof MultiScanSession) {
+         MultiScanSession mss = (MultiScanSession) session;
+
+         ScanState state = toScanState(mss.lookupTask);
+
+         activeScans.add(new ActiveScan(mss.client, mss.user, mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime,
+             ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translator.CT), mss.ssiList, mss.ssio, mss.auths
+                 .getAuthorizationsBB()));
+       }
+     }
+
+     return activeScans;
+   }
+ }
+
+ /**
+  * Constraint {@link Environment} backed by the credentials of the user performing an update. The user's authorizations are looked up on first request
+  * and remembered afterwards.
+  */
+ static class TservConstraintEnv implements Environment {
+
+   private final SecurityOperation security;
+   private final TCredentials credentials;
+   private KeyExtent ke;
+   // filled in by the first call to getAuthorizations()
+   private Authorizations auths;
+
+   TservConstraintEnv(SecurityOperation secOp, TCredentials credentials) {
+     this.security = secOp;
+     this.credentials = credentials;
+   }
+
+   void setExtent(KeyExtent ke) {
+     this.ke = ke;
+   }
+
+   @Override
+   public KeyExtent getExtent() {
+     return ke;
+   }
+
+   @Override
+   public String getUser() {
+     return credentials.getPrincipal();
+   }
+
+   /**
+    * Returns the user's authorizations, fetching them from the security operation when they have not been fetched yet.
+    *
+    * @throws RuntimeException
+    *           wrapping the {@link ThriftSecurityException} raised by the lookup
+    */
+   @Override
+   public Authorizations getAuthorizations() {
+     if (auths != null) {
+       return auths;
+     }
+
+     try {
+       auths = security.getUserAuthorizations(credentials);
+     } catch (ThriftSecurityException e) {
+       throw new RuntimeException(e);
+     }
+     return auths;
+   }
+
+ }
+
+ /**
+  * Base class for scan work whose single result (or failure) is handed over through a one-element queue. The task moves from INITIAL to either ADDED (a
+  * result was published) or CANCELED, and separately tracks a {@link ScanRunState} for reporting.
+  */
+ private abstract class ScanTask<T> implements RunnableFuture<T> {
+
+ protected AtomicBoolean interruptFlag;
+ protected ArrayBlockingQueue<Object> resultQueue;
+ protected AtomicInteger state;
+ protected AtomicReference<ScanRunState> runState;
+
+ private static final int INITIAL = 1;
+ private static final int ADDED = 2;
+ private static final int CANCELED = 3;
+
+ ScanTask() {
+ interruptFlag = new AtomicBoolean(false);
+ runState = new AtomicReference<ScanRunState>(ScanRunState.QUEUED);
+ state = new AtomicInteger(INITIAL);
+ resultQueue = new ArrayBlockingQueue<Object>(1);
+ }
+
+ /**
+  * Publishes the result (a value or a Throwable) of this task. A result offered after the task was canceled is dropped.
+  *
+  * @throws IllegalStateException
+  *           if a result was already added
+  */
+ protected void addResult(Object o) {
+ if (state.compareAndSet(INITIAL, ADDED))
+ resultQueue.add(o);
+ else if (state.get() == ADDED)
+ throw new IllegalStateException("Tried to add more than one result");
+ }
+
+ /**
+  * Cancels the task if no result has been added yet, setting the interrupt flag and discarding the result queue.
+  *
+  * @param mayInterruptIfRunning
+  *          must be true
+  * @return true if the task is now (or already was) canceled, false if a result had already been added
+  * @throws IllegalArgumentException
+  *           if {@code mayInterruptIfRunning} is false
+  */
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ if (!mayInterruptIfRunning)
+ throw new IllegalArgumentException("Cancel will always attempt to interupt running next batch task");
+
+ if (state.get() == CANCELED)
+ return true;
+
+ if (state.compareAndSet(INITIAL, CANCELED)) {
+ interruptFlag.set(true);
+ resultQueue = null;
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+  * Not supported; callers must use the timed variant.
+  */
+ @Override
+ public T get() throws InterruptedException, ExecutionException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+  * Waits up to the given time for the result. A result can be retrieved only once.
+  *
+  * @throws CancellationException
+  *           if the task was canceled before or during the wait
+  * @throws TimeoutException
+  *           if no result arrived in time
+  * @throws ExecutionException
+  *           wrapping the Throwable that was published as the result
+  * @throws IllegalStateException
+  *           if the result was already retrieved
+  */
+ @SuppressWarnings("unchecked")
+ @Override
+ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+
+ // read the field once; cancel() and a successful get() both set it to null
+ ArrayBlockingQueue<Object> localRQ = resultQueue;
+
+ if (state.get() == CANCELED)
+ throw new CancellationException();
+
+ if (localRQ == null && state.get() == ADDED)
+ throw new IllegalStateException("Tried to get result twice");
+
+ Object r = localRQ.poll(timeout, unit);
+
+ // could have been canceled while waiting
+ if (state.get() == CANCELED) {
+ if (r != null)
+ throw new IllegalStateException("Nothing should have been added when in canceled state");
+
+ throw new CancellationException();
+ }
+
+ if (r == null)
+ throw new TimeoutException();
+
+ // make this method stop working now that something is being
+ // returned
+ resultQueue = null;
+
+ if (r instanceof Throwable)
+ throw new ExecutionException((Throwable) r);
+
+ return (T) r;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return state.get() == CANCELED;
+ }
+
+ // done means run() has finished, regardless of whether the result was retrieved
+ @Override
+ public boolean isDone() {
+ return runState.get().equals(ScanRunState.FINISHED);
+ }
+
+ public ScanRunState getScanRunState() {
+ return runState.get();
+ }
+
+ }
+
+ /**
+  * Session state accumulated while a client streams mutations to this server.
+  */
+ private static class UpdateSession extends Session {
+   public TCredentials credentials;
+   TservConstraintEnv cenv = null;
+
+   // mutations waiting to be applied, grouped by tablet
+   public Tablet currentTablet;
+   public Map<Tablet,List<Mutation>> queuedMutations = new HashMap<Tablet,List<Mutation>>();
+   public long queuedMutationSize = 0;
+
+   // outcome bookkeeping
+   public MapCounter<Tablet> successfulCommits = new MapCounter<Tablet>();
+   Map<KeyExtent,Long> failures = new HashMap<KeyExtent,Long>();
+   HashMap<KeyExtent,SecurityErrorCode> authFailures = new HashMap<KeyExtent,SecurityErrorCode>();
+   public Violations violations;
+
+   // counters and timing statistics
+   public long totalUpdates = 0;
+   public long flushTime = 0;
+   Stat prepareTimes = new Stat();
+   Stat walogTimes = new Stat();
+   Stat commitTimes = new Stat();
+   Stat authTimes = new Stat();
+ }
+
+ /**
+  * Session state for a scan over a single tablet extent.
+  */
+ private static class ScanSession extends Session {
+   public KeyExtent extent;
+   public HashSet<Column> columnSet;
+   public List<IterInfo> ssiList;
+   public Map<String,Map<String,String>> ssio;
+   public Authorizations auths;
+   public long entriesReturned = 0;
+   public Stat nbTimes = new Stat();
+   public long batchCount = 0;
+   public volatile ScanTask<ScanBatch> nextBatchTask;
+   public AtomicBoolean interruptFlag;
+   public Scanner scanner;
+
+   /**
+    * Cancels the pending batch task, if any, and closes the scanner, if any. The scanner is closed even when cancelling throws.
+    */
+   @Override
+   public void cleanup() {
+     // read the volatile field once so another thread clearing it between the null check and the call cannot cause a NullPointerException
+     final ScanTask<ScanBatch> pendingTask = nextBatchTask;
+     try {
+       if (pendingTask != null)
+         pendingTask.cancel(true);
+     } finally {
+       if (scanner != null)
+         scanner.close();
+     }
+   }
+
+ }
+
+ /**
+  * Session state for a batch scan that looks up ranges across several tablet extents.
+  */
+ private static class MultiScanSession extends Session {
+   HashSet<Column> columnSet;
+   Map<KeyExtent,List<Range>> queries;
+   public List<IterInfo> ssiList;
+   public Map<String,Map<String,String>> ssio;
+   public Authorizations auths;
+
+   // stats
+   int numRanges;
+   int numTablets;
+   int numEntries;
+   long totalLookupTime;
+
+   public volatile ScanTask<MultiScanResult> lookupTask;
+   public KeyExtent threadPoolExtent;
+
+   /**
+    * Cancels the pending lookup task, if any.
+    */
+   @Override
+   public void cleanup() {
+     // read the volatile field once so another thread clearing it between the null check and the call cannot cause a NullPointerException
+     final ScanTask<MultiScanResult> pendingTask = lookupTask;
+     if (pendingTask != null)
+       pendingTask.cancel(true);
+   }
+ }
+
+ /**
+  * Tracks the writes that are currently in progress, per tablet type, so that a reader can block until every write that began before it has completed.
+  * Relies on operation ids being handed out in strictly increasing order.
+  */
+ static class WriteTracker {
+   private static final AtomicLong operationCounter = new AtomicLong(1);
+   private final Map<TabletType,TreeSet<Long>> inProgressWrites = new EnumMap<TabletType,TreeSet<Long>>(TabletType.class);
+
+   WriteTracker() {
+     for (TabletType type : TabletType.values()) {
+       inProgressWrites.put(type, new TreeSet<Long>());
+     }
+   }
+
+   /**
+    * Records the start of a write against the given tablet type.
+    *
+    * @return the operation id to pass to {@link #finishWrite(long)}
+    */
+   synchronized long startWrite(TabletType ttype) {
+     final long id = operationCounter.getAndIncrement();
+     inProgressWrites.get(ttype).add(id);
+     return id;
+   }
+
+   /**
+    * Records the end of a write and wakes up waiting readers. An id of -1 is ignored.
+    *
+    * @throws IllegalArgumentException
+    *           if no write with that id is in progress
+    */
+   synchronized void finishWrite(long operationId) {
+     if (operationId == -1) {
+       return;
+     }
+
+     for (TabletType type : TabletType.values()) {
+       if (inProgressWrites.get(type).remove(operationId)) {
+         this.notifyAll();
+         return;
+       }
+     }
+
+     throw new IllegalArgumentException("Attempted to finish write not in progress, operationId " + operationId);
+   }
+
+   /**
+    * Blocks until every write of the given tablet type that started before this call has finished. Interrupts are logged and the wait continues.
+    */
+   synchronized void waitForWrites(TabletType ttype) {
+     final long barrier = operationCounter.getAndIncrement();
+     final TreeSet<Long> pending = inProgressWrites.get(ttype);
+     // any id at or below the barrier belongs to a write that started earlier
+     while (pending.floor(barrier) != null) {
+       try {
+         this.wait();
+       } catch (InterruptedException e) {
+         log.error(e, e);
+       }
+     }
+   }
+
+   /**
+    * Records the start of a write against a set of tablets.
+    *
+    * @return the operation id, or -1 when the set is empty
+    */
+   public long startWrite(Set<Tablet> keySet) {
+     if (keySet.isEmpty()) {
+       return -1;
+     }
+
+     final ArrayList<KeyExtent> extents = new ArrayList<KeyExtent>(keySet.size());
+     for (Tablet tablet : keySet) {
+       extents.add(tablet.getExtent());
+     }
+
+     return startWrite(TabletType.type(extents));
+   }
+ }
+
+ /**
+  * Returns the configuration provided by the {@link ServerConfiguration} this server was constructed with.
+  */
+ public AccumuloConfiguration getSystemConfiguration() {
+   final AccumuloConfiguration systemConf = serverConfig.getConfiguration();
+   return systemConf;
+ }
+
+ TransactionWatcher watcher = new TransactionWatcher();
+
+ private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {
+
+ SessionManager sessionManager;
+
+ AccumuloConfiguration acuConf = getSystemConfiguration();
+
+ TabletServerUpdateMetrics updateMetrics = new TabletServerUpdateMetrics();
+
+ TabletServerScanMetrics scanMetrics = new TabletServerScanMetrics();
+
+ WriteTracker writeTracker = new WriteTracker();
+
+ ThriftClientHandler() {
+ super(instance, watcher);
+ log.debug(ThriftClientHandler.class.getName() + " created");
+ sessionManager = new SessionManager(getSystemConfiguration());
+ // Register the metrics MBean
+ try {
+ updateMetrics.register();
+ scanMetrics.register();
+ } catch (Exception e) {
+ log.error("Exception registering MBean with MBean Server", e);
+ }
+ }
+
+ /**
+  * Imports the given files into the tablets they are assigned to.
+  *
+  * @return the extents whose files were not imported, either because the tablet is not online here or because the import raised an IOException
+  * @throws ThriftSecurityException
+  *           if the caller may not perform system actions
+  */
+ @Override
+ public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, long tid, Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime)
+     throws ThriftSecurityException {
+
+   if (!security.canPerformSystemActions(credentials)) {
+     throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+   }
+
+   final List<TKeyExtent> notImported = new ArrayList<TKeyExtent>();
+
+   for (Entry<TKeyExtent,Map<String,MapFileInfo>> assignment : files.entrySet()) {
+     final TKeyExtent thriftExtent = assignment.getKey();
+     final Map<String,MapFileInfo> tabletFiles = assignment.getValue();
+     final KeyExtent extent = new KeyExtent(thriftExtent);
+
+     final Tablet target = onlineTablets.get(extent);
+     if (target == null) {
+       notImported.add(thriftExtent);
+       continue;
+     }
+
+     try {
+       target.importMapFiles(tid, tabletFiles, setTime);
+     } catch (IOException ioe) {
+       log.info("files " + tabletFiles.keySet() + " not imported to " + extent + ": " + ioe.getMessage());
+       notImported.add(thriftExtent);
+     }
+   }
+
+   return notImported;
+ }
+
+ /**
+  * Scan task that reads one batch from the scanner of a scan session and publishes the batch, or the failure, as the task result.
+  */
+ private class NextBatchTask extends ScanTask<ScanBatch> {
+
+ private long scanID;
+
+ /**
+  * @param scanID
+  *          id of the scan session to read from
+  * @param interruptFlag
+  *          flag used in place of the one created by the base class; if it is already set the task is canceled immediately
+  */
+ NextBatchTask(long scanID, AtomicBoolean interruptFlag) {
+ this.scanID = scanID;
+ this.interruptFlag = interruptFlag;
+
+ if (interruptFlag.get())
+ cancel(true);
+ }
+
+ @Override
+ public void run() {
+
+ // looking the session up also refreshes its last access time
+ final ScanSession scanSession = (ScanSession) sessionManager.getSession(scanID);
+ String oldThreadName = Thread.currentThread().getName();
+
+ try {
+ // nothing to do, and no result is published, when canceled or the session is gone
+ if (isCancelled() || scanSession == null)
+ return;
+
+ runState.set(ScanRunState.RUNNING);
+
+ // the thread name describes the scan while it runs; it is restored in the finally block
+ Thread.currentThread().setName(
+ "User: " + scanSession.user + " Start: " + scanSession.startTime + " Client: " + scanSession.client + " Tablet: " + scanSession.extent);
+
+ Tablet tablet = onlineTablets.get(scanSession.extent);
+
+ if (tablet == null) {
+ addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift()));
+ return;
+ }
+
+ long t1 = System.currentTimeMillis();
+ ScanBatch batch = scanSession.scanner.read();
+ long t2 = System.currentTimeMillis();
+ scanSession.nbTimes.addStat(t2 - t1);
+
+ // there should only be one thing on the queue at a time, so
+ // it should be ok to call add()
+ // instead of put()... if add() fails because queue is at
+ // capacity it means there is code
+ // problem somewhere
+ addResult(batch);
+ } catch (TabletClosedException e) {
+ addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift()));
+ } catch (IterationInterruptedException iie) {
+ // an interruption caused by cancellation is expected and is not reported
+ if (!isCancelled()) {
+ log.warn("Iteration interrupted, when scan not cancelled", iie);
+ addResult(iie);
+ }
+ } catch (TooManyFilesException tmfe) {
+ addResult(tmfe);
+ } catch (Throwable e) {
+ log.warn("exception while scanning tablet " + (scanSession == null ? "(unknown)" : scanSession.extent), e);
+ addResult(e);
+ } finally {
+ // always mark the task finished and give the thread its original name back
+ runState.set(ScanRunState.FINISHED);
+ Thread.currentThread().setName(oldThreadName);
+ }
+
+ }
+ }
+
+  /**
+   * Read-ahead task for a multi-range scan session. Each run drains as many of the session's pending tablet queries as fit within the table's configured
+   * result size and a fixed time slice, then publishes one {@code MultiScanResult} (or the failure) through {@code addResult}.
+   */
+  private class LookupTask extends ScanTask<MultiScanResult> {
+
+    private long scanID;
+
+    /**
+     * @param scanID
+     *          id of the multi-scan session this task reads for
+     */
+    LookupTask(long scanID) {
+      this.scanID = scanID;
+    }
+
+    @Override
+    public void run() {
+      MultiScanSession session = (MultiScanSession) sessionManager.getSession(scanID);
+      String oldThreadName = Thread.currentThread().getName();
+
+      try {
+        if (isCancelled() || session == null)
+          return;
+
+        TableConfiguration acuTableConf = ServerConfiguration.getTableConfiguration(instance, session.threadPoolExtent.getTableId().toString());
+        long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
+
+        runState.set(ScanRunState.RUNNING);
+        // name the thread after the table until the first tablet is picked up; the "Table: " label previously had no value after it
+        Thread.currentThread().setName(
+            "Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Table: " + session.threadPoolExtent.getTableId());
+
+        long bytesAdded = 0;
+        // time slice in milliseconds (compared against System.currentTimeMillis() deltas below)
+        long maxScanTime = 4000;
+
+        long startTime = System.currentTimeMillis();
+
+        ArrayList<KVEntry> results = new ArrayList<KVEntry>();
+        Map<KeyExtent,List<Range>> failures = new HashMap<KeyExtent,List<Range>>();
+        ArrayList<KeyExtent> fullScans = new ArrayList<KeyExtent>();
+        KeyExtent partScan = null;
+        Key partNextKey = null;
+        boolean partNextKeyInclusive = false;
+
+        Iterator<Entry<KeyExtent,List<Range>>> iter = session.queries.entrySet().iterator();
+
+        // check the time so that the read ahead thread is not monopolized
+        while (iter.hasNext() && bytesAdded < maxResultsSize && (System.currentTimeMillis() - startTime) < maxScanTime) {
+          Entry<KeyExtent,List<Range>> entry = iter.next();
+
+          // the query is taken off the pending set; unfinished ranges are put back further down
+          iter.remove();
+
+          // check that tablet server is serving requested tablet
+          Tablet tablet = onlineTablets.get(entry.getKey());
+          if (tablet == null) {
+            failures.put(entry.getKey(), entry.getValue());
+            continue;
+          }
+          Thread.currentThread().setName(
+              "Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Tablet: " + entry.getKey().toString());
+
+          LookupResult lookupResult;
+          try {
+
+            // do the following check to avoid a race condition between setting false below and the task being canceled
+            if (isCancelled())
+              interruptFlag.set(true);
+
+            lookupResult = tablet.lookup(entry.getValue(), session.columnSet, session.auths, results, maxResultsSize - bytesAdded, session.ssiList,
+                session.ssio, interruptFlag);
+
+            // if the tablet was closed it is possible that the interrupt flag was set.... do not want it set for the next lookup
+            interruptFlag.set(false);
+
+          } catch (IOException e) {
+            log.warn("lookup failed for tablet " + entry.getKey(), e);
+            throw new RuntimeException(e);
+          }
+
+          bytesAdded += lookupResult.bytesAdded;
+
+          if (lookupResult.unfinishedRanges.size() > 0) {
+            if (lookupResult.closed) {
+              // the tablet closed underneath the lookup; hand the remaining ranges back as failures
+              failures.put(entry.getKey(), lookupResult.unfinishedRanges);
+            } else {
+              // re-queue the remaining ranges and remember where this tablet's scan stopped
+              session.queries.put(entry.getKey(), lookupResult.unfinishedRanges);
+              partScan = entry.getKey();
+              partNextKey = lookupResult.unfinishedRanges.get(0).getStartKey();
+              partNextKeyInclusive = lookupResult.unfinishedRanges.get(0).isStartKeyInclusive();
+            }
+          } else {
+            fullScans.add(entry.getKey());
+          }
+        }
+
+        long finishTime = System.currentTimeMillis();
+        session.totalLookupTime += (finishTime - startTime);
+        session.numEntries += results.size();
+
+        // convert everything to thrift before adding result
+        List<TKeyValue> retResults = new ArrayList<TKeyValue>();
+        for (KVEntry entry : results)
+          retResults.add(new TKeyValue(entry.key.toThrift(), ByteBuffer.wrap(entry.value)));
+        Map<TKeyExtent,List<TRange>> retFailures = Translator.translate(failures, Translator.KET, new Translator.ListTranslator<Range,TRange>(Translator.RT));
+        List<TKeyExtent> retFullScans = Translator.translate(fullScans, Translator.KET);
+        TKeyExtent retPartScan = null;
+        TKey retPartNextKey = null;
+        if (partScan != null) {
+          retPartScan = partScan.toThrift();
+          retPartNextKey = partNextKey.toThrift();
+        }
+        // add results to queue; the last argument tells the caller whether queries are still pending
+        addResult(new MultiScanResult(retResults, retFailures, retFullScans, retPartScan, retPartNextKey, partNextKeyInclusive, session.queries.size() != 0));
+      } catch (IterationInterruptedException iie) {
+        // an interruption that accompanies a cancel is expected and reported nowhere
+        if (!isCancelled()) {
+          log.warn("Iteration interrupted, when scan not cancelled", iie);
+          addResult(iie);
+        }
+      } catch (Throwable e) {
+        log.warn("exception while doing multi-scan ", e);
+        addResult(e);
+      } finally {
+        Thread.currentThread().setName(oldThreadName);
+        runState.set(ScanRunState.FINISHED);
+      }
+    }
+  }
+
+  /**
+   * Opens a scan session on a single tablet and returns its id together with the first batch of results.
+   *
+   * @param tinfo
+   *          trace information, passed on to the internal continueScan call
+   * @param credentials
+   *          credentials of the caller; must be allowed to scan the table and must hold every requested authorization
+   * @param textent
+   *          extent of the tablet to scan
+   * @param range
+   *          range to scan
+   * @param columns
+   *          columns to fetch
+   * @param batchSize
+   *          passed to the tablet when the scanner is created
+   * @param ssiList
+   *          scan-time iterators
+   * @param ssio
+   *          options for the scan-time iterators
+   * @param authorizations
+   *          authorizations requested for this scan
+   * @param waitForWrites
+   *          whether to wait for in-flight writes before scanning
+   * @param isolated
+   *          passed to the tablet when the scanner is created
+   * @return the new scan id and the first batch of results
+   * @throws NotServingTabletException
+   *           if the tablet is not online on this server
+   * @throws ThriftSecurityException
+   *           if the caller may not scan the table or asks for authorizations it does not hold
+   */
+  @Override
+  public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent, TRange range, List<TColumn> columns, int batchSize,
+      List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated)
+      throws NotServingTabletException, ThriftSecurityException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
+
+    if (!security.canScan(credentials, new String(textent.getTable())))
+      throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+
+    Authorizations userauths = security.getUserAuthorizations(credentials);
+    for (ByteBuffer auth : authorizations)
+      if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
+        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
+
+    KeyExtent extent = new KeyExtent(textent);
+
+    // wait for any writes that are in flight.. this done to ensure
+    // consistency across client restarts... assume a client writes
+    // to accumulo and dies while waiting for a confirmation from
+    // accumulo... the client process restarts and tries to read
+    // data from accumulo making the assumption that it will get
+    // any writes previously made... however if the server side thread
+    // processing the write from the dead client is still in progress,
+    // the restarted client may not see the write unless we wait here.
+    // this behavior is very important when the client is reading the
+    // !METADATA table
+    if (waitForWrites)
+      writeTracker.waitForWrites(TabletType.type(extent));
+
+    Tablet tablet = onlineTablets.get(extent);
+    if (tablet == null)
+      throw new NotServingTabletException(textent);
+
+    ScanSession scanSession = new ScanSession();
+    scanSession.user = credentials.getPrincipal();
+    scanSession.extent = new KeyExtent(extent);
+    scanSession.columnSet = new HashSet<Column>();
+    scanSession.ssiList = ssiList;
+    scanSession.ssio = ssio;
+    scanSession.auths = new Authorizations(authorizations);
+    scanSession.interruptFlag = new AtomicBoolean();
+
+    for (TColumn tcolumn : columns) {
+      scanSession.columnSet.add(new Column(tcolumn));
+    }
+
+    scanSession.scanner = tablet.createScanner(new Range(range), batchSize, scanSession.columnSet, scanSession.auths, ssiList, ssio, isolated,
+        scanSession.interruptFlag);
+
+    // the session is created already reserved (second argument) and is released in the finally block below
+    long sid = sessionManager.createSession(scanSession, true);
+
+    ScanResult scanResult;
+    try {
+      scanResult = continueScan(tinfo, sid, scanSession);
+    } catch (NoSuchScanIDException e) {
+      log.error("The impossible happened", e);
+      // keep the original exception as the cause so the failure can be diagnosed by whoever catches this
+      throw new RuntimeException("The impossible happened", e);
+    } finally {
+      sessionManager.unreserveSession(sid);
+    }
+
+    return new InitialScan(sid, scanResult);
+  }
+
+  /**
+   * Fetches the next batch of results for an existing scan session. The session is reserved for the duration of the call.
+   *
+   * @param tinfo
+   *          trace information, passed on to the internal continueScan call
+   * @param scanID
+   *          id of the scan session to continue
+   * @return the next batch of results
+   * @throws NoSuchScanIDException
+   *           if no session could be reserved for {@code scanID}
+   */
+  @Override
+  public ScanResult continueScan(TInfo tinfo, long scanID) throws NoSuchScanIDException, NotServingTabletException,
+      org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
+    final ScanSession reserved = (ScanSession) sessionManager.reserveSession(scanID);
+    if (reserved == null) {
+      throw new NoSuchScanIDException();
+    }
+
+    try {
+      return continueScan(tinfo, scanID, reserved);
+    } finally {
+      // always give the reservation back, whether the batch was produced or not
+      sessionManager.unreserveSession(reserved);
+    }
+  }
+
+  /**
+   * Waits for the session's read-ahead task to produce a batch and converts it into a thrift result. A task is queued first if none is outstanding.
+   *
+   * @param tinfo
+   *          trace information, passed to closeScan when the scan is finished
+   * @param scanID
+   *          id of the scan session
+   * @param scanSession
+   *          the session registered under {@code scanID}
+   * @return the batch that was read, or an empty result with {@code more} set when the wait timed out
+   */
+  private ScanResult continueScan(TInfo tinfo, long scanID, ScanSession scanSession) throws NoSuchScanIDException, NotServingTabletException,
+      org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
+
+    if (scanSession.nextBatchTask == null) {
+      scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag);
+      resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
+    }
+
+    ScanBatch batch;
+    try {
+      batch = scanSession.nextBatchTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
+      scanSession.nextBatchTask = null;
+    } catch (ExecutionException e) {
+      sessionManager.removeSession(scanID);
+      Throwable cause = e.getCause();
+      if (cause instanceof NotServingTabletException) {
+        throw (NotServingTabletException) cause;
+      }
+      if (cause instanceof TooManyFilesException) {
+        throw new org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException(scanSession.extent.toThrift());
+      }
+      throw new RuntimeException(e);
+    } catch (CancellationException ce) {
+      sessionManager.removeSession(scanID);
+      Tablet tablet = onlineTablets.get(scanSession.extent);
+      boolean stillServing = tablet != null && !tablet.isClosed();
+      if (stillServing) {
+        throw new NoSuchScanIDException();
+      }
+      throw new NotServingTabletException(scanSession.extent.toThrift());
+    } catch (TimeoutException e) {
+      // the task stays attached to the session, so a later call waits on the same read
+      List<TKeyValue> nothingYet = Collections.emptyList();
+      long timeout = acuConf.getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
+      sessionManager.removeIfNotAccessed(scanID, timeout);
+      return new ScanResult(nothingYet, true);
+    } catch (Throwable t) {
+      sessionManager.removeSession(scanID);
+      log.warn("Failed to get next batch", t);
+      throw new RuntimeException(t);
+    }
+
+    ScanResult scanResult = new ScanResult(Key.compress(batch.results), batch.more);
+
+    scanSession.entriesReturned += scanResult.results.size();
+    scanSession.batchCount++;
+
+    if (scanResult.more) {
+      if (scanSession.batchCount > 3) {
+        // start reading next batch while current batch is transmitted to client
+        scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag);
+        resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
+      }
+    } else {
+      closeScan(tinfo, scanID);
+    }
+
+    return scanResult;
+  }
+
+  /**
+   * Removes a scan session and, if it existed, logs a summary of it and records scan metrics when they are enabled.
+   *
+   * @param tinfo
+   *          trace information; not read by this method
+   * @param scanID
+   *          id of the scan session to close; unknown ids are ignored
+   */
+  @Override
+  public void closeScan(TInfo tinfo, long scanID) {
+    ScanSession closed = (ScanSession) sessionManager.removeSession(scanID);
+    if (closed == null) {
+      return;
+    }
+
+    long now = System.currentTimeMillis();
+    long elapsedMillis = now - closed.startTime;
+
+    log.debug(String.format("ScanSess tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ", TServerUtils.clientAddress.get(), closed.extent.getTableId()
+        .toString(), closed.entriesReturned, elapsedMillis / 1000.0, closed.nbTimes.toString()));
+
+    if (scanMetrics.isEnabled()) {
+      scanMetrics.add(TabletServerScanMetrics.scan, elapsedMillis);
+      scanMetrics.add(TabletServerScanMetrics.resultSize, closed.entriesReturned);
+    }
+  }
+
+  /**
+   * Opens a multi-range scan session covering several tablets and returns its id together with the first batch of results.
+   *
+   * @param tinfo
+   *          trace information, passed on to the internal continueMultiScan call
+   * @param credentials
+   *          credentials of the caller; must be allowed to scan every table in the batch and must hold every requested authorization
+   * @param tbatch
+   *          ranges to look up, grouped by tablet extent
+   * @param tcolumns
+   *          columns to fetch
+   * @param ssiList
+   *          scan-time iterators
+   * @param ssio
+   *          options for the scan-time iterators
+   * @param authorizations
+   *          authorizations requested for this scan
+   * @param waitForWrites
+   *          whether to wait for in-flight writes before scanning
+   * @return the new scan id and the first batch of results
+   * @throws ThriftSecurityException
+   *           if the caller may not scan one of the tables or asks for authorizations it does not hold
+   * @throws IllegalArgumentException
+   *           if the batch mixes the root tablet, or a !METADATA tablet, with tablets of another kind (see the loop below)
+   */
+  @Override
+  public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, Map<TKeyExtent,List<TRange>> tbatch, List<TColumn> tcolumns,
+      List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites) throws ThriftSecurityException {
+    // find all of the tables that need to be scanned
+    HashSet<String> requestedTables = new HashSet<String>();
+    for (TKeyExtent requested : tbatch.keySet()) {
+      requestedTables.add(new String(requested.getTable()));
+    }
+
+    // check if user has permission to the tables
+    for (String table : requestedTables) {
+      if (!security.canScan(credentials, table)) {
+        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+      }
+    }
+
+    Authorizations userauths = security.getUserAuthorizations(credentials);
+    for (ByteBuffer auth : authorizations) {
+      if (!userauths.contains(ByteBufferUtil.toBytes(auth))) {
+        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
+      }
+    }
+
+    Map<KeyExtent,List<Range>> batch = Translator.translate(tbatch, Translator.TKET, new Translator.ListTranslator<TRange,Range>(Translator.TRT));
+
+    // the first extent seen is later handed to the resource manager when read-ahead work is queued; the rest are checked against it
+    KeyExtent threadPoolExtent = null;
+    for (KeyExtent keyExtent : batch.keySet()) {
+      if (threadPoolExtent == null) {
+        threadPoolExtent = keyExtent;
+        continue;
+      }
+      if (keyExtent.isRootTablet()) {
+        throw new IllegalArgumentException("Cannot batch query root tablet with other tablets " + threadPoolExtent + " " + keyExtent);
+      }
+      if (keyExtent.isMeta() && !threadPoolExtent.isMeta()) {
+        throw new IllegalArgumentException("Cannot batch query !METADATA and non !METADATA tablets " + threadPoolExtent + " " + keyExtent);
+      }
+    }
+
+    if (waitForWrites) {
+      writeTracker.waitForWrites(TabletType.type(batch.keySet()));
+    }
+
+    MultiScanSession mss = new MultiScanSession();
+    mss.user = credentials.getPrincipal();
+    mss.queries = batch;
+    mss.ssiList = ssiList;
+    mss.ssio = ssio;
+    mss.auths = new Authorizations(authorizations);
+    mss.threadPoolExtent = threadPoolExtent;
+
+    mss.columnSet = new HashSet<Column>(tcolumns.size());
+    for (TColumn tcolumn : tcolumns) {
+      mss.columnSet.add(new Column(tcolumn));
+    }
+
+    mss.numTablets = batch.size();
+    for (List<Range> ranges : batch.values()) {
+      mss.numRanges += ranges.size();
+    }
+
+    // the session is created already reserved (second argument) and is released in the finally block below
+    long sid = sessionManager.createSession(mss, true);
+
+    MultiScanResult firstResult;
+    try {
+      firstResult = continueMultiScan(tinfo, sid, mss);
+    } catch (NoSuchScanIDException e) {
+      log.error("the impossible happened", e);
+      throw new RuntimeException("the impossible happened", e);
+    } finally {
+      sessionManager.unreserveSession(sid);
+    }
+
+    return new InitialMultiScan(sid, firstResult);
+  }
+
+  /**
+   * Fetches the next batch of results for an existing multi-range scan session. The session is reserved for the duration of the call.
+   *
+   * @param tinfo
+   *          trace information, passed on to the internal continueMultiScan call
+   * @param scanID
+   *          id of the multi-scan session to continue
+   * @return the next batch of results
+   * @throws NoSuchScanIDException
+   *           if no session could be reserved for {@code scanID}
+   */
+  @Override
+  public MultiScanResult continueMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
+
+    final MultiScanSession reserved = (MultiScanSession) sessionManager.reserveSession(scanID);
+    if (reserved == null) {
+      throw new NoSuchScanIDException();
+    }
+
+    try {
+      return continueMultiScan(tinfo, scanID, reserved);
+    } finally {
+      // always give the reservation back, whether a result was produced or not
+      sessionManager.unreserveSession(reserved);
+    }
+  }
+
+  /**
+   * Waits for the session's lookup task to produce a result. A task is queued first if none is outstanding.
+   *
+   * @param tinfo
+   *          trace information; not read by this method
+   * @param scanID
+   *          id of the multi-scan session
+   * @param session
+   *          the session registered under {@code scanID}
+   * @return the lookup result, or an empty result flagged as having more data when the wait timed out
+   */
+  private MultiScanResult continueMultiScan(TInfo tinfo, long scanID, MultiScanSession session) throws NoSuchScanIDException {
+
+    if (session.lookupTask == null) {
+      session.lookupTask = new LookupTask(scanID);
+      resourceManager.executeReadAhead(session.threadPoolExtent, session.lookupTask);
+    }
+
+    try {
+      MultiScanResult lookedUp = session.lookupTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
+      session.lookupTask = null;
+      return lookedUp;
+    } catch (TimeoutException e1) {
+      // the task stays attached to the session, so a later call waits on the same lookup
+      long timeout = acuConf.getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
+      sessionManager.removeIfNotAccessed(scanID, timeout);
+      return new MultiScanResult(Collections.<TKeyValue> emptyList(), Collections.<TKeyExtent,List<TRange>> emptyMap(),
+          Collections.<TKeyExtent> emptyList(), null, null, false, true);
+    } catch (Throwable t) {
+      sessionManager.removeSession(scanID);
+      log.warn("Failed to get multiscan result", t);
+      throw new RuntimeException(t);
+    }
+  }
+
+  /**
+   * Removes a multi-range scan session and logs a summary of it.
+   *
+   * @param tinfo
+   *          trace information; not read by this method
+   * @param scanID
+   *          id of the multi-scan session to close
+   * @throws NoSuchScanIDException
+   *           if no session is registered under {@code scanID}
+   */
+  @Override
+  public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
+    MultiScanSession closed = (MultiScanSession) sessionManager.removeSession(scanID);
+    if (closed == null) {
+      throw new NoSuchScanIDException();
+    }
+
+    long now = System.currentTimeMillis();
+    String summary = String.format("MultiScanSess %s %,d entries in %.2f secs (lookup_time:%.2f secs tablets:%,d ranges:%,d) ",
+        TServerUtils.clientAddress.get(), closed.numEntries, (now - closed.startTime) / 1000.0, closed.totalLookupTime / 1000.0, closed.numTablets,
+        closed.numRanges);
+    log.debug(summary);
+  }
+
+  /**
+   * Authenticates the caller and opens an update session for it.
+   *
+   * @param tinfo
+   *          trace information; not read by this method
+   * @param credentials
+   *          credentials of the caller; stored on the session and used for its constraint environment
+   * @return the id of the new update session
+   * @throws ThriftSecurityException
+   *           if authentication fails
+   */
+  @Override
+  public long startUpdate(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException {
+    // Make sure user is real
+    security.authenticateUser(credentials, credentials);
+    if (updateMetrics.isEnabled()) {
+      updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0);
+    }
+
+    UpdateSession session = new UpdateSession();
+    session.violations = new Violations();
+    session.credentials = credentials;
+    session.cenv = new TservConstraintEnv(security, session.credentials);
+
+    // second argument false: the session is not reserved on creation
+    return sessionManager.createSession(session, false);
+  }
+
+ private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
+ long t1 = System.currentTimeMillis();
+ if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent))
+ return;
+ if (us.currentTablet == null && (us.failures.containsKey(keyExtent) || us.authFailures.containsKey(keyExtent))) {
+ // if there were previous failures, then do not accept additional writes
+ return;
+ }
+
+ try {
+ // if user has no permission to write to this table, add it to
+ // the failures list
+ boolean sameTable = us.currentTablet != null && (us.currentTablet.getExtent().getTableId().equals(keyExtent.getTableId()));
+ if (sameTable || security.canWrite(us.credentials, keyExtent.getTableId().toString())) {
+ long t2 = System.currentTimeMillis();
+ us.authTimes.addStat(t2 - t1);
+ us.currentTablet = onlineTablets.get(keyExtent);
+ if (us.currentTablet != null) {
+ us.queuedMutations.put(us.currentTablet, new ArrayList<Mutation>());
+ } else {
+ // not serving tablet, so report all mutations as
+ // failures
+ us.failures.put(keyExtent, 0l);
+ if (updateMetrics.isEnabled())
+ updateMetrics.add(TabletServerUpdateMetrics.unknownTabletErrors, 0);
+ }
+ } else {
+ log.warn("Denying access to table " + keyExtent.getTableId() + " for user " + us.credentials.getPrincipal());
+ long t2 = System.currentTimeMillis();
+ us.authTimes.addStat(t2 - t1);
+ us.currentTablet = null;
+ us.authFailures.put(keyExtent, SecurityErrorCode.PERMISSION_DENIED);
+ if (updateMetrics.isEnabled())
+ updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0);
+ return;
+ }
+ } catch (ThriftSecurityException e) {
+ log.error("Denying permission to check user " + us.credentials.getPrincipal() + " with user " + e.getUser(), e);
+ long t2 = System.currentTimeMillis();
+ us.authTimes.addStat(t2 - t1);
+ us.currentTablet = null;
+ us.authFailures.put(keyExtent, e.getCode());
+ if (updateMetrics.isEnabled())
+ updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0);
+ return;
+ }
+ }
+
+ @Override
+ public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent, List<TMutation> tmutations) {
+ UpdateSession us = (UpdateSession) sessionManager.reserveSession(updateID);
+ if (us == null) {
+ throw new RuntimeException("No Such SessionID");
+ }
+
+ try {
+ KeyExtent keyExtent = new KeyExtent(tkeyExtent);
+ setUpdateTablet(us, keyExtent);
+
+ if (us.currentTablet != null) {
+ List<Mutation> mutations = us.queuedMutations.get(us.currentTablet);
+ for (TMutation tmutation : tmutations) {
+ Mutation mutation = new ServerMutation(tmutation);
+ mutations.add(mutation);
+ us.queuedMutationSize += mutation.numBytes();
+ }
+ if (us.queuedMutationSize > getSystemConfiguration().getMemoryInBytes(Property.TSERV_MUTATION_QUEUE_MAX))
+ flush(us);
+ }
+ } finally {
+ sessionManager.unreserveSession(us);
+ }
+ }
+
+  /**
+   * Writes all mutations queued on the update session: prepares them on each tablet, writes them to the write-ahead log (retrying on IOException) and
+   * commits them. The queue is emptied once the logging/commit phase has been entered, whether it succeeds or not.
+   *
+   * @param us
+   *          the update session whose queued mutations are flushed
+   * @throws RuntimeException
+   *           if preparing the mutations failed (prepared commits are aborted first) or if logging failed with something other than an IOException
+   */
+  private void flush(UpdateSession us) {
+
+    int mutationCount = 0;
+    Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
+    Throwable error = null;
+
+    long pt1 = System.currentTimeMillis();
+
+    boolean containsMetadataTablet = false;
+    for (Tablet tablet : us.queuedMutations.keySet())
+      if (tablet.getExtent().isMeta())
+        containsMetadataTablet = true;
+
+    // sessions that touch a metadata tablet do not wait here
+    if (!containsMetadataTablet && us.queuedMutations.size() > 0)
+      TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
+
+    Span prep = Trace.start("prep");
+    try {
+      for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) {
+
+        Tablet tablet = entry.getKey();
+        List<Mutation> mutations = entry.getValue();
+        if (mutations.size() > 0) {
+          try {
+            if (updateMetrics.isEnabled())
+              updateMetrics.add(TabletServerUpdateMetrics.mutationArraySize, mutations.size());
+
+            CommitSession commitSession = tablet.prepareMutationsForCommit(us.cenv, mutations);
+            if (commitSession == null) {
+              // no commit session was handed out for this tablet: record the failure together with the count committed so far
+              if (us.currentTablet == tablet) {
+                us.currentTablet = null;
+              }
+              us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
+            } else {
+              sendables.put(commitSession, mutations);
+              mutationCount += mutations.size();
+            }
+
+          } catch (TConstraintViolationException e) {
+            us.violations.add(e.getViolations());
+            if (updateMetrics.isEnabled())
+              updateMetrics.add(TabletServerUpdateMetrics.constraintViolations, 0);
+
+            if (e.getNonViolators().size() > 0) {
+              // only log and commit mutations if there were some that did not violate constraints... this is what
+              // prepareMutationsForCommit() expects
+              sendables.put(e.getCommitSession(), e.getNonViolators());
+            }
+
+            mutationCount += mutations.size();
+
+          } catch (HoldTimeoutException t) {
+            error = t;
+            log.debug("Giving up on mutations due to a long memory hold time");
+            break;
+          } catch (Throwable t) {
+            error = t;
+            log.error("Unexpected error preparing for commit", error);
+            break;
+          }
+        }
+      }
+    } finally {
+      prep.stop();
+    }
+
+    long pt2 = System.currentTimeMillis();
+    us.prepareTimes.addStat(pt2 - pt1);
+    // only record an average when there is something to average over; dividing by zero used to feed Long.MAX_VALUE or 0 into the metric
+    int queuedTabletCount = us.queuedMutations.size();
+    if (updateMetrics.isEnabled() && queuedTabletCount > 0)
+      updateMetrics.add(TabletServerUpdateMetrics.commitPrep, (long) ((pt2 - pt1) / (double) queuedTabletCount));
+
+    if (error != null) {
+      for (Entry<CommitSession,List<Mutation>> e : sendables.entrySet()) {
+        e.getKey().abortCommit(e.getValue());
+      }
+      throw new RuntimeException(error);
+    }
+    try {
+      // the span is stopped in a finally block so that it is closed even when logging throws
+      Span wal = Trace.start("wal");
+      try {
+        while (true) {
+          try {
+            long t1 = System.currentTimeMillis();
+
+            logger.logManyTablets(sendables);
+
+            long t2 = System.currentTimeMillis();
+            us.walogTimes.addStat(t2 - t1);
+            if (updateMetrics.isEnabled())
+              updateMetrics.add(TabletServerUpdateMetrics.waLogWriteTime, (t2 - t1));
+
+            break;
+          } catch (IOException ex) {
+            log.warn("logging mutations failed, retrying", ex);
+          } catch (Throwable t) {
+            log.error("Unknown exception logging mutations, counts for mutations in flight not decremented!", t);
+            throw new RuntimeException(t);
+          }
+        }
+      } finally {
+        wal.stop();
+      }
+
+      Span commit = Trace.start("commit");
+      try {
+        long t1 = System.currentTimeMillis();
+        for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) {
+          CommitSession commitSession = entry.getKey();
+          List<Mutation> mutations = entry.getValue();
+
+          commitSession.commit(mutations);
+
+          Tablet tablet = commitSession.getTablet();
+
+          if (tablet == us.currentTablet) {
+            // because constraint violations may filter out some mutations, for proper accounting with the client code, need to increment the count
+            // based on the original number of mutations from the client NOT the filtered number
+            us.successfulCommits.increment(tablet, us.queuedMutations.get(tablet).size());
+          }
+        }
+        long t2 = System.currentTimeMillis();
+
+        us.flushTime += (t2 - pt1);
+        us.commitTimes.addStat(t2 - t1);
+
+        // same guard as for the prepare average above
+        if (updateMetrics.isEnabled() && !sendables.isEmpty())
+          updateMetrics.add(TabletServerUpdateMetrics.commitTime, (long) ((t2 - t1) / (double) sendables.size()));
+      } finally {
+        commit.stop();
+      }
+    } finally {
+      us.queuedMutations.clear();
+      if (us.currentTablet != null) {
+        us.queuedMutations.put(us.currentTablet, new ArrayList<Mutation>());
+      }
+      us.queuedMutationSize = 0;
+    }
+    us.totalUpdates += mutationCount;
+  }
+
+  /**
+   * Closes an update session: flushes whatever is still queued, logs a summary and returns the errors collected over the life of the session.
+   *
+   * @param tinfo
+   *          trace information; not read by this method
+   * @param updateID
+   *          id of the update session to close
+   * @return the failed extents, constraint violations and authorization failures recorded on the session
+   * @throws NoSuchScanIDException
+   *           if no session is registered under {@code updateID}
+   */
+  @Override
+  public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDException {
+    UpdateSession session = (UpdateSession) sessionManager.removeSession(updateID);
+    if (session == null) {
+      throw new NoSuchScanIDException();
+    }
+
+    // clients may or may not see data from an update session while it is in progress, however when the update session is closed want to ensure that
+    // reads wait for the write to finish
+    long opid = writeTracker.startWrite(session.queuedMutations.keySet());
+    try {
+      flush(session);
+    } finally {
+      writeTracker.finishWrite(opid);
+    }
+
+    log.debug(String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)", TServerUtils.clientAddress.get(),
+        session.totalUpdates, (System.currentTimeMillis() - session.startTime) / 1000.0, session.authTimes.toString(), session.flushTime / 1000.0,
+        session.prepareTimes.getSum() / 1000.0, session.walogTimes.getSum() / 1000.0, session.commitTimes.getSum() / 1000.0));
+
+    if (session.failures.size() > 0) {
+      Entry<KeyExtent,Long> firstFailure = session.failures.entrySet().iterator().next();
+      log.debug(String.format("Failures: %d, first extent %s successful commits: %d", session.failures.size(), firstFailure.getKey().toString(),
+          firstFailure.getValue()));
+    }
+
+    List<ConstraintViolationSummary> violations = session.violations.asList();
+    if (violations.size() > 0) {
+      ConstraintViolationSummary firstViolation = session.violations.asList().iterator().next();
+      log.debug(String.format("Violations: %d, first %s occurs %d", violations.size(), firstViolation.violationDescription,
+          firstViolation.numberOfViolatingMutations));
+    }
+
+    if (session.authFailures.size() > 0) {
+      KeyExtent firstDenied = session.authFailures.keySet().iterator().next();
+      log.debug(String.format("Authentication Failures: %d, first %s", session.authFailures.size(), firstDenied.toString()));
+    }
+
+    return new UpdateErrors(Translator.translate(session.failures, Translator.KET), Translator.translate(violations, Translator.CVST),
+        Translator.translate(session.authFailures, Translator.KET));
+  }
+
+  /**
+   * Applies a single mutation to a tablet: prepares it, writes it to the write-ahead log (retrying on IOException) and commits it.
+   *
+   * @param tinfo
+   *          trace information; not read by this method
+   * @param credentials
+   *          credentials of the caller; must be allowed to write to the table
+   * @param tkeyExtent
+   *          extent of the tablet to write to
+   * @param tmutation
+   *          the mutation to apply
+   * @throws NotServingTabletException
+   *           if the tablet is not online on this server or hands out no commit session
+   * @throws ConstraintViolationException
+   *           if the mutation violates a constraint
+   * @throws ThriftSecurityException
+   *           if the caller may not write to the table
+   */
+  @Override
+  public void update(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, TMutation tmutation) throws NotServingTabletException,
+      ConstraintViolationException, ThriftSecurityException {
+
+    if (!security.canWrite(credentials, new String(tkeyExtent.getTable())))
+      throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+    KeyExtent keyExtent = new KeyExtent(tkeyExtent);
+    Tablet tablet = onlineTablets.get(keyExtent);
+    if (tablet == null) {
+      throw new NotServingTabletException(tkeyExtent);
+    }
+
+    if (!keyExtent.isMeta())
+      TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
+
+    long opid = writeTracker.startWrite(TabletType.type(keyExtent));
+
+    try {
+      Mutation mutation = new ServerMutation(tmutation);
+      List<Mutation> mutations = Collections.singletonList(mutation);
+
+      // every span below is stopped in a finally block so that none is left open when the traced call throws
+      CommitSession cs;
+      Span prep = Trace.start("prep");
+      try {
+        cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, credentials), mutations);
+      } finally {
+        prep.stop();
+      }
+      if (cs == null) {
+        throw new NotServingTabletException(tkeyExtent);
+      }
+
+      while (true) {
+        try {
+          // one span per attempt; previously a failed attempt left its span open and the retry started another
+          Span wal = Trace.start("wal");
+          try {
+            logger.log(cs, cs.getWALogSeq(), mutation);
+          } finally {
+            wal.stop();
+          }
+          break;
+        } catch (IOException ex) {
+          log.warn(ex, ex);
+        }
+      }
+
+      Span commit = Trace.start("commit");
+      try {
+        cs.commit(mutations);
+      } finally {
+        commit.stop();
+      }
+    } catch (TConstraintViolationException e) {
+      throw new ConstraintViolationException(Translator.translate(e.getViolations().asList(), Translator.CVST));
+    } finally {
+      writeTracker.finishWrite(opid);
+    }
+  }
+
+ @Override
+ public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, ByteBuffer splitPoint)
+ throws NotServingTabletException, ThriftSecurityException {
+
+ String tableId = new String(ByteBufferUtil.toBytes(tkeyExtent.table));
+ if (!security.canSplitTablet(credentials, tableId))
+ throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+
+ KeyExtent keyExtent = new KeyExtent(tkeyExtent);
+
+ Tablet tablet = onlineTablets.get(keyExtent);
+ if (tablet == null) {
+ throw new NotServingTabletException(tkeyExtent);
+ }
+
+ if (keyExtent.getEndRow() == null || !keyExtent.getEndRow().equals(ByteBufferUtil.toText(splitPoint))) {
+ try {
+ if (TabletServer.this.splitTablet(tablet, ByteBufferUtil.toBytes(splitPoint)) == null) {
+ throw new NotServingTabletException(tkeyExtent);
+ }
+ } catch (IOException e) {
+ log.warn("Failed to split " + keyExtent, e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public TabletServerStatus getTabletServerStatus(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
+ return getStats(sessionManager.getActiveScansPerTable());
+ }
+
+  /**
+   * Collects statistics for every online tablet of one table.
+   *
+   * @param tinfo
+   *          trace information; not read by this method
+   * @param credentials
+   *          credentials of the caller; not checked by this method
+   * @param tableId
+   *          id of the table whose tablets are reported
+   * @return one entry per online tablet of the table; empty if none are hosted here
+   */
+  @Override
+  public List<TabletStats> getTabletStats(TInfo tinfo, TCredentials credentials, String tableId) throws ThriftSecurityException, TException {
+    // work on a snapshot so the lock on onlineTablets is held only for the copy
+    TreeMap<KeyExtent,Tablet> snapshot;
+    synchronized (onlineTablets) {
+      snapshot = new TreeMap<KeyExtent,Tablet>(onlineTablets);
+    }
+
+    Text wantedTable = new Text(tableId);
+    KeyExtent scanFrom = new KeyExtent(wantedTable, new Text(), null);
+
+    List<TabletStats> collected = new ArrayList<TabletStats>();
+    for (Entry<KeyExtent,Tablet> online : snapshot.tailMap(scanFrom).entrySet()) {
+      KeyExtent extent = online.getKey();
+      if (extent.getTableId().compareTo(wantedTable) != 0) {
+        continue;
+      }
+
+      Tablet tablet = online.getValue();
+      TabletStats stats = tablet.timer.getTabletStats();
+      stats.extent = extent.toThrift();
+      stats.ingestRate = tablet.ingestRate();
+      stats.queryRate = tablet.queryRate();
+      stats.splitCreationTime = tablet.getSplitCreationTime();
+      stats.numEntries = tablet.getNumEntries();
+      collected.add(stats);
+    }
+    return collected;
+  }
+
+ private ZooCache masterLockCache = new ZooCache(); // passed to ZooLock.isLockHeld() by checkPermission(), which clears it and re-checks on a miss
+
+  /**
+   * Validates a request before it is acted on. Three checks run in order: optionally that the caller may perform system actions, that this server
+   * acquired and still holds its own lock, and, when a lock id is supplied, that the lock it names is held.
+   *
+   * @param credentials
+   *          credentials of the caller
+   * @param lock
+   *          lock id supplied with the request, or {@code null} to skip the master lock check
+   * @param requiresSystemPermission
+   *          whether the caller must be allowed to perform system actions
+   * @param request
+   *          name of the request, used in log messages only
+   * @throws ThriftSecurityException
+   *           if the system permission check fails
+   * @throws RuntimeException
+   *           if this server has not acquired its lock, or the supplied master lock is not held or cannot be checked
+   */
+  private void checkPermission(TCredentials credentials, String lock, boolean requiresSystemPermission, final String request)
+      throws ThriftSecurityException {
+    if (requiresSystemPermission) {
+      boolean fatal = false;
+      try {
+        log.debug("Got " + request + " message from user: " + credentials.getPrincipal());
+        if (!security.canPerformSystemActions(credentials)) {
+          log.warn("Got " + request + " message from user: " + credentials.getPrincipal());
+          throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+        }
+      } catch (ThriftSecurityException e) {
+        log.warn("Got " + request + " message from unauthenticatable user: " + e.getUser());
+        if (e.getUser().equals(SecurityConstants.SYSTEM_PRINCIPAL)) {
+          log.fatal("Got message from a service with a mismatched configuration. Please ensure a compatible configuration.", e);
+          fatal = true;
+        }
+        throw e;
+      } finally {
+        // a failure attributed to the system principal is treated as fatal for this process
+        if (fatal) {
+          Runnable logGcBeforeExit = new Runnable() {
+            @Override
+            public void run() {
+              logGCInfo(getSystemConfiguration());
+            }
+          };
+          Halt.halt(1, logGcBeforeExit);
+        }
+      }
+    }
+
+    if (tabletServerLock == null || !tabletServerLock.wasLockAcquired()) {
+      log.warn("Got " + request + " message from master before lock acquired, ignoring...");
+      throw new RuntimeException("Lock not acquired");
+    }
+
+    if (tabletServerLock != null && tabletServerLock.wasLockAcquired() && !tabletServerLock.isLocked()) {
+      Runnable logLostLockBeforeExit = new Runnable() {
+        @Override
+        public void run() {
+          log.info("Tablet server no longer holds lock during checkPermission() : " + request + ", exiting");
+          logGCInfo(getSystemConfiguration());
+        }
+      };
+      Halt.halt(1, logLostLockBeforeExit);
+    }
+
+    if (lock == null) {
+      return;
+    }
+
+    ZooUtil.LockID lid = new ZooUtil.LockID(ZooUtil.getRoot(instance) + Constants.ZMASTER_LOCK, lock);
+
+    try {
+      boolean held = ZooLock.isLockHeld(masterLockCache, lid);
+      if (!held) {
+        // maybe the cache is out of date and a new master holds the lock?
+        masterLockCache.clear();
+        held = ZooLock.isLockHeld(masterLockCache, lid);
+      }
+      if (!held) {
+        log.warn("Got " + request + " message from a master that does not hold the current lock " + lock);
+        throw new RuntimeException("bad master lock");
+      }
+    } catch (Exception e) {
+      // note that this also wraps the RuntimeException thrown just above
+      throw new RuntimeException("bad master lock", e);
+    }
+  }
+
+ @Override
+ public void loadTablet(TInfo tinfo, TCredentials credentials, String lock, final TKeyExtent textent) {
+
+ try {
+ checkPermission(credentials, lock, true, "loadTablet");
+ } catch (ThriftSecurityException e) {
+ log.error(e, e);
+ throw new RuntimeException(e);
+ }
+
+ final KeyExtent extent = new KeyExtent(textent);
+
+ synchronized (unopenedTablets) {
+ synchronized (openingTablets) {
+ synchronized (onlineTablets) {
+
+ // checking if this exact tablet is in any of the sets
+ // below is not a strong enough check
+ // when splits and fix splits occurring
+
+ Set<KeyExtent> unopenedOverlapping = KeyExtent.findOverlapping(extent, unopenedTablets);
+ Set<KeyExtent> openingOverlapping = KeyExtent.findOverlapping(extent, openingTablets);
+ Set<KeyExtent> onlineOverlapping = KeyExtent.findOverlapping(extent, onlineTablets);
+ Set<KeyExtent> all = new HashSet<KeyExtent>();
+ all.addAll(unopenedOverlapping);
+ all.addAll(openingOverlapping);
+ all.addAll(onlineOverlapping);
+
+ if (!all.isEmpty()) {
+ if (all.size() != 1 || !all.contains(extent)) {
+ log.error("Tablet " + extent + " overlaps previously assigned " + unopenedOverlapping + " " + openingOverlapping + " " + onlineOverlapping);
+ }
+ return;
+ }
+
+ unopenedTablets.add(extent);
+ }
+ }
+ }
+
+ // add the assignment job to the appropriate queue
+ log.info("Loading tablet " + extent);
+
+ final Runnable ah = new LoggingRunnable(log, new AssignmentHandler(extent));
+ // Root tablet assignment must take place immediately
+ if (extent.isRootTablet()) {
+ new Daemon("Root Tablet Assignment") {
+ @Override
+ public void run() {
+ ah.run();
+ if (onlineTablets.containsKey(extent)) {
+ log.info("Root tablet loaded: " + extent);
+ } else {
+ log.info("Root tablet failed to load");
+ }
+
+ }
+ }.start();
+ } else {
+ if (extent.isMeta()) {
+ resourceManager.addMetaDataAssignment(ah);
+ } else {
+ resourceManager.addAssignment(ah);
+ }
+ }
+ }
+
+ @Override
+ public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent, boolean save) {
+ try {
+ checkPermission(credentials, lock, true, "unloadTablet");
+ } catch (ThriftSecurityException e) {
+ log.error(e, e);
+ throw new RuntimeException(e);
+ }
+
+ KeyExtent extent = new KeyExtent(textent);
+
+ resourceManager.addMigration(extent, new LoggingRunnable(log, new UnloadTabletHandler(extent, save)));
+ }
+
+ @Override
+ public void flush(TInfo tinfo, TCredentials credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) {
+ try {
+ checkPermission(credentials, lock, true, "flush");
+ } catch (ThriftSecurityException e) {
+ log.error(e, e);
+ throw new RuntimeException(e);
+ }
+
+ ArrayList<Tablet> tabletsToFlush = new ArrayList<Tablet>();
+
+ KeyExtent ke = new KeyExtent(new Text(tableId), ByteBufferUtil.toText(endRow), ByteBufferUtil.toText(startRow));
+
+ synchronized (onlineTablets) {
+ for (Tablet tablet : onlineTablets.values())
+ if (ke.overlaps(tablet.getExtent()))
+ tabletsToFlush.add(tablet);
+ }
+
+ Long flushID = null;
+
+ for (Tablet tablet : tabletsToFlush) {
+ if (flushID == null) {
+ // read the flush id once from zookeeper instead of reading
+ // it for each tablet
+ try {
+ flushID = tablet.getFlushID();
+ } catch (NoNodeException e) {
+ // table was probably deleted
+ log.info("Asked to flush table that has no flush id " + ke + " " + e.getMessage());
+ return;
+ }
+ }
+ tablet.flush(flushID);
+ }
+ }
+
+ @Override
+ public void flushTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent) throws TException {
+ try {
+ checkPermission(credentials, lock, true, "flushTablet");
+ } catch (ThriftSecurityException e) {
+ log.error(e, e);
+ throw new RuntimeException(e);
+ }
+
+ Tablet tablet = onlineTablets.get(new KeyExtent(textent));
+ if (tablet != null) {
+ log.info("Flushing " + tablet.getExtent());
+ try {
+ tablet.flush(tablet.getFlushID());
+ } catch (NoNodeException nne) {
+ log.info("Asked to flush tablet that has no flush id " + new KeyExtent(textent) + " " + nne.getMessage());
+ }
+ }
+ }
+
+ @Override
+ public void halt(TInfo tinfo, TCredentials credentials, String lock) throws ThriftSecurityException {
+
+ checkPermission(credentials, lock, true, "halt");
+
+ Halt.halt(0, new Runnable() {
+ @Override
+ public void run() {
+ log.info("Master requested tablet server halt");
+ logGCInfo(getSystemConfiguration());
+ serverStopRequested = true;
+ try {
+ tabletServerLock.unlock();
+ } catch (Exception e) {
+ log.error(e, e);
+ }
+ }
+ });
+ }
+
+ /**
+  * Variant of {@code halt} that never propagates a failure: it delegates to
+  * {@code halt(info, credentials, lock)} and logs any exception at warn level.
+  */
+ @Override
+ public void fastHalt(TInfo info, TCredentials credentials, String lock) {
+   try {
+     halt(info, credentials, lock);
+   } catch (Exception haltFailure) {
+     // deliberately best-effort: the caller is not told about the failure
+     log.warn("Error halting", haltFailure);
+   }
+ }
+
+ @Override
+ public TabletStats getHistoricalStats(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
+ return statsKeeper.getTabletStats();
+ }
+
+ /**
+  * Returns the scans that the session manager currently reports as active.
+  *
+  * @throws ThriftSecurityException
+  *           if the permission check fails. The exception is logged and rethrown as-is, because
+  *           this method declares it; wrapping it in a {@code RuntimeException} would hide the
+  *           security error from the caller.
+  */
+ @Override
+ public List<ActiveScan> getActiveScans(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
+   try {
+     checkPermission(credentials, null, true, "getScans");
+   } catch (ThriftSecurityException e) {
+     log.error(e, e);
+     throw e;
+   }
+
+   return sessionManager.getActiveScans();
+ }
+
+ @Override
+ public void chop(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent) throws TException {
+ try {
+ checkPermission(credentials, lock, true, "chop");
+ } catch (ThriftSecurityException e) {
+ log.error(e, e);
+ throw new RuntimeException(e);
+ }
+
+ KeyExtent ke = new KeyExtent(textent);
+
+ Tablet tablet = onlineTablets.get(ke);
+ if (tablet != null) {
+ tablet.chopFiles();
+ }
+ }
+
+ @Override
+ public void compact(TInfo tinfo, TCredentials credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow)
+ throws TException {
+ try {
+ checkPermission(credentials, lock, true, "compact");
+ } catch (ThriftSecurityException e) {
+ log.error(e, e);
+ throw new RuntimeException(e);
+ }
+
+ KeyExtent ke = new KeyExtent(new Text(tableId), ByteBufferUtil.toText(endRow), ByteBufferUtil.toText(startRow));
+
+ ArrayList<Tablet> tabletsToCompact = new ArrayList<Tablet>();
+ synchronized (onlineTablets) {
+ for (Tablet tablet : onlineTablets.values())
+ if (ke.overlaps(tablet.getExtent()))
+ tabletsToCompact.add(tablet);
+ }
+
+ Long compactionId = null;
+
+ for (Tablet tablet : tabletsToCompact) {
+ // all for the same table id, so only need to read
+ // compaction id once
+ if (compactionId == null)
+ try {
+ compactionId = tablet.getCompactionID().getFirst();
+ } catch (NoNodeException e) {
+ log.info("Asked to compact table with no compaction id " + ke + " " + e.getMessage());
+ return;
+ }
+ tablet.compactAll(compactionId);
+ }
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface#removeLogs(org.apache.accumulo.trace.thrift.TInfo,
+ * org.apache.accumulo.core.security.thrift.Credentials, java.util.List)
+ */
+ @Override
+ public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {
+ String myname = getClientAddressString();
+ myname = myname.replace(':', '+');
+ Path logDir = new Path(Constants.getWalDirectory(acuConf), myname);
+ Set<String> loggers = new HashSet<String>();
+ logger.getLoggers(loggers);
+ nextFile: for (String filename : filenames) {
+ for (String logger : loggers) {
+ if (logger.contains(filename))
+ continue nextFile;
+ }
+ List<Tablet> onlineTabletsCopy = new ArrayList<Tablet>();
+ synchronized (onlineTablets) {
+ onlineTabletsCopy.addAll(onlineTablets.values());
+ }
+ for (Tablet tablet : onlineTabletsCopy) {
+ for (String current : tablet.getCurrentLogs()) {
+ if (current.contains(filename)) {
+ log.info("Attempted to delete " + filename + " from tablet " + tablet.getExtent());
+ continue nextFile;
+ }
+ }
+ }
+ try {
+ String source = logDir + "/" + filename;
+ if (acuConf.getBoolean(Property.TSERV_ARCHIVE_WALOGS)) {
+ String walogArchive = Constants.getBaseDir(acuConf) + "/walogArchive";
+ fs.mkdirs(new Path(walogArchive));
+ String dest = walogArchive + "/" + filename;
+ log.info("Archiving walog " + source + " to " + dest);
+ if (!fs.rename(new Path(source), new Path(dest)))
+ log.error("rename is unsuccessful");
+ } else {
+ log.info("Deleting walog " + filename);
+ Trash trash = new Trash(fs, fs.getConf());
+ Path sourcePath = new Path(source);
+ if (!(!acuConf.getBoolean(Property.GC_TRASH_IGNORE) && trash.moveToTrash(sourcePath)) && !fs.delete(sourcePath, true))
+ log.warn("Failed to delete walog " + source);
+ Path recoveryPath = new Path(Constants.getRecoveryDir(acuConf), filename);
+ try {
+ if (trash.moveToTrash(recoveryPath) || fs.delete(recoveryPath, true))
+ log.info("Deleted any recovery log " + filename);
+ } catch (FileNotFoundException ex) {
+ // ignore
+ }
+
+ }
+ } catch (IOException e) {
+ log.warn("Error attempting to delete write-ahead log " + filename + ": " + e);
+ }
+ }
+ }
+
+ /**
+  * Returns the compactions reported by {@code Compactor.getRunningCompactions()}, each converted
+  * to its Thrift representation.
+  *
+  * @throws ThriftSecurityException
+  *           if the permission check fails. The exception is logged and rethrown as-is, because
+  *           this method declares it; wrapping it in a {@code RuntimeException} would hide the
+  *           security error from the caller.
+  */
+ @Override
+ public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
+   try {
+     checkPermission(credentials, null, true, "getActiveCompactions");
+   } catch (ThriftSecurityException e) {
+     log.error(e, e);
+     throw e;
+   }
+
+   List<CompactionInfo> compactions = Compactor.getRunningCompactions();
+   // presize: exactly one result entry per running compaction
+   List<ActiveCompaction> ret = new ArrayList<ActiveCompaction>(compactions.size());
+
+   for (CompactionInfo compactionInfo : compactions) {
+     ret.add(compactionInfo.toThrift());
+   }
+
+   return ret;
+ }
+ }
+
+ /**
+  * Task that splits one tablet. If {@code majorCompactorDisabled} is set by the time the task
+  * runs, it exits without splitting.
+  */
+ private class SplitRunner implements Runnable {
+   private final Tablet tablet;
+
+   public SplitRunner(Tablet tablet) {
+     this.tablet = tablet;
+   }
+
+   @Override
+   public void run() {
+     // split tasks that were queued when shutdown was initiated fall through without doing work
+     if (!majorCompactorDisabled) {
+       splitTablet(tablet);
+     }
+   }
+ }
+
+ /** Reports the current value of the {@code majorCompactorDisabled} flag. */
+ boolean isMajorCompactionDisabled() {
+   boolean disabled = majorCompactorDisabled;
+   return disabled;
+ }
+
+ /**
+  * Submits a split of {@code tablet} to the resource manager. The work is a {@code SplitRunner}
+  * wrapped in a {@code LoggingRunnable}.
+  */
+ void executeSplit(Tablet tablet) {
+   LoggingRunnable splitTask = new LoggingRunnable(log, new SplitRunner(tablet));
+   resourceManager.executeSplit(tablet.getExtent(), splitTask);
+ }
+
+ private class MajorCompactor implements Runnable {
+
+ @Override
+ public void run() {
+ while (!majorCompactorDisabled) {
+ try {
+ UtilWaitThread.sleep(getSystemConfiguration().getTimeInMillis(Property.TSERV_MAJC_DELAY));
+
+ TreeMap<KeyExtent,Tablet> copyOnlineTablets = new TreeMap<KeyExtent,Tablet>();
+
+ synchronized (onlineTablets) {
+ copyOnlineTablets.putAll(onlineTablets); // avoid
+ // concurrent
+ // modification
+ }
+
+ int numMajorCompactionsInProgress = 0;
+
+ Iterator<Entry<KeyExtent,Tablet>> iter = copyOnlineTablets.entrySet().iterator();
+
<TRUNCATED>