You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2022/09/29 18:20:39 UTC
[accumulo] branch main updated: Refactor hot methods (#2811)
This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new cd0113eb19 Refactor hot methods (#2811)
cd0113eb19 is described below
commit cd0113eb1953cc4ca785e12d172c3f17437c3899
Author: Dom G <do...@apache.org>
AuthorDate: Thu Sep 29 14:20:34 2022 -0400
Refactor hot methods (#2811)
* Reduce size and time complexity of TabletClientHandler.flush
* extract new method for determining locality group in LocalityGroupIterator
* remove questionable use of streams
* refactor ScanServer.reserveFiles - no longer hot
* refactor SimpleLoadBalancer.move() - no longer hot
Co-authored-by: Keith Turner <kt...@apache.org>
---
.../accumulo/core/file/rfile/RelativeKey.java | 94 +++++--------
.../system/LocalityGroupIterator.java | 71 +++++-----
.../core/spi/balancer/SimpleLoadBalancer.java | 56 ++++----
.../org/apache/accumulo/server/fs/FileManager.java | 29 ++--
.../org/apache/accumulo/tserver/ScanServer.java | 29 ++--
.../accumulo/tserver/TabletClientHandler.java | 36 ++---
.../apache/accumulo/tserver/scan/LookupTask.java | 74 +++++-----
.../accumulo/tserver/tablet/CompactableImpl.java | 151 +++++++++++----------
8 files changed, 263 insertions(+), 277 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
index ef37ac1b9e..1398522ee0 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.core.file.rfile;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.function.Supplier;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
@@ -84,42 +85,24 @@ public class RelativeKey implements Writable {
fieldsSame = 0;
fieldsPrefixed = 0;
- ByteSequence prevKeyScratch;
- ByteSequence keyScratch;
-
if (prevKey != null) {
- prevKeyScratch = prevKey.getRowData();
- keyScratch = key.getRowData();
- rowCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch);
- if (rowCommonPrefixLen == -1)
- fieldsSame |= ROW_SAME;
- else if (rowCommonPrefixLen > 1)
- fieldsPrefixed |= ROW_COMMON_PREFIX;
+ ByteSequence prevKeyScratch = prevKey.getRowData();
+ ByteSequence keyScratch = key.getRowData();
+ rowCommonPrefixLen =
+ getCommonPrefixLen(prevKeyScratch, keyScratch, ROW_SAME, ROW_COMMON_PREFIX);
prevKeyScratch = prevKey.getColumnFamilyData();
keyScratch = key.getColumnFamilyData();
- cfCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch);
- if (cfCommonPrefixLen == -1)
- fieldsSame |= CF_SAME;
- else if (cfCommonPrefixLen > 1)
- fieldsPrefixed |= CF_COMMON_PREFIX;
+ cfCommonPrefixLen = getCommonPrefixLen(prevKeyScratch, keyScratch, CF_SAME, CF_COMMON_PREFIX);
prevKeyScratch = prevKey.getColumnQualifierData();
keyScratch = key.getColumnQualifierData();
- cqCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch);
- if (cqCommonPrefixLen == -1)
- fieldsSame |= CQ_SAME;
- else if (cqCommonPrefixLen > 1)
- fieldsPrefixed |= CQ_COMMON_PREFIX;
+ cqCommonPrefixLen = getCommonPrefixLen(prevKeyScratch, keyScratch, CQ_SAME, CQ_COMMON_PREFIX);
prevKeyScratch = prevKey.getColumnVisibilityData();
keyScratch = key.getColumnVisibilityData();
- cvCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch);
- if (cvCommonPrefixLen == -1)
- fieldsSame |= CV_SAME;
- else if (cvCommonPrefixLen > 1)
- fieldsPrefixed |= CV_COMMON_PREFIX;
+ cvCommonPrefixLen = getCommonPrefixLen(prevKeyScratch, keyScratch, CV_SAME, CV_COMMON_PREFIX);
tsDiff = key.getTimestamp() - prevKey.getTimestamp();
if (tsDiff == 0)
@@ -135,6 +118,17 @@ public class RelativeKey implements Writable {
fieldsSame |= DELETED;
}
+ private int getCommonPrefixLen(ByteSequence prevKeyScratch, ByteSequence keyScratch,
+ byte fieldBit, byte commonPrefix) {
+ int commonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch);
+ if (commonPrefixLen == -1) {
+ fieldsSame |= fieldBit;
+ } else if (commonPrefixLen > 1) {
+ fieldsPrefixed |= commonPrefix;
+ }
+ return commonPrefixLen;
+ }
+
/**
*
* @return -1 (exact match) or the number of bytes in common
@@ -174,40 +168,13 @@ public class RelativeKey implements Writable {
fieldsPrefixed = 0;
}
- byte[] row, cf, cq, cv;
- long ts;
-
- if ((fieldsSame & ROW_SAME) == ROW_SAME) {
- row = prevKey.getRowData().toArray();
- } else if ((fieldsPrefixed & ROW_COMMON_PREFIX) == ROW_COMMON_PREFIX) {
- row = readPrefix(in, prevKey.getRowData());
- } else {
- row = read(in);
- }
+ final byte[] row, cf, cq, cv;
+ final long ts;
- if ((fieldsSame & CF_SAME) == CF_SAME) {
- cf = prevKey.getColumnFamilyData().toArray();
- } else if ((fieldsPrefixed & CF_COMMON_PREFIX) == CF_COMMON_PREFIX) {
- cf = readPrefix(in, prevKey.getColumnFamilyData());
- } else {
- cf = read(in);
- }
-
- if ((fieldsSame & CQ_SAME) == CQ_SAME) {
- cq = prevKey.getColumnQualifierData().toArray();
- } else if ((fieldsPrefixed & CQ_COMMON_PREFIX) == CQ_COMMON_PREFIX) {
- cq = readPrefix(in, prevKey.getColumnQualifierData());
- } else {
- cq = read(in);
- }
-
- if ((fieldsSame & CV_SAME) == CV_SAME) {
- cv = prevKey.getColumnVisibilityData().toArray();
- } else if ((fieldsPrefixed & CV_COMMON_PREFIX) == CV_COMMON_PREFIX) {
- cv = readPrefix(in, prevKey.getColumnVisibilityData());
- } else {
- cv = read(in);
- }
+ row = getData(in, ROW_SAME, ROW_COMMON_PREFIX, () -> prevKey.getRowData());
+ cf = getData(in, CF_SAME, CF_COMMON_PREFIX, () -> prevKey.getColumnFamilyData());
+ cq = getData(in, CQ_SAME, CQ_COMMON_PREFIX, () -> prevKey.getColumnQualifierData());
+ cv = getData(in, CV_SAME, CV_COMMON_PREFIX, () -> prevKey.getColumnVisibilityData());
if ((fieldsSame & TS_SAME) == TS_SAME) {
ts = prevKey.getTimestamp();
@@ -221,6 +188,17 @@ public class RelativeKey implements Writable {
this.prevKey = this.key;
}
+ private byte[] getData(DataInput in, byte fieldBit, byte commonPrefix,
+ Supplier<ByteSequence> data) throws IOException {
+ if ((fieldsSame & fieldBit) == fieldBit) {
+ return data.get().toArray();
+ } else if ((fieldsPrefixed & commonPrefix) == commonPrefix) {
+ return readPrefix(in, data.get());
+ } else {
+ return read(in);
+ }
+ }
+
public static class SkippR {
RelativeKey rk;
int skipped;
diff --git a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/LocalityGroupIterator.java b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/LocalityGroupIterator.java
index 010428797f..58c358acd5 100644
--- a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/LocalityGroupIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/LocalityGroupIterator.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -163,26 +164,26 @@ public class LocalityGroupIterator extends HeapIterator implements Interruptible
Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
hiter.clear();
- Set<ByteSequence> cfSet;
- if (columnFamilies.isEmpty()) {
- cfSet = Collections.emptySet();
- } else {
- if (columnFamilies instanceof Set<?>) {
- cfSet = (Set<ByteSequence>) columnFamilies;
- } else {
- cfSet = new HashSet<>();
- cfSet.addAll(columnFamilies);
- }
- }
+ final Set<ByteSequence> cfSet = getCfSet(columnFamilies);
// determine the set of groups to use
- Collection<LocalityGroup> groups = Collections.emptyList();
+ final Collection<LocalityGroup> groups = getLocalityGroups(lgContext, inclusive, cfSet);
+ for (LocalityGroup lgr : groups) {
+ lgr.getIterator().seek(range, EMPTY_CF_SET, false);
+ hiter.addSource(lgr.getIterator());
+ }
+
+ return groups;
+ }
+
+ private static Collection<LocalityGroup> getLocalityGroups(LocalityGroupContext lgContext,
+ boolean inclusive, Set<ByteSequence> cfSet) {
+
+ final Collection<LocalityGroup> groups;
// if no column families specified, then include all groups unless !inclusive
if (cfSet.isEmpty()) {
- if (!inclusive) {
- groups = lgContext.groups;
- }
+ groups = inclusive ? List.of() : lgContext.groups;
} else {
groups = new HashSet<>();
@@ -207,35 +208,33 @@ public class LocalityGroupIterator extends HeapIterator implements Interruptible
* other
*/
if (!inclusive) {
- for (Entry<ByteSequence,LocalityGroup> entry : lgContext.groupByCf.entrySet()) {
- if (!cfSet.contains(entry.getKey())) {
- groups.add(entry.getValue());
- }
- }
+ lgContext.groupByCf.entrySet().stream().filter(entry -> !cfSet.contains(entry.getKey()))
+ .map(Entry::getValue).forEach(groups::add);
} else if (lgContext.groupByCf.size() <= cfSet.size()) {
- for (Entry<ByteSequence,LocalityGroup> entry : lgContext.groupByCf.entrySet()) {
- if (cfSet.contains(entry.getKey())) {
- groups.add(entry.getValue());
- }
- }
+ lgContext.groupByCf.entrySet().stream().filter(entry -> cfSet.contains(entry.getKey()))
+ .map(Entry::getValue).forEach(groups::add);
} else {
- for (ByteSequence cf : cfSet) {
- LocalityGroup group = lgContext.groupByCf.get(cf);
- if (group != null) {
- groups.add(group);
- }
- }
+ cfSet.stream().map(lgContext.groupByCf::get).filter(Objects::nonNull).forEach(groups::add);
}
}
- for (LocalityGroup lgr : groups) {
- lgr.getIterator().seek(range, EMPTY_CF_SET, false);
- hiter.addSource(lgr.getIterator());
- }
-
return groups;
}
+ private static Set<ByteSequence> getCfSet(Collection<ByteSequence> columnFamilies) {
+ final Set<ByteSequence> cfSet;
+ if (columnFamilies.isEmpty()) {
+ cfSet = Collections.emptySet();
+ } else {
+ if (columnFamilies instanceof Set<?>) {
+ cfSet = (Set<ByteSequence>) columnFamilies;
+ } else {
+ cfSet = Set.copyOf(columnFamilies);
+ }
+ }
+ return cfSet;
+ }
+
/**
* This seek method will reuse the supplied LocalityGroupSeekCache if it can. Otherwise it will
* delegate to the _seek method.
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java
index 7a0111c2c9..4738485ad0 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.core.spi.balancer;
import static java.util.concurrent.TimeUnit.SECONDS;
+import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -245,31 +246,8 @@ public class SimpleLoadBalancer implements TabletBalancer {
Map<TableId,Integer> tooLittleMap = tabletCountsPerTable(tooLittle.status);
for (int i = 0; i < count; i++) {
- TableId table;
- Integer tooLittleCount;
- if (tableToBalance == null) {
- // find a table to migrate
- // look for an uneven table count
- int biggestDifference = 0;
- TableId biggestDifferenceTable = null;
- for (var tableEntry : tooMuchMap.entrySet()) {
- TableId tableID = tableEntry.getKey();
- tooLittleMap.putIfAbsent(tableID, 0);
- int diff = tableEntry.getValue() - tooLittleMap.get(tableID);
- if (diff > biggestDifference) {
- biggestDifference = diff;
- biggestDifferenceTable = tableID;
- }
- }
- if (biggestDifference < 2) {
- table = busiest(tooMuch.status.getTableMap());
- } else {
- table = biggestDifferenceTable;
- }
- } else {
- // just balance the given table
- table = tableToBalance;
- }
+ TableId table = getTableToMigrate(tooMuch, tooMuchMap, tooLittleMap);
+
Map<TabletId,TabletStatistics> onlineTabletsForTable = donerTabletStats.get(table);
try {
if (onlineTabletsForTable == null) {
@@ -297,10 +275,7 @@ public class SimpleLoadBalancer implements TabletBalancer {
* is only one tabletserver that holds all of the tablets. Here we check to see if in fact
* that is the case and if so set the value to 0.
*/
- tooLittleCount = tooLittleMap.get(table);
- if (tooLittleCount == null) {
- tooLittleCount = 0;
- }
+ Integer tooLittleCount = tooLittleMap.getOrDefault(table, 0);
tooLittleMap.put(table, tooLittleCount + 1);
tooMuch.count--;
tooLittle.count++;
@@ -309,6 +284,29 @@ public class SimpleLoadBalancer implements TabletBalancer {
return result;
}
+ private TableId getTableToMigrate(ServerCounts tooMuch, Map<TableId,Integer> tooMuchMap,
+ Map<TableId,Integer> tooLittleMap) {
+
+ if (tableToBalance != null) {
+ return tableToBalance;
+ }
+
+ // find a table to migrate
+ // look for an uneven table count
+ Entry<TableId,Integer> biggestEntry = tooMuchMap.entrySet().stream().map(entry -> {
+ TableId tableID = entry.getKey();
+ int diff = entry.getValue() - tooLittleMap.getOrDefault(tableID, 0);
+ return new SimpleEntry<>(tableID, diff); // map the table count to the difference
+ }).max(Entry.comparingByValue()) // get the largest difference
+ .orElseGet(() -> new SimpleEntry<>(null, 0));
+
+ if (biggestEntry.getValue() < 2) {
+ return busiest(tooMuch.status.getTableMap());
+ } else {
+ return biggestEntry.getKey();
+ }
+ }
+
protected List<TabletStatistics> getOnlineTabletsForTable(TabletServerId tabletServerId,
TableId tableId) throws AccumuloSecurityException, AccumuloException {
return environment.listOnlineTabletsForTable(tabletServerId, tableId);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
index e772f8b59f..0d3c85fed5 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
@@ -504,20 +504,13 @@ public class FileManager {
ArrayList<InterruptibleIterator> iters = new ArrayList<>();
- boolean sawTimeSet = false;
- for (DataFileValue dfv : files.values()) {
- if (dfv.isTimeSet()) {
- sawTimeSet = true;
- break;
- }
- }
+ boolean sawTimeSet = files.values().stream().anyMatch(DataFileValue::isTimeSet);
for (Entry<FileSKVIterator,String> entry : newlyReservedReaders.entrySet()) {
- FileSKVIterator reader = entry.getKey();
+ FileSKVIterator source = entry.getKey();
String filename = entry.getValue();
InterruptibleIterator iter;
- FileSKVIterator source = reader;
if (samplerConfig != null) {
source = source.getSample(samplerConfig);
if (source == null) {
@@ -525,16 +518,8 @@ public class FileManager {
}
}
- if (detachable) {
- FileDataSource fds = new FileDataSource(filename, source);
- dataSources.add(fds);
- SourceSwitchingIterator ssi = new SourceSwitchingIterator(fds);
- iter = new ProblemReportingIterator(context, tablet.tableId(), filename,
- continueOnFailure, ssi);
- } else {
- iter = new ProblemReportingIterator(context, tablet.tableId(), filename,
- continueOnFailure, source);
- }
+ iter = new ProblemReportingIterator(context, tablet.tableId(), filename, continueOnFailure,
+ detachable ? getSsi(filename, source) : source);
if (sawTimeSet) {
// constructing FileRef is expensive so avoid if not needed
@@ -550,6 +535,12 @@ public class FileManager {
return iters;
}
+ private SourceSwitchingIterator getSsi(String filename, FileSKVIterator source) {
+ FileDataSource fds = new FileDataSource(filename, source);
+ dataSources.add(fds);
+ return new SourceSwitchingIterator(fds);
+ }
+
public synchronized void detach() {
releaseReaders(tablet, tabletReservedReaders, false);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index ce38d7161a..15574f3657 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -653,20 +653,7 @@ public class ScanServer extends AbstractServer
throw new NoSuchScanIDException();
}
- Set<StoredTabletFile> scanSessionFiles;
-
- if (session instanceof SingleScanSession) {
- var sss = (SingleScanSession) session;
- scanSessionFiles =
- Set.copyOf(session.getTabletResolver().getTablet(sss.extent).getDatafiles().keySet());
- } else if (session instanceof MultiScanSession) {
- var mss = (MultiScanSession) session;
- scanSessionFiles = mss.exents.stream()
- .flatMap(e -> mss.getTabletResolver().getTablet(e).getDatafiles().keySet().stream())
- .collect(Collectors.toUnmodifiableSet());
- } else {
- throw new IllegalArgumentException("Unknown session type " + session.getClass().getName());
- }
+ Set<StoredTabletFile> scanSessionFiles = getScanSessionFiles(session);
long myReservationId = nextScanReservationId.incrementAndGet();
// we are only reserving if the files already exists in reservedFiles, so only need the read
@@ -698,6 +685,20 @@ public class ScanServer extends AbstractServer
return new ScanReservation(scanSessionFiles, myReservationId);
}
+ private static Set<StoredTabletFile> getScanSessionFiles(ScanSession session) {
+ if (session instanceof SingleScanSession) {
+ var sss = (SingleScanSession) session;
+ return Set.copyOf(session.getTabletResolver().getTablet(sss.extent).getDatafiles().keySet());
+ } else if (session instanceof MultiScanSession) {
+ var mss = (MultiScanSession) session;
+ return mss.exents.stream()
+ .flatMap(e -> mss.getTabletResolver().getTablet(e).getDatafiles().keySet().stream())
+ .collect(Collectors.toUnmodifiableSet());
+ } else {
+ throw new IllegalArgumentException("Unknown session type " + session.getClass().getName());
+ }
+ }
+
private void cleanUpReservedFiles(long expireTimeMs) {
// Do a quick check to see if there is any potential work. This check is done to avoid acquiring
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
index 9dab5bbf86..4437e6b422 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
@@ -1202,33 +1202,27 @@ public class TabletClientHandler implements TabletClientService.Iface {
throw new RuntimeException(e);
}
- ArrayList<Tablet> tabletsToFlush = new ArrayList<>();
-
KeyExtent ke = new KeyExtent(TableId.of(tableId), ByteBufferUtil.toText(endRow),
ByteBufferUtil.toText(startRow));
- for (Tablet tablet : server.getOnlineTablets().values()) {
- if (ke.overlaps(tablet.getExtent())) {
- tabletsToFlush.add(tablet);
- }
- }
+ List<Tablet> tabletsToFlush = server.getOnlineTablets().values().stream()
+ .filter(tablet -> ke.overlaps(tablet.getExtent())).collect(toList());
- Long flushID = null;
+ if (tabletsToFlush.isEmpty())
+ return; // no tablets to flush
- for (Tablet tablet : tabletsToFlush) {
- if (flushID == null) {
- // read the flush id once from zookeeper instead of reading
- // it for each tablet
- try {
- flushID = tablet.getFlushID();
- } catch (NoNodeException e) {
- // table was probably deleted
- log.info("Asked to flush table that has no flush id {} {}", ke, e.getMessage());
- return;
- }
- }
- tablet.flush(flushID);
+ // read the flush id once from zookeeper instead of reading it for each tablet
+ final long flushID;
+ try {
+ Tablet firstTablet = tabletsToFlush.get(0);
+ flushID = firstTablet.getFlushID();
+ } catch (NoNodeException e) {
+ // table was probably deleted
+ log.info("Asked to flush table that has no flush id {} {}", ke, e.getMessage());
+ return;
}
+
+ tabletsToFlush.forEach(tablet -> tablet.flush(flushID));
}
@Override
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
index bf4f2ab725..5eb7ebfa1a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
@@ -95,9 +95,11 @@ public class LookupTask extends ScanTask<MultiScanResult> {
// check the time so that the read ahead thread is not monopolized
while (iter.hasNext() && bytesAdded < maxResultsSize
&& (System.currentTimeMillis() - startTime) < maxScanTime) {
- Entry<KeyExtent,List<Range>> entry = iter.next();
- KeyExtent extent = entry.getKey();
- List<Range> ranges = entry.getValue();
+
+ final Entry<KeyExtent,List<Range>> entry = iter.next();
+ final KeyExtent extent = entry.getKey();
+ final List<Range> ranges = entry.getValue();
+
iter.remove();
// check that tablet server is serving requested tablet
@@ -112,9 +114,8 @@ public class LookupTask extends ScanTask<MultiScanResult> {
try {
- // do the following check to avoid a race condition
- // between setting false below and the task being
- // canceled
+ // do the following check to avoid a race condition between setting false below and the
+ // task being canceled
if (isCancelled())
interruptFlag.set(true);
@@ -127,10 +128,8 @@ public class LookupTask extends ScanTask<MultiScanResult> {
// Add results from this Tablet.lookup() to the accumulated results
results.addAll(tabletResults);
- // if the tablet was closed it it possible that the
- // interrupt flag was set.... do not want it set for
- // the next
- // lookup
+ // if the tablet was closed, it is possible that the interrupt flag was set.... do not
+ // want it set for the next lookup
interruptFlag.set(false);
} catch (IOException e) {
@@ -164,29 +163,12 @@ public class LookupTask extends ScanTask<MultiScanResult> {
long finishTime = System.currentTimeMillis();
session.totalLookupTime += (finishTime - startTime);
session.numEntries += results.size();
+ boolean queriesIsEmpty = !session.queries.isEmpty();
- // convert everything to thrift before adding result
- List<TKeyValue> retResults = new ArrayList<>();
- for (KVEntry entry : results)
- retResults
- .add(new TKeyValue(entry.getKey().toThrift(), ByteBuffer.wrap(entry.getValue().get())));
- // @formatter:off
- Map<TKeyExtent,List<TRange>> retFailures = failures.entrySet().stream().collect(Collectors.toMap(
- entry -> entry.getKey().toThrift(),
- entry -> entry.getValue().stream().map(Range::toThrift).collect(Collectors.toList())
- ));
- // @formatter:on
- List<TKeyExtent> retFullScans =
- fullScans.stream().map(KeyExtent::toThrift).collect(Collectors.toList());
- TKeyExtent retPartScan = null;
- TKey retPartNextKey = null;
- if (partScan != null) {
- retPartScan = partScan.toThrift();
- retPartNextKey = partNextKey.toThrift();
- }
// add results to queue
- addResult(new MultiScanResult(retResults, retFailures, retFullScans, retPartScan,
- retPartNextKey, partNextKeyInclusive, !session.queries.isEmpty()));
+ MultiScanResult multiScanResult = getMultiScanResult(results, partScan, failures, fullScans,
+ partNextKey, partNextKeyInclusive, queriesIsEmpty);
+ addResult(multiScanResult);
} catch (IterationInterruptedException iie) {
if (!isCancelled()) {
log.warn("Iteration interrupted, when scan not cancelled", iie);
@@ -202,4 +184,34 @@ public class LookupTask extends ScanTask<MultiScanResult> {
runState.set(ScanRunState.FINISHED);
}
}
+
+ private MultiScanResult getMultiScanResult(List<KVEntry> results, KeyExtent partScan,
+ Map<KeyExtent,List<Range>> failures, List<KeyExtent> fullScans, Key partNextKey,
+ boolean partNextKeyInclusive, boolean queriesIsEmpty) {
+
+ // convert everything to thrift before adding result
+ List<TKeyValue> retResults = results.stream().map(
+ entry -> new TKeyValue(entry.getKey().toThrift(), ByteBuffer.wrap(entry.getValue().get())))
+ .collect(Collectors.toList());
+
+ // @formatter:off
+ Map<TKeyExtent,List<TRange>> retFailures = failures.entrySet().stream().collect(Collectors.toMap(
+ entry -> entry.getKey().toThrift(),
+ entry -> entry.getValue().stream().map(Range::toThrift).collect(Collectors.toList())
+ ));
+ // @formatter:on
+
+ List<TKeyExtent> retFullScans =
+ fullScans.stream().map(KeyExtent::toThrift).collect(Collectors.toList());
+
+ TKeyExtent retPartScan = null;
+ TKey retPartNextKey = null;
+ if (partScan != null) {
+ retPartScan = partScan.toThrift();
+ retPartNextKey = partNextKey.toThrift();
+ }
+
+ return new MultiScanResult(retResults, retFailures, retFullScans, retPartScan, retPartNextKey,
+ partNextKeyInclusive, queriesIsEmpty);
+ }
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index e4660ea964..6e3d728b21 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -396,83 +396,96 @@ public class CompactableImpl implements Compactable {
if (isCompactionStratConfigured)
return Set.of();
- switch (selectStatus) {
- case NOT_ACTIVE:
- case CANCELED: {
- Set<StoredTabletFile> candidates = new HashSet<>(currFiles);
- candidates.removeAll(allCompactingFiles);
- return Collections.unmodifiableSet(candidates);
- }
- case NEW:
- case SELECTING:
- return Set.of();
- case SELECTED: {
- Set<StoredTabletFile> candidates = new HashSet<>(currFiles);
- candidates.removeAll(allCompactingFiles);
- if (getNanoTime() - selectedTimeNanos < selectedExpirationDuration.toNanos()) {
- candidates.removeAll(selectedFiles);
- }
- return Collections.unmodifiableSet(candidates);
- }
- case RESERVED: {
- Set<StoredTabletFile> candidates = new HashSet<>(currFiles);
- candidates.removeAll(allCompactingFiles);
- candidates.removeAll(selectedFiles);
- return Collections.unmodifiableSet(candidates);
- }
- default:
- throw new AssertionError();
- }
+ return handleSystemCompaction(currFiles);
}
case SELECTOR:
// intentional fall through
case USER:
- switch (selectStatus) {
- case NOT_ACTIVE:
- case NEW:
- case SELECTING:
- case CANCELED:
- return Set.of();
- case SELECTED:
- case RESERVED: {
- if (selectKind == kind) {
- Set<StoredTabletFile> candidates = new HashSet<>(selectedFiles);
- candidates.removeAll(allCompactingFiles);
- candidates = Collections.unmodifiableSet(candidates);
- // verify that candidates are still around and fail quietly if not
- if (!currFiles.containsAll(candidates)) {
- log.debug("Selected files not in all files {} {}", candidates, currFiles);
- return Set.of();
- }
- return candidates;
- } else {
- return Set.of();
- }
- }
- default:
- throw new AssertionError();
- }
+ return handleUserSelectorCompaction(currFiles, kind);
case CHOP: {
- switch (chopStatus) {
- case NOT_ACTIVE:
- case SELECTING:
- case MARKING:
+ return handleChopCompaction(currFiles);
+ }
+ default:
+ throw new AssertionError();
+ }
+ }
+
+ private Set<StoredTabletFile> handleChopCompaction(Set<StoredTabletFile> currFiles) {
+ switch (chopStatus) {
+ case NOT_ACTIVE:
+ case SELECTING:
+ case MARKING:
+ return Set.of();
+ case SELECTED: {
+ if (selectStatus == FileSelectionStatus.NEW
+ || selectStatus == FileSelectionStatus.SELECTING)
+ return Set.of();
+
+ var filesToChop = getFilesToChop(currFiles);
+ filesToChop.removeAll(allCompactingFiles);
+ if (selectStatus == FileSelectionStatus.SELECTED
+ || selectStatus == FileSelectionStatus.RESERVED)
+ filesToChop.removeAll(selectedFiles);
+ return Collections.unmodifiableSet(filesToChop);
+ }
+ default:
+ throw new AssertionError();
+ }
+ }
+
+ private Set<StoredTabletFile> handleUserSelectorCompaction(Set<StoredTabletFile> currFiles,
+ CompactionKind kind) {
+ switch (selectStatus) {
+ case NOT_ACTIVE:
+ case NEW:
+ case SELECTING:
+ case CANCELED:
+ return Set.of();
+ case SELECTED:
+ case RESERVED: {
+ if (selectKind == kind) {
+ Set<StoredTabletFile> candidates = Sets.difference(selectedFiles, allCompactingFiles);
+ // verify that candidates are still around and fail quietly if not
+ if (!currFiles.containsAll(candidates)) {
+ log.debug("Selected files not in all files {} {}", candidates, currFiles);
return Set.of();
- case SELECTED: {
- if (selectStatus == FileSelectionStatus.NEW
- || selectStatus == FileSelectionStatus.SELECTING)
- return Set.of();
-
- var filesToChop = getFilesToChop(currFiles);
- filesToChop.removeAll(allCompactingFiles);
- if (selectStatus == FileSelectionStatus.SELECTED
- || selectStatus == FileSelectionStatus.RESERVED)
- filesToChop.removeAll(selectedFiles);
- return Collections.unmodifiableSet(filesToChop);
}
- default:
- throw new AssertionError();
+ // must create a copy because the sets passed to Sets.difference could change after this
+ // method returns
+ return Set.copyOf(candidates);
+ } else {
+ return Set.of();
+ }
+ }
+ default:
+ throw new AssertionError();
+ }
+ }
+
+ private Set<StoredTabletFile> handleSystemCompaction(Set<StoredTabletFile> currFiles) {
+ switch (selectStatus) {
+ case NOT_ACTIVE:
+ case CANCELED: {
+ // must create a copy because the sets passed to Sets.difference could change after this
+ // method returns
+ return Set.copyOf(Sets.difference(currFiles, allCompactingFiles));
+ }
+ case NEW:
+ case SELECTING:
+ return Set.of();
+ case SELECTED: {
+ Set<StoredTabletFile> candidates = new HashSet<>(currFiles);
+ candidates.removeAll(allCompactingFiles);
+ if (getNanoTime() - selectedTimeNanos < selectedExpirationDuration.toNanos()) {
+ candidates.removeAll(selectedFiles);
}
+ return Collections.unmodifiableSet(candidates);
+ }
+ case RESERVED: {
+ Set<StoredTabletFile> candidates = new HashSet<>(currFiles);
+ candidates.removeAll(allCompactingFiles);
+ candidates.removeAll(selectedFiles);
+ return Collections.unmodifiableSet(candidates);
}
default:
throw new AssertionError();