You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2020/04/15 10:50:12 UTC
[accumulo] branch master updated: Fix #1581 Move
ThriftClientHandler out of TabletServer (#1584)
This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new 960a009 Fix #1581 Move ThriftClientHandler out of TabletServer (#1584)
960a009 is described below
commit 960a009f0899df1b5af31be122d468361c6febb4
Author: andrewglowacki <an...@live.com>
AuthorDate: Wed Apr 15 06:50:01 2020 -0400
Fix #1581 Move ThriftClientHandler out of TabletServer (#1584)
Move ThriftClientHandler, AssignmentHandler and UnloadTabletHandler classes out of
TabletServer and into their own separate files.
---
.../server/client/ClientServiceHandler.java | 6 +-
.../apache/accumulo/tserver/AssignmentHandler.java | 238 +++
.../org/apache/accumulo/tserver/TabletServer.java | 2092 +-------------------
.../tserver/TabletServerResourceManager.java | 1 -
.../accumulo/tserver/ThriftClientHandler.java | 1792 +++++++++++++++++
.../accumulo/tserver/UnloadTabletHandler.java | 141 ++
6 files changed, 2201 insertions(+), 2069 deletions(-)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
index bbfa6c0..2c8aad9 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
@@ -74,9 +74,9 @@ import org.slf4j.LoggerFactory;
public class ClientServiceHandler implements ClientService.Iface {
private static final Logger log = LoggerFactory.getLogger(ClientServiceHandler.class);
protected final TransactionWatcher transactionWatcher;
- private final ServerContext context;
- private final VolumeManager fs;
- private final SecurityOperation security;
+ protected final ServerContext context;
+ protected final VolumeManager fs;
+ protected final SecurityOperation security;
private final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus();
public ClientServiceHandler(ServerContext context, TransactionWatcher transactionWatcher,
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java
new file mode 100644
index 0000000..6bebe14
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TabletLoadState;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.fate.util.LoggingRunnable;
+import org.apache.accumulo.server.master.state.Assignment;
+import org.apache.accumulo.server.master.state.TabletStateStore;
+import org.apache.accumulo.server.problems.ProblemReport;
+import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.util.MasterMetadataUtil;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
+import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.accumulo.tserver.tablet.TabletData;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AssignmentHandler implements Runnable { // Tries to bring one assigned tablet online on this server and reports the outcome to the master.
+ private static final Logger log = LoggerFactory.getLogger(AssignmentHandler.class);
+ private final KeyExtent extent; // the tablet this handler tries to load
+ private final int retryAttempt; // attempt counter; drives the retry back-off computed in run()
+ private final TabletServer server; // owner of the unopened/opening/online tablet sets used below
+ // Creates a handler for a first attempt (retryAttempt = 0).
+ public AssignmentHandler(TabletServer server, KeyExtent extent) {
+ this(server, extent, 0);
+ }
+ // Creates a handler for the given attempt number; run() uses retryAttempt + 1 when it reschedules a failed load.
+ public AssignmentHandler(TabletServer server, KeyExtent extent, int retryAttempt) {
+ this.server = server;
+ this.extent = extent;
+ this.retryAttempt = retryAttempt;
+ }
+ // Claims the extent (unopened -> opening), checks its metadata, opens the tablet, then marks it online or schedules a retry.
+ @Override
+ public void run() {
+ synchronized (server.unopenedTablets) { // locks are nested in the order unopened -> opening -> online
+ synchronized (server.openingTablets) {
+ synchronized (server.onlineTablets) {
+ // nothing should be moving between sets, do a sanity
+ // check
+ Set<KeyExtent> unopenedOverlapping =
+ KeyExtent.findOverlapping(extent, server.unopenedTablets);
+ Set<KeyExtent> openingOverlapping =
+ KeyExtent.findOverlapping(extent, server.openingTablets);
+ Set<KeyExtent> onlineOverlapping =
+ KeyExtent.findOverlapping(extent, server.onlineTablets.snapshot());
+ // this exact extent is already being opened or is already online: nothing to do
+ if (openingOverlapping.contains(extent) || onlineOverlapping.contains(extent)) {
+ return;
+ }
+
+ if (!unopenedOverlapping.contains(extent)) {
+ log.info("assignment {} no longer in the unopened set", extent);
+ return;
+ }
+ // any other overlapping extent in any of the three sets is treated as an inconsistent state
+ if (unopenedOverlapping.size() != 1 || openingOverlapping.size() > 0
+ || onlineOverlapping.size() > 0) {
+ throw new IllegalStateException(
+ "overlaps assigned " + extent + " " + !server.unopenedTablets.contains(extent) + " "
+ + unopenedOverlapping + " " + openingOverlapping + " " + onlineOverlapping);
+ }
+ }
+ // online lock released; the unopened and opening locks are still held for the move
+ server.unopenedTablets.remove(extent);
+ server.openingTablets.add(extent);
+ }
+ }
+
+ // check Metadata table before accepting assignment
+ Text locationToOpen = null; // NOTE(review): never reassigned in this method, so it is always logged as null below
+ TabletMetadata tabletMetadata = null;
+ boolean canLoad = false;
+ try {
+ tabletMetadata = server.getContext().getAmple().readTablet(extent);
+
+ canLoad = TabletServer.checkTabletMetadata(extent, server.getTabletSession(), tabletMetadata);
+ // presumably an unfinished split left an old prevEndRow in metadata (TODO confirm); load the extent returned by fixSplit instead
+ if (canLoad && tabletMetadata.sawOldPrevEndRow()) {
+ KeyExtent fixedExtent =
+ MasterMetadataUtil.fixSplit(server.getContext(), tabletMetadata, server.getLock());
+
+ synchronized (server.openingTablets) {
+ server.openingTablets.remove(extent);
+ server.openingTablets.notifyAll();
+ // it is expected that the new extent will overlap the old one... if it does not, it
+ // should not be added to unopenedTablets
+ if (!KeyExtent.findOverlapping(extent, new TreeSet<>(Arrays.asList(fixedExtent)))
+ .contains(fixedExtent)) {
+ throw new IllegalStateException(
+ "Fixed split does not overlap " + extent + " " + fixedExtent);
+ }
+ server.unopenedTablets.add(fixedExtent);
+ }
+ // split was rolled back... try again
+ new AssignmentHandler(server, fixedExtent).run(); // direct call: runs synchronously on the current thread
+ return;
+
+ }
+ } catch (Exception e) { // any failure while verifying metadata: undo the claim, report failure, rethrow
+ synchronized (server.openingTablets) {
+ server.openingTablets.remove(extent);
+ server.openingTablets.notifyAll();
+ }
+ log.warn("Failed to verify tablet " + extent, e);
+ server.enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
+ throw new RuntimeException(e);
+ }
+ // metadata check did not approve the load: report failure and return without scheduling a retry
+ if (!canLoad) {
+ log.debug("Reporting tablet {} assignment failure: unable to verify Tablet Information",
+ extent);
+ synchronized (server.openingTablets) {
+ server.openingTablets.remove(extent);
+ server.openingTablets.notifyAll();
+ }
+ server.enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
+ return;
+ }
+
+ Tablet tablet = null;
+ boolean successful = false;
+
+ try {
+ server.acquireRecoveryMemory(extent);
+
+ TabletResourceManager trm = server.resourceManager.createTabletResourceManager(extent,
+ server.getTableConfiguration(extent));
+ TabletData data = new TabletData(tabletMetadata);
+
+ tablet = new Tablet(server, extent, trm, data);
+ // If a minor compaction starts after a tablet opens, this indicates a log recovery
+ // occurred. This recovered data must be minor compacted.
+ // There are three reasons to wait for this minor compaction to finish before placing the
+ // tablet in online tablets.
+ //
+ // 1) The log recovery code does not handle data written to the tablet on multiple tablet
+ // servers.
+ // 2) The log recovery code does not block if memory is full. Therefore recovering lots of
+ // tablets that use a lot of memory could run out of memory.
+ // 3) The minor compaction finish event did not make it to the logs (the file will be in
+ // metadata, preventing replay of compacted data)... but do not
+ // want a majc to wipe the file out from metadata and then have another process failure...
+ // this could cause duplicate data to replay.
+ if (tablet.getNumEntriesInMemory() > 0
+ && !tablet.minorCompactNow(MinorCompactionReason.RECOVERY)) {
+ throw new RuntimeException("Minor compaction after recovery fails for " + extent);
+ }
+ Assignment assignment = new Assignment(extent, server.getTabletSession());
+ TabletStateStore.setLocation(server.getContext(), assignment);
+ // publish the tablet: move the extent from opening to online and wake threads waiting on openingTablets
+ synchronized (server.openingTablets) {
+ synchronized (server.onlineTablets) {
+ server.openingTablets.remove(extent);
+ server.onlineTablets.put(extent, tablet);
+ server.openingTablets.notifyAll();
+ server.recentlyUnloadedCache.remove(tablet.getExtent());
+ }
+ }
+ tablet = null; // release this reference
+ successful = true;
+ } catch (Throwable e) {
+ log.warn("exception trying to assign tablet {} {}", extent, locationToOpen, e);
+
+ if (e.getMessage() != null) {
+ log.warn("{}", e.getMessage());
+ }
+
+ TableId tableId = extent.getTableId();
+ ProblemReports.getInstance(server.getContext()).report(new ProblemReport(tableId, TABLET_LOAD,
+ extent.getUUID().toString(), server.getClientAddressString(), e));
+ } finally {
+ server.releaseRecoveryMemory(extent); // runs on both the success and the failure path
+ }
+
+ if (!successful) { // open failed: put the extent back in the unopened set, tell the master, and schedule a retry
+ synchronized (server.unopenedTablets) {
+ synchronized (server.openingTablets) {
+ server.openingTablets.remove(extent);
+ server.unopenedTablets.add(extent);
+ server.openingTablets.notifyAll();
+ }
+ }
+ log.warn("failed to open tablet {} reporting failure to master", extent);
+ server.enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
+ long reschedule = Math.min((1L << Math.min(32, retryAttempt)) * 1000, 10 * 60 * 1000L); // 2^retryAttempt seconds in ms, capped at 10 minutes
+ log.warn(String.format("rescheduling tablet load in %.2f seconds", reschedule / 1000.));
+ SimpleTimer.getInstance(server.getConfiguration()).schedule(new TimerTask() {
+ @Override
+ public void run() {
+ log.info("adding tablet {} back to the assignment pool (retry {})", extent, retryAttempt);
+ AssignmentHandler handler = new AssignmentHandler(server, extent, retryAttempt + 1);
+ if (extent.isMeta()) { // where the retry runs depends on the kind of tablet
+ if (extent.isRootTablet()) {
+ new Daemon(new LoggingRunnable(log, handler), "Root tablet assignment retry").start();
+ } else {
+ server.resourceManager.addMetaDataAssignment(extent, log, handler);
+ }
+ } else {
+ server.resourceManager.addAssignment(extent, log, handler);
+ }
+ }
+ }, reschedule);
+ } else {
+ server.enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOADED, extent));
+ }
+ }
+}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 4ba635c..0a6d359 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -19,15 +19,11 @@
package org.apache.accumulo.tserver;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
-import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
@@ -45,113 +41,43 @@ import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
-import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Durability;
-import org.apache.accumulo.core.client.SampleNotPresentException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.clientImpl.CompressedIterators;
import org.apache.accumulo.core.clientImpl.DurabilityImpl;
-import org.apache.accumulo.core.clientImpl.Tables;
import org.apache.accumulo.core.clientImpl.TabletLocator;
-import org.apache.accumulo.core.clientImpl.TabletType;
-import org.apache.accumulo.core.clientImpl.Translator;
-import org.apache.accumulo.core.clientImpl.Translator.TKeyExtentTranslator;
-import org.apache.accumulo.core.clientImpl.Translator.TRangeTranslator;
-import org.apache.accumulo.core.clientImpl.Translators;
-import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
-import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
-import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
-import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Column;
-import org.apache.accumulo.core.data.ConstraintViolationSummary;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.NamespaceId;
-import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan;
-import org.apache.accumulo.core.dataImpl.thrift.InitialScan;
-import org.apache.accumulo.core.dataImpl.thrift.IterInfo;
-import org.apache.accumulo.core.dataImpl.thrift.MapFileInfo;
-import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult;
-import org.apache.accumulo.core.dataImpl.thrift.ScanResult;
-import org.apache.accumulo.core.dataImpl.thrift.TCMResult;
-import org.apache.accumulo.core.dataImpl.thrift.TCMStatus;
-import org.apache.accumulo.core.dataImpl.thrift.TColumn;
-import org.apache.accumulo.core.dataImpl.thrift.TConditionalMutation;
-import org.apache.accumulo.core.dataImpl.thrift.TConditionalSession;
-import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
-import org.apache.accumulo.core.dataImpl.thrift.TKeyValue;
-import org.apache.accumulo.core.dataImpl.thrift.TMutation;
-import org.apache.accumulo.core.dataImpl.thrift.TRange;
-import org.apache.accumulo.core.dataImpl.thrift.TRowRange;
-import org.apache.accumulo.core.dataImpl.thrift.TSummaries;
-import org.apache.accumulo.core.dataImpl.thrift.TSummaryRequest;
-import org.apache.accumulo.core.dataImpl.thrift.UpdateErrors;
-import org.apache.accumulo.core.iterators.IterationInterruptedException;
-import org.apache.accumulo.core.logging.TabletLogger;
import org.apache.accumulo.core.master.thrift.BulkImportState;
import org.apache.accumulo.core.master.thrift.Compacting;
import org.apache.accumulo.core.master.thrift.MasterClientService;
import org.apache.accumulo.core.master.thrift.TableInfo;
-import org.apache.accumulo.core.master.thrift.TabletLoadState;
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
import org.apache.accumulo.core.replication.ReplicationConstants;
import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
import org.apache.accumulo.core.rpc.ThriftUtil;
-import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
-import org.apache.accumulo.core.spi.cache.BlockCache;
-import org.apache.accumulo.core.spi.scan.ScanDispatcher;
-import org.apache.accumulo.core.summary.Gatherer;
-import org.apache.accumulo.core.summary.Gatherer.FileSystemResolver;
-import org.apache.accumulo.core.summary.SummaryCollection;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
-import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
-import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
-import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
-import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
-import org.apache.accumulo.core.tabletserver.thrift.TDurability;
-import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
-import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
-import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
-import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.accumulo.core.trace.TraceUtil;
-import org.apache.accumulo.core.trace.thrift.TInfo;
-import org.apache.accumulo.core.util.ByteBufferUtil;
import org.apache.accumulo.core.util.ComparablePair;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.HostAndPort;
@@ -172,7 +98,6 @@ import org.apache.accumulo.fate.zookeeper.ZooLock;
import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.fate.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.AbstractServer;
import org.apache.accumulo.server.GarbageCollectionLogger;
@@ -180,9 +105,7 @@ import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.ServerOpts;
import org.apache.accumulo.server.TabletLevel;
-import org.apache.accumulo.server.client.ClientServiceHandler;
import org.apache.accumulo.server.conf.TableConfiguration;
-import org.apache.accumulo.server.data.ServerMutation;
import org.apache.accumulo.server.fs.VolumeChooserEnvironment;
import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl;
import org.apache.accumulo.server.fs.VolumeManager;
@@ -190,15 +113,7 @@ import org.apache.accumulo.server.log.SortedLogState;
import org.apache.accumulo.server.log.WalStateManager;
import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
import org.apache.accumulo.server.master.recovery.RecoveryPath;
-import org.apache.accumulo.server.master.state.Assignment;
-import org.apache.accumulo.server.master.state.DistributedStoreException;
import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.accumulo.server.master.state.TabletLocationState;
-import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
-import org.apache.accumulo.server.master.state.TabletStateStore;
-import org.apache.accumulo.server.master.tableOps.UserCompactionConfig;
-import org.apache.accumulo.server.problems.ProblemReport;
-import org.apache.accumulo.server.problems.ProblemReports;
import org.apache.accumulo.server.replication.ZooKeeperInitialization;
import org.apache.accumulo.server.rpc.ServerAddress;
import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
@@ -211,93 +126,63 @@ import org.apache.accumulo.server.security.delegation.AuthenticationTokenSecretM
import org.apache.accumulo.server.security.delegation.ZooAuthenticationKeyWatcher;
import org.apache.accumulo.server.util.FileSystemMonitor;
import org.apache.accumulo.server.util.Halt;
-import org.apache.accumulo.server.util.MasterMetadataUtil;
import org.apache.accumulo.server.util.ServerBulkImportStatus;
import org.apache.accumulo.server.util.time.RelativeTime;
import org.apache.accumulo.server.util.time.SimpleTimer;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
-import org.apache.accumulo.server.zookeeper.TransactionWatcher;
import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
import org.apache.accumulo.start.classloader.vfs.ContextManager;
-import org.apache.accumulo.tserver.ConditionCheckerContext.ConditionChecker;
-import org.apache.accumulo.tserver.RowLocks.RowLock;
import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
import org.apache.accumulo.tserver.TabletStatsKeeper.Operation;
import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
-import org.apache.accumulo.tserver.data.ServerConditionalMutation;
import org.apache.accumulo.tserver.log.DfsLogger;
import org.apache.accumulo.tserver.log.LogSorter;
import org.apache.accumulo.tserver.log.MutationReceiver;
import org.apache.accumulo.tserver.log.TabletServerLogger;
import org.apache.accumulo.tserver.mastermessage.MasterMessage;
import org.apache.accumulo.tserver.mastermessage.SplitReportMessage;
-import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage;
import org.apache.accumulo.tserver.metrics.TabletServerMetrics;
import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
import org.apache.accumulo.tserver.metrics.TabletServerUpdateMetrics;
import org.apache.accumulo.tserver.replication.ReplicationServicerHandler;
import org.apache.accumulo.tserver.replication.ReplicationWorker;
-import org.apache.accumulo.tserver.scan.LookupTask;
-import org.apache.accumulo.tserver.scan.NextBatchTask;
-import org.apache.accumulo.tserver.scan.ScanParameters;
import org.apache.accumulo.tserver.scan.ScanRunState;
-import org.apache.accumulo.tserver.session.ConditionalSession;
-import org.apache.accumulo.tserver.session.MultiScanSession;
import org.apache.accumulo.tserver.session.Session;
import org.apache.accumulo.tserver.session.SessionManager;
-import org.apache.accumulo.tserver.session.SingleScanSession;
-import org.apache.accumulo.tserver.session.SummarySession;
-import org.apache.accumulo.tserver.session.UpdateSession;
import org.apache.accumulo.tserver.tablet.BulkImportCacheCleaner;
import org.apache.accumulo.tserver.tablet.CommitSession;
-import org.apache.accumulo.tserver.tablet.CompactionInfo;
import org.apache.accumulo.tserver.tablet.CompactionWatcher;
-import org.apache.accumulo.tserver.tablet.Compactor;
-import org.apache.accumulo.tserver.tablet.KVEntry;
-import org.apache.accumulo.tserver.tablet.PreparedMutations;
-import org.apache.accumulo.tserver.tablet.ScanBatch;
import org.apache.accumulo.tserver.tablet.Tablet;
-import org.apache.accumulo.tserver.tablet.TabletClosedException;
import org.apache.accumulo.tserver.tablet.TabletData;
import org.apache.commons.collections4.map.LRUMap;
-import org.apache.hadoop.fs.FSError;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.metrics2.MetricsSystem;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.server.TServer;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.Cache;
-import com.google.common.collect.Collections2;
public class TabletServer extends AbstractServer {
private static final Logger log = LoggerFactory.getLogger(TabletServer.class);
- private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
- private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
private static final long TIME_BETWEEN_GC_CHECKS = 5000;
private static final long TIME_BETWEEN_LOCATOR_CACHE_CLEARS = 60 * 60 * 1000;
- private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
- private final TransactionWatcher watcher;
- private ZooCache masterLockCache;
+ final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
+ final ZooCache masterLockCache;
- private final TabletServerLogger logger;
+ final TabletServerLogger logger;
- private final TabletServerUpdateMetrics updateMetrics;
- private final TabletServerScanMetrics scanMetrics;
- private final TabletServerMinCMetrics mincMetrics;
+ final TabletServerUpdateMetrics updateMetrics;
+ final TabletServerScanMetrics scanMetrics;
+ final TabletServerMinCMetrics mincMetrics;
public TabletServerScanMetrics getScanMetrics() {
return scanMetrics;
@@ -309,7 +194,7 @@ public class TabletServer extends AbstractServer {
private final LogSorter logSorter;
private ReplicationWorker replWorker = null;
- private final TabletStatsKeeper statsKeeper;
+ final TabletStatsKeeper statsKeeper;
private final AtomicInteger logIdGenerator = new AtomicInteger();
private final AtomicLong flushCounter = new AtomicLong(0);
@@ -317,22 +202,19 @@ public class TabletServer extends AbstractServer {
private final VolumeManager fs;
- private final OnlineTablets onlineTablets = new OnlineTablets();
- private final SortedSet<KeyExtent> unopenedTablets =
- Collections.synchronizedSortedSet(new TreeSet<>());
- private final SortedSet<KeyExtent> openingTablets =
- Collections.synchronizedSortedSet(new TreeSet<>());
- private final Map<KeyExtent,Long> recentlyUnloadedCache =
- Collections.synchronizedMap(new LRUMap<>(1000));
+ final OnlineTablets onlineTablets = new OnlineTablets();
+ final SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new TreeSet<>());
+ final SortedSet<KeyExtent> openingTablets = Collections.synchronizedSortedSet(new TreeSet<>());
+ final Map<KeyExtent,Long> recentlyUnloadedCache = Collections.synchronizedMap(new LRUMap<>(1000));
- private final TabletServerResourceManager resourceManager;
+ final TabletServerResourceManager resourceManager;
private final SecurityOperation security;
private final BlockingDeque<MasterMessage> masterMessages = new LinkedBlockingDeque<>();
private Thread majorCompactorThread;
- private HostAndPort clientAddress;
+ HostAndPort clientAddress;
private volatile boolean serverStopRequested = false;
private volatile boolean shutdownComplete = false;
@@ -364,7 +246,6 @@ public class TabletServer extends AbstractServer {
ServerContext context = super.getContext();
context.setupCrypto();
this.masterLockCache = new ZooCache(context.getZooReaderWriter(), null);
- this.watcher = new TransactionWatcher(context);
this.fs = context.getVolumeManager();
final AccumuloConfiguration aconf = getConfiguration();
log.info("Version " + Constants.VERSION);
@@ -493,1649 +374,19 @@ public class TabletServer extends AbstractServer {
return (long) ((1. + (r.nextDouble() / 10)) * TabletServer.TIME_BETWEEN_LOCATOR_CACHE_CLEARS);
}
- private final SessionManager sessionManager;
+ final SessionManager sessionManager;
- private final WriteTracker writeTracker = new WriteTracker();
-
- private final RowLocks rowLocks = new RowLocks();
-
- private final AtomicLong totalQueuedMutationSize = new AtomicLong(0);
- private final ReentrantLock recoveryLock = new ReentrantLock(true);
- private ThriftClientHandler clientHandler;
- private final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus();
-
- private class ThriftClientHandler extends ClientServiceHandler
- implements TabletClientService.Iface {
-
- ThriftClientHandler() {
- super(getContext(), watcher, fs);
- log.debug("{} created", ThriftClientHandler.class.getName());
- }
-
- @Override
- public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, final long tid,
- final Map<TKeyExtent,Map<String,MapFileInfo>> files, final boolean setTime)
- throws ThriftSecurityException {
-
- if (!security.canPerformSystemActions(credentials)) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED);
- }
-
- try {
- return watcher.run(Constants.BULK_ARBITRATOR_TYPE, tid, () -> {
- List<TKeyExtent> failures = new ArrayList<>();
-
- for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
- TKeyExtent tke = entry.getKey();
- Map<String,MapFileInfo> fileMap = entry.getValue();
- Map<TabletFile,MapFileInfo> fileRefMap = new HashMap<>();
- for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
- Path path = new Path(mapping.getKey());
- FileSystem ns = fs.getFileSystemByPath(path);
- path = ns.makeQualified(path);
- fileRefMap.put(new TabletFile(path), mapping.getValue());
- }
-
- Tablet importTablet = getOnlineTablet(new KeyExtent(tke));
-
- if (importTablet == null) {
- failures.add(tke);
- } else {
- try {
- importTablet.importMapFiles(tid, fileRefMap, setTime);
- } catch (IOException ioe) {
- log.info("files {} not imported to {}: {}", fileMap.keySet(), new KeyExtent(tke),
- ioe.getMessage());
- failures.add(tke);
- }
- }
- }
- return failures;
- });
- } catch (RuntimeException e) {
- throw e;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void loadFiles(TInfo tinfo, TCredentials credentials, long tid, String dir,
- Map<TKeyExtent,Map<String,MapFileInfo>> tabletImports, boolean setTime)
- throws ThriftSecurityException {
- if (!security.canPerformSystemActions(credentials)) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED);
- }
-
- watcher.runQuietly(Constants.BULK_ARBITRATOR_TYPE, tid, () -> {
- tabletImports.forEach((tke, fileMap) -> {
- Map<TabletFile,MapFileInfo> newFileMap = new HashMap<>();
- for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
- Path path = new Path(dir, mapping.getKey());
- FileSystem ns = fs.getFileSystemByPath(path);
- path = ns.makeQualified(path);
- newFileMap.put(new TabletFile(path), mapping.getValue());
- }
-
- Tablet importTablet = getOnlineTablet(new KeyExtent(tke));
-
- if (importTablet != null) {
- try {
- importTablet.importMapFiles(tid, newFileMap, setTime);
- } catch (IOException ioe) {
- log.debug("files {} not imported to {}: {}", fileMap.keySet(), new KeyExtent(tke),
- ioe.getMessage());
- }
- }
- });
- });
-
- }
-
- private ScanDispatcher getScanDispatcher(KeyExtent extent) {
- if (extent.isRootTablet() || extent.isMeta()) {
- // dispatcher is only for user tables
- return null;
- }
-
- return getContext().getTableConfiguration(extent.getTableId()).getScanDispatcher();
- }
-
- @Override
- public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent,
- TRange range, List<TColumn> columns, int batchSize, List<IterInfo> ssiList,
- Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites,
- boolean isolated, long readaheadThreshold, TSamplerConfiguration tSamplerConfig,
- long batchTimeOut, String contextArg, Map<String,String> executionHints)
- throws NotServingTabletException, ThriftSecurityException,
- org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
- TSampleNotPresentException {
-
- TableId tableId = TableId.of(new String(textent.getTable(), UTF_8));
- NamespaceId namespaceId;
- try {
- namespaceId = Tables.getNamespaceId(getContext(), tableId);
- } catch (TableNotFoundException e1) {
- throw new NotServingTabletException(textent);
- }
- if (!security.canScan(credentials, tableId, namespaceId, range, columns, ssiList, ssio,
- authorizations)) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED);
- }
-
- if (!security.authenticatedUserHasAuthorizations(credentials, authorizations)) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.BAD_AUTHORIZATIONS);
- }
-
- final KeyExtent extent = new KeyExtent(textent);
-
- // wait for any writes that are in flight.. this done to ensure
- // consistency across client restarts... assume a client writes
- // to accumulo and dies while waiting for a confirmation from
- // accumulo... the client process restarts and tries to read
- // data from accumulo making the assumption that it will get
- // any writes previously made... however if the server side thread
- // processing the write from the dead client is still in progress,
- // the restarted client may not see the write unless we wait here.
- // this behavior is very important when the client is reading the
- // metadata
- if (waitForWrites) {
- writeTracker.waitForWrites(TabletType.type(extent));
- }
-
- Tablet tablet = getOnlineTablet(extent);
- if (tablet == null) {
- throw new NotServingTabletException(textent);
- }
-
- HashSet<Column> columnSet = new HashSet<>();
- for (TColumn tcolumn : columns) {
- columnSet.add(new Column(tcolumn));
- }
-
- ScanParameters scanParams = new ScanParameters(batchSize, new Authorizations(authorizations),
- columnSet, ssiList, ssio, isolated, SamplerConfigurationImpl.fromThrift(tSamplerConfig),
- batchTimeOut, contextArg);
-
- final SingleScanSession scanSession = new SingleScanSession(credentials, extent, scanParams,
- readaheadThreshold, executionHints);
- scanSession.scanner =
- tablet.createScanner(new Range(range), scanParams, scanSession.interruptFlag);
-
- long sid = sessionManager.createSession(scanSession, true);
-
- ScanResult scanResult;
- try {
- scanResult = continueScan(tinfo, sid, scanSession);
- } catch (NoSuchScanIDException e) {
- log.error("The impossible happened", e);
- throw new RuntimeException();
- } finally {
- sessionManager.unreserveSession(sid);
- }
-
- return new InitialScan(sid, scanResult);
- }
-
- @Override
- public ScanResult continueScan(TInfo tinfo, long scanID)
- throws NoSuchScanIDException, NotServingTabletException,
- org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
- TSampleNotPresentException {
- SingleScanSession scanSession = (SingleScanSession) sessionManager.reserveSession(scanID);
- if (scanSession == null) {
- throw new NoSuchScanIDException();
- }
-
- try {
- return continueScan(tinfo, scanID, scanSession);
- } finally {
- sessionManager.unreserveSession(scanSession);
- }
- }
-
- private ScanResult continueScan(TInfo tinfo, long scanID, SingleScanSession scanSession)
- throws NoSuchScanIDException, NotServingTabletException,
- org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
- TSampleNotPresentException {
-
- if (scanSession.nextBatchTask == null) {
- scanSession.nextBatchTask =
- new NextBatchTask(TabletServer.this, scanID, scanSession.interruptFlag);
- resourceManager.executeReadAhead(scanSession.extent, getScanDispatcher(scanSession.extent),
- scanSession, scanSession.nextBatchTask);
- }
-
- ScanBatch bresult;
- try {
- bresult = scanSession.nextBatchTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS,
- TimeUnit.MILLISECONDS);
- scanSession.nextBatchTask = null;
- } catch (ExecutionException e) {
- sessionManager.removeSession(scanID);
- if (e.getCause() instanceof NotServingTabletException) {
- throw (NotServingTabletException) e.getCause();
- } else if (e.getCause() instanceof TooManyFilesException) {
- throw new org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException(
- scanSession.extent.toThrift());
- } else if (e.getCause() instanceof SampleNotPresentException) {
- throw new TSampleNotPresentException(scanSession.extent.toThrift());
- } else if (e.getCause() instanceof IOException) {
- sleepUninterruptibly(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
- List<KVEntry> empty = Collections.emptyList();
- bresult = new ScanBatch(empty, true);
- scanSession.nextBatchTask = null;
- } else {
- throw new RuntimeException(e);
- }
- } catch (CancellationException ce) {
- sessionManager.removeSession(scanID);
- Tablet tablet = getOnlineTablet(scanSession.extent);
- if (tablet == null || tablet.isClosed()) {
- throw new NotServingTabletException(scanSession.extent.toThrift());
- } else {
- throw new NoSuchScanIDException();
- }
- } catch (TimeoutException e) {
- List<TKeyValue> param = Collections.emptyList();
- long timeout =
- TabletServer.this.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
- sessionManager.removeIfNotAccessed(scanID, timeout);
- return new ScanResult(param, true);
- } catch (Throwable t) {
- sessionManager.removeSession(scanID);
- log.warn("Failed to get next batch", t);
- throw new RuntimeException(t);
- }
-
- ScanResult scanResult = new ScanResult(Key.compress(bresult.getResults()), bresult.isMore());
-
- scanSession.entriesReturned += scanResult.results.size();
-
- scanSession.batchCount++;
-
- if (scanResult.more && scanSession.batchCount > scanSession.readaheadThreshold) {
- // start reading next batch while current batch is transmitted
- // to client
- scanSession.nextBatchTask =
- new NextBatchTask(TabletServer.this, scanID, scanSession.interruptFlag);
- resourceManager.executeReadAhead(scanSession.extent, getScanDispatcher(scanSession.extent),
- scanSession, scanSession.nextBatchTask);
- }
-
- if (!scanResult.more) {
- closeScan(tinfo, scanID);
- }
-
- return scanResult;
- }
-
- @Override
- public void closeScan(TInfo tinfo, long scanID) {
- final SingleScanSession ss = (SingleScanSession) sessionManager.removeSession(scanID);
- if (ss != null) {
- long t2 = System.currentTimeMillis();
-
- if (log.isTraceEnabled()) {
- log.trace(String.format("ScanSess tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ",
- TServerUtils.clientAddress.get(), ss.extent.getTableId(), ss.entriesReturned,
- (t2 - ss.startTime) / 1000.0, ss.runStats.toString()));
- }
-
- scanMetrics.addScan(t2 - ss.startTime);
- scanMetrics.addResult(ss.entriesReturned);
- }
- }
-
- @Override
- public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials,
- Map<TKeyExtent,List<TRange>> tbatch, List<TColumn> tcolumns, List<IterInfo> ssiList,
- Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites,
- TSamplerConfiguration tSamplerConfig, long batchTimeOut, String contextArg,
- Map<String,String> executionHints)
- throws ThriftSecurityException, TSampleNotPresentException {
- // find all of the tables that need to be scanned
- final HashSet<TableId> tables = new HashSet<>();
- for (TKeyExtent keyExtent : tbatch.keySet()) {
- tables.add(TableId.of(new String(keyExtent.getTable(), UTF_8)));
- }
-
- if (tables.size() != 1) {
- throw new IllegalArgumentException("Cannot batch scan over multiple tables");
- }
-
- // check if user has permission to the tables
- for (TableId tableId : tables) {
- NamespaceId namespaceId = getNamespaceId(credentials, tableId);
- if (!security.canScan(credentials, tableId, namespaceId, tbatch, tcolumns, ssiList, ssio,
- authorizations)) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED);
- }
- }
-
- try {
- if (!security.authenticatedUserHasAuthorizations(credentials, authorizations)) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.BAD_AUTHORIZATIONS);
- }
- } catch (ThriftSecurityException tse) {
- log.error("{} is not authorized", credentials.getPrincipal(), tse);
- throw tse;
- }
- Map<KeyExtent,List<Range>> batch = Translator.translate(tbatch, new TKeyExtentTranslator(),
- new Translator.ListTranslator<>(new TRangeTranslator()));
-
- // This is used to determine which thread pool to use
- KeyExtent threadPoolExtent = batch.keySet().iterator().next();
-
- if (waitForWrites) {
- writeTracker.waitForWrites(TabletType.type(batch.keySet()));
- }
-
- Set<Column> columnSet = tcolumns.isEmpty() ? Collections.emptySet()
- : new HashSet<>(Collections2.transform(tcolumns, Column::new));
-
- ScanParameters scanParams =
- new ScanParameters(-1, new Authorizations(authorizations), columnSet, ssiList, ssio,
- false, SamplerConfigurationImpl.fromThrift(tSamplerConfig), batchTimeOut, contextArg);
-
- final MultiScanSession mss =
- new MultiScanSession(credentials, threadPoolExtent, batch, scanParams, executionHints);
-
- mss.numTablets = batch.size();
- for (List<Range> ranges : batch.values()) {
- mss.numRanges += ranges.size();
- }
-
- long sid = sessionManager.createSession(mss, true);
-
- MultiScanResult result;
- try {
- result = continueMultiScan(sid, mss);
- } finally {
- sessionManager.unreserveSession(sid);
- }
-
- return new InitialMultiScan(sid, result);
- }
-
- @Override
- public MultiScanResult continueMultiScan(TInfo tinfo, long scanID)
- throws NoSuchScanIDException, TSampleNotPresentException {
-
- MultiScanSession session = (MultiScanSession) sessionManager.reserveSession(scanID);
-
- if (session == null) {
- throw new NoSuchScanIDException();
- }
-
- try {
- return continueMultiScan(scanID, session);
- } finally {
- sessionManager.unreserveSession(session);
- }
- }
-
- private MultiScanResult continueMultiScan(long scanID, MultiScanSession session)
- throws TSampleNotPresentException {
-
- if (session.lookupTask == null) {
- session.lookupTask = new LookupTask(TabletServer.this, scanID);
- resourceManager.executeReadAhead(session.threadPoolExtent,
- getScanDispatcher(session.threadPoolExtent), session, session.lookupTask);
- }
-
- try {
- MultiScanResult scanResult =
- session.lookupTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
- session.lookupTask = null;
- return scanResult;
- } catch (ExecutionException e) {
- sessionManager.removeSession(scanID);
- if (e.getCause() instanceof SampleNotPresentException) {
- throw new TSampleNotPresentException();
- } else {
- log.warn("Failed to get multiscan result", e);
- throw new RuntimeException(e);
- }
- } catch (TimeoutException e1) {
- long timeout =
- TabletServer.this.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
- sessionManager.removeIfNotAccessed(scanID, timeout);
- List<TKeyValue> results = Collections.emptyList();
- Map<TKeyExtent,List<TRange>> failures = Collections.emptyMap();
- List<TKeyExtent> fullScans = Collections.emptyList();
- return new MultiScanResult(results, failures, fullScans, null, null, false, true);
- } catch (Throwable t) {
- sessionManager.removeSession(scanID);
- log.warn("Failed to get multiscan result", t);
- throw new RuntimeException(t);
- }
- }
-
- @Override
- public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
- MultiScanSession session = (MultiScanSession) sessionManager.removeSession(scanID);
- if (session == null) {
- throw new NoSuchScanIDException();
- }
-
- long t2 = System.currentTimeMillis();
-
- if (log.isTraceEnabled()) {
- log.trace(String.format(
- "MultiScanSess %s %,d entries in %.2f secs"
- + " (lookup_time:%.2f secs tablets:%,d ranges:%,d) ",
- TServerUtils.clientAddress.get(), session.numEntries, (t2 - session.startTime) / 1000.0,
- session.totalLookupTime / 1000.0, session.numTablets, session.numRanges));
- }
- }
-
- @Override
- public long startUpdate(TInfo tinfo, TCredentials credentials, TDurability tdurabilty)
- throws ThriftSecurityException {
- // Make sure user is real
- Durability durability = DurabilityImpl.fromThrift(tdurabilty);
- security.authenticateUser(credentials, credentials);
- updateMetrics.addPermissionErrors(0);
-
- UpdateSession us = new UpdateSession(
- new TservConstraintEnv(getContext(), security, credentials), credentials, durability);
- return sessionManager.createSession(us, false);
- }
-
- private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
- long t1 = System.currentTimeMillis();
- if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent)) {
- return;
- }
- if (us.currentTablet == null
- && (us.failures.containsKey(keyExtent) || us.authFailures.containsKey(keyExtent))) {
- // if there were previous failures, then do not accept additional writes
- return;
- }
-
- TableId tableId = null;
- try {
- // if user has no permission to write to this table, add it to
- // the failures list
- boolean sameTable = us.currentTablet != null
- && (us.currentTablet.getExtent().getTableId().equals(keyExtent.getTableId()));
- tableId = keyExtent.getTableId();
- if (sameTable || security.canWrite(us.getCredentials(), tableId,
- Tables.getNamespaceId(getContext(), tableId))) {
- long t2 = System.currentTimeMillis();
- us.authTimes.addStat(t2 - t1);
- us.currentTablet = getOnlineTablet(keyExtent);
- if (us.currentTablet != null) {
- us.queuedMutations.put(us.currentTablet, new ArrayList<>());
- } else {
- // not serving tablet, so report all mutations as
- // failures
- us.failures.put(keyExtent, 0L);
- updateMetrics.addUnknownTabletErrors(0);
- }
- } else {
- log.warn("Denying access to table {} for user {}", keyExtent.getTableId(), us.getUser());
- long t2 = System.currentTimeMillis();
- us.authTimes.addStat(t2 - t1);
- us.currentTablet = null;
- us.authFailures.put(keyExtent, SecurityErrorCode.PERMISSION_DENIED);
- updateMetrics.addPermissionErrors(0);
- return;
- }
- } catch (TableNotFoundException tnfe) {
- log.error("Table " + tableId + " not found ", tnfe);
- long t2 = System.currentTimeMillis();
- us.authTimes.addStat(t2 - t1);
- us.currentTablet = null;
- us.authFailures.put(keyExtent, SecurityErrorCode.TABLE_DOESNT_EXIST);
- updateMetrics.addUnknownTabletErrors(0);
- return;
- } catch (ThriftSecurityException e) {
- log.error("Denying permission to check user " + us.getUser() + " with user " + e.getUser(),
- e);
- long t2 = System.currentTimeMillis();
- us.authTimes.addStat(t2 - t1);
- us.currentTablet = null;
- us.authFailures.put(keyExtent, e.getCode());
- updateMetrics.addPermissionErrors(0);
- return;
- }
- }
-
- @Override
- public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent,
- List<TMutation> tmutations) {
- UpdateSession us = (UpdateSession) sessionManager.reserveSession(updateID);
- if (us == null) {
- return;
- }
-
- boolean reserved = true;
- try {
- KeyExtent keyExtent = new KeyExtent(tkeyExtent);
- setUpdateTablet(us, keyExtent);
-
- if (us.currentTablet != null) {
- long additionalMutationSize = 0;
- List<Mutation> mutations = us.queuedMutations.get(us.currentTablet);
- for (TMutation tmutation : tmutations) {
- Mutation mutation = new ServerMutation(tmutation);
- mutations.add(mutation);
- additionalMutationSize += mutation.numBytes();
- }
- us.queuedMutationSize += additionalMutationSize;
- long totalQueued = updateTotalQueuedMutationSize(additionalMutationSize);
- long total = TabletServer.this.getConfiguration()
- .getAsBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX);
- if (totalQueued > total) {
- try {
- flush(us);
- } catch (HoldTimeoutException hte) {
- // Assumption is that the client has timed out and is gone. If that's not the case,
- // then removing the session should cause the client to fail
- // in such a way that it retries.
- log.debug("HoldTimeoutException during applyUpdates, removing session");
- sessionManager.removeSession(updateID, true);
- reserved = false;
- }
- }
- }
- } finally {
- if (reserved) {
- sessionManager.unreserveSession(us);
- }
- }
- }
-
- private void flush(UpdateSession us) {
-
- int mutationCount = 0;
- Map<CommitSession,List<Mutation>> sendables = new HashMap<>();
- Map<CommitSession,TabletMutations> loggables = new HashMap<>();
- Throwable error = null;
-
- long pt1 = System.currentTimeMillis();
-
- boolean containsMetadataTablet = false;
- for (Tablet tablet : us.queuedMutations.keySet()) {
- if (tablet.getExtent().isMeta()) {
- containsMetadataTablet = true;
- }
- }
-
- if (!containsMetadataTablet && us.queuedMutations.size() > 0) {
- TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
- }
-
- try (TraceScope prep = Trace.startSpan("prep")) {
- for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) {
-
- Tablet tablet = entry.getKey();
- Durability durability =
- DurabilityImpl.resolveDurabilty(us.durability, tablet.getDurability());
- List<Mutation> mutations = entry.getValue();
- if (mutations.size() > 0) {
- try {
- updateMetrics.addMutationArraySize(mutations.size());
-
- PreparedMutations prepared = tablet.prepareMutationsForCommit(us.cenv, mutations);
-
- if (prepared.tabletClosed()) {
- if (us.currentTablet == tablet) {
- us.currentTablet = null;
- }
- us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
- } else {
- if (!prepared.getNonViolators().isEmpty()) {
- List<Mutation> validMutations = prepared.getNonViolators();
- CommitSession session = prepared.getCommitSession();
- if (durability != Durability.NONE) {
- loggables.put(session,
- new TabletMutations(session, validMutations, durability));
- }
- sendables.put(session, validMutations);
- }
-
- if (!prepared.getViolations().isEmpty()) {
- us.violations.add(prepared.getViolations());
- updateMetrics.addConstraintViolations(0);
- }
- // Use the size of the original mutation list, regardless of how many mutations
- // did not violate constraints.
- mutationCount += mutations.size();
-
- }
- } catch (Throwable t) {
- error = t;
- log.error("Unexpected error preparing for commit", error);
- break;
- }
- }
- }
- }
-
- long pt2 = System.currentTimeMillis();
- us.prepareTimes.addStat(pt2 - pt1);
- updateAvgPrepTime(pt2 - pt1, us.queuedMutations.size());
-
- if (error != null) {
- sendables.forEach((commitSession, value) -> commitSession.abortCommit());
- throw new RuntimeException(error);
- }
- try {
- try (TraceScope wal = Trace.startSpan("wal")) {
- while (true) {
- try {
- long t1 = System.currentTimeMillis();
-
- logger.logManyTablets(loggables);
-
- long t2 = System.currentTimeMillis();
- us.walogTimes.addStat(t2 - t1);
- updateWalogWriteTime((t2 - t1));
- break;
- } catch (IOException | FSError ex) {
- log.warn("logging mutations failed, retrying");
- } catch (Throwable t) {
- log.error("Unknown exception logging mutations, counts"
- + " for mutations in flight not decremented!", t);
- throw new RuntimeException(t);
- }
- }
- }
-
- try (TraceScope commit = Trace.startSpan("commit")) {
- long t1 = System.currentTimeMillis();
- sendables.forEach((commitSession, mutations) -> {
- commitSession.commit(mutations);
- KeyExtent extent = commitSession.getExtent();
-
- if (us.currentTablet != null && extent == us.currentTablet.getExtent()) {
- // because constraint violations may filter out some
- // mutations, for proper accounting with the client code,
- // need to increment the count based on the original
- // number of mutations from the client NOT the filtered number
- us.successfulCommits.increment(us.currentTablet,
- us.queuedMutations.get(us.currentTablet).size());
- }
- });
- long t2 = System.currentTimeMillis();
-
- us.flushTime += (t2 - pt1);
- us.commitTimes.addStat(t2 - t1);
-
- updateAvgCommitTime(t2 - t1, sendables.size());
- }
- } finally {
- us.queuedMutations.clear();
- if (us.currentTablet != null) {
- us.queuedMutations.put(us.currentTablet, new ArrayList<>());
- }
- updateTotalQueuedMutationSize(-us.queuedMutationSize);
- us.queuedMutationSize = 0;
- }
- us.totalUpdates += mutationCount;
- }
-
- private void updateWalogWriteTime(long time) {
- updateMetrics.addWalogWriteTime(time);
- }
-
- private void updateAvgCommitTime(long time, int size) {
- if (size > 0)
- updateMetrics.addCommitTime((long) (time / (double) size));
- }
-
- private void updateAvgPrepTime(long time, int size) {
- if (size > 0)
- updateMetrics.addCommitPrep((long) (time / (double) size));
- }
-
- @Override
- public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDException {
- final UpdateSession us = (UpdateSession) sessionManager.removeSession(updateID);
- if (us == null) {
- throw new NoSuchScanIDException();
- }
-
- // clients may or may not see data from an update session while
- // it is in progress, however when the update session is closed
- // want to ensure that reads wait for the write to finish
- long opid = writeTracker.startWrite(us.queuedMutations.keySet());
-
- try {
- flush(us);
- } catch (HoldTimeoutException e) {
- // Assumption is that the client has timed out and is gone. If that's not the case throw an
- // exception that will cause it to retry.
- log.debug("HoldTimeoutException during closeUpdate, reporting no such session");
- throw new NoSuchScanIDException();
- } finally {
- writeTracker.finishWrite(opid);
- }
-
- if (log.isTraceEnabled()) {
- log.trace(
- String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)",
- TServerUtils.clientAddress.get(), us.totalUpdates,
- (System.currentTimeMillis() - us.startTime) / 1000.0, us.authTimes.toString(),
- us.flushTime / 1000.0, us.prepareTimes.sum() / 1000.0, us.walogTimes.sum() / 1000.0,
- us.commitTimes.sum() / 1000.0));
- }
- if (us.failures.size() > 0) {
- Entry<KeyExtent,Long> first = us.failures.entrySet().iterator().next();
- log.debug(String.format("Failures: %d, first extent %s successful commits: %d",
- us.failures.size(), first.getKey().toString(), first.getValue()));
- }
- List<ConstraintViolationSummary> violations = us.violations.asList();
- if (violations.size() > 0) {
- ConstraintViolationSummary first = us.violations.asList().iterator().next();
- log.debug(String.format("Violations: %d, first %s occurs %d", violations.size(),
- first.violationDescription, first.numberOfViolatingMutations));
- }
- if (us.authFailures.size() > 0) {
- KeyExtent first = us.authFailures.keySet().iterator().next();
- log.debug(String.format("Authentication Failures: %d, first %s", us.authFailures.size(),
- first.toString()));
- }
-
- return new UpdateErrors(Translator.translate(us.failures, Translators.KET),
- Translator.translate(violations, Translators.CVST),
- Translator.translate(us.authFailures, Translators.KET));
- }
-
- @Override
- public void update(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent,
- TMutation tmutation, TDurability tdurability)
- throws NotServingTabletException, ConstraintViolationException, ThriftSecurityException {
-
- final TableId tableId = TableId.of(new String(tkeyExtent.getTable(), UTF_8));
- NamespaceId namespaceId = getNamespaceId(credentials, tableId);
- if (!security.canWrite(credentials, tableId, namespaceId)) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED);
- }
- final KeyExtent keyExtent = new KeyExtent(tkeyExtent);
- final Tablet tablet = getOnlineTablet(new KeyExtent(keyExtent));
- if (tablet == null) {
- throw new NotServingTabletException(tkeyExtent);
- }
- Durability tabletDurability = tablet.getDurability();
-
- if (!keyExtent.isMeta()) {
- try {
- TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
- } catch (HoldTimeoutException hte) {
- // Major hack. Assumption is that the client has timed out and is gone. If that's not the
- // case, then throwing the following will let client know there
- // was a failure and it should retry.
- throw new NotServingTabletException(tkeyExtent);
- }
- }
-
- final long opid = writeTracker.startWrite(TabletType.type(keyExtent));
-
- try {
- final Mutation mutation = new ServerMutation(tmutation);
- final List<Mutation> mutations = Collections.singletonList(mutation);
-
- PreparedMutations prepared;
- try (TraceScope prep = Trace.startSpan("prep")) {
- prepared = tablet.prepareMutationsForCommit(
- new TservConstraintEnv(getContext(), security, credentials), mutations);
- }
-
- if (prepared.tabletClosed()) {
- throw new NotServingTabletException(tkeyExtent);
- } else if (!prepared.getViolators().isEmpty()) {
- throw new ConstraintViolationException(
- Translator.translate(prepared.getViolations().asList(), Translators.CVST));
- } else {
- CommitSession session = prepared.getCommitSession();
- Durability durability = DurabilityImpl
- .resolveDurabilty(DurabilityImpl.fromThrift(tdurability), tabletDurability);
-
- // Instead of always looping on true, skip completely when durability is NONE.
- while (durability != Durability.NONE) {
- try {
- try (TraceScope wal = Trace.startSpan("wal")) {
- logger.log(session, mutation, durability);
- }
- break;
- } catch (IOException ex) {
- log.warn("Error writing mutations to log", ex);
- }
- }
-
- try (TraceScope commit = Trace.startSpan("commit")) {
- session.commit(mutations);
- }
- }
- } finally {
- writeTracker.finishWrite(opid);
- }
- }
-
- private NamespaceId getNamespaceId(TCredentials credentials, TableId tableId)
- throws ThriftSecurityException {
- try {
- return Tables.getNamespaceId(getContext(), tableId);
- } catch (TableNotFoundException e1) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.TABLE_DOESNT_EXIST);
- }
- }
-
- private void checkConditions(Map<KeyExtent,List<ServerConditionalMutation>> updates,
- ArrayList<TCMResult> results, ConditionalSession cs, List<String> symbols)
- throws IOException {
- Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> iter =
- updates.entrySet().iterator();
-
- final CompressedIterators compressedIters = new CompressedIterators(symbols);
- ConditionCheckerContext checkerContext = new ConditionCheckerContext(getContext(),
- compressedIters, getContext().getTableConfiguration(cs.tableId));
-
- while (iter.hasNext()) {
- final Entry<KeyExtent,List<ServerConditionalMutation>> entry = iter.next();
- final Tablet tablet = getOnlineTablet(entry.getKey());
-
- if (tablet == null || tablet.isClosed()) {
- for (ServerConditionalMutation scm : entry.getValue()) {
- results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
- }
- iter.remove();
- } else {
- final List<ServerConditionalMutation> okMutations =
- new ArrayList<>(entry.getValue().size());
- final List<TCMResult> resultsSubList = results.subList(results.size(), results.size());
-
- ConditionChecker checker =
- checkerContext.newChecker(entry.getValue(), okMutations, resultsSubList);
- try {
- tablet.checkConditions(checker, cs.auths, cs.interruptFlag);
-
- if (okMutations.size() > 0) {
- entry.setValue(okMutations);
- } else {
- iter.remove();
- }
- } catch (TabletClosedException | IterationInterruptedException
- | TooManyFilesException e) {
- // clear anything added while checking conditions.
- resultsSubList.clear();
-
- for (ServerConditionalMutation scm : entry.getValue()) {
- results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
- }
- iter.remove();
- }
- }
- }
- }
-
- private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates,
- ArrayList<TCMResult> results, ConditionalSession sess) {
- Set<Entry<KeyExtent,List<ServerConditionalMutation>>> es = updates.entrySet();
-
- Map<CommitSession,List<Mutation>> sendables = new HashMap<>();
- Map<CommitSession,TabletMutations> loggables = new HashMap<>();
-
- boolean sessionCanceled = sess.interruptFlag.get();
-
- try (TraceScope prepSpan = Trace.startSpan("prep")) {
- long t1 = System.currentTimeMillis();
- for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) {
- final Tablet tablet = getOnlineTablet(entry.getKey());
- if (tablet == null || tablet.isClosed() || sessionCanceled) {
- addMutationsAsTCMResults(results, entry.getValue(), TCMStatus.IGNORED);
- } else {
- final Durability durability =
- DurabilityImpl.resolveDurabilty(sess.durability, tablet.getDurability());
-
- @SuppressWarnings("unchecked")
- List<Mutation> mutations = (List<Mutation>) (List<? extends Mutation>) entry.getValue();
- if (!mutations.isEmpty()) {
-
- PreparedMutations prepared = tablet.prepareMutationsForCommit(
- new TservConstraintEnv(getContext(), security, sess.credentials), mutations);
-
- if (prepared.tabletClosed()) {
- addMutationsAsTCMResults(results, mutations, TCMStatus.IGNORED);
- } else {
- if (!prepared.getNonViolators().isEmpty()) {
- // Only log and commit mutations that did not violate constraints.
- List<Mutation> validMutations = prepared.getNonViolators();
- addMutationsAsTCMResults(results, validMutations, TCMStatus.ACCEPTED);
- CommitSession session = prepared.getCommitSession();
- if (durability != Durability.NONE) {
- loggables.put(session,
- new TabletMutations(session, validMutations, durability));
- }
- sendables.put(session, validMutations);
- }
-
- if (!prepared.getViolators().isEmpty()) {
- addMutationsAsTCMResults(results, prepared.getViolators(), TCMStatus.VIOLATED);
- }
- }
- }
- }
- }
-
- long t2 = System.currentTimeMillis();
- updateAvgPrepTime(t2 - t1, es.size());
- }
-
- try (TraceScope walSpan = Trace.startSpan("wal")) {
- while (loggables.size() > 0) {
- try {
- long t1 = System.currentTimeMillis();
- logger.logManyTablets(loggables);
- long t2 = System.currentTimeMillis();
- updateWalogWriteTime(t2 - t1);
- break;
- } catch (IOException | FSError ex) {
- log.warn("logging mutations failed, retrying");
- } catch (Throwable t) {
- log.error("Unknown exception logging mutations, counts for"
- + " mutations in flight not decremented!", t);
- throw new RuntimeException(t);
- }
- }
- }
-
- try (TraceScope commitSpan = Trace.startSpan("commit")) {
- long t1 = System.currentTimeMillis();
- sendables.forEach(CommitSession::commit);
- long t2 = System.currentTimeMillis();
- updateAvgCommitTime(t2 - t1, sendables.size());
- }
- }
-
- /**
- * Transform and add each mutation as a {@link TCMResult} with the mutation's ID and the
- * specified status to the {@link TCMResult} list.
- */
- private void addMutationsAsTCMResults(final List<TCMResult> list,
- final Collection<? extends Mutation> mutations, final TCMStatus status) {
- mutations.stream()
- .map(mutation -> new TCMResult(((ServerConditionalMutation) mutation).getID(), status))
- .forEach(list::add);
- }
-
- private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(ConditionalSession cs,
- Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results,
- List<String> symbols) throws IOException {
- // sort each list of mutations, this is done to avoid deadlock and doing seeks in order is
- // more efficient and detect duplicate rows.
- ConditionalMutationSet.sortConditionalMutations(updates);
-
- Map<KeyExtent,List<ServerConditionalMutation>> deferred = new HashMap<>();
-
- // can not process two mutations for the same row, because one will not see what the other
- // writes
- ConditionalMutationSet.deferDuplicatesRows(updates, deferred);
-
- // get as many locks as possible w/o blocking... defer any rows that are locked
- List<RowLock> locks = rowLocks.acquireRowlocks(updates, deferred);
- try {
- try (TraceScope checkSpan = Trace.startSpan("Check conditions")) {
- checkConditions(updates, results, cs, symbols);
- }
-
- try (TraceScope updateSpan = Trace.startSpan("apply conditional mutations")) {
- writeConditionalMutations(updates, results, cs);
- }
- } finally {
- rowLocks.releaseRowLocks(locks);
- }
- return deferred;
- }
-
- @Override
- public TConditionalSession startConditionalUpdate(TInfo tinfo, TCredentials credentials,
- List<ByteBuffer> authorizations, String tableIdStr, TDurability tdurabilty,
- String classLoaderContext) throws ThriftSecurityException, TException {
-
- TableId tableId = TableId.of(tableIdStr);
- Authorizations userauths = null;
- NamespaceId namespaceId = getNamespaceId(credentials, tableId);
- if (!security.canConditionallyUpdate(credentials, tableId, namespaceId)) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED);
- }
-
- userauths = security.getUserAuthorizations(credentials);
- for (ByteBuffer auth : authorizations) {
- if (!userauths.contains(ByteBufferUtil.toBytes(auth))) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.BAD_AUTHORIZATIONS);
- }
- }
-
- ConditionalSession cs = new ConditionalSession(credentials,
- new Authorizations(authorizations), tableId, DurabilityImpl.fromThrift(tdurabilty));
-
- long sid = sessionManager.createSession(cs, false);
- return new TConditionalSession(sid, lockID, sessionManager.getMaxIdleTime());
- }
-
- @Override
- public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID,
- Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
- throws NoSuchScanIDException, TException {
-
- ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID);
-
- if (cs == null || cs.interruptFlag.get()) {
- throw new NoSuchScanIDException();
- }
-
- if (!cs.tableId.equals(MetadataTable.ID) && !cs.tableId.equals(RootTable.ID)) {
- try {
- TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
- } catch (HoldTimeoutException hte) {
- // Assumption is that the client has timed out and is gone. If that's not the case throw
- // an exception that will cause it to retry.
- log.debug("HoldTimeoutException during conditionalUpdate, reporting no such session");
- throw new NoSuchScanIDException();
- }
- }
-
- TableId tid = cs.tableId;
- long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null)));
-
- try {
- Map<KeyExtent,List<ServerConditionalMutation>> updates = Translator.translate(mutations,
- Translators.TKET, new Translator.ListTranslator<>(ServerConditionalMutation.TCMT));
-
- for (KeyExtent ke : updates.keySet()) {
- if (!ke.getTableId().equals(tid)) {
- throw new IllegalArgumentException(
- "Unexpected table id " + tid + " != " + ke.getTableId());
- }
- }
-
- ArrayList<TCMResult> results = new ArrayList<>();
-
- Map<KeyExtent,List<ServerConditionalMutation>> deferred =
- conditionalUpdate(cs, updates, results, symbols);
-
- while (deferred.size() > 0) {
- deferred = conditionalUpdate(cs, deferred, results, symbols);
- }
-
- return results;
- } catch (IOException ioe) {
- throw new TException(ioe);
- } finally {
- writeTracker.finishWrite(opid);
- sessionManager.unreserveSession(sessID);
- }
- }
-
- @Override
- public void invalidateConditionalUpdate(TInfo tinfo, long sessID) {
- // this method should wait for any running conditional update to complete
- // after this method returns a conditional update should not be able to start
-
- ConditionalSession cs = (ConditionalSession) sessionManager.getSession(sessID);
- if (cs != null) {
- cs.interruptFlag.set(true);
- }
-
- cs = (ConditionalSession) sessionManager.reserveSession(sessID, true);
- if (cs != null) {
- sessionManager.removeSession(sessID, true);
- }
- }
-
- @Override
- public void closeConditionalUpdate(TInfo tinfo, long sessID) {
- sessionManager.removeSession(sessID, false);
- }
-
- @Override
- public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent,
- ByteBuffer splitPoint) throws NotServingTabletException, ThriftSecurityException {
-
- TableId tableId = TableId.of(new String(ByteBufferUtil.toBytes(tkeyExtent.table)));
- NamespaceId namespaceId = getNamespaceId(credentials, tableId);
-
- if (!security.canSplitTablet(credentials, tableId, namespaceId)) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED);
- }
-
- KeyExtent keyExtent = new KeyExtent(tkeyExtent);
-
- Tablet tablet = getOnlineTablet(keyExtent);
- if (tablet == null) {
- throw new NotServingTabletException(tkeyExtent);
- }
-
- if (keyExtent.getEndRow() == null
- || !keyExtent.getEndRow().equals(ByteBufferUtil.toText(splitPoint))) {
- try {
- if (TabletServer.this.splitTablet(tablet, ByteBufferUtil.toBytes(splitPoint)) == null) {
- throw new NotServingTabletException(tkeyExtent);
- }
- } catch (IOException e) {
- log.warn("Failed to split " + keyExtent, e);
- throw new RuntimeException(e);
- }
- }
- }
-
- @Override
- public TabletServerStatus getTabletServerStatus(TInfo tinfo, TCredentials credentials) {
- return getStats(sessionManager.getActiveScansPerTable());
- }
-
- @Override
- public List<TabletStats> getTabletStats(TInfo tinfo, TCredentials credentials, String tableId) {
- List<TabletStats> result = new ArrayList<>();
- TableId text = TableId.of(tableId);
- KeyExtent start = new KeyExtent(text, new Text(), null);
- for (Entry<KeyExtent,Tablet> entry : getOnlineTablets().tailMap(start).entrySet()) {
- KeyExtent ke = entry.getKey();
- if (ke.getTableId().compareTo(text) == 0) {
- Tablet tablet = entry.getValue();
- TabletStats stats = tablet.getTabletStats();
- stats.extent = ke.toThrift();
- stats.ingestRate = tablet.ingestRate();
- stats.queryRate = tablet.queryRate();
- stats.splitCreationTime = tablet.getSplitCreationTime();
- stats.numEntries = tablet.getNumEntries();
- result.add(stats);
- }
- }
- return result;
- }
-
- private void checkPermission(TCredentials credentials, String lock, final String request)
- throws ThriftSecurityException {
- try {
- log.trace("Got {} message from user: {}", request, credentials.getPrincipal());
- if (!security.canPerformSystemActions(credentials)) {
- log.warn("Got {} message from user: {}", request, credentials.getPrincipal());
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED);
- }
- } catch (ThriftSecurityException e) {
- log.warn("Got {} message from unauthenticatable user: {}", request, e.getUser());
- if (getContext().getCredentials().getToken().getClass().getName()
- .equals(credentials.getTokenClassName())) {
- log.error("Got message from a service with a mismatched configuration."
- + " Please ensure a compatible configuration.", e);
- }
- throw e;
- }
-
- if (tabletServerLock == null || !tabletServerLock.wasLockAcquired()) {
- log.debug("Got {} message before my lock was acquired, ignoring...", request);
- throw new RuntimeException("Lock not acquired");
- }
-
- if (tabletServerLock != null && tabletServerLock.wasLockAcquired()
- && !tabletServerLock.isLocked()) {
- Halt.halt(1, () -> {
- log.info("Tablet server no longer holds lock during checkPermission() : {}, exiting",
- request);
- gcLogger.logGCInfo(TabletServer.this.getConfiguration());
- });
- }
-
- if (lock != null) {
- ZooUtil.LockID lid =
- new ZooUtil.LockID(getContext().getZooKeeperRoot() + Constants.ZMASTER_LOCK, lock);
-
- try {
- if (!ZooLock.isLockHeld(masterLockCache, lid)) {
- // maybe the cache is out of date and a new master holds the
- // lock?
- masterLockCache.clear();
- if (!ZooLock.isLockHeld(masterLockCache, lid)) {
- log.warn("Got {} message from a master that does not hold the current lock {}",
- request, lock);
- throw new RuntimeException("bad master lock");
- }
- }
- } catch (Exception e) {
- throw new RuntimeException("bad master lock", e);
- }
- }
- }
-
- @Override
- public void loadTablet(TInfo tinfo, TCredentials credentials, String lock,
- final TKeyExtent textent) {
-
- try {
- checkPermission(credentials, lock, "loadTablet");
- } catch (ThriftSecurityException e) {
- log.error("Caller doesn't have permission to load a tablet", e);
- throw new RuntimeException(e);
- }
-
- final KeyExtent extent = new KeyExtent(textent);
-
- synchronized (unopenedTablets) {
- synchronized (openingTablets) {
- synchronized (onlineTablets) {
-
- // checking if this exact tablet is in any of the sets
- // below is not a strong enough check
- // when splits and fix splits occurring
-
- Set<KeyExtent> unopenedOverlapping = KeyExtent.findOverlapping(extent, unopenedTablets);
- Set<KeyExtent> openingOverlapping = KeyExtent.findOverlapping(extent, openingTablets);
- Set<KeyExtent> onlineOverlapping =
- KeyExtent.findOverlapping(extent, onlineTablets.snapshot());
-
- Set<KeyExtent> all = new HashSet<>();
- all.addAll(unopenedOverlapping);
- all.addAll(openingOverlapping);
- all.addAll(onlineOverlapping);
-
- if (!all.isEmpty()) {
-
- // ignore any tablets that have recently split, for error logging
- for (KeyExtent e2 : onlineOverlapping) {
- Tablet tablet = getOnlineTablet(e2);
- if (System.currentTimeMillis() - tablet.getSplitCreationTime()
- < RECENTLY_SPLIT_MILLIES) {
- all.remove(e2);
- }
- }
-
- // ignore self, for error logging
- all.remove(extent);
-
- if (all.size() > 0) {
- log.error("Tablet {} overlaps previously assigned {} {} {}", extent,
- unopenedOverlapping, openingOverlapping, onlineOverlapping + " " + all);
- }
- return;
- }
-
- unopenedTablets.add(extent);
- }
- }
- }
-
- TabletLogger.loading(extent, TabletServer.this.getTabletSession());
-
- final AssignmentHandler ah = new AssignmentHandler(extent);
- // final Runnable ah = new LoggingRunnable(log, );
- // Root tablet assignment must take place immediately
-
- if (extent.isRootTablet()) {
- new Daemon("Root Tablet Assignment") {
- @Override
- public void run() {
- ah.run();
- if (onlineTablets.snapshot().containsKey(extent)) {
- log.info("Root tablet loaded: {}", extent);
- } else {
- log.info("Root tablet failed to load");
- }
-
- }
- }.start();
- } else {
- if (extent.isMeta()) {
- resourceManager.addMetaDataAssignment(extent, log, ah);
- } else {
- resourceManager.addAssignment(extent, log, ah);
- }
- }
- }
-
- @Override
- public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent,
- TUnloadTabletGoal goal, long requestTime) {
- try {
- checkPermission(credentials, lock, "unloadTablet");
- } catch (ThriftSecurityException e) {
- log.error("Caller doesn't have permission to unload a tablet", e);
- throw new RuntimeException(e);
- }
-
- KeyExtent extent = new KeyExtent(textent);
-
- resourceManager.addMigration(extent,
- new LoggingRunnable(log, new UnloadTabletHandler(extent, goal, requestTime)));
- }
-
- @Override
- public void flush(TInfo tinfo, TCredentials credentials, String lock, String tableId,
- ByteBuffer startRow, ByteBuffer endRow) {
- try {
- checkPermission(credentials, lock, "flush");
- } catch (ThriftSecurityException e) {
- log.error("Caller doesn't have permission to flush a table", e);
- throw new RuntimeException(e);
- }
-
- ArrayList<Tablet> tabletsToFlush = new ArrayList<>();
-
- KeyExtent ke = new KeyExtent(TableId.of(tableId), ByteBufferUtil.toText(endRow),
- ByteBufferUtil.toText(startRow));
-
- for (Tablet tablet : getOnlineTablets().values()) {
- if (ke.overlaps(tablet.getExtent())) {
- tabletsToFlush.add(tablet);
- }
- }
-
- Long flushID = null;
-
- for (Tablet tablet : tabletsToFlush) {
- if (flushID == null) {
- // read the flush id once from zookeeper instead of reading
- // it for each tablet
- try {
- flushID = tablet.getFlushID();
- } catch (NoNodeException e) {
- // table was probably deleted
- log.info("Asked to flush table that has no flush id {} {}", ke, e.getMessage());
- return;
- }
- }
- tablet.flush(flushID);
- }
- }
-
- @Override
- public void flushTablet(TInfo tinfo, TCredentials credentials, String lock,
- TKeyExtent textent) {
- try {
- checkPermission(credentials, lock, "flushTablet");
- } catch (ThriftSecurityException e) {
- log.error("Caller doesn't have permission to flush a tablet", e);
- throw new RuntimeException(e);
- }
-
- Tablet tablet = getOnlineTablet(new KeyExtent(textent));
- if (tablet != null) {
- log.info("Flushing {}", tablet.getExtent());
- try {
- tablet.flush(tablet.getFlushID());
- } catch (NoNodeException nne) {
- log.info("Asked to flush tablet that has no flush id {} {}", new KeyExtent(textent),
- nne.getMessage());
- }
- }
- }
-
- @Override
- public void halt(TInfo tinfo, TCredentials credentials, String lock)
- throws ThriftSecurityException {
-
- checkPermission(credentials, lock, "halt");
-
- Halt.halt(0, () -> {
- log.info("Master requested tablet server halt");
- gcLogger.logGCInfo(TabletServer.this.getConfiguration());
- serverStopRequested = true;
- try {
- tabletServerLock.unlock();
- } catch (Exception e) {
- log.error("Caught exception unlocking TabletServer lock", e);
- }
- });
- }
-
- @Override
- public void fastHalt(TInfo info, TCredentials credentials, String lock) {
- try {
- halt(info, credentials, lock);
- } catch (Exception e) {
- log.warn("Error halting", e);
- }
- }
-
- @Override
- public TabletStats getHistoricalStats(TInfo tinfo, TCredentials credentials) {
- return statsKeeper.getTabletStats();
- }
-
- @Override
- public List<ActiveScan> getActiveScans(TInfo tinfo, TCredentials credentials)
- throws ThriftSecurityException, TException {
- try {
- checkPermission(credentials, null, "getScans");
- } catch (ThriftSecurityException e) {
- log.error("Caller doesn't have permission to get active scans", e);
- throw e;
- }
-
- return sessionManager.getActiveScans();
- }
-
- @Override
- public void chop(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent) {
- try {
- checkPermission(credentials, lock, "chop");
- } catch (ThriftSecurityException e) {
- log.error("Caller doesn't have permission to chop extent", e);
- throw new RuntimeException(e);
- }
-
- KeyExtent ke = new KeyExtent(textent);
-
- Tablet tablet = getOnlineTablet(ke);
- if (tablet != null) {
- tablet.chopFiles();
- }
- }
-
- @Override
- public void compact(TInfo tinfo, TCredentials credentials, String lock, String tableId,
- ByteBuffer startRow, ByteBuffer endRow) {
- try {
- checkPermission(credentials, lock, "compact");
- } catch (ThriftSecurityException e) {
- log.error("Caller doesn't have permission to compact a table", e);
- throw new RuntimeException(e);
- }
-
- KeyExtent ke = new KeyExtent(TableId.of(tableId), ByteBufferUtil.toText(endRow),
- ByteBufferUtil.toText(startRow));
-
- ArrayList<Tablet> tabletsToCompact = new ArrayList<>();
-
- for (Tablet tablet : getOnlineTablets().values()) {
- if (ke.overlaps(tablet.getExtent())) {
- tabletsToCompact.add(tablet);
- }
- }
-
- Pair<Long,UserCompactionConfig> compactionInfo = null;
-
- for (Tablet tablet : tabletsToCompact) {
- // all for the same table id, so only need to read
- // compaction id once
- if (compactionInfo == null) {
- try {
- compactionInfo = tablet.getCompactionID();
- } catch (NoNodeException e) {
- log.info("Asked to compact table with no compaction id {} {}", ke, e.getMessage());
- return;
- }
- }
- tablet.compactAll(compactionInfo.getFirst(), compactionInfo.getSecond());
- }
-
- }
-
- @Override
- public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials)
- throws ThriftSecurityException, TException {
- try {
- checkPermission(credentials, null, "getActiveCompactions");
- } catch (ThriftSecurityException e) {
- log.error("Caller doesn't have permission to get active compactions", e);
- throw e;
- }
-
- List<CompactionInfo> compactions = Compactor.getRunningCompactions();
- List<ActiveCompaction> ret = new ArrayList<>(compactions.size());
-
- for (CompactionInfo compactionInfo : compactions) {
- ret.add(compactionInfo.toThrift());
- }
-
- return ret;
- }
-
- @Override
- public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) {
- String log = logger.getLogFile();
- // Might be null if there no active logger
- if (log == null) {
- return Collections.emptyList();
- }
- return Collections.singletonList(log);
- }
-
- @Override
- public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) {
- log.warn("Garbage collector is attempting to remove logs through the tablet server");
- log.warn("This is probably because your file"
- + " Garbage Collector is an older version than your tablet servers.\n"
- + "Restart your file Garbage Collector.");
- }
-
- private TSummaries getSummaries(Future<SummaryCollection> future) throws TimeoutException {
- try {
- SummaryCollection sc =
- future.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
- return sc.toThrift();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new RuntimeException(e);
- }
- }
-
- private TSummaries handleTimeout(long sessionId) {
- long timeout =
- TabletServer.this.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
- sessionManager.removeIfNotAccessed(sessionId, timeout);
- return new TSummaries(false, sessionId, -1, -1, null);
- }
-
- private TSummaries startSummaryOperation(TCredentials credentials,
- Future<SummaryCollection> future) {
- try {
- return getSummaries(future);
- } catch (TimeoutException e) {
- long sid = sessionManager.createSession(new SummarySession(credentials, future), false);
- while (sid == 0) {
- sessionManager.removeSession(sid);
- sid = sessionManager.createSession(new SummarySession(credentials, future), false);
- }
- return handleTimeout(sid);
- }
- }
-
- @Override
- public TSummaries startGetSummaries(TInfo tinfo, TCredentials credentials,
- TSummaryRequest request)
- throws ThriftSecurityException, ThriftTableOperationException, TException {
- NamespaceId namespaceId;
- TableId tableId = TableId.of(request.getTableId());
- try {
- namespaceId = Tables.getNamespaceId(getContext(), tableId);
- } catch (TableNotFoundException e1) {
- throw new ThriftTableOperationException(tableId.canonical(), null, null,
- TableOperationExceptionType.NOTFOUND, null);
- }
-
- if (!security.canGetSummaries(credentials, tableId, namespaceId)) {
- throw new AccumuloSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED).asThriftException();
- }
-
- ExecutorService es = resourceManager.getSummaryPartitionExecutor();
- Future<SummaryCollection> future = new Gatherer(getContext(), request,
- getContext().getTableConfiguration(tableId), getContext().getCryptoService()).gather(es);
-
- return startSummaryOperation(credentials, future);
- }
-
- @Override
- public TSummaries startGetSummariesForPartition(TInfo tinfo, TCredentials credentials,
- TSummaryRequest request, int modulus, int remainder)
- throws ThriftSecurityException, TException {
- // do not expect users to call this directly, expect other tservers to call this method
- if (!security.canPerformSystemActions(credentials)) {
- throw new AccumuloSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED).asThriftException();
- }
-
- ExecutorService spe = resourceManager.getSummaryRemoteExecutor();
- TableConfiguration tableConfig =
- getContext().getTableConfiguration(TableId.of(request.getTableId()));
- Future<SummaryCollection> future =
- new Gatherer(getContext(), request, tableConfig, getContext().getCryptoService())
- .processPartition(spe, modulus, remainder);
-
- return startSummaryOperation(credentials, future);
- }
-
- @Override
- public TSummaries startGetSummariesFromFiles(TInfo tinfo, TCredentials credentials,
- TSummaryRequest request, Map<String,List<TRowRange>> files)
- throws ThriftSecurityException, TException {
- // do not expect users to call this directly, expect other tservers to call this method
- if (!security.canPerformSystemActions(credentials)) {
- throw new AccumuloSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED).asThriftException();
- }
-
- ExecutorService srp = resourceManager.getSummaryRetrievalExecutor();
- TableConfiguration tableCfg =
- getContext().getTableConfiguration(TableId.of(request.getTableId()));
- BlockCache summaryCache = resourceManager.getSummaryCache();
- BlockCache indexCache = resourceManager.getIndexCache();
- Cache<String,Long> fileLenCache = resourceManager.getFileLenCache();
- FileSystemResolver volMgr = p -> fs.getFileSystemByPath(p);
- Future<SummaryCollection> future =
- new Gatherer(getContext(), request, tableCfg, getContext().getCryptoService())
- .processFiles(volMgr, files, summaryCache, indexCache, fileLenCache, srp);
-
- return startSummaryOperation(credentials, future);
- }
+ private final AtomicLong totalQueuedMutationSize = new AtomicLong(0);
+ private final ReentrantLock recoveryLock = new ReentrantLock(true);
+ private ThriftClientHandler clientHandler;
+ private final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus();
- @Override
- public TSummaries contiuneGetSummaries(TInfo tinfo, long sessionId)
- throws NoSuchScanIDException, TException {
- SummarySession session = (SummarySession) sessionManager.getSession(sessionId);
- if (session == null) {
- throw new NoSuchScanIDException();
- }
+ String getLockID() {
+ return lockID;
+ }
- Future<SummaryCollection> future = session.getFuture();
- try {
- TSummaries tsums = getSummaries(future);
- sessionManager.removeSession(sessionId);
- return tsums;
- } catch (TimeoutException e) {
- return handleTimeout(sessionId);
- }
- }
+ void requestStop() {
+ serverStopRequested = true;
}
private class SplitRunner implements Runnable {
@@ -2228,8 +479,7 @@ public class TabletServer extends AbstractServer {
}
}
- private TreeMap<KeyExtent,TabletData> splitTablet(Tablet tablet, byte[] splitPoint)
- throws IOException {
+ TreeMap<KeyExtent,TabletData> splitTablet(Tablet tablet, byte[] splitPoint) throws IOException {
long t1 = System.currentTimeMillis();
TreeMap<KeyExtent,TabletData> tabletInfo = tablet.split(splitPoint);
@@ -2278,301 +528,13 @@ public class TabletServer extends AbstractServer {
masterMessages.addLast(m);
}
- private class UnloadTabletHandler implements Runnable {
- private final KeyExtent extent;
- private final TUnloadTabletGoal goalState;
- private final long requestTimeSkew;
-
- public UnloadTabletHandler(KeyExtent extent, TUnloadTabletGoal goalState, long requestTime) {
- this.extent = extent;
- this.goalState = goalState;
- this.requestTimeSkew = requestTime - MILLISECONDS.convert(System.nanoTime(), NANOSECONDS);
- }
-
- @Override
- public void run() {
-
- Tablet t = null;
-
- synchronized (unopenedTablets) {
- if (unopenedTablets.contains(extent)) {
- unopenedTablets.remove(extent);
- // enqueueMasterMessage(new TabletUnloadedMessage(extent));
- return;
- }
- }
- synchronized (openingTablets) {
- while (openingTablets.contains(extent)) {
- try {
- openingTablets.wait();
- } catch (InterruptedException e) {}
- }
- }
- synchronized (onlineTablets) {
- if (onlineTablets.snapshot().containsKey(extent)) {
- t = onlineTablets.snapshot().get(extent);
- }
- }
-
- if (t == null) {
- // Tablet has probably been recently unloaded: repeated master
- // unload request is crossing the successful unloaded message
- if (!recentlyUnloadedCache.containsKey(extent)) {
- log.info("told to unload tablet that was not being served {}", extent);
- enqueueMasterMessage(
- new TabletStatusMessage(TabletLoadState.UNLOAD_FAILURE_NOT_SERVING, extent));
- }
- return;
- }
-
- try {
- t.close(!goalState.equals(TUnloadTabletGoal.DELETED));
- } catch (Throwable e) {
-
- if ((t.isClosing() || t.isClosed()) && e instanceof IllegalStateException) {
- log.debug("Failed to unload tablet {}... it was already closing or closed : {}", extent,
- e.getMessage());
- } else {
- log.error("Failed to close tablet {}... Aborting migration", extent, e);
- enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.UNLOAD_ERROR, extent));
- }
- return;
- }
-
- // stop serving tablet - client will get not serving tablet
- // exceptions
- recentlyUnloadedCache.put(extent, System.currentTimeMillis());
- onlineTablets.remove(extent);
-
- try {
- TServerInstance instance = new TServerInstance(clientAddress, getLock().getSessionId());
- TabletLocationState tls = null;
- try {
- tls = new TabletLocationState(extent, null, instance, null, null, null, false);
- } catch (BadLocationStateException e) {
- log.error("Unexpected error", e);
- }
- if (!goalState.equals(TUnloadTabletGoal.SUSPENDED) || extent.isRootTablet()
- || (extent.isMeta()
- && !getConfiguration().getBoolean(Property.MASTER_METADATA_SUSPENDABLE))) {
- TabletStateStore.unassign(getContext(), tls, null);
- } else {
- TabletStateStore.suspend(getContext(), tls, null,
- requestTimeSkew + MILLISECONDS.convert(System.nanoTime(), NANOSECONDS));
- }
- } catch (DistributedStoreException ex) {
- log.warn("Unable to update storage", ex);
- } catch (KeeperException e) {
- log.warn("Unable determine our zookeeper session information", e);
- } catch (InterruptedException e) {
- log.warn("Interrupted while getting our zookeeper session information", e);
- }
-
- // tell the master how it went
- enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.UNLOADED, extent));
-
- // roll tablet stats over into tablet server's statsKeeper object as
- // historical data
- statsKeeper.saveMajorMinorTimes(t.getTabletStats());
- }
- }
-
- protected class AssignmentHandler implements Runnable {
- private final KeyExtent extent;
- private final int retryAttempt;
-
- public AssignmentHandler(KeyExtent extent) {
- this(extent, 0);
- }
-
- public AssignmentHandler(KeyExtent extent, int retryAttempt) {
- this.extent = extent;
- this.retryAttempt = retryAttempt;
- }
-
- @Override
- public void run() {
- synchronized (unopenedTablets) {
- synchronized (openingTablets) {
- synchronized (onlineTablets) {
- // nothing should be moving between sets, do a sanity
- // check
- Set<KeyExtent> unopenedOverlapping = KeyExtent.findOverlapping(extent, unopenedTablets);
- Set<KeyExtent> openingOverlapping = KeyExtent.findOverlapping(extent, openingTablets);
- Set<KeyExtent> onlineOverlapping =
- KeyExtent.findOverlapping(extent, onlineTablets.snapshot());
-
- if (openingOverlapping.contains(extent) || onlineOverlapping.contains(extent)) {
- return;
- }
-
- if (!unopenedOverlapping.contains(extent)) {
- log.info("assignment {} no longer in the unopened set", extent);
- return;
- }
-
- if (unopenedOverlapping.size() != 1 || openingOverlapping.size() > 0
- || onlineOverlapping.size() > 0) {
- throw new IllegalStateException(
- "overlaps assigned " + extent + " " + !unopenedTablets.contains(extent) + " "
- + unopenedOverlapping + " " + openingOverlapping + " " + onlineOverlapping);
- }
- }
-
- unopenedTablets.remove(extent);
- openingTablets.add(extent);
- }
- }
-
- // check Metadata table before accepting assignment
- Text locationToOpen = null;
- TabletMetadata tabletMetadata = null;
- boolean canLoad = false;
- try {
- tabletMetadata = getContext().getAmple().readTablet(extent);
-
- canLoad = checkTabletMetadata(extent, TabletServer.this.getTabletSession(), tabletMetadata);
-
- if (canLoad && tabletMetadata.sawOldPrevEndRow()) {
- KeyExtent fixedExtent =
- MasterMetadataUtil.fixSplit(getContext(), tabletMetadata, getLock());
-
- synchronized (openingTablets) {
- openingTablets.remove(extent);
- openingTablets.notifyAll();
- // it expected that the new extent will overlap the old one... if it does not, it
- // should not be added to unopenedTablets
- if (!KeyExtent.findOverlapping(extent, new TreeSet<>(Arrays.asList(fixedExtent)))
- .contains(fixedExtent)) {
- throw new IllegalStateException(
- "Fixed split does not overlap " + extent + " " + fixedExtent);
- }
- unopenedTablets.add(fixedExtent);
- }
- // split was rolled back... try again
- new AssignmentHandler(fixedExtent).run();
- return;
-
- }
- } catch (Exception e) {
- synchronized (openingTablets) {
- openingTablets.remove(extent);
- openingTablets.notifyAll();
- }
- log.warn("Failed to verify tablet " + extent, e);
- enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
- throw new RuntimeException(e);
- }
-
- if (!canLoad) {
- log.debug("Reporting tablet {} assignment failure: unable to verify Tablet Information",
- extent);
- synchronized (openingTablets) {
- openingTablets.remove(extent);
- openingTablets.notifyAll();
- }
- enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
- return;
- }
-
- Tablet tablet = null;
- boolean successful = false;
-
- try {
- acquireRecoveryMemory(extent);
-
- TabletResourceManager trm =
- resourceManager.createTabletResourceManager(extent, getTableConfiguration(extent));
- TabletData data = new TabletData(tabletMetadata);
-
- tablet = new Tablet(TabletServer.this, extent, trm, data);
- // If a minor compaction starts after a tablet opens, this indicates a log recovery
- // occurred. This recovered data must be minor compacted.
- // There are three reasons to wait for this minor compaction to finish before placing the
- // tablet in online tablets.
- //
- // 1) The log recovery code does not handle data written to the tablet on multiple tablet
- // servers.
- // 2) The log recovery code does not block if memory is full. Therefore recovering lots of
- // tablets that use a lot of memory could run out of memory.
- // 3) The minor compaction finish event did not make it to the logs (the file will be in
- // metadata, preventing replay of compacted data)... but do not
- // want a majc to wipe the file out from metadata and then have another process failure...
- // this could cause duplicate data to replay.
- if (tablet.getNumEntriesInMemory() > 0
- && !tablet.minorCompactNow(MinorCompactionReason.RECOVERY)) {
- throw new RuntimeException("Minor compaction after recovery fails for " + extent);
- }
- Assignment assignment = new Assignment(extent, getTabletSession());
- TabletStateStore.setLocation(getContext(), assignment);
-
- synchronized (openingTablets) {
- synchronized (onlineTablets) {
- openingTablets.remove(extent);
- onlineTablets.put(extent, tablet);
- openingTablets.notifyAll();
- recentlyUnloadedCache.remove(tablet.getExtent());
- }
- }
- tablet = null; // release this reference
- successful = true;
- } catch (Throwable e) {
- log.warn("exception trying to assign tablet {} {}", extent, locationToOpen, e);
-
- if (e.getMessage() != null) {
- log.warn("{}", e.getMessage());
- }
-
- TableId tableId = extent.getTableId();
- ProblemReports.getInstance(getContext()).report(new ProblemReport(tableId, TABLET_LOAD,
- extent.getUUID().toString(), getClientAddressString(), e));
- } finally {
- releaseRecoveryMemory(extent);
- }
-
- if (!successful) {
- synchronized (unopenedTablets) {
- synchronized (openingTablets) {
- openingTablets.remove(extent);
- unopenedTablets.add(extent);
- openingTablets.notifyAll();
- }
- }
- log.warn("failed to open tablet {} reporting failure to master", extent);
- enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
- long reschedule = Math.min((1L << Math.min(32, retryAttempt)) * 1000, 10 * 60 * 1000L);
- log.warn(String.format("rescheduling tablet load in %.2f seconds", reschedule / 1000.));
- SimpleTimer.getInstance(getConfiguration()).schedule(new TimerTask() {
- @Override
- public void run() {
- log.info("adding tablet {} back to the assignment pool (retry {})", extent,
- retryAttempt);
- AssignmentHandler handler = new AssignmentHandler(extent, retryAttempt + 1);
- if (extent.isMeta()) {
- if (extent.isRootTablet()) {
- new Daemon(new LoggingRunnable(log, handler), "Root tablet assignment retry")
- .start();
- } else {
- resourceManager.addMetaDataAssignment(extent, log, handler);
- }
- } else {
- resourceManager.addAssignment(extent, log, handler);
- }
- }
- }, reschedule);
- } else {
- enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOADED, extent));
- }
- }
- }
-
- private void acquireRecoveryMemory(KeyExtent extent) {
+ void acquireRecoveryMemory(KeyExtent extent) {
if (!extent.isMeta()) {
recoveryLock.lock();
}
}
- private void releaseRecoveryMemory(KeyExtent extent) {
+ void releaseRecoveryMemory(KeyExtent extent) {
if (!extent.isMeta()) {
recoveryLock.unlock();
}
@@ -2624,7 +586,7 @@ public class TabletServer extends AbstractServer {
private HostAndPort startTabletClientService() throws UnknownHostException {
// start listening for client connection last
- clientHandler = new ThriftClientHandler();
+ clientHandler = new ThriftClientHandler(this);
Iface rpcProxy = TraceUtil.wrapService(clientHandler);
final Processor<Iface> processor;
if (getContext().getThriftServerType() == ThriftServerType.SASL) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 41a748b..0c68658 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -78,7 +78,6 @@ import org.apache.accumulo.server.tabletserver.MemoryManager;
import org.apache.accumulo.server.tabletserver.TabletState;
import org.apache.accumulo.server.util.time.SimpleTimer;
import org.apache.accumulo.tserver.FileManager.ScanFileManager;
-import org.apache.accumulo.tserver.TabletServer.AssignmentHandler;
import org.apache.accumulo.tserver.compaction.CompactionStrategy;
import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy;
import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
new file mode 100644
index 0000000..ba667d7
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
@@ -0,0 +1,1792 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Durability;
+import org.apache.accumulo.core.client.SampleNotPresentException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.clientImpl.CompressedIterators;
+import org.apache.accumulo.core.clientImpl.DurabilityImpl;
+import org.apache.accumulo.core.clientImpl.Tables;
+import org.apache.accumulo.core.clientImpl.TabletType;
+import org.apache.accumulo.core.clientImpl.Translator;
+import org.apache.accumulo.core.clientImpl.Translator.TKeyExtentTranslator;
+import org.apache.accumulo.core.clientImpl.Translator.TRangeTranslator;
+import org.apache.accumulo.core.clientImpl.Translators;
+import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.ConstraintViolationSummary;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan;
+import org.apache.accumulo.core.dataImpl.thrift.InitialScan;
+import org.apache.accumulo.core.dataImpl.thrift.IterInfo;
+import org.apache.accumulo.core.dataImpl.thrift.MapFileInfo;
+import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.ScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.TCMResult;
+import org.apache.accumulo.core.dataImpl.thrift.TCMStatus;
+import org.apache.accumulo.core.dataImpl.thrift.TColumn;
+import org.apache.accumulo.core.dataImpl.thrift.TConditionalMutation;
+import org.apache.accumulo.core.dataImpl.thrift.TConditionalSession;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyValue;
+import org.apache.accumulo.core.dataImpl.thrift.TMutation;
+import org.apache.accumulo.core.dataImpl.thrift.TRange;
+import org.apache.accumulo.core.dataImpl.thrift.TRowRange;
+import org.apache.accumulo.core.dataImpl.thrift.TSummaries;
+import org.apache.accumulo.core.dataImpl.thrift.TSummaryRequest;
+import org.apache.accumulo.core.dataImpl.thrift.UpdateErrors;
+import org.apache.accumulo.core.iterators.IterationInterruptedException;
+import org.apache.accumulo.core.logging.TabletLogger;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.TabletFile;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
+import org.apache.accumulo.core.spi.cache.BlockCache;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
+import org.apache.accumulo.core.summary.Gatherer;
+import org.apache.accumulo.core.summary.Gatherer.FileSystemResolver;
+import org.apache.accumulo.core.summary.SummaryCollection;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.TDurability;
+import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
+import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
+import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.core.trace.thrift.TInfo;
+import org.apache.accumulo.core.util.ByteBufferUtil;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.fate.util.LoggingRunnable;
+import org.apache.accumulo.fate.zookeeper.ZooLock;
+import org.apache.accumulo.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.server.client.ClientServiceHandler;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.data.ServerMutation;
+import org.apache.accumulo.server.master.tableOps.UserCompactionConfig;
+import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.util.Halt;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher;
+import org.apache.accumulo.tserver.ConditionCheckerContext.ConditionChecker;
+import org.apache.accumulo.tserver.RowLocks.RowLock;
+import org.apache.accumulo.tserver.data.ServerConditionalMutation;
+import org.apache.accumulo.tserver.scan.LookupTask;
+import org.apache.accumulo.tserver.scan.NextBatchTask;
+import org.apache.accumulo.tserver.scan.ScanParameters;
+import org.apache.accumulo.tserver.session.ConditionalSession;
+import org.apache.accumulo.tserver.session.MultiScanSession;
+import org.apache.accumulo.tserver.session.SingleScanSession;
+import org.apache.accumulo.tserver.session.SummarySession;
+import org.apache.accumulo.tserver.session.UpdateSession;
+import org.apache.accumulo.tserver.tablet.CommitSession;
+import org.apache.accumulo.tserver.tablet.CompactionInfo;
+import org.apache.accumulo.tserver.tablet.Compactor;
+import org.apache.accumulo.tserver.tablet.KVEntry;
+import org.apache.accumulo.tserver.tablet.PreparedMutations;
+import org.apache.accumulo.tserver.tablet.ScanBatch;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.accumulo.tserver.tablet.TabletClosedException;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+import org.apache.thrift.TException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.collect.Collections2;
+
+class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {
+
+ private static final Logger log = LoggerFactory.getLogger(ThriftClientHandler.class);
+ // longest time, in milliseconds, that a scan call blocks waiting on its background read task
+ private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
+ // one minute, in milliseconds; NOTE(review): its use is not visible in this part of the file
+ private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
+ // the tablet server whose tablets, sessions, metrics and resources this handler works with
+ private final TabletServer server;
+ // scans that request waitForWrites block on this tracker before reading
+ private final WriteTracker writeTracker = new WriteTracker();
+ private final RowLocks rowLocks = new RowLocks();
+
+ /**
+ * Creates a handler bound to the given tablet server. The server's context and file system,
+ * along with a new {@link TransactionWatcher} built from that context, are passed to the
+ * {@link ClientServiceHandler} superclass.
+ *
+ * @param server
+ * the tablet server this handler delegates to
+ */
+ ThriftClientHandler(TabletServer server) {
+ super(server.getContext(), new TransactionWatcher(server.getContext()), server.getFileSystem());
+ this.server = server;
+ log.debug("{} created", ThriftClientHandler.class.getName());
+ }
+
+ /**
+ * Imports the given files into the tablets they are mapped to, as part of bulk transaction
+ * {@code tid}.
+ *
+ * @return the extents that could not be imported, either because the tablet is not online on
+ * this server or because the import failed with an {@link IOException}
+ * @throws ThriftSecurityException
+ * if the caller may not perform system actions
+ */
+ @Override
+ public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, final long tid,
+ final Map<TKeyExtent,Map<String,MapFileInfo>> files, final boolean setTime)
+ throws ThriftSecurityException {
+
+ boolean allowed = security.canPerformSystemActions(credentials);
+ if (!allowed) {
+ throw new ThriftSecurityException(credentials.getPrincipal(),
+ SecurityErrorCode.PERMISSION_DENIED);
+ }
+
+ try {
+ return transactionWatcher.run(Constants.BULK_ARBITRATOR_TYPE, tid, () -> {
+ List<TKeyExtent> rejected = new ArrayList<>();
+
+ for (Entry<TKeyExtent,Map<String,MapFileInfo>> request : files.entrySet()) {
+ TKeyExtent tke = request.getKey();
+ Map<String,MapFileInfo> requestedFiles = request.getValue();
+
+ // qualify every path against the file system that owns it
+ Map<TabletFile,MapFileInfo> qualifiedFiles = new HashMap<>();
+ for (Entry<String,MapFileInfo> file : requestedFiles.entrySet()) {
+ Path unqualified = new Path(file.getKey());
+ Path qualified = fs.getFileSystemByPath(unqualified).makeQualified(unqualified);
+ qualifiedFiles.put(new TabletFile(qualified), file.getValue());
+ }
+
+ Tablet importTablet = server.getOnlineTablet(new KeyExtent(tke));
+ if (importTablet == null) {
+ // tablet is not online here, so the caller has to retry it elsewhere
+ rejected.add(tke);
+ continue;
+ }
+
+ try {
+ importTablet.importMapFiles(tid, qualifiedFiles, setTime);
+ } catch (IOException ioe) {
+ log.info("files {} not imported to {}: {}", requestedFiles.keySet(),
+ new KeyExtent(tke), ioe.getMessage());
+ rejected.add(tke);
+ }
+ }
+ return rejected;
+ });
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ // checked failures from the transaction watcher are surfaced unchecked
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Loads files found under {@code dir} into the tablets they are mapped to, as part of bulk
+ * transaction {@code tid}. Tablets that are not online on this server are skipped, and import
+ * failures are only logged at debug level; nothing is reported back to the caller.
+ *
+ * @throws ThriftSecurityException
+ * if the caller may not perform system actions
+ */
+ @Override
+ public void loadFiles(TInfo tinfo, TCredentials credentials, long tid, String dir,
+ Map<TKeyExtent,Map<String,MapFileInfo>> tabletImports, boolean setTime)
+ throws ThriftSecurityException {
+ boolean allowed = security.canPerformSystemActions(credentials);
+ if (!allowed) {
+ throw new ThriftSecurityException(credentials.getPrincipal(),
+ SecurityErrorCode.PERMISSION_DENIED);
+ }
+
+ transactionWatcher.runQuietly(Constants.BULK_ARBITRATOR_TYPE, tid, () -> {
+ for (Entry<TKeyExtent,Map<String,MapFileInfo>> request : tabletImports.entrySet()) {
+ TKeyExtent tke = request.getKey();
+ Map<String,MapFileInfo> fileMap = request.getValue();
+
+ // file names are relative to dir; qualify each against the file system that owns it
+ Map<TabletFile,MapFileInfo> qualifiedFiles = new HashMap<>();
+ for (Entry<String,MapFileInfo> file : fileMap.entrySet()) {
+ Path unqualified = new Path(dir, file.getKey());
+ Path qualified = fs.getFileSystemByPath(unqualified).makeQualified(unqualified);
+ qualifiedFiles.put(new TabletFile(qualified), file.getValue());
+ }
+
+ Tablet importTablet = server.getOnlineTablet(new KeyExtent(tke));
+ if (importTablet == null) {
+ continue;
+ }
+
+ try {
+ importTablet.importMapFiles(tid, qualifiedFiles, setTime);
+ } catch (IOException ioe) {
+ log.debug("files {} not imported to {}: {}", fileMap.keySet(), new KeyExtent(tke),
+ ioe.getMessage());
+ }
+ }
+ });
+
+ }
+
+ /**
+ * Looks up the scan dispatcher configured for the table that owns {@code extent}.
+ *
+ * @return the table's dispatcher, or {@code null} for root and metadata tablets
+ */
+ private ScanDispatcher getScanDispatcher(KeyExtent extent) {
+ if (!extent.isRootTablet() && !extent.isMeta()) {
+ return server.getContext().getTableConfiguration(extent.getTableId()).getScanDispatcher();
+ }
+
+ // dispatcher is only for user tables
+ return null;
+ }
+
+ /**
+ * Opens a scan session over a single tablet and returns the session id together with the first
+ * batch of results.
+ *
+ * <p>
+ * The caller must be allowed to scan the table and must hold the requested authorizations. When
+ * {@code waitForWrites} is set, the call first blocks until in-flight writes for the tablet's
+ * type have finished.
+ *
+ * @return the new session id and the result of the first {@code continueScan}
+ * @throws NotServingTabletException
+ * if the table cannot be resolved to a namespace or the tablet is not online here
+ * @throws ThriftSecurityException
+ * if the scan is not permitted or the authorizations are not held by the user
+ */
+ @Override
+ public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent,
+ TRange range, List<TColumn> columns, int batchSize, List<IterInfo> ssiList,
+ Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites,
+ boolean isolated, long readaheadThreshold, TSamplerConfiguration tSamplerConfig,
+ long batchTimeOut, String contextArg, Map<String,String> executionHints)
+ throws NotServingTabletException, ThriftSecurityException,
+ org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
+ TSampleNotPresentException {
+
+ TableId tableId = TableId.of(new String(textent.getTable(), UTF_8));
+ NamespaceId namespaceId;
+ try {
+ namespaceId = Tables.getNamespaceId(server.getContext(), tableId);
+ } catch (TableNotFoundException e1) {
+ // an unknown table is reported to the client as a tablet this server does not serve
+ throw new NotServingTabletException(textent);
+ }
+ if (!security.canScan(credentials, tableId, namespaceId, range, columns, ssiList, ssio,
+ authorizations)) {
+ throw new ThriftSecurityException(credentials.getPrincipal(),
+ SecurityErrorCode.PERMISSION_DENIED);
+ }
+
+ if (!security.authenticatedUserHasAuthorizations(credentials, authorizations)) {
+ throw new ThriftSecurityException(credentials.getPrincipal(),
+ SecurityErrorCode.BAD_AUTHORIZATIONS);
+ }
+
+ final KeyExtent extent = new KeyExtent(textent);
+
+ // Wait for any writes that are in flight. This is done to ensure
+ // consistency across client restarts: assume a client writes
+ // to accumulo and dies while waiting for a confirmation from
+ // accumulo. The client process restarts and tries to read
+ // data from accumulo, assuming that it will see
+ // any writes previously made. However, if the server side thread
+ // processing the write from the dead client is still in progress,
+ // the restarted client may not see the write unless we wait here.
+ // This behavior is very important when the client is reading the
+ // metadata.
+ if (waitForWrites) {
+ writeTracker.waitForWrites(TabletType.type(extent));
+ }
+
+ Tablet tablet = server.getOnlineTablet(extent);
+ if (tablet == null) {
+ throw new NotServingTabletException(textent);
+ }
+
+ HashSet<Column> columnSet = new HashSet<>();
+ for (TColumn tcolumn : columns) {
+ columnSet.add(new Column(tcolumn));
+ }
+
+ ScanParameters scanParams = new ScanParameters(batchSize, new Authorizations(authorizations),
+ columnSet, ssiList, ssio, isolated, SamplerConfigurationImpl.fromThrift(tSamplerConfig),
+ batchTimeOut, contextArg);
+
+ final SingleScanSession scanSession =
+ new SingleScanSession(credentials, extent, scanParams, readaheadThreshold, executionHints);
+ scanSession.scanner =
+ tablet.createScanner(new Range(range), scanParams, scanSession.interruptFlag);
+
+ // the session is created already reserved; it is released in the finally block below
+ long sid = server.sessionManager.createSession(scanSession, true);
+
+ ScanResult scanResult;
+ try {
+ scanResult = continueScan(tinfo, sid, scanSession);
+ } catch (NoSuchScanIDException e) {
+ // the session was created just above, so this is not expected; keep the cause so the
+ // failure can be diagnosed from the propagated exception as well as from the log
+ log.error("The impossible happened", e);
+ throw new RuntimeException(e);
+ } finally {
+ server.sessionManager.unreserveSession(sid);
+ }
+
+ return new InitialScan(sid, scanResult);
+ }
+
+ /**
+ * Fetches the next batch of results for an existing single-tablet scan. The session is reserved
+ * for the duration of the call and released afterwards, whether or not the fetch succeeds.
+ *
+ * @throws NoSuchScanIDException
+ * if no session could be reserved for {@code scanID}
+ */
+ @Override
+ public ScanResult continueScan(TInfo tinfo, long scanID) throws NoSuchScanIDException,
+ NotServingTabletException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
+ TSampleNotPresentException {
+ SingleScanSession reserved = (SingleScanSession) server.sessionManager.reserveSession(scanID);
+ if (reserved != null) {
+ try {
+ return continueScan(tinfo, scanID, reserved);
+ } finally {
+ server.sessionManager.unreserveSession(reserved);
+ }
+ }
+
+ throw new NoSuchScanIDException();
+ }
+
+ /**
+ * Waits for the session's pending batch, starting a read task first when none is pending, and
+ * converts the batch into a {@link ScanResult}.
+ *
+ * <p>
+ * The wait is bounded by {@code MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS}. On timeout an empty
+ * result flagged as having more data is returned and the read task is left in place. Failures
+ * of the read task remove the session and are translated into the Thrift exceptions declared
+ * below. When the returned batch has no more data the scan is closed.
+ *
+ * @param scanID
+ * id of the session that {@code scanSession} is registered under
+ * @param scanSession
+ * the session to read from
+ */
+ private ScanResult continueScan(TInfo tinfo, long scanID, SingleScanSession scanSession)
+ throws NoSuchScanIDException, NotServingTabletException,
+ org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
+ TSampleNotPresentException {
+
+ // no read in progress for this session, so schedule one
+ if (scanSession.nextBatchTask == null) {
+ scanSession.nextBatchTask = new NextBatchTask(server, scanID, scanSession.interruptFlag);
+ server.resourceManager.executeReadAhead(scanSession.extent,
+ getScanDispatcher(scanSession.extent), scanSession, scanSession.nextBatchTask);
+ }
+
+ ScanBatch bresult;
+ try {
+ bresult = scanSession.nextBatchTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS,
+ TimeUnit.MILLISECONDS);
+ scanSession.nextBatchTask = null;
+ } catch (ExecutionException e) {
+ // the read task itself failed: drop the session, then map the cause for the client
+ server.sessionManager.removeSession(scanID);
+ if (e.getCause() instanceof NotServingTabletException) {
+ throw (NotServingTabletException) e.getCause();
+ } else if (e.getCause() instanceof TooManyFilesException) {
+ // rethrown as the Thrift exception of the same simple name
+ throw new org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException(
+ scanSession.extent.toThrift());
+ } else if (e.getCause() instanceof SampleNotPresentException) {
+ throw new TSampleNotPresentException(scanSession.extent.toThrift());
+ } else if (e.getCause() instanceof IOException) {
+ // pause, then hand back an empty batch instead of failing the call
+ // NOTE(review): the session was already removed above, so a follow-up continueScan for
+ // this id presumably ends in NoSuchScanIDException - confirm this is intended
+ sleepUninterruptibly(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
+ List<KVEntry> empty = Collections.emptyList();
+ bresult = new ScanBatch(empty, true);
+ scanSession.nextBatchTask = null;
+ } else {
+ throw new RuntimeException(e);
+ }
+ } catch (CancellationException ce) {
+ // the read task was cancelled; tell the client whether the tablet is gone or only the scan
+ server.sessionManager.removeSession(scanID);
+ Tablet tablet = server.getOnlineTablet(scanSession.extent);
+ if (tablet == null || tablet.isClosed()) {
+ throw new NotServingTabletException(scanSession.extent.toThrift());
+ } else {
+ throw new NoSuchScanIDException();
+ }
+ } catch (TimeoutException e) {
+ // batch not ready yet: return an empty result with more=true and keep the task running;
+ // the session is passed to removeIfNotAccessed with the configured client timeout
+ List<TKeyValue> param = Collections.emptyList();
+ long timeout = server.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
+ server.sessionManager.removeIfNotAccessed(scanID, timeout);
+ return new ScanResult(param, true);
+ } catch (Throwable t) {
+ server.sessionManager.removeSession(scanID);
+ log.warn("Failed to get next batch", t);
+ throw new RuntimeException(t);
+ }
+
+ ScanResult scanResult = new ScanResult(Key.compress(bresult.getResults()), bresult.isMore());
+
+ scanSession.entriesReturned += scanResult.results.size();
+
+ scanSession.batchCount++;
+
+ if (scanResult.more && scanSession.batchCount > scanSession.readaheadThreshold) {
+ // start reading next batch while current batch is transmitted
+ // to client
+ scanSession.nextBatchTask = new NextBatchTask(server, scanID, scanSession.interruptFlag);
+ server.resourceManager.executeReadAhead(scanSession.extent,
+ getScanDispatcher(scanSession.extent), scanSession, scanSession.nextBatchTask);
+ }
+
+ // nothing left to read, so the session can be closed right away
+ if (!scanResult.more) {
+ closeScan(tinfo, scanID);
+ }
+
+ return scanResult;
+ }
+
+ /**
+ * Removes the scan session registered under {@code scanID} and records its duration and entry
+ * count in the scan metrics. Does nothing when no such session exists.
+ */
+ @Override
+ public void closeScan(TInfo tinfo, long scanID) {
+ final SingleScanSession closed =
+ (SingleScanSession) server.sessionManager.removeSession(scanID);
+ if (closed == null) {
+ return;
+ }
+
+ long finished = System.currentTimeMillis();
+
+ if (log.isTraceEnabled()) {
+ log.trace(String.format("ScanSess tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ",
+ TServerUtils.clientAddress.get(), closed.extent.getTableId(), closed.entriesReturned,
+ (finished - closed.startTime) / 1000.0, closed.runStats.toString()));
+ }
+
+ server.scanMetrics.addScan(finished - closed.startTime);
+ server.scanMetrics.addResult(closed.entriesReturned);
+ }
+
+ /**
+ * Opens a batch scan session over the given tablets and ranges, which must all belong to a
+ * single table, and returns the session id together with the first result.
+ *
+ * @return the new session id and the result of the first multi-scan step
+ * @throws ThriftSecurityException
+ * if the scan is not permitted or the authorizations are not held by the user
+ * @throws IllegalArgumentException
+ * if {@code tbatch} does not reference exactly one table
+ */
+ @Override
+ public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials,
+ Map<TKeyExtent,List<TRange>> tbatch, List<TColumn> tcolumns, List<IterInfo> ssiList,
+ Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites,
+ TSamplerConfiguration tSamplerConfig, long batchTimeOut, String contextArg,
+ Map<String,String> executionHints)
+ throws ThriftSecurityException, TSampleNotPresentException {
+ // find all of the tables that need to be scanned
+ final HashSet<TableId> tables = new HashSet<>();
+ for (TKeyExtent keyExtent : tbatch.keySet()) {
+ tables.add(TableId.of(new String(keyExtent.getTable(), UTF_8)));
+ }
+
+ // note: an empty batch is rejected here too, with the same message
+ if (tables.size() != 1) {
+ throw new IllegalArgumentException("Cannot batch scan over multiple tables");
+ }
+
+ // check if user has permission to the tables
+ for (TableId tableId : tables) {
+ NamespaceId namespaceId = getNamespaceId(credentials, tableId);
+ if (!security.canScan(credentials, tableId, namespaceId, tbatch, tcolumns, ssiList, ssio,
+ authorizations)) {
+ throw new ThriftSecurityException(credentials.getPrincipal(),
+ SecurityErrorCode.PERMISSION_DENIED);
+ }
+ }
+
+ // the try/catch only adds an error log line; the exception is rethrown unchanged
+ try {
+ if (!security.authenticatedUserHasAuthorizations(credentials, authorizations)) {
+ throw new ThriftSecurityException(credentials.getPrincipal(),
+ SecurityErrorCode.BAD_AUTHORIZATIONS);
+ }
+ } catch (ThriftSecurityException tse) {
+ log.error("{} is not authorized", credentials.getPrincipal(), tse);
+ throw tse;
+ }
+ // convert the Thrift extents and ranges into their server-side counterparts
+ Map<KeyExtent,List<Range>> batch = Translator.translate(tbatch, new TKeyExtentTranslator(),
+ new Translator.ListTranslator<>(new TRangeTranslator()));
+
+ // This is used to determine which thread pool to use
+ KeyExtent threadPoolExtent = batch.keySet().iterator().next();
+
+ if (waitForWrites) {
+ writeTracker.waitForWrites(TabletType.type(batch.keySet()));
+ }
+
+ Set<Column> columnSet = tcolumns.isEmpty() ? Collections.emptySet()
+ : new HashSet<>(Collections2.transform(tcolumns, Column::new));
+
+ ScanParameters scanParams =
+ new ScanParameters(-1, new Authorizations(authorizations), columnSet, ssiList, ssio, false,
+ SamplerConfigurationImpl.fromThrift(tSamplerConfig), batchTimeOut, contextArg);
+
+ final MultiScanSession mss =
+ new MultiScanSession(credentials, threadPoolExtent, batch, scanParams, executionHints);
+
+ // tablet and range counts are kept on the session; closeMultiScan reports them in its trace
+ mss.numTablets = batch.size();
+ for (List<Range> ranges : batch.values()) {
+ mss.numRanges += ranges.size();
+ }
+
+ // the session is created already reserved; it is released in the finally block below
+ long sid = server.sessionManager.createSession(mss, true);
+
+ MultiScanResult result;
+ try {
+ result = continueMultiScan(sid, mss);
+ } finally {
+ server.sessionManager.unreserveSession(sid);
+ }
+
+ return new InitialMultiScan(sid, result);
+ }
+
+ /**
+ * Fetches the next result for an existing batch scan. The session is reserved for the duration
+ * of the call and released afterwards, whether or not the fetch succeeds.
+ *
+ * @throws NoSuchScanIDException
+ * if no session could be reserved for {@code scanID}
+ */
+ @Override
+ public MultiScanResult continueMultiScan(TInfo tinfo, long scanID)
+ throws NoSuchScanIDException, TSampleNotPresentException {
+
+ MultiScanSession reserved = (MultiScanSession) server.sessionManager.reserveSession(scanID);
+ if (reserved != null) {
+ try {
+ return continueMultiScan(scanID, reserved);
+ } finally {
+ server.sessionManager.unreserveSession(reserved);
+ }
+ }
+
+ throw new NoSuchScanIDException();
+ }
+
+ /**
+ * Waits for the session's pending lookup, starting one first when none is pending, and returns
+ * its result.
+ *
+ * <p>
+ * The wait is bounded by {@code MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS}. On timeout an empty
+ * result is returned and the lookup task is left in place. Any failure of the lookup removes
+ * the session.
+ *
+ * @param scanID
+ * id of the session that {@code session} is registered under
+ * @param session
+ * the batch scan session to read from
+ * @throws TSampleNotPresentException
+ * if the lookup failed because of a {@link SampleNotPresentException}
+ */
+ private MultiScanResult continueMultiScan(long scanID, MultiScanSession session)
+ throws TSampleNotPresentException {
+
+ // no lookup in progress for this session, so schedule one
+ if (session.lookupTask == null) {
+ session.lookupTask = new LookupTask(server, scanID);
+ server.resourceManager.executeReadAhead(session.threadPoolExtent,
+ getScanDispatcher(session.threadPoolExtent), session, session.lookupTask);
+ }
+
+ try {
+ MultiScanResult scanResult =
+ session.lookupTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
+ session.lookupTask = null;
+ return scanResult;
+ } catch (ExecutionException e) {
+ // the lookup itself failed: drop the session before reporting the failure
+ server.sessionManager.removeSession(scanID);
+ if (e.getCause() instanceof SampleNotPresentException) {
+ throw new TSampleNotPresentException();
+ } else {
+ log.warn("Failed to get multiscan result", e);
+ throw new RuntimeException(e);
+ }
+ } catch (TimeoutException e1) {
+ // lookup not finished yet: keep the task running and return an empty result;
+ // the session is passed to removeIfNotAccessed with the configured client timeout
+ long timeout = server.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
+ server.sessionManager.removeIfNotAccessed(scanID, timeout);
+ List<TKeyValue> results = Collections.emptyList();
+ Map<TKeyExtent,List<TRange>> failures = Collections.emptyMap();
+ List<TKeyExtent> fullScans = Collections.emptyList();
+ // NOTE(review): the trailing true is presumably the "more" flag - confirm against the
+ // MultiScanResult constructor
+ return new MultiScanResult(results, failures, fullScans, null, null, false, true);
+ } catch (Throwable t) {
+ server.sessionManager.removeSession(scanID);
+ log.warn("Failed to get multiscan result", t);
+ throw new RuntimeException(t);
+ }
+ }
+
+ /**
+ * Removes the batch scan session registered under {@code scanID} and, when trace logging is
+ * enabled, logs a summary of it.
+ *
+ * @throws NoSuchScanIDException
+ * if no session exists for {@code scanID}
+ */
+ @Override
+ public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
+ MultiScanSession closed = (MultiScanSession) server.sessionManager.removeSession(scanID);
+ if (closed == null) {
+ throw new NoSuchScanIDException();
+ }
+
+ if (!log.isTraceEnabled()) {
+ return;
+ }
+
+ long finished = System.currentTimeMillis();
+ log.trace(String.format(
+ "MultiScanSess %s %,d entries in %.2f secs"
+ + " (lookup_time:%.2f secs tablets:%,d ranges:%,d) ",
+ TServerUtils.clientAddress.get(), closed.numEntries, (finished - closed.startTime) / 1000.0,
+ closed.totalLookupTime / 1000.0, closed.numTablets, closed.numRanges));
+ }
+
+ /**
+ * Authenticates the caller and opens an update session that uses the requested durability.
+ *
+ * @return the id of the new, unreserved update session
+ * @throws ThriftSecurityException
+ * if the credentials cannot be authenticated
+ */
+ @Override
+ public long startUpdate(TInfo tinfo, TCredentials credentials, TDurability tdurabilty)
+ throws ThriftSecurityException {
+ final Durability requested = DurabilityImpl.fromThrift(tdurabilty);
+
+ // Make sure user is real
+ security.authenticateUser(credentials, credentials);
+ server.updateMetrics.addPermissionErrors(0);
+
+ TservConstraintEnv constraintEnv =
+ new TservConstraintEnv(server.getContext(), security, credentials);
+ return server.sessionManager
+ .createSession(new UpdateSession(constraintEnv, credentials, requested), false);
+ }
+
+ private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
+ long t1 = System.currentTimeMillis();
+ if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent)) {
+ return;
+ }
+ if (us.currentTablet == null
+ && (us.failures.containsKey(keyExtent) || us.authFailures.containsKey(keyExtent))) {
+ // if there were previous failures, then do not accept additional writes
+ return;
+ }
+
+ TableId tableId = null;
+ try {
+ // if user has no permission to write to this table, add it to
+ // the failures list
+ boolean sameTable = us.currentTablet != null
+ && (us.currentTablet.getExtent().getTableId().equals(keyExtent.getTableId()));
+ tableId = keyExtent.getTableId();
+ if (sameTable || security.canWrite(us.getCredentials(), tableId,
+ Tables.getNamespaceId(server.getContext(), tableId))) {
+ long t2 = System.currentTimeMillis();
+ us.authTimes.addStat(t2 - t1);
+ us.currentTablet = server.getOnlineTablet(keyExtent);
+ if (us.currentTablet != null) {
+ us.queuedMutations.put(us.currentTablet, new ArrayList<>());
+ } else {
+ // not serving tablet, so report all mutations as
+ // failures
+ us.failures.put(keyExtent, 0L);
+ server.updateMetrics.addUnknownTabletErrors(0);
+ }
+ } else {
+ log.warn("Denying access to table {} for user {}", keyExtent.getTableId(), us.getUser());
+ long t2 = System.currentTimeMillis();
+ us.authTimes.addStat(t2 - t1);
+ us.currentTablet = null;
+ us.authFailures.put(keyExtent, SecurityErrorCode.PERMISSION_DENIED);
+ server.updateMetrics.addPermissionErrors(0);
+ return;
+ }
+ } catch (TableNotFoundException tnfe) {
+ log.error("Table " + tableId + " not found ", tnfe);
+ long t2 = System.currentTimeMillis();
+ us.authTimes.addStat(t2 - t1);
+ us.currentTablet = null;
+ us.authFailures.put(keyExtent, SecurityErrorCode.TABLE_DOESNT_EXIST);
+ server.updateMetrics.addUnknownTabletErrors(0);
+ return;
+ } catch (ThriftSecurityException e) {
+ log.error("Denying permission to check user " + us.getUser() + " with user " + e.getUser(),
+ e);
+ long t2 = System.currentTimeMillis();
+ us.authTimes.addStat(t2 - t1);
+ us.currentTablet = null;
+ us.authFailures.put(keyExtent, e.getCode());
+ server.updateMetrics.addPermissionErrors(0);
+ return;
+ }
+ }
+
+ /**
+ * Queues the given mutations for {@code tkeyExtent} on an open update session and flushes the
+ * session once the server-wide queued mutation size exceeds
+ * {@code TSERV_TOTAL_MUTATION_QUEUE_MAX}. Does nothing when the session does not exist or the
+ * tablet was rejected for this session.
+ */
+ @Override
+ public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent,
+ List<TMutation> tmutations) {
+ UpdateSession us = (UpdateSession) server.sessionManager.reserveSession(updateID);
+ if (us == null) {
+ return;
+ }
+
+ // cleared when the session gets removed, so that it is not unreserved afterwards
+ boolean reserved = true;
+ try {
+ setUpdateTablet(us, new KeyExtent(tkeyExtent));
+ if (us.currentTablet == null) {
+ return;
+ }
+
+ List<Mutation> queue = us.queuedMutations.get(us.currentTablet);
+ long addedBytes = 0;
+ for (TMutation incoming : tmutations) {
+ Mutation serverMutation = new ServerMutation(incoming);
+ queue.add(serverMutation);
+ addedBytes += serverMutation.numBytes();
+ }
+ us.queuedMutationSize += addedBytes;
+
+ long queuedTotal = server.updateTotalQueuedMutationSize(addedBytes);
+ long queueLimit =
+ server.getConfiguration().getAsBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX);
+ if (queuedTotal <= queueLimit) {
+ return;
+ }
+
+ try {
+ flush(us);
+ } catch (HoldTimeoutException hte) {
+ // Assumption is that the client has timed out and is gone. If that's not the case,
+ // then removing the session should cause the client to fail
+ // in such a way that it retries.
+ log.debug("HoldTimeoutException during applyUpdates, removing session");
+ server.sessionManager.removeSession(updateID, true);
+ reserved = false;
+ }
+ } finally {
+ if (reserved) {
+ server.sessionManager.unreserveSession(us);
+ }
+ }
+ }
+
+  /**
+   * Prepares, logs and commits all mutations queued on an update session, then resets the queue.
+   *
+   * <p>
+   * Unless a metadata tablet is among the queued tablets, this first waits until commits are
+   * enabled. Each tablet's mutations are then prepared for commit: tablets found closed are
+   * recorded in {@code us.failures} and constraint violations in {@code us.violations}. The
+   * prepared mutations are written to the write-ahead log, retrying on {@link IOException} and
+   * {@link FSError}, and finally committed. The queue is always cleared once the logging phase
+   * has been entered, whether or not logging and committing succeed.
+   *
+   * @param us
+   *          the update session whose queued mutations are flushed
+   * @throws RuntimeException
+   *           wrapping any error raised while preparing mutations (after every prepared commit
+   *           session was aborted), or any unexpected error raised while logging
+   */
+  private void flush(UpdateSession us) {
+
+    int mutationCount = 0;
+    Map<CommitSession,List<Mutation>> sendables = new HashMap<>();
+    Map<CommitSession,TabletMutations> loggables = new HashMap<>();
+    Throwable error = null;
+
+    long pt1 = System.currentTimeMillis();
+
+    boolean containsMetadataTablet = false;
+    for (Tablet tablet : us.queuedMutations.keySet()) {
+      if (tablet.getExtent().isMeta()) {
+        containsMetadataTablet = true;
+      }
+    }
+
+    // only wait for commits to be enabled when no metadata tablet is part of this flush
+    if (!containsMetadataTablet && us.queuedMutations.size() > 0) {
+      server.resourceManager.waitUntilCommitsAreEnabled();
+    }
+
+    try (TraceScope prep = Trace.startSpan("prep")) {
+      for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) {
+
+        Tablet tablet = entry.getKey();
+        Durability durability =
+            DurabilityImpl.resolveDurabilty(us.durability, tablet.getDurability());
+        List<Mutation> mutations = entry.getValue();
+        if (mutations.size() > 0) {
+          try {
+            server.updateMetrics.addMutationArraySize(mutations.size());
+
+            PreparedMutations prepared = tablet.prepareMutationsForCommit(us.cenv, mutations);
+
+            if (prepared.tabletClosed()) {
+              if (us.currentTablet == tablet) {
+                us.currentTablet = null;
+              }
+              us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
+            } else {
+              if (!prepared.getNonViolators().isEmpty()) {
+                List<Mutation> validMutations = prepared.getNonViolators();
+                CommitSession session = prepared.getCommitSession();
+                // mutations with durability NONE are committed without being logged
+                if (durability != Durability.NONE) {
+                  loggables.put(session, new TabletMutations(session, validMutations, durability));
+                }
+                sendables.put(session, validMutations);
+              }
+
+              if (!prepared.getViolations().isEmpty()) {
+                us.violations.add(prepared.getViolations());
+                server.updateMetrics.addConstraintViolations(0);
+              }
+              // Use the size of the original mutation list, regardless of how many mutations
+              // did not violate constraints.
+              mutationCount += mutations.size();
+
+            }
+          } catch (Throwable t) {
+            error = t;
+            log.error("Unexpected error preparing for commit", error);
+            break;
+          }
+        }
+      }
+    }
+
+    long pt2 = System.currentTimeMillis();
+    us.prepareTimes.addStat(pt2 - pt1);
+    updateAvgPrepTime(pt2 - pt1, us.queuedMutations.size());
+
+    if (error != null) {
+      // a preparation failure aborts everything that was already prepared
+      sendables.forEach((commitSession, value) -> commitSession.abortCommit());
+      throw new RuntimeException(error);
+    }
+    try {
+      try (TraceScope wal = Trace.startSpan("wal")) {
+        while (true) {
+          try {
+            long t1 = System.currentTimeMillis();
+
+            server.logger.logManyTablets(loggables);
+
+            long t2 = System.currentTimeMillis();
+            us.walogTimes.addStat(t2 - t1);
+            updateWalogWriteTime((t2 - t1));
+            break;
+          } catch (IOException | FSError ex) {
+            // keep the cause in the log so repeated retries can be diagnosed
+            log.warn("logging mutations failed, retrying", ex);
+          } catch (Throwable t) {
+            log.error("Unknown exception logging mutations, counts"
+                + " for mutations in flight not decremented!", t);
+            throw new RuntimeException(t);
+          }
+        }
+      }
+
+      try (TraceScope commit = Trace.startSpan("commit")) {
+        long t1 = System.currentTimeMillis();
+        sendables.forEach((commitSession, mutations) -> {
+          commitSession.commit(mutations);
+          KeyExtent extent = commitSession.getExtent();
+
+          // compare extents by value rather than by reference
+          if (us.currentTablet != null && us.currentTablet.getExtent().equals(extent)) {
+            // because constraint violations may filter out some
+            // mutations, for proper accounting with the client code,
+            // need to increment the count based on the original
+            // number of mutations from the client NOT the filtered number
+            us.successfulCommits.increment(us.currentTablet,
+                us.queuedMutations.get(us.currentTablet).size());
+          }
+        });
+        long t2 = System.currentTimeMillis();
+
+        us.flushTime += (t2 - pt1);
+        us.commitTimes.addStat(t2 - t1);
+
+        updateAvgCommitTime(t2 - t1, sendables.size());
+      }
+    } finally {
+      // reset the queue, keeping an empty list for the tablet currently being written to
+      us.queuedMutations.clear();
+      if (us.currentTablet != null) {
+        us.queuedMutations.put(us.currentTablet, new ArrayList<>());
+      }
+      server.updateTotalQueuedMutationSize(-us.queuedMutationSize);
+      us.queuedMutationSize = 0;
+    }
+    us.totalUpdates += mutationCount;
+  }
+
+ private void updateWalogWriteTime(long time) {
+ server.updateMetrics.addWalogWriteTime(time);
+ }
+
+ private void updateAvgCommitTime(long time, int size) {
+ if (size > 0)
+ server.updateMetrics.addCommitTime((long) (time / (double) size));
+ }
+
+ private void updateAvgPrepTime(long time, int size) {
+ if (size > 0)
+ server.updateMetrics.addCommitPrep((long) (time / (double) size));
+ }
+
+  /**
+   * Closes an update session: removes it from the session manager, flushes the mutations still
+   * queued on it and reports the errors recorded on the session.
+   *
+   * @return the failures, constraint violations and authorization failures of the session
+   * @throws NoSuchScanIDException
+   *           if no session exists for {@code updateID}, or if the flush raised a
+   *           {@link HoldTimeoutException}
+   */
+  @Override
+  public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDException {
+    final UpdateSession session = (UpdateSession) server.sessionManager.removeSession(updateID);
+    if (session == null) {
+      throw new NoSuchScanIDException();
+    }
+
+    // Clients may or may not see data from an update session while it is in progress; once the
+    // session is closed, however, reads must wait for the write to finish.
+    final long writeId = writeTracker.startWrite(session.queuedMutations.keySet());
+
+    try {
+      flush(session);
+    } catch (HoldTimeoutException hte) {
+      // Assumption is that the client has timed out and is gone. If that's not the case throw an
+      // exception that will cause it to retry.
+      log.debug("HoldTimeoutException during closeUpdate, reporting no such session");
+      throw new NoSuchScanIDException();
+    } finally {
+      writeTracker.finishWrite(writeId);
+    }
+
+    if (log.isTraceEnabled()) {
+      final String summary =
+          String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)",
+              TServerUtils.clientAddress.get(), session.totalUpdates,
+              (System.currentTimeMillis() - session.startTime) / 1000.0,
+              session.authTimes.toString(), session.flushTime / 1000.0,
+              session.prepareTimes.sum() / 1000.0, session.walogTimes.sum() / 1000.0,
+              session.commitTimes.sum() / 1000.0);
+      log.trace(summary);
+    }
+
+    if (!session.failures.isEmpty()) {
+      final Entry<KeyExtent,Long> firstFailure = session.failures.entrySet().iterator().next();
+      log.debug(String.format("Failures: %d, first extent %s successful commits: %d",
+          session.failures.size(), firstFailure.getKey().toString(), firstFailure.getValue()));
+    }
+
+    final List<ConstraintViolationSummary> violations = session.violations.asList();
+    if (!violations.isEmpty()) {
+      final ConstraintViolationSummary firstViolation =
+          session.violations.asList().iterator().next();
+      log.debug(String.format("Violations: %d, first %s occurs %d", violations.size(),
+          firstViolation.violationDescription, firstViolation.numberOfViolatingMutations));
+    }
+
+    if (!session.authFailures.isEmpty()) {
+      final KeyExtent firstAuthFailure = session.authFailures.keySet().iterator().next();
+      log.debug(String.format("Authentication Failures: %d, first %s",
+          session.authFailures.size(), firstAuthFailure.toString()));
+    }
+
+    return new UpdateErrors(Translator.translate(session.failures, Translators.KET),
+        Translator.translate(violations, Translators.CVST),
+        Translator.translate(session.authFailures, Translators.KET));
+  }
+
+ @Override
+ public void update(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent,
+ TMutation tmutation, TDurability tdurability)
+ throws NotServingTabletException, ConstraintViolationException, ThriftSecurityException {
+
+ final TableId tableId = TableId.of(new String(tkeyExtent.getTable(), UTF_8));
+ NamespaceId namespaceId = getNamespaceId(credentials, tableId);
+ if (!security.canWrite(credentials, tableId, namespaceId)) {
+ throw new ThriftSecurityException(credentials.getPrincipal(),
+ SecurityErrorCode.PERMISSION_DENIED);
+ }
+ final KeyExtent keyExtent = new KeyExtent(tkeyExtent);
+ final Tablet tablet = server.getOnlineTablet(new KeyExtent(keyExtent));
+ if (tablet == null) {
+ throw new NotServingTabletException(tkeyExtent);
+ }
+ Durability tabletDurability = tablet.getDurability();
+
+ if (!keyExtent.isMeta()) {
+ try {
+ server.resourceManager.waitUntilCommitsAreEnabled();
+ } catch (HoldTimeoutException hte) {
+ // Major hack. Assumption is that the client has timed out and is gone. If that's not the
+ // case, then throwing the following will let client know there
+ // was a failure and it should retry.
+ throw new NotServingTabletException(tkeyExtent);
+ }
+ }
+
+ final long opid = writeTracker.startWrite(TabletType.type(keyExtent));
+
+ try {
+ final Mutation mutation = new ServerMutation(tmutation);
+ final List<Mutation> mutations = Collections.singletonList(mutation);
+
+ PreparedMutations prepared;
+ try (TraceScope prep = Trace.startSpan("prep")) {
+ prepared = tablet.prepareMutationsForCommit(
+ new TservConstraintEnv(server.getContext(), security, credentials), mutations);
+ }
+
+ if (prepared.tabletClosed()) {
+ throw new NotServingTabletException(tkeyExtent);
+ } else if (!prepared.getViolators().isEmpty()) {
+ throw new ConstraintViolationException(
+ Translator.translate(prepared.getViolations().asList(), Translators.CVST));
+ } else {
+ CommitSession session = prepared.getCommitSession();
+ Durability durability = DurabilityImpl
+ .resolveDurabilty(DurabilityImpl.fromThrift(tdurability), tabletDurability);
+
+ // Instead of always looping on true, skip completely when durability is NONE.
+ while (durability != Durability.NONE) {
+ try {
+ try (TraceScope wal = Trace.startSpan("wal")) {
+ server.logger.log(session, mutation, durability);
+ }
+ break;
+ } catch (IOException ex) {
+ log.warn("Error writing mutations to log", ex);
+ }
+ }
+
+ try (TraceScope commit = Trace.startSpan("commit")) {
+ session.commit(mutations);
+ }
+ }
+ } finally {
+ writeTracker.finishWrite(opid);
+ }
+ }
+
+ private NamespaceId getNamespaceId(TCredentials credentials, TableId tableId)
+ throws ThriftSecurityException {
+ try {
+ return Tables.getNamespaceId(server.getContext(), tableId);
+ } catch (TableNotFoundException e1) {
+ throw new ThriftSecurityException(credentials.getPrincipal(),
+ SecurityErrorCode.TABLE_DOESNT_EXIST);
+ }
+ }
+
+  /**
+   * Evaluates the conditions of every conditional mutation in {@code updates}, tablet by tablet.
+   *
+   * <p>
+   * On return {@code updates} only holds the mutations that passed their checks. Entries whose
+   * tablet is not online or is closed, or whose check was aborted by a closed tablet, an
+   * interruption or too many files, are removed and reported as {@code IGNORED} in
+   * {@code results}.
+   */
+  private void checkConditions(Map<KeyExtent,List<ServerConditionalMutation>> updates,
+      ArrayList<TCMResult> results, ConditionalSession cs, List<String> symbols)
+      throws IOException {
+    final Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> entries =
+        updates.entrySet().iterator();
+
+    final CompressedIterators compressedIters = new CompressedIterators(symbols);
+    final ConditionCheckerContext checkerContext = new ConditionCheckerContext(
+        server.getContext(), compressedIters,
+        server.getContext().getTableConfiguration(cs.tableId));
+
+    while (entries.hasNext()) {
+      final Entry<KeyExtent,List<ServerConditionalMutation>> current = entries.next();
+      final Tablet tablet = server.getOnlineTablet(current.getKey());
+
+      if (tablet != null && !tablet.isClosed()) {
+        final List<ServerConditionalMutation> passed = new ArrayList<>(current.getValue().size());
+        // empty view at the end of results; lets the checker's additions be undone on failure
+        final List<TCMResult> checkerResults = results.subList(results.size(), results.size());
+
+        final ConditionChecker checker =
+            checkerContext.newChecker(current.getValue(), passed, checkerResults);
+        try {
+          tablet.checkConditions(checker, cs.auths, cs.interruptFlag);
+
+          if (passed.size() > 0) {
+            current.setValue(passed);
+          } else {
+            entries.remove();
+          }
+        } catch (TabletClosedException | IterationInterruptedException | TooManyFilesException e) {
+          // clear anything added while checking conditions.
+          checkerResults.clear();
+
+          for (ServerConditionalMutation scm : current.getValue()) {
+            results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+          }
+          entries.remove();
+        }
+      } else {
+        for (ServerConditionalMutation scm : current.getValue()) {
+          results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+        }
+        entries.remove();
+      }
+    }
+  }
+
+  /**
+   * Prepares, logs and commits the conditional mutations that passed their condition checks.
+   *
+   * <p>
+   * A {@link TCMResult} is appended to {@code results} for each mutation handled: {@code IGNORED}
+   * when the tablet is not online, is closed, or the session was canceled; {@code ACCEPTED} for
+   * mutations that are logged and committed; {@code VIOLATED} for constraint violators. The
+   * write-ahead log write is retried on {@link IOException} and {@link FSError}.
+   *
+   * @throws RuntimeException
+   *           wrapping any unexpected error raised while logging
+   */
+  private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates,
+      ArrayList<TCMResult> results, ConditionalSession sess) {
+    Set<Entry<KeyExtent,List<ServerConditionalMutation>>> es = updates.entrySet();
+
+    Map<CommitSession,List<Mutation>> sendables = new HashMap<>();
+    Map<CommitSession,TabletMutations> loggables = new HashMap<>();
+
+    // read once up front; applies to every tablet handled below
+    boolean sessionCanceled = sess.interruptFlag.get();
+
+    try (TraceScope prepSpan = Trace.startSpan("prep")) {
+      long t1 = System.currentTimeMillis();
+      for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) {
+        final Tablet tablet = server.getOnlineTablet(entry.getKey());
+        if (tablet == null || tablet.isClosed() || sessionCanceled) {
+          addMutationsAsTCMResults(results, entry.getValue(), TCMStatus.IGNORED);
+        } else {
+          final Durability durability =
+              DurabilityImpl.resolveDurabilty(sess.durability, tablet.getDurability());
+
+          @SuppressWarnings("unchecked")
+          List<Mutation> mutations = (List<Mutation>) (List<? extends Mutation>) entry.getValue();
+          if (!mutations.isEmpty()) {
+
+            PreparedMutations prepared = tablet.prepareMutationsForCommit(
+                new TservConstraintEnv(server.getContext(), security, sess.credentials), mutations);
+
+            if (prepared.tabletClosed()) {
+              addMutationsAsTCMResults(results, mutations, TCMStatus.IGNORED);
+            } else {
+              if (!prepared.getNonViolators().isEmpty()) {
+                // Only log and commit mutations that did not violate constraints.
+                List<Mutation> validMutations = prepared.getNonViolators();
+                addMutationsAsTCMResults(results, validMutations, TCMStatus.ACCEPTED);
+                CommitSession session = prepared.getCommitSession();
+                if (durability != Durability.NONE) {
+                  loggables.put(session, new TabletMutations(session, validMutations, durability));
+                }
+                sendables.put(session, validMutations);
+              }
+
+              if (!prepared.getViolators().isEmpty()) {
+                addMutationsAsTCMResults(results, prepared.getViolators(), TCMStatus.VIOLATED);
+              }
+            }
+          }
+        }
+      }
+
+      long t2 = System.currentTimeMillis();
+      updateAvgPrepTime(t2 - t1, es.size());
+    }
+
+    try (TraceScope walSpan = Trace.startSpan("wal")) {
+      // nothing to log when every mutation has durability NONE or none was accepted
+      while (loggables.size() > 0) {
+        try {
+          long t1 = System.currentTimeMillis();
+          server.logger.logManyTablets(loggables);
+          long t2 = System.currentTimeMillis();
+          updateWalogWriteTime(t2 - t1);
+          break;
+        } catch (IOException | FSError ex) {
+          // keep the cause in the log so repeated retries can be diagnosed
+          log.warn("logging mutations failed, retrying", ex);
+        } catch (Throwable t) {
+          log.error("Unknown exception logging mutations, counts for"
+              + " mutations in flight not decremented!", t);
+          throw new RuntimeException(t);
+        }
+      }
+    }
+
+    try (TraceScope commitSpan = Trace.startSpan("commit")) {
+      long t1 = System.currentTimeMillis();
+      sendables.forEach(CommitSession::commit);
+      long t2 = System.currentTimeMillis();
+      updateAvgCommitTime(t2 - t1, sendables.size());
+    }
+  }
+
+ /**
+ * Transform and add each mutation as a {@link TCMResult} with the mutation's ID and the specified
+ * status to the {@link TCMResult} list.
+ */
+ private void addMutationsAsTCMResults(final List<TCMResult> list,
+ final Collection<? extends Mutation> mutations, final TCMStatus status) {
+ mutations.stream()
+ .map(mutation -> new TCMResult(((ServerConditionalMutation) mutation).getID(), status))
+ .forEach(list::add);
+ }
+
+ private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(ConditionalSession cs,
+ Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results,
+ List<String> symbols) throws IOException {
+ // sort each list of mutations, this is done to avoid deadlock and doing seeks in order is
+ // more efficient and detect duplicate rows.
+ ConditionalMutationSet.sortConditionalMutations(updates);
+
+ Map<KeyExtent,List<ServerConditionalMutation>> deferred = new HashMap<>();
+
+ // can not process two mutations for the same row, because one will not see what the other
+ // writes
+ ConditionalMutationSet.deferDuplicatesRows(updates, deferred);
+
+ // get as many locks as possible w/o blocking... defer any rows that are locked
+ List<RowLock> locks = rowLocks.acquireRowlocks(updates, deferred);
+ try {
+ try (TraceScope checkSpan = Trace.startSpan("Check conditions")) {
+ checkConditions(updates, results, cs, symbols);
+ }
+
+ try (TraceScope updateSpan = Trace.startSpan("apply conditional mutations")) {
+ writeConditionalMutations(updates, results, cs);
+ }
+ } finally {
+ rowLocks.releaseRowLocks(locks);
+ }
+ return deferred;
+ }
+
+  /**
+   * Opens a conditional update session for a table after verifying that the caller may
+   * conditionally update it and holds every requested authorization.
+   *
+   * @return the new session's id together with this server's lock id and the session manager's
+   *         maximum idle time
+   * @throws ThriftSecurityException
+   *           if the table does not exist, permission is denied, or a requested authorization is
+   *           not held by the user
+   */
+  @Override
+  public TConditionalSession startConditionalUpdate(TInfo tinfo, TCredentials credentials,
+      List<ByteBuffer> authorizations, String tableIdStr, TDurability tdurabilty,
+      String classLoaderContext) throws ThriftSecurityException, TException {
+
+    final TableId tableId = TableId.of(tableIdStr);
+    final NamespaceId namespaceId = getNamespaceId(credentials, tableId);
+    if (!security.canConditionallyUpdate(credentials, tableId, namespaceId)) {
+      throw new ThriftSecurityException(credentials.getPrincipal(),
+          SecurityErrorCode.PERMISSION_DENIED);
+    }
+
+    final Authorizations granted = security.getUserAuthorizations(credentials);
+    for (ByteBuffer requested : authorizations) {
+      final boolean held = granted.contains(ByteBufferUtil.toBytes(requested));
+      if (!held) {
+        throw new ThriftSecurityException(credentials.getPrincipal(),
+            SecurityErrorCode.BAD_AUTHORIZATIONS);
+      }
+    }
+
+    final ConditionalSession session = new ConditionalSession(credentials,
+        new Authorizations(authorizations), tableId, DurabilityImpl.fromThrift(tdurabilty));
+
+    final long sessionId = server.sessionManager.createSession(session, false);
+    return new TConditionalSession(sessionId, server.getLockID(),
+        server.sessionManager.getMaxIdleTime());
+  }
+
+  /**
+   * Applies a batch of conditional mutations within an existing conditional session, repeating
+   * passes until no mutation remains deferred.
+   *
+   * <p>
+   * The session is reserved for the duration of the call and is unreserved on every exit path,
+   * including the early failures below.
+   *
+   * @return one result per processed mutation
+   * @throws NoSuchScanIDException
+   *           if the session does not exist, has been interrupted, or waiting for commits to be
+   *           enabled raised a {@link HoldTimeoutException}
+   * @throws IllegalArgumentException
+   *           if a mutation targets a table other than the session's table
+   * @throws TException
+   *           wrapping any {@link IOException} raised while processing
+   */
+  @Override
+  public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID,
+      Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
+      throws NoSuchScanIDException, TException {
+
+    ConditionalSession cs = (ConditionalSession) server.sessionManager.reserveSession(sessID);
+
+    if (cs == null) {
+      throw new NoSuchScanIDException();
+    }
+
+    // The session is reserved from here on, so it must be unreserved however this method exits.
+    try {
+      if (cs.interruptFlag.get()) {
+        throw new NoSuchScanIDException();
+      }
+
+      if (!cs.tableId.equals(MetadataTable.ID) && !cs.tableId.equals(RootTable.ID)) {
+        try {
+          server.resourceManager.waitUntilCommitsAreEnabled();
+        } catch (HoldTimeoutException hte) {
+          // Assumption is that the client has timed out and is gone. If that's not the case throw
+          // an exception that will cause it to retry.
+          log.debug("HoldTimeoutException during conditionalUpdate, reporting no such session");
+          throw new NoSuchScanIDException();
+        }
+      }
+
+      TableId tid = cs.tableId;
+      long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null)));
+
+      try {
+        Map<KeyExtent,List<ServerConditionalMutation>> updates = Translator.translate(mutations,
+            Translators.TKET, new Translator.ListTranslator<>(ServerConditionalMutation.TCMT));
+
+        for (KeyExtent ke : updates.keySet()) {
+          if (!ke.getTableId().equals(tid)) {
+            throw new IllegalArgumentException(
+                "Unexpected table id " + tid + " != " + ke.getTableId());
+          }
+        }
+
+        ArrayList<TCMResult> results = new ArrayList<>();
+
+        Map<KeyExtent,List<ServerConditionalMutation>> deferred =
+            conditionalUpdate(cs, updates, results, symbols);
+
+        // keep processing until every deferred mutation has been handled
+        while (deferred.size() > 0) {
+          deferred = conditionalUpdate(cs, deferred, results, symbols);
+        }
+
+        return results;
+      } catch (IOException ioe) {
+        throw new TException(ioe);
+      } finally {
+        writeTracker.finishWrite(opid);
+      }
+    } finally {
+      server.sessionManager.unreserveSession(sessID);
+    }
+  }
+
+  /**
+   * Invalidates a conditional session: flags it as interrupted, then reserves it and removes it
+   * from the session manager.
+   */
+  @Override
+  public void invalidateConditionalUpdate(TInfo tinfo, long sessID) {
+    // this method should wait for any running conditional update to complete
+    // after this method returns a conditional update should not be able to start
+
+    final ConditionalSession existing =
+        (ConditionalSession) server.sessionManager.getSession(sessID);
+    if (existing != null) {
+      existing.interruptFlag.set(true);
+    }
+
+    final ConditionalSession reserved =
+        (ConditionalSession) server.sessionManager.reserveSession(sessID, true);
+    if (reserved == null) {
+      return;
+    }
+    server.sessionManager.removeSession(sessID, true);
+  }
+
+  /** Removes the conditional session with the given id from the session manager. */
+  @Override
+  public void closeConditionalUpdate(TInfo tinfo, long sessID) {
+    final boolean unreserve = false;
+    server.sessionManager.removeSession(sessID, unreserve);
+  }
+
+  /**
+   * Splits an online tablet at the given split point, unless the split point already equals the
+   * tablet's end row.
+   *
+   * @throws NotServingTabletException
+   *           if the tablet is not online on this server or the split returned no result
+   * @throws ThriftSecurityException
+   *           if the table does not exist or the caller may not split its tablets
+   * @throws RuntimeException
+   *           wrapping any {@link IOException} raised by the split
+   */
+  @Override
+  public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent,
+      ByteBuffer splitPoint) throws NotServingTabletException, ThriftSecurityException {
+
+    // decode the table id explicitly as UTF-8, as update() does, instead of relying on the
+    // platform default charset
+    TableId tableId = TableId.of(new String(ByteBufferUtil.toBytes(tkeyExtent.table), UTF_8));
+    NamespaceId namespaceId = getNamespaceId(credentials, tableId);
+
+    if (!security.canSplitTablet(credentials, tableId, namespaceId)) {
+      throw new ThriftSecurityException(credentials.getPrincipal(),
+          SecurityErrorCode.PERMISSION_DENIED);
+    }
+
+    KeyExtent keyExtent = new KeyExtent(tkeyExtent);
+
+    Tablet tablet = server.getOnlineTablet(keyExtent);
+    if (tablet == null) {
+      throw new NotServingTabletException(tkeyExtent);
+    }
+
+    // nothing to do when the requested split point is already the tablet's end row
+    if (keyExtent.getEndRow() == null
+        || !keyExtent.getEndRow().equals(ByteBufferUtil.toText(splitPoint))) {
+      try {
+        if (server.splitTablet(tablet, ByteBufferUtil.toBytes(splitPoint)) == null) {
+          throw new NotServingTabletException(tkeyExtent);
+        }
+      } catch (IOException e) {
+        log.warn("Failed to split " + keyExtent, e);
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /** Returns the server's statistics, computed from the active scans per table. */
+  @Override
+  public TabletServerStatus getTabletServerStatus(TInfo tinfo, TCredentials credentials) {
+    final TabletServerStatus status =
+        server.getStats(server.sessionManager.getActiveScansPerTable());
+    return status;
+  }
+
+  /**
+   * Collects the statistics of every online tablet that belongs to the given table.
+   *
+   * @return one {@link TabletStats} per matching online tablet, filled in with its extent, ingest
+   *         and query rates, split creation time and entry count
+   */
+  @Override
+  public List<TabletStats> getTabletStats(TInfo tinfo, TCredentials credentials, String tableId) {
+    final List<TabletStats> collected = new ArrayList<>();
+    final TableId wanted = TableId.of(tableId);
+    final KeyExtent firstExtent = new KeyExtent(wanted, new Text(), null);
+
+    for (Entry<KeyExtent,Tablet> online : server.getOnlineTablets().tailMap(firstExtent)
+        .entrySet()) {
+      final KeyExtent extent = online.getKey();
+      if (extent.getTableId().compareTo(wanted) != 0) {
+        continue;
+      }
+      final Tablet tablet = online.getValue();
+      final TabletStats stats = tablet.getTabletStats();
+      stats.extent = extent.toThrift();
+      stats.ingestRate = tablet.ingestRate();
+      stats.queryRate = tablet.queryRate();
+      stats.splitCreationTime = tablet.getSplitCreationTime();
+      stats.numEntries = tablet.getNumEntries();
+      collected.add(stats);
+    }
+    return collected;
+  }
+
+ /**
+ * Verifies that a system-level request may be served by this tablet server.
+ *
+ * <p>
+ * The checks run in order: the caller must be allowed to perform system actions; this server's
+ * lock must exist and have been acquired; the process is halted if that lock was acquired but is
+ * no longer held; and, when {@code lock} is not null, it must name a master lock that is
+ * currently held.
+ *
+ * @param credentials
+ * credentials of the caller
+ * @param lock
+ * master lock reported by the caller, or null to skip the master lock check
+ * @param request
+ * name of the request, used in log messages only
+ * @throws ThriftSecurityException
+ * if the caller may not perform system actions or the security check itself fails
+ * @throws RuntimeException
+ * if this server's lock was not acquired, or the master lock check fails
+ */
+ private void checkPermission(TCredentials credentials, String lock, final String request)
+ throws ThriftSecurityException {
+ try {
+ log.trace("Got {} message from user: {}", request, credentials.getPrincipal());
+ if (!security.canPerformSystemActions(credentials)) {
+ log.warn("Got {} message from user: {}", request, credentials.getPrincipal());
+ throw new ThriftSecurityException(credentials.getPrincipal(),
+ SecurityErrorCode.PERMISSION_DENIED);
+ }
+ } catch (ThriftSecurityException e) {
+ // also reached for the PERMISSION_DENIED exception thrown just above
+ log.warn("Got {} message from unauthenticatable user: {}", request, e.getUser());
+ if (server.getContext().getCredentials().getToken().getClass().getName()
+ .equals(credentials.getTokenClassName())) {
+ log.error("Got message from a service with a mismatched configuration."
+ + " Please ensure a compatible configuration.", e);
+ }
+ throw e;
+ }
+
+ if (server.getLock() == null || !server.getLock().wasLockAcquired()) {
+ log.debug("Got {} message before my lock was acquired, ignoring...", request);
+ throw new RuntimeException("Lock not acquired");
+ }
+
+ // the lock was acquired at some point but is not held any more: halt with status 1
+ if (server.getLock() != null && server.getLock().wasLockAcquired()
+ && !server.getLock().isLocked()) {
+ Halt.halt(1, () -> {
+ log.info("Tablet server no longer holds lock during checkPermission() : {}, exiting",
+ request);
+ server.gcLogger.logGCInfo(server.getConfiguration());
+ });
+ }
+
+ if (lock != null) {
+ ZooUtil.LockID lid =
+ new ZooUtil.LockID(server.getContext().getZooKeeperRoot() + Constants.ZMASTER_LOCK, lock);
+
+ try {
+ if (!ZooLock.isLockHeld(server.masterLockCache, lid)) {
+ // maybe the cache is out of date and a new master holds the
+ // lock?
+ server.masterLockCache.clear();
+ if (!ZooLock.isLockHeld(server.masterLockCache, lid)) {
+ log.warn("Got {} message from a master that does not hold the current lock {}", request,
+ lock);
+ throw new RuntimeException("bad master lock");
+ }
+ }
+ } catch (Exception e) {
+ // this also catches and re-wraps the RuntimeException thrown just above
+ throw new RuntimeException("bad master lock", e);
+ }
+ }
+ }
+
+ /**
+ * Handles a request to load (assign) a tablet on this server.
+ *
+ * <p>
+ * After the permission check, the extent is compared against the unopened, opening and online
+ * tablets while holding all three monitors. If anything overlaps, the request is dropped, with an
+ * error logged unless the only overlaps are recently split tablets or the extent itself.
+ * Otherwise the extent is added to the unopened set and an {@link AssignmentHandler} is started:
+ * on a dedicated daemon thread for the root tablet, or through the resource manager for metadata
+ * and user tablets.
+ *
+ * @throws RuntimeException
+ * wrapping the {@link ThriftSecurityException} raised when the permission check fails
+ */
+ @Override
+ public void loadTablet(TInfo tinfo, TCredentials credentials, String lock,
+ final TKeyExtent textent) {
+
+ try {
+ checkPermission(credentials, lock, "loadTablet");
+ } catch (ThriftSecurityException e) {
+ log.error("Caller doesn't have permission to load a tablet", e);
+ throw new RuntimeException(e);
+ }
+
+ final KeyExtent extent = new KeyExtent(textent);
+
+ // the three monitors are taken in this fixed order: unopened, opening, online
+ synchronized (server.unopenedTablets) {
+ synchronized (server.openingTablets) {
+ synchronized (server.onlineTablets) {
+
+ // checking if this exact tablet is in any of the sets
+ // below is not a strong enough check
+ // when splits and fix splits occurring
+
+ Set<KeyExtent> unopenedOverlapping =
+ KeyExtent.findOverlapping(extent, server.unopenedTablets);
+ Set<KeyExtent> openingOverlapping =
+ KeyExtent.findOverlapping(extent, server.openingTablets);
+ Set<KeyExtent> onlineOverlapping =
+ KeyExtent.findOverlapping(extent, server.getOnlineTablets());
+
+ Set<KeyExtent> all = new HashSet<>();
+ all.addAll(unopenedOverlapping);
+ all.addAll(openingOverlapping);
+ all.addAll(onlineOverlapping);
+
+ if (!all.isEmpty()) {
+
+ // ignore any tablets that have recently split, for error logging
+ for (KeyExtent e2 : onlineOverlapping) {
+ Tablet tablet = server.getOnlineTablet(e2);
+ if (System.currentTimeMillis() - tablet.getSplitCreationTime()
+ < RECENTLY_SPLIT_MILLIES) {
+ all.remove(e2);
+ }
+ }
+
+ // ignore self, for error logging
+ all.remove(extent);
+
+ if (all.size() > 0) {
+ log.error("Tablet {} overlaps previously assigned {} {} {}", extent,
+ unopenedOverlapping, openingOverlapping, onlineOverlapping + " " + all);
+ }
+ // any overlap, logged or not, means the tablet is not loaded by this call
+ return;
+ }
+
+ server.unopenedTablets.add(extent);
+ }
+ }
+ }
+
+ TabletLogger.loading(extent, server.getTabletSession());
+
+ final AssignmentHandler ah = new AssignmentHandler(server, extent);
+ // final Runnable ah = new LoggingRunnable(log, );
+ // Root tablet assignment must take place immediately
+
+ if (extent.isRootTablet()) {
+ new Daemon("Root Tablet Assignment") {
+ @Override
+ public void run() {
+ ah.run();
+ if (server.getOnlineTablets().containsKey(extent)) {
+ log.info("Root tablet loaded: {}", extent);
+ } else {
+ log.info("Root tablet failed to load");
+ }
+
+ }
+ }.start();
+ } else {
+ if (extent.isMeta()) {
+ server.resourceManager.addMetaDataAssignment(extent, log, ah);
+ } else {
+ server.resourceManager.addAssignment(extent, log, ah);
+ }
+ }
+ }
+
+  /**
+   * Schedules the unloading of a tablet by handing an {@link UnloadTabletHandler}, wrapped in a
+   * {@link LoggingRunnable}, to the resource manager as a migration.
+   *
+   * @throws RuntimeException
+   *           wrapping the {@link ThriftSecurityException} raised when the permission check fails
+   */
+  @Override
+  public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent,
+      TUnloadTabletGoal goal, long requestTime) {
+    try {
+      checkPermission(credentials, lock, "unloadTablet");
+    } catch (ThriftSecurityException denied) {
+      log.error("Caller doesn't have permission to unload a tablet", denied);
+      throw new RuntimeException(denied);
+    }
+
+    final KeyExtent extent = new KeyExtent(textent);
+    final UnloadTabletHandler handler = new UnloadTabletHandler(server, extent, goal, requestTime);
+    final LoggingRunnable task = new LoggingRunnable(log, handler);
+
+    server.resourceManager.addMigration(extent, task);
+  }
+
+ @Override
+ public void flush(TInfo tinfo, TCredentials credentials, String lock, String tableId,
+ ByteBuffer startRow, ByteBuffer endRow) {
+ try {
+ checkPermission(credentials, lock, "flush");
+ } catch (ThriftSecurityException e) {
+ log.error("Caller doesn't have permission to flush a table", e);
+ throw new RuntimeException(e);
+ }
+
+ ArrayList<Tablet> tabletsToFlush = new ArrayList<>();
+
+ KeyExtent ke = new KeyExtent(TableId.of(tableId), ByteBufferUtil.toText(endRow),
+ ByteBufferUtil.toText(startRow));
+
+ for (Tablet tablet : server.getOnlineTablets().values()) {
+ if (ke.overlaps(tablet.getExtent())) {
+ tabletsToFlush.add(tablet);
+ }
+ }
+
+ Long flushID = null;
+
+ for (Tablet tablet : tabletsToFlush) {
+ if (flushID == null) {
+ // read the flush id once from zookeeper instead of reading
+ // it for each tablet
+ try {
+ flushID = tablet.getFlushID();
+ } catch (NoNodeException e) {
+ // table was probably deleted
+ log.info("Asked to flush table that has no flush id {} {}", ke, e.getMessage());
+ return;
+ }
+ }
+ tablet.flush(flushID);
+ }
+ }
+
+ @Override
+ public void flushTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent) {
+ try {
+ checkPermission(credentials, lock, "flushTablet");
+ } catch (ThriftSecurityException e) {
+ log.error("Caller doesn't have permission to flush a tablet", e);
+ throw new RuntimeException(e);
+ }
+
+ Tablet tablet = server.getOnlineTablet(new KeyExtent(textent));
+ if (tablet != null) {
+ log.info("Flushing {}", tablet.getExtent());
+ try {
+ tablet.flush(tablet.getFlushID());
+ } catch (NoNodeException nne) {
+ log.info("Asked to flush tablet that has no flush id {} {}", new KeyExtent(textent),
+ nne.getMessage());
+ }
+ }
+ }
+
+ @Override
+ public void halt(TInfo tinfo, TCredentials credentials, String lock)
+ throws ThriftSecurityException {
+
+ checkPermission(credentials, lock, "halt");
+
+ Halt.halt(0, () -> {
+ log.info("Master requested tablet server halt");
+ server.gcLogger.logGCInfo(server.getConfiguration());
+ server.requestStop();
+ try {
+ server.getLock().unlock();
+ } catch (Exception e) {
+ log.error("Caught exception unlocking TabletServer lock", e);
+ }
+ });
+ }
+
+ @Override
+ public void fastHalt(TInfo info, TCredentials credentials, String lock) {
+ try {
+ halt(info, credentials, lock);
+ } catch (Exception e) {
+ log.warn("Error halting", e);
+ }
+ }
+
+  /** Returns the tablet statistics held by the server's stats keeper. */
+  @Override
+  public TabletStats getHistoricalStats(TInfo tinfo, TCredentials credentials) {
+    final TabletStats historical = server.statsKeeper.getTabletStats();
+    return historical;
+  }
+
+ @Override
+ public List<ActiveScan> getActiveScans(TInfo tinfo, TCredentials credentials)
+ throws ThriftSecurityException, TException {
+ try {
+ checkPermission(credentials, null, "getScans");
+ } catch (ThriftSecurityException e) {
+ log.error("Caller doesn't have permission to get active scans", e);
+ throw e;
+ }
+
+ return server.sessionManager.getActiveScans();
+ }
+
+ @Override
+ public void chop(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent) {
+ try {
+ checkPermission(credentials, lock, "chop");
+ } catch (ThriftSecurityException e) {
+ log.error("Caller doesn't have permission to chop extent", e);
+ throw new RuntimeException(e);
+ }
+
+ KeyExtent ke = new KeyExtent(textent);
+
+ Tablet tablet = server.getOnlineTablet(ke);
+ if (tablet != null) {
+ tablet.chopFiles();
+ }
+ }
+
+ @Override
+ public void compact(TInfo tinfo, TCredentials credentials, String lock, String tableId,
+ ByteBuffer startRow, ByteBuffer endRow) {
+ try {
+ checkPermission(credentials, lock, "compact");
+ } catch (ThriftSecurityException e) {
+ log.error("Caller doesn't have permission to compact a table", e);
+ throw new RuntimeException(e);
+ }
+
+ KeyExtent ke = new KeyExtent(TableId.of(tableId), ByteBufferUtil.toText(endRow),
+ ByteBufferUtil.toText(startRow));
+
+ ArrayList<Tablet> tabletsToCompact = new ArrayList<>();
+
+ for (Tablet tablet : server.getOnlineTablets().values()) {
+ if (ke.overlaps(tablet.getExtent())) {
+ tabletsToCompact.add(tablet);
+ }
+ }
+
+ Pair<Long,UserCompactionConfig> compactionInfo = null;
+
+ for (Tablet tablet : tabletsToCompact) {
+ // all for the same table id, so only need to read
+ // compaction id once
+ if (compactionInfo == null) {
+ try {
+ compactionInfo = tablet.getCompactionID();
+ } catch (NoNodeException e) {
+ log.info("Asked to compact table with no compaction id {} {}", ke, e.getMessage());
+ return;
+ }
+ }
+ tablet.compactAll(compactionInfo.getFirst(), compactionInfo.getSecond());
+ }
+
+ }
+
+ @Override
+ public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials)
+ throws ThriftSecurityException, TException {
+ try {
+ checkPermission(credentials, null, "getActiveCompactions");
+ } catch (ThriftSecurityException e) {
+ log.error("Caller doesn't have permission to get active compactions", e);
+ throw e;
+ }
+
+ List<CompactionInfo> compactions = Compactor.getRunningCompactions();
+ List<ActiveCompaction> ret = new ArrayList<>(compactions.size());
+
+ for (CompactionInfo compactionInfo : compactions) {
+ ret.add(compactionInfo.toThrift());
+ }
+
+ return ret;
+ }
+
+  /**
+   * Returns the log file reported by the server's logger as a single-element list, or an empty
+   * list when the logger reports none.
+   */
+  @Override
+  public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) {
+    // Might be null if there no active logger
+    final String activeLogFile = server.logger.getLogFile();
+    if (activeLogFile != null) {
+      return Collections.singletonList(activeLogFile);
+    }
+    return Collections.emptyList();
+  }
+
+  /**
+   * Removes nothing: the request is only answered with warnings telling the operator that the
+   * garbage collector is probably older than the tablet servers.
+   */
+  @Override
+  public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) {
+    log.warn("Garbage collector is attempting to remove logs through the tablet server");
+    final String advice = "This is probably because your file"
+        + " Garbage Collector is an older version than your tablet servers.\n"
+        + "Restart your file Garbage Collector.";
+    log.warn(advice);
+  }
+
+ private TSummaries getSummaries(Future<SummaryCollection> future) throws TimeoutException {
+ try {
+ SummaryCollection sc =
+ future.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
+ return sc.toThrift();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private TSummaries handleTimeout(long sessionId) {
+ long timeout = server.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
+ server.sessionManager.removeIfNotAccessed(sessionId, timeout);
+ return new TSummaries(false, sessionId, -1, -1, null);
+ }
+
+  /**
+   * Returns the summaries if the future completes within the wait time. Otherwise a summary
+   * session is registered for the future, so the caller can continue later, and the timeout
+   * reply for that session is returned.
+   */
+  private TSummaries startSummaryOperation(TCredentials credentials,
+      Future<SummaryCollection> future) {
+    try {
+      return getSummaries(future);
+    } catch (TimeoutException timedOut) {
+      long sessionId =
+          server.sessionManager.createSession(new SummarySession(credentials, future), false);
+      // a session id of 0 is discarded and a fresh session is created in its place
+      while (sessionId == 0) {
+        server.sessionManager.removeSession(sessionId);
+        sessionId =
+            server.sessionManager.createSession(new SummarySession(credentials, future), false);
+      }
+      return handleTimeout(sessionId);
+    }
+  }
+
+  /**
+   * Starts gathering summaries for a table on the summary partition executor.
+   *
+   * @throws ThriftTableOperationException
+   *           with type {@code NOTFOUND} if the table does not exist
+   * @throws ThriftSecurityException
+   *           if the caller may not get summaries for the table
+   */
+  @Override
+  public TSummaries startGetSummaries(TInfo tinfo, TCredentials credentials,
+      TSummaryRequest request)
+      throws ThriftSecurityException, ThriftTableOperationException, TException {
+    final TableId tableId = TableId.of(request.getTableId());
+    final NamespaceId namespaceId;
+    try {
+      namespaceId = Tables.getNamespaceId(server.getContext(), tableId);
+    } catch (TableNotFoundException notFound) {
+      throw new ThriftTableOperationException(tableId.canonical(), null, null,
+          TableOperationExceptionType.NOTFOUND, null);
+    }
+
+    if (!security.canGetSummaries(credentials, tableId, namespaceId)) {
+      throw new AccumuloSecurityException(credentials.getPrincipal(),
+          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
+    }
+
+    final ExecutorService partitionExecutor = server.resourceManager.getSummaryPartitionExecutor();
+    final Gatherer gatherer = new Gatherer(server.getContext(), request,
+        server.getContext().getTableConfiguration(tableId), server.getContext().getCryptoService());
+    final Future<SummaryCollection> pending = gatherer.gather(partitionExecutor);
+
+    return startSummaryOperation(credentials, pending);
+  }
+
+  /**
+   * Starts gathering summaries for one partition (selected by {@code modulus} and
+   * {@code remainder}) on the summary remote executor.
+   *
+   * @throws ThriftSecurityException
+   *           if the caller may not perform system actions
+   */
+  @Override
+  public TSummaries startGetSummariesForPartition(TInfo tinfo, TCredentials credentials,
+      TSummaryRequest request, int modulus, int remainder)
+      throws ThriftSecurityException, TException {
+    // do not expect users to call this directly, expect other tservers to call this method
+    if (!security.canPerformSystemActions(credentials)) {
+      throw new AccumuloSecurityException(credentials.getPrincipal(),
+          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
+    }
+
+    final ExecutorService remoteExecutor = server.resourceManager.getSummaryRemoteExecutor();
+    final TableConfiguration tableConfig =
+        server.getContext().getTableConfiguration(TableId.of(request.getTableId()));
+    final Gatherer gatherer = new Gatherer(server.getContext(), request, tableConfig,
+        server.getContext().getCryptoService());
+    final Future<SummaryCollection> pending =
+        gatherer.processPartition(remoteExecutor, modulus, remainder);
+
+    return startSummaryOperation(credentials, pending);
+  }
+
+  /**
+   * Starts gathering summaries from the given files on the summary retrieval executor, using the
+   * resource manager's summary, index and file-length caches.
+   *
+   * @throws ThriftSecurityException
+   *           if the caller may not perform system actions
+   */
+  @Override
+  public TSummaries startGetSummariesFromFiles(TInfo tinfo, TCredentials credentials,
+      TSummaryRequest request, Map<String,List<TRowRange>> files)
+      throws ThriftSecurityException, TException {
+    // do not expect users to call this directly, expect other tservers to call this method
+    if (!security.canPerformSystemActions(credentials)) {
+      throw new AccumuloSecurityException(credentials.getPrincipal(),
+          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
+    }
+
+    final ExecutorService retrievalExecutor = server.resourceManager.getSummaryRetrievalExecutor();
+    final TableConfiguration tableConfig =
+        server.getContext().getTableConfiguration(TableId.of(request.getTableId()));
+    final BlockCache summaryCache = server.resourceManager.getSummaryCache();
+    final BlockCache indexCache = server.resourceManager.getIndexCache();
+    final Cache<String,Long> fileLenCache = server.resourceManager.getFileLenCache();
+    final FileSystemResolver resolver = p -> fs.getFileSystemByPath(p);
+    final Gatherer gatherer = new Gatherer(server.getContext(), request, tableConfig,
+        server.getContext().getCryptoService());
+    final Future<SummaryCollection> pending = gatherer.processFiles(resolver, files, summaryCache,
+        indexCache, fileLenCache, retrievalExecutor);
+
+    return startSummaryOperation(credentials, pending);
+  }
+
+  /**
+   * Continues a summary request started earlier. If the session's future completes within the
+   * wait time, the session is removed and the summaries are returned; otherwise the timeout
+   * reply for the session is returned.
+   *
+   * @throws NoSuchScanIDException
+   *           if no session exists for {@code sessionId}
+   */
+  @Override
+  public TSummaries contiuneGetSummaries(TInfo tinfo, long sessionId)
+      throws NoSuchScanIDException, TException {
+    final SummarySession summarySession =
+        (SummarySession) server.sessionManager.getSession(sessionId);
+    if (summarySession == null) {
+      throw new NoSuchScanIDException();
+    }
+
+    final Future<SummaryCollection> pending = summarySession.getFuture();
+    try {
+      final TSummaries finished = getSummaries(pending);
+      server.sessionManager.removeSession(sessionId);
+      return finished;
+    } catch (TimeoutException timedOut) {
+      return handleTimeout(sessionId);
+    }
+  }
+}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java
new file mode 100644
index 0000000..3f9bbec
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TabletLoadState;
+import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
+import org.apache.accumulo.server.master.state.DistributedStoreException;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
+import org.apache.accumulo.server.master.state.TabletStateStore;
+import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class UnloadTabletHandler implements Runnable {
+ private static final Logger log = LoggerFactory.getLogger(UnloadTabletHandler.class);
+ private final KeyExtent extent;
+ private final TUnloadTabletGoal goalState;
+ private final long requestTimeSkew;
+ private final TabletServer server;
+
+ public UnloadTabletHandler(TabletServer server, KeyExtent extent, TUnloadTabletGoal goalState,
+ long requestTime) {
+ this.extent = extent;
+ this.goalState = goalState;
+ this.server = server;
+ this.requestTimeSkew = requestTime - MILLISECONDS.convert(System.nanoTime(), NANOSECONDS);
+ }
+
+ @Override
+ public void run() {
+
+ Tablet t = null;
+
+ synchronized (server.unopenedTablets) {
+ if (server.unopenedTablets.contains(extent)) {
+ server.unopenedTablets.remove(extent);
+ // enqueueMasterMessage(new TabletUnloadedMessage(extent));
+ return;
+ }
+ }
+ synchronized (server.openingTablets) {
+ while (server.openingTablets.contains(extent)) {
+ try {
+ server.openingTablets.wait();
+ } catch (InterruptedException e) {}
+ }
+ }
+ synchronized (server.onlineTablets) {
+ if (server.onlineTablets.snapshot().containsKey(extent)) {
+ t = server.onlineTablets.snapshot().get(extent);
+ }
+ }
+
+ if (t == null) {
+ // Tablet has probably been recently unloaded: repeated master
+ // unload request is crossing the successful unloaded message
+ if (!server.recentlyUnloadedCache.containsKey(extent)) {
+ log.info("told to unload tablet that was not being served {}", extent);
+ server.enqueueMasterMessage(
+ new TabletStatusMessage(TabletLoadState.UNLOAD_FAILURE_NOT_SERVING, extent));
+ }
+ return;
+ }
+
+ try {
+ t.close(!goalState.equals(TUnloadTabletGoal.DELETED));
+ } catch (Throwable e) {
+
+ if ((t.isClosing() || t.isClosed()) && e instanceof IllegalStateException) {
+ log.debug("Failed to unload tablet {}... it was already closing or closed : {}", extent,
+ e.getMessage());
+ } else {
+ log.error("Failed to close tablet {}... Aborting migration", extent, e);
+ server.enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.UNLOAD_ERROR, extent));
+ }
+ return;
+ }
+
+ // stop serving tablet - client will get not serving tablet
+ // exceptions
+ server.recentlyUnloadedCache.put(extent, System.currentTimeMillis());
+ server.onlineTablets.remove(extent);
+
+ try {
+ TServerInstance instance =
+ new TServerInstance(server.clientAddress, server.getLock().getSessionId());
+ TabletLocationState tls = null;
+ try {
+ tls = new TabletLocationState(extent, null, instance, null, null, null, false);
+ } catch (BadLocationStateException e) {
+ log.error("Unexpected error", e);
+ }
+ if (!goalState.equals(TUnloadTabletGoal.SUSPENDED) || extent.isRootTablet()
+ || (extent.isMeta()
+ && !server.getConfiguration().getBoolean(Property.MASTER_METADATA_SUSPENDABLE))) {
+ TabletStateStore.unassign(server.getContext(), tls, null);
+ } else {
+ TabletStateStore.suspend(server.getContext(), tls, null,
+ requestTimeSkew + MILLISECONDS.convert(System.nanoTime(), NANOSECONDS));
+ }
+ } catch (DistributedStoreException ex) {
+ log.warn("Unable to update storage", ex);
+ } catch (KeeperException e) {
+ log.warn("Unable determine our zookeeper session information", e);
+ } catch (InterruptedException e) {
+ log.warn("Interrupted while getting our zookeeper session information", e);
+ }
+
+ // tell the master how it went
+ server.enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.UNLOADED, extent));
+
+ // roll tablet stats over into tablet server's statsKeeper object as
+ // historical data
+ server.statsKeeper.saveMajorMinorTimes(t.getTabletStats());
+ }
+}