You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by vi...@apache.org on 2014/01/28 23:56:57 UTC
[1/3] git commit: ACCUMULO-2266 Checking hdfs minimum block size and
failing if it clashes with walog max size
Updated Branches:
refs/heads/1.6.0-SNAPSHOT 07d7ececf -> 550d8ac65
ACCUMULO-2266 Checking hdfs minimum block size and failing if it clashes with walog max size
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/07da9e3f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/07da9e3f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/07da9e3f
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 07da9e3f3cf783074a52331e0d3a4fe0ab4e388b
Parents: f1ae5a8
Author: John Vines <vi...@apache.org>
Authored: Tue Jan 28 17:52:25 2014 -0500
Committer: John Vines <vi...@apache.org>
Committed: Tue Jan 28 17:52:25 2014 -0500
----------------------------------------------------------------------
.../apache/accumulo/server/tabletserver/TabletServer.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/07da9e3f/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index 2ce6b9d..ebbc214 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -3042,8 +3042,13 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.info("Tablet server starting on " + hostname);
security = AuditedSecurityOperation.getInstance();
clientAddress = new InetSocketAddress(hostname, 0);
- logger = new TabletServerLogger(this, getSystemConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE));
-
+ long walogMaxSize = getSystemConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE);
+ long minBlockSize = CachedConfiguration.getInstance().getLong("dfs.namenode.fs-limits.min-block-size", 0);
+ if (minBlockSize != 0 && minBlockSize > walogMaxSize)
+ throw new RuntimeException("Unable to start TabletServer. Logger is set to use blocksize " + walogMaxSize + " but hdfs minimum block size is "
+ + minBlockSize + ". Either increase the " + Property.TSERV_WALOG_MAX_SIZE + " or decrease dfs.namenode.fs-limits.min-block-size in hdfs-site.xml.");
+ logger = new TabletServerLogger(this, walogMaxSize);
+
if (getSystemConfiguration().getBoolean(Property.TSERV_LOCK_MEMORY)) {
String path = "lib/native/mlock/" + System.mapLibraryName("MLock-" + Platform.getPlatform());
path = new File(path).getAbsolutePath();
[3/3] git commit: Merge remote-tracking branch
'origin/1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
Posted by vi...@apache.org.
Merge remote-tracking branch 'origin/1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
Conflicts:
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/550d8ac6
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/550d8ac6
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/550d8ac6
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 550d8ac654c33beffc72b08c6033a8602620b672
Parents: 07d7ece 07da9e3
Author: John Vines <vi...@apache.org>
Authored: Tue Jan 28 17:56:23 2014 -0500
Committer: John Vines <vi...@apache.org>
Committed: Tue Jan 28 17:56:23 2014 -0500
----------------------------------------------------------------------
.../main/java/org/apache/accumulo/tserver/TabletServer.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
[2/3] Merge remote-tracking branch 'origin/1.5.1-SNAPSHOT' into
1.6.0-SNAPSHOT
Posted by vi...@apache.org.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/550d8ac6/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index d03f4b6,0000000..71d7328
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -1,3892 -1,0 +1,3898 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TimerTask;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.CompressedIterators;
+import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig;
+import org.apache.accumulo.core.client.impl.ScannerImpl;
+import org.apache.accumulo.core.client.impl.TabletType;
+import org.apache.accumulo.core.client.impl.Translator;
+import org.apache.accumulo.core.client.impl.Translator.TKeyExtentTranslator;
+import org.apache.accumulo.core.client.impl.Translator.TRangeTranslator;
+import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.constraints.Constraint.Environment;
+import org.apache.accumulo.core.constraints.Violations;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.ConstraintViolationSummary;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.thrift.InitialMultiScan;
+import org.apache.accumulo.core.data.thrift.InitialScan;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.data.thrift.MapFileInfo;
+import org.apache.accumulo.core.data.thrift.MultiScanResult;
+import org.apache.accumulo.core.data.thrift.ScanResult;
+import org.apache.accumulo.core.data.thrift.TCMResult;
+import org.apache.accumulo.core.data.thrift.TCMStatus;
+import org.apache.accumulo.core.data.thrift.TColumn;
+import org.apache.accumulo.core.data.thrift.TCondition;
+import org.apache.accumulo.core.data.thrift.TConditionalMutation;
+import org.apache.accumulo.core.data.thrift.TConditionalSession;
+import org.apache.accumulo.core.data.thrift.TKey;
+import org.apache.accumulo.core.data.thrift.TKeyExtent;
+import org.apache.accumulo.core.data.thrift.TKeyValue;
+import org.apache.accumulo.core.data.thrift.TMutation;
+import org.apache.accumulo.core.data.thrift.TRange;
+import org.apache.accumulo.core.data.thrift.UpdateErrors;
+import org.apache.accumulo.core.iterators.IterationInterruptedException;
+import org.apache.accumulo.core.master.thrift.Compacting;
+import org.apache.accumulo.core.master.thrift.MasterClientService;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletLoadState;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.security.AuthorizationContainer;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.SecurityUtil;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.ScanState;
+import org.apache.accumulo.core.tabletserver.thrift.ScanType;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.core.util.ByteBufferUtil;
++import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.MapCounter;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.ServerServices;
+import org.apache.accumulo.core.util.ServerServices.Service;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.Stat;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.Accumulo;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.client.ClientServiceHandler;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.data.ServerMutation;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManager.FileType;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.master.recovery.RecoveryPath;
+import org.apache.accumulo.server.master.state.Assignment;
+import org.apache.accumulo.server.master.state.DistributedStoreException;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
+import org.apache.accumulo.server.master.state.TabletStateStore;
+import org.apache.accumulo.server.master.state.ZooTabletStateStore;
+import org.apache.accumulo.server.metrics.AbstractMetricsImpl;
+import org.apache.accumulo.server.problems.ProblemReport;
+import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.security.AuditedSecurityOperation;
+import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.util.FileSystemMonitor;
+import org.apache.accumulo.server.util.Halt;
+import org.apache.accumulo.server.util.MasterMetadataUtil;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.accumulo.server.util.TServerUtils;
+import org.apache.accumulo.server.util.TServerUtils.ServerAddress;
+import org.apache.accumulo.server.util.time.RelativeTime;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+import org.apache.accumulo.start.classloader.vfs.ContextManager;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
+import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
+import org.apache.accumulo.trace.thrift.TInfo;
+import org.apache.accumulo.tserver.Compactor.CompactionInfo;
+import org.apache.accumulo.tserver.RowLocks.RowLock;
+import org.apache.accumulo.tserver.Tablet.CommitSession;
+import org.apache.accumulo.tserver.Tablet.KVEntry;
+import org.apache.accumulo.tserver.Tablet.LookupResult;
+import org.apache.accumulo.tserver.Tablet.MinorCompactionReason;
+import org.apache.accumulo.tserver.Tablet.ScanBatch;
+import org.apache.accumulo.tserver.Tablet.Scanner;
+import org.apache.accumulo.tserver.Tablet.SplitInfo;
+import org.apache.accumulo.tserver.Tablet.TConstraintViolationException;
+import org.apache.accumulo.tserver.Tablet.TabletClosedException;
+import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
+import org.apache.accumulo.tserver.TabletStatsKeeper.Operation;
+import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
+import org.apache.accumulo.tserver.data.ServerConditionalMutation;
+import org.apache.accumulo.tserver.log.DfsLogger;
+import org.apache.accumulo.tserver.log.LogSorter;
+import org.apache.accumulo.tserver.log.MutationReceiver;
+import org.apache.accumulo.tserver.log.TabletServerLogger;
+import org.apache.accumulo.tserver.mastermessage.MasterMessage;
+import org.apache.accumulo.tserver.mastermessage.SplitReportMessage;
+import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage;
+import org.apache.accumulo.tserver.metrics.TabletServerMBean;
+import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
+import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
+import org.apache.accumulo.tserver.metrics.TabletServerUpdateMetrics;
+import org.apache.commons.collections.map.LRUMap;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TServiceClient;
+import org.apache.thrift.server.TServer;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+
+import com.google.common.net.HostAndPort;
+
+enum ScanRunState {
+ QUEUED, RUNNING, FINISHED
+}
+
+public class TabletServer extends AbstractMetricsImpl implements org.apache.accumulo.tserver.metrics.TabletServerMBean {
+ private static final Logger log = Logger.getLogger(TabletServer.class);
+
+ private static HashMap<String,Long> prevGcTime = new HashMap<String,Long>();
+ private static long lastMemorySize = 0;
+ private static long gcTimeIncreasedCount;
+
+ private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
+ private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
+
+ private TabletServerLogger logger;
+
+ protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
+
+ private ServerConfiguration serverConfig;
+ private LogSorter logSorter = null;
+
+ public TabletServer(ServerConfiguration conf, VolumeManager fs) {
+ super();
+ this.serverConfig = conf;
+ this.instance = conf.getInstance();
+ this.fs = fs;
+ this.logSorter = new LogSorter(instance, fs, getSystemConfiguration());
+ SimpleTimer.getInstance().schedule(new Runnable() {
+ @Override
+ public void run() {
+ synchronized (onlineTablets) {
+ long now = System.currentTimeMillis();
+ for (Tablet tablet : onlineTablets.values())
+ try {
+ tablet.updateRates(now);
+ } catch (Exception ex) {
+ log.error(ex, ex);
+ }
+ }
+ }
+ }, 5000, 5000);
+ }
+
+ private synchronized static void logGCInfo(AccumuloConfiguration conf) {
+ List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
+ Runtime rt = Runtime.getRuntime();
+
+ StringBuilder sb = new StringBuilder("gc");
+
+ boolean sawChange = false;
+
+ long maxIncreaseInCollectionTime = 0;
+
+ for (GarbageCollectorMXBean gcBean : gcmBeans) {
+ Long prevTime = prevGcTime.get(gcBean.getName());
+ long pt = 0;
+ if (prevTime != null) {
+ pt = prevTime;
+ }
+
+ long time = gcBean.getCollectionTime();
+
+ if (time - pt != 0) {
+ sawChange = true;
+ }
+
+ long increaseInCollectionTime = time - pt;
+ sb.append(String.format(" %s=%,.2f(+%,.2f) secs", gcBean.getName(), time / 1000.0, increaseInCollectionTime / 1000.0));
+ maxIncreaseInCollectionTime = Math.max(increaseInCollectionTime, maxIncreaseInCollectionTime);
+ prevGcTime.put(gcBean.getName(), time);
+ }
+
+ long mem = rt.freeMemory();
+ if (maxIncreaseInCollectionTime == 0) {
+ gcTimeIncreasedCount = 0;
+ } else {
+ gcTimeIncreasedCount++;
+ if (gcTimeIncreasedCount > 3 && mem < rt.maxMemory() * 0.05) {
+ log.warn("Running low on memory");
+ gcTimeIncreasedCount = 0;
+ }
+ }
+
+ if (mem > lastMemorySize) {
+ sawChange = true;
+ }
+
+ String sign = "+";
+ if (mem - lastMemorySize <= 0) {
+ sign = "";
+ }
+
+ sb.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", mem, sign, (mem - lastMemorySize), rt.totalMemory()));
+
+ if (sawChange) {
+ log.debug(sb.toString());
+ }
+
+ final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
+ if (maxIncreaseInCollectionTime > keepAliveTimeout) {
+ Halt.halt("Garbage collection may be interfering with lock keep-alive. Halting.", -1);
+ }
+
+ lastMemorySize = mem;
+ }
+
+ private TabletStatsKeeper statsKeeper;
+
+ private static class Session {
+ long lastAccessTime;
+ long startTime;
+ String user;
+ String client = TServerUtils.clientAddress.get();
+ public boolean reserved;
+
+ public void cleanup() {}
+ }
+
+ private static class SessionManager {
+
+ SecureRandom random;
+ Map<Long,Session> sessions;
+ long maxIdle;
+
+ SessionManager(AccumuloConfiguration conf) {
+ random = new SecureRandom();
+ sessions = new HashMap<Long,Session>();
+
+ maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
+
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ sweep(maxIdle);
+ }
+ };
+
+ SimpleTimer.getInstance().schedule(r, 0, Math.max(maxIdle / 2, 1000));
+ }
+
+ synchronized long createSession(Session session, boolean reserve) {
+ long sid = random.nextLong();
+
+ while (sessions.containsKey(sid)) {
+ sid = random.nextLong();
+ }
+
+ sessions.put(sid, session);
+
+ session.reserved = reserve;
+
+ session.startTime = session.lastAccessTime = System.currentTimeMillis();
+
+ return sid;
+ }
+
+ long getMaxIdleTime() {
+ return maxIdle;
+ }
+
+ /**
+ * Reserves the session with the given id. While a session is reserved, it cannot be canceled or removed.
+ *
+ * @param sessionId id of the session to reserve
+ */
+
+ synchronized Session reserveSession(long sessionId) {
+ Session session = sessions.get(sessionId);
+ if (session != null) {
+ if (session.reserved)
+ throw new IllegalStateException();
+ session.reserved = true;
+ }
+
+ return session;
+
+ }
+
+ synchronized Session reserveSession(long sessionId, boolean wait) {
+ Session session = sessions.get(sessionId);
+ if (session != null) {
+ while (wait && session.reserved) {
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException();
+ }
+ }
+
+ if (session.reserved)
+ throw new IllegalStateException();
+ session.reserved = true;
+ }
+
+ return session;
+
+ }
+
+ synchronized void unreserveSession(Session session) {
+ if (!session.reserved)
+ throw new IllegalStateException();
+ notifyAll();
+ session.reserved = false;
+ session.lastAccessTime = System.currentTimeMillis();
+ }
+
+ synchronized void unreserveSession(long sessionId) {
+ Session session = getSession(sessionId);
+ if (session != null)
+ unreserveSession(session);
+ }
+
+ synchronized Session getSession(long sessionId) {
+ Session session = sessions.get(sessionId);
+ if (session != null)
+ session.lastAccessTime = System.currentTimeMillis();
+ return session;
+ }
+
+ Session removeSession(long sessionId) {
+ return removeSession(sessionId, false);
+ }
+
+ Session removeSession(long sessionId, boolean unreserve) {
+ Session session = null;
+ synchronized (this) {
+ session = sessions.remove(sessionId);
+ if (unreserve && session != null)
+ unreserveSession(session);
+ }
+
+ // do clean up out side of lock..
+ if (session != null)
+ session.cleanup();
+
+ return session;
+ }
+
+ private void sweep(long maxIdle) {
+ ArrayList<Session> sessionsToCleanup = new ArrayList<Session>();
+ synchronized (this) {
+ Iterator<Session> iter = sessions.values().iterator();
+ while (iter.hasNext()) {
+ Session session = iter.next();
+ long idleTime = System.currentTimeMillis() - session.lastAccessTime;
+ if (idleTime > maxIdle && !session.reserved) {
+ iter.remove();
+ sessionsToCleanup.add(session);
+ }
+ }
+ }
+
+ // do clean up outside of lock
+ for (Session session : sessionsToCleanup) {
+ session.cleanup();
+ }
+ }
+
+ synchronized void removeIfNotAccessed(final long sessionId, long delay) {
+ Session session = sessions.get(sessionId);
+ if (session != null) {
+ final long removeTime = session.lastAccessTime;
+ TimerTask r = new TimerTask() {
+ @Override
+ public void run() {
+ Session sessionToCleanup = null;
+ synchronized (SessionManager.this) {
+ Session session2 = sessions.get(sessionId);
+ if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) {
+ sessions.remove(sessionId);
+ sessionToCleanup = session2;
+ }
+ }
+
+ // call clean up outside of lock
+ if (sessionToCleanup != null)
+ sessionToCleanup.cleanup();
+ }
+ };
+
+ SimpleTimer.getInstance().schedule(r, delay);
+ }
+ }
+
+ public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
+ Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
+ for (Entry<Long,Session> entry : sessions.entrySet()) {
+
+ Session session = entry.getValue();
+ @SuppressWarnings("rawtypes")
+ ScanTask nbt = null;
+ String tableID = null;
+
+ if (session instanceof ScanSession) {
+ ScanSession ss = (ScanSession) session;
+ nbt = ss.nextBatchTask;
+ tableID = ss.extent.getTableId().toString();
+ } else if (session instanceof MultiScanSession) {
+ MultiScanSession mss = (MultiScanSession) session;
+ nbt = mss.lookupTask;
+ tableID = mss.threadPoolExtent.getTableId().toString();
+ }
+
+ if (nbt == null)
+ continue;
+
+ ScanRunState srs = nbt.getScanRunState();
+
+ if (nbt == null || srs == ScanRunState.FINISHED)
+ continue;
+
+ MapCounter<ScanRunState> stateCounts = counts.get(tableID);
+ if (stateCounts == null) {
+ stateCounts = new MapCounter<ScanRunState>();
+ counts.put(tableID, stateCounts);
+ }
+
+ stateCounts.increment(srs, 1);
+ }
+
+ return counts;
+ }
+
+ public synchronized List<ActiveScan> getActiveScans() {
+
+ ArrayList<ActiveScan> activeScans = new ArrayList<ActiveScan>();
+
+ long ct = System.currentTimeMillis();
+
+ for (Entry<Long,Session> entry : sessions.entrySet()) {
+ Session session = entry.getValue();
+ if (session instanceof ScanSession) {
+ ScanSession ss = (ScanSession) session;
+
+ ScanState state = ScanState.RUNNING;
+
+ ScanTask<ScanBatch> nbt = ss.nextBatchTask;
+ if (nbt == null) {
+ state = ScanState.IDLE;
+ } else {
+ switch (nbt.getScanRunState()) {
+ case QUEUED:
+ state = ScanState.QUEUED;
+ break;
+ case FINISHED:
+ state = ScanState.IDLE;
+ break;
+ case RUNNING:
+ default:
+ /* do nothing */
+ break;
+ }
+ }
+
+ activeScans.add(new ActiveScan(ss.client, ss.user, ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
+ state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translator.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB()));
+
+ } else if (session instanceof MultiScanSession) {
+ MultiScanSession mss = (MultiScanSession) session;
+
+ ScanState state = ScanState.RUNNING;
+
+ ScanTask<MultiScanResult> nbt = mss.lookupTask;
+ if (nbt == null) {
+ state = ScanState.IDLE;
+ } else {
+ switch (nbt.getScanRunState()) {
+ case QUEUED:
+ state = ScanState.QUEUED;
+ break;
+ case FINISHED:
+ state = ScanState.IDLE;
+ break;
+ case RUNNING:
+ default:
+ /* do nothing */
+ break;
+ }
+ }
+
+ activeScans.add(new ActiveScan(mss.client, mss.user, mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime,
+ ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translator.CT), mss.ssiList, mss.ssio, mss.auths
+ .getAuthorizationsBB()));
+ }
+ }
+
+ return activeScans;
+ }
+ }
+
+ static class TservConstraintEnv implements Environment {
+
+ private TCredentials credentials;
+ private SecurityOperation security;
+ private Authorizations auths;
+ private KeyExtent ke;
+
+ TservConstraintEnv(SecurityOperation secOp, TCredentials credentials) {
+ this.security = secOp;
+ this.credentials = credentials;
+ }
+
+ void setExtent(KeyExtent ke) {
+ this.ke = ke;
+ }
+
+ @Override
+ public KeyExtent getExtent() {
+ return ke;
+ }
+
+ @Override
+ public String getUser() {
+ return credentials.getPrincipal();
+ }
+
+ @Override
+ @Deprecated
+ public Authorizations getAuthorizations() {
+ if (auths == null)
+ try {
+ this.auths = security.getUserAuthorizations(credentials);
+ } catch (ThriftSecurityException e) {
+ throw new RuntimeException(e);
+ }
+ return auths;
+ }
+
+ @Override
+ public AuthorizationContainer getAuthorizationsContainer() {
+ return new AuthorizationContainer() {
+ @Override
+ public boolean contains(ByteSequence auth) {
+ try {
+ return security.userHasAuthorizations(credentials,
+ Collections.<ByteBuffer> singletonList(ByteBuffer.wrap(auth.getBackingArray(), auth.offset(), auth.length())));
+ } catch (ThriftSecurityException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+ }
+
+ private abstract class ScanTask<T> implements RunnableFuture<T> {
+
+ protected AtomicBoolean interruptFlag;
+ protected ArrayBlockingQueue<Object> resultQueue;
+ protected AtomicInteger state;
+ protected AtomicReference<ScanRunState> runState;
+
+ private static final int INITIAL = 1;
+ private static final int ADDED = 2;
+ private static final int CANCELED = 3;
+
+ ScanTask() {
+ interruptFlag = new AtomicBoolean(false);
+ runState = new AtomicReference<ScanRunState>(ScanRunState.QUEUED);
+ state = new AtomicInteger(INITIAL);
+ resultQueue = new ArrayBlockingQueue<Object>(1);
+ }
+
+ protected void addResult(Object o) {
+ if (state.compareAndSet(INITIAL, ADDED))
+ resultQueue.add(o);
+ else if (state.get() == ADDED)
+ throw new IllegalStateException("Tried to add more than one result");
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ if (!mayInterruptIfRunning)
+ throw new IllegalArgumentException("Cancel will always attempt to interupt running next batch task");
+
+ if (state.get() == CANCELED)
+ return true;
+
+ if (state.compareAndSet(INITIAL, CANCELED)) {
+ interruptFlag.set(true);
+ resultQueue = null;
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public T get() throws InterruptedException, ExecutionException {
+ throw new UnsupportedOperationException();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+
+ ArrayBlockingQueue<Object> localRQ = resultQueue;
+
+ if (state.get() == CANCELED)
+ throw new CancellationException();
+
+ if (localRQ == null && state.get() == ADDED)
+ throw new IllegalStateException("Tried to get result twice");
+
+ Object r = localRQ.poll(timeout, unit);
+
+ // could have been canceled while waiting
+ if (state.get() == CANCELED) {
+ if (r != null)
+ throw new IllegalStateException("Nothing should have been added when in canceled state");
+
+ throw new CancellationException();
+ }
+
+ if (r == null)
+ throw new TimeoutException();
+
+ // make this method stop working now that something is being
+ // returned
+ resultQueue = null;
+
+ if (r instanceof Throwable)
+ throw new ExecutionException((Throwable) r);
+
+ return (T) r;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return state.get() == CANCELED;
+ }
+
+ @Override
+ public boolean isDone() {
+ return runState.get().equals(ScanRunState.FINISHED);
+ }
+
+ public ScanRunState getScanRunState() {
+ return runState.get();
+ }
+
+ }
+
+ private static class ConditionalSession extends Session {
+ public TCredentials credentials;
+ public Authorizations auths;
+ public String tableId;
+ public AtomicBoolean interruptFlag;
+
+ @Override
+ public void cleanup() {
+ interruptFlag.set(true);
+ }
+ }
+
+ /**
+  * Session state for a batch of updates: the mutations queued per tablet, the failures and violations seen so far, and timing statistics.
+  */
+ private static class UpdateSession extends Session {
+   // tablet that incoming mutations are currently queued against, plus the queue itself
+   public Tablet currentTablet;
+   public Map<Tablet,List<Mutation>> queuedMutations = new HashMap<Tablet,List<Mutation>>();
+   public long queuedMutationSize = 0L;
+
+   // outcome bookkeeping
+   public MapCounter<Tablet> successfulCommits = new MapCounter<Tablet>();
+   Map<KeyExtent,Long> failures = new HashMap<KeyExtent,Long>();
+   HashMap<KeyExtent,SecurityErrorCode> authFailures = new HashMap<KeyExtent,SecurityErrorCode>();
+   public Violations violations;
+
+   // caller identity and the constraint environment built from it
+   public TCredentials credentials;
+   TservConstraintEnv cenv = null;
+
+   // counters and timing statistics
+   public long totalUpdates = 0L;
+   public long flushTime = 0L;
+   Stat prepareTimes = new Stat();
+   Stat walogTimes = new Stat();
+   Stat commitTimes = new Stat();
+   Stat authTimes = new Stat();
+ }
+
+ /**
+  * Session state for a scan over a single tablet extent: the scan parameters, the scanner, the pending read-ahead task and simple statistics.
+  */
+ private static class ScanSession extends Session {
+   public KeyExtent extent;
+   public HashSet<Column> columnSet;
+   public List<IterInfo> ssiList;
+   public Map<String,Map<String,String>> ssio;
+   public Authorizations auths;
+   public long entriesReturned = 0;
+   public Stat nbTimes = new Stat();
+   public long batchCount = 0;
+   public volatile ScanTask<ScanBatch> nextBatchTask;
+   public AtomicBoolean interruptFlag;
+   public Scanner scanner;
+   public long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD;
+
+   /**
+    * Cancels the pending read-ahead task, if any, and always closes the scanner, if any.
+    */
+   @Override
+   public void cleanup() {
+     // Read the volatile field once: it is reset to null elsewhere, so checking and
+     // then dereferencing the field itself could hit a null in between.
+     final ScanTask<ScanBatch> pendingTask = nextBatchTask;
+     try {
+       if (pendingTask != null)
+         pendingTask.cancel(true);
+     } finally {
+       if (scanner != null)
+         scanner.close();
+     }
+   }
+
+ }
+
+ /**
+  * Session state for a batch scan: the ranges still to be looked up per extent, the scan parameters, the pending lookup task and statistics.
+  */
+ private static class MultiScanSession extends Session {
+   HashSet<Column> columnSet;
+   Map<KeyExtent,List<Range>> queries;
+   public List<IterInfo> ssiList;
+   public Map<String,Map<String,String>> ssio;
+   public Authorizations auths;
+
+   // stats
+   int numRanges;
+   int numTablets;
+   int numEntries;
+   long totalLookupTime;
+
+   public volatile ScanTask<MultiScanResult> lookupTask;
+   public KeyExtent threadPoolExtent;
+
+   /**
+    * Cancels the pending lookup task, if any.
+    */
+   @Override
+   public void cleanup() {
+     // Read the volatile field once: it is reset to null elsewhere, so checking and
+     // then dereferencing the field itself could hit a null in between.
+     final ScanTask<MultiScanResult> pendingTask = lookupTask;
+     if (pendingTask != null)
+       pendingTask.cancel(true);
+   }
+ }
+
+ /**
+  * This little class keeps track of writes in progress and allows readers to wait for writes that started before the read. It assumes that the operation ids
+  * are monotonically increasing.
+  *
+  * All methods that touch the in-progress sets synchronize on this object, which is also the monitor readers wait on.
+  */
+ static class WriteTracker {
+   // shared across all trackers so that operation ids are globally ordered
+   private static final AtomicLong operationCounter = new AtomicLong(1);
+   private final Map<TabletType,TreeSet<Long>> inProgressWrites = new EnumMap<TabletType,TreeSet<Long>>(TabletType.class);
+
+   WriteTracker() {
+     for (TabletType ttype : TabletType.values()) {
+       inProgressWrites.put(ttype, new TreeSet<Long>());
+     }
+   }
+
+   /**
+    * Registers a write against the given tablet type.
+    *
+    * @return the operation id to pass to {@link #finishWrite(long)}
+    */
+   synchronized long startWrite(TabletType ttype) {
+     long operationId = operationCounter.getAndIncrement();
+     inProgressWrites.get(ttype).add(operationId);
+     return operationId;
+   }
+
+   /**
+    * Marks a write as finished and wakes up any waiting readers. An id of -1 (returned by {@link #startWrite(Set)} for an empty set) is ignored.
+    *
+    * @throws IllegalArgumentException
+    *           if the id is not registered as in progress
+    */
+   synchronized void finishWrite(long operationId) {
+     if (operationId == -1)
+       return;
+
+     boolean removed = false;
+
+     for (TabletType ttype : TabletType.values()) {
+       removed = inProgressWrites.get(ttype).remove(operationId);
+       if (removed)
+         break;
+     }
+
+     if (!removed) {
+       throw new IllegalArgumentException("Attempted to finish write not in progress, operationId " + operationId);
+     }
+
+     this.notifyAll();
+   }
+
+   /**
+    * Blocks until every write of the given tablet type that was started before this call has finished. An interrupt does not end the wait; it is logged and
+    * the thread's interrupt status is restored once the wait completes.
+    */
+   synchronized void waitForWrites(TabletType ttype) {
+     long operationId = operationCounter.getAndIncrement();
+     boolean interrupted = false;
+     try {
+       while (inProgressWrites.get(ttype).floor(operationId) != null) {
+         try {
+           this.wait();
+         } catch (InterruptedException e) {
+           log.error(e, e);
+           // wait() cleared the interrupt status; remember it rather than re-interrupting
+           // here, which would make the next wait() throw immediately and spin
+           interrupted = true;
+         }
+       }
+     } finally {
+       if (interrupted)
+         Thread.currentThread().interrupt();
+     }
+   }
+
+   /**
+    * Registers a write covering the extents of the given tablets.
+    *
+    * @return the operation id, or -1 when the set is empty
+    */
+   public long startWrite(Set<Tablet> keySet) {
+     if (keySet.isEmpty())
+       return -1;
+
+     ArrayList<KeyExtent> extents = new ArrayList<KeyExtent>(keySet.size());
+
+     for (Tablet tablet : keySet)
+       extents.add(tablet.getExtent());
+
+     return startWrite(TabletType.type(extents));
+   }
+ }
+
+ /** Returns the configuration exposed by the server configuration object. */
+ public AccumuloConfiguration getSystemConfiguration() {
+   final AccumuloConfiguration systemConf = serverConfig.getConfiguration();
+   return systemConf;
+ }
+
+ // Handed to the ClientServiceHandler superclass by the ThriftClientHandler constructor.
+ TransactionWatcher watcher = new TransactionWatcher();
+
+ private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {
+
+ // Registry of client sessions; instantiated in the constructor from the system configuration.
+ SessionManager sessionManager;
+
+ // System configuration captured when the handler is created.
+ AccumuloConfiguration acuConf = getSystemConfiguration();
+
+ // Update-side metrics; registered in the constructor.
+ TabletServerUpdateMetrics updateMetrics = new TabletServerUpdateMetrics();
+
+ // Scan-side metrics; registered in the constructor.
+ TabletServerScanMetrics scanMetrics = new TabletServerScanMetrics();
+
+ // Lets scans that request it wait for writes that are still in flight.
+ WriteTracker writeTracker = new WriteTracker();
+
+ // NOTE(review): not referenced in the part of this class visible here; presumably used by code further down.
+ private RowLocks rowLocks = new RowLocks();
+
+ /**
+  * Creates the handler, sets up the session manager and registers the update and scan metrics. A registration failure is logged and otherwise ignored.
+  */
+ ThriftClientHandler() {
+   super(instance, watcher, fs);
+   log.debug(ThriftClientHandler.class.getName() + " created");
+   sessionManager = new SessionManager(getSystemConfiguration());
+
+   // Register the metrics MBean
+   try {
+     updateMetrics.register();
+     scanMetrics.register();
+   } catch (Exception registrationFailure) {
+     log.error("Exception registering MBean with MBean Server", registrationFailure);
+   }
+ }
+
+ /**
+  * Imports the given files into the tablets named in the request.
+  *
+  * @return the extents that could not be imported, either because the tablet is not online here or because the import threw an IOException
+  * @throws ThriftSecurityException
+  *           if the caller may not perform system actions
+  */
+ @Override
+ public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, long tid, Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime)
+     throws ThriftSecurityException {
+
+   if (!security.canPerformSystemActions(credentials)) {
+     throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+   }
+
+   final List<TKeyExtent> notImported = new ArrayList<TKeyExtent>();
+
+   for (Entry<TKeyExtent,Map<String,MapFileInfo>> request : files.entrySet()) {
+     final TKeyExtent thriftExtent = request.getKey();
+     final Map<String,MapFileInfo> requestedFiles = request.getValue();
+
+     // qualify each path against the file system that owns it
+     final Map<FileRef,MapFileInfo> qualifiedFiles = new HashMap<FileRef,MapFileInfo>();
+     for (Entry<String,MapFileInfo> fileEntry : requestedFiles.entrySet()) {
+       final Path rawPath = new Path(fileEntry.getKey());
+       final FileSystem owner = fs.getFileSystemByPath(rawPath);
+       final Path fullPath = owner.makeQualified(rawPath);
+       qualifiedFiles.put(new FileRef(fullPath.toString(), fullPath), fileEntry.getValue());
+     }
+
+     final Tablet target = onlineTablets.get(new KeyExtent(thriftExtent));
+     if (target == null) {
+       notImported.add(thriftExtent);
+       continue;
+     }
+
+     try {
+       target.importMapFiles(tid, qualifiedFiles, setTime);
+     } catch (IOException ioe) {
+       log.info("files " + requestedFiles.keySet() + " not imported to " + new KeyExtent(thriftExtent) + ": " + ioe.getMessage());
+       notImported.add(thriftExtent);
+     }
+   }
+
+   return notImported;
+ }
+
+ // Scan task that reads one batch from a scan session's scanner and queues either the
+ // batch or the failure that occurred as the task's result.
+ private class NextBatchTask extends ScanTask<ScanBatch> {
+
+ private long scanID;
+
+ // Creates the task for the given session id; if the supplied interrupt flag is already
+ // set, the task is canceled immediately.
+ NextBatchTask(long scanID, AtomicBoolean interruptFlag) {
+ this.scanID = scanID;
+ this.interruptFlag = interruptFlag;
+
+ if (interruptFlag.get())
+ cancel(true);
+ }
+
+ @Override
+ public void run() {
+
+ final ScanSession scanSession = (ScanSession) sessionManager.getSession(scanID);
+ String oldThreadName = Thread.currentThread().getName();
+
+ try {
+ // nothing to do if the task was canceled or the session no longer exists
+ if (isCancelled() || scanSession == null)
+ return;
+
+ runState.set(ScanRunState.RUNNING);
+
+ // rename the thread for the duration of the read; restored in the finally block
+ Thread.currentThread().setName(
+ "User: " + scanSession.user + " Start: " + scanSession.startTime + " Client: " + scanSession.client + " Tablet: " + scanSession.extent);
+
+ Tablet tablet = onlineTablets.get(scanSession.extent);
+
+ // tablet is not online on this server: report that as the result
+ if (tablet == null) {
+ addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift()));
+ return;
+ }
+
+ // time the read and record it in the session's statistics
+ long t1 = System.currentTimeMillis();
+ ScanBatch batch = scanSession.scanner.read();
+ long t2 = System.currentTimeMillis();
+ scanSession.nbTimes.addStat(t2 - t1);
+
+ // there should only be one thing on the queue at a time, so
+ // it should be ok to call add()
+ // instead of put()... if add() fails because queue is at
+ // capacity it means there is code
+ // problem somewhere
+ addResult(batch);
+ } catch (TabletClosedException e) {
+ addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift()));
+ } catch (IterationInterruptedException iie) {
+ // an interruption is only reported when the task itself was not canceled
+ if (!isCancelled()) {
+ log.warn("Iteration interrupted, when scan not cancelled", iie);
+ addResult(iie);
+ }
+ } catch (TooManyFilesException tmfe) {
+ addResult(tmfe);
+ } catch (Throwable e) {
+ log.warn("exception while scanning tablet " + (scanSession == null ? "(unknown)" : scanSession.extent), e);
+ addResult(e);
+ } finally {
+ // always mark the task finished and restore the thread name, including on early return
+ runState.set(ScanRunState.FINISHED);
+ Thread.currentThread().setName(oldThreadName);
+ }
+
+ }
+ }
+
+ // Scan task that works through the ranges queued in a MultiScanSession and queues a
+ // MultiScanResult (or the failure that occurred) as the task's result.
+ private class LookupTask extends ScanTask<MultiScanResult> {
+
+ private long scanID;
+
+ LookupTask(long scanID) {
+ this.scanID = scanID;
+ }
+
+ @Override
+ public void run() {
+ MultiScanSession session = (MultiScanSession) sessionManager.getSession(scanID);
+ String oldThreadName = Thread.currentThread().getName();
+
+ try {
+ // nothing to do if the task was canceled or the session no longer exists
+ if (isCancelled() || session == null)
+ return;
+
+ // the result size cap comes from the configuration of the table of threadPoolExtent
+ TableConfiguration acuTableConf = ServerConfiguration.getTableConfiguration(instance, session.threadPoolExtent.getTableId().toString());
+ long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
+
+ runState.set(ScanRunState.RUNNING);
+ // NOTE(review): this name ends with "Table: " and no value; it is replaced with a
+ // per-tablet name inside the loop below.
+ Thread.currentThread().setName("Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Table: ");
+
+ long bytesAdded = 0;
+ // time budget for one run of this task, compared against currentTimeMillis() deltas
+ long maxScanTime = 4000;
+
+ long startTime = System.currentTimeMillis();
+
+ ArrayList<KVEntry> results = new ArrayList<KVEntry>();
+ Map<KeyExtent,List<Range>> failures = new HashMap<KeyExtent,List<Range>>();
+ ArrayList<KeyExtent> fullScans = new ArrayList<KeyExtent>();
+ KeyExtent partScan = null;
+ Key partNextKey = null;
+ boolean partNextKeyInclusive = false;
+
+ Iterator<Entry<KeyExtent,List<Range>>> iter = session.queries.entrySet().iterator();
+
+ // check the time so that the read ahead thread is not monopolized
+ while (iter.hasNext() && bytesAdded < maxResultsSize && (System.currentTimeMillis() - startTime) < maxScanTime) {
+ Entry<KeyExtent,List<Range>> entry = iter.next();
+
+ // the entry leaves the pending queries now; it is put back below only when the
+ // lookup leaves unfinished ranges on a tablet that was not closed
+ iter.remove();
+
+ // check that tablet server is serving requested tablet
+ Tablet tablet = onlineTablets.get(entry.getKey());
+ if (tablet == null) {
+ failures.put(entry.getKey(), entry.getValue());
+ continue;
+ }
+ Thread.currentThread().setName(
+ "Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Tablet: " + entry.getKey().toString());
+
+ LookupResult lookupResult;
+ try {
+
+ // do the following check to avoid a race condition
+ // between setting false below and the task being
+ // canceled
+ if (isCancelled())
+ interruptFlag.set(true);
+
+ // the size limit passed in is whatever remains of the overall result budget
+ lookupResult = tablet.lookup(entry.getValue(), session.columnSet, session.auths, results, maxResultsSize - bytesAdded, session.ssiList,
+ session.ssio, interruptFlag);
+
+ // if the tablet was closed it is possible that the
+ // interrupt flag was set.... do not want it set for
+ // the next
+ // lookup
+ interruptFlag.set(false);
+
+ } catch (IOException e) {
+ log.warn("lookup failed for tablet " + entry.getKey(), e);
+ throw new RuntimeException(e);
+ }
+
+ bytesAdded += lookupResult.bytesAdded;
+
+ if (lookupResult.unfinishedRanges.size() > 0) {
+ if (lookupResult.closed) {
+ // tablet closed mid-lookup: hand the remaining ranges back as failures
+ failures.put(entry.getKey(), lookupResult.unfinishedRanges);
+ } else {
+ // partial scan: requeue the remainder and remember where it resumes
+ session.queries.put(entry.getKey(), lookupResult.unfinishedRanges);
+ partScan = entry.getKey();
+ partNextKey = lookupResult.unfinishedRanges.get(0).getStartKey();
+ partNextKeyInclusive = lookupResult.unfinishedRanges.get(0).isStartKeyInclusive();
+ }
+ } else {
+ fullScans.add(entry.getKey());
+ }
+ }
+
+ long finishTime = System.currentTimeMillis();
+ session.totalLookupTime += (finishTime - startTime);
+ session.numEntries += results.size();
+
+ // convert everything to thrift before adding result
+ List<TKeyValue> retResults = new ArrayList<TKeyValue>();
+ for (KVEntry entry : results)
+ retResults.add(new TKeyValue(entry.key.toThrift(), ByteBuffer.wrap(entry.value)));
+ Map<TKeyExtent,List<TRange>> retFailures = Translator.translate(failures, Translator.KET, new Translator.ListTranslator<Range,TRange>(Translator.RT));
+ List<TKeyExtent> retFullScans = Translator.translate(fullScans, Translator.KET);
+ TKeyExtent retPartScan = null;
+ TKey retPartNextKey = null;
+ if (partScan != null) {
+ retPartScan = partScan.toThrift();
+ retPartNextKey = partNextKey.toThrift();
+ }
+ // add results to queue
+ // the last argument is true while queries remain pending in the session
+ addResult(new MultiScanResult(retResults, retFailures, retFullScans, retPartScan, retPartNextKey, partNextKeyInclusive, session.queries.size() != 0));
+ } catch (IterationInterruptedException iie) {
+ // an interruption is only reported when the task itself was not canceled
+ if (!isCancelled()) {
+ log.warn("Iteration interrupted, when scan not cancelled", iie);
+ addResult(iie);
+ }
+ } catch (Throwable e) {
+ log.warn("exception while doing multi-scan ", e);
+ addResult(e);
+ } finally {
+ // always restore the thread name and mark the task finished, including on early return
+ Thread.currentThread().setName(oldThreadName);
+ runState.set(ScanRunState.FINISHED);
+ }
+ }
+ }
+
+ /**
+  * Opens a scan session on a single tablet and returns the session id together with the first batch of results.
+  *
+  * @throws ThriftSecurityException
+  *           if the caller may not scan the table or does not hold the requested authorizations
+  * @throws NotServingTabletException
+  *           if the tablet is not online on this server
+  */
+ @Override
+ public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent, TRange range, List<TColumn> columns, int batchSize,
+     List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated,
+     long readaheadThreshold) throws NotServingTabletException, ThriftSecurityException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
+
+   if (!security.canScan(credentials, new String(textent.getTable()), range, columns, ssiList, ssio, authorizations))
+     throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+
+   if (!security.userHasAuthorizations(credentials, authorizations))
+     throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
+
+   KeyExtent extent = new KeyExtent(textent);
+
+   // wait for any writes that are in flight.. this done to ensure
+   // consistency across client restarts... assume a client writes
+   // to accumulo and dies while waiting for a confirmation from
+   // accumulo... the client process restarts and tries to read
+   // data from accumulo making the assumption that it will get
+   // any writes previously made... however if the server side thread
+   // processing the write from the dead client is still in progress,
+   // the restarted client may not see the write unless we wait here.
+   // this behavior is very important when the client is reading the
+   // metadata
+   if (waitForWrites)
+     writeTracker.waitForWrites(TabletType.type(extent));
+
+   Tablet tablet = onlineTablets.get(extent);
+   if (tablet == null)
+     throw new NotServingTabletException(textent);
+
+   ScanSession scanSession = new ScanSession();
+   scanSession.user = credentials.getPrincipal();
+   scanSession.extent = new KeyExtent(extent);
+   scanSession.columnSet = new HashSet<Column>();
+   scanSession.ssiList = ssiList;
+   scanSession.ssio = ssio;
+   scanSession.auths = new Authorizations(authorizations);
+   scanSession.interruptFlag = new AtomicBoolean();
+   scanSession.readaheadThreshold = readaheadThreshold;
+
+   for (TColumn tcolumn : columns) {
+     scanSession.columnSet.add(new Column(tcolumn));
+   }
+
+   scanSession.scanner = tablet.createScanner(new Range(range), batchSize, scanSession.columnSet, scanSession.auths, ssiList, ssio, isolated,
+       scanSession.interruptFlag);
+
+   // the session is created already reserved; it is unreserved in the finally block below
+   long sid = sessionManager.createSession(scanSession, true);
+
+   ScanResult scanResult;
+   try {
+     scanResult = continueScan(tinfo, sid, scanSession);
+   } catch (NoSuchScanIDException e) {
+     log.error("The impossible happened", e);
+     // keep the message and cause so the failure is diagnosable from the rethrown exception
+     throw new RuntimeException("The impossible happened", e);
+   } finally {
+     sessionManager.unreserveSession(sid);
+   }
+
+   return new InitialScan(sid, scanResult);
+ }
+
+ /**
+  * Reserves the scan session with the given id, fetches its next batch and unreserves the session again.
+  *
+  * @throws NoSuchScanIDException
+  *           if no session could be reserved for the id
+  */
+ @Override
+ public ScanResult continueScan(TInfo tinfo, long scanID) throws NoSuchScanIDException, NotServingTabletException,
+     org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
+   final ScanSession reserved = (ScanSession) sessionManager.reserveSession(scanID);
+   if (reserved == null) {
+     throw new NoSuchScanIDException();
+   }
+
+   final ScanResult nextBatch;
+   try {
+     nextBatch = continueScan(tinfo, scanID, reserved);
+   } finally {
+     sessionManager.unreserveSession(reserved);
+   }
+   return nextBatch;
+ }
+
+ private ScanResult continueScan(TInfo tinfo, long scanID, ScanSession scanSession) throws NoSuchScanIDException, NotServingTabletException,
+ org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
+
+ if (scanSession.nextBatchTask == null) {
+ scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag);
+ resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
+ }
+
+ ScanBatch bresult;
+ try {
+ bresult = scanSession.nextBatchTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
+ scanSession.nextBatchTask = null;
+ } catch (ExecutionException e) {
+ sessionManager.removeSession(scanID);
+ if (e.getCause() instanceof NotServingTabletException)
+ throw (NotServingTabletException) e.getCause();
+ else if (e.getCause() instanceof TooManyFilesException)
+ throw new org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException(scanSession.extent.toThrift());
+ else
+ throw new RuntimeException(e);
+ } catch (CancellationException ce) {
+ sessionManager.removeSession(scanID);
+ Tablet tablet = onlineTablets.get(scanSession.extent);
+ if (tablet == null || tablet.isClosed())
+ throw new NotServingTabletException(scanSession.extent.toThrift());
+ else
+ throw new NoSuchScanIDException();
+ } catch (TimeoutException e) {
+ List<TKeyValue> param = Collections.emptyList();
+ long timeout = acuConf.getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
+ sessionManager.removeIfNotAccessed(scanID, timeout);
+ return new ScanResult(param, true);
+ } catch (Throwable t) {
+ sessionManager.removeSession(scanID);
+ log.warn("Failed to get next batch", t);
+ throw new RuntimeException(t);
+ }
+
+ ScanResult scanResult = new ScanResult(Key.compress(bresult.results), bresult.more);
+
+ scanSession.entriesReturned += scanResult.results.size();
+
+ scanSession.batchCount++;
+
+ if (scanResult.more && scanSession.batchCount > scanSession.readaheadThreshold) {
+ // start reading next batch while current batch is transmitted
+ // to client
+ scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag);
+ resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
+ }
+
+ if (!scanResult.more)
+ closeScan(tinfo, scanID);
+
+ return scanResult;
+ }
+
+ /**
+  * Removes the scan session with the given id. If it existed, a summary is logged and, when scan metrics are enabled, its duration and result size are
+  * recorded.
+  */
+ @Override
+ public void closeScan(TInfo tinfo, long scanID) {
+   final ScanSession closed = (ScanSession) sessionManager.removeSession(scanID);
+   if (closed == null) {
+     return;
+   }
+
+   final long now = System.currentTimeMillis();
+
+   log.debug(String.format("ScanSess tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ", TServerUtils.clientAddress.get(), closed.extent.getTableId()
+       .toString(), closed.entriesReturned, (now - closed.startTime) / 1000.0, closed.nbTimes.toString()));
+
+   if (scanMetrics.isEnabled()) {
+     scanMetrics.add(TabletServerScanMetrics.scan, now - closed.startTime);
+     scanMetrics.add(TabletServerScanMetrics.resultSize, closed.entriesReturned);
+   }
+ }
+
+ /**
+  * Opens a batch-scan session over the given extents and ranges and returns the session id together with the first result.
+  *
+  * @throws IllegalArgumentException
+  *           if the batch does not reference exactly one table
+  * @throws ThriftSecurityException
+  *           if the caller may not scan the table or does not hold the requested authorizations
+  */
+ @Override
+ public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, Map<TKeyExtent,List<TRange>> tbatch, List<TColumn> tcolumns,
+     List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites) throws ThriftSecurityException {
+   // find all of the tables that need to be scanned
+   final HashSet<String> tableNames = new HashSet<String>();
+   for (TKeyExtent requested : tbatch.keySet()) {
+     tableNames.add(new String(requested.getTable()));
+   }
+
+   if (tableNames.size() != 1) {
+     throw new IllegalArgumentException("Cannot batch scan over multiple tables");
+   }
+
+   // check if user has permission to the tables
+   for (String tableName : tableNames) {
+     if (!security.canScan(credentials, tableName, tbatch, tcolumns, ssiList, ssio, authorizations)) {
+       throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+     }
+   }
+
+   // any security failure raised while checking authorizations is logged before it propagates
+   try {
+     if (!security.userHasAuthorizations(credentials, authorizations)) {
+       throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
+     }
+   } catch (ThriftSecurityException tse) {
+     log.error(tse, tse);
+     throw tse;
+   }
+
+   final Map<KeyExtent,List<Range>> batch = Translator.translate(tbatch, new TKeyExtentTranslator(), new Translator.ListTranslator<TRange,Range>(
+       new TRangeTranslator()));
+
+   // This is used to determine which thread pool to use
+   final KeyExtent poolExtent = batch.keySet().iterator().next();
+
+   if (waitForWrites) {
+     writeTracker.waitForWrites(TabletType.type(batch.keySet()));
+   }
+
+   final MultiScanSession mss = new MultiScanSession();
+   mss.user = credentials.getPrincipal();
+   mss.queries = batch;
+   mss.columnSet = new HashSet<Column>(tcolumns.size());
+   mss.ssiList = ssiList;
+   mss.ssio = ssio;
+   mss.auths = new Authorizations(authorizations);
+
+   mss.numTablets = batch.size();
+   for (List<Range> tabletRanges : batch.values()) {
+     mss.numRanges += tabletRanges.size();
+   }
+
+   for (TColumn tcolumn : tcolumns) {
+     mss.columnSet.add(new Column(tcolumn));
+   }
+
+   mss.threadPoolExtent = poolExtent;
+
+   // the session is created already reserved; it is unreserved in the finally block below
+   final long sid = sessionManager.createSession(mss, true);
+
+   final MultiScanResult firstResult;
+   try {
+     firstResult = continueMultiScan(tinfo, sid, mss);
+   } catch (NoSuchScanIDException e) {
+     log.error("the impossible happened", e);
+     throw new RuntimeException("the impossible happened", e);
+   } finally {
+     sessionManager.unreserveSession(sid);
+   }
+
+   return new InitialMultiScan(sid, firstResult);
+ }
+
+ /**
+  * Reserves the batch-scan session with the given id, fetches its next result and unreserves the session again.
+  *
+  * @throws NoSuchScanIDException
+  *           if no session could be reserved for the id
+  */
+ @Override
+ public MultiScanResult continueMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
+   final MultiScanSession reserved = (MultiScanSession) sessionManager.reserveSession(scanID);
+   if (reserved == null) {
+     throw new NoSuchScanIDException();
+   }
+
+   final MultiScanResult next;
+   try {
+     next = continueMultiScan(tinfo, scanID, reserved);
+   } finally {
+     sessionManager.unreserveSession(reserved);
+   }
+   return next;
+ }
+
+ private MultiScanResult continueMultiScan(TInfo tinfo, long scanID, MultiScanSession session) throws NoSuchScanIDException {
+
+ if (session.lookupTask == null) {
+ session.lookupTask = new LookupTask(scanID);
+ resourceManager.executeReadAhead(session.threadPoolExtent, session.lookupTask);
+ }
+
+ try {
+ MultiScanResult scanResult = session.lookupTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
+ session.lookupTask = null;
+ return scanResult;
+ } catch (TimeoutException e1) {
+ long timeout = acuConf.getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
+ sessionManager.removeIfNotAccessed(scanID, timeout);
+ List<TKeyValue> results = Collections.emptyList();
+ Map<TKeyExtent,List<TRange>> failures = Collections.emptyMap();
+ List<TKeyExtent> fullScans = Collections.emptyList();
+ return new MultiScanResult(results, failures, fullScans, null, null, false, true);
+ } catch (Throwable t) {
+ sessionManager.removeSession(scanID);
+ log.warn("Failed to get multiscan result", t);
+ throw new RuntimeException(t);
+ }
+ }
+
+ /**
+  * Removes the batch-scan session with the given id and logs a summary of it.
+  *
+  * @throws NoSuchScanIDException
+  *           if there was no session for the id
+  */
+ @Override
+ public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
+   final MultiScanSession closed = (MultiScanSession) sessionManager.removeSession(scanID);
+   if (closed == null) {
+     throw new NoSuchScanIDException();
+   }
+
+   final long now = System.currentTimeMillis();
+   log.debug(String.format("MultiScanSess %s %,d entries in %.2f secs (lookup_time:%.2f secs tablets:%,d ranges:%,d) ", TServerUtils.clientAddress.get(),
+       closed.numEntries, (now - closed.startTime) / 1000.0, closed.totalLookupTime / 1000.0, closed.numTablets, closed.numRanges));
+ }
+
+ /**
+  * Authenticates the caller and opens a new update session.
+  *
+  * @return the id of the newly created session
+  */
+ @Override
+ public long startUpdate(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException {
+   // Make sure user is real
+   security.authenticateUser(credentials, credentials);
+   if (updateMetrics.isEnabled()) {
+     updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0);
+   }
+
+   final UpdateSession session = new UpdateSession();
+   session.violations = new Violations();
+   session.credentials = credentials;
+   session.cenv = new TservConstraintEnv(security, session.credentials);
+
+   return sessionManager.createSession(session, false);
+ }
+
+ private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
+ long t1 = System.currentTimeMillis();
+ if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent))
+ return;
+ if (us.currentTablet == null && (us.failures.containsKey(keyExtent) || us.authFailures.containsKey(keyExtent))) {
+ // if there were previous failures, then do not accept additional writes
+ return;
+ }
+
+ try {
+ // if user has no permission to write to this table, add it to
+ // the failures list
+ boolean sameTable = us.currentTablet != null && (us.currentTablet.getExtent().getTableId().equals(keyExtent.getTableId()));
+ if (sameTable || security.canWrite(us.credentials, keyExtent.getTableId().toString())) {
+ long t2 = System.currentTimeMillis();
+ us.authTimes.addStat(t2 - t1);
+ us.currentTablet = onlineTablets.get(keyExtent);
+ if (us.currentTablet != null) {
+ us.queuedMutations.put(us.currentTablet, new ArrayList<Mutation>());
+ } else {
+ // not serving tablet, so report all mutations as
+ // failures
+ us.failures.put(keyExtent, 0l);
+ if (updateMetrics.isEnabled())
+ updateMetrics.add(TabletServerUpdateMetrics.unknownTabletErrors, 0);
+ }
+ } else {
+ log.warn("Denying access to table " + keyExtent.getTableId() + " for user " + us.credentials.getPrincipal());
+ long t2 = System.currentTimeMillis();
+ us.authTimes.addStat(t2 - t1);
+ us.currentTablet = null;
+ us.authFailures.put(keyExtent, SecurityErrorCode.PERMISSION_DENIED);
+ if (updateMetrics.isEnabled())
+ updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0);
+ return;
+ }
+ } catch (ThriftSecurityException e) {
+ log.error("Denying permission to check user " + us.credentials.getPrincipal() + " with user " + e.getUser(), e);
+ long t2 = System.currentTimeMillis();
+ us.authTimes.addStat(t2 - t1);
+ us.currentTablet = null;
+ us.authFailures.put(keyExtent, e.getCode());
+ if (updateMetrics.isEnabled())
+ updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0);
+ return;
+ }
+ }
+
+ /**
+  * Queues the given mutations on the update session for the given extent and flushes the session once the queued size exceeds the configured maximum.
+  * Nothing is queued when the session ends up without a current tablet.
+  *
+  * @throws RuntimeException
+  *           if no session could be reserved for the id
+  */
+ @Override
+ public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent, List<TMutation> tmutations) {
+   final UpdateSession us = (UpdateSession) sessionManager.reserveSession(updateID);
+   if (us == null) {
+     throw new RuntimeException("No Such SessionID");
+   }
+
+   try {
+     setUpdateTablet(us, new KeyExtent(tkeyExtent));
+
+     if (us.currentTablet == null) {
+       return;
+     }
+
+     final List<Mutation> queue = us.queuedMutations.get(us.currentTablet);
+     for (TMutation thriftMutation : tmutations) {
+       final Mutation queued = new ServerMutation(thriftMutation);
+       queue.add(queued);
+       us.queuedMutationSize += queued.numBytes();
+     }
+
+     final long queueLimit = getSystemConfiguration().getMemoryInBytes(Property.TSERV_MUTATION_QUEUE_MAX);
+     if (us.queuedMutationSize > queueLimit) {
+       flush(us);
+     }
+   } finally {
+     sessionManager.unreserveSession(us);
+   }
+ }
+
+ /**
+  * Flushes every mutation queued on the session in three phases: prepare each tablet's mutations for commit, write them to the write-ahead log (retrying on
+  * IOException/FSError), then commit them. Afterwards the queue is reset and the session statistics are updated.
+  *
+  * @throws RuntimeException
+  *           wrapping the failure if preparing any tablet fails (already prepared commits are aborted first) or if logging fails with an unexpected error
+  */
+ private void flush(UpdateSession us) {
+
+   int mutationCount = 0;
+   Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
+   Throwable error = null;
+
+   long pt1 = System.currentTimeMillis();
+
+   boolean containsMetadataTablet = false;
+   for (Tablet tablet : us.queuedMutations.keySet())
+     if (tablet.getExtent().isMeta())
+       containsMetadataTablet = true;
+
+   // only batches without a metadata tablet wait for commits to be enabled
+   if (!containsMetadataTablet && us.queuedMutations.size() > 0)
+     TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
+
+   Span prep = Trace.start("prep");
+   try {
+     for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) {
+
+       Tablet tablet = entry.getKey();
+       List<Mutation> mutations = entry.getValue();
+       if (mutations.size() > 0) {
+         try {
+           if (updateMetrics.isEnabled())
+             updateMetrics.add(TabletServerUpdateMetrics.mutationArraySize, mutations.size());
+
+           CommitSession commitSession = tablet.prepareMutationsForCommit(us.cenv, mutations);
+           if (commitSession == null) {
+             // no commit session for this tablet: record the extent as failed
+             if (us.currentTablet == tablet) {
+               us.currentTablet = null;
+             }
+             us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
+           } else {
+             sendables.put(commitSession, mutations);
+             mutationCount += mutations.size();
+           }
+
+         } catch (TConstraintViolationException e) {
+           us.violations.add(e.getViolations());
+           if (updateMetrics.isEnabled())
+             updateMetrics.add(TabletServerUpdateMetrics.constraintViolations, 0);
+
+           if (e.getNonViolators().size() > 0) {
+             // only log and commit mutations if there were some
+             // that did not
+             // violate constraints... this is what
+             // prepareMutationsForCommit()
+             // expects
+             sendables.put(e.getCommitSession(), e.getNonViolators());
+           }
+
+           mutationCount += mutations.size();
+
+         } catch (HoldTimeoutException t) {
+           error = t;
+           log.debug("Giving up on mutations due to a long memory hold time");
+           break;
+         } catch (Throwable t) {
+           error = t;
+           log.error("Unexpected error preparing for commit", error);
+           break;
+         }
+       }
+     }
+   } finally {
+     prep.stop();
+   }
+
+   long pt2 = System.currentTimeMillis();
+   us.prepareTimes.addStat(pt2 - pt1);
+   updateAvgPrepTime(pt2 - pt1, us.queuedMutations.size());
+
+   // preparation failed part-way: abort whatever was already prepared and give up
+   if (error != null) {
+     for (Entry<CommitSession,List<Mutation>> e : sendables.entrySet()) {
+       e.getKey().abortCommit(e.getValue());
+     }
+     throw new RuntimeException(error);
+   }
+   try {
+     Span wal = Trace.start("wal");
+     try {
+       // retry until the write-ahead log accepts the mutations
+       while (true) {
+         try {
+           long t1 = System.currentTimeMillis();
+
+           logger.logManyTablets(sendables);
+
+           long t2 = System.currentTimeMillis();
+           us.walogTimes.addStat(t2 - t1);
+           updateWalogWriteTime((t2 - t1));
+           break;
+         } catch (IOException ex) {
+           // pass the exception along so the reason for the retry shows up in the log
+           log.warn("logging mutations failed, retrying", ex);
+         } catch (FSError ex) { // happens when DFS is localFS
+           log.warn("logging mutations failed, retrying", ex);
+         } catch (Throwable t) {
+           log.error("Unknown exception logging mutations, counts for mutations in flight not decremented!", t);
+           throw new RuntimeException(t);
+         }
+       }
+     } finally {
+       wal.stop();
+     }
+
+     Span commit = Trace.start("commit");
+     try {
+       long t1 = System.currentTimeMillis();
+       for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) {
+         CommitSession commitSession = entry.getKey();
+         List<Mutation> mutations = entry.getValue();
+
+         commitSession.commit(mutations);
+
+         Tablet tablet = commitSession.getTablet();
+
+         if (tablet == us.currentTablet) {
+           // because constraint violations may filter out some
+           // mutations, for proper
+           // accounting with the client code, need to increment
+           // the count based
+           // on the original number of mutations from the client
+           // NOT the filtered number
+           us.successfulCommits.increment(tablet, us.queuedMutations.get(tablet).size());
+         }
+       }
+       long t2 = System.currentTimeMillis();
+
+       us.flushTime += (t2 - pt1);
+       us.commitTimes.addStat(t2 - t1);
+
+       updateAvgCommitTime(t2 - t1, sendables.size());
+     } finally {
+       commit.stop();
+     }
+   } finally {
+     // whatever happened above, start over with an empty queue for the current tablet
+     us.queuedMutations.clear();
+     if (us.currentTablet != null) {
+       us.queuedMutations.put(us.currentTablet, new ArrayList<Mutation>());
+     }
+     us.queuedMutationSize = 0;
+   }
+   us.totalUpdates += mutationCount;
+ }
+
+ /** Records the given write-ahead-log write time, provided update metrics are enabled. */
+ private void updateWalogWriteTime(long time) {
+   if (!updateMetrics.isEnabled()) {
+     return;
+   }
+   updateMetrics.add(TabletServerUpdateMetrics.waLogWriteTime, time);
+ }
+
+ /**
+  * Records the commit time averaged over {@code size} items, provided update metrics are enabled.
+  *
+  * @param time
+  *          total elapsed time
+  * @param size
+  *          number of items the time is spread over; values below 1 are treated as 1
+  */
+ private void updateAvgCommitTime(long time, int size) {
+   if (updateMetrics.isEnabled()) {
+     // dividing by a zero size yields Infinity, which narrows to Long.MAX_VALUE and would
+     // be recorded as a bogus sample; a zero time with zero size still records 0 as before
+     int divisor = Math.max(size, 1);
+     updateMetrics.add(TabletServerUpdateMetrics.commitTime, (long) ((time) / (double) divisor));
+   }
+ }
+
+ /**
+  * Records the commit-preparation time averaged over {@code size} items, provided update metrics are enabled.
+  *
+  * @param time
+  *          total elapsed time
+  * @param size
+  *          number of items the time is spread over; values below 1 are treated as 1
+  */
+ private void updateAvgPrepTime(long time, int size) {
+   if (updateMetrics.isEnabled()) {
+     // dividing by a zero size yields Infinity, which narrows to Long.MAX_VALUE and would
+     // be recorded as a bogus sample; a zero time with zero size still records 0 as before
+     int divisor = Math.max(size, 1);
+     updateMetrics.add(TabletServerUpdateMetrics.commitPrep, (long) ((time) / (double) divisor));
+   }
+ }
+
+ /**
+  * Removes the update session, flushes whatever is still queued on it and returns the failures, constraint violations and authorization failures it
+  * accumulated.
+  *
+  * @throws NoSuchScanIDException
+  *           if there was no session for the id
+  */
+ @Override
+ public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDException {
+   final UpdateSession us = (UpdateSession) sessionManager.removeSession(updateID);
+   if (us == null) {
+     throw new NoSuchScanIDException();
+   }
+
+   // clients may or may not see data from an update session while
+   // it is in progress, however when the update session is closed
+   // want to ensure that reads wait for the write to finish
+   final long writeId = writeTracker.startWrite(us.queuedMutations.keySet());
+   try {
+     flush(us);
+   } finally {
+     writeTracker.finishWrite(writeId);
+   }
+
+   log.debug(String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)", TServerUtils.clientAddress.get(), us.totalUpdates,
+       (System.currentTimeMillis() - us.startTime) / 1000.0, us.authTimes.toString(), us.flushTime / 1000.0, us.prepareTimes.getSum() / 1000.0,
+       us.walogTimes.getSum() / 1000.0, us.commitTimes.getSum() / 1000.0));
+
+   if (!us.failures.isEmpty()) {
+     final Entry<KeyExtent,Long> firstFailure = us.failures.entrySet().iterator().next();
+     log.debug(String.format("Failures: %d, first extent %s successful commits: %d", us.failures.size(), firstFailure.getKey().toString(),
+         firstFailure.getValue()));
+   }
+
+   final List<ConstraintViolationSummary> violations = us.violations.asList();
+   if (!violations.isEmpty()) {
+     final ConstraintViolationSummary firstViolation = us.violations.asList().iterator().next();
+     log.debug(String.format("Violations: %d, first %s occurs %d", violations.size(), firstViolation.violationDescription,
+         firstViolation.numberOfViolatingMutations));
+   }
+
+   if (!us.authFailures.isEmpty()) {
+     final KeyExtent firstDenied = us.authFailures.keySet().iterator().next();
+     log.debug(String.format("Authentication Failures: %d, first %s", us.authFailures.size(), firstDenied.toString()));
+   }
+
+   return new UpdateErrors(Translator.translate(us.failures, Translator.KET), Translator.translate(violations, Translator.CVST), Translator.translate(
+       us.authFailures, Translator.KET));
+ }
+
+ /**
+  * Applies a single mutation to one tablet: checks write permission, prepares the mutation for commit, writes it
+  * to the write-ahead log (retrying on IOException) and commits it.
+  *
+  * @param tinfo
+  *          trace information (not read by this method)
+  * @param credentials
+  *          credentials of the caller, checked with security.canWrite
+  * @param tkeyExtent
+  *          extent of the tablet the mutation is for
+  * @param tmutation
+  *          the mutation to apply
+  * @throws NotServingTabletException
+  *           if the tablet is not online here or no commit session could be obtained
+  * @throws ConstraintViolationException
+  *           if preparing the mutation reports constraint violations
+  * @throws ThriftSecurityException
+  *           if the caller may not write to the table
+  */
+ @Override
+ public void update(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, TMutation tmutation) throws NotServingTabletException,
+     ConstraintViolationException, ThriftSecurityException {
+
+   // Decode the table id with an explicit charset; new String(byte[]) would silently use the JVM's platform
+   // default. UTF-8 is what Hadoop's Text uses, and Text is how table ids are compared elsewhere in this file.
+   String tableId = new String(tkeyExtent.getTable(), java.nio.charset.Charset.forName("UTF-8"));
+   if (!security.canWrite(credentials, tableId))
+     throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+   KeyExtent keyExtent = new KeyExtent(tkeyExtent);
+   // the extent itself is a sufficient lookup key, no defensive copy is needed for a get()
+   Tablet tablet = onlineTablets.get(keyExtent);
+   if (tablet == null) {
+     throw new NotServingTabletException(tkeyExtent);
+   }
+
+   if (!keyExtent.isMeta())
+     TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
+
+   long opid = writeTracker.startWrite(TabletType.type(keyExtent));
+
+   try {
+     Mutation mutation = new ServerMutation(tmutation);
+     List<Mutation> mutations = Collections.singletonList(mutation);
+
+     Span prep = Trace.start("prep");
+     CommitSession cs;
+     try {
+       cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, credentials), mutations);
+     } finally {
+       prep.stop();
+     }
+     if (cs == null) {
+       throw new NotServingTabletException(tkeyExtent);
+     }
+
+     // keep retrying the write-ahead log write until it succeeds; IOExceptions are logged and retried
+     while (true) {
+       try {
+         Span wal = Trace.start("wal");
+         try {
+           logger.log(cs, cs.getWALogSeq(), mutation);
+         } finally {
+           wal.stop();
+         }
+         break;
+       } catch (IOException ex) {
+         log.warn(ex, ex);
+       }
+     }
+
+     Span commit = Trace.start("commit");
+     try {
+       cs.commit(mutations);
+     } finally {
+       commit.stop();
+     }
+   } catch (TConstraintViolationException e) {
+     throw new ConstraintViolationException(Translator.translate(e.getViolations().asList(), Translator.CVST));
+   } finally {
+     writeTracker.finishWrite(opid);
+   }
+ }
+
+ /**
+  * Evaluates the conditions of every conditional mutation in {@code updates}, tablet by tablet.
+  *
+  * For a tablet that is not online or is closed, every mutation gets an IGNORED result and the tablet's entry is
+  * removed from {@code updates}. Otherwise the entry's list is replaced with the mutations for which
+  * checkCondition returned true.
+  *
+  * @param updates
+  *          mutations grouped by extent; modified in place
+  * @param results
+  *          receives a result for each mutation that is ignored here or by checkCondition
+  * @param cs
+  *          the conditional session, passed through to checkCondition
+  * @param symbols
+  *          symbol table used to build the CompressedIterators helper
+  */
+ private void checkConditions(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession cs,
+     List<String> symbols) throws IOException {
+   CompressedIterators compressedIters = new CompressedIterators(symbols);
+
+   for (Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> it = updates.entrySet().iterator(); it.hasNext();) {
+     Entry<KeyExtent,List<ServerConditionalMutation>> update = it.next();
+     Tablet tablet = onlineTablets.get(update.getKey());
+     List<ServerConditionalMutation> candidates = update.getValue();
+
+     if (tablet != null && !tablet.isClosed()) {
+       // keep only the mutations whose conditions hold
+       List<ServerConditionalMutation> passed = new ArrayList<ServerConditionalMutation>(candidates.size());
+       for (ServerConditionalMutation candidate : candidates) {
+         if (checkCondition(results, cs, compressedIters, tablet, candidate))
+           passed.add(candidate);
+       }
+       update.setValue(passed);
+       continue;
+     }
+
+     // tablet is gone or closed: nothing for this extent can be processed
+     for (ServerConditionalMutation candidate : candidates)
+       results.add(new TCMResult(candidate.getID(), TCMStatus.IGNORED));
+     it.remove();
+   }
+ }
+
+ /**
+  * Checks every condition of one conditional mutation against the tablet's current data.
+  *
+  * Each condition is checked by scanning the exact row/column it names and comparing the first value read (if
+  * any) with the condition's expected value. The first condition that does not hold adds a REJECTED result. A
+  * TabletClosedException, IterationInterruptedException or TooManyFilesException during the scan adds an IGNORED
+  * result. In both cases checking stops at that condition.
+  *
+  * @return true if all conditions held and no result was added; false otherwise
+  */
+ boolean checkCondition(ArrayList<TCMResult> results, ConditionalSession cs, CompressedIterators compressedIters, Tablet tablet,
+     ServerConditionalMutation scm) throws IOException {
+   Set<Column> noColumns = Collections.emptySet();
+
+   for (TCondition tc : scm.getConditions()) {
+
+     Text row = new Text(scm.getRow());
+     Text family = new Text(tc.getCf());
+     Text qualifier = new Text(tc.getCq());
+     Text visibility = new Text(tc.getCv());
+     Range range = tc.hasTimestamp ? Range.exact(row, family, qualifier, visibility, tc.getTs()) : Range.exact(row, family, qualifier, visibility);
+
+     IterConfig iterConfig = compressedIters.decompress(tc.iterators);
+
+     Scanner scanner = tablet.createScanner(range, 1, noColumns, cs.auths, iterConfig.ssiList, iterConfig.ssio, false, cs.interruptFlag);
+
+     try {
+       // only the first entry of the batch is of interest
+       Value current = null;
+       for (KVEntry kv : scanner.read().results) {
+         current = kv.getValue();
+         break;
+       }
+
+       boolean presenceDiffers = (current == null) != (tc.getVal() == null);
+       if (presenceDiffers || (current != null && !Arrays.equals(tc.getVal(), current.get()))) {
+         results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED));
+         return false;
+       }
+
+     } catch (TabletClosedException e) {
+       results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+       return false;
+     } catch (IterationInterruptedException iie) {
+       results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+       return false;
+     } catch (TooManyFilesException tmfe) {
+       results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+       return false;
+     }
+   }
+   return true;
+ }
+
+ /**
+  * Prepares, logs and commits the conditional mutations in {@code updates}, in three traced phases ("prep", "wal",
+  * "commit"), adding a {@link TCMResult} to {@code results} for the mutations handled here.
+  *
+  * Result statuses added by this method: IGNORED when the tablet is not online, is closed, the session was
+  * already interrupted, or no commit session could be obtained; ACCEPTED for mutations that were prepared
+  * successfully; VIOLATED for mutations reported as constraint violators. Note that ACCEPTED results are added
+  * during the prep phase, before the log write and commit are performed.
+  *
+  * @param updates
+  *          mutations grouped by extent
+  * @param results
+  *          list that receives the per-mutation results
+  * @param sess
+  *          the conditional session; supplies the credentials and the interrupt flag
+  */
+ private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession sess) {
+ Set<Entry<KeyExtent,List<ServerConditionalMutation>>> es = updates.entrySet();
+
+ // mutations that were prepared successfully, keyed by the commit session that must log and commit them
+ Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
+
+ // the interrupt flag is sampled once here; a later interrupt does not affect this call
+ boolean sessionCanceled = sess.interruptFlag.get();
+
+ Span prepSpan = Trace.start("prep");
+ try {
+ long t1 = System.currentTimeMillis();
+ for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) {
+ Tablet tablet = onlineTablets.get(entry.getKey());
+ if (tablet == null || tablet.isClosed() || sessionCanceled) {
+ for (ServerConditionalMutation scm : entry.getValue())
+ results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+ } else {
+ try {
+
+ // view the list of ServerConditionalMutation as a list of Mutation without copying it
+ @SuppressWarnings("unchecked")
+ List<Mutation> mutations = (List<Mutation>) (List<? extends Mutation>) entry.getValue();
+ if (mutations.size() > 0) {
+
+ CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, sess.credentials), mutations);
+
+ if (cs == null) {
+ for (ServerConditionalMutation scm : entry.getValue())
+ results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+ } else {
+ for (ServerConditionalMutation scm : entry.getValue())
+ results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED));
+ sendables.put(cs, mutations);
+ }
+ }
+ } catch (TConstraintViolationException e) {
+ // the non-violating mutations are still written, using the commit session carried by the exception
+ if (e.getNonViolators().size() > 0) {
+ sendables.put(e.getCommitSession(), e.getNonViolators());
+ for (Mutation m : e.getNonViolators())
+ results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.ACCEPTED));
+ }
+
+ for (Mutation m : e.getViolators())
+ results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.VIOLATED));
+ }
+ }
+ }
+
+ long t2 = System.currentTimeMillis();
+ // NOTE(review): es.size() is the number of extents, not mutations, and can be 0 when updates is empty
+ updateAvgPrepTime(t2 - t1, es.size());
+ } finally {
+ prepSpan.stop();
+ }
+
+ Span walSpan = Trace.start("wal");
+ try {
+ // retry the log write until it succeeds; skipped entirely when nothing was prepared.
+ // NOTE(review): the "true &&" in the loop condition is redundant
+ while (true && sendables.size() > 0) {
+ try {
+ long t1 = System.currentTimeMillis();
+ logger.logManyTablets(sendables);
+ long t2 = System.currentTimeMillis();
+ updateWalogWriteTime(t2 - t1);
+ break;
+ } catch (IOException ex) {
+ // NOTE(review): the exception itself is not included in the log message
+ log.warn("logging mutations failed, retrying");
+ } catch (FSError ex) { // happens when DFS is localFS
+ log.warn("logging mutations failed, retrying");
+ } catch (Throwable t) {
+ log.error("Unknown exception logging mutations, counts for mutations in flight not decremented!", t);
+ throw new RuntimeException(t);
+ }
+ }
+ } finally {
+ walSpan.stop();
+ }
+
+ Span commitSpan = Trace.start("commit");
+ try {
+ long t1 = System.currentTimeMillis();
+ for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) {
+ CommitSession commitSession = entry.getKey();
+ List<Mutation> mutations = entry.getValue();
+
+ commitSession.commit(mutations);
+ }
+ long t2 = System.currentTimeMillis();
+ // NOTE(review): sendables.size() can be 0 here when nothing was prepared
+ updateAvgCommitTime(t2 - t1, sendables.size());
+ } finally {
+ commitSpan.stop();
+ }
+
+ }
+
+ /**
+  * Runs one pass of conditional updates: sorts the mutations, sets aside those that cannot be handled in this
+  * pass, then checks conditions and writes the remaining mutations while holding row locks.
+  *
+  * @param cs
+  *          the conditional session
+  * @param updates
+  *          mutations grouped by extent; modified in place by the steps below
+  * @param results
+  *          receives the per-mutation results produced during this pass
+  * @param symbols
+  *          symbol table passed on to checkConditions
+  * @return the mutations that were set aside and still need to be processed by a later pass
+  */
+ private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(ConditionalSession cs, Map<KeyExtent,List<ServerConditionalMutation>> updates,
+     ArrayList<TCMResult> results, List<String> symbols) throws IOException {
+   // Sorting each mutation list avoids deadlock, makes the seeks happen in order (which is more efficient)
+   // and allows duplicate rows to be detected.
+   ConditionalMutationSet.sortConditionalMutations(updates);
+
+   // Two mutations for the same row can not be processed together, because one would not see what the other
+   // writes; the duplicates are postponed.
+   Map<KeyExtent,List<ServerConditionalMutation>> postponed = new HashMap<KeyExtent,List<ServerConditionalMutation>>();
+   ConditionalMutationSet.deferDuplicatesRows(updates, postponed);
+
+   // Take as many row locks as possible without blocking; rows that are already locked are postponed as well.
+   List<RowLock> heldLocks = rowLocks.acquireRowlocks(updates, postponed);
+   try {
+     Span conditionSpan = Trace.start("Check conditions");
+     try {
+       checkConditions(updates, results, cs, symbols);
+     } finally {
+       conditionSpan.stop();
+     }
+
+     Span applySpan = Trace.start("apply conditional mutations");
+     try {
+       writeConditionalMutations(updates, results, cs);
+     } finally {
+       applySpan.stop();
+     }
+   } finally {
+     rowLocks.releaseRowLocks(heldLocks);
+   }
+
+   return postponed;
+ }
+
+ /**
+  * Creates a conditional update session for a table after verifying that the caller may conditionally update it
+  * and holds every requested authorization.
+  *
+  * @return a descriptor carrying the new session id, this server's lock id and the session manager's max idle time
+  * @throws ThriftSecurityException
+  *           with PERMISSION_DENIED if the caller may not conditionally update the table, or BAD_AUTHORIZATIONS if a
+  *           requested authorization is not among the user's authorizations
+  */
+ @Override
+ public TConditionalSession startConditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations, String tableID)
+     throws ThriftSecurityException, TException {
+
+   if (!security.canConditionallyUpdate(credentials, tableID, authorizations))
+     throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+
+   // every requested authorization must be one the user actually has
+   Authorizations granted = security.getUserAuthorizations(credentials);
+   for (ByteBuffer requested : authorizations) {
+     if (!granted.contains(ByteBufferUtil.toBytes(requested)))
+       throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
+   }
+
+   ConditionalSession session = new ConditionalSession();
+   session.auths = new Authorizations(authorizations);
+   session.credentials = credentials;
+   session.tableId = tableID;
+   session.interruptFlag = new AtomicBoolean();
+
+   long sessionId = sessionManager.createSession(session, false);
+   return new TConditionalSession(sessionId, lockID, sessionManager.getMaxIdleTime());
+ }
+
+ /**
+  * Applies a batch of conditional mutations within an existing conditional session and returns one result per
+  * processed mutation. Mutations that had to be deferred by a pass are retried until none remain.
+  *
+  * @param tinfo
+  *          trace information (not read by this method)
+  * @param sessID
+  *          id of a session created by startConditionalUpdate
+  * @param mutations
+  *          conditional mutations grouped by extent; every extent must belong to the session's table
+  * @param symbols
+  *          symbol table passed on to the condition checks
+  * @throws NoSuchScanIDException
+  *           if the session does not exist or has been interrupted
+  * @throws TException
+  *           wrapping any IOException raised while processing
+  */
+ @Override
+ public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID, Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
+     throws NoSuchScanIDException, TException {
+
+   ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID);
+
+   if (cs == null)
+     throw new NoSuchScanIDException();
+
+   // The session is reserved from here on, so every exit path below must unreserve it. This includes the
+   // interrupted-session check and the calls made before the write starts. Otherwise the reservation would be
+   // left behind and invalidateConditionalUpdate, which reserves the session again before removing it, would
+   // presumably never get it.
+   try {
+     if (cs.interruptFlag.get())
+       throw new NoSuchScanIDException();
+
+     if (!cs.tableId.equals(MetadataTable.ID) && !cs.tableId.equals(RootTable.ID))
+       TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
+
+     Text tid = new Text(cs.tableId);
+     long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null)));
+
+     try {
+       Map<KeyExtent,List<ServerConditionalMutation>> updates = Translator.translate(mutations, Translator.TKET,
+           new Translator.ListTranslator<TConditionalMutation,ServerConditionalMutation>(ServerConditionalMutation.TCMT));
+
+       for (KeyExtent ke : updates.keySet())
+         if (!ke.getTableId().equals(tid))
+           throw new IllegalArgumentException("Unexpected table id " + tid + " != " + ke.getTableId());
+
+       ArrayList<TCMResult> results = new ArrayList<TCMResult>();
+
+       Map<KeyExtent,List<ServerConditionalMutation>> deferred = conditionalUpdate(cs, updates, results, symbols);
+
+       // keep going until a pass leaves nothing deferred
+       while (deferred.size() > 0) {
+         deferred = conditionalUpdate(cs, deferred, results, symbols);
+       }
+
+       return results;
+     } catch (IOException ioe) {
+       throw new TException(ioe);
+     } finally {
+       writeTracker.finishWrite(opid);
+     }
+   } finally {
+     sessionManager.unreserveSession(sessID);
+   }
+ }
+
+ /**
+  * Invalidates a conditional update session: flags it as interrupted, then reserves and removes it.
+  *
+  * The intended contract is that this method waits for any running conditional update to complete and that no
+  * conditional update can start on the session once it has returned.
+  */
+ @Override
+ public void invalidateConditionalUpdate(TInfo tinfo, long sessID) throws TException {
+   // flag the session first so that conditionalUpdate calls that check the flag refuse to proceed
+   ConditionalSession current = (ConditionalSession) sessionManager.getSession(sessID);
+   if (current != null)
+     current.interruptFlag.set(true);
+
+   // presumably the boolean argument makes reserveSession wait for an in-progress reservation -- confirm
+   // against SessionManager
+   ConditionalSession reserved = (ConditionalSession) sessionManager.reserveSession(sessID, true);
+   if (reserved == null)
+     return;
+
+   sessionManager.removeSession(sessID, true);
+ }
+
+ /**
+  * Closes a conditional update session by removing it from the session manager.
+  *
+  * @param tinfo
+  *          trace information (not read by this method)
+  * @param sessID
+  *          id of the conditional session to remove
+  */
+ @Override
+ public void closeConditionalUpdate(TInfo tinfo, long sessID) throws TException {
+ // NOTE(review): invalidateConditionalUpdate passes true as the second argument; the meaning of the flag is
+ // defined by SessionManager and is not visible here
+ sessionManager.removeSession(sessID, false);
+ }
+
+ /**
+  * Splits an online tablet at the given split point, unless the split point equals the tablet's end row.
+  *
+  * @param tinfo
+  *          trace information (not read by this method)
+  * @param credentials
+  *          credentials of the caller, checked with security.canSplitTablet
+  * @param tkeyExtent
+  *          extent of the tablet to split
+  * @param splitPoint
+  *          row at which to split
+  * @throws NotServingTabletException
+  *           if the tablet is not online here or the split returned no result
+  * @throws ThriftSecurityException
+  *           if the caller may not split tablets of the table
+  */
+ @Override
+ public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, ByteBuffer splitPoint) throws NotServingTabletException,
+     ThriftSecurityException {
+
+   // Decode the table id with an explicit charset; new String(byte[]) would silently use the JVM's platform
+   // default. UTF-8 is what Hadoop's Text uses, and Text is how table ids are compared elsewhere in this file.
+   String tableId = new String(ByteBufferUtil.toBytes(tkeyExtent.table), java.nio.charset.Charset.forName("UTF-8"));
+   if (!security.canSplitTablet(credentials, tableId))
+     throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+
+   KeyExtent keyExtent = new KeyExtent(tkeyExtent);
+
+   Tablet tablet = onlineTablets.get(keyExtent);
+   if (tablet == null) {
+     throw new NotServingTabletException(tkeyExtent);
+   }
+
+   // nothing to do when the requested split point is already the tablet's end row
+   if (keyExtent.getEndRow() == null || !keyExtent.getEndRow().equals(ByteBufferUtil.toText(splitPoint))) {
+     try {
+       if (TabletServer.this.splitTablet(tablet, ByteBufferUtil.toBytes(splitPoint)) == null) {
+         throw new NotServingTabletException(tkeyExtent);
+       }
+     } catch (IOException e) {
+       log.warn("Failed to split " + keyExtent, e);
+       throw new RuntimeException(e);
+     }
+   }
+ }
+
+ /**
+  * Returns this tablet server's status, built by getStats from the session manager's active scans per table.
+  *
+  * NOTE(review): neither tinfo nor credentials is read in this method, so no permission check happens here.
+  */
+ @Override
+ public TabletServerStatus getTabletServerStatus(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
+ return getStats(sessionManager.getActiveScansPerTable());
+ }
+
+ /**
+  * Collects statistics for the online tablets of one table.
+  *
+  * Works on a copy of the online tablet map taken while synchronized on it, so the iteration itself runs without
+  * holding that monitor.
+  *
+  * @param tableId
+  *          id of the table whose tablets are reported
+  * @return one TabletStats per online tablet of the table, in the iteration order of the copied map
+  */
+ @Override
+ public List<TabletStats> getTabletStats(TInfo tinfo, TCredentials credentials, String tableId) throws ThriftSecurityException, TException {
+   TreeMap<KeyExtent,Tablet> snapshot;
+   synchronized (onlineTablets) {
+     snapshot = new TreeMap<KeyExtent,Tablet>(onlineTablets);
+   }
+
+   List<TabletStats> collected = new ArrayList<TabletStats>();
+   Text wantedTable = new Text(tableId);
+   KeyExtent firstCandidate = new KeyExtent(wantedTable, new Text(), null);
+
+   for (Entry<KeyExtent,Tablet> online : snapshot.tailMap(firstCandidate).entrySet()) {
+     KeyExtent extent = online.getKey();
+     if (extent.getTableId().compareTo(wantedTable) != 0)
+       continue; // belongs to a different table
+
+     Tablet tablet = online.getValue();
+     TabletStats tabletStats = tablet.timer.getTabletStats();
+     tabletStats.extent = extent.toThrift();
+     tabletStats.ingestRate = tablet.ingestRate();
+     tabletStats.queryRate = tablet.queryRate();
+     tabletStats.splitCreationTime = tablet.getSplitCreationTime();
+     tabletStats.numEntries = tablet.getNumEntries();
+     collected.add(tabletStats);
+   }
+
+   return collected;
+ }
+
+ // Cache used by checkPermission when testing whether a master lock is held; it is cleared and consulted again
+ // when the first lookup says the lock is not held.
+ private ZooCache masterLockCache = new ZooCache();
+
+ /**
+  * Verifies that a system-level request may be processed by this tablet server.
+  *
+  * The checks, in order: the caller must be allowed to perform system actions; this server must have acquired its
+  * tablet server lock; the server must still hold that lock (otherwise the process is halted); and, when
+  * {@code lock} is non-null, the master lock it names must currently be held.
+  *
+  * @param credentials
+  *          credentials of the caller
+  * @param lock
+  *          master lock identifier sent with the request, or null to skip the master lock check
+  * @param request
+  *          name of the request, used only in log messages
+  * @throws ThriftSecurityException
+  *           if the caller may not perform system actions
+  * @throws RuntimeException
+  *           if this server has not acquired its lock, or the master lock check fails
+  */
+ private void checkPermission(TCredentials credentials, String lock, final String request) throws ThriftSecurityException {
+ // set when the security failure indicates a mismatched configuration; acted on in the finally block
+ boolean fatal = false;
+ try {
+ log.debug("Got " + request + " message from user: " + credentials.getPrincipal());
+ if (!security.canPerformSystemActions(credentials)) {
+ log.warn("Got " + request + " message from user: " + credentials.getPrincipal());
+ throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+ }
+ } catch (ThriftSecurityException e) {
+ // reached both for the PERMISSION_DENIED thrown above and for any ThriftSecurityException raised by the
+ // security check itself
+ log.warn("Got " + request + " message from unauthenticatable user: " + e.getUser());
+ // the caller presents the same token class as this server's system credentials, yet was refused
+ if (SystemCredentials.get().getToken().getClass().getName().equals(credentials.getTokenClassName())) {
+ log.fatal("Got message from a service with a mismatched configuration. Please ensure a compatible configuration.", e);
+ fatal = true;
+ }
+ throw e;
+ } finally {
+ if (fatal) {
+ // presumably Halt.halt runs the callback and terminates the process with status 1 -- confirm against Halt
+ Halt.halt(1, new Runnable() {
+ @Override
+ public void run() {
+ logGCInfo(getSystemConfiguration());
+ }
+ });
+ }
+ }
+
+ if (tabletServerLock == null || !tabletServerLock.wasLockAcquired()) {
+ log.warn("Got " + request + " message from master before lock acquired, ignoring...");
+ throw new RuntimeException("Lock not acquired");
+ }
+
+ // NOTE(review): the null and wasLockAcquired() tests below are already guaranteed by the check above, which
+ // throws otherwise; only isLocked() can change the outcome here
+ if (tabletServerLock != null && tabletServerLock.wasLockAcquired() && !tabletServerLock.isLocked()) {
+ Halt.halt(1, new Runnable() {
+ @Override
+ public void run() {
+ log.info("Tablet server no longer holds lock during checkPermission() : " + request + ", exiting");
+ logGCInfo(getSystemConfiguration());
+ }
+ });
+ }
+
+ if (lock != null) {
+ ZooUtil.LockID lid = new ZooUtil.LockID(ZooUtil.getRoot(instance) + Constants.ZMASTER_LOCK, lock);
+
+ try {
+ if (!ZooLock.isLockHeld(masterLockCache, lid)) {
+ // maybe the cache is out of date and a new master holds the
+ // lock?
+ masterLockCache.clear();
+ if (!ZooLock.isLockHeld(masterLockCache, lid)) {
+ log.warn("Got " + request + " message from a master that does not hold the current lock " + lock);
+ throw new RuntimeException("bad master lock");
+ }
+ }
+ } catch (Exception e) {
+ // NOTE(review): this also catches the RuntimeException thrown just above and wraps it in a second
+ // RuntimeException with the same message
+ throw new RuntimeException("bad master lock", e);
+ }
+ }
+ }
+
+ @Override
+ public void loadTablet(
<TRUNCATED>