You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2022/10/12 11:45:27 UTC

[accumulo] branch main updated: fixes user compaction stuck when producing no output (#3013)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c37144b62 fixes user compaction stuck when producing no output (#3013)
2c37144b62 is described below

commit 2c37144b6223280602ea4ffd24916b71c10b409d
Author: Keith Turner <kt...@apache.org>
AuthorDate: Wed Oct 12 12:45:20 2022 +0100

    fixes user compaction stuck when producing no output (#3013)
    
    This commit fixes a bug that occurs when both of the following are true:
    
     * a user compaction takes multiple compaction steps (because the tablet has many files)
     * the intermediate steps produce no output
    
    When this happened, CompactableImpl and Tablet would disagree about which files existed.
    The Tablet code would ignore the empty file produced by an intermediate compaction, while
    CompactableImpl would expect Tablet to know of this file. As a result, the user compaction
    would hang until a tserver was restarted.
    
    Ran into this bug while continually running the Bulk random walk test to reproduce #2667.
---
 .../accumulo/server/util/ManagerMetadataUtil.java  | 10 ++--
 .../accumulo/tserver/tablet/CompactableImpl.java   | 57 +++++++++++++---------
 .../accumulo/tserver/tablet/CompactableUtils.java  |  2 +-
 .../accumulo/tserver/tablet/DatafileManager.java   | 19 +++++---
 .../tablet/CompactableImplFileManagerTest.java     |  2 +-
 .../accumulo/test/functional/CompactionIT.java     | 51 +++++++++++++++++++
 6 files changed, 105 insertions(+), 36 deletions(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java
index 4c40125d34..82b3f92ad0 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java
@@ -182,9 +182,9 @@ public class ManagerMetadataUtil {
   }
 
   public static void replaceDatafiles(ServerContext context, KeyExtent extent,
-      Set<StoredTabletFile> datafilesToDelete, Set<StoredTabletFile> scanFiles, TabletFile path,
-      Long compactionId, DataFileValue size, String address, TServerInstance lastLocation,
-      ServiceLock zooLock, Optional<ExternalCompactionId> ecid) {
+      Set<StoredTabletFile> datafilesToDelete, Set<StoredTabletFile> scanFiles,
+      Optional<StoredTabletFile> path, Long compactionId, DataFileValue size, String address,
+      TServerInstance lastLocation, ServiceLock zooLock, Optional<ExternalCompactionId> ecid) {
 
     context.getAmple().putGcCandidates(extent.tableId(), datafilesToDelete);
 
@@ -193,8 +193,8 @@ public class ManagerMetadataUtil {
     datafilesToDelete.forEach(tablet::deleteFile);
     scanFiles.forEach(tablet::putScan);
 
-    if (size.getNumEntries() > 0)
-      tablet.putFile(path, size);
+    if (path.isPresent())
+      tablet.putFile(path.get(), size);
 
     if (compactionId != null)
       tablet.putCompactionId(compactionId);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index 6e3d728b21..20b125eea5 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -447,7 +447,8 @@ public class CompactableImpl implements Compactable {
             Set<StoredTabletFile> candidates = Sets.difference(selectedFiles, allCompactingFiles);
             // verify that candidates are still around and fail quietly if not
             if (!currFiles.containsAll(candidates)) {
-              log.debug("Selected files not in all files {} {}", candidates, currFiles);
+              log.debug("Selected files not in all files {} {} {}",
+                  Sets.difference(candidates, currFiles), candidates, currFiles);
               return Set.of();
             }
             // must create a copy because the sets passed to Sets.difference could change after this
@@ -575,21 +576,21 @@ public class CompactableImpl implements Compactable {
      * @param newFile
      *          The file produced by a compaction. If the compaction failed, this can be null.
      */
-    void completed(CompactionJob job, Set<StoredTabletFile> jobFiles, StoredTabletFile newFile) {
+    void completed(CompactionJob job, Set<StoredTabletFile> jobFiles,
+        Optional<StoredTabletFile> newFile) {
       Preconditions.checkArgument(!jobFiles.isEmpty());
       Preconditions.checkState(allCompactingFiles.removeAll(jobFiles));
-      if (newFile != null) {
-        choppedFiles.add(newFile);
+      if (newFile.isPresent()) {
+        choppedFiles.add(newFile.get());
       }
 
-      if ((job.getKind() == CompactionKind.USER || job.getKind() == CompactionKind.SELECTOR)
-          && newFile != null) {
+      if ((job.getKind() == CompactionKind.USER || job.getKind() == CompactionKind.SELECTOR)) {
         selectedCompactionCompleted(job, jobFiles, newFile);
       }
     }
 
     private void selectedCompactionCompleted(CompactionJob job, Set<StoredTabletFile> jobFiles,
-        StoredTabletFile newFile) {
+        Optional<StoredTabletFile> newFile) {
       Preconditions.checkArgument(
           job.getKind() == CompactionKind.USER || job.getKind() == CompactionKind.SELECTOR);
       Preconditions.checkState(selectedFiles.containsAll(jobFiles));
@@ -603,9 +604,11 @@ public class CompactableImpl implements Compactable {
         selectStatus = FileSelectionStatus.NOT_ACTIVE;
         log.trace("Selected compaction status changed {} {}", getExtent(), selectStatus);
       } else if (selectStatus == FileSelectionStatus.RESERVED) {
-        selectedFiles.add(newFile);
+        if (newFile.isPresent()) {
+          selectedFiles.add(newFile.get());
+        }
         log.trace("Compacted subset of selected files {} {} -> {}", getExtent(),
-            asFileNames(jobFiles), newFile.getFileName());
+            asFileNames(jobFiles), newFile.orElse(null));
       } else {
         log.debug("Canceled selected compaction completed {} but others still running ",
             getExtent());
@@ -1226,7 +1229,7 @@ public class CompactableImpl implements Compactable {
     // check is done after the file are exclusively reserved in this class to avoid race conditions.
     if (!tablet.getDatafiles().keySet().containsAll(cInfo.jobFiles)) {
       // The tablet does not know of all these files, so unreserve them.
-      completeCompaction(job, cInfo.jobFiles, null);
+      completeCompaction(job, cInfo.jobFiles, Optional.empty(), true);
       return Optional.empty();
     }
 
@@ -1234,10 +1237,12 @@ public class CompactableImpl implements Compactable {
   }
 
   private void completeCompaction(CompactionJob job, Set<StoredTabletFile> jobFiles,
-      StoredTabletFile metaFile) {
+      Optional<StoredTabletFile> metaFile, boolean successful) {
     synchronized (this) {
       Preconditions.checkState(removeJob(job));
-      fileMgr.completed(job, jobFiles, metaFile);
+      if (successful) {
+        fileMgr.completed(job, jobFiles, metaFile);
+      }
 
       if (!compactionRunning) {
         notifyAll();
@@ -1257,11 +1262,14 @@ public class CompactableImpl implements Compactable {
       return;
 
     var cInfo = ocInfo.get();
-    StoredTabletFile newFile = null;
+    Optional<StoredTabletFile> newFile = Optional.empty();
     long startTime = System.currentTimeMillis();
     CompactionKind kind = job.getKind();
 
     CompactionStats stats = new CompactionStats();
+
+    boolean successful = false;
+
     try {
       TabletLogger.compacting(getExtent(), job, cInfo.localCompactionCfg);
       tablet.incrementStatusMajor();
@@ -1278,14 +1286,16 @@ public class CompactableImpl implements Compactable {
       newFile = CompactableUtils.bringOnline(tablet.getDatafileManager(), cInfo, stats,
           compactFiles, allFiles, kind, tmpFileName);
 
-      TabletLogger.compacted(getExtent(), job, newFile);
+      TabletLogger.compacted(getExtent(), job, newFile.orElse(null));
+
+      successful = true;
     } catch (CompactionCanceledException cce) {
       log.debug("Compaction canceled {} ", getExtent());
     } catch (Exception e) {
-      newFile = null;
+      newFile = Optional.empty();
       throw new RuntimeException(e);
     } finally {
-      completeCompaction(job, cInfo.jobFiles, newFile);
+      completeCompaction(job, cInfo.jobFiles, newFile, successful);
       tablet.updateTimer(MAJOR, queuedTime, startTime, stats.getEntriesRead(), newFile == null);
     }
   }
@@ -1334,7 +1344,7 @@ public class CompactableImpl implements Compactable {
 
     } catch (Exception e) {
       externalCompactions.remove(externalCompactionId);
-      completeCompaction(job, cInfo.jobFiles, null);
+      completeCompaction(job, cInfo.jobFiles, Optional.empty(), false);
       throw new RuntimeException(e);
     }
   }
@@ -1357,22 +1367,25 @@ public class CompactableImpl implements Compactable {
 
       ExternalCompactionInfo ecInfo = externalCompactions.get(extCompactionId);
 
+      boolean successful = false;
+
       if (ecInfo != null) {
         log.debug("Attempting to commit external compaction {}", extCompactionId);
-        StoredTabletFile metaFile = null;
+        Optional<StoredTabletFile> metaFile = Optional.empty();
         try {
           metaFile =
               tablet.getDatafileManager().bringMajorCompactionOnline(ecInfo.meta.getJobFiles(),
                   ecInfo.meta.getCompactTmpName(), ecInfo.meta.getCompactionId(),
                   Sets.union(ecInfo.meta.getJobFiles(), ecInfo.meta.getNextFiles()),
                   new DataFileValue(fileSize, entries), Optional.of(extCompactionId));
-          TabletLogger.compacted(getExtent(), ecInfo.job, metaFile);
+          TabletLogger.compacted(getExtent(), ecInfo.job, metaFile.orElse(null));
+          successful = true;
         } catch (Exception e) {
-          metaFile = null;
+          metaFile = Optional.empty();
           log.error("Error committing external compaction {}", extCompactionId, e);
           throw new RuntimeException(e);
         } finally {
-          completeCompaction(ecInfo.job, ecInfo.meta.getJobFiles(), metaFile);
+          completeCompaction(ecInfo.job, ecInfo.meta.getJobFiles(), metaFile, successful);
           externalCompactions.remove(extCompactionId);
           log.debug("Completed commit of external compaction {}", extCompactionId);
         }
@@ -1408,7 +1421,7 @@ public class CompactableImpl implements Compactable {
       if (ecInfo != null) {
         tablet.getContext().getAmple().mutateTablet(getExtent()).deleteExternalCompaction(ecid)
             .mutate();
-        completeCompaction(ecInfo.job, ecInfo.meta.getJobFiles(), null);
+        completeCompaction(ecInfo.job, ecInfo.meta.getJobFiles(), Optional.empty(), false);
         externalCompactions.remove(ecid);
         log.debug("Processed external compaction failure {}", ecid);
       } else {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
index 645afb196d..327af8d14a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
@@ -570,7 +570,7 @@ public class CompactableUtils {
   /**
    * Finish major compaction by bringing the new file online and returning the completed file.
    */
-  static StoredTabletFile bringOnline(DatafileManager datafileManager,
+  static Optional<StoredTabletFile> bringOnline(DatafileManager datafileManager,
       CompactableImpl.CompactionInfo cInfo, CompactionStats stats,
       Map<StoredTabletFile,DataFileValue> compactFiles,
       SortedMap<StoredTabletFile,DataFileValue> allFiles, CompactionKind kind,
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
index f2d8d2d764..517234cde4 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
@@ -426,7 +426,7 @@ class DatafileManager {
     return newFile;
   }
 
-  StoredTabletFile bringMajorCompactionOnline(Set<StoredTabletFile> oldDatafiles,
+  Optional<StoredTabletFile> bringMajorCompactionOnline(Set<StoredTabletFile> oldDatafiles,
       TabletFile tmpDatafile, Long compactionId, Set<StoredTabletFile> selectedFiles,
       DataFileValue dfv, Optional<ExternalCompactionId> ecid) throws IOException {
     final KeyExtent extent = tablet.getExtent();
@@ -449,9 +449,14 @@ class DatafileManager {
     }
 
     TServerInstance lastLocation = null;
-    // calling insert to get the new file before inserting into the metadata
-    StoredTabletFile newFile = newDatafile.insert();
+    Optional<StoredTabletFile> newFile;
 
+    if (dfv.getNumEntries() > 0) {
+      // calling insert to get the new file before inserting into the metadata
+      newFile = Optional.of(newDatafile.insert());
+    } else {
+      newFile = Optional.empty();
+    }
     Long compactionIdToWrite = null;
 
     // increment start count before metadata update AND updating in memory map of files
@@ -465,8 +470,8 @@ class DatafileManager {
         Preconditions.checkState(datafileSizes.keySet().containsAll(oldDatafiles),
             "Compacted files %s are not a subset of tablet files %s", oldDatafiles,
             datafileSizes.keySet());
-        if (dfv.getNumEntries() > 0) {
-          Preconditions.checkState(!datafileSizes.containsKey(newFile),
+        if (newFile.isPresent()) {
+          Preconditions.checkState(!datafileSizes.containsKey(newFile.get()),
               "New compaction file %s already exist in tablet files %s", newFile,
               datafileSizes.keySet());
         }
@@ -475,8 +480,8 @@ class DatafileManager {
 
         datafileSizes.keySet().removeAll(oldDatafiles);
 
-        if (dfv.getNumEntries() > 0) {
-          datafileSizes.put(newFile, dfv);
+        if (newFile.isPresent()) {
+          datafileSizes.put(newFile.get(), dfv);
           // could be used by a follow on compaction in a multipass compaction
         }
 
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/CompactableImplFileManagerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/CompactableImplFileManagerTest.java
index 9ebb1fcd6b..44bf56f3a2 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/CompactableImplFileManagerTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/CompactableImplFileManagerTest.java
@@ -428,7 +428,7 @@ public class CompactableImplFileManagerTest {
     }
 
     void completed(TestCompactionJob job, StoredTabletFile newFile) {
-      super.completed(job, job.getSTFiles(), newFile);
+      super.completed(job, job.getSTFiles(), Optional.ofNullable(newFile));
     }
 
     @Override
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index 76e594559f..42e374f277 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -61,6 +61,7 @@ import org.apache.accumulo.core.iterators.Filter;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.user.GrepIterator;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
@@ -494,6 +495,56 @@ public class CompactionIT extends AccumuloClusterHarness {
     }
   }
 
+  @Test
+  public void testMultiStepCompactionThatDeletesAll() throws Exception {
+
+    // There was a bug where user compactions would never complete when : the tablet had to be
+    // compacted in multiple passes AND the intermediate passes produced no output.
+
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+      final String tableName = getUniqueNames(1)[0];
+      c.tableOperations().create(tableName);
+      c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "100.0");
+
+      var beforeCount = countFiles(c);
+
+      final int NUM_ENTRIES_AND_FILES = 60;
+
+      try (var writer = c.createBatchWriter(tableName)) {
+        for (int i = 0; i < NUM_ENTRIES_AND_FILES; i++) {
+          Mutation m = new Mutation("r" + i);
+          m.put("f1", "q1", "v" + i);
+          writer.addMutation(m);
+          writer.flush();
+          c.tableOperations().flush(tableName, null, null, true);
+        }
+      }
+
+      try (var scanner = c.createScanner(tableName)) {
+        assertEquals(NUM_ENTRIES_AND_FILES, scanner.stream().count());
+      }
+
+      var afterCount = countFiles(c);
+
+      assertTrue(afterCount >= beforeCount + NUM_ENTRIES_AND_FILES);
+
+      CompactionConfig comactionConfig = new CompactionConfig();
+      // configure an iterator that drops all data
+      IteratorSetting iter = new IteratorSetting(100, GrepIterator.class);
+      GrepIterator.setTerm(iter, "keep");
+      comactionConfig.setIterators(List.of(iter));
+      comactionConfig.setWait(true);
+      c.tableOperations().compact(tableName, comactionConfig);
+
+      try (var scanner = c.createScanner(tableName)) {
+        assertEquals(0, scanner.stream().count());
+      }
+
+      var finalCount = countFiles(c);
+      assertTrue(finalCount <= beforeCount);
+    }
+  }
+
   private int countFiles(AccumuloClient c) throws Exception {
     try (Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
       s.fetchColumnFamily(new Text(TabletColumnFamily.NAME));