Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2022/04/14 22:50:19 UTC

[GitHub] [hbase] joshelser commented on a diff in pull request #4338: HBASE-26938 Compaction failures after StoreFileTracker integration

joshelser commented on code in PR #4338:
URL: https://github.com/apache/hbase/pull/4338#discussion_r850866084


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java:
##########
@@ -550,22 +551,63 @@ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
         dropDeletesFromRow, dropDeletesToRow);
   }
 
-  public List<Path> getCompactionTargets() {
-    T writer = this.writer;
-    if (writer == null) {
-      return Collections.emptyList();
+  /**
+   * Return the progress for a given compaction request.
+   * @param request the compaction request
+   */
+  public CompactionProgress getProgress(CompactionRequestImpl request) {
+    return progressMap.get(request);
+  }
+
+  /**
+   * Return the aggregate progress for all currently active compactions.
+   */
+  public CompactionProgress getProgress() {
+    synchronized (progressMap) {
+      long totalCompactingKVs = 0;
+      long currentCompactedKVs = 0;
+      long totalCompactedSize = 0;
+      for (CompactionProgress progress: progressMap.values()) {
+        totalCompactingKVs += progress.totalCompactingKVs;
+        currentCompactedKVs += progress.currentCompactedKVs;
+        totalCompactedSize += progress.totalCompactedSize;
+      }
+      CompactionProgress result = new CompactionProgress(totalCompactingKVs);
+      result.currentCompactedKVs = currentCompactedKVs;
+      result.totalCompactedSize = totalCompactedSize;
+      return result;
     }
-    if (writer instanceof StoreFileWriter) {
-      return Arrays.asList(((StoreFileWriter) writer).getPath());
+  }
+
+  public boolean isCompacting() {
+    return !progressMap.isEmpty();
+  }
+
+  /**
+   * Return the list of target files for all currently active compactions.
+   */
+  public List<Path> getCompactionTargets() {
+    // Build a list of all the compaction targets for all active writers
+    List<Path> targets = new ArrayList<>();
+    synchronized (writerMap) {
+      for (T writer: writerMap.values()) {
+        if (writer instanceof StoreFileWriter) {
+          targets.add(((StoreFileWriter) writer).getPath());
+        } else {
+          ((AbstractMultiFileWriter) writer).writers().stream()
+            .forEach(sfw -> targets.add(sfw.getPath()));
+        }
+      }
     }
-    return ((AbstractMultiFileWriter) writer).writers().stream().map(sfw -> sfw.getPath())
-      .collect(Collectors.toList());
+    return targets;
   }
 
   /**
-   * Reset the Writer when the new storefiles were successfully added
+   * Complete the compaction after the new storefiles are successfully added.
    */
-  public void resetWriter(){
-    writer = null;
+  public void completeCompaction(CompactionRequestImpl request) {
+    writerMap.remove(request);
+    progressMap.remove(request);

Review Comment:
   I'm assuming that you're generally OK with these two synchronized data structures being updated without holding the same lock? Given the current use of `writerMap` and `progressMap`, nothing jumps out as problematic with the way you have it. Just thinking out loud.
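As background for the locking question above, the aggregate `getProgress()` in the hunk iterates the map inside `synchronized (progressMap)`. If `progressMap` is a `Collections.synchronizedMap` wrapper, that explicit monitor is required for safe iteration, since the wrapper only guards individual calls. A minimal sketch of that pattern (the class and field names below are simplified stand-ins, not the real HBase implementation):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the PR's aggregate getProgress(): iteration over a
// Collections.synchronizedMap must hold the map's own monitor explicitly.
class ProgressAggregate {
    static class CompactionProgress {
        long totalCompactingKVs;
        long currentCompactedKVs;
        long totalCompactedSize;
        CompactionProgress(long totalCompactingKVs) {
            this.totalCompactingKVs = totalCompactingKVs;
        }
    }

    static final Map<String, CompactionProgress> progressMap =
        Collections.synchronizedMap(new HashMap<>());

    static CompactionProgress aggregate() {
        // Required: the synchronizedMap wrapper does not protect iteration.
        synchronized (progressMap) {
            long totalCompacting = 0, currentCompacted = 0, totalSize = 0;
            for (CompactionProgress p : progressMap.values()) {
                totalCompacting += p.totalCompactingKVs;
                currentCompacted += p.currentCompactedKVs;
                totalSize += p.totalCompactedSize;
            }
            CompactionProgress result = new CompactionProgress(totalCompacting);
            result.currentCompactedKVs = currentCompacted;
            result.totalCompactedSize = totalSize;
            return result;
        }
    }

    public static void main(String[] args) {
        CompactionProgress a = new CompactionProgress(10);
        a.currentCompactedKVs = 4;
        CompactionProgress b = new CompactionProgress(20);
        b.currentCompactedKVs = 6;
        progressMap.put("r1", a);
        progressMap.put("r2", b);
        CompactionProgress sum = aggregate();
        System.out.println(sum.totalCompactingKVs + " " + sum.currentCompactedKVs); // 30 10
    }
}
```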



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java:
##########
@@ -550,22 +551,63 @@ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
         dropDeletesFromRow, dropDeletesToRow);
   }
 
-  public List<Path> getCompactionTargets() {
-    T writer = this.writer;
-    if (writer == null) {
-      return Collections.emptyList();
+  /**
+   * Return the progress for a given compaction request.
+   * @param request the compaction request
+   */
+  public CompactionProgress getProgress(CompactionRequestImpl request) {
+    return progressMap.get(request);
+  }
+
+  /**
+   * Return the aggregate progress for all currently active compactions.
+   */
+  public CompactionProgress getProgress() {
+    synchronized (progressMap) {
+      long totalCompactingKVs = 0;
+      long currentCompactedKVs = 0;
+      long totalCompactedSize = 0;
+      for (CompactionProgress progress: progressMap.values()) {
+        totalCompactingKVs += progress.totalCompactingKVs;
+        currentCompactedKVs += progress.currentCompactedKVs;
+        totalCompactedSize += progress.totalCompactedSize;
+      }
+      CompactionProgress result = new CompactionProgress(totalCompactingKVs);
+      result.currentCompactedKVs = currentCompactedKVs;
+      result.totalCompactedSize = totalCompactedSize;
+      return result;
     }
-    if (writer instanceof StoreFileWriter) {
-      return Arrays.asList(((StoreFileWriter) writer).getPath());
+  }
+
+  public boolean isCompacting() {
+    return !progressMap.isEmpty();
+  }
+
+  /**
+   * Return the list of target files for all currently active compactions.
+   */
+  public List<Path> getCompactionTargets() {
+    // Build a list of all the compaction targets for all active writers
+    List<Path> targets = new ArrayList<>();
+    synchronized (writerMap) {
+      for (T writer: writerMap.values()) {
+        if (writer instanceof StoreFileWriter) {
+          targets.add(((StoreFileWriter) writer).getPath());
+        } else {
+          ((AbstractMultiFileWriter) writer).writers().stream()

Review Comment:
   Suggest: make this an `else if (writer instanceof AbstractMultiFileWriter)` to be defensive, and have the `else` branch fail loudly if something goes wrong.
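A sketch of the suggested defensive shape, using simplified stand-in types (the real `StoreFileWriter`/`AbstractMultiFileWriter` classes are not reproduced here): each known writer type gets an explicit `instanceof` branch, and anything unexpected throws instead of being blindly cast.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the HBase writer types; the names mirror the PR
// but these classes are illustrative, not the real implementations.
class DefensiveTargets {
    interface CellSink {}

    static class StoreFileWriter implements CellSink {
        String getPath() { return "/store/file-a"; }
    }

    static class AbstractMultiFileWriter implements CellSink {
        List<StoreFileWriter> writers() { return List.of(new StoreFileWriter()); }
    }

    // Match known writer types explicitly; fail loudly on anything else
    // instead of casting in a bare else branch.
    static List<String> collectTargets(List<CellSink> writers) {
        List<String> targets = new ArrayList<>();
        for (CellSink writer : writers) {
            if (writer instanceof StoreFileWriter) {
                targets.add(((StoreFileWriter) writer).getPath());
            } else if (writer instanceof AbstractMultiFileWriter) {
                ((AbstractMultiFileWriter) writer).writers()
                    .forEach(sfw -> targets.add(sfw.getPath()));
            } else {
                throw new IllegalStateException(
                    "Unexpected writer type: " + writer.getClass().getName());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        List<CellSink> ws = List.of(new StoreFileWriter(), new AbstractMultiFileWriter());
        System.out.println(collectTargets(ws));
    }
}
```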



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java:
##########
@@ -344,6 +340,9 @@ protected final List<Path> compact(final CompactionRequestImpl request,
     boolean finished = false;
     List<StoreFileScanner> scanners =
       createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
+    T writer = null;
+    CompactionProgress progress = new CompactionProgress(fd.maxKeyCount);
+    progressMap.put(request, progress);

Review Comment:
   Could warn if the return value from `put(...)` was anything but `null` (i.e. we should not already have a progress entry for this `CompactionRequestImpl`).
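The check being suggested relies on `Map.put` returning the previous mapping. A minimal sketch, with simplified stand-in types and a `System.err` line where the real code would presumably use `LOG.warn(...)`:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of warning on duplicate registration: a non-null return from put(...)
// means the request was already being tracked. Names are simplified stand-ins.
class RegisterProgress {
    static class CompactionProgress {
        final long totalCompactingKVs;
        CompactionProgress(long totalCompactingKVs) {
            this.totalCompactingKVs = totalCompactingKVs;
        }
    }

    static final Map<String, CompactionProgress> progressMap = new HashMap<>();

    /** Returns true for a fresh registration, false if one already existed. */
    static boolean register(String requestId, long maxKeyCount) {
        CompactionProgress previous =
            progressMap.put(requestId, new CompactionProgress(maxKeyCount));
        if (previous != null) {
            // The real code would use LOG.warn(...) here.
            System.err.println("WARN: progress already registered for " + requestId);
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(register("req-1", 100)); // true: fresh registration
        System.out.println(register("req-1", 200)); // false: duplicate, warns
    }
}
```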



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java:
##########
@@ -550,22 +551,63 @@ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
         dropDeletesFromRow, dropDeletesToRow);
   }
 
-  public List<Path> getCompactionTargets() {
-    T writer = this.writer;
-    if (writer == null) {
-      return Collections.emptyList();
+  /**
+   * Return the progress for a given compaction request.
+   * @param request the compaction request
+   */
+  public CompactionProgress getProgress(CompactionRequestImpl request) {
+    return progressMap.get(request);
+  }
+
+  /**
+   * Return the aggregate progress for all currently active compactions.
+   */
+  public CompactionProgress getProgress() {
+    synchronized (progressMap) {
+      long totalCompactingKVs = 0;
+      long currentCompactedKVs = 0;
+      long totalCompactedSize = 0;
+      for (CompactionProgress progress: progressMap.values()) {
+        totalCompactingKVs += progress.totalCompactingKVs;
+        currentCompactedKVs += progress.currentCompactedKVs;
+        totalCompactedSize += progress.totalCompactedSize;
+      }
+      CompactionProgress result = new CompactionProgress(totalCompactingKVs);
+      result.currentCompactedKVs = currentCompactedKVs;
+      result.totalCompactedSize = totalCompactedSize;
+      return result;
     }
-    if (writer instanceof StoreFileWriter) {
-      return Arrays.asList(((StoreFileWriter) writer).getPath());
+  }
+
+  public boolean isCompacting() {
+    return !progressMap.isEmpty();
+  }
+
+  /**
+   * Return the list of target files for all currently active compactions.
+   */
+  public List<Path> getCompactionTargets() {
+    // Build a list of all the compaction targets for all active writers
+    List<Path> targets = new ArrayList<>();
+    synchronized (writerMap) {
+      for (T writer: writerMap.values()) {
+        if (writer instanceof StoreFileWriter) {
+          targets.add(((StoreFileWriter) writer).getPath());
+        } else {
+          ((AbstractMultiFileWriter) writer).writers().stream()
+            .forEach(sfw -> targets.add(sfw.getPath()));
+        }
+      }
     }
-    return ((AbstractMultiFileWriter) writer).writers().stream().map(sfw -> sfw.getPath())
-      .collect(Collectors.toList());
+    return targets;
   }
 
   /**
-   * Reset the Writer when the new storefiles were successfully added
+   * Complete the compaction after the new storefiles are successfully added.
    */
-  public void resetWriter(){
-    writer = null;
+  public void completeCompaction(CompactionRequestImpl request) {
+    writerMap.remove(request);
+    progressMap.remove(request);

Review Comment:
   Do we care if either of the `remove()` calls returns `null` (i.e. the map doesn't have the mapping in it)?
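If the answer is yes, one option is to check the `remove()` return values and warn when either map had no entry, which would surface a double completion or a never-registered request. A sketch under that assumption (stand-in types, and `System.err` in place of a real logger):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: completeCompaction(...) could inspect the remove(...) return values
// and warn when no mapping existed. Simplified stand-ins for the PR's maps.
class CompleteSketch {
    static final Map<String, Object> writerMap = new HashMap<>();
    static final Map<String, Object> progressMap = new HashMap<>();

    static void completeCompaction(String requestId) {
        boolean hadWriter = writerMap.remove(requestId) != null;
        boolean hadProgress = progressMap.remove(requestId) != null;
        if (!hadWriter || !hadProgress) {
            // The real code might LOG.warn(...) instead of silently ignoring this.
            System.err.println("WARN: no tracked state for " + requestId
                + " (writer=" + hadWriter + ", progress=" + hadProgress + ")");
        }
    }

    public static void main(String[] args) {
        writerMap.put("req-1", new Object());
        progressMap.put("req-1", new Object());
        completeCompaction("req-1"); // clean removal, no warning
        completeCompaction("req-1"); // warns: nothing left to remove
        System.out.println(writerMap.isEmpty() && progressMap.isEmpty()); // true
    }
}
```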



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java:
##########
@@ -381,34 +376,40 @@ protected final List<Path> compact(final CompactionRequestImpl request,
       } else {
         Closeables.close(scanner, true);
       }
-      if (!finished && writer != null) {
-        abortWriter();
+      if (!finished) {
+        if (writer != null) {
+          abortWriter(writer);
+        }
+        // This signals that the target file is no longer written and can be cleaned up
+        completeCompaction(request);

Review Comment:
   Clarifying: we only need to call completeCompaction in the exceptional case (compaction didn't finish normally)? And `commitWriter()` down below is doing the same kind of cleanup work that `completeCompaction()` here is doing?
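The control flow being asked about can be sketched as follows: on an exceptional exit the in-flight writer is aborted and the request's tracking state is dropped, while the normal path cleans up as part of committing the writer. All names below are simplified stand-ins for the PR's code, not the real `Compactor` logic:

```java
// Sketch of the exceptional-path cleanup under discussion. The flags stand in
// for the side effects of the real abortWriter()/completeCompaction() calls.
class FailurePathSketch {
    static boolean aborted = false;
    static boolean completed = false;

    static void abortWriter(Object writer) { aborted = true; }

    static void completeCompaction(String requestId) { completed = true; }

    static void compact(String requestId, boolean failMidway) {
        boolean finished = false;
        Object writer = new Object();
        try {
            if (failMidway) {
                throw new RuntimeException("simulated compaction failure");
            }
            finished = true;
        } catch (RuntimeException e) {
            // swallowed for the sketch; the real code propagates the failure
        } finally {
            if (!finished) {
                if (writer != null) {
                    abortWriter(writer);
                }
                // Signals that the target file is no longer being written
                completeCompaction(requestId);
            }
        }
    }

    public static void main(String[] args) {
        compact("req-1", true);
        System.out.println(aborted + " " + completed); // true true on failure
    }
}
```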



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@hbase.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org