You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2019/09/11 17:08:32 UTC
[accumulo] branch master updated: Use concurrent map instead of
cache for bulkImported map (#1356)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new efc9311 Use concurrent map instead of cache for bulkImported map (#1356)
efc9311 is described below
commit efc931102180562d1f17f9493bb7d4cd55f3f4a1
Author: Keith Turner <kt...@apache.org>
AuthorDate: Wed Sep 11 13:08:27 2019 -0400
Use concurrent map instead of cache for bulkImported map (#1356)
While analyzing some tablet code I noticed a Guava cache was being used
like a map. When I first saw the code, I was concerned the cache might
evict entries (which would be a bug). However, the cache was created
with default settings, which according to the javadoc does not
automatically evict. This commit replaces the cache with a concurrent
map so no one has to do this analysis again.
---
.../tserver/tablet/BulkImportCacheCleaner.java | 2 +-
.../org/apache/accumulo/tserver/tablet/Tablet.java | 34 ++++++++++++----------
2 files changed, 19 insertions(+), 17 deletions(-)
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java
index c908840..03f33e2 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java
@@ -40,7 +40,7 @@ public class BulkImportCacheCleaner implements Runnable {
// gather the list of transactions the tablets have cached
final Set<Long> tids = new HashSet<>();
for (Tablet tablet : server.getOnlineTablets().values()) {
- tids.addAll(tablet.getBulkIngestedFiles().keySet());
+ tids.addAll(tablet.getBulkIngestedTxIds());
}
try {
// get the current transactions from ZooKeeper
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index bbc70a4..1e89624 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -38,7 +38,7 @@ import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -152,7 +152,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableSet;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -254,8 +253,13 @@ public class Tablet {
// tablet lock.
private final Set<FileRef> bulkImporting = new HashSet<>();
- // Files that were successfully bulk imported.
- private final Cache<Long,List<FileRef>> bulkImported = CacheBuilder.newBuilder().build();
+ // Files that were successfully bulk imported. Using a concurrent map supports non-locking
+ // operations on the key set which is useful for the periodic task that cleans up completed bulk
+ // imports for all tablets. However the values of this map are ArrayList which do not support
+ // concurrency. This is ok because all operations on the values are done while the tablet lock is
+ // held.
+ private final ConcurrentHashMap<Long,List<FileRef>> bulkImported =
+ new ConcurrentHashMap<Long,List<FileRef>>();
private final int logId;
@@ -344,7 +348,7 @@ public class Tablet {
this.location = locationPath;
this.tabletDirectory = tabletPaths.dir;
for (Entry<Long,List<FileRef>> entry : data.getBulkImported().entrySet()) {
- this.bulkImported.put(entry.getKey(), new CopyOnWriteArrayList<>(entry.getValue()));
+ this.bulkImported.put(entry.getKey(), new ArrayList<>(entry.getValue()));
}
final List<LogEntry> logEntries = tabletPaths.logEntries;
@@ -2246,17 +2250,17 @@ public class Tablet {
MetadataTableUtil.splitTablet(high, extent.getPrevEndRow(), splitRatio,
getTabletServer().getContext(), getTabletServer().getLock());
MasterMetadataUtil.addNewTablet(getTabletServer().getContext(), low, lowDirectory,
- getTabletServer().getTabletSession(), lowDatafileSizes, getBulkIngestedFiles(), time,
- lastFlushID, lastCompactID, getTabletServer().getLock());
+ getTabletServer().getTabletSession(), lowDatafileSizes, bulkImported, time, lastFlushID,
+ lastCompactID, getTabletServer().getLock());
MetadataTableUtil.finishSplit(high, highDatafileSizes, highDatafilesToRemove,
getTabletServer().getContext(), getTabletServer().getLock());
log.debug("TABLET_HIST {} split {} {}", extent, low, high);
newTablets.put(high, new TabletData(tabletDirectory, highDatafileSizes, time, lastFlushID,
- lastCompactID, lastLocation, getBulkIngestedFiles()));
+ lastCompactID, lastLocation, bulkImported));
newTablets.put(low, new TabletData(lowDirectory, lowDatafileSizes, time, lastFlushID,
- lastCompactID, lastLocation, getBulkIngestedFiles()));
+ lastCompactID, lastLocation, bulkImported));
long t2 = System.currentTimeMillis();
@@ -2338,7 +2342,7 @@ public class Tablet {
"Timeout waiting " + (lockWait / 1000.) + " seconds to get tablet lock for " + extent);
}
- List<FileRef> alreadyImported = bulkImported.getIfPresent(tid);
+ List<FileRef> alreadyImported = bulkImported.get(tid);
if (alreadyImported != null) {
for (FileRef entry : alreadyImported) {
if (fileMap.remove(entry) != null) {
@@ -2385,7 +2389,7 @@ public class Tablet {
}
try {
- bulkImported.get(tid, ArrayList::new).addAll(fileMap.keySet());
+ bulkImported.computeIfAbsent(tid, k -> new ArrayList<>()).addAll(fileMap.keySet());
} catch (Exception ex) {
log.info(ex.toString(), ex);
}
@@ -2848,14 +2852,12 @@ public class Tablet {
}
}
- public Map<Long,List<FileRef>> getBulkIngestedFiles() {
- return new HashMap<>(bulkImported.asMap());
+ public Set<Long> getBulkIngestedTxIds() {
+ return bulkImported.keySet();
}
public void cleanupBulkLoadedFiles(Set<Long> tids) {
- for (Long tid : tids) {
- bulkImported.invalidate(tid);
- }
+ bulkImported.keySet().removeAll(tids);
}
}