You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2022/04/28 20:07:03 UTC
[accumulo] branch main updated: Add base objects for Tablet and TabletServer for the upcoming ScanServer feature (#2661)
This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 39bc7a0524 Add base objects for Tablet and TabletServer for the upcoming ScanServer feature (#2661)
39bc7a0524 is described below
commit 39bc7a0524862a3d39a818e9ac7748874dc06949
Author: Dave Marion <dl...@apache.org>
AuthorDate: Thu Apr 28 16:06:57 2022 -0400
Add base objects for Tablet and TabletServer for the upcoming ScanServer feature (#2661)
This commit introduces TabletBase, a base class for Tablets, and TabletHostingServer,
an interface for server components that host Tablets. These changes will be used by
the ScanServer feature that is in the works.
Related to #2411
Co-authored-by: Keith Turner <kt...@apache.org>
---
.../system/SourceSwitchingIterator.java | 2 +
.../accumulo/tserver/TabletClientHandler.java | 10 +-
.../accumulo/tserver/TabletHostingServer.java | 57 +++
.../org/apache/accumulo/tserver/TabletServer.java | 27 +-
.../accumulo/tserver/ThriftScanClientHandler.java | 51 +--
.../apache/accumulo/tserver/scan/LookupTask.java | 4 +-
.../accumulo/tserver/scan/NextBatchTask.java | 4 +-
.../org/apache/accumulo/tserver/scan/ScanTask.java | 6 +-
.../accumulo/tserver/tablet/ScanDataSource.java | 33 +-
.../apache/accumulo/tserver/tablet/Scanner.java | 8 +-
.../org/apache/accumulo/tserver/tablet/Tablet.java | 455 +++-----------------
.../apache/accumulo/tserver/tablet/TabletBase.java | 462 +++++++++++++++++++++
12 files changed, 670 insertions(+), 449 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/SourceSwitchingIterator.java b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/SourceSwitchingIterator.java
index 27bf74e6b4..92bffc6555 100644
--- a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/SourceSwitchingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/SourceSwitchingIterator.java
@@ -54,6 +54,8 @@ public class SourceSwitchingIterator implements InterruptibleIterator {
SortedKeyValueIterator<Key,Value> iterator() throws IOException;
void setInterruptFlag(AtomicBoolean flag);
+
+ default void close(boolean sawErrors) {}
}
private DataSource source;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
index 5ffa3a7311..242b52ba55 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
@@ -1040,7 +1040,7 @@ public class TabletClientHandler implements TabletClientService.Iface {
}
static void checkPermission(SecurityOperation security, ServerContext context,
- TabletServer server, TCredentials credentials, String lock, final String request)
+ TabletHostingServer server, TCredentials credentials, String lock, final String request)
throws ThriftSecurityException {
try {
log.trace("Got {} message from user: {}", request, credentials.getPrincipal());
@@ -1069,7 +1069,7 @@ public class TabletClientHandler implements TabletClientService.Iface {
Halt.halt(1, () -> {
log.info("Tablet server no longer holds lock during checkPermission() : {}, exiting",
request);
- server.gcLogger.logGCInfo(server.getConfiguration());
+ server.getGcLogger().logGCInfo(server.getConfiguration());
});
}
@@ -1078,11 +1078,11 @@ public class TabletClientHandler implements TabletClientService.Iface {
new ZooUtil.LockID(context.getZooKeeperRoot() + Constants.ZMANAGER_LOCK, lock);
try {
- if (!ServiceLock.isLockHeld(server.managerLockCache, lid)) {
+ if (!ServiceLock.isLockHeld(server.getManagerLockCache(), lid)) {
// maybe the cache is out of date and a new manager holds the
// lock?
- server.managerLockCache.clear();
- if (!ServiceLock.isLockHeld(server.managerLockCache, lid)) {
+ server.getManagerLockCache().clear();
+ if (!ServiceLock.isLockHeld(server.getManagerLockCache(), lid)) {
log.warn("Got {} message from a manager that does not hold the current lock {}",
request, lock);
throw new RuntimeException("bad manager lock");
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java
new file mode 100644
index 0000000000..15c43d6625
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.fate.zookeeper.ServiceLock;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.server.GarbageCollectionLogger;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
+import org.apache.accumulo.tserver.session.Session;
+import org.apache.accumulo.tserver.session.SessionManager;
+import org.apache.accumulo.tserver.tablet.Tablet;
+
+public interface TabletHostingServer {
+
+ ServerContext getContext();
+
+ AccumuloConfiguration getConfiguration();
+
+ Tablet getOnlineTablet(KeyExtent extent);
+
+ SessionManager getSessionManager();
+
+ TabletServerResourceManager getResourceManager();
+
+ TabletServerScanMetrics getScanMetrics();
+
+ Session getSession(long scanID);
+
+ TableConfiguration getTableConfiguration(KeyExtent threadPoolExtent);
+
+ ServiceLock getLock();
+
+ ZooCache getManagerLockCache();
+
+ GarbageCollectionLogger getGcLogger();
+
+}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 910c4e9303..6508d9e718 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -170,7 +170,7 @@ import com.google.common.collect.Iterators;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
-public class TabletServer extends AbstractServer {
+public class TabletServer extends AbstractServer implements TabletHostingServer {
private static final SecureRandom random = new SecureRandom();
private static final Logger log = LoggerFactory.getLogger(TabletServer.class);
@@ -188,6 +188,7 @@ public class TabletServer extends AbstractServer {
TabletServerMinCMetrics mincMetrics;
CompactionExecutorsMetrics ceMetrics;
+ @Override
public TabletServerScanMetrics getScanMetrics() {
return scanMetrics;
}
@@ -422,6 +423,7 @@ public class TabletServer extends AbstractServer {
return totalQueuedMutationSize.addAndGet(additionalMutationSize);
}
+ @Override
public Session getSession(long sessionId) {
return sessionManager.getSession(sessionId);
}
@@ -651,10 +653,21 @@ public class TabletServer extends AbstractServer {
}
}
+ @Override
public ServiceLock getLock() {
return tabletServerLock;
}
+ @Override
+ public ZooCache getManagerLockCache() {
+ return managerLockCache;
+ }
+
+ @Override
+ public GarbageCollectionLogger getGcLogger() {
+ return gcLogger;
+ }
+
private void announceExistence() {
ZooReaderWriter zoo = getContext().getZooReaderWriter();
try {
@@ -1204,6 +1217,7 @@ public class TabletServer extends AbstractServer {
return logId;
}
+ @Override
public TableConfiguration getTableConfiguration(KeyExtent extent) {
return getContext().getTableConfiguration(extent.tableId());
}
@@ -1227,10 +1241,21 @@ public class TabletServer extends AbstractServer {
return onlineTablets.snapshot();
}
+ @Override
public Tablet getOnlineTablet(KeyExtent extent) {
return onlineTablets.snapshot().get(extent);
}
+ @Override
+ public SessionManager getSessionManager() {
+ return sessionManager;
+ }
+
+ @Override
+ public TabletServerResourceManager getResourceManager() {
+ return resourceManager;
+ }
+
public VolumeManager getVolumeManager() {
return getContext().getVolumeManager();
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
index 4e071cba5e..7c0c47fea2 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
@@ -88,13 +88,13 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface {
private static final Logger log = LoggerFactory.getLogger(ThriftScanClientHandler.class);
- private final TabletServer server;
+ private final TabletHostingServer server;
protected final ServerContext context;
protected final SecurityOperation security;
private final WriteTracker writeTracker;
private final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS;
- public ThriftScanClientHandler(TabletServer server, WriteTracker writeTracker) {
+ public ThriftScanClientHandler(TabletHostingServer server, WriteTracker writeTracker) {
this.server = server;
this.context = server.getContext();
this.writeTracker = writeTracker;
@@ -185,7 +185,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface {
scanSession.scanner =
tablet.createScanner(new Range(range), scanParams, scanSession.interruptFlag);
- long sid = server.sessionManager.createSession(scanSession, true);
+ long sid = server.getSessionManager().createSession(scanSession, true);
ScanResult scanResult;
try {
@@ -194,7 +194,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface {
log.error("The impossible happened", e);
throw new RuntimeException();
} finally {
- server.sessionManager.unreserveSession(sid);
+ server.getSessionManager().unreserveSession(sid);
}
return new InitialScan(sid, scanResult);
@@ -205,7 +205,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface {
NotServingTabletException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
TSampleNotPresentException {
SingleScanSession scanSession =
- (SingleScanSession) server.sessionManager.reserveSession(scanID);
+ (SingleScanSession) server.getSessionManager().reserveSession(scanID);
if (scanSession == null) {
throw new NoSuchScanIDException();
}
@@ -213,7 +213,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface {
try {
return continueScan(tinfo, scanID, scanSession);
} finally {
- server.sessionManager.unreserveSession(scanSession);
+ server.getSessionManager().unreserveSession(scanSession);
}
}
@@ -224,7 +224,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface {
if (scanSession.nextBatchTask == null) {
scanSession.nextBatchTask = new NextBatchTask(server, scanID, scanSession.interruptFlag);
- server.resourceManager.executeReadAhead(scanSession.extent,
+ server.getResourceManager().executeReadAhead(scanSession.extent,
getScanDispatcher(scanSession.extent), scanSession, scanSession.nextBatchTask);
}
@@ -234,7 +234,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface {
TimeUnit.MILLISECONDS);
scanSession.nextBatchTask = null;
} catch (ExecutionException e) {
- server.sessionManager.removeSession(scanID);
+ server.getSessionManager().removeSession(scanID);
if (e.getCause() instanceof NotServingTabletException) {
throw (NotServingTabletException) e.getCause();
} else if (e.getCause() instanceof TooManyFilesException) {
@@ -251,7 +251,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface {
throw new RuntimeException(e);
}
} catch (CancellationException ce) {
- server.sessionManager.removeSession(scanID);
+ server.getSessionManager().removeSession(scanID);
Tablet tablet = server.getOnlineTablet(scanSession.extent);
if (tablet == null || tablet.isClosed()) {
throw new NotServingTabletException(scanSession.extent.toThrift());
@@ -261,10 +261,10 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface {
} catch (TimeoutException e) {
List<TKeyValue> param = Collections.emptyList();
long timeout = server.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
- server.sessionManager.removeIfNotAccessed(scanID, timeout);
+ server.getSessionManager().removeIfNotAccessed(scanID, timeout);
return new ScanResult(param, true);
} catch (Exception t) {
- server.sessionManager.removeSession(scanID);
+ server.getSessionManager().removeSession(scanID);
log.warn("Failed to get next batch", t);
throw new RuntimeException(t);
}
@@ -279,7 +279,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface {
// start reading next batch while current batch is transmitted
// to client
scanSession.nextBatchTask = new NextBatchTask(server, scanID, scanSession.interruptFlag);
- server.resourceManager.executeReadAhead(scanSession.extent,
+ server.getResourceManager().executeReadAhead(scanSession.extent,
getScanDispatcher(scanSession.extent), scanSession, scanSession.nextBatchTask);
}
@@ -292,7 +292,8 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface {
@Override
public void closeScan(TInfo tinfo, long scanID) {
- final SingleScanSession ss = (SingleScanSession) server.sessionManager.removeSession(scanID);
+ final SingleScanSession ss =
+ (SingleScanSession) server.getSessionManager().removeSession(scanID);
if (ss != null) {
long t2 = System.currentTimeMillis();
@@ -302,8 +303,8 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface {
(t2 - ss.startTime) / 1000.0, ss.runStats.toString()));
}
- server.scanMetrics.addScan(t2 - ss.startTime);
- server.scanMetrics.addResult(ss.entriesReturned);
+ server.getScanMetrics().addScan(t2 - ss.startTime);
+ server.getScanMetrics().addResult(ss.entriesReturned);
}
}
@@ -373,13 +374,13 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface {
mss.numRanges += ranges.size();
}
- long sid = server.sessionManager.createSession(mss, true);
+ long sid = server.getSessionManager().createSession(mss, true);
MultiScanResult result;
try {
result = continueMultiScan(sid, mss);
} finally {
- server.sessionManager.unreserveSession(sid);
+ server.getSessionManager().unreserveSession(sid);
}
return new InitialMultiScan(sid, result);
@@ -389,7 +390,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface {
public MultiScanResult continueMultiScan(TInfo tinfo, long scanID)
throws NoSuchScanIDException, TSampleNotPresentException {
- MultiScanSession session = (MultiScanSession) server.sessionManager.reserveSession(scanID);
+ MultiScanSession session = (MultiScanSession) server.getSessionManager().reserveSession(scanID);
if (session == null) {
throw new NoSuchScanIDException();
@@ -398,7 +399,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface {
try {
return continueMultiScan(scanID, session);
} finally {
- server.sessionManager.unreserveSession(session);
+ server.getSessionManager().unreserveSession(session);
}
}
@@ -407,7 +408,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface {
if (session.lookupTask == null) {
session.lookupTask = new LookupTask(server, scanID);
- server.resourceManager.executeReadAhead(session.threadPoolExtent,
+ server.getResourceManager().executeReadAhead(session.threadPoolExtent,
getScanDispatcher(session.threadPoolExtent), session, session.lookupTask);
}
@@ -417,7 +418,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface {
session.lookupTask = null;
return scanResult;
} catch (ExecutionException e) {
- server.sessionManager.removeSession(scanID);
+ server.getSessionManager().removeSession(scanID);
if (e.getCause() instanceof SampleNotPresentException) {
throw new TSampleNotPresentException();
} else {
@@ -426,13 +427,13 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface {
}
} catch (TimeoutException e1) {
long timeout = server.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
- server.sessionManager.removeIfNotAccessed(scanID, timeout);
+ server.getSessionManager().removeIfNotAccessed(scanID, timeout);
List<TKeyValue> results = Collections.emptyList();
Map<TKeyExtent,List<TRange>> failures = Collections.emptyMap();
List<TKeyExtent> fullScans = Collections.emptyList();
return new MultiScanResult(results, failures, fullScans, null, null, false, true);
} catch (Exception t) {
- server.sessionManager.removeSession(scanID);
+ server.getSessionManager().removeSession(scanID);
log.warn("Failed to get multiscan result", t);
throw new RuntimeException(t);
}
@@ -440,7 +441,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface {
@Override
public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
- MultiScanSession session = (MultiScanSession) server.sessionManager.removeSession(scanID);
+ MultiScanSession session = (MultiScanSession) server.getSessionManager().removeSession(scanID);
if (session == null) {
throw new NoSuchScanIDException();
}
@@ -466,7 +467,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface {
throw e;
}
- return server.sessionManager.getActiveScans();
+ return server.getSessionManager().getActiveScans();
}
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
index 10786f7061..3131196fb9 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
@@ -40,7 +40,7 @@ import org.apache.accumulo.core.dataImpl.thrift.TKeyValue;
import org.apache.accumulo.core.dataImpl.thrift.TRange;
import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException;
import org.apache.accumulo.server.conf.TableConfiguration;
-import org.apache.accumulo.tserver.TabletServer;
+import org.apache.accumulo.tserver.TabletHostingServer;
import org.apache.accumulo.tserver.session.MultiScanSession;
import org.apache.accumulo.tserver.tablet.KVEntry;
import org.apache.accumulo.tserver.tablet.Tablet;
@@ -54,7 +54,7 @@ public class LookupTask extends ScanTask<MultiScanResult> {
private final long scanID;
- public LookupTask(TabletServer server, long scanID) {
+ public LookupTask(TabletHostingServer server, long scanID) {
super(server);
this.scanID = scanID;
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
index 14ecf68c02..6a1030dfaf 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.client.SampleNotPresentException;
import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException;
import org.apache.accumulo.server.fs.TooManyFilesException;
-import org.apache.accumulo.tserver.TabletServer;
+import org.apache.accumulo.tserver.TabletHostingServer;
import org.apache.accumulo.tserver.session.SingleScanSession;
import org.apache.accumulo.tserver.tablet.ScanBatch;
import org.apache.accumulo.tserver.tablet.Tablet;
@@ -38,7 +38,7 @@ public class NextBatchTask extends ScanTask<ScanBatch> {
private final long scanID;
- public NextBatchTask(TabletServer server, long scanID, AtomicBoolean interruptFlag) {
+ public NextBatchTask(TabletHostingServer server, long scanID, AtomicBoolean interruptFlag) {
super(server);
this.scanID = scanID;
this.interruptFlag = interruptFlag;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java
index d814410ec2..a8469e04cc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java
@@ -28,11 +28,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.accumulo.tserver.TabletServer;
+import org.apache.accumulo.tserver.TabletHostingServer;
public abstract class ScanTask<T> implements RunnableFuture<T> {
- protected final TabletServer server;
+ protected final TabletHostingServer server;
protected AtomicBoolean interruptFlag;
protected ArrayBlockingQueue<Object> resultQueue;
protected AtomicInteger state;
@@ -42,7 +42,7 @@ public abstract class ScanTask<T> implements RunnableFuture<T> {
private static final int ADDED = 2;
private static final int CANCELED = 3;
- ScanTask(TabletServer server) {
+ ScanTask(TabletHostingServer server) {
this.server = server;
interruptFlag = new AtomicBoolean(false);
runState = new AtomicReference<>(ScanRunState.QUEUED);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
index 54129ccee5..e2411c3458 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
@@ -57,7 +57,7 @@ class ScanDataSource implements DataSource {
private static final Logger log = LoggerFactory.getLogger(ScanDataSource.class);
// data source state
- private final Tablet tablet;
+ private final TabletBase tablet;
private ScanFileManager fileManager;
private SortedKeyValueIterator<Key,Value> iter;
private long expectedDeletionCount;
@@ -70,7 +70,7 @@ class ScanDataSource implements DataSource {
private final boolean loadIters;
private final byte[] defaultLabels;
- ScanDataSource(Tablet tablet, ScanParameters scanParams, boolean loadIters,
+ ScanDataSource(TabletBase tablet, ScanParameters scanParams, boolean loadIters,
AtomicBoolean interruptFlag) {
this.tablet = tablet;
this.expectedDeletionCount = tablet.getDataSourceDeletions();
@@ -91,14 +91,14 @@ class ScanDataSource implements DataSource {
else {
// log.debug("Switching data sources during a scan");
if (memIters != null) {
- tablet.getTabletMemory().returnIterators(memIters);
+ tablet.returnMemIterators(memIters);
memIters = null;
- tablet.getDatafileManager().returnFilesForScan(fileReservationId);
+ tablet.returnFilesForScan(fileReservationId);
fileReservationId = -1;
}
if (fileManager != null) {
- tablet.getTabletServer().getScanMetrics().decrementOpenFiles(fileManager.getNumOpenFiles());
+ tablet.getScanMetrics().decrementOpenFiles(fileManager.getNumOpenFiles());
fileManager.releaseOpenFiles(false);
}
@@ -142,7 +142,7 @@ class ScanDataSource implements DataSource {
// only acquire the file manager when we know the tablet is open
if (fileManager == null) {
fileManager = tablet.getTabletResources().newScanFileManager(scanParams.getScanDispatch());
- tablet.getTabletServer().getScanMetrics().incrementOpenFiles(fileManager.getNumOpenFiles());
+ tablet.getScanMetrics().incrementOpenFiles(fileManager.getNumOpenFiles());
tablet.addActiveScans(this);
}
@@ -153,9 +153,8 @@ class ScanDataSource implements DataSource {
// getIterators() throws an exception
expectedDeletionCount = tablet.getDataSourceDeletions();
- memIters = tablet.getTabletMemory().getIterators(samplerConfig);
- Pair<Long,Map<TabletFile,DataFileValue>> reservation =
- tablet.getDatafileManager().reserveFilesForScan();
+ memIters = tablet.getMemIterators(samplerConfig);
+ Pair<Long,Map<TabletFile,DataFileValue>> reservation = tablet.reserveFilesForScan();
fileReservationId = reservation.getFirst();
files = reservation.getSecond();
}
@@ -173,10 +172,9 @@ class ScanDataSource implements DataSource {
MultiIterator multiIter = new MultiIterator(iters, tablet.getExtent());
- TabletIteratorEnvironment iterEnv =
- new TabletIteratorEnvironment(tablet.getTabletServer().getContext(), IteratorScope.scan,
- tablet.getTableConfiguration(), tablet.getExtent().tableId(), fileManager, files,
- scanParams.getAuthorizations(), samplerConfig, new ArrayList<>());
+ TabletIteratorEnvironment iterEnv = new TabletIteratorEnvironment(tablet.getContext(),
+ IteratorScope.scan, tablet.getTableConfiguration(), tablet.getExtent().tableId(),
+ fileManager, files, scanParams.getAuthorizations(), samplerConfig, new ArrayList<>());
statsIterator =
new StatsIterator(multiIter, TabletServer.seekCount, tablet.getScannedCounter());
@@ -229,12 +227,13 @@ class ScanDataSource implements DataSource {
}
}
- void close(boolean sawErrors) {
+ @Override
+ public void close(boolean sawErrors) {
if (memIters != null) {
- tablet.getTabletMemory().returnIterators(memIters);
+ tablet.returnMemIterators(memIters);
memIters = null;
- tablet.getDatafileManager().returnFilesForScan(fileReservationId);
+ tablet.returnFilesForScan(fileReservationId);
fileReservationId = -1;
}
@@ -244,7 +243,7 @@ class ScanDataSource implements DataSource {
}
if (fileManager != null) {
- tablet.getTabletServer().getScanMetrics().decrementOpenFiles(fileManager.getNumOpenFiles());
+ tablet.getScanMetrics().decrementOpenFiles(fileManager.getNumOpenFiles());
fileManager.releaseOpenFiles(sawErrors);
fileManager = null;
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
index b09407774b..d25068bf23 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
public class Scanner {
private static final Logger log = LoggerFactory.getLogger(Scanner.class);
- private final Tablet tablet;
+ private final TabletBase tablet;
private final ScanParameters scanParams;
private Range range;
private SortedKeyValueIterator<Key,Value> isolatedIter;
@@ -55,7 +55,7 @@ public class Scanner {
private AtomicBoolean interruptFlag;
- Scanner(Tablet tablet, Range range, ScanParameters scanParams, AtomicBoolean interruptFlag) {
+ Scanner(TabletBase tablet, Range range, ScanParameters scanParams, AtomicBoolean interruptFlag) {
this.tablet = tablet;
this.range = range;
this.scanParams = scanParams;
@@ -87,10 +87,10 @@ public class Scanner {
if (scanParams.isIsolated()) {
if (isolatedDataSource == null)
- isolatedDataSource = new ScanDataSource(tablet, scanParams, true, interruptFlag);
+ isolatedDataSource = tablet.createDataSource(scanParams, true, interruptFlag);
dataSource = isolatedDataSource;
} else {
- dataSource = new ScanDataSource(tablet, scanParams, true, interruptFlag);
+ dataSource = tablet.createDataSource(scanParams, true, interruptFlag);
}
SortedKeyValueIterator<Key,Value> iter;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index eefc71cfa5..027af6a408 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -19,7 +19,6 @@
package org.apache.accumulo.tserver.tablet;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
@@ -56,7 +55,6 @@ import org.apache.accumulo.core.clientImpl.UserCompactionUtils;
import org.apache.accumulo.core.conf.AccumuloConfiguration.Deriver;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.constraints.Violations;
-import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
@@ -66,8 +64,6 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.MapFileInfo;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.iterators.YieldCallback;
-import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException;
import org.apache.accumulo.core.iteratorsImpl.system.SourceSwitchingIterator;
import org.apache.accumulo.core.logging.TabletLogger;
import org.apache.accumulo.core.manager.state.tables.TableState;
@@ -84,21 +80,17 @@ import org.apache.accumulo.core.metadata.schema.MetadataTime;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment;
import org.apache.accumulo.core.spi.scan.ScanDispatch;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.accumulo.core.trace.TraceUtil;
-import org.apache.accumulo.core.util.LocalityGroupUtil;
import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.core.util.ShutdownUtil;
import org.apache.accumulo.core.volume.Volume;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.compaction.CompactionStats;
-import org.apache.accumulo.server.conf.TableConfiguration;
-import org.apache.accumulo.server.fs.TooManyFilesException;
import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl;
import org.apache.accumulo.server.fs.VolumeUtil;
import org.apache.accumulo.server.fs.VolumeUtil.TabletFiles;
@@ -124,6 +116,7 @@ import org.apache.accumulo.tserver.compactions.Compactable;
import org.apache.accumulo.tserver.constraints.ConstraintChecker;
import org.apache.accumulo.tserver.log.DfsLogger;
import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
+import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
import org.apache.accumulo.tserver.scan.ScanParameters;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
@@ -144,17 +137,12 @@ import io.opentelemetry.context.Scope;
/**
* Provide access to a single row range in a living TabletServer.
*/
-public class Tablet {
+public class Tablet extends TabletBase {
private static final Logger log = LoggerFactory.getLogger(Tablet.class);
- private static final byte[] EMPTY_BYTES = new byte[0];
-
private final TabletServer tabletServer;
- private final ServerContext context;
- private final KeyExtent extent;
private final TabletResourceManager tabletResources;
private final DatafileManager datafileManager;
- private final TableConfiguration tableConfiguration;
private final String dirName;
private final TabletMemory tabletMemory;
@@ -168,12 +156,11 @@ public class Tablet {
private final AtomicLong dataSourceDeletions = new AtomicLong(0);
+ @Override
public long getDataSourceDeletions() {
return dataSourceDeletions.get();
}
- private final Set<ScanDataSource> activeScans = new HashSet<>();
-
private enum CloseState {
OPEN, CLOSING, CLOSED, COMPLETE
}
@@ -182,8 +169,8 @@ public class Tablet {
private boolean updatingFlushID = false;
- private long lastFlushID = -1;
- private long lastCompactID = -1;
+ private AtomicLong lastFlushID = new AtomicLong(-1);
+ private AtomicLong lastCompactID = new AtomicLong(-1);
private static class CompactionWaitInfo {
long flushID = -1;
@@ -212,12 +199,8 @@ public class Tablet {
* Counts are maintained in this object and reported out with the Micrometer metrics via
* TabletServerMetricsUtil
*/
- private long lookupCount = 0;
- private long queryResultCount = 0;
- private long queryResultBytes = 0;
private long ingestCount = 0;
private long ingestBytes = 0;
- private final AtomicLong scannedCount = new AtomicLong(0);
/**
* Rates are calculated here in the Tablet for use in the Monitor but we do not emit them as
@@ -229,8 +212,6 @@ public class Tablet {
private final Rate ingestByteRate = new Rate(0.95);
private final Rate scannedRate = new Rate(0.95);
- private final Deriver<byte[]> defaultSecurityLabel;
-
private long lastMinorCompactionFinishTime = 0;
private long lastMapFileImportTime = 0;
@@ -303,27 +284,18 @@ public class Tablet {
final TabletResourceManager trm, TabletData data)
throws IOException, IllegalArgumentException {
+ super(tabletServer.getContext(), extent);
+
this.tabletServer = tabletServer;
- this.context = tabletServer.getContext();
- this.extent = extent;
this.tabletResources = trm;
this.lastLocation = data.getLastLocation();
- this.lastFlushID = data.getFlushID();
- this.lastCompactID = data.getCompactID();
+ this.lastFlushID.set(data.getFlushID());
+ this.lastCompactID.set(data.getCompactID());
this.splitCreationTime = data.getSplitTime();
this.tabletTime = TabletTime.getInstance(data.getTime());
this.persistedTime = tabletTime.getTime();
this.logId = tabletServer.createLogId();
- TableConfiguration tblConf = tabletServer.getTableConfiguration(extent);
- if (tblConf == null) {
- tabletServer.getContext().clearTableListCache();
- tblConf = tabletServer.getTableConfiguration(extent);
- requireNonNull(tblConf, "Could not get table configuration for " + extent.tableId());
- }
-
- this.tableConfiguration = tblConf;
-
// translate any volume changes
@SuppressWarnings("deprecation")
boolean replicationEnabled = org.apache.accumulo.core.replication.ReplicationConfigurationUtil
@@ -344,14 +316,6 @@ public class Tablet {
constraintChecker = tableConfiguration.newDeriver(ConstraintChecker::new);
- if (extent.isMeta()) {
- defaultSecurityLabel = () -> EMPTY_BYTES;
- } else {
- defaultSecurityLabel = tableConfiguration.newDeriver(
- conf -> new ColumnVisibility(conf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY))
- .getExpression());
- }
-
tabletMemory = new TabletMemory(this);
// don't bother examining WALs for recovery if Table is being deleted
@@ -446,10 +410,6 @@ public class Tablet {
data.getExternalCompactions());
}
- public ServerContext getContext() {
- return context;
- }
-
private void removeOldTemporaryFiles(
Map<ExternalCompactionId,ExternalCompactionMetadata> externalCompactions) {
// remove any temporary files created by a previous tablet server
@@ -481,152 +441,6 @@ public class Tablet {
}
}
- private LookupResult lookup(SortedKeyValueIterator<Key,Value> mmfi, List<Range> ranges,
- List<KVEntry> results, ScanParameters scanParams, long maxResultsSize) throws IOException {
-
- LookupResult lookupResult = new LookupResult();
-
- boolean exceededMemoryUsage = false;
- boolean tabletClosed = false;
-
- Set<ByteSequence> cfset = null;
- if (!scanParams.getColumnSet().isEmpty()) {
- cfset = LocalityGroupUtil.families(scanParams.getColumnSet());
- }
-
- long batchTimeOut = scanParams.getBatchTimeOut();
-
- long timeToRun = TimeUnit.MILLISECONDS.toNanos(batchTimeOut);
- long startNanos = System.nanoTime();
-
- if (batchTimeOut <= 0 || batchTimeOut == Long.MAX_VALUE) {
- batchTimeOut = 0;
- }
-
- // determine if the iterator supported yielding
- YieldCallback<Key> yield = new YieldCallback<>();
- mmfi.enableYielding(yield);
- boolean yielded = false;
-
- for (Range range : ranges) {
-
- boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) > timeToRun;
-
- if (exceededMemoryUsage || tabletClosed || timesUp || yielded) {
- lookupResult.unfinishedRanges.add(range);
- continue;
- }
-
- int entriesAdded = 0;
-
- try {
- if (cfset != null) {
- mmfi.seek(range, cfset, true);
- } else {
- mmfi.seek(range, Set.of(), false);
- }
-
- while (mmfi.hasTop()) {
- if (yield.hasYielded()) {
- throw new IOException("Coding error: hasTop returned true but has yielded at "
- + yield.getPositionAndReset());
- }
- Key key = mmfi.getTopKey();
-
- KVEntry kve = new KVEntry(key, mmfi.getTopValue());
- results.add(kve);
- entriesAdded++;
- lookupResult.bytesAdded += kve.estimateMemoryUsed();
- lookupResult.dataSize += kve.numBytes();
-
- exceededMemoryUsage = lookupResult.bytesAdded > maxResultsSize;
-
- timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) > timeToRun;
-
- if (exceededMemoryUsage || timesUp) {
- addUnfinishedRange(lookupResult, range, key);
- break;
- }
-
- mmfi.next();
- }
-
- if (yield.hasYielded()) {
- yielded = true;
- Key yieldPosition = yield.getPositionAndReset();
- if (!range.contains(yieldPosition)) {
- throw new IOException("Underlying iterator yielded to a position outside of its range: "
- + yieldPosition + " not in " + range);
- }
- if (!results.isEmpty()
- && yieldPosition.compareTo(results.get(results.size() - 1).getKey()) <= 0) {
- throw new IOException("Underlying iterator yielded to a position"
- + " that does not follow the last key returned: " + yieldPosition + " <= "
- + results.get(results.size() - 1).getKey());
- }
- addUnfinishedRange(lookupResult, range, yieldPosition);
-
- log.debug("Scan yield detected at position " + yieldPosition);
- getTabletServer().getScanMetrics().addYield(1);
- }
- } catch (TooManyFilesException tmfe) {
- // treat this as a closed tablet, and let the client retry
- log.warn("Tablet {} has too many files, batch lookup can not run", getExtent());
- handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range,
- entriesAdded);
- tabletClosed = true;
- } catch (IOException ioe) {
- if (ShutdownUtil.isShutdownInProgress()) {
- // assume HDFS shutdown hook caused this exception
- log.debug("IOException while shutdown in progress", ioe);
- handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range,
- entriesAdded);
- tabletClosed = true;
- } else {
- throw ioe;
- }
- } catch (IterationInterruptedException iie) {
- if (isClosed()) {
- handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range,
- entriesAdded);
- tabletClosed = true;
- } else {
- throw iie;
- }
- } catch (TabletClosedException tce) {
- handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range,
- entriesAdded);
- tabletClosed = true;
- }
-
- }
-
- return lookupResult;
- }
-
- private void handleTabletClosedDuringScan(List<KVEntry> results, LookupResult lookupResult,
- boolean exceededMemoryUsage, Range range, int entriesAdded) {
- if (exceededMemoryUsage) {
- throw new IllegalStateException(
- "Tablet " + extent + "should not exceed memory usage or close, not both");
- }
-
- if (entriesAdded > 0) {
- addUnfinishedRange(lookupResult, range, results.get(results.size() - 1).getKey());
- } else {
- lookupResult.unfinishedRanges.add(range);
- }
-
- lookupResult.closed = true;
- }
-
- private void addUnfinishedRange(LookupResult lookupResult, Range range, Key key) {
- if (range.getEndKey() == null || key.compareTo(range.getEndKey()) < 0) {
- Range nlur = new Range(new Key(key), false, range.getEndKey(), range.isEndKeyInclusive());
- lookupResult.unfinishedRanges.add(nlur);
- }
- }
-
public void checkConditions(ConditionChecker checker, Authorizations authorizations,
AtomicBoolean iFlag) throws IOException {
@@ -634,7 +448,7 @@ public class Tablet {
null, false, null, -1, null);
scanParams.setScanDispatch(ScanDispatch.builder().build());
- ScanDataSource dataSource = new ScanDataSource(this, scanParams, false, iFlag);
+ ScanDataSource dataSource = createDataSource(scanParams, false, iFlag);
try {
SortedKeyValueIterator<Key,Value> iter = new SourceSwitchingIterator(dataSource);
@@ -649,149 +463,6 @@ public class Tablet {
}
}
- public LookupResult lookup(List<Range> ranges, List<KVEntry> results, ScanParameters scanParams,
- long maxResultSize, AtomicBoolean interruptFlag) throws IOException {
-
- if (ranges.isEmpty()) {
- return new LookupResult();
- }
-
- ranges = Range.mergeOverlapping(ranges);
- if (ranges.size() > 1) {
- Collections.sort(ranges);
- }
-
- Range tabletRange = extent.toDataRange();
- for (Range range : ranges) {
- // do a test to see if this range falls within the tablet, if it does not
- // then clip will throw an exception
- tabletRange.clip(range);
- }
-
- ScanDataSource dataSource = new ScanDataSource(this, scanParams, true, interruptFlag);
-
- LookupResult result = null;
-
- try {
- SortedKeyValueIterator<Key,Value> iter = new SourceSwitchingIterator(dataSource);
- lookupCount++;
- result = lookup(iter, ranges, results, scanParams, maxResultSize);
- return result;
- } catch (IOException ioe) {
- dataSource.close(true);
- throw ioe;
- } finally {
- // code in finally block because always want
- // to return mapfiles, even when exception is thrown
- dataSource.close(false);
-
- synchronized (this) {
- queryResultCount += results.size();
- if (result != null) {
- queryResultBytes += result.dataSize;
- }
- }
- }
- }
-
- Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, ScanParameters scanParams)
- throws IOException {
-
- // log.info("In nextBatch..");
-
- long batchTimeOut = scanParams.getBatchTimeOut();
-
- long timeToRun = TimeUnit.MILLISECONDS.toNanos(batchTimeOut);
- long startNanos = System.nanoTime();
-
- if (batchTimeOut == Long.MAX_VALUE || batchTimeOut <= 0) {
- batchTimeOut = 0;
- }
- List<KVEntry> results = new ArrayList<>();
- Key key = null;
-
- Value value;
- long resultSize = 0L;
- long resultBytes = 0L;
-
- long maxResultsSize = tableConfiguration.getAsBytes(Property.TABLE_SCAN_MAXMEM);
-
- Key continueKey = null;
- boolean skipContinueKey = false;
-
- YieldCallback<Key> yield = new YieldCallback<>();
-
- // we cannot yield if we are in isolation mode
- if (!scanParams.isIsolated()) {
- iter.enableYielding(yield);
- }
-
- if (scanParams.getColumnSet().isEmpty()) {
- iter.seek(range, Set.of(), false);
- } else {
- iter.seek(range, LocalityGroupUtil.families(scanParams.getColumnSet()), true);
- }
-
- while (iter.hasTop()) {
- if (yield.hasYielded()) {
- throw new IOException(
- "Coding error: hasTop returned true but has yielded at " + yield.getPositionAndReset());
- }
- value = iter.getTopValue();
- key = iter.getTopKey();
-
- KVEntry kvEntry = new KVEntry(key, value); // copies key and value
- results.add(kvEntry);
- resultSize += kvEntry.estimateMemoryUsed();
- resultBytes += kvEntry.numBytes();
-
- boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) >= timeToRun;
-
- if (resultSize >= maxResultsSize || results.size() >= scanParams.getMaxEntries() || timesUp) {
- continueKey = new Key(key);
- skipContinueKey = true;
- break;
- }
-
- iter.next();
- }
-
- if (yield.hasYielded()) {
- continueKey = new Key(yield.getPositionAndReset());
- skipContinueKey = true;
- if (!range.contains(continueKey)) {
- throw new IOException("Underlying iterator yielded to a position outside of its range: "
- + continueKey + " not in " + range);
- }
- if (!results.isEmpty()
- && continueKey.compareTo(results.get(results.size() - 1).getKey()) <= 0) {
- throw new IOException(
- "Underlying iterator yielded to a position that does not follow the last key returned: "
- + continueKey + " <= " + results.get(results.size() - 1).getKey());
- }
-
- log.debug("Scan yield detected at position " + continueKey);
- getTabletServer().getScanMetrics().addYield(1);
- } else if (!iter.hasTop()) {
- // end of tablet has been reached
- continueKey = null;
- if (results.isEmpty()) {
- results = null;
- }
- }
-
- return new Batch(skipContinueKey, results, continueKey, resultBytes);
- }
-
- public Scanner createScanner(Range range, ScanParameters scanParams,
- AtomicBoolean interruptFlag) {
- // do a test to see if this range falls within the tablet, if it does not
- // then clip will throw an exception
- extent.toDataRange().clip(range);
-
- return new Scanner(this, range, scanParams, interruptFlag);
- }
-
DataFileValue minorCompact(InMemoryMap memTable, TabletFile tmpDatafile, TabletFile newDatafile,
long queued, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) {
boolean failed = false;
@@ -879,7 +550,7 @@ public class Tablet {
return;
}
- if (lastFlushID >= tableFlushID) {
+ if (lastFlushID.get() >= tableFlushID) {
return;
}
@@ -889,7 +560,7 @@ public class Tablet {
}
if (getTabletMemory().getMemTable().getNumEntries() == 0) {
- lastFlushID = tableFlushID;
+ lastFlushID.set(tableFlushID);
updatingFlushID = true;
updateMetadata = true;
} else {
@@ -1216,6 +887,7 @@ public class Tablet {
* Closes the mapfiles associated with a Tablet. If saveState is true, a minor compaction is
* performed.
*/
+ @Override
public void close(boolean saveState) throws IOException {
initiateClose(saveState);
completeClose(saveState, true);
@@ -1402,7 +1074,7 @@ public class Tablet {
}
tabletMeta.getFlushId().ifPresent(flushId -> {
- if (flushId != lastFlushID) {
+ if (flushId != lastFlushID.get()) {
String msg = "Closed tablet " + extent + " lastFlushID is inconsistent with metadata : "
+ flushId + " != " + lastFlushID;
log.error(msg);
@@ -1411,7 +1083,7 @@ public class Tablet {
});
tabletMeta.getCompactId().ifPresent(compactId -> {
- if (compactId != lastCompactID) {
+ if (compactId != lastCompactID.get()) {
String msg = "Closed tablet " + extent + " lastCompactID is inconsistent with metadata : "
+ compactId + " != " + lastCompactID;
log.error(msg);
@@ -1672,10 +1344,6 @@ public class Tablet {
return findSplitRow(getDatafileManager().getFiles()) != null;
}
- public KeyExtent getExtent() {
- return extent;
- }
-
synchronized void computeNumEntries() {
Collection<DataFileValue> vals = getDatafileManager().getDatafileSizes().values();
@@ -1704,6 +1372,7 @@ public class Tablet {
return closeState == CloseState.CLOSING;
}
+ @Override
public boolean isClosed() {
// Assign to a local var to avoid race conditions since closeState is volatile and two
// comparisons are done.
@@ -1822,17 +1491,17 @@ public class Tablet {
MetadataTableUtil.splitTablet(high, extent.prevEndRow(), splitRatio,
getTabletServer().getContext(), getTabletServer().getLock(), ecids);
ManagerMetadataUtil.addNewTablet(getTabletServer().getContext(), low, lowDirectoryName,
- getTabletServer().getTabletSession(), lowDatafileSizes, bulkImported, time, lastFlushID,
- lastCompactID, getTabletServer().getLock());
+ getTabletServer().getTabletSession(), lowDatafileSizes, bulkImported, time,
+ lastFlushID.get(), lastCompactID.get(), getTabletServer().getLock());
MetadataTableUtil.finishSplit(high, highDatafileSizes, highDatafilesToRemove,
getTabletServer().getContext(), getTabletServer().getLock());
TabletLogger.split(extent, low, high, getTabletServer().getTabletSession());
- newTablets.put(high, new TabletData(dirName, highDatafileSizes, time, lastFlushID,
- lastCompactID, lastLocation, bulkImported));
- newTablets.put(low, new TabletData(lowDirectoryName, lowDatafileSizes, time, lastFlushID,
- lastCompactID, lastLocation, bulkImported));
+ newTablets.put(high, new TabletData(dirName, highDatafileSizes, time, lastFlushID.get(),
+ lastCompactID.get(), lastLocation, bulkImported));
+ newTablets.put(low, new TabletData(lowDirectoryName, lowDatafileSizes, time,
+ lastFlushID.get(), lastCompactID.get(), lastLocation, bulkImported));
long t2 = System.currentTimeMillis();
@@ -1843,10 +1512,16 @@ public class Tablet {
}
}
+ @Override
public SortedMap<StoredTabletFile,DataFileValue> getDatafiles() {
return getDatafileManager().getDatafileSizes();
}
+ @Override
+ public void addToYieldMetric(int i) {
+ getTabletServer().getScanMetrics().addYield(i);
+ }
+
public double queryRate() {
return queryRate.rate();
}
@@ -1868,7 +1543,7 @@ public class Tablet {
}
public long totalQueriesResults() {
- return this.queryResultCount;
+ return this.queryResultCount.get();
}
public long totalIngest() {
@@ -1880,7 +1555,7 @@ public class Tablet {
}
public long totalQueryResultsBytes() {
- return this.queryResultBytes;
+ return this.queryResultBytes.get();
}
public long totalScannedCount() {
@@ -1888,13 +1563,13 @@ public class Tablet {
}
public long totalLookupCount() {
- return this.lookupCount;
+ return this.lookupCount.get();
}
// synchronized?
public void updateRates(long now) {
- queryRate.update(now, queryResultCount);
- queryByteRate.update(now, queryResultBytes);
+ queryRate.update(now, queryResultCount.get());
+ queryByteRate.update(now, queryResultBytes.get());
ingestRate.update(now, ingestCount);
ingestByteRate.update(now, ingestBytes);
scannedRate.update(now, scannedCount.get());
@@ -2206,19 +1881,19 @@ public class Tablet {
public void compactAll(long compactionId, CompactionConfig compactionConfig) {
synchronized (this) {
- if (lastCompactID >= compactionId) {
+ if (lastCompactID.get() >= compactionId) {
return;
}
if (isMinorCompactionRunning()) {
// want to wait for running minc to finish before starting majc, see ACCUMULO-3041
if (compactionWaitInfo.compactionID == compactionId) {
- if (lastFlushID == compactionWaitInfo.flushID) {
+ if (lastFlushID.get() == compactionWaitInfo.flushID) {
return;
}
} else {
compactionWaitInfo.compactionID = compactionId;
- compactionWaitInfo.flushID = lastFlushID;
+ compactionWaitInfo.flushID = lastFlushID.get();
return;
}
}
@@ -2232,10 +1907,6 @@ public class Tablet {
compactable.initiateUserCompaction(compactionId, compactionConfig);
}
- public TableConfiguration getTableConfiguration() {
- return tableConfiguration;
- }
-
public Durability getDurability() {
return DurabilityImpl.fromString(getTableConfiguration().get(Property.TABLE_DURABILITY));
}
@@ -2248,11 +1919,6 @@ public class Tablet {
return dataSourceDeletions.incrementAndGet();
}
- public synchronized void updateQueryStats(int size, long numBytes) {
- queryResultCount += size;
- queryResultBytes += numBytes;
- }
-
public void updateTimer(Operation operation, long queued, long start, long count,
boolean failed) {
timer.updateTime(operation, queued, start, count, failed);
@@ -2298,14 +1964,30 @@ public class Tablet {
}
+ @Override
TabletResourceManager getTabletResources() {
return tabletResources;
}
+ @Override
+ public TabletServerScanMetrics getScanMetrics() {
+ return getTabletServer().getScanMetrics();
+ }
+
DatafileManager getDatafileManager() {
return datafileManager;
}
+ @Override
+ public Pair<Long,Map<TabletFile,DataFileValue>> reserveFilesForScan() {
+ return getDatafileManager().reserveFilesForScan();
+ }
+
+ @Override
+ public void returnFilesForScan(long scanId) {
+ getDatafileManager().returnFilesForScan(scanId);
+ }
+
public MetadataUpdateCount getUpdateCount() {
return getDatafileManager().getUpdateCount();
}
@@ -2314,19 +1996,25 @@ public class Tablet {
return tabletMemory;
}
- public long getAndUpdateTime() {
- return tabletTime.getAndUpdateTime();
+ @Override
+ public List<InMemoryMap.MemoryIterator> getMemIterators(SamplerConfigurationImpl samplerConfig) {
+ return getTabletMemory().getIterators(samplerConfig);
}
- public byte[] getDefaultSecurityLabels() {
- return defaultSecurityLabel.derive();
+ @Override
+ public void returnMemIterators(List<InMemoryMap.MemoryIterator> iters) {
+ getTabletMemory().returnIterators(iters);
+ }
+
+ public long getAndUpdateTime() {
+ return tabletTime.getAndUpdateTime();
}
public void flushComplete(long flushId) {
lastLocation = null;
dataSourceDeletions.incrementAndGet();
tabletMemory.finishedMinC();
- lastFlushID = flushId;
+ lastFlushID.set(flushId);
computeNumEntries();
}
@@ -2336,18 +2024,9 @@ public class Tablet {
return result;
}
- public synchronized void addActiveScans(ScanDataSource scanDataSource) {
- activeScans.add(scanDataSource);
- }
-
- public int removeScan(ScanDataSource scanDataSource) {
- activeScans.remove(scanDataSource);
- return activeScans.size();
- }
-
public synchronized void setLastCompactionID(Long compactionId) {
if (compactionId != null) {
- this.lastCompactID = compactionId;
+ this.lastCompactID.set(compactionId);
}
}
@@ -2367,10 +2046,6 @@ public class Tablet {
return timer.getTabletStats();
}
- public AtomicLong getScannedCounter() {
- return scannedCount;
- }
-
private static String createTabletDirectoryName(ServerContext context, Text endRow) {
if (endRow == null) {
return ServerColumnFamily.DEFAULT_TABLET_DIR_NAME;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
new file mode 100644
index 0000000000..eb6cf8fc83
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.YieldCallback;
+import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException;
+import org.apache.accumulo.core.iteratorsImpl.system.SourceSwitchingIterator;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.TabletFile;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.ShutdownUtil;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.TooManyFilesException;
+import org.apache.accumulo.tserver.InMemoryMap;
+import org.apache.accumulo.tserver.TabletServerResourceManager;
+import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
+import org.apache.accumulo.tserver.scan.ScanParameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Shares the code for scanning a tablet between TabletHostingServer implementations.
+ */
+public abstract class TabletBase {
+
+ private static final Logger log = LoggerFactory.getLogger(TabletBase.class);
+
+ private static final byte[] EMPTY_BYTES = new byte[0];
+
+ protected final KeyExtent extent;
+ protected final ServerContext context;
+
+ protected AtomicLong lookupCount = new AtomicLong(0);
+ protected AtomicLong queryResultCount = new AtomicLong(0);
+ protected AtomicLong queryResultBytes = new AtomicLong(0);
+
+ protected final Set<ScanDataSource> activeScans = new HashSet<>();
+
+ private final AccumuloConfiguration.Deriver<byte[]> defaultSecurityLabel;
+
+ protected final TableConfiguration tableConfiguration;
+
+ protected final AtomicLong scannedCount = new AtomicLong(0);
+
+ public TabletBase(ServerContext context, KeyExtent extent) { // resolves the table configuration and the default security label deriver for extent
+ this.context = context;
+ this.extent = extent;
+
+ TableConfiguration tblConf = context.getTableConfiguration(extent.tableId());
+ if (tblConf == null) { // first lookup returned nothing: clear the table list cache and retry once
+ context.clearTableListCache();
+ tblConf = context.getTableConfiguration(extent.tableId());
+ requireNonNull(tblConf, "Could not get table configuration for " + extent.tableId()); // NullPointerException if still missing
+ }
+
+ this.tableConfiguration = tblConf;
+
+ if (extent.isMeta()) {
+ defaultSecurityLabel = () -> EMPTY_BYTES; // extents reporting isMeta() always derive an empty label
+ } else {
+ defaultSecurityLabel = tableConfiguration.newDeriver( // label comes from TABLE_DEFAULT_SCANTIME_VISIBILITY
+ conf -> new ColumnVisibility(conf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY))
+ .getExpression());
+ }
+ }
+
+ public abstract boolean isClosed(); // whether this tablet is closed; Tablet answers from its closeState
+
+ public abstract SortedMap<StoredTabletFile,DataFileValue> getDatafiles(); // Tablet returns its DatafileManager's datafile sizes
+
+ public abstract void addToYieldMetric(int i); // called by nextBatch with 1 when a yield is detected; Tablet forwards to the scan metrics
+
+ public abstract long getDataSourceDeletions(); // Tablet returns the current value of its dataSourceDeletions counter
+
+ abstract TabletServerResourceManager.TabletResourceManager getTabletResources(); // package-private; Tablet returns its tabletResources field
+
+ public abstract List<InMemoryMap.MemoryIterator>
+ getMemIterators(SamplerConfigurationImpl samplerConfig); // Tablet delegates to TabletMemory.getIterators(samplerConfig)
+
+ public abstract void returnMemIterators(List<InMemoryMap.MemoryIterator> iters); // Tablet delegates to TabletMemory.returnIterators(iters)
+
+ public abstract Pair<Long,Map<TabletFile,DataFileValue>> reserveFilesForScan(); // Tablet delegates to DatafileManager.reserveFilesForScan()
+
+ public abstract void returnFilesForScan(long scanId); // Tablet delegates to DatafileManager.returnFilesForScan(scanId)
+
+ public abstract TabletServerScanMetrics getScanMetrics(); // Tablet returns the hosting tablet server's scan metrics
+
+ protected ScanDataSource createDataSource(ScanParameters scanParams, boolean loadIters, // lookup passes loadIters=true, Tablet.checkConditions passes false
+ AtomicBoolean interruptFlag) {
+ return new ScanDataSource(this, scanParams, loadIters, interruptFlag); // the new data source is bound to this tablet
+ }
+
+ public Scanner createScanner(Range range, ScanParameters scanParams, // builds a Scanner over range for this tablet
+ AtomicBoolean interruptFlag) {
+ // do a test to see if this range falls within the tablet, if it does not
+ // then clip will throw an exception
+ extent.toDataRange().clip(range); // result is discarded; the call is made only for its range check
+
+ return new Scanner(this, range, scanParams, interruptFlag);
+ }
+
+ public AtomicLong getScannedCounter() { // returns the live AtomicLong itself, not a snapshot of its value
+ return scannedCount;
+ }
+
+ public ServerContext getContext() { // the server context passed to the constructor
+ return context;
+ }
+
+ public TableConfiguration getTableConfiguration() { // the configuration resolved once in the constructor
+ return tableConfiguration;
+ }
+
+ public KeyExtent getExtent() { // the extent passed to the constructor
+ return extent;
+ }
+
+ public byte[] getDefaultSecurityLabels() { // empty for isMeta() extents, otherwise derived from TABLE_DEFAULT_SCANTIME_VISIBILITY
+ return defaultSecurityLabel.derive();
+ }
+
+ public synchronized void addActiveScans(ScanDataSource scanDataSource) { // registers a scan in activeScans while holding this tablet's monitor
+ activeScans.add(scanDataSource);
+ }
+
+ public int removeScan(ScanDataSource scanDataSource) { // NOTE(review): not synchronized, unlike addActiveScans — presumably callers hold this tablet's monitor; confirm
+ activeScans.remove(scanDataSource);
+ return activeScans.size(); // number of scans still registered after the removal
+ }
+
+ public abstract void close(boolean b) throws IOException; // in Tablet the flag is saveState: when true a minor compaction is performed (per its Javadoc)
+
+ public Tablet.LookupResult lookup(List<Range> ranges, List<KVEntry> results, // looks up ranges in this tablet, appending entries to results
+ ScanParameters scanParams, long maxResultSize, AtomicBoolean interruptFlag)
+ throws IOException {
+
+ if (ranges.isEmpty()) {
+ return new Tablet.LookupResult(); // nothing to do: no data source is created and no counters change
+ }
+
+ ranges = Range.mergeOverlapping(ranges);
+ if (ranges.size() > 1) {
+ Collections.sort(ranges);
+ }
+
+ Range tabletRange = getExtent().toDataRange();
+ for (Range range : ranges) {
+ // do a test to see if this range falls within the tablet, if it does not
+ // then clip will throw an exception
+ tabletRange.clip(range); // result is discarded; the call is made only for its range check
+ }
+
+ SourceSwitchingIterator.DataSource dataSource =
+ createDataSource(scanParams, true, interruptFlag);
+
+ Tablet.LookupResult result = null; // stays null if the private lookup below throws
+
+ try {
+ SortedKeyValueIterator<Key,Value> iter = new SourceSwitchingIterator(dataSource);
+ lookupCount.incrementAndGet();
+ result = lookup(iter, ranges, results, scanParams, maxResultSize);
+ return result;
+ } catch (IOException ioe) {
+ dataSource.close(true); // on I/O failure close(true) runs first; the finally block below then also calls close(false)
+ throw ioe;
+ } finally {
+ // code in finally block because always want
+ // to return mapfiles, even when exception is thrown
+ dataSource.close(false);
+
+ synchronized (this) { // NOTE(review): both counters are AtomicLong, so this lock may be a leftover from the pre-atomic fields — confirm before removing
+ queryResultCount.addAndGet(results.size()); // counts everything in results, including entries the caller passed in
+ if (result != null) {
+ queryResultBytes.addAndGet(result.dataSize);
+ }
+ }
+ }
+ }
+
+ Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, ScanParameters scanParams)
+ throws IOException {
+
+ // Collects entries from iter until a memory, entry-count or time limit trips, or iter yields or ends.
+
+ long batchTimeOut = scanParams.getBatchTimeOut();
+
+ long timeToRun = TimeUnit.MILLISECONDS.toNanos(batchTimeOut);
+ long startNanos = System.nanoTime();
+
+ if (batchTimeOut == Long.MAX_VALUE || batchTimeOut <= 0) {
+ batchTimeOut = 0; // 0 disables the time limit checked inside the loop
+ }
+ List<KVEntry> results = new ArrayList<>();
+ Key key = null;
+
+ Value value;
+ long resultSize = 0L; // running total of KVEntry.estimateMemoryUsed(), compared against TABLE_SCAN_MAXMEM
+ long resultBytes = 0L; // running total of KVEntry.numBytes(), reported in the returned Batch
+
+ long maxResultsSize = getTableConfiguration().getAsBytes(Property.TABLE_SCAN_MAXMEM);
+
+ Key continueKey = null;
+ boolean skipContinueKey = false;
+
+ YieldCallback<Key> yield = new YieldCallback<>();
+
+ // we cannot yield if we are in isolation mode
+ if (!scanParams.isIsolated()) {
+ iter.enableYielding(yield);
+ }
+
+ if (scanParams.getColumnSet().isEmpty()) {
+ iter.seek(range, Set.of(), false);
+ } else {
+ iter.seek(range, LocalityGroupUtil.families(scanParams.getColumnSet()), true);
+ }
+
+ while (iter.hasTop()) {
+ if (yield.hasYielded()) {
+ throw new IOException(
+ "Coding error: hasTop returned true but has yielded at " + yield.getPositionAndReset());
+ }
+ value = iter.getTopValue();
+ key = iter.getTopKey();
+
+ KVEntry kvEntry = new KVEntry(key, value); // copies key and value
+ results.add(kvEntry);
+ resultSize += kvEntry.estimateMemoryUsed();
+ resultBytes += kvEntry.numBytes();
+
+ boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) >= timeToRun;
+
+ if (resultSize >= maxResultsSize || results.size() >= scanParams.getMaxEntries() || timesUp) {
+ continueKey = new Key(key); // a limit tripped: record the last returned key and stop without advancing iter
+ skipContinueKey = true;
+ break;
+ }
+
+ iter.next();
+ }
+
+ if (yield.hasYielded()) {
+ continueKey = new Key(yield.getPositionAndReset());
+ skipContinueKey = true;
+ if (!range.contains(continueKey)) {
+ throw new IOException("Underlying iterator yielded to a position outside of its range: "
+ + continueKey + " not in " + range);
+ }
+ if (!results.isEmpty()
+ && continueKey.compareTo(results.get(results.size() - 1).getKey()) <= 0) {
+ throw new IOException(
+ "Underlying iterator yielded to a position that does not follow the last key returned: "
+ + continueKey + " <= " + results.get(results.size() - 1).getKey());
+ }
+
+ log.debug("Scan yield detected at position {}", continueKey); // parameterized: no string is built unless debug is enabled
+ addToYieldMetric(1);
+ } else if (!iter.hasTop()) {
+ // end of tablet has been reached
+ continueKey = null;
+ if (results.isEmpty()) {
+ results = null; // an exhausted iterator with no entries is reported as null results, not an empty list
+ }
+ }
+
+ return new Batch(skipContinueKey, results, continueKey, resultBytes);
+ }
+
+ /**
+  * Reads the given ranges, in list order, from {@code mmfi} and appends every entry found to
+  * {@code results}.
+  *
+  * <p>
+  * Reading stops early when the estimated memory used by the added entries exceeds
+  * {@code maxResultsSize}, when the batch timeout from {@code scanParams} elapses, when the
+  * iterator yields, or when the tablet is treated as closed (too many files, an IOException
+  * during shutdown, an interruption while closed, or a TabletClosedException). Whatever part of
+  * the ranges was not read is recorded in the returned result's {@code unfinishedRanges}.
+  *
+  * @param mmfi
+  *          iterator that is seeked to each range in turn; yielding is enabled on it
+  * @param ranges
+  *          ranges to read
+  * @param results
+  *          list that receives a {@link KVEntry} for each key/value read
+  * @param scanParams
+  *          supplies the column set used to restrict the seek and the batch timeout
+  * @param maxResultsSize
+  *          limit, compared against the sum of {@code estimateMemoryUsed()} of the added entries
+  * @return result holding the byte counters, the unfinished ranges and the closed flag
+  * @throws IOException
+  *           if the iterator reports a top entry after yielding, yields to an invalid position,
+  *           or fails with an IOException while no shutdown is in progress
+  */
+ private Tablet.LookupResult lookup(SortedKeyValueIterator<Key,Value> mmfi, List<Range> ranges,
+     List<KVEntry> results, ScanParameters scanParams, long maxResultsSize) throws IOException {
+
+   Tablet.LookupResult lookupResult = new Tablet.LookupResult();
+
+   boolean exceededMemoryUsage = false;
+   boolean tabletClosed = false;
+
+   // a null cfset means "no column family restriction" when seeking below
+   Set<ByteSequence> cfset = null;
+   if (!scanParams.getColumnSet().isEmpty()) {
+     cfset = LocalityGroupUtil.families(scanParams.getColumnSet());
+   }
+
+   long batchTimeOut = scanParams.getBatchTimeOut();
+
+   long timeToRun = TimeUnit.MILLISECONDS.toNanos(batchTimeOut);
+   long startNanos = System.nanoTime();
+
+   // a non-positive or Long.MAX_VALUE timeout disables the time limit: timesUp is only ever
+   // true when batchTimeOut > 0
+   if (batchTimeOut <= 0 || batchTimeOut == Long.MAX_VALUE) {
+     batchTimeOut = 0;
+   }
+
+   // enable yielding; whether the iterator actually yielded is checked after reading each range
+   YieldCallback<Key> yield = new YieldCallback<>();
+   mmfi.enableYielding(yield);
+   boolean yielded = false;
+
+   for (Range range : ranges) {
+
+     boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) > timeToRun;
+
+     // once any stop condition has been hit, the remaining ranges are handed back untouched
+     if (exceededMemoryUsage || tabletClosed || timesUp || yielded) {
+       lookupResult.unfinishedRanges.add(range);
+       continue;
+     }
+
+     // number of entries contributed by this range only; used when the tablet closes mid-range
+     int entriesAdded = 0;
+
+     try {
+       if (cfset != null) {
+         mmfi.seek(range, cfset, true);
+       } else {
+         mmfi.seek(range, Set.of(), false);
+       }
+
+       while (mmfi.hasTop()) {
+         if (yield.hasYielded()) {
+           throw new IOException("Coding error: hasTop returned true but has yielded at "
+               + yield.getPositionAndReset());
+         }
+         Key key = mmfi.getTopKey();
+
+         KVEntry kve = new KVEntry(key, mmfi.getTopValue());
+         results.add(kve);
+         entriesAdded++;
+         lookupResult.bytesAdded += kve.estimateMemoryUsed();
+         lookupResult.dataSize += kve.numBytes();
+
+         exceededMemoryUsage = lookupResult.bytesAdded > maxResultsSize;
+
+         timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) > timeToRun;
+
+         if (exceededMemoryUsage || timesUp) {
+           // the rest of this range, after the key just read, still has to be looked up
+           addUnfinishedRange(lookupResult, range, key);
+           break;
+         }
+
+         mmfi.next();
+       }
+
+       if (yield.hasYielded()) {
+         yielded = true;
+         Key yieldPosition = yield.getPositionAndReset();
+         if (!range.contains(yieldPosition)) {
+           throw new IOException("Underlying iterator yielded to a position outside of its range: "
+               + yieldPosition + " not in " + range);
+         }
+         if (!results.isEmpty()
+             && yieldPosition.compareTo(results.get(results.size() - 1).getKey()) <= 0) {
+           throw new IOException("Underlying iterator yielded to a position"
+               + " that does not follow the last key returned: " + yieldPosition + " <= "
+               + results.get(results.size() - 1).getKey());
+         }
+         addUnfinishedRange(lookupResult, range, yieldPosition);
+
+         log.debug("Scan yield detected at position {}", yieldPosition);
+         addToYieldMetric(1);
+       }
+     } catch (TooManyFilesException tmfe) {
+       // treat this as a closed tablet, and let the client retry
+       log.warn("Tablet {} has too many files, batch lookup can not run", getExtent());
+       handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range,
+           entriesAdded);
+       tabletClosed = true;
+     } catch (IOException ioe) {
+       if (ShutdownUtil.isShutdownInProgress()) {
+         // assume HDFS shutdown hook caused this exception
+         log.debug("IOException while shutdown in progress", ioe);
+         handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range,
+             entriesAdded);
+         tabletClosed = true;
+       } else {
+         throw ioe;
+       }
+     } catch (IterationInterruptedException iie) {
+       // an interruption is only swallowed when this tablet reports itself as closed
+       if (isClosed()) {
+         handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range,
+             entriesAdded);
+         tabletClosed = true;
+       } else {
+         throw iie;
+       }
+     } catch (TabletClosedException tce) {
+       handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range,
+           entriesAdded);
+       tabletClosed = true;
+     }
+
+   }
+
+   return lookupResult;
+ }
+
+ /**
+  * Updates {@code lookupResult} after a batch lookup of {@code range} was cut short because the
+  * tablet is being treated as closed.
+  *
+  * <p>
+  * If entries from this range were already added, the portion of the range after the last key
+  * in {@code results} is recorded as unfinished; otherwise the whole range is. The result is
+  * then flagged as closed.
+  *
+  * @param results
+  *          entries collected so far; its last element is read when {@code entriesAdded > 0}
+  * @param lookupResult
+  *          result object that receives the unfinished range and the closed flag
+  * @param exceededMemoryUsage
+  *          whether the lookup had already gone over its memory limit
+  * @param range
+  *          range that was being read when the tablet closed
+  * @param entriesAdded
+  *          number of entries the current range contributed to {@code results}
+  * @throws IllegalStateException
+  *           if {@code exceededMemoryUsage} is true, since both conditions are not expected to
+  *           occur together
+  */
+ private void handleTabletClosedDuringScan(List<KVEntry> results, Tablet.LookupResult lookupResult,
+     boolean exceededMemoryUsage, Range range, int entriesAdded) {
+   if (exceededMemoryUsage) {
+     // the leading space keeps the extent from running into the word "should"
+     throw new IllegalStateException(
+         "Tablet " + getExtent() + " should not exceed memory usage or close, not both");
+   }
+
+   if (entriesAdded > 0) {
+     addUnfinishedRange(lookupResult, range, results.get(results.size() - 1).getKey());
+   } else {
+     lookupResult.unfinishedRanges.add(range);
+   }
+
+   lookupResult.closed = true;
+ }
+
+ private void addUnfinishedRange(Tablet.LookupResult lookupResult, Range range, Key key) {
+ if (range.getEndKey() == null || key.compareTo(range.getEndKey()) < 0) {
+ Range nlur = new Range(new Key(key), false, range.getEndKey(), range.isEndKeyInclusive());
+ lookupResult.unfinishedRanges.add(nlur);
+ }
+ }
+
+ /**
+ * Adds the given amounts to the query counters: {@code size} is added to
+ * {@code queryResultCount} and {@code numBytes} to {@code queryResultBytes}.
+ *
+ * @param size
+ * amount added to queryResultCount (presumably the number of entries returned by a query;
+ * confirm against callers)
+ * @param numBytes
+ * amount added to queryResultBytes
+ */
+ public synchronized void updateQueryStats(int size, long numBytes) {
+ queryResultCount.addAndGet(size);
+ queryResultBytes.addAndGet(numBytes);
+ }
+}