Posted to commits@ozone.apache.org by si...@apache.org on 2022/11/03 16:42:08 UTC
[ozone] branch HDDS-6517-Snapshot updated: HDDS-7281. [Snapshot] Handle RocksDB compaction DAG persistence and reconstruction (#3824)
This is an automated email from the ASF dual-hosted git repository.
siyao pushed a commit to branch HDDS-6517-Snapshot
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-6517-Snapshot by this push:
new e642ddeb55 HDDS-7281. [Snapshot] Handle RocksDB compaction DAG persistence and reconstruction (#3824)
e642ddeb55 is described below
commit e642ddeb55edd2bd2580a7ce6a1d30c0e081b1a3
Author: Siyao Meng <50...@users.noreply.github.com>
AuthorDate: Thu Nov 3 09:42:02 2022 -0700
HDDS-7281. [Snapshot] Handle RocksDB compaction DAG persistence and reconstruction (#3824)
---
.../hadoop/hdds/utils/db/DBStoreBuilder.java | 9 +-
.../org/apache/hadoop/hdds/utils/db/RDBStore.java | 46 +-
hadoop-hdds/rocksdb-checkpoint-differ/pom.xml | 32 +-
.../ozone/rocksdiff/RocksDBCheckpointDiffer.java | 1340 +++++++++++---------
.../rocksdiff/TestRocksDBCheckpointDiffer.java | 371 ++++--
.../src/test/resources/log4j.properties | 21 +
.../hadoop/ozone/om/helpers/SnapshotInfo.java | 12 +
.../hadoop/ozone/freon/TestOMSnapshotDAG.java | 220 ++++
.../src/main/proto/OmClientProtocol.proto | 23 +-
.../hadoop/ozone/om/OmMetadataManagerImpl.java | 3 +-
.../apache/hadoop/ozone/om/OmSnapshotManager.java | 23 +-
.../request/snapshot/OMSnapshotCreateRequest.java | 8 +
12 files changed, 1397 insertions(+), 711 deletions(-)
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
index 5478e8658a..9f94e1d2c5 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
@@ -92,6 +92,7 @@ public final class DBStoreBuilder {
private boolean openReadOnly = false;
private int maxFSSnapshots = 0;
private final DBProfile defaultCfProfile;
+ private boolean enableCompactionLog;
/**
* Create DBStoreBuilder from a generic DBDefinition.
@@ -190,7 +191,8 @@ public final class DBStoreBuilder {
}
return new RDBStore(dbFile, rocksDBOption, writeOptions, tableConfigs,
- registry, openReadOnly, maxFSSnapshots, dbJmxBeanNameName);
+ registry, openReadOnly, maxFSSnapshots, dbJmxBeanNameName,
+ enableCompactionLog);
} finally {
tableConfigs.forEach(TableConfig::close);
}
@@ -247,6 +249,11 @@ public final class DBStoreBuilder {
return this;
}
+ public DBStoreBuilder setEnableCompactionLog(boolean enableCompactionLog) {
+ this.enableCompactionLog = enableCompactionLog;
+ return this;
+ }
+
/**
* Set the {@link ManagedDBOptions} and default
* {@link ManagedColumnFamilyOptions} based on {@code prof}.
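For context, a minimal sketch of how a caller could opt into the new flag
(illustrative only; the DB name and path below are assumptions, not part of
this patch, and the enclosing method is assumed to declare throws IOException):

    import java.nio.file.Paths;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.utils.db.DBStore;
    import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;

    OzoneConfiguration conf = new OzoneConfiguration();
    DBStore store = DBStoreBuilder.newBuilder(conf)
        .setName("om.db")                            // assumed DB name
        .setPath(Paths.get("/path/to/om/metadata"))  // assumed metadata path
        .setEnableCompactionLog(true)                // new flag in this patch
        .build();

Since the new field defaults to false, existing DBStoreBuilder callers keep
the old behavior unless they opt in.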
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index 9fabf5e9df..992627c4cf 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -64,19 +64,30 @@ public class RDBStore implements DBStore {
private final RDBMetrics rdbMetrics;
private final RocksDBCheckpointDiffer rocksDBCheckpointDiffer;
private final String dbJmxBeanName;
+ /**
+ * Name of the SST file backup directory placed under metadata dir.
+ * Can be made configurable later.
+ */
+ private final String dbCompactionSSTBackupDirName = "compaction-sst-backup";
+ /**
+ * Name of the compaction log directory placed under metadata dir.
+ * Can be made configurable later.
+ */
+ private final String dbCompactionLogDirName = "compaction-log";
@VisibleForTesting
public RDBStore(File dbFile, ManagedDBOptions options,
Set<TableConfig> families) throws IOException {
this(dbFile, options, new ManagedWriteOptions(), families,
- new CodecRegistry(), false, 1000, null);
+ new CodecRegistry(), false, 1000, null, false);
}
@SuppressWarnings("parameternumber")
public RDBStore(File dbFile, ManagedDBOptions dbOptions,
ManagedWriteOptions writeOptions, Set<TableConfig> families,
CodecRegistry registry, boolean readOnly, int maxFSSnapshots,
- String dbJmxBeanNameName) throws IOException {
+ String dbJmxBeanNameName, boolean enableCompactionLog)
+ throws IOException {
Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
Preconditions.checkNotNull(families);
Preconditions.checkArgument(!families.isEmpty());
@@ -86,13 +97,14 @@ public class RDBStore implements DBStore {
dbJmxBeanNameName;
try {
- rocksDBCheckpointDiffer =
- new RocksDBCheckpointDiffer(
- dbLocation.getAbsolutePath(), maxFSSnapshots,
- Paths.get(dbLocation.getParent(), "db.checkpoints").toString(),
- Paths.get(dbLocation.getParent(), "db.savedSSTFiles").toString(),
- dbLocation.getAbsolutePath(), 0, "Snapshot_");
- rocksDBCheckpointDiffer.setRocksDBForCompactionTracking(dbOptions);
+ if (enableCompactionLog) {
+ rocksDBCheckpointDiffer = new RocksDBCheckpointDiffer(
+ dbLocation.getParent(), dbCompactionSSTBackupDirName,
+ dbCompactionLogDirName, dbLocation);
+ rocksDBCheckpointDiffer.setRocksDBForCompactionTracking(dbOptions);
+ } else {
+ rocksDBCheckpointDiffer = null;
+ }
db = RocksDatabase.open(dbFile, dbOptions, writeOptions,
families, readOnly);
@@ -114,7 +126,7 @@ public class RDBStore implements DBStore {
//create checkpoints directory if not exists.
checkpointsParentDir =
- Paths.get(dbLocation.getParent(), "db.checkpoints").toString();
+ Paths.get(dbLocation.getParent(), "db.checkpoints").toString();
File checkpointsDir = new File(checkpointsParentDir);
if (!checkpointsDir.exists()) {
boolean success = checkpointsDir.mkdir();
@@ -138,6 +150,15 @@ public class RDBStore implements DBStore {
}
}
+ if (enableCompactionLog) {
+ // Finish the initialization of compaction DAG tracker by setting the
+ // sequence number as current compaction log filename.
+ rocksDBCheckpointDiffer.setCurrentCompactionLog(
+ db.getLatestSequenceNumber());
+ // Load all previous compaction logs
+ rocksDBCheckpointDiffer.loadAllCompactionLogs();
+ }
+
//Initialize checkpoint manager
checkPointManager = new RDBCheckpointManager(db, dbLocation.getName());
rdbMetrics = RDBMetrics.create();
@@ -160,7 +181,10 @@ public class RDBStore implements DBStore {
}
}
- @VisibleForTesting
+ public String getSnapshotsParentDir() {
+ return snapshotsParentDir;
+ }
+
public RocksDBCheckpointDiffer getRocksDBCheckpointDiffer() {
return rocksDBCheckpointDiffer;
}
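Condensing the RDBStore hunks above, the startup sequence for compaction
tracking becomes (simplified sketch assembled from this patch; error handling
and unrelated setup omitted):

    // 1. Construct the differ and register the compaction listeners on the
    //    DBOptions before the DB is opened, so no compaction goes untracked.
    RocksDBCheckpointDiffer differ = new RocksDBCheckpointDiffer(
        dbLocation.getParent(), "compaction-sst-backup",
        "compaction-log", dbLocation);
    differ.setRocksDBForCompactionTracking(dbOptions);

    // 2. Open the database with the listener-equipped options.
    RocksDatabase db = RocksDatabase.open(dbFile, dbOptions, writeOptions,
        families, readOnly);

    // 3. Name the current compaction log after the DB's latest sequence
    //    number, then replay all existing logs to rebuild the in-memory DAG.
    differ.setCurrentCompactionLog(db.getLatestSequenceNumber());
    differ.loadAllCompactionLogs();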
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
index 85a56f8cf6..568a6cc6f8 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
@@ -22,6 +22,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>hdds</artifactId>
<version>1.3.0-SNAPSHOT</version>
</parent>
+
<artifactId>rocksdb-checkpoint-differ</artifactId>
<version>1.3.0-SNAPSHOT</version>
<description>RocksDB Checkpoint Differ</description>
@@ -29,50 +30,56 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<packaging>jar</packaging>
<dependencies>
-
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
</dependency>
-
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
-
<dependency>
<groupId>com.github.vlsi.mxgraph</groupId>
<artifactId>jgraphx</artifactId>
<version>4.2.2</version>
</dependency>
-
<dependency>
<groupId>org.jgrapht</groupId>
<artifactId>jgrapht-core</artifactId>
<version>1.5.0</version>
</dependency>
-
<dependency>
<groupId>org.jgrapht</groupId>
<artifactId>jgrapht-guava</artifactId>
<version>1.5.0</version>
</dependency>
-
<dependency>
<groupId>org.jgrapht</groupId>
<artifactId>jgrapht-ext</artifactId>
<version>1.4.0</version>
</dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
<dependency>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-annotations</artifactId>
<scope>compile</scope>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-reload4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ozone</groupId>
+ <artifactId>hdds-test-utils</artifactId>
+ </dependency>
</dependencies>
<build>
@@ -145,6 +152,9 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<allowedImports>
<allowedImport>org.rocksdb.AbstractEventListener</allowedImport>
<allowedImport>org.rocksdb.Checkpoint</allowedImport>
+ <allowedImport>org.rocksdb.ColumnFamilyDescriptor</allowedImport>
+ <allowedImport>org.rocksdb.ColumnFamilyHandle</allowedImport>
+ <allowedImport>org.rocksdb.ColumnFamilyOptions</allowedImport>
<allowedImport>org.rocksdb.CompactionJobInfo</allowedImport>
<allowedImport>org.rocksdb.CompressionType</allowedImport>
<allowedImport>org.rocksdb.DBOptions</allowedImport>
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index 32e9e268c0..26884fb9fb 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -17,70 +17,48 @@
*/
package org.apache.ozone.rocksdiff;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.graph.GraphBuilder;
+import com.google.common.graph.MutableGraph;
+import org.apache.commons.lang3.StringUtils;
+import org.rocksdb.AbstractEventListener;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionJobInfo;
+import org.rocksdb.DBOptions;
+import org.rocksdb.LiveFileMetaData;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.SstFileReader;
+import org.rocksdb.TableProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
+import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
-import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
-
-import org.rocksdb.AbstractEventListener;
-import org.rocksdb.Checkpoint;
-import org.rocksdb.CompactionJobInfo;
-import org.rocksdb.CompressionType;
-import org.rocksdb.DBOptions;
-import org.rocksdb.FlushOptions;
-import org.rocksdb.LiveFileMetaData;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.SstFileReader;
-import org.rocksdb.TableProperties;
-
-import com.google.common.graph.GraphBuilder;
-import com.google.common.graph.MutableGraph;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Arrays.asList;
// TODO
-// 1. Create a local instance of RocksDiff-local-RocksDB. This is the
-// rocksDB that we can use for maintaining DAG and any other state. This is
-// a per node state so it it doesn't have to go through RATIS anyway.
-// 2. Store fwd DAG in Diff-Local-RocksDB in Compaction Listener
-// 3. Store fwd DAG in Diff-Local-RocksDB in Compaction Listener
-// 4. Store last-Snapshot-counter/Compaction-generation-counter in Diff-Local
-// -RocksDB in Compaction Listener
-// 5. System Restart handling. Read the DAG from Disk and load it in memory.
-// 6. Take the base snapshot. All the SST file nodes in the base snapshot
-// should be arked with that Snapshot generation. Subsequently, all SST file
-// node should have a snapshot-generation count and Compaction-generation
-// count.
-// 6. DAG based SST file pruning. Start from the oldest snapshot and we can
-// unlink any SST
-// file from the SaveCompactedFilePath directory that is reachable in the
-// reverse DAG.
-// 7. DAG pruning : For each snapshotted bucket, We can recycle the part of
-// the DAG that is older than the predefined policy for the efficient snapdiff.
-// E.g. we may decide not to support efficient snapdiff from any snapshot that
-// is older than 2 weeks.
-// Note on 8. & 9 .
-// ==================
-// A simple handling is to just iterate over all keys in keyspace when the
-// compaction DAG is lost, instead of optimizing every case. And start
-// Compaction-DAG afresh from the latest snapshot.
-// --
// 8. Handle bootstrapping rocksDB for a new OM follower node
// - new node will receive Active object store as well as all existing
// rocksDB checkpoints.
@@ -90,109 +68,204 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
// this case.
// - Getting the DB sync. This case needs to handle getting the
// compaction-DAG information as well.
-//
-//
+
/**
- * RocksDBCheckpointDiffer class.
+ * RocksDB checkpoint differ.
+ * <p>
+ * Implements Ozone Manager RocksDB compaction listener (compaction log
+ * persistence and SST file hard-linking), compaction DAG construction,
+ * and compaction DAG reconstruction upon OM restarts.
+ * <p>
+ * Note that the compaction log is per-DB instance, since each OM DB
+ * instance may trigger compactions at different times.
*/
-//CHECKSTYLE:OFF
-@SuppressWarnings({"NM_METHOD_NAMING_CONVENTION"})
public class RocksDBCheckpointDiffer {
- private final String rocksDbPath;
- private String cpPath;
- private String cfDBPath;
- private String saveCompactedFilePath;
- private int maxSnapshots;
+
private static final Logger LOG =
LoggerFactory.getLogger(RocksDBCheckpointDiffer.class);
- // keeps track of all the snapshots created so far.
- private int lastSnapshotCounter;
- private String lastSnapshotPrefix;
-
- // tracks number of compactions so far
- private static final long UNKNOWN_COMPACTION_GEN = 0;
- private long currentCompactionGen = 0;
-
- // Something to track all the snapshots created so far.
- private Snapshot[] allSnapshots;
-
- public RocksDBCheckpointDiffer(String dbPath,
- int maxSnapshots,
- String checkpointPath,
- String sstFileSaveDir,
- String cfPath,
- int initialSnapshotCounter,
- String snapPrefix) {
- this.maxSnapshots = maxSnapshots;
- allSnapshots = new Snapshot[this.maxSnapshots];
- cpPath = checkpointPath;
-
- saveCompactedFilePath = sstFileSaveDir;
- rocksDbPath = dbPath;
- cfDBPath = cfPath;
-
- // TODO: This module should be self sufficient in tracking the last
- // snapshotCounter and currentCompactionGen for a given dbPath. It needs
- // to be persisted.
- lastSnapshotCounter = initialSnapshotCounter;
- lastSnapshotPrefix = snapPrefix;
- currentCompactionGen = lastSnapshotCounter;
-
- // TODO: this should also independently persist every compaction e.g.
- // (input files) ->
- // { (output files) + lastSnapshotCounter + currentCompactionGen }
- // mapping.
+ private final String sstBackupDir;
+ private final String activeDBLocationStr;
+
+ private String compactionLogDir = null;
+
+ /**
+ * Compaction log path for DB compaction history persistence.
+ * This is the source of truth for in-memory SST DAG reconstruction upon
+ * OM restarts.
+ * <p>
+ * Initialized from the latest DB sequence number on OM startup. The log
+ * also rolls over to a new file whenever an Ozone snapshot is taken.
+ */
+ private volatile String currentCompactionLogPath = null;
+
+ private static final String COMPACTION_LOG_FILENAME_SUFFIX = ".log";
+
+ /**
+ * Marks the beginning of a comment line in the compaction log.
+ */
+ private static final String COMPACTION_LOG_COMMENT_LINE_PREFIX = "# ";
+
+ /**
+ * Marks the beginning of a compaction log entry.
+ */
+ private static final String COMPACTION_LOG_ENTRY_LINE_PREFIX = "C ";
+
+ /**
+ * Prefix for the sequence number line when writing to compaction log
+ * right after taking an Ozone snapshot.
+ */
+ private static final String COMPACTION_LOG_SEQNUM_LINE_PREFIX = "S ";
+
+ /**
+ * SST file extension. Must be lower case.
+ * Used to trim the file extension when writing compaction entries to the log
+ * to save space.
+ */
+ private static final String SST_FILE_EXTENSION = ".sst";
+ private static final int SST_FILE_EXTENSION_LENGTH =
+ SST_FILE_EXTENSION.length();
+
+ private static final int LONG_MAX_STRLEN =
+ String.valueOf(Long.MAX_VALUE).length();
+
+ private long reconstructionSnapshotGeneration;
+
+ /**
+ * Dummy object that acts as a write lock in compaction listener.
+ */
+ private final Object compactionListenerWriteLock = new Object();
+
+ /**
+ * Constructor.
+ * Note that previous compaction logs are loaded by RDBStore after this
+ * object's initialization by calling loadAllCompactionLogs().
+ * @param metadataDir Ozone metadata directory.
+ * @param sstBackupDir Name of the SST backup dir under metadata dir.
+ * @param compactionLogDirName Name of the compaction log dir.
+ * @param activeDBLocation Active RocksDB directory location.
+ */
+ public RocksDBCheckpointDiffer(String metadataDir, String sstBackupDir,
+ String compactionLogDirName, File activeDBLocation) {
+
+ setCompactionLogDir(metadataDir, compactionLogDirName);
+
+ this.sstBackupDir = Paths.get(metadataDir, sstBackupDir) + "/";
+
+ // Create the directory if SST backup path does not already exist
+ File dir = new File(this.sstBackupDir);
+ if (!dir.exists() && !dir.mkdir()) {
+ final String errorMsg = "Failed to create SST file backup directory. "
+ + "Check if OM has write permission.";
+ LOG.error(errorMsg);
+ throw new RuntimeException(errorMsg);
+ }
+
+ // Active DB location is used in getSSTFileSummary
+ this.activeDBLocationStr = activeDBLocation.toString() + "/";
}
- // Node in the DAG to represent an SST file
- private class CompactionNode {
- public String fileName; // Name of the SST file
- public String snapshotId; // The last snapshot that was created before this
- // node came into existance;
- public long snapshotGeneration;
- public long totalNumberOfKeys;
- public long cumulativeKeysReverseTraversal;
-
- CompactionNode (String f, String sid, long numKeys, long compactionGen) {
- fileName = f;
- snapshotId = sid;
- snapshotGeneration = lastSnapshotCounter;
- totalNumberOfKeys = numKeys;
- cumulativeKeysReverseTraversal = 0;
+ private void setCompactionLogDir(String metadataDir,
+ String compactionLogDirName) {
+
+ final File parentDir = new File(metadataDir);
+ if (!parentDir.exists()) {
+ if (!parentDir.mkdir()) {
+ LOG.error("Error creating compaction log parent dir.");
+ return;
+ }
+ }
+
+ this.compactionLogDir =
+ Paths.get(metadataDir, compactionLogDirName).toString();
+ File clDir = new File(compactionLogDir);
+ if (!clDir.exists() && !clDir.mkdir()) {
+ LOG.error("Error creating compaction log dir.");
+ return;
}
+
+ // Create a readme file explaining what the compaction log dir is for
+ final Path readmePath = Paths.get(compactionLogDir, "_README.txt");
+ final File readmeFile = new File(readmePath.toString());
+ if (!readmeFile.exists()) {
+ try (BufferedWriter bw = Files.newBufferedWriter(
+ readmePath, StandardOpenOption.CREATE)) {
+ bw.write("This directory holds Ozone Manager RocksDB compaction logs.\n"
+ + "DO NOT add, change or delete any files in this directory unless "
+ + "you know what you are doing.\n");
+ } catch (IOException ignored) {
+ }
+ }
+
+ // Append /
+ this.compactionLogDir += "/";
}
- private static class Snapshot {
- String dbPath;
- String snapshotID;
- long snapshotGeneration;
+ /**
+ * Set the current compaction log filename with a given RDB sequence number.
+ * @param latestSequenceNum latest sequence number of RDB.
+ */
+ public void setCurrentCompactionLog(long latestSequenceNum) {
+ String latestSequenceIdStr = String.valueOf(latestSequenceNum);
+
+ if (latestSequenceIdStr.length() < LONG_MAX_STRLEN) {
+ // Pad zeroes to the left for ordered file listing when sorted
+ // alphabetically.
+ latestSequenceIdStr =
+ StringUtils.leftPad(latestSequenceIdStr, LONG_MAX_STRLEN, "0");
+ }
- Snapshot(String db, String id, long gen) {
- dbPath = db;
- snapshotID = id;
- snapshotGeneration = gen;
+ // Local temp variable for storing the new compaction log file path
+ final String newCompactionLog =
+ compactionLogDir + latestSequenceIdStr + COMPACTION_LOG_FILENAME_SUFFIX;
+
+ File clFile = new File(newCompactionLog);
+ if (clFile.exists()) {
+ LOG.warn("Compaction log exists: {}. Will append", newCompactionLog);
}
+
+ this.currentCompactionLogPath = newCompactionLog;
+
+ // Create empty file if it doesn't exist
+ appendToCurrentCompactionLog("");
}
- public enum GType {FNAME, KEYSIZE, CUMUTATIVE_SIZE};
+ // Node in the DAG to represent an SST file
+ private static class CompactionNode {
+ // Name of the SST file
+ private final String fileName;
+ // The last snapshot created before this node came into existence
+ private final String snapshotId;
+ private final long snapshotGeneration;
+ private final long totalNumberOfKeys;
+ private long cumulativeKeysReverseTraversal;
+
+ CompactionNode(String file, String ssId, long numKeys, long seqNum) {
+ fileName = file;
+ // Retained for debuggability. Unused for now.
+ snapshotId = ssId;
+ totalNumberOfKeys = numKeys;
+ snapshotGeneration = seqNum;
+ cumulativeKeysReverseTraversal = 0L;
+ }
+ @Override
+ public String toString() {
+ return String.format("Node{%s}", fileName);
+ }
+ }
- // Hash table to track Compaction node for a given SST File.
- private ConcurrentHashMap<String, CompactionNode> compactionNodeTable =
+ // Hash table to track CompactionNode for a given SST File.
+ private final ConcurrentHashMap<String, CompactionNode> compactionNodeMap =
new ConcurrentHashMap<>();
- // We are mainiting a two way DAG. This allows easy traversal from
+ // We are maintaining a two way DAG. This allows easy traversal from
// source snapshot to destination snapshot as well as the other direction.
- // TODO : Persist this information to the disk.
- // TODO: A system crash while the edge is inserted in DAGFwd but not in
- // DAGReverse will compromise the two way DAG. Set of input/output files shud
- // be written to // disk(RocksDB) first, would avoid this problem.
- private MutableGraph<CompactionNode> compactionDAGFwd =
+ private final MutableGraph<CompactionNode> forwardCompactionDAG =
GraphBuilder.directed().build();
- private MutableGraph<CompactionNode> compactionDAGReverse =
+ private final MutableGraph<CompactionNode> backwardCompactionDAG =
GraphBuilder.directed().build();
public static final Integer DEBUG_DAG_BUILD_UP = 2;
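To make the log layout concrete: LONG_MAX_STRLEN is 19 (the number of digits
in Long.MAX_VALUE), so setCurrentCompactionLog(1000) writes to a file named
0000000000000001000.log under compaction-log/, and a plain alphabetical
listing of the directory is therefore also ordered by sequence number. Using
the line prefixes defined above, the contents of such a file could look like
this (file names and sequence numbers invented for illustration):

    # kLevelL0FilesNum
    C 000011,000013,000014:000018
    S 1020

The '# ' line records the compaction reason (written only when debug logging
is enabled), the 'C ' line maps a compaction's input SST files to its output
SST files with the .sst extension trimmed, and the 'S ' line records the RDB
sequence number captured when an Ozone snapshot is taken.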
@@ -215,390 +288,511 @@ public class RocksDBCheckpointDiffer {
DEBUG_LEVEL.add(level);
}
- // Flushes the WAL and Creates a RocksDB checkpoint
- @SuppressWarnings({"NM_METHOD_NAMING_CONVENTION"})
- public void createCheckPoint(String dbPathArg, String cpPathArg,
- RocksDB rocksDB) {
- LOG.warn("Creating Checkpoint for RocksDB instance : " +
- dbPathArg + "in a CheckPoint Location" + cpPathArg);
- try {
- rocksDB.flush(new FlushOptions());
- Checkpoint cp = Checkpoint.create(rocksDB);
- cp.createCheckpoint(cpPathArg);
- } catch (RocksDBException e) {
- throw new RuntimeException(e.getMessage());
+ /**
+ * Append (then flush) to the current compaction log file path.
+ * Note: This does NOT automatically append newline to the log.
+ */
+ private synchronized void appendToCurrentCompactionLog(String content) {
+ if (currentCompactionLogPath == null) {
+ LOG.error("Unable to append compaction log. "
+ + "Compaction log path is not set. "
+ + "Please check initialization.");
+ throw new RuntimeException("Compaction log path not set");
+ }
+ try (BufferedWriter bw = Files.newBufferedWriter(
+ Paths.get(currentCompactionLogPath),
+ StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
+ bw.write(content);
+ bw.flush();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to append compaction log to " +
+ currentCompactionLogPath, e);
}
}
- public void setRocksDBForCompactionTracking(DBOptions rocksOptions)
- throws RocksDBException {
- setRocksDBForCompactionTracking(rocksOptions,
- new ArrayList<AbstractEventListener>());
- }
-
- public void setRocksDBForCompactionTracking(
- DBOptions rocksOptions, List<AbstractEventListener> list) {
- final AbstractEventListener onCompactionCompletedListener =
- new AbstractEventListener() {
- @Override
- @SuppressFBWarnings({
- "AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
- "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
- public void onCompactionCompleted(
- final RocksDB db, final CompactionJobInfo compactionJobInfo) {
- synchronized (db) {
- LOG.warn(compactionJobInfo.compactionReason().toString());
- LOG.warn("List of input files:");
- for (String file : compactionJobInfo.inputFiles()) {
- LOG.warn(file);
- String saveLinkFileName =
- saveCompactedFilePath + new File(file).getName();
- Path link = Paths.get(saveLinkFileName);
- Path srcFile = Paths.get(file);
- try {
- Files.createLink(link, srcFile);
- } catch (IOException e) {
- LOG.warn("Exception in creating hardlink");
- e.printStackTrace();
- }
- }
- LOG.warn("List of output files:");
- for (String file : compactionJobInfo.outputFiles()) {
- LOG.warn(file + ",");
- }
- // Let us also build the graph
- for (String outFilePath : compactionJobInfo.outputFiles()) {
- String outfile =
- Paths.get(outFilePath).getFileName().toString();
- CompactionNode outfileNode = compactionNodeTable.get(outfile);
- if (outfileNode == null) {
- long numKeys = 0;
- try {
- numKeys = getSSTFileSummary(outfile);
- } catch (Exception e) {
- LOG.warn(e.getMessage());
- }
- outfileNode = new CompactionNode(outfile,
- lastSnapshotPrefix, numKeys,
- currentCompactionGen);
- compactionDAGFwd.addNode(outfileNode);
- compactionDAGReverse.addNode(outfileNode);
- compactionNodeTable.put(outfile, outfileNode);
- }
- for (String inFilePath : compactionJobInfo.inputFiles()) {
- String infile =
- Paths.get(inFilePath).getFileName().toString();
- CompactionNode infileNode = compactionNodeTable.get(infile);
- if (infileNode == null) {
- long numKeys = 0;
- try {
- numKeys = getSSTFileSummary(infile);
- } catch (Exception e) {
- LOG.warn(e.getMessage());
- }
- infileNode = new CompactionNode(infile,
- lastSnapshotPrefix,
- numKeys, UNKNOWN_COMPACTION_GEN);
- compactionDAGFwd.addNode(infileNode);
- compactionDAGReverse.addNode(infileNode);
- compactionNodeTable.put(infile, infileNode);
- }
- if (outfileNode.fileName.compareToIgnoreCase(
- infileNode.fileName) != 0) {
- compactionDAGFwd.putEdge(outfileNode, infileNode);
- compactionDAGReverse.putEdge(infileNode, outfileNode);
- }
- }
- }
- if (debugEnabled(DEBUG_DAG_BUILD_UP)) {
- printMutableGraph(null, null, compactionDAGFwd);
- }
- }
- }
- };
+ /**
+ * Append a sequence number to the compaction log (roughly) when an Ozone
+ * snapshot (RDB checkpoint) is taken.
+ * @param sequenceNum RDB sequence number
+ */
+ public void appendSequenceNumberToCompactionLog(long sequenceNum) {
+ final String line = COMPACTION_LOG_SEQNUM_LINE_PREFIX + sequenceNum + "\n";
+ appendToCurrentCompactionLog(line);
+ }
- list.add(onCompactionCompletedListener);
+ /**
+ * Takes {@link org.rocksdb.Options}.
+ */
+ public void setRocksDBForCompactionTracking(Options rocksOptions,
+ List<AbstractEventListener> list) {
+ list.add(newCompactionBeginListener());
+ list.add(newCompactionCompletedListener());
rocksOptions.setListeners(list);
}
+ public void setRocksDBForCompactionTracking(Options rocksOptions) {
+ setRocksDBForCompactionTracking(rocksOptions, new ArrayList<>());
+ }
+ /**
+ * Takes {@link org.rocksdb.DBOptions}.
+ */
+ public void setRocksDBForCompactionTracking(DBOptions rocksOptions,
+ List<AbstractEventListener> list) {
+ list.add(newCompactionBeginListener());
+ list.add(newCompactionCompletedListener());
+ rocksOptions.setListeners(list);
+ }
- public void setRocksDBForCompactionTracking(Options rocksOptions)
+ public void setRocksDBForCompactionTracking(DBOptions rocksOptions)
throws RocksDBException {
- setRocksDBForCompactionTracking(rocksOptions,
- new ArrayList<AbstractEventListener>());
- }
-
- public void setRocksDBForCompactionTracking(
- Options rocksOptions, List<AbstractEventListener> list) {
- final AbstractEventListener onCompactionCompletedListener =
- new AbstractEventListener() {
- @Override
- @SuppressFBWarnings({
- "AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
- "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
- public void onCompactionCompleted(
- final RocksDB db,final CompactionJobInfo compactionJobInfo) {
- synchronized (db) {
- LOG.warn(compactionJobInfo.compactionReason().toString());
- LOG.warn("List of input files:");
- for (String file : compactionJobInfo.inputFiles()) {
- LOG.warn(file);
- String saveLinkFileName =
- saveCompactedFilePath + new File(file).getName();
- Path link = Paths.get(saveLinkFileName);
- Path srcFile = Paths.get(file);
- try {
- Files.createLink(link, srcFile);
- } catch (IOException e) {
- LOG.warn("Exception in creating hardlink");
- e.printStackTrace();
- }
- }
- LOG.warn("List of output files:");
- for (String file : compactionJobInfo.outputFiles()) {
- LOG.warn(file);
- }
- // Let us also build the graph
- for (String outFilePath : compactionJobInfo.outputFiles()) {
- String outfile =
- Paths.get(outFilePath).getFileName().toString();
- CompactionNode outfileNode = compactionNodeTable.get(outfile);
- if (outfileNode == null) {
- long numKeys = 0;
- try {
- numKeys = getSSTFileSummary(outfile);
- } catch (Exception e) {
- LOG.warn(e.getMessage());
- }
- outfileNode = new CompactionNode(outfile,
- lastSnapshotPrefix,
- numKeys, currentCompactionGen);
- compactionDAGFwd.addNode(outfileNode);
- compactionDAGReverse.addNode(outfileNode);
- compactionNodeTable.put(outfile, outfileNode);
- }
- for (String inFilePath : compactionJobInfo.inputFiles()) {
- String infile =
- Paths.get(inFilePath).getFileName().toString();
- CompactionNode infileNode = compactionNodeTable.get(infile);
- if (infileNode == null) {
- long numKeys = 0;
- try {
- numKeys = getSSTFileSummary(infile);
- } catch (Exception e) {
- LOG.warn(e.getMessage());
- }
- infileNode = new CompactionNode(infile,
- lastSnapshotPrefix, numKeys,
- UNKNOWN_COMPACTION_GEN);
- compactionDAGFwd.addNode(infileNode);
- compactionDAGReverse.addNode(infileNode);
- compactionNodeTable.put(infile, infileNode);
- }
- if (outfileNode.fileName.compareToIgnoreCase(
- infileNode.fileName) != 0) {
- compactionDAGFwd.putEdge(outfileNode, infileNode);
- compactionDAGReverse.putEdge(infileNode, outfileNode);
- }
- }
- }
- if (debugEnabled(DEBUG_DAG_BUILD_UP)) {
- printMutableGraph(null, null,
- compactionDAGFwd);
- }
+ setRocksDBForCompactionTracking(rocksOptions, new ArrayList<>());
+ }
+
+ private AbstractEventListener newCompactionBeginListener() {
+ return new AbstractEventListener() {
+ @Override
+ public void onCompactionBegin(RocksDB db,
+ CompactionJobInfo compactionJobInfo) {
+
+ synchronized (compactionListenerWriteLock) {
+
+ if (compactionJobInfo.inputFiles().isEmpty()) {
+ LOG.error("Compaction input files list is empty");
+ return;
+ }
+
+ // Create hardlink backups for the SST files that are going
+ // to be deleted after this RDB compaction.
+ for (String file : compactionJobInfo.inputFiles()) {
+ LOG.debug("Creating hard link for '{}'", file);
+ String saveLinkFileName =
+ sstBackupDir + new File(file).getName();
+ Path link = Paths.get(saveLinkFileName);
+ Path srcFile = Paths.get(file);
+ try {
+ Files.createLink(link, srcFile);
+ } catch (IOException e) {
+ LOG.error("Exception in creating hard link for {}", file);
+ throw new RuntimeException("Failed to create hard link", e);
}
}
- };
- list.add(onCompactionCompletedListener);
- rocksOptions.setListeners(list);
+ }
+ }
+ };
}
- public RocksDB getRocksDBInstanceWithCompactionTracking(String dbPath)
- throws RocksDBException {
- final Options opt = new Options().setCreateIfMissing(true)
- .setCompressionType(CompressionType.NO_COMPRESSION);
- opt.setMaxBytesForLevelMultiplier(2);
- setRocksDBForCompactionTracking(opt);
- return RocksDB.open(opt, dbPath);
+ private AbstractEventListener newCompactionCompletedListener() {
+ return new AbstractEventListener() {
+ @Override
+ public void onCompactionCompleted(RocksDB db,
+ CompactionJobInfo compactionJobInfo) {
+
+ synchronized (compactionListenerWriteLock) {
+
+ if (compactionJobInfo.inputFiles().isEmpty()) {
+ LOG.error("Compaction input files list is empty");
+ return;
+ }
+
+ final StringBuilder sb = new StringBuilder();
+
+ if (LOG.isDebugEnabled()) {
+ // Print compaction reason for this entry in the log file
+ // e.g. kLevelL0FilesNum / kLevelMaxLevelSize.
+ sb.append(COMPACTION_LOG_COMMENT_LINE_PREFIX)
+ .append(compactionJobInfo.compactionReason())
+ .append('\n');
+ }
+
+ // Mark the beginning of a compaction log
+ sb.append(COMPACTION_LOG_ENTRY_LINE_PREFIX);
+
+ // Trim DB path, only keep the SST file name
+ final int filenameOffset =
+ compactionJobInfo.inputFiles().get(0).lastIndexOf("/") + 1;
+
+ // Append the list of input files
+ final List<String> inputFiles = compactionJobInfo.inputFiles();
+ // Trim the file path, leave only the SST file name without extension
+ inputFiles.replaceAll(s -> s.substring(
+ filenameOffset, s.length() - SST_FILE_EXTENSION_LENGTH));
+ final String inputFilesJoined = String.join(",", inputFiles);
+ sb.append(inputFilesJoined);
+
+ // Insert delimiter between input files and output files
+ sb.append(':');
+
+ // Append the list of output files
+ final List<String> outputFiles = compactionJobInfo.outputFiles();
+ outputFiles.replaceAll(s -> s.substring(
+ filenameOffset, s.length() - SST_FILE_EXTENSION_LENGTH));
+ final String outputFilesJoined = String.join(",", outputFiles);
+ sb.append(outputFilesJoined);
+
+ // End of line
+ sb.append('\n');
+
+ // Write input and output file names to compaction log
+ appendToCurrentCompactionLog(sb.toString());
+
+ // Populate the DAG
+ populateCompactionDAG(inputFiles, outputFiles,
+ db.getLatestSequenceNumber());
+ /*
+ if (debugEnabled(DEBUG_DAG_BUILD_UP)) {
+ printMutableGraph(null, null, forwardCompactionDAG);
+ }
+ */
+ }
+ }
+ };
}
- // Get a summary of a given SST file
- public long getSSTFileSummary(String filename)
- throws RocksDBException {
+ /**
+ * Get number of keys in an SST file.
+ * @param filename SST filename
+ * @return number of keys
+ */
+ private long getSSTFileSummary(String filename) throws RocksDBException {
+
+ if (!filename.endsWith(SST_FILE_EXTENSION)) {
+ filename += SST_FILE_EXTENSION;
+ }
+
Options option = new Options();
SstFileReader reader = new SstFileReader(option);
- try {
- reader.open(saveCompactedFilePath + filename);
- } catch (RocksDBException e) {
- reader.open(rocksDbPath + "/"+ filename);
+
+ File sstFile = new File(sstBackupDir + filename);
+ File sstFileInActiveDB = new File(activeDBLocationStr + filename);
+ if (sstFile.exists()) {
+ reader.open(sstBackupDir + filename);
+ } else if (sstFileInActiveDB.exists()) {
+ reader.open(activeDBLocationStr + filename);
+ } else {
+ throw new RuntimeException("Can't find SST file: " + filename);
}
+
TableProperties properties = reader.getTableProperties();
- LOG.warn("getSSTFileSummary " + filename + ":: " +
- properties.getNumEntries());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} has {} keys", filename, properties.getNumEntries());
+ }
return properties.getNumEntries();
}
- // Read the current Live manifest for a given RocksDB instance (Active or
- // Checkpoint). Returns the list of currently active SST FileNames.
- @SuppressFBWarnings({"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
+ /**
+ * Helper method to trim the filename retrieved from LiveFileMetaData.
+ */
+ private String trimSSTFilename(String filename) {
+ if (!filename.startsWith("/")) {
+ final String errorMsg = String.format(
+ "Invalid start of filename: '%s'. Expected '/'", filename);
+ LOG.error(errorMsg);
+ throw new RuntimeException(errorMsg);
+ }
+ if (!filename.endsWith(SST_FILE_EXTENSION)) {
+ final String errorMsg = String.format(
+ "Invalid extension of file: '%s'. Expected '%s'",
+ filename, SST_FILE_EXTENSION);
+ LOG.error(errorMsg);
+ throw new RuntimeException(errorMsg);
+ }
+ return filename.substring("/".length(),
+ filename.length() - SST_FILE_EXTENSION_LENGTH);
+ }
+
+ /**
+ * Get a list of relevant column family descriptors.
+ * @param cfOpts ColumnFamilyOptions
+ * @return List of ColumnFamilyDescriptor
+ */
+ @VisibleForTesting
+ static List<ColumnFamilyDescriptor> getCFDescriptorList(
+ ColumnFamilyOptions cfOpts) {
+ return asList(
+ new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOpts),
+ new ColumnFamilyDescriptor("keyTable".getBytes(UTF_8), cfOpts),
+ new ColumnFamilyDescriptor("directoryTable".getBytes(UTF_8), cfOpts),
+ new ColumnFamilyDescriptor("fileTable".getBytes(UTF_8), cfOpts)
+ );
+ }
+
+ /**
+ * Read the current Live manifest for a given RocksDB instance (Active or
+ * Checkpoint).
+ * @param dbPathArg path to a RocksDB directory
+ * @return a list of SST files (without extension) in the DB.
+ */
public HashSet<String> readRocksDBLiveFiles(String dbPathArg) {
RocksDB rocksDB = null;
HashSet<String> liveFiles = new HashSet<>();
- //
- try (final Options options =
- new Options().setParanoidChecks(true)
- .setCreateIfMissing(true)
- .setCompressionType(CompressionType.NO_COMPRESSION)
- .setForceConsistencyChecks(false)) {
- rocksDB = RocksDB.openReadOnly(options, dbPathArg);
+
+ final ColumnFamilyOptions cfOpts = new ColumnFamilyOptions();
+ final List<ColumnFamilyDescriptor> cfDescriptors =
+ getCFDescriptorList(cfOpts);
+ final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
+
+ try (DBOptions dbOptions = new DBOptions()
+ .setParanoidChecks(true)) {
+
+ rocksDB = RocksDB.openReadOnly(dbOptions, dbPathArg,
+ cfDescriptors, columnFamilyHandles);
List<LiveFileMetaData> liveFileMetaDataList =
rocksDB.getLiveFilesMetaData();
- LOG.warn("Live File Metadata for DB: " + dbPathArg);
+ LOG.debug("SST File Metadata for DB: {}", dbPathArg);
for (LiveFileMetaData m : liveFileMetaDataList) {
- LOG.warn("\tFile :" + m.fileName());
- LOG.warn("\tLevel :" + m.level());
- liveFiles.add(Paths.get(m.fileName()).getFileName().toString());
+ LOG.debug("File: {}, Level: {}", m.fileName(), m.level());
+ final String trimmedFilename = trimSSTFilename(m.fileName());
+ liveFiles.add(trimmedFilename);
}
} catch (RocksDBException e) {
+ LOG.error("Error during RocksDB operation: {}", e.getMessage());
e.printStackTrace();
} finally {
if (rocksDB != null) {
rocksDB.close();
}
+ cfOpts.close();
}
return liveFiles;
}
- // Given the src and destination Snapshots, it prints a Diff list.
- private synchronized void printSnapdiffSSTFiles(
- Snapshot src, Snapshot dest) throws RocksDBException {
- LOG.warn("Src Snapshot files :" + src.dbPath);
+ /**
+ * Process each line of compaction log text file input and populate the DAG.
+ */
+ private synchronized void processCompactionLogLine(String line) {
+
+ LOG.debug("Processing line: {}", line);
+
+ if (line.startsWith("#")) {
+ // Skip comments
+ LOG.debug("Comment line, skipped");
+ } else if (line.startsWith(COMPACTION_LOG_SEQNUM_LINE_PREFIX)) {
+ // Read sequence number
+ LOG.debug("Reading sequence number as snapshot generation");
+ final String seqNumStr =
+ line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length()).trim();
+ // This will be the snapshot generation for the nodes to come
+ reconstructionSnapshotGeneration = Long.parseLong(seqNumStr);
+ } else if (line.startsWith(COMPACTION_LOG_ENTRY_LINE_PREFIX)) {
+ // Read compaction log entry
+
+ // Trim the entry prefix
+ line = line.substring(COMPACTION_LOG_ENTRY_LINE_PREFIX.length());
+ final String[] io = line.split(":");
+ if (io.length != 2) {
+ LOG.error("Invalid line in compaction log: {}", line);
+ return;
+ }
+ final String[] inputFiles = io[0].split(",");
+ final String[] outputFiles = io[1].split(",");
+ populateCompactionDAG(asList(inputFiles),
+ asList(outputFiles), reconstructionSnapshotGeneration);
+ } else {
+ LOG.error("Invalid line in compaction log: {}", line);
+ }
+ }
+
+ /**
+ * Helper to read compaction log to the internal DAG.
+ */
+ private void readCompactionLogToDAG(String currCompactionLogPath) {
+ LOG.debug("Loading compaction log: {}", currCompactionLogPath);
+ try (Stream<String> logLineStream =
+ Files.lines(Paths.get(currCompactionLogPath), UTF_8)) {
+ logLineStream.forEach(this::processCompactionLogLine);
+ } catch (IOException ioEx) {
+ throw new RuntimeException(ioEx);
+ }
+ }
+
+ /**
+ * Load existing compaction log files to the in-memory DAG.
+ * This only needs to be done once during OM startup.
+ */
+ public synchronized void loadAllCompactionLogs() {
+ if (compactionLogDir == null) {
+ throw new RuntimeException("Compaction log directory must be set first");
+ }
+ reconstructionSnapshotGeneration = 0L;
+ try {
+ try (Stream<Path> pathStream = Files.list(Paths.get(compactionLogDir))
+ .filter(e -> e.toString().toLowerCase()
+ .endsWith(COMPACTION_LOG_FILENAME_SUFFIX))
+ .sorted()) {
+ for (Path logPath : pathStream.collect(Collectors.toList())) {
+ readCompactionLogToDAG(logPath.toString());
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Error listing compaction log dir " +
+ compactionLogDir, e);
+ }
+ }
+
+ /**
+ * Snapshot information node class for the differ.
+ */
+ public static class DifferSnapshotInfo {
+ private final String dbPath;
+ private final String snapshotID;
+ private final long snapshotGeneration;
+
+ public DifferSnapshotInfo(String db, String id, long gen) {
+ dbPath = db;
+ snapshotID = id;
+ snapshotGeneration = gen;
+ }
+
+ public String getDbPath() {
+ return dbPath;
+ }
+
+ public String getSnapshotID() {
+ return snapshotID;
+ }
+
+ public long getSnapshotGeneration() {
+ return snapshotGeneration;
+ }
+
+ @Override
+ public String toString() {
+ return "DifferSnapshotInfo{" + "dbPath='" + dbPath + '\''
+ + ", snapshotID='" + snapshotID + '\'' + ", snapshotGeneration="
+ + snapshotGeneration + '}';
+ }
+ }
+
+ /**
+ * Get a list of SST files that differ between the src and dest snapshots.
+ * <p>
+ * Expected input: src is a snapshot taken AFTER the dest.
+ *
+ * @param src source snapshot
+ * @param dest destination snapshot
+ */
+ public synchronized List<String> getSSTDiffList(
+ DifferSnapshotInfo src, DifferSnapshotInfo dest) {
+
HashSet<String> srcSnapFiles = readRocksDBLiveFiles(src.dbPath);
- LOG.warn("dest Snapshot files :" + dest.dbPath);
HashSet<String> destSnapFiles = readRocksDBLiveFiles(dest.dbPath);
HashSet<String> fwdDAGSameFiles = new HashSet<>();
HashSet<String> fwdDAGDifferentFiles = new HashSet<>();
- LOG.warn("Doing forward diff between source and destination " +
- "Snapshots:" + src.dbPath + ", " + dest.dbPath);
- realPrintSnapdiffSSTFiles(src, dest, srcSnapFiles, destSnapFiles,
- compactionDAGFwd,
- fwdDAGSameFiles,
- fwdDAGDifferentFiles);
-
- LOG.warn("Overall Summary \n" +
- "Doing Overall diff between source and destination Snapshots:" +
- src.dbPath + ", " + dest.dbPath);
- System.out.print("fwd DAG Same files :");
- for (String file : fwdDAGSameFiles) {
- System.out.print(file + ", ");
- }
- LOG.warn("");
- System.out.print("\nFwd DAG Different files :");
- for (String file : fwdDAGDifferentFiles) {
- CompactionNode n = compactionNodeTable.get(file);
- System.out.print(file + ", ");
- }
- LOG.warn("");
- }
-
- @SuppressFBWarnings({"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
- public synchronized void realPrintSnapdiffSSTFiles(
- Snapshot src, Snapshot dest,
- HashSet<String> srcSnapFiles,
- HashSet<String> destSnapFiles,
+ LOG.debug("Doing forward diff from src '{}' to dest '{}'",
+ src.dbPath, dest.dbPath);
+ internalGetSSTDiffList(src, dest, srcSnapFiles, destSnapFiles,
+ forwardCompactionDAG, fwdDAGSameFiles, fwdDAGDifferentFiles);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Result of diff from src '" + src.dbPath + "' to dest '" +
+ dest.dbPath + "':");
+ StringBuilder logSB = new StringBuilder();
+
+ logSB.append("Fwd DAG same SST files: ");
+ for (String file : fwdDAGSameFiles) {
+ logSB.append(file).append(" ");
+ }
+ LOG.debug(logSB.toString());
+
+ logSB.setLength(0);
+ logSB.append("Fwd DAG different SST files: ");
+ for (String file : fwdDAGDifferentFiles) {
+ logSB.append(file).append(" ");
+ }
+ LOG.debug("{}", logSB);
+ }
+
+ return new ArrayList<>(fwdDAGDifferentFiles);
+ }
+
+ /**
+ * Core getSSTDiffList logic.
+ */
+ private void internalGetSSTDiffList(
+ DifferSnapshotInfo src, DifferSnapshotInfo dest,
+ HashSet<String> srcSnapFiles, HashSet<String> destSnapFiles,
MutableGraph<CompactionNode> mutableGraph,
HashSet<String> sameFiles, HashSet<String> differentFiles) {
-
for (String fileName : srcSnapFiles) {
if (destSnapFiles.contains(fileName)) {
- LOG.warn("SrcSnapshot : " + src.dbPath + " and Dest " +
- "Snapshot" + dest.dbPath + " Contain Same file " + fileName);
+ LOG.debug("Source '{}' and destination '{}' share the same SST '{}'",
+ src.dbPath, dest.dbPath, fileName);
sameFiles.add(fileName);
continue;
}
- CompactionNode infileNode =
- compactionNodeTable.get(Paths.get(fileName).getFileName().toString());
+
+ CompactionNode infileNode = compactionNodeMap.get(fileName);
if (infileNode == null) {
- LOG.warn("SrcSnapshot : " + src.dbPath + "File " + fileName + "was " +
- "never compacted");
+ LOG.debug("Source '{}' SST file '{}' is never compacted",
+ src.dbPath, fileName);
differentFiles.add(fileName);
continue;
}
- System.out.print(" Expandin File:" + fileName + ":\n");
- Set<CompactionNode> nextLevel = new HashSet<>();
- nextLevel.add(infileNode);
+
+ LOG.debug("Expanding SST file: {}", fileName);
Set<CompactionNode> currentLevel = new HashSet<>();
- currentLevel.addAll(nextLevel);
- nextLevel = new HashSet<>();
- int i = 1;
- while (currentLevel.size() != 0) {
- LOG.warn("DAG Level :" + i++);
+ currentLevel.add(infileNode);
+ // Traversal level/depth indicator for debug print
+ int level = 1;
+ while (!currentLevel.isEmpty()) {
+ LOG.debug("BFS level: {}. Current level has {} nodes.",
+ level++, currentLevel.size());
+
+ final Set<CompactionNode> nextLevel = new HashSet<>();
for (CompactionNode current : currentLevel) {
- LOG.warn("acknowledging file " + current.fileName);
+ LOG.debug("Processing node: {}", current.fileName);
if (current.snapshotGeneration <= dest.snapshotGeneration) {
- LOG.warn("Reached dest generation count. SrcSnapshot : " +
- src.dbPath + " and Dest " + "Snapshot" + dest.dbPath +
- " Contain Diffrent file " + current.fileName);
+ LOG.debug("Current node's snapshot generation '{}' "
+ + "reached destination snapshot's '{}'. "
+ + "Src '{}' and dest '{}' have different SST file: '{}'",
+ current.snapshotGeneration, dest.snapshotGeneration,
+ src.dbPath, dest.dbPath, current.fileName);
differentFiles.add(current.fileName);
continue;
}
+
Set<CompactionNode> successors = mutableGraph.successors(current);
- if (successors == null || successors.size() == 0) {
- LOG.warn("No further compaction for the file" +
- ".SrcSnapshot : " + src.dbPath + " and Dest " +
- "Snapshot" + dest.dbPath + " Contain Diffrent file " +
- current.fileName);
+ if (successors.isEmpty()) {
+ LOG.debug("No further compaction happened to the current file. " +
+ "Src '{}' and dest '{}' have different file: {}",
+ src.dbPath, dest.dbPath, current.fileName);
differentFiles.add(current.fileName);
- } else {
- for (CompactionNode oneSucc : successors) {
- if (sameFiles.contains(oneSucc.fileName) ||
- differentFiles.contains(oneSucc.fileName)) {
- LOG.warn("Skipping file :" + oneSucc.fileName);
- continue;
- }
- if (destSnapFiles.contains(oneSucc.fileName)) {
- LOG.warn("SrcSnapshot : " + src.dbPath + " and Dest " +
- "Snapshot" + dest.dbPath + " Contain Same file " +
- oneSucc.fileName);
- sameFiles.add(oneSucc.fileName);
- continue;
- } else {
- LOG.warn("SrcSnapshot : " + src.dbPath + " and Dest " +
- "Snapshot" + dest.dbPath + " Contain Diffrent file " +
- oneSucc.fileName);
- nextLevel.add(oneSucc);
- }
+ continue;
+ }
+
+ for (CompactionNode node : successors) {
+ if (sameFiles.contains(node.fileName) ||
+ differentFiles.contains(node.fileName)) {
+ LOG.debug("Skipping known processed SST: {}", node.fileName);
+ continue;
+ }
+
+ if (destSnapFiles.contains(node.fileName)) {
+ LOG.debug("Src '{}' and dest '{}' have the same SST: {}",
+ src.dbPath, dest.dbPath, node.fileName);
+ sameFiles.add(node.fileName);
+ continue;
}
+
+ // Queue different SST to the next level
+ LOG.debug("Src '{}' and dest '{}' have a different SST: {}",
+ src.dbPath, dest.dbPath, node.fileName);
+ nextLevel.add(node);
}
}
- currentLevel = new HashSet<>();
- currentLevel.addAll(nextLevel);
- nextLevel = new HashSet<>();
- LOG.warn("");
+ currentLevel = nextLevel;
}
}
- LOG.warn("Summary :");
- for (String file : sameFiles) {
- System.out.print("Same File : " + file);
- }
- LOG.warn("");
-
- for (String file : differentFiles) {
- System.out.print("Different File : " + file);
- }
- LOG.warn("");
}
- @SuppressFBWarnings("SIC_INNER_SHOULD_BE_STATIC")
- class NodeComparator implements Comparator<CompactionNode>
- {
- public int compare(CompactionNode a, CompactionNode b)
- {
+ static class NodeComparator
+ implements Comparator<CompactionNode>, Serializable {
+ public int compare(CompactionNode a, CompactionNode b) {
return a.fileName.compareToIgnoreCase(b.fileName);
}
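For illustration, a hedged usage sketch of the new diff entry point (the
paths, snapshot IDs, and generation numbers below are invented):

    // src must be the newer snapshot: the traversal walks the forward DAG
    // from src's SST files until it reaches nodes whose snapshot generation
    // is at or below dest's generation.
    DifferSnapshotInfo src = new DifferSnapshotInfo(
        "/om/metadata/db.snapshots/checkpoint2", "snap2", 2000L);
    DifferSnapshotInfo dest = new DifferSnapshotInfo(
        "/om/metadata/db.snapshots/checkpoint1", "snap1", 1000L);
    List<String> changedSSTs = differ.getSSTDiffList(src, dest);
    // changedSSTs holds the SST file names (without the .sst extension)
    // that differ between the two snapshots, the input to snapdiff.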
@@ -608,210 +802,204 @@ public class RocksDBCheckpointDiffer {
}
}
-
- public void dumpCompactioNodeTable() {
- List<CompactionNode> nodeList =
- compactionNodeTable.values().stream().collect(Collectors.toList());
- Collections.sort(nodeList, new NodeComparator());
- for (CompactionNode n : nodeList ) {
- LOG.warn("File : " + n.fileName + " :: Total keys : "
- + n.totalNumberOfKeys);
- LOG.warn("File : " + n.fileName + " :: Cumulative keys : " +
+ @VisibleForTesting
+ void dumpCompactionNodeTable() {
+ List<CompactionNode> nodeList = compactionNodeMap.values().stream()
+ .sorted(new NodeComparator()).collect(Collectors.toList());
+ for (CompactionNode n : nodeList) {
+ LOG.debug("File '{}' total keys: {}", n.fileName, n.totalNumberOfKeys);
+ LOG.debug("File '{}' cumulative keys: {}", n.fileName,
n.cumulativeKeysReverseTraversal);
}
}
- @SuppressFBWarnings({"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
- public synchronized void printMutableGraphFromAGivenNode(
- String fileName, int level, MutableGraph<CompactionNode> mutableGraph) {
- CompactionNode infileNode =
- compactionNodeTable.get(Paths.get(fileName).getFileName().toString());
+ @VisibleForTesting
+ public synchronized void printMutableGraphFromAGivenNode(String fileName,
+ int sstLevel, MutableGraph<CompactionNode> mutableGraph) {
+
+ CompactionNode infileNode = compactionNodeMap.get(fileName);
if (infileNode == null) {
return;
}
- System.out.print("\nCompaction Level : " + level + " Expandin File:" +
- fileName + ":\n");
- Set<CompactionNode> nextLevel = new HashSet<>();
- nextLevel.add(infileNode);
+ LOG.debug("Expanding file: {}. SST compaction level: {}",
+ fileName, sstLevel);
Set<CompactionNode> currentLevel = new HashSet<>();
- currentLevel.addAll(nextLevel);
- int i = 1;
- while (currentLevel.size() != 0) {
- LOG.warn("DAG Level :" + i++);
+ currentLevel.add(infileNode);
+ int levelCounter = 1;
+ while (!currentLevel.isEmpty()) {
+ LOG.debug("DAG Level: {}", levelCounter++);
+ final Set<CompactionNode> nextLevel = new HashSet<>();
+ StringBuilder sb = new StringBuilder();
for (CompactionNode current : currentLevel) {
Set<CompactionNode> successors = mutableGraph.successors(current);
- for (CompactionNode oneSucc : successors) {
- System.out.print(oneSucc.fileName + " ");
- nextLevel.add(oneSucc);
+ for (CompactionNode succNode : successors) {
+ sb.append(succNode.fileName).append(" ");
+ nextLevel.add(succNode);
}
}
- currentLevel = new HashSet<>();
- currentLevel.addAll(nextLevel);
- nextLevel = new HashSet<>();
- LOG.warn("");
+ LOG.debug("{}", sb);
+ currentLevel = nextLevel;
}
}
- public synchronized void printMutableGraph(
- String srcSnapId, String destSnapId,
+ synchronized void printMutableGraph(String srcSnapId, String destSnapId,
MutableGraph<CompactionNode> mutableGraph) {
- LOG.warn("Printing the Graph");
- Set<CompactionNode> topLevelNodes = new HashSet<>();
- Set<CompactionNode> allNodes = new HashSet<>();
- for (CompactionNode n : mutableGraph.nodes()) {
+
+ LOG.debug("Gathering all SST file nodes from src '{}' to dest '{}'",
+ srcSnapId, destSnapId);
+
+ final Queue<CompactionNode> nodeQueue = new LinkedList<>();
+ // Queue source snapshot SST file nodes
+ for (CompactionNode node : mutableGraph.nodes()) {
if (srcSnapId == null ||
- n.snapshotId.compareToIgnoreCase(srcSnapId) == 0) {
- topLevelNodes.add(n);
+ node.snapshotId.compareToIgnoreCase(srcSnapId) == 0) {
+ nodeQueue.add(node);
}
}
- Iterator iter = topLevelNodes.iterator();
- while (iter.hasNext()) {
- CompactionNode n = (CompactionNode) iter.next();
- Set<CompactionNode> succ = mutableGraph.successors(n);
- LOG.warn("Parent Node :" + n.fileName);
- if (succ.size() == 0) {
- LOG.warn("No Children Node ");
- allNodes.add(n);
- iter.remove();
- iter = topLevelNodes.iterator();
+
+ final Set<CompactionNode> allNodesSet = new HashSet<>();
+ while (!nodeQueue.isEmpty()) {
+ CompactionNode node = nodeQueue.poll();
+ Set<CompactionNode> succSet = mutableGraph.successors(node);
+ LOG.debug("Current node: {}", node);
+ if (succSet.isEmpty()) {
+ LOG.debug("Has no successor node");
+ allNodesSet.add(node);
continue;
}
- for (CompactionNode oneSucc : succ) {
- LOG.warn("Children Node :" + oneSucc.fileName);
- if (srcSnapId == null||
- oneSucc.snapshotId.compareToIgnoreCase(destSnapId) == 0) {
- allNodes.add(oneSucc);
- } else {
- topLevelNodes.add(oneSucc);
+ for (CompactionNode succNode : succSet) {
+ LOG.debug("Has successor node: {}", succNode);
+ if (srcSnapId == null ||
+ succNode.snapshotId.compareToIgnoreCase(destSnapId) == 0) {
+ allNodesSet.add(succNode);
+ continue;
}
+ nodeQueue.add(succNode);
}
- iter.remove();
- iter = topLevelNodes.iterator();
- }
- LOG.warn("src snap:" + srcSnapId);
- LOG.warn("dest snap:" + destSnapId);
- for (CompactionNode n : allNodes) {
- LOG.warn("Files are :" + n.fileName);
}
- }
+ LOG.debug("Files are: {}", allNodesSet);
+ }
- public void createSnapshot(RocksDB rocksDB) throws InterruptedException {
-
- LOG.warn("Current time is::" + System.currentTimeMillis());
- long t1 = System.currentTimeMillis();
+ public MutableGraph<CompactionNode> getForwardCompactionDAG() {
+ return forwardCompactionDAG;
+ }
- cpPath = cpPath + lastSnapshotCounter;
- createCheckPoint(rocksDbPath, cpPath, rocksDB);
- allSnapshots[lastSnapshotCounter] = new Snapshot(cpPath,
- lastSnapshotPrefix, lastSnapshotCounter);
+ public MutableGraph<CompactionNode> getBackwardCompactionDAG() {
+ return backwardCompactionDAG;
+ }
- long t2 = System.currentTimeMillis();
- LOG.warn("Current time is::" + t2);
+ /**
+ * Helper method to add a new file node to the DAG.
+ * @return CompactionNode
+ */
+ private CompactionNode addNodeToDAG(String file, long seqNum) {
+ long numKeys = 0L;
+ try {
+ numKeys = getSSTFileSummary(file);
+ } catch (RocksDBException e) {
+ LOG.warn("Can't get num of keys in SST '{}': {}", file, e.getMessage());
+ }
+ CompactionNode fileNode = new CompactionNode(file, null, numKeys, seqNum);
+ forwardCompactionDAG.addNode(fileNode);
+ backwardCompactionDAG.addNode(fileNode);
- LOG.warn("millisecond difference is ::" + (t2 - t1));
- Thread.sleep(100);
- ++lastSnapshotCounter;
- lastSnapshotPrefix = "sid_" + lastSnapshotCounter;
- LOG.warn("done :: 1");
+ return fileNode;
}
+ /**
+ * Populate the compaction DAG with the input and output SST file lists.
+ */
+ private void populateCompactionDAG(List<String> inputFiles,
+ List<String> outputFiles, long seqNum) {
- public void printAllSnapshots() throws InterruptedException {
- for (Snapshot snap : allSnapshots) {
- if (snap == null) {
- break;
- }
- LOG.warn("Snapshot id" + snap.snapshotID);
- LOG.warn("Snapshot path" + snap.dbPath);
- LOG.warn("Snapshot Generation" + snap.snapshotGeneration);
- LOG.warn("");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Input files: {} -> Output files: {}", inputFiles, outputFiles);
}
- }
- public void diffAllSnapshots() throws InterruptedException, RocksDBException {
- for (Snapshot snap : allSnapshots) {
- if (snap == null) {
- break;
+ for (String outfile : outputFiles) {
+ final CompactionNode outfileNode = compactionNodeMap.computeIfAbsent(
+ outfile, file -> addNodeToDAG(file, seqNum));
+
+ for (String infile : inputFiles) {
+ final CompactionNode infileNode = compactionNodeMap.computeIfAbsent(
+ infile, file -> addNodeToDAG(file, seqNum));
+ // Draw the edges
+ if (!outfileNode.fileName.equals(infileNode.fileName)) {
+ forwardCompactionDAG.putEdge(outfileNode, infileNode);
+ backwardCompactionDAG.putEdge(infileNode, outfileNode);
+ }
}
- printSnapdiffSSTFiles(allSnapshots[lastSnapshotCounter - 1], snap);
}
- }
-
- public MutableGraph<CompactionNode> getCompactionFwdDAG() {
- return compactionDAGFwd;
- }
- public MutableGraph<CompactionNode> getCompactionReverseDAG() {
- return compactionDAGFwd;
}
+ @VisibleForTesting
public synchronized void traverseGraph(
MutableGraph<CompactionNode> reverseMutableGraph,
MutableGraph<CompactionNode> fwdMutableGraph) {
- List<CompactionNode> nodeList =
- compactionNodeTable.values().stream().collect(Collectors.toList());
- Collections.sort(nodeList, new NodeComparator());
+ List<CompactionNode> nodeList = compactionNodeMap.values().stream()
+ .sorted(new NodeComparator()).collect(Collectors.toList());
- for (CompactionNode infileNode : nodeList ) {
- // fist go through fwdGraph to find nodes that don't have succesors.
+ for (CompactionNode infileNode : nodeList) {
+ // first go through fwdGraph to find nodes that don't have successors.
// These nodes will be the top level nodes in reverse graph
Set<CompactionNode> successors = fwdMutableGraph.successors(infileNode);
- if (successors == null || successors.size() == 0) {
- LOG.warn("traverseGraph : No successors. cumulative " +
- "keys : " + infileNode.cumulativeKeysReverseTraversal + "::total " +
- "keys ::" + infileNode.totalNumberOfKeys);
+ if (successors.size() == 0) {
+ LOG.debug("No successors. Cumulative keys: {}, total keys: {}",
+ infileNode.cumulativeKeysReverseTraversal,
+ infileNode.totalNumberOfKeys);
infileNode.cumulativeKeysReverseTraversal =
infileNode.totalNumberOfKeys;
}
}
HashSet<CompactionNode> visited = new HashSet<>();
- for (CompactionNode infileNode : nodeList ) {
+ for (CompactionNode infileNode : nodeList) {
if (visited.contains(infileNode)) {
continue;
}
visited.add(infileNode);
- System.out.print("traverseGraph: Visiting node " + infileNode.fileName +
- ":\n");
- Set<CompactionNode> nextLevel = new HashSet<>();
- nextLevel.add(infileNode);
+ LOG.debug("Visiting node '{}'", infileNode.fileName);
Set<CompactionNode> currentLevel = new HashSet<>();
- currentLevel.addAll(nextLevel);
- nextLevel = new HashSet<>();
- int i = 1;
- while (currentLevel.size() != 0) {
- LOG.warn("traverseGraph : DAG Level :" + i++);
+ currentLevel.add(infileNode);
+ int level = 1;
+ while (!currentLevel.isEmpty()) {
+ LOG.debug("BFS Level: {}. Current level has {} nodes",
+ level++, currentLevel.size());
+ final Set<CompactionNode> nextLevel = new HashSet<>();
for (CompactionNode current : currentLevel) {
- LOG.warn("traverseGraph : expanding node " + current.fileName);
+ LOG.debug("Expanding node: {}", current.fileName);
Set<CompactionNode> successors =
reverseMutableGraph.successors(current);
- if (successors == null || successors.size() == 0) {
- LOG.warn("traverseGraph : No successors. cumulative " +
- "keys : " + current.cumulativeKeysReverseTraversal);
- } else {
- for (CompactionNode oneSucc : successors) {
- LOG.warn("traverseGraph : Adding to the next level : " +
- oneSucc.fileName);
- LOG.warn("traverseGraph : " + oneSucc.fileName + "cum" + " keys"
- + oneSucc.cumulativeKeysReverseTraversal + "parent" + " " +
- current.fileName + " total " + current.totalNumberOfKeys);
- oneSucc.cumulativeKeysReverseTraversal +=
- current.cumulativeKeysReverseTraversal;
- nextLevel.add(oneSucc);
- }
+ if (successors.isEmpty()) {
+ LOG.debug("No successors. Cumulative keys: {}",
+ current.cumulativeKeysReverseTraversal);
+ continue;
+ }
+ for (CompactionNode node : successors) {
+ LOG.debug("Adding to the next level: {}", node.fileName);
+ LOG.debug("'{}' cumulative keys: {}. parent '{}' total keys: {}",
+ node.fileName, node.cumulativeKeysReverseTraversal,
+ current.fileName, current.totalNumberOfKeys);
+ node.cumulativeKeysReverseTraversal +=
+ current.cumulativeKeysReverseTraversal;
+ nextLevel.add(node);
}
}
- currentLevel = new HashSet<>();
- currentLevel.addAll(nextLevel);
- nextLevel = new HashSet<>();
- LOG.warn("");
+ currentLevel = nextLevel;
}
}
}
+ @VisibleForTesting
public boolean debugEnabled(Integer level) {
return DEBUG_LEVEL.contains(level);
}
+
+ @VisibleForTesting
+ public static Logger getLog() {
+ return LOG;
+ }
}
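
A side note on the DAG shape that populateCompactionDAG above establishes: in the
forward DAG an edge points from a compaction output file to each of its input
files, and the backward DAG stores the reverse edge. A minimal, self-contained
sketch of that wiring using Guava's graph API directly (the class name, file
names and the single compaction event here are made up for illustration):

    import com.google.common.graph.GraphBuilder;
    import com.google.common.graph.MutableGraph;

    public class CompactionDagSketch {
      public static void main(String[] args) {
        MutableGraph<String> forward = GraphBuilder.directed().build();
        MutableGraph<String> backward = GraphBuilder.directed().build();

        // One hypothetical compaction: inputs 000024, 000026 -> output 000028
        String out = "000028";
        for (String in : new String[] {"000024", "000026"}) {
          // The differ skips self-loops (an output also listed as an input)
          if (!out.equals(in)) {
            forward.putEdge(out, in);   // output -> input
            backward.putEdge(in, out);  // input -> output
          }
        }

        // Following successors in the forward DAG walks from the newest
        // (compacted) files back to the older files they were built from,
        // which is what the snapshot diff traversal relies on.
        System.out.println(forward.successors(out));  // the two inputs, unordered
      }
    }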
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
index a94fb32236..9a4c5c10b0 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -18,71 +18,129 @@
package org.apache.ozone.rocksdiff;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Arrays.asList;
import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_DAG_LIVE_NODES;
import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_READ_ALL_DB_KEYS;
+import static org.junit.jupiter.api.Assertions.fail;
+import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
+import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DifferSnapshotInfo;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.rocksdb.Checkpoint;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
+import org.rocksdb.FlushOptions;
import org.rocksdb.LiveFileMetaData;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
-////CHECKSTYLE:OFF
+/**
+ * Test RocksDBCheckpointDiffer basic functionality.
+ */
public class TestRocksDBCheckpointDiffer {
- private static final String dbPath = "./rocksdb-data";
- private static final int NUM_ROW = 25000000;
- private static final int SNAPSHOT_EVERY_SO_MANY_KEYS = 999999;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestRocksDBCheckpointDiffer.class);
- // keeps track of all the snapshots created so far.
- private static int lastSnapshotCounter = 0;
- private static String lastSnapshotPrefix = "snap_id_";
+ /**
+ * RocksDB path for the test.
+ */
+ private static final String TEST_DB_PATH = "./rocksdb-data";
+ private static final int NUM_ROW = 250000;
+ private static final int SNAPSHOT_EVERY_SO_MANY_KEYS = 49999;
+
+ /**
+ * RocksDB checkpoint path prefix.
+ */
+ private static final String CP_PATH_PREFIX = "rocksdb-cp-";
+ private final ArrayList<DifferSnapshotInfo> snapshots = new ArrayList<>();
+ /**
+ * Graph type.
+ */
+ enum GType {
+ FNAME,
+ KEYSIZE,
+ CUMULATIVE_SIZE
+ }
- public static void main(String[] args) throws Exception {
- TestRocksDBCheckpointDiffer tester= new TestRocksDBCheckpointDiffer();
+ @BeforeAll
+ public static void init() {
+ // Checkpoint differ log level. Set to DEBUG for verbose output
+ GenericTestUtils.setLogLevel(RocksDBCheckpointDiffer.getLog(), Level.INFO);
+ // Test class log level. Set to DEBUG for verbose output
+ GenericTestUtils.setLogLevel(TestRocksDBCheckpointDiffer.LOG, Level.INFO);
+ }
+
+ @Test
+ void testMain() throws Exception {
+
+ final String clDirStr = "compaction-log";
+ // Delete the compaction log dir for the test, if it exists
+ File clDir = new File(clDirStr);
+ if (clDir.exists()) {
+ deleteDirectory(clDir);
+ }
+
+ final String metadataDirStr = ".";
+ final String sstDirStr = "compaction-sst-backup";
+
+ final File dbLocation = new File(TEST_DB_PATH);
RocksDBCheckpointDiffer differ = new RocksDBCheckpointDiffer(
- "./rocksdb-data",
- 1000,
- "./rocksdb-data-cp",
- "./SavedCompacted_Files/",
- "./rocksdb-data-cf/",
- 0,
- "snap_id_");
- lastSnapshotPrefix = "snap_id_" + lastSnapshotCounter;
- RocksDB rocksDB = tester.createRocksDBInstance(dbPath, differ);
- Thread.sleep(10000);
-
- tester.readRocksDBInstance(dbPath, rocksDB, null, differ);
- differ.printAllSnapshots();
+ metadataDirStr, sstDirStr, clDirStr, dbLocation);
+
+ // Empty the SST backup folder first for testing
+ File sstDir = new File(sstDirStr);
+ deleteDirectory(sstDir);
+ if (!sstDir.mkdir()) {
+ fail("Unable to create SST backup directory");
+ }
+
+ RocksDB rocksDB = createRocksDBInstanceAndWriteKeys(TEST_DB_PATH, differ);
+ readRocksDBInstance(TEST_DB_PATH, rocksDB, null, differ);
+
+ if (LOG.isDebugEnabled()) {
+ printAllSnapshots();
+ }
+
differ.traverseGraph(
- differ.getCompactionReverseDAG(),
- differ.getCompactionFwdDAG());
- differ.diffAllSnapshots();
- differ.dumpCompactioNodeTable();
- for (RocksDBCheckpointDiffer.GType gtype :
- RocksDBCheckpointDiffer.GType.values()) {
- String fname = "fwdGraph_" + gtype.toString() + ".png";
- String rname = "reverseGraph_"+ gtype.toString() + ".png";
-
- //differ.pngPrintMutableGrapth(differ.getCompactionFwdDAG(),
- // fname, gtype);
- //differ.pngPrintMutableGrapth(differ.getCompactionReverseDAG(), rname,
- // gtype);
+ differ.getBackwardCompactionDAG(),
+ differ.getForwardCompactionDAG());
+
+ diffAllSnapshots(differ);
+
+ if (LOG.isDebugEnabled()) {
+ differ.dumpCompactionNodeTable();
}
+
+ for (GType gtype : GType.values()) {
+ String fname = "fwdGraph_" + gtype + ".png";
+ String rname = "reverseGraph_" + gtype + ".png";
+/*
+ differ.pngPrintMutableGrapth(differ.getCompactionFwdDAG(), fname, gtype);
+ differ.pngPrintMutableGrapth(
+ differ.getCompactionReverseDAG(), rname, gtype);
+ */
+ }
+
rocksDB.close();
}
@@ -93,53 +151,169 @@ public class TestRocksDBCheckpointDiffer {
return random.ints(leftLimit, rightLimit + 1)
.filter(i -> (i <= 57 || i >= 65) && (i <= 90 || i >= 97))
- .limit(7)
+ .limit(length)
.collect(StringBuilder::new,
StringBuilder::appendCodePoint, StringBuilder::append)
.toString();
}
- // Test Code to create sample RocksDB instance.
- public RocksDB createRocksDBInstance(String dbPathArg,
- RocksDBCheckpointDiffer differ)
- throws RocksDBException, InterruptedException {
+ /**
+ * Test SST differ.
+ */
+ void diffAllSnapshots(RocksDBCheckpointDiffer differ) {
+ final DifferSnapshotInfo src = snapshots.get(snapshots.size() - 1);
+
+ // Hard-coded expected output.
+ // The results are deterministic. Retrieved from a successful run.
+ final List<List<String>> expectedDifferResult = asList(
+ asList("000024", "000017", "000028", "000026", "000019", "000021"),
+ asList("000024", "000028", "000026", "000019", "000021"),
+ asList("000024", "000028", "000026", "000021"),
+ asList("000024", "000028", "000026"),
+ asList("000028", "000026"),
+ Collections.singletonList("000028"),
+ Collections.emptyList()
+ );
+ Assertions.assertEquals(snapshots.size(), expectedDifferResult.size());
+
+ int index = 0;
+ for (DifferSnapshotInfo snap : snapshots) {
+ // Returns a list of SST files to be fed into RocksDiff
+ List<String> sstDiffList = differ.getSSTDiffList(src, snap);
+ LOG.debug("SST diff list from '{}' to '{}': {}",
+ src.getDbPath(), snap.getDbPath(), sstDiffList);
+
+ Assertions.assertEquals(expectedDifferResult.get(index), sstDiffList);
+ ++index;
+ }
+ }
+
+ /**
+ * Helper function that creates an RDB checkpoint (= Ozone snapshot).
+ */
+ private void createCheckpoint(RocksDBCheckpointDiffer differ,
+ RocksDB rocksDB) {
+
+ LOG.trace("Current time: " + System.currentTimeMillis());
+ long t1 = System.currentTimeMillis();
+
+ final long snapshotGeneration = rocksDB.getLatestSequenceNumber();
+ final String cpPath = CP_PATH_PREFIX + snapshotGeneration;
+
+ // Delete the checkpoint dir if it already exists for the test
+ File dir = new File(cpPath);
+ if (dir.exists()) {
+ deleteDirectory(dir);
+ }
+
+ final long dbLatestSequenceNumber = rocksDB.getLatestSequenceNumber();
+
+ createCheckPoint(TEST_DB_PATH, cpPath, rocksDB);
+ final String snapshotId = "snap_id_" + snapshotGeneration;
+ final DifferSnapshotInfo currentSnapshot =
+ new DifferSnapshotInfo(cpPath, snapshotId, snapshotGeneration);
+ this.snapshots.add(currentSnapshot);
+
+ // Same as what OmSnapshotManager#createOmSnapshotCheckpoint would do
+ differ.appendSequenceNumberToCompactionLog(dbLatestSequenceNumber);
+
+ differ.setCurrentCompactionLog(dbLatestSequenceNumber);
+
+ long t2 = System.currentTimeMillis();
+ LOG.trace("Current time: " + t2);
+ LOG.debug("Time elapsed: " + (t2 - t1) + " ms");
+ }
+
+ // Flushes the WAL and creates a RocksDB checkpoint
+ void createCheckPoint(String dbPathArg, String cpPathArg,
+ RocksDB rocksDB) {
+ LOG.debug("Creating RocksDB '{}' checkpoint at '{}'", dbPathArg, cpPathArg);
+ try {
+ rocksDB.flush(new FlushOptions());
+ Checkpoint cp = Checkpoint.create(rocksDB);
+ cp.createCheckpoint(cpPathArg);
+ } catch (RocksDBException e) {
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+ void printAllSnapshots() {
+ for (DifferSnapshotInfo snap : snapshots) {
+ LOG.debug("{}", snap);
+ }
+ }
+
+ // Test Code to create sample RocksDB instance.
+ private RocksDB createRocksDBInstanceAndWriteKeys(String dbPathArg,
+ RocksDBCheckpointDiffer differ) throws RocksDBException {
+
+ LOG.debug("Creating RocksDB at '{}'", dbPathArg);
+
+ // Delete the test DB dir if it exists
+ File dir = new File(dbPathArg);
+ if (dir.exists()) {
+ deleteDirectory(dir);
+ }
- System.out.println("Creating RocksDB instance at :" + dbPathArg);
+ final ColumnFamilyOptions cfOpts = new ColumnFamilyOptions()
+ .optimizeUniversalStyleCompaction();
+ final List<ColumnFamilyDescriptor> cfDescriptors =
+ RocksDBCheckpointDiffer.getCFDescriptorList(cfOpts);
+ List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
- RocksDB rocksDB = null;
- rocksDB = differ.getRocksDBInstanceWithCompactionTracking(dbPathArg);
+ // Create a RocksDB instance with compaction tracking
+ final DBOptions dbOptions = new DBOptions()
+ .setCreateIfMissing(true)
+ .setCreateMissingColumnFamilies(true);
+ differ.setRocksDBForCompactionTracking(dbOptions);
+ RocksDB rocksDB = RocksDB.open(dbOptions, dbPathArg, cfDescriptors,
+ cfHandles);
+
+ differ.setCurrentCompactionLog(rocksDB.getLatestSequenceNumber());
Random random = new Random();
// key-value
for (int i = 0; i < NUM_ROW; ++i) {
String generatedString = getRandomString(random, 7);
- String keyStr = " My" + generatedString + "StringKey" + i;
- String valueStr = " My " + generatedString + "StringValue" + i;
+ String keyStr = "Key-" + i + "-" + generatedString;
+ String valueStr = "Val-" + i + "-" + generatedString;
byte[] key = keyStr.getBytes(UTF_8);
- rocksDB.put(key, valueStr.getBytes(UTF_8));
+ // Put entry in keyTable
+ rocksDB.put(cfHandles.get(1), key, valueStr.getBytes(UTF_8));
if (i % SNAPSHOT_EVERY_SO_MANY_KEYS == 0) {
- differ.createSnapshot(rocksDB);
+ createCheckpoint(differ, rocksDB);
}
- //System.out.println(toStr(rocksDB.get(key));
}
- differ.createSnapshot(rocksDB);
+ createCheckpoint(differ, rocksDB);
return rocksDB;
}
- // RocksDB.DEFAULT_COLUMN_FAMILY
- private void UpdateRocksDBInstance(String dbPathArg, RocksDB rocksDB) {
+ static boolean deleteDirectory(java.io.File directoryToBeDeleted) {
+ File[] allContents = directoryToBeDeleted.listFiles();
+ if (allContents != null) {
+ for (java.io.File file : allContents) {
+ if (!deleteDirectory(file)) {
+ return false;
+ }
+ }
+ }
+ return directoryToBeDeleted.delete();
+ }
+
+ /**
+ * RocksDB.DEFAULT_COLUMN_FAMILY.
+ */
+ private void updateRocksDBInstance(String dbPathArg, RocksDB rocksDB) {
System.out.println("Updating RocksDB instance at :" + dbPathArg);
- //
- try (final Options options =
- new Options().setCreateIfMissing(true).
- setCompressionType(CompressionType.NO_COMPRESSION)) {
+
+ try (Options options = new Options().setCreateIfMissing(true)) {
if (rocksDB == null) {
rocksDB = RocksDB.open(options, dbPathArg);
}
Random random = new Random();
// key-value
- for (int i = 0; i< NUM_ROW; ++i) {
+ for (int i = 0; i < NUM_ROW; ++i) {
String generatedString = getRandomString(random, 7);
String keyStr = " MyUpdated" + generatedString + "StringKey" + i;
String valueStr = " My Updated" + generatedString + "StringValue" + i;
@@ -152,12 +326,14 @@ public class TestRocksDBCheckpointDiffer {
}
}
- // RocksDB.DEFAULT_COLUMN_FAMILY
+ /**
+ * RocksDB.DEFAULT_COLUMN_FAMILY.
+ */
public void testDefaultColumnFamilyOriginal() {
System.out.println("testDefaultColumnFamily begin...");
- //
- try (final Options options = new Options().setCreateIfMissing(true)) {
- try (final RocksDB rocksDB = RocksDB.open(options, "./rocksdb-data")) {
+
+ try (Options options = new Options().setCreateIfMissing(true)) {
+ try (RocksDB rocksDB = RocksDB.open(options, "./rocksdb-data")) {
// key-value
byte[] key = "Hello".getBytes(UTF_8);
rocksDB.put(key, "World".getBytes(UTF_8));
@@ -167,7 +343,7 @@ public class TestRocksDBCheckpointDiffer {
rocksDB.put("SecondKey".getBytes(UTF_8), "SecondValue".getBytes(UTF_8));
// List
- List<byte[]> keys = Arrays.asList(key, "SecondKey".getBytes(UTF_8),
+ List<byte[]> keys = asList(key, "SecondKey".getBytes(UTF_8),
"missKey".getBytes(UTF_8));
List<byte[]> values = rocksDB.multiGetAsList(keys);
for (int i = 0; i < keys.size(); i++) {
@@ -200,21 +376,22 @@ public class TestRocksDBCheckpointDiffer {
// (table)
public void testCertainColumnFamily() {
System.out.println("\ntestCertainColumnFamily begin...");
- try (final ColumnFamilyOptions cfOpts =
- new ColumnFamilyOptions().optimizeUniversalStyleCompaction()) {
+ try (ColumnFamilyOptions cfOpts = new ColumnFamilyOptions()
+ .optimizeUniversalStyleCompaction()) {
String cfName = "my-first-columnfamily";
// list of column family descriptors, first entry must always be
// default column family
- final List<ColumnFamilyDescriptor> cfDescriptors = Arrays.asList(
+ final List<ColumnFamilyDescriptor> cfDescriptors = asList(
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOpts),
new ColumnFamilyDescriptor(cfName.getBytes(UTF_8), cfOpts)
);
List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
- try (final DBOptions dbOptions = new DBOptions().setCreateIfMissing(true).
- setCreateMissingColumnFamilies(true);
- final RocksDB rocksDB = RocksDB.open(dbOptions, "./rocksdb-data-cf" +
- "/", cfDescriptors, cfHandles)) {
+ try (DBOptions dbOptions = new DBOptions()
+ .setCreateIfMissing(true)
+ .setCreateMissingColumnFamilies(true);
+ RocksDB rocksDB = RocksDB.open(dbOptions,
+ "./rocksdb-data-cf/", cfDescriptors, cfHandles)) {
ColumnFamilyHandle cfHandle = cfHandles.stream().filter(x -> {
try {
return (toStr(x.getName())).equals(cfName);
@@ -229,39 +406,39 @@ public class TestRocksDBCheckpointDiffer {
"FirstValue".getBytes(UTF_8));
// key
byte[] getValue = rocksDB.get(cfHandles.get(0), key.getBytes(UTF_8));
- System.out.println("get Value : " + toStr(getValue));
+ LOG.debug("get Value: " + toStr(getValue));
// key/value
rocksDB.put(cfHandles.get(1), "SecondKey".getBytes(UTF_8),
"SecondValue".getBytes(UTF_8));
- List<byte[]> keys = Arrays.asList(key.getBytes(UTF_8),
+ List<byte[]> keys = asList(key.getBytes(UTF_8),
"SecondKey".getBytes(UTF_8));
- List<ColumnFamilyHandle> cfHandleList = Arrays.asList(cfHandle,
- cfHandle);
+ List<ColumnFamilyHandle> cfHandleList = asList(cfHandle, cfHandle);
+
// key
List<byte[]> values = rocksDB.multiGetAsList(cfHandleList, keys);
for (int i = 0; i < keys.size(); i++) {
- System.out.println("multiGet:" + toStr(keys.get(i)) + "--" +
+ LOG.debug("multiGet:" + toStr(keys.get(i)) + "--" +
(values.get(i) == null ? null : toStr(values.get(i))));
}
- //rocksDB.compactRange();
- //rocksDB.compactFiles();
+
List<LiveFileMetaData> liveFileMetaDataList =
rocksDB.getLiveFilesMetaData();
for (LiveFileMetaData m : liveFileMetaDataList) {
System.out.println("Live File Metadata");
System.out.println("\tFile :" + m.fileName());
- System.out.println("\ttable :" + toStr(m.columnFamilyName()));
+ System.out.println("\tTable :" + toStr(m.columnFamilyName()));
System.out.println("\tKey Range :" + toStr(m.smallestKey()) +
" " + "<->" + toStr(m.largestKey()));
}
+
// key
rocksDB.delete(cfHandle, key.getBytes(UTF_8));
// key
RocksIterator iter = rocksDB.newIterator(cfHandle);
for (iter.seekToFirst(); iter.isValid(); iter.next()) {
- System.out.println("iterator:" + toStr(iter.key()) + ":" +
+ System.out.println("Iterator:" + toStr(iter.key()) + ":" +
toStr(iter.value()));
}
} finally {
@@ -277,18 +454,15 @@ public class TestRocksDBCheckpointDiffer {
// Read from a given RocksDB instance and optionally write all the
// keys to a given file.
- //
- public void readRocksDBInstance(String dbPathArg, RocksDB rocksDB,
- FileWriter file,
- RocksDBCheckpointDiffer differ) {
- System.out.println("Reading RocksDB instance at : " + dbPathArg);
+ void readRocksDBInstance(String dbPathArg, RocksDB rocksDB, FileWriter file,
+ RocksDBCheckpointDiffer differ) {
+
+ LOG.debug("Reading RocksDB: " + dbPathArg);
boolean createdDB = false;
- //
- try (final Options options =
- new Options().setParanoidChecks(true)
- .setCreateIfMissing(true)
- .setCompressionType(CompressionType.NO_COMPRESSION)
- .setForceConsistencyChecks(false)) {
+
+ try (Options options = new Options()
+ .setParanoidChecks(true)
+ .setForceConsistencyChecks(false)) {
if (rocksDB == null) {
rocksDB = RocksDB.openReadOnly(options, dbPathArg);
@@ -298,22 +472,21 @@ public class TestRocksDBCheckpointDiffer {
List<LiveFileMetaData> liveFileMetaDataList =
rocksDB.getLiveFilesMetaData();
for (LiveFileMetaData m : liveFileMetaDataList) {
- System.out.println("Live File Metadata");
- System.out.println("\tFile : " + m.fileName());
- System.out.println("\tLevel : " + m.level());
- System.out.println("\tTable : " + toStr(m.columnFamilyName()));
- System.out.println("\tKey Range : " + toStr(m.smallestKey())
+ LOG.debug("SST File: {}. ", m.fileName());
+ LOG.debug("\tLevel: {}", m.level());
+ LOG.debug("\tTable: {}", toStr(m.columnFamilyName()));
+ LOG.debug("\tKey Range: {}", toStr(m.smallestKey())
+ " <-> " + toStr(m.largestKey()));
if (differ.debugEnabled(DEBUG_DAG_LIVE_NODES)) {
differ.printMutableGraphFromAGivenNode(m.fileName(), m.level(),
- differ.getCompactionFwdDAG());
+ differ.getForwardCompactionDAG());
}
}
- if(differ.debugEnabled(DEBUG_READ_ALL_DB_KEYS)) {
+ if (differ.debugEnabled(DEBUG_READ_ALL_DB_KEYS)) {
RocksIterator iter = rocksDB.newIterator();
for (iter.seekToFirst(); iter.isValid(); iter.next()) {
- System.out.println("iterator key:" + toStr(iter.key()) + ", " +
+ LOG.debug("Iterator key:" + toStr(iter.key()) + ", " +
"iter value:" + toStr(iter.value()));
if (file != null) {
file.write("iterator key:" + toStr(iter.key()) + ", iter " +
@@ -325,7 +498,7 @@ public class TestRocksDBCheckpointDiffer {
} catch (IOException | RocksDBException e) {
e.printStackTrace();
} finally {
- if (createdDB){
+ if (createdDB) {
rocksDB.close();
}
}
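
The createCheckPoint helper above is a thin wrapper over RocksDB's Checkpoint
API. For readers unfamiliar with it, a standalone sketch of the same
flush-then-checkpoint sequence (the class name, paths and key data are
arbitrary placeholders):

    import java.nio.charset.StandardCharsets;
    import org.rocksdb.Checkpoint;
    import org.rocksdb.FlushOptions;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    public class CheckpointSketch {
      public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/rocksdb-data")) {
          db.put("k1".getBytes(StandardCharsets.UTF_8),
              "v1".getBytes(StandardCharsets.UTF_8));
          // Flush the memtable first so the checkpoint captures all writes
          try (FlushOptions flushOptions =
              new FlushOptions().setWaitForFlush(true)) {
            db.flush(flushOptions);
          }
          // A checkpoint hard-links the immutable SST files into the target
          // directory (which must not exist yet); this is what makes
          // snapshot creation cheap
          Checkpoint cp = Checkpoint.create(db);
          cp.createCheckpoint("/tmp/rocksdb-cp-1");
        }
      }
    }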
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/resources/log4j.properties b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..b8ad21d6c7
--- /dev/null
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/resources/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) - %m%n
+
+log4j.logger.org.apache.hadoop.security.ShellBasedUnixGroupsMapping=ERROR
+log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
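
Since the root logger above is pinned at INFO, a quick way to get the differ's
verbose output during a test run is a per-package override in this same file
(the logger name is assumed from the package the tests live in,
org.apache.ozone.rocksdiff):

    log4j.logger.org.apache.ozone.rocksdiff=DEBUG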
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java
index e4323c271b..689cd439c5 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java
@@ -107,6 +107,10 @@ public final class SnapshotInfo implements Auditable {
private String globalPreviousSnapshotID;
private String snapshotPath; // snapshot mask
private String checkpointDir;
+ /**
+ * RocksDB transaction sequence number at the time of checkpoint creation.
+ */
+ private long dbTxSequenceNumber;
/**
* Private constructor, constructed via builder.
@@ -410,6 +414,14 @@ public final class SnapshotInfo implements Auditable {
return getCheckpointDirName(getSnapshotID());
}
+ public long getDbTxSequenceNumber() {
+ return dbTxSequenceNumber;
+ }
+
+ public void setDbTxSequenceNumber(long dbTxSequenceNumber) {
+ this.dbTxSequenceNumber = dbTxSequenceNumber;
+ }
+
/**
* Get the table key for this snapshot.
*/
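
The new dbTxSequenceNumber field is the glue between an Ozone snapshot and a
RocksDB generation. A hedged sketch of the round trip, mirroring how the
creation request and the integration test that follows use it (DbTxSeqSketch is
a made-up holder class; the metadata manager, SnapshotInfo and checkpoint path
are assumed to come from the caller):

    import org.apache.hadoop.hdds.utils.db.RDBStore;
    import org.apache.hadoop.ozone.om.OMMetadataManager;
    import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
    import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DifferSnapshotInfo;

    final class DbTxSeqSketch {
      // At snapshot creation: record the current RDB sequence number
      static void recordGeneration(OMMetadataManager omMetadataManager,
          SnapshotInfo snapshotInfo) {
        long seq = ((RDBStore) omMetadataManager.getStore()).getDb()
            .getLatestSequenceNumber();
        snapshotInfo.setDbTxSequenceNumber(seq);
      }

      // At diff time: read it back as the snapshot generation for the differ
      static DifferSnapshotInfo toDifferInfo(SnapshotInfo snapshotInfo,
          String checkpointPath) {
        return new DifferSnapshotInfo(checkpointPath,
            snapshotInfo.getSnapshotID(),
            snapshotInfo.getDbTxSequenceNumber());
      }
    }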
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
new file mode 100644
index 0000000000..d90a1c6e03
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.freon;
+
+import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMStorage;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
+import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DifferSnapshotInfo;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+import picocli.CommandLine;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_S3_VOLUME_NAME_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+
+/**
+ * Tests Freon, with MiniOzoneCluster.
+ */
+public class TestOMSnapshotDAG {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestOMSnapshotDAG.class);
+
+ private static MiniOzoneCluster cluster;
+ private static OzoneConfiguration conf;
+ private static ObjectStore store;
+ private final File metaDir = OMStorage.getOmDbDir(conf);
+
+ /**
+ * Create a MiniOzoneCluster for testing.
+ * <p>
+ * Ozone is made active by setting OZONE_ENABLED = true
+ *
+ */
+ @BeforeAll
+ public static void init() throws Exception {
+ conf = new OzoneConfiguration();
+ DatanodeRatisServerConfig ratisServerConfig =
+ conf.getObject(DatanodeRatisServerConfig.class);
+ ratisServerConfig.setRequestTimeOut(Duration.ofSeconds(3));
+ ratisServerConfig.setWatchTimeOut(Duration.ofSeconds(3));
+ conf.setFromObject(ratisServerConfig);
+
+ RatisClientConfig.RaftConfig raftClientConfig =
+ conf.getObject(RatisClientConfig.RaftConfig.class);
+ raftClientConfig.setRpcRequestTimeout(Duration.ofSeconds(3));
+ raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(3));
+ conf.setFromObject(raftClientConfig);
+
+ cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build();
+ cluster.waitForClusterToBeReady();
+
+ store = cluster.getClient().getObjectStore();
+
+ GenericTestUtils.setLogLevel(RaftLog.LOG, Level.INFO);
+ GenericTestUtils.setLogLevel(RaftServer.LOG, Level.INFO);
+ }
+
+ /**
+ * Shut down the MiniOzoneCluster.
+ */
+ @AfterAll
+ public static void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ private String getDBCheckpointAbsolutePath(SnapshotInfo snapshotInfo) {
+ return metaDir + OM_KEY_PREFIX +
+ OM_SNAPSHOT_DIR + OM_KEY_PREFIX +
+ OM_DB_NAME + snapshotInfo.getCheckpointDirName();
+ }
+
+ private static String getSnapshotDBKey(String volumeName, String bucketName,
+ String snapshotName) {
+
+ final String dbKeyPrefix = OM_KEY_PREFIX + volumeName +
+ OM_KEY_PREFIX + bucketName;
+ return dbKeyPrefix + OM_KEY_PREFIX + snapshotName;
+ }
+
+ private DifferSnapshotInfo getDifferSnapshotInfo(
+ OMMetadataManager omMetadataManager, String volumeName, String bucketName,
+ String snapshotName) throws IOException {
+
+ final String dbKey = getSnapshotDBKey(volumeName, bucketName, snapshotName);
+ final SnapshotInfo snapshotInfo =
+ omMetadataManager.getSnapshotInfoTable().get(dbKey);
+ String checkpointPath = getDBCheckpointAbsolutePath(snapshotInfo);
+
+ // Use RocksDB transaction sequence number in SnapshotInfo, which is
+ // persisted at the time of snapshot creation, as the snapshot generation
+ return new DifferSnapshotInfo(checkpointPath, snapshotInfo.getSnapshotID(),
+ snapshotInfo.getDbTxSequenceNumber());
+ }
+
+ @Test
+ void testZeroSizeKey()
+ throws IOException, InterruptedException, TimeoutException {
+
+ RandomKeyGenerator randomKeyGenerator =
+ new RandomKeyGenerator(cluster.getConf());
+ CommandLine cmd = new CommandLine(randomKeyGenerator);
+ cmd.execute("--num-of-volumes", "1",
+ "--num-of-buckets", "1",
+ "--num-of-keys", "600",
+ "--num-of-threads", "1",
+ "--key-size", "0",
+ "--factor", "THREE",
+ "--type", "RATIS",
+ "--validate-writes"
+ );
+
+ Assertions.assertEquals(600L, randomKeyGenerator.getNumberOfKeysAdded());
+ Assertions.assertEquals(600L,
+ randomKeyGenerator.getSuccessfulValidationCount());
+
+ List<OmVolumeArgs> volList = cluster.getOzoneManager()
+ .listAllVolumes("", "", 10);
+ LOG.debug("List of all volumes: {}", volList);
+ final String volumeName = volList.stream().filter(e ->
+ !e.getVolume().equals(OZONE_S3_VOLUME_NAME_DEFAULT)) // Ignore s3v vol
+ .collect(Collectors.toList()).get(0).getVolume();
+ List<OmBucketInfo> bucketList =
+ cluster.getOzoneManager().listBuckets(volumeName, "", "", 10);
+ LOG.debug("List of all buckets under the first volume: {}", bucketList);
+ final String bucketName = bucketList.get(0).getBucketName();
+
+ // Create snapshot
+ String resp = store.createSnapshot(volumeName, bucketName, "snap1");
+ LOG.debug("Snapshot created: {}", resp);
+
+ final OzoneVolume volume = store.getVolume(volumeName);
+ final OzoneBucket bucket = volume.getBucket(bucketName);
+
+ for (int i = 0; i < 6000; i++) {
+ bucket.createKey("b_" + i, 0).close();
+ }
+
+ // Create another snapshot
+ resp = store.createSnapshot(volumeName, bucketName, "snap3");
+ LOG.debug("Snapshot created: {}", resp);
+
+ // Get snapshot SST diff list
+ OzoneManager ozoneManager = cluster.getOzoneManager();
+ OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+ RDBStore rdbStore = (RDBStore) omMetadataManager.getStore();
+ RocksDBCheckpointDiffer differ = rdbStore.getRocksDBCheckpointDiffer();
+
+ DifferSnapshotInfo snap1 = getDifferSnapshotInfo(omMetadataManager,
+ volumeName, bucketName, "snap1");
+ DifferSnapshotInfo snap3 = getDifferSnapshotInfo(omMetadataManager,
+ volumeName, bucketName, "snap3");
+
+ // RocksDB does checkpointing in a separate thread, wait for it
+ final File checkpointSnap1 = new File(snap1.getDbPath());
+ GenericTestUtils.waitFor(checkpointSnap1::exists, 2000, 20000);
+ final File checkpointSnap3 = new File(snap3.getDbPath());
+ GenericTestUtils.waitFor(checkpointSnap3::exists, 2000, 20000);
+
+ List<String> actualDiffList = differ.getSSTDiffList(snap3, snap1);
+ LOG.debug("Got diff list: {}", actualDiffList);
+ // Hard-coded expected output.
+ // The result is deterministic. Retrieved from a successful run.
+ final List<String> expectedDiffList = Collections.singletonList("000059");
+ Assertions.assertEquals(expectedDiffList, actualDiffList);
+
+ // TODO: Use smaller DB write buffer size (currently it is set to 128 MB
+ // in DBProfile), or generate enough keys (in the millions) to trigger
+ // RDB compaction. Take another snapshot and do the diff again.
+ // Then restart OM, do the same diff again to see if DAG reconstruction
+ // works.
+ }
+
+}
\ No newline at end of file
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 5c9b3409a5..27787143f7 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -744,17 +744,18 @@ enum SnapshotStatusProto {
* SnapshotInfo table entry for Bucket/Volume snapshot metadata
*/
message SnapshotInfo {
- required string snapshotID = 1;
- required string name = 2;
- required string volumeName = 3;
- required string bucketName = 4;
- required SnapshotStatusProto snapshotStatus = 5;
- required uint64 creationTime = 6;
- required uint64 deletionTime = 7;
- required string pathPreviousSnapshotID = 8;
- required string globalPreviousSnapshotID = 9;
- required string snapshotPath = 10;
- required string checkpointDir = 11;
+ optional string snapshotID = 1;
+ optional string name = 2;
+ optional string volumeName = 3;
+ optional string bucketName = 4;
+ optional SnapshotStatusProto snapshotStatus = 5;
+ optional uint64 creationTime = 6;
+ optional uint64 deletionTime = 7;
+ optional string pathPreviousSnapshotID = 8;
+ optional string globalPreviousSnapshotID = 9;
+ optional string snapshotPath = 10;
+ optional string checkpointDir = 11;
+ optional int64 dbTxSequenceNumber = 12;
}
message OzoneObj {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 5b829664a0..e78df5a4a5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -466,7 +466,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
rocksDBConfiguration).setName(dbName)
.setOpenReadOnly(readOnly)
.setPath(Paths.get(metaDir.getPath()))
- .setMaxFSSnapshots(maxFSSnapshots);
+ .setMaxFSSnapshots(maxFSSnapshots)
+ .setEnableCompactionLog(true);
DBStore dbStore = addOMTablesAndCodecs(dbStoreBuilder).build();
return dbStore;
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index ffdd136877..3e0057c981 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import java.io.IOException;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -131,7 +132,27 @@ public final class OmSnapshotManager {
OMMetadataManager omMetadataManager, SnapshotInfo snapshotInfo)
throws IOException {
RDBStore store = (RDBStore) omMetadataManager.getStore();
- return store.getSnapshot(snapshotInfo.getCheckpointDirName());
+
+ final long dbLatestSequenceNumber = snapshotInfo.getDbTxSequenceNumber();
+
+ final RocksDBCheckpointDiffer dbCpDiffer =
+ omMetadataManager.getStore().getRocksDBCheckpointDiffer();
+
+ final DBCheckpoint dbCheckpoint = store.getSnapshot(
+ snapshotInfo.getCheckpointDirName());
+
+ // Write snapshot generation (latest sequence number) to compaction log.
+ // This will be used for DAG reconstruction as snapshotGeneration.
+ dbCpDiffer.appendSequenceNumberToCompactionLog(dbLatestSequenceNumber);
+
+ // Set compaction log filename to the latest DB sequence number
+ // right after taking the RocksDB checkpoint for Ozone snapshot.
+ //
+ // Note it doesn't matter if sequence number hasn't increased (even though
+ // it shouldn't happen), since the writer always appends the file.
+ dbCpDiffer.setCurrentCompactionLog(dbLatestSequenceNumber);
+
+ return dbCheckpoint;
}
// Get OmSnapshot if the keyname has ".snapshot" key indicator
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotCreateRequest.java
index 7efa4ab7f9..3c36845e19 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotCreateRequest.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.om.request.snapshot;
import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.OmUtils;
@@ -132,6 +133,13 @@ public class OMSnapshotCreateRequest extends OMClientRequest {
throw new OMException("Snapshot already exists", FILE_ALREADY_EXISTS);
}
+ // Note down RDB latest transaction sequence number, which is used
+ // as snapshot generation in the differ.
+ final long dbLatestSequenceNumber =
+ ((RDBStore) omMetadataManager.getStore()).getDb()
+ .getLatestSequenceNumber();
+ snapshotInfo.setDbTxSequenceNumber(dbLatestSequenceNumber);
+
omMetadataManager.getSnapshotInfoTable()
.addCacheEntry(new CacheKey<>(key),
new CacheValue<>(Optional.of(snapshotInfo), transactionLogIndex));
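
Putting the pieces together, the read side of the feature looks much like the
integration test above: fetch the differ that the RDBStore has been maintaining
and hand it two DifferSnapshotInfo instances, newer snapshot first. A minimal
sketch (SnapshotDiffSketch and its method arguments are illustrative
placeholders):

    import java.util.List;
    import org.apache.hadoop.hdds.utils.db.RDBStore;
    import org.apache.hadoop.ozone.om.OMMetadataManager;
    import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
    import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DifferSnapshotInfo;

    final class SnapshotDiffSketch {
      static List<String> sstDiff(OMMetadataManager omMetadataManager,
          DifferSnapshotInfo newerSnap, DifferSnapshotInfo olderSnap) {
        RDBStore rdbStore = (RDBStore) omMetadataManager.getStore();
        RocksDBCheckpointDiffer differ = rdbStore.getRocksDBCheckpointDiffer();
        // The returned SST file list is the delta between the two
        // checkpoints, to be fed into RocksDiff for the key-level diff
        return differ.getSSTDiffList(newerSnap, olderSnap);
      }
    }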