You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2019/05/10 20:56:01 UTC
[accumulo] 01/01: Merge remote-tracking branch 'upstream/1.9'
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit f73f61b9942ee41dad6795220965656e15c180cd
Merge: 5e072bb f2bf2d8
Author: Keith Turner <kt...@apache.org>
AuthorDate: Fri May 10 16:54:35 2019 -0400
Merge remote-tracking branch 'upstream/1.9'
.../org/apache/accumulo/tserver/tablet/Tablet.java | 22 ++++++++++++++++++++++
1 file changed, 22 insertions(+)
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index a48bdf5,3366e9e..26204d9
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@@ -2412,11 -2442,30 +2417,23 @@@ public class Tablet
}
}
}
+
- Iterator<FileRef> fiter = fileMap.keySet().iterator();
- while (fiter.hasNext()) {
- FileRef file = fiter.next();
++ fileMap.keySet().removeIf(file -> {
+ if (bulkImporting.contains(file)) {
+ log.info("Ignoring import of bulk file currently importing: " + file);
- fiter.remove();
++ return true;
+ }
- }
++ return false;
++ });
+
if (fileMap.isEmpty()) {
return;
}
- if (writesInProgress < 0) {
- throw new IllegalStateException("FATAL: Something really bad went wrong. Attempted to "
- + "increment a negative number of writes in progress " + writesInProgress + "on tablet "
- + extent);
- }
+ incrementWritesInProgress();
+
+ // prevent other threads from processing this file while its added to the metadata table.
+ bulkImporting.addAll(fileMap.keySet());
-
- writesInProgress++;
}
try {
tabletServer.updateBulkImportState(files, BulkImportState.LOADING);
@@@ -2431,10 -2480,27 +2448,15 @@@
}
} finally {
synchronized (this) {
- if (writesInProgress < 1)
- throw new IllegalStateException("FATAL: Something really bad went wrong. Attempted to "
- + "decrement the number of writes in progress " + writesInProgress
- + " to < 0 on tablet " + extent);
-
- writesInProgress--;
- if (writesInProgress == 0)
- this.notifyAll();
+ decrementWritesInProgress();
+ if (!bulkImporting.removeAll(fileMap.keySet())) {
+ throw new AssertionError(
+ "Likely bug in code, always expect to remove something. Please open an Accumulo issue.");
+ }
+
try {
- bulkImported.get(tid, new Callable<List<FileRef>>() {
- @Override
- public List<FileRef> call() throws Exception {
- return new ArrayList<>();
- }
- }).addAll(fileMap.keySet());
+ bulkImported.get(tid, ArrayList::new).addAll(fileMap.keySet());
} catch (Exception ex) {
log.info(ex.toString(), ex);
}