Posted to issues@ozone.apache.org by "swamirishi (via GitHub)" <gi...@apache.org> on 2023/10/02 22:45:05 UTC

Re: [PR] HDDS-9311. Use compationLogTable to store compaction information [ozone]

swamirishi commented on code in PR #5317:
URL: https://github.com/apache/ozone/pull/5317#discussion_r1343157842


##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -740,20 +686,20 @@ public HashSet<String> readRocksDBLiveFiles(ManagedRocksDB rocksDB) {
   }
 
   /**
-   * Process each line of compaction log text file input and populate the DAG.
+   * Process log line of compaction log text file input and populate the DAG.
+   * It also adds the compaction log entry to compaction log table.
    */
   void processCompactionLogLine(String line) {
 
     LOG.debug("Processing line: {}", line);
 
     synchronized (this) {
-      if (line.startsWith("#")) {
-        // Skip comments
-        LOG.debug("Comment line, skipped");
+      if (line.startsWith(COMPACTION_LOG_COMMENT_LINE_PREFIX)) {
+        reconstructionCompactionReason =
+            line.substring(COMPACTION_LOG_COMMENT_LINE_PREFIX.length());
       } else if (line.startsWith(COMPACTION_LOG_SEQ_NUM_LINE_PREFIX)) {
-        SnapshotLogInfo snapshotLogInfo = getSnapshotLogInfo(line);
-        reconstructionSnapshotGeneration = snapshotLogInfo.snapshotGenerationId;
-        reconstructionLastSnapshotID = snapshotLogInfo.snapshotId;
+        reconstructionSnapshotCreationTime =
+            getSnapshotCreationTimeFromLogLing(line);

Review Comment:
   I believe the function name has a typo: `getSnapshotCreationTimeFromLogLing` should be `getSnapshotCreationTimeFromLogLine`.



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -605,19 +530,40 @@ public void onCompactionCompleted(RocksDB db,
 
           waitForTarballCreation();
 
-          // Write input and output file names to compaction log
-          appendToCurrentCompactionLog(content);
+
+          // Add the compaction log entry to Compaction log table.
+          addToCompactionLogTable(compactionLogEntry);
 
           // Populate the DAG
           // TODO: [SNAPSHOT] Once SnapshotChainManager is put into use,
           //  set snapshotID to snapshotChainManager.getLatestGlobalSnapshot()
-          populateCompactionDAG(inputFiles, outputFiles, null,
+          populateCompactionDAG(compactionLogEntry.getInputFileInfoList(),
+              compactionLogEntry.getOutputFileInfoList(), null,
               db.getLatestSequenceNumber());
         }
       }
     };
   }
 
+  @VisibleForTesting
+  void addToCompactionLogTable(CompactionLogEntry compactionLogEntry) {
+    // Key in the transactionId-currentTime
+    // Just trxId can't be used because multiple compaction might be
+    // running, and it is possible no new entry was added to DB.
+    // Adding current time to transactionId eliminates key collision.
+    String keyString = compactionLogEntry.getDbSequenceNumber() + "-" +

Review Comment:
   Add leading zeroes to the dbSequenceNumber here.



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -605,19 +530,40 @@ public void onCompactionCompleted(RocksDB db,
 
           waitForTarballCreation();
 
-          // Write input and output file names to compaction log
-          appendToCurrentCompactionLog(content);
+
+          // Add the compaction log entry to Compaction log table.
+          addToCompactionLogTable(compactionLogEntry);
 
           // Populate the DAG
           // TODO: [SNAPSHOT] Once SnapshotChainManager is put into use,
           //  set snapshotID to snapshotChainManager.getLatestGlobalSnapshot()
-          populateCompactionDAG(inputFiles, outputFiles, null,
+          populateCompactionDAG(compactionLogEntry.getInputFileInfoList(),
+              compactionLogEntry.getOutputFileInfoList(), null,
               db.getLatestSequenceNumber());
         }
       }
     };
   }
 
+  @VisibleForTesting
+  void addToCompactionLogTable(CompactionLogEntry compactionLogEntry) {
+    // Key in the transactionId-currentTime
+    // Just trxId can't be used because multiple compaction might be
+    // running, and it is possible no new entry was added to DB.
+    // Adding current time to transactionId eliminates key collision.
+    String keyString = compactionLogEntry.getDbSequenceNumber() + "-" +

Review Comment:
   Otherwise the lexicographic ordering of the keys will not match the db sequence number ordering.
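   A minimal sketch of the zero-padding idea (the 20-digit width, helper name, and example values are assumptions for illustration, not part of the patch):
   
   ```java
   // Sketch only: zero-pad dbSequenceNumber to a fixed width so that the
   // lexicographic order of the keys matches their numeric order.
   public final class CompactionLogKeySketch {
     // 20 digits is enough to hold any non-negative long value.
     static String toKey(long dbSequenceNumber, long currentTimeMillis) {
       return String.format("%020d-%d", dbSequenceNumber, currentTimeMillis);
     }
   
     public static void main(String[] args) {
       // Without padding, "9-..." would sort after "10-..."; with padding it sorts first.
       System.out.println(toKey(9, 1696286705000L));
       System.out.println(toKey(10, 1696286705000L));
     }
   }
   ```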



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -799,24 +749,20 @@ private void readCompactionLogToDAG(String currCompactionLogPath) {
     }
   }
 
-  /**
-   * Load existing compaction log files to the in-memory DAG.
-   * This only needs to be done once during OM startup.
-   */
-  public void loadAllCompactionLogs() {
+  public void addEntriesFromLogFilesToDagAndCompactionLogTable() {
     synchronized (this) {
-      if (compactionLogDir == null) {
-        throw new RuntimeException("Compaction log directory must be set " +
-            "first");
-      }
-      reconstructionSnapshotGeneration = 0L;
+      reconstructionSnapshotCreationTime = 0L;
+      reconstructionCompactionReason = null;
       try {
         try (Stream<Path> pathStream = Files.list(Paths.get(compactionLogDir))
             .filter(e -> e.toString().toLowerCase()
                 .endsWith(COMPACTION_LOG_FILE_NAME_SUFFIX))
             .sorted()) {
           for (Path logPath : pathStream.collect(Collectors.toList())) {
-            readCompactionLogToDAG(logPath.toString());
+            readCompactionLogFile(logPath.toString());
+            // Delete the file once entries are added to compaction table
+            // so that on next restart, only compaction log table is used.
+            Files.delete(logPath);

Review Comment:
   Can we do a soft delete here and perform the hard deletion somewhere else after a TTL?
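   A rough sketch of what the soft delete could look like (the `.processed` suffix and the atomic move are assumptions; the actual mechanism is up for discussion):
   
   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardCopyOption;
   
   final class SoftDeleteSketch {
     // Rename the processed compaction log file instead of deleting it right away;
     // a separate cleanup task can hard-delete the renamed files after a TTL.
     static Path softDelete(Path logPath) throws IOException {
       Path renamed = logPath.resolveSibling(logPath.getFileName() + ".processed");
       return Files.move(logPath, renamed, StandardCopyOption.ATOMIC_MOVE);
     }
   }
   ```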



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1185,85 +1194,72 @@ public void pruneOlderSnapshotsWithCompactionHistory() {
 
     try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
       removeSstFiles(sstFileNodesRemoved);
-      deleteOlderSnapshotsCompactionFiles(olderSnapshotsLogFilePaths);
+      removeKeyFromCompactionLogTable(keysToRemove);
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
   }
 
-  /**
-   * Deletes the SST files from the backup directory if exists.
-   */
-  private void removeSstFiles(Set<String> sstFileNodes) {
-    for (String sstFileNode: sstFileNodes) {
-      File file =
-          new File(sstBackupDir + "/" + sstFileNode + SST_FILE_EXTENSION);
-      try {
-        Files.deleteIfExists(file.toPath());
-      } catch (IOException exception) {
-        LOG.warn("Failed to delete SST file: " + sstFileNode, exception);
-      }
-    }
-  }
 
   /**
-   * Returns the list of compaction log files which are older than allowed
-   * max time in the compaction DAG.
+   * Returns the list of input files from the compaction entries which are
+   * older than the maximum allowed in the compaction DAG.
    */
-  private List<Path> getOlderSnapshotsCompactionLogFilePaths() {
+  private synchronized Pair<List<String>, List<byte[]>> getOlderFileNodes() {
     long compactionLogPruneStartTime = System.currentTimeMillis();

Review Comment:
   From what I understand, we can prune the DAG based on the db sequence number instead of the TTL configuration. We can start a discussion thread around this.
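   To make the idea concrete, a rough illustration of pruning by sequence number (the Map stands in for the compaction log table, and the threshold would come from the snapshot chain rather than a TTL; both are assumptions):
   
   ```java
   import java.util.Map;
   
   final class SequenceNumberPruneSketch {
     // Drop every entry whose db sequence number is older than the lowest
     // sequence number still needed by an existing snapshot.
     static void prune(Map<String, Long> keyToDbSequenceNumber,
         long minRetainedSequenceNumber) {
       keyToDbSequenceNumber.entrySet()
           .removeIf(e -> e.getValue() < minRetainedSequenceNumber);
     }
   }
   ```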



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1353,29 +1349,7 @@ Set<String> pruneForwardDag(MutableGraph<CompactionNode> forwardDag,
     return removedFiles;
   }
 
-  private SnapshotLogInfo getSnapshotInfoFromLog(Path compactionLogFile) {
-    AtomicReference<SnapshotLogInfo> snapshotLogInfo =
-        new AtomicReference<>();
-    try (Stream<String> logStream = Files.lines(compactionLogFile, UTF_8)) {
-      logStream.forEach(logLine -> {
-        if (!logLine.startsWith(COMPACTION_LOG_SEQ_NUM_LINE_PREFIX)) {
-          return;
-        }
-
-        snapshotLogInfo.set(getSnapshotLogInfo(logLine));
-      });
-    } catch (IOException exception) {
-      throw new RuntimeException("Failed to read compaction log file: " +
-          compactionLogFile, exception);
-    }
-
-    return snapshotLogInfo.get();
-  }
-
-  /**
-   * Converts a snapshot compaction log line to SnapshotLogInfo.
-   */
-  private SnapshotLogInfo getSnapshotLogInfo(String logLine) {
+  private long getSnapshotCreationTimeFromLogLing(String logLine) {

Review Comment:
   Typo in the function name: `FromLogLing` should be `FromLogLine`.



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -799,24 +749,20 @@ private void readCompactionLogToDAG(String currCompactionLogPath) {
     }
   }
 
-  /**
-   * Load existing compaction log files to the in-memory DAG.
-   * This only needs to be done once during OM startup.
-   */
-  public void loadAllCompactionLogs() {
+  public void addEntriesFromLogFilesToDagAndCompactionLogTable() {
     synchronized (this) {
-      if (compactionLogDir == null) {
-        throw new RuntimeException("Compaction log directory must be set " +
-            "first");
-      }
-      reconstructionSnapshotGeneration = 0L;
+      reconstructionSnapshotCreationTime = 0L;
+      reconstructionCompactionReason = null;
       try {
         try (Stream<Path> pathStream = Files.list(Paths.get(compactionLogDir))
             .filter(e -> e.toString().toLowerCase()
                 .endsWith(COMPACTION_LOG_FILE_NAME_SUFFIX))
             .sorted()) {
           for (Path logPath : pathStream.collect(Collectors.toList())) {
-            readCompactionLogToDAG(logPath.toString());
+            readCompactionLogFile(logPath.toString());
+            // Delete the file once entries are added to compaction table
+            // so that on next restart, only compaction log table is used.
+            Files.delete(logPath);

Review Comment:
   Something like this: https://stackoverflow.com/questions/43483750/how-guarantee-the-file-will-be-deleted-after-automatically-some-time
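   In the same spirit, a sketch of scheduling the hard deletion after a TTL (the executor setup, class name, and error handling are assumptions for illustration):
   
   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;
   
   final class DelayedDeleteSketch {
     private static final ScheduledExecutorService SCHEDULER =
         Executors.newSingleThreadScheduledExecutor();
   
     // Schedule Files.deleteIfExists to run once the TTL has elapsed,
     // instead of deleting the file immediately.
     static void deleteAfterTtl(Path path, long ttl, TimeUnit unit) {
       SCHEDULER.schedule(() -> {
         try {
           Files.deleteIfExists(path);
         } catch (IOException e) {
           // Best-effort cleanup; a real implementation would log and retry.
         }
       }, ttl, unit);
     }
   }
   ```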



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1185,85 +1194,72 @@ public void pruneOlderSnapshotsWithCompactionHistory() {
 
     try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
       removeSstFiles(sstFileNodesRemoved);
-      deleteOlderSnapshotsCompactionFiles(olderSnapshotsLogFilePaths);
+      removeKeyFromCompactionLogTable(keysToRemove);
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
   }
 
-  /**
-   * Deletes the SST files from the backup directory if exists.
-   */
-  private void removeSstFiles(Set<String> sstFileNodes) {
-    for (String sstFileNode: sstFileNodes) {
-      File file =
-          new File(sstBackupDir + "/" + sstFileNode + SST_FILE_EXTENSION);
-      try {
-        Files.deleteIfExists(file.toPath());
-      } catch (IOException exception) {
-        LOG.warn("Failed to delete SST file: " + sstFileNode, exception);
-      }
-    }
-  }
 
   /**
-   * Returns the list of compaction log files which are older than allowed
-   * max time in the compaction DAG.
+   * Returns the list of input files from the compaction entries which are
+   * older than the maximum allowed in the compaction DAG.
    */
-  private List<Path> getOlderSnapshotsCompactionLogFilePaths() {
+  private synchronized Pair<List<String>, List<byte[]>> getOlderFileNodes() {

Review Comment:
   ```suggestion
     private synchronized Pair<Set<String>, List<byte[]>> getOlderFileNodes() {
   ```
   Return a Set of strings instead of a List.



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1135,28 +1118,55 @@ private CompactionNode addNodeToDAG(String file, String snapshotID,
    *                   arbitrary String as long as it helps debugging.
    * @param seqNum DB transaction sequence number.
    */
-  private void populateCompactionDAG(List<String> inputFiles,
-      List<String> outputFiles, String snapshotId, long seqNum) {
+  private void populateCompactionDAG(List<CompactionFileInfo> inputFiles,
+                                     List<CompactionFileInfo> outputFiles,
+                                     String snapshotId,

Review Comment:
   The `snapshotId` parameter is not being used anywhere.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

