Posted to commits@ozone.apache.org by sa...@apache.org on 2023/04/25 13:16:39 UTC
[ozone] branch master updated: HDDS-7995. Support incremental snapshot on OM (#4294)
This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new cc1d2b3984 HDDS-7995. Support incremental snapshot on OM (#4294)
cc1d2b3984 is described below
commit cc1d2b3984194d22027fc119c05329e1c9e73c5f
Author: NibiruXu <ax...@qq.com>
AuthorDate: Tue Apr 25 21:16:33 2023 +0800
HDDS-7995. Support incremental snapshot on OM (#4294)
---
.../java/org/apache/hadoop/ozone/OzoneConsts.java | 6 +
.../hadoop/hdds/utils/DBCheckpointMetrics.java | 21 ++
.../hadoop/hdds/utils/DBCheckpointServlet.java | 49 +++-
.../apache/hadoop/hdds/utils/FaultInjector.java | 43 ++++
.../java/org/apache/hadoop/hdds/utils/HAUtils.java | 49 +++-
.../apache/hadoop/hdds/utils/HddsServerUtil.java | 26 +-
.../hadoop/hdds/utils/RDBSnapshotProvider.java | 233 +++++++++++++++++
.../hadoop/hdds/utils/TestRDBSnapshotProvider.java | 234 +++++++++++++++++
.../apache/hadoop/hdds/utils/db/TestRDBStore.java | 71 +++++-
.../hdds/scm/ha/SCMDBCheckpointProvider.java | 4 +-
.../hadoop/ozone/om/helpers/OMNodeDetails.java | 43 ++--
.../hdds/scm/TestSCMDbCheckpointServlet.java | 2 +-
.../hadoop/ozone/om/TestOMDbCheckpointServlet.java | 50 +++-
.../hadoop/ozone/om/TestOMRatisSnapshots.java | 277 ++++++++++++++++++++-
.../snapshot/TestOzoneManagerSnapshotProvider.java | 2 +-
.../hadoop/ozone/om/OMDBCheckpointServlet.java | 22 +-
.../org/apache/hadoop/ozone/om/OzoneManager.java | 49 ++--
.../ozone/om/ratis/OzoneManagerStateMachine.java | 26 ++
.../om/ratis_snapshot/OmRatisSnapshotProvider.java | 96 +++----
19 files changed, 1161 insertions(+), 142 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 9ba770c214..99108fc766 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -143,6 +143,8 @@ public final class OzoneConsts {
"flushBeforeCheckpoint";
public static final String OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA =
"includeSnapshotData";
+ public static final String OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST =
+ "toExcludeSST";
public static final String RANGER_OZONE_SERVICE_VERSION_KEY =
"#RANGEROZONESERVICEVERSION";
@@ -458,6 +460,10 @@ public final class OzoneConsts {
public static final long DEFAULT_OM_UPDATE_ID = -1L;
+ // RocksDB snapshot
+ public static final String SNAPSHOT_CANDIDATE_DIR = ".candidate";
+ public static final String ROCKSDB_SST_SUFFIX = ".sst";
+
// SCM default service Id and node Id in non-HA where config is not defined
// in non-HA style.
public static final String SCM_DUMMY_NODEID = "scmNodeId";
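As context for the new constants: a minimal sketch, with hypothetical paths, of how the RDBSnapshotProvider added later in this commit combines them.

    // Assuming snapshotDir = /data/metadata/snapshot and dbName = om.db,
    // the provider derives its working directory as:
    File candidateDir = new File(snapshotDir, dbName + SNAPSHOT_CANDIDATE_DIR);
    // => /data/metadata/snapshot/om.db.candidate, which accumulates *.sst
    // files (ROCKSDB_SST_SUFFIX) across incremental downloads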
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointMetrics.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointMetrics.java
index d3f3cd1d8f..52b6e1ef53 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointMetrics.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointMetrics.java
@@ -40,8 +40,11 @@ public class DBCheckpointMetrics {
// Metrics to track checkpoint statistics from last run.
private @Metric MutableGaugeLong lastCheckpointCreationTimeTaken;
private @Metric MutableGaugeLong lastCheckpointStreamingTimeTaken;
+ private @Metric MutableGaugeLong lastCheckpointStreamingNumSSTExcluded;
+ // NOTE: numCheckpoints includes numIncrementalCheckpoints
private @Metric MutableCounterLong numCheckpoints;
private @Metric MutableCounterLong numCheckpointFails;
+ private @Metric MutableCounterLong numIncrementalCheckpoints;
public DBCheckpointMetrics() {
}
@@ -68,6 +71,10 @@ public class DBCheckpointMetrics {
this.lastCheckpointStreamingTimeTaken.set(val);
}
+ public void setLastCheckpointStreamingNumSSTExcluded(long val) {
+ this.lastCheckpointStreamingNumSSTExcluded.set(val);
+ }
+
@VisibleForTesting
public void incNumCheckpoints() {
numCheckpoints.incr();
@@ -78,6 +85,11 @@ public class DBCheckpointMetrics {
numCheckpointFails.incr();
}
+ @VisibleForTesting
+ public void incNumIncrementalCheckpoint() {
+ numIncrementalCheckpoints.incr();
+ }
+
@VisibleForTesting
public long getLastCheckpointCreationTimeTaken() {
return lastCheckpointCreationTimeTaken.value();
@@ -88,6 +100,11 @@ public class DBCheckpointMetrics {
return numCheckpoints.value();
}
+ @VisibleForTesting
+ public long getNumIncrementalCheckpoints() {
+ return numIncrementalCheckpoints.value();
+ }
+
@VisibleForTesting
public long getNumCheckpointFails() {
return numCheckpointFails.value();
@@ -97,4 +114,8 @@ public class DBCheckpointMetrics {
public long getLastCheckpointStreamingTimeTaken() {
return lastCheckpointStreamingTimeTaken.value();
}
+
+ public long getLastCheckpointStreamingNumSSTExcluded() {
+ return lastCheckpointStreamingNumSSTExcluded.value();
+ }
}
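A short sketch of the invariant noted above, assuming the counters are only bumped by the checkpoint servlet: numCheckpoints counts every served checkpoint, incremental or full, so it can never trail the incremental count.

    // Sketch only; checkCounterInvariant is a hypothetical helper.
    static void checkCounterInvariant(DBCheckpointMetrics metrics) {
      if (metrics.getNumCheckpoints() < metrics.getNumIncrementalCheckpoints()) {
        throw new IllegalStateException("checkpoint metric invariant violated");
      }
    }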
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java
index 21b7c7cfde..291dd3b27d 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java
@@ -26,7 +26,12 @@ import java.io.OutputStream;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
import org.apache.hadoop.hdds.server.OzoneAdmins;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
@@ -36,6 +41,8 @@ import org.apache.commons.lang3.StringUtils;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.writeDBCheckpointToStream;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST;
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
@@ -135,12 +142,24 @@ public class DBCheckpointServlet extends HttpServlet {
DBCheckpoint checkpoint = null;
try {
-
boolean flush = false;
String flushParam =
request.getParameter(OZONE_DB_CHECKPOINT_REQUEST_FLUSH);
if (StringUtils.isNotEmpty(flushParam)) {
- flush = Boolean.valueOf(flushParam);
+ flush = Boolean.parseBoolean(flushParam);
+ }
+
+ List<String> receivedSstList = new ArrayList<>();
+ List<String> excludedSstList = new ArrayList<>();
+ String[] sstParam = request.getParameterValues(
+ OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST);
+ if (sstParam != null) {
+ receivedSstList.addAll(
+ Arrays.stream(sstParam)
+ .filter(s -> s.endsWith(ROCKSDB_SST_SUFFIX))
+ .distinct()
+ .collect(Collectors.toList()));
+ LOG.info("Received excluding SST {}", receivedSstList);
}
checkpoint = dbStore.getCheckpoint(flush);
@@ -164,12 +183,20 @@ public class DBCheckpointServlet extends HttpServlet {
Instant start = Instant.now();
writeDbDataToStream(checkpoint, request,
- response.getOutputStream());
+ response.getOutputStream(), receivedSstList, excludedSstList);
Instant end = Instant.now();
long duration = Duration.between(start, end).toMillis();
LOG.info("Time taken to write the checkpoint to response output " +
"stream: {} milliseconds", duration);
+
+ LOG.info("Excluded SST {} from the latest checkpoint.",
+ excludedSstList);
+ if (!excludedSstList.isEmpty()) {
+ dbMetrics.incNumIncrementalCheckpoint();
+ }
+ dbMetrics.setLastCheckpointStreamingNumSSTExcluded(
+ excludedSstList.size());
dbMetrics.setLastCheckpointStreamingTimeTaken(duration);
dbMetrics.incNumCheckpoints();
} catch (Exception e) {
@@ -196,12 +223,20 @@ public class DBCheckpointServlet extends HttpServlet {
* @param ignoredRequest The httpRequest which generated this checkpoint.
* (Parameter is ignored in this class but used in child classes).
* @param destination The stream to write to.
+ * @param toExcludeList the SST files the caller asks to exclude
+ * @param excludedList the SST files actually excluded (filled in here)
+ *
*/
public void writeDbDataToStream(DBCheckpoint checkpoint,
- HttpServletRequest ignoredRequest,
- OutputStream destination)
+ HttpServletRequest ignoredRequest,
+ OutputStream destination,
+ List<String> toExcludeList,
+ List<String> excludedList)
throws IOException, InterruptedException {
- writeDBCheckpointToStream(checkpoint, destination);
- }
+ Objects.requireNonNull(toExcludeList);
+ Objects.requireNonNull(excludedList);
+ writeDBCheckpointToStream(checkpoint, destination,
+ toExcludeList, excludedList);
+ }
}
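To illustrate the request shape handled above (hypothetical SST file names): a follower lists the SST files it already holds as repeated toExcludeSST query parameters,

    GET /dbCheckpoint?flushBeforeCheckpoint=true&toExcludeSST=000123.sst
        &toExcludeSST=000124.sst&toExcludeSST=000123.sst&toExcludeSST=CURRENT

and the servlet keeps only the distinct names ending in .sst, so receivedSstList becomes [000123.sst, 000124.sst]; those files are omitted from the tarball and reported back through excludedSstList.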
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/FaultInjector.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/FaultInjector.java
new file mode 100644
index 0000000000..27957d162a
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/FaultInjector.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+
+/**
+ * Used to inject certain faults for testing.
+ */
+public abstract class FaultInjector {
+
+ @VisibleForTesting
+ public void init() {
+ }
+
+ @VisibleForTesting
+ public void pause() throws IOException {
+ }
+
+ @VisibleForTesting
+ public void resume() throws IOException {
+ }
+
+ @VisibleForTesting
+ public void reset() throws IOException {
+ }
+}
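A minimal usage sketch for this hook, assuming a provider variable of the RDBSnapshotProvider type introduced below; a complete latch-based injector appears in TestOMRatisSnapshots further down.

    // The provider calls pause() at its injection point; a test can block
    // there until it decides to let the download proceed.
    FaultInjector blocker = new FaultInjector() {
      @Override
      public void pause() throws IOException {
        // e.g. await a CountDownLatch that resume() counts down
      }
    };
    provider.setInjector(blocker);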
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
index df368e5319..055b904d02 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.utils;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -53,6 +54,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -71,6 +73,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_INFO_WAIT_DURAT
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_INFO_WAIT_DURATION_DEFAULT;
import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
import static org.apache.hadoop.ozone.OzoneConsts.DB_TRANSIENT_MARKER;
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
/**
@@ -168,6 +171,7 @@ public final class HAUtils {
/**
* Replace the current DB with the new DB checkpoint.
+ * (the checkpoint at checkpointPath is not deleted here)
*
* @param lastAppliedIndex the last applied index in the current SCM DB.
* @param checkpointPath path to the new DB checkpoint
@@ -201,13 +205,16 @@ public final class HAUtils {
// an inconsistent state and this marker file will fail it from
// starting up.
Files.createFile(markerFile);
- FileUtils.moveDirectory(checkpointPath, oldDB.toPath());
+ // Copy the candidate DB to the real DB
+ org.apache.commons.io.FileUtils.copyDirectory(checkpointPath.toFile(),
+ oldDB);
Files.deleteIfExists(markerFile);
} catch (IOException e) {
LOG.error("Failed to move downloaded DB checkpoint {} to metadata "
+ "directory {}. Resetting to original DB.", checkpointPath,
oldDB.toPath());
try {
+ FileUtil.fullyDelete(oldDB);
Files.move(dbBackup.toPath(), oldDB.toPath());
Files.deleteIfExists(markerFile);
} catch (IOException ex) {
@@ -225,7 +232,7 @@ public final class HAUtils {
*/
public static TransactionInfo getTrxnInfoFromCheckpoint(
OzoneConfiguration conf, Path dbPath, DBDefinition definition)
- throws Exception {
+ throws IOException {
if (dbPath != null) {
Path dbDir = dbPath.getParent();
@@ -245,12 +252,12 @@ public final class HAUtils {
* @param tempConfig
* @param dbDir path to DB
* @return TransactionInfo
- * @throws Exception
+ * @throws IOException
*/
private static TransactionInfo getTransactionInfoFromDB(
OzoneConfiguration tempConfig, Path dbDir, String dbName,
DBDefinition definition)
- throws Exception {
+ throws IOException {
try (DBStore dbStore = loadDB(tempConfig, dbDir.toFile(),
dbName, definition)) {
@@ -297,7 +304,7 @@ public final class HAUtils {
long lastAppliedIndex, String leaderId, Path newDBlocation,
Logger logger) {
if (transactionInfo.getTransactionIndex() <= lastAppliedIndex) {
- logger.error("Failed to install checkpoint from SCM leader: {}"
+ logger.error("Failed to install checkpoint from the leader: {}"
+ ". The last applied index: {} is greater than or equal to the "
+ "checkpoint's applied index: {}. Deleting the downloaded "
+ "checkpoint {}", leaderId, lastAppliedIndex,
@@ -319,7 +326,8 @@ public final class HAUtils {
configuration.getObject(RocksDBConfiguration.class);
DBStoreBuilder dbStoreBuilder =
DBStoreBuilder.newBuilder(configuration, rocksDBConfiguration)
- .setName(dbName).setPath(Paths.get(metaDir.getPath()));
+ .setName(dbName)
+ .setPath(Paths.get(metaDir.getPath()));
// Add column family names and codecs.
for (DBColumnFamilyDefinition columnFamily : definition
.getColumnFamilies()) {
@@ -347,6 +355,35 @@ public final class HAUtils {
}
return metadataDir;
}
+
+ /**
+ * Scan the DB dir and return the existing SST files.
+ * The list can be used to avoid downloading the same SST files repeatedly.
+ *
+ * @param db the file representing the DB to be scanned
+ * @return the list of SST file names; empty if the DB does not exist
+ */
+ public static List<String> getExistingSstFiles(File db) {
+ List<String> sstList = new ArrayList<>();
+ if (!db.exists()) {
+ return sstList;
+ }
+ FilenameFilter filter = (dir, name) -> name.endsWith(ROCKSDB_SST_SUFFIX);
+ String[] tempArray = db.list(filter);
+ if (tempArray != null) {
+ sstList = Arrays.asList(tempArray);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Scanned SST files {} in {}.", sstList, db.getAbsolutePath());
+ }
+ }
+ return sstList;
+ }
+
/**
* Build CA list which need to be passed to client.
*
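A sketch of how getExistingSstFiles feeds the incremental download path; candidateDir and leaderNodeDetails are hypothetical, and the URL builder is the one added to OMNodeDetails later in this commit.

    // Scan the candidate dir left over from a previous download attempt and
    // offer those SSTs to the leader for exclusion.
    List<String> alreadyDownloaded = HAUtils.getExistingSstFiles(candidateDir);
    URL url = leaderNodeDetails.getOMDBCheckpointEndpointUrl(
        true /* http */, true /* flush */, alreadyDownloaded);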
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
index 201a70e42b..f120ffad33 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
@@ -26,6 +26,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.TimeUnit;
@@ -525,12 +526,17 @@ public final class HddsServerUtil {
/**
* Write DB Checkpoint to an output stream as a compressed file (tar).
*
- * @param checkpoint checkpoint file
- * @param destination destination output stream.
+ * @param checkpoint checkpoint file
+ * @param destination destination output stream.
+ * @param toExcludeList the SST files to exclude from the archive
+ * @param excludedList the SST files actually excluded (output parameter)
* @throws IOException
*/
- public static void writeDBCheckpointToStream(DBCheckpoint checkpoint,
- OutputStream destination)
+ public static void writeDBCheckpointToStream(
+ DBCheckpoint checkpoint,
+ OutputStream destination,
+ List<String> toExcludeList,
+ List<String> excludedList)
throws IOException {
try (TarArchiveOutputStream archiveOutputStream =
new TarArchiveOutputStream(destination);
@@ -540,10 +546,14 @@ public final class HddsServerUtil {
TarArchiveOutputStream.BIGNUMBER_POSIX);
for (Path path : files.collect(Collectors.toList())) {
if (path != null) {
- Path fileName = path.getFileName();
- if (fileName != null) {
- includeFile(path.toFile(), fileName.toString(),
- archiveOutputStream);
+ Path fileNamePath = path.getFileName();
+ if (fileNamePath != null) {
+ String fileName = fileNamePath.toString();
+ if (!toExcludeList.contains(fileName)) {
+ includeFile(path.toFile(), fileName, archiveOutputStream);
+ } else {
+ excludedList.add(fileName);
+ }
}
}
}
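For contrast, the two call shapes of the updated method (checkpoint and out assumed to exist): empty lists produce a full tarball, while a populated toExcludeList produces an incremental one.

    // Full checkpoint: nothing excluded.
    List<String> excluded = new ArrayList<>();
    writeDBCheckpointToStream(checkpoint, out, new ArrayList<>(), excluded);

    // Incremental checkpoint: skip SSTs the follower already has; the names
    // actually skipped are reported back through excluded.
    excluded.clear();
    writeDBCheckpointToStream(checkpoint, out,
        Arrays.asList("000123.sst", "000124.sst"), excluded);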
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java
new file mode 100644
index 0000000000..c934e7d2b9
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.hadoop.ozone.OzoneConsts.SNAPSHOT_CANDIDATE_DIR;
+
+/**
+ * The RocksDB-specific snapshot provider.
+ * Supports both incremental and full snapshots.
+ *
+ * The process is as follows:
+ * 1. Download the snapshot file from the leader
+ * 2. Untar the snapshot file into the candidate dir
+ * 3. Return the candidate dir as a DBCheckpoint
+ *
+ * An incremental snapshot differs from a full one only in whether the
+ * list of already-present SST files is sent to the leader.
+ *
+ */
+public abstract class RDBSnapshotProvider implements Closeable {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RDBSnapshotProvider.class);
+
+ private final File snapshotDir;
+ private final File candidateDir;
+ private final String dbName;
+ private final AtomicReference<String> lastLeaderRef;
+ private final AtomicLong numDownloaded;
+ private FaultInjector injector;
+
+ public RDBSnapshotProvider(File snapshotDir, String dbName) {
+ this.snapshotDir = snapshotDir;
+ this.candidateDir = new File(snapshotDir, dbName + SNAPSHOT_CANDIDATE_DIR);
+ this.dbName = dbName;
+ this.injector = null;
+ this.lastLeaderRef = new AtomicReference<>(null);
+ this.numDownloaded = new AtomicLong();
+ init();
+ }
+
+ /**
+ * Initialize or reinitialize the RDB snapshot provider.
+ */
+ public synchronized void init() {
+ // check parent snapshot dir
+ if (!snapshotDir.exists()) {
+ HddsUtils.createDir(snapshotDir.toString());
+ }
+
+ LOG.info("Cleaning up the candidate dir: {}", candidateDir);
+ // cleanup candidate dir
+ if (candidateDir.exists()) {
+ FileUtil.fullyDeleteContents(candidateDir);
+ } else {
+ // create candidate dir
+ HddsUtils.createDir(candidateDir.toString());
+ }
+
+ // reset leader info
+ lastLeaderRef.set(null);
+ }
+
+ /**
+ * Download the latest DB snapshot (checkpoint) from the leader.
+ *
+ * @param leaderNodeID the ID of leader node
+ * @return {@link DBCheckpoint}
+ * @throws IOException
+ */
+ public DBCheckpoint downloadDBSnapshotFromLeader(String leaderNodeID)
+ throws IOException {
+ LOG.info("Prepare to download the snapshot from leader OM {} and " +
+ "reloading state from the snapshot.", leaderNodeID);
+ checkLeaderConsistent(leaderNodeID);
+
+ String snapshotFileName = getSnapshotFileName(leaderNodeID);
+ File targetFile = new File(snapshotDir, snapshotFileName);
+ downloadSnapshot(leaderNodeID, targetFile);
+ LOG.info("Successfully download the latest snapshot {} from leader OM: {}",
+ targetFile, leaderNodeID);
+
+ RocksDBCheckpoint checkpoint = getCheckpointFromSnapshotFile(targetFile,
+ candidateDir, true);
+ LOG.info("Successfully untar the downloaded snapshot {} at {}.", targetFile,
+ checkpoint.getCheckpointLocation());
+
+ numDownloaded.incrementAndGet();
+ injectPause();
+ return checkpoint;
+ }
+
+ /**
+ * Clean up the candidate DB in the following cases:
+ * 1. The leader changed while an incremental snapshot was being installed.
+ * Note: this prevents installing a wrong incremental checkpoint from the
+ * new leader; a full snapshot will be requested from it instead.
+ * 2. The provider is about to download a full snapshot.
+ *
+ * @param currentLeader the ID of the current leader node
+ */
+ private void checkLeaderConsistent(String currentLeader) {
+ String lastLeader = lastLeaderRef.get();
+ if (lastLeader != null) {
+ if (!lastLeader.equals(currentLeader)) {
+ LOG.info("Last leader for install snapshot is {}, but current leader " +
+ "is {}. ", lastLeader, currentLeader);
+ init();
+ lastLeaderRef.set(currentLeader);
+ }
+ return;
+ }
+
+ List<String> files = HAUtils.getExistingSstFiles(candidateDir);
+ if (!files.isEmpty()) {
+ LOG.warn("Candidate DB directory {} is not empty when last leader is " +
+ "null.", candidateDir);
+ init();
+ }
+ lastLeaderRef.set(currentLeader);
+ }
+
+ /**
+ * Get the snapshot file name.
+ *
+ * @param leaderNodeID the ID of leader node
+ * @return snapshot file name
+ */
+ public String getSnapshotFileName(String leaderNodeID) {
+ String snapshotTime = Long.toString(System.currentTimeMillis());
+ return dbName + "-" + leaderNodeID + "-" + snapshotTime + ".tar";
+ }
+
+ /**
+ * Untar the downloaded snapshot and convert it to {@link RocksDBCheckpoint}.
+ *
+ * @param snapshot the downloaded snapshot tar file
+ * @param untarDir the directory to place the untarred files
+ * @param deleteSnapshot whether to delete the downloaded snapshot tar file
+ * @return {@link RocksDBCheckpoint}
+ * @throws IOException
+ */
+ public RocksDBCheckpoint getCheckpointFromSnapshotFile(File snapshot,
+ File untarDir, boolean deleteSnapshot) throws IOException {
+ // Untar the checkpoint file.
+ Path untarredDbDir = untarDir.toPath();
+ FileUtil.unTar(snapshot, untarredDbDir.toFile());
+
+ if (deleteSnapshot) {
+ FileUtil.fullyDelete(snapshot);
+ }
+ return new RocksDBCheckpoint(untarredDbDir);
+ }
+
+ /**
+ * The abstract method to download the snapshot.
+ * Can be implemented over HTTP, gRPC, etc.
+ *
+ * @param leaderNodeID the ID of the leader node
+ * @param targetFile the file into which the snapshot is downloaded
+ * @throws IOException
+ */
+ public abstract void downloadSnapshot(String leaderNodeID, File targetFile)
+ throws IOException;
+
+ /**
+ * Inject pause for test only.
+ *
+ * @throws IOException
+ */
+ private void injectPause() throws IOException {
+ if (injector != null) {
+ injector.pause();
+ }
+ }
+
+ @VisibleForTesting
+ public File getSnapshotDir() {
+ return snapshotDir;
+ }
+
+ @VisibleForTesting
+ public File getCandidateDir() {
+ return candidateDir;
+ }
+
+ @VisibleForTesting
+ public FaultInjector getInjector() {
+ return injector;
+ }
+
+ @VisibleForTesting
+ public void setInjector(FaultInjector injector) {
+ this.injector = injector;
+ }
+
+ @VisibleForTesting
+ public long getNumDownloaded() {
+ return numDownloaded.get();
+ }
+}
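A minimal concrete subclass sketch with the transport elided; the real HTTP implementation is the OmRatisSnapshotProvider reworked later in this commit.

    // Any transport works, as long as downloadSnapshot() fills targetFile
    // with the leader's checkpoint tarball.
    class SimpleSnapshotProvider extends RDBSnapshotProvider {
      SimpleSnapshotProvider(File snapshotDir) {
        super(snapshotDir, "om.db");
      }

      @Override
      public void downloadSnapshot(String leaderNodeID, File targetFile)
          throws IOException {
        // e.g. stream the leader's /dbCheckpoint response into targetFile,
        // passing HAUtils.getExistingSstFiles(getCandidateDir()) as
        // toExcludeSST parameters to request an incremental snapshot
      }

      @Override
      public void close() throws IOException {
      }
    }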
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestRDBSnapshotProvider.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestRDBSnapshotProvider.java
new file mode 100644
index 0000000000..3fcb53da14
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestRDBSnapshotProvider.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.utils;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.StringUtils;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
+import org.apache.hadoop.hdds.utils.db.TableConfig;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.rocksdb.RocksDB;
+import org.rocksdb.Statistics;
+import org.rocksdb.StatsLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.writeDBCheckpointToStream;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test the common RocksDB snapshot provider service.
+ */
+public class TestRDBSnapshotProvider {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TestRDBSnapshotProvider.class);
+
+ private final List<String> families =
+ Arrays.asList(StringUtils.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
+ "First", "Second", "Third");
+ public static final int MAX_DB_UPDATES_SIZE_THRESHOLD = 80;
+
+ private RDBStore rdbStore = null;
+ private ManagedDBOptions options = null;
+ private Set<TableConfig> configSet;
+ private RDBSnapshotProvider rdbSnapshotProvider;
+ private File testDir;
+ private final int numUsedCF = 3;
+ private final String leaderId = "leaderNode-1";
+ private final AtomicReference<DBCheckpoint> latestCK =
+ new AtomicReference<>(null);
+
+ @BeforeEach
+ public void init(@TempDir File tempDir) throws Exception {
+ options = getNewDBOptions();
+ configSet = new HashSet<>();
+ for (String name : families) {
+ TableConfig newConfig = new TableConfig(name,
+ new ManagedColumnFamilyOptions());
+ configSet.add(newConfig);
+ }
+ testDir = tempDir;
+ rdbStore = new RDBStore(tempDir, options, configSet,
+ MAX_DB_UPDATES_SIZE_THRESHOLD);
+ rdbSnapshotProvider = new RDBSnapshotProvider(testDir, "test.db") {
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void downloadSnapshot(String leaderNodeID, File targetFile)
+ throws IOException {
+ for (int i = 0; i < 10; i++) {
+ insertDataToDB(numUsedCF);
+ }
+ DBCheckpoint dbCheckpoint = rdbStore.getCheckpoint(true);
+ latestCK.set(dbCheckpoint);
+ File[] files = dbCheckpoint.getCheckpointLocation().toFile().
+ listFiles();
+ assertNotNull(files);
+ LOG.info("Db files: {}", Arrays.stream(files)
+ .map(a -> "".concat(a.getName()).concat(" length: ").
+ concat(String.valueOf(a.length())))
+ .collect(Collectors.toList()));
+ try (OutputStream outputStream = new FileOutputStream(targetFile)) {
+ writeDBCheckpointToStream(dbCheckpoint, outputStream,
+ HAUtils.getExistingSstFiles(
+ rdbSnapshotProvider.getCandidateDir()), new ArrayList<>());
+ }
+ }
+ };
+ }
+
+ @AfterEach
+ public void down() throws Exception {
+ if (rdbStore != null) {
+ rdbStore.close();
+ }
+ if (testDir.exists()) {
+ FileUtil.fullyDelete(testDir);
+ }
+ }
+
+ @Test
+ public void testDownloadDBSnapshotFromLeader() throws Exception {
+ File candidateDir = rdbSnapshotProvider.getCandidateDir();
+ assertTrue(candidateDir.exists());
+
+ DBCheckpoint checkpoint;
+ int before = HAUtils.getExistingSstFiles(
+ rdbSnapshotProvider.getCandidateDir()).size();
+ assertEquals(0, before);
+
+ // Get first snapshot
+ checkpoint = rdbSnapshotProvider.downloadDBSnapshotFromLeader(leaderId);
+ File checkpointDir = checkpoint.getCheckpointLocation().toFile();
+ assertEquals(candidateDir, checkpointDir);
+ int first = HAUtils.getExistingSstFiles(
+ rdbSnapshotProvider.getCandidateDir()).size();
+
+ // Get second snapshot
+ checkpoint = rdbSnapshotProvider.downloadDBSnapshotFromLeader(leaderId);
+ int second = HAUtils.getExistingSstFiles(
+ rdbSnapshotProvider.getCandidateDir()).size();
+ assertTrue(second > first, "The second snapshot should" +
+ " have more SST files");
+ DBCheckpoint latestCheckpoint = latestCK.get();
+ compareDB(latestCheckpoint.getCheckpointLocation().toFile(),
+ checkpoint.getCheckpointLocation().toFile(), numUsedCF);
+
+ // Get third snapshot
+ checkpoint = rdbSnapshotProvider.downloadDBSnapshotFromLeader(leaderId);
+ int third = HAUtils.getExistingSstFiles(
+ rdbSnapshotProvider.getCandidateDir()).size();
+ assertTrue(third > second, "The third snapshot should" +
+ " have more SST files");
+ compareDB(latestCK.get().getCheckpointLocation().toFile(),
+ checkpoint.getCheckpointLocation().toFile(), numUsedCF);
+
+ // Test cleanup candidateDB
+ rdbSnapshotProvider.init();
+ assertEquals(0, HAUtils.getExistingSstFiles(
+ rdbSnapshotProvider.getCandidateDir()).size());
+ }
+
+ public void compareDB(File db1, File db2, int columnFamilyUsed)
+ throws Exception {
+ try (RDBStore rdbStore1 = new RDBStore(db1, getNewDBOptions(),
+ configSet, MAX_DB_UPDATES_SIZE_THRESHOLD);
+ RDBStore rdbStore2 = new RDBStore(db2, getNewDBOptions(),
+ configSet, MAX_DB_UPDATES_SIZE_THRESHOLD)) {
+ // all entries should be the same in both DBs
+ for (int i = 0; i < columnFamilyUsed; i++) {
+ try (TableIterator<byte[], ? extends KeyValue<byte[], byte[]>> iterator
+ = rdbStore1.getTable(families.get(i)).iterator()) {
+ while (iterator.hasNext()) {
+ KeyValue<byte[], byte[]> keyValue = iterator.next();
+ byte[] key = keyValue.getKey();
+ byte[] value1 = keyValue.getValue();
+ byte[] value2 = rdbStore2.getTable(families.get(i))
+ .getIfExist(key);
+ assertArrayEquals(value1, value2);
+ }
+ }
+ }
+ }
+ }
+
+ private void insertDataToDB(int columnFamilyUsed) throws IOException {
+ for (int i = 0; i < columnFamilyUsed; i++) {
+ insertRandomData(rdbStore, i);
+ }
+ }
+
+ public ManagedDBOptions getNewDBOptions() {
+ ManagedDBOptions managedOptions = new ManagedDBOptions();
+ managedOptions.setCreateIfMissing(true);
+ managedOptions.setCreateMissingColumnFamilies(true);
+
+ Statistics statistics = new Statistics();
+ statistics.setStatsLevel(StatsLevel.ALL);
+ managedOptions.setStatistics(statistics);
+ return managedOptions;
+ }
+
+ public void insertRandomData(RDBStore dbStore, int familyIndex)
+ throws IOException {
+ try (Table<byte[], byte[]> firstTable = dbStore.getTable(families.
+ get(familyIndex))) {
+ Assertions.assertNotNull(firstTable, "Table cannot be null");
+ for (int x = 0; x < 100; x++) {
+ byte[] key =
+ RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
+ byte[] value =
+ RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
+ firstTable.put(key, value);
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+}
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
index c7ca903be0..0a9e220597 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
@@ -20,7 +20,10 @@
package org.apache.hadoop.hdds.utils.db;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.FilenameFilter;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.util.ArrayList;
import java.util.Arrays;
@@ -44,6 +47,8 @@ import org.rocksdb.RocksDB;
import org.rocksdb.Statistics;
import org.rocksdb.StatsLevel;
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
+
/**
* RDBStore Tests.
*/
@@ -83,9 +88,11 @@ public class TestRDBStore {
rdbStore.close();
}
}
- private void insertRandomData(RDBStore dbStore, int familyIndex)
- throws Exception {
- try (Table firstTable = dbStore.getTable(families.get(familyIndex))) {
+
+ public void insertRandomData(RDBStore dbStore, int familyIndex)
+ throws IOException {
+ try (Table<byte[], byte[]> firstTable = dbStore.getTable(families.
+ get(familyIndex))) {
Assertions.assertNotNull(firstTable, "Table cannot be null");
for (int x = 0; x < 100; x++) {
byte[] key =
@@ -94,6 +101,8 @@ public class TestRDBStore {
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
firstTable.put(key, value);
}
+ } catch (Exception e) {
+ throw new IOException(e);
}
}
@@ -359,4 +368,60 @@ public class TestRDBStore {
}
}
+ @Test
+ public void testSstConsistency() throws IOException {
+ for (int i = 0; i < 10; i++) {
+ insertRandomData(rdbStore, 0);
+ insertRandomData(rdbStore, 1);
+ insertRandomData(rdbStore, 2);
+ }
+ DBCheckpoint dbCheckpoint1 = rdbStore.getCheckpoint(true);
+
+ for (int i = 0; i < 10; i++) {
+ insertRandomData(rdbStore, 0);
+ insertRandomData(rdbStore, 1);
+ insertRandomData(rdbStore, 2);
+ }
+ DBCheckpoint dbCheckpoint2 = rdbStore.getCheckpoint(true);
+ compareSstWithSameName(dbCheckpoint1.getCheckpointLocation().toFile(),
+ dbCheckpoint2.getCheckpointLocation().toFile());
+
+ for (int i = 0; i < 10; i++) {
+ insertRandomData(rdbStore, 0);
+ insertRandomData(rdbStore, 1);
+ insertRandomData(rdbStore, 2);
+ }
+ DBCheckpoint dbCheckpoint3 = rdbStore.getCheckpoint(true);
+ compareSstWithSameName(dbCheckpoint2.getCheckpointLocation().toFile(),
+ dbCheckpoint3.getCheckpointLocation().toFile());
+ }
+
+ private void compareSstWithSameName(File checkpoint1, File checkpoint2)
+ throws IOException {
+ FilenameFilter filter = (dir, name) -> name.endsWith(ROCKSDB_SST_SUFFIX);
+ String[] files1 = checkpoint1.list(filter);
+ String[] files2 = checkpoint2.list(filter);
+ Assertions.assertNotNull(files1);
+ Assertions.assertNotNull(files2);
+ // Get the file names present in both checkpoints
+ // (mutable copy so that retainAll below is supported)
+ List<String> result = new ArrayList<>(Arrays.asList(files1));
+ result.retainAll(Arrays.asList(files2));
+
+ for (String name: result) {
+ File fileInCk1 = new File(checkpoint1.getAbsoluteFile(), name);
+ File fileInCk2 = new File(checkpoint2.getAbsoluteFile(), name);
+ long length1 = fileInCk1.length();
+ long length2 = fileInCk2.length();
+ Assertions.assertEquals(length1, length2, name);
+
+ try (InputStream fileStream1 = new FileInputStream(fileInCk1);
+ InputStream fileStream2 = new FileInputStream(fileInCk2)) {
+ byte[] content1 = new byte[fileStream1.available()];
+ byte[] content2 = new byte[fileStream2.available()];
+ fileStream1.read(content1);
+ fileStream2.read(content2);
+ Assertions.assertArrayEquals(content1, content2);
+ }
+ }
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBCheckpointProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBCheckpointProvider.java
index 7a0becdf19..afea097198 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBCheckpointProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBCheckpointProvider.java
@@ -28,6 +28,7 @@ import java.io.OutputStream;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
+import java.util.ArrayList;
// TODO: define a generic interface for this
/**
@@ -66,7 +67,8 @@ public class SCMDBCheckpointProvider {
}
Instant start = Instant.now();
- HddsServerUtil.writeDBCheckpointToStream(checkpoint, stream);
+ HddsServerUtil.writeDBCheckpointToStream(checkpoint, stream,
+ new ArrayList<>(), new ArrayList<>());
Instant end = Instant.now();
long duration = Duration.between(start, end).toMillis();
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java
index ee680ef854..b061c14b1f 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java
@@ -17,8 +17,6 @@
package org.apache.hadoop.ozone.om.helpers;
-import org.apache.commons.lang3.StringUtils;
-
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OmUtils;
@@ -26,13 +24,19 @@ import org.apache.hadoop.ozone.ha.ConfUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerAdminProtocolProtos.OMNodeInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerAdminProtocolProtos.NodeState;
import org.apache.hadoop.hdds.NodeDetails;
+import org.apache.http.client.utils.URIBuilder;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.List;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_HTTP_ENDPOINT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_PORT_KEY;
@@ -160,23 +164,28 @@ public final class OMNodeDetails extends NodeDetails {
}
}
- public String getOMDBCheckpointEnpointUrl(boolean isHttpPolicy) {
- if (isHttpPolicy) {
- if (StringUtils.isNotEmpty(getHttpAddress())) {
- return "http://" + getHttpAddress() +
- OZONE_DB_CHECKPOINT_HTTP_ENDPOINT +
- "?" + OZONE_DB_CHECKPOINT_REQUEST_FLUSH + "=true&" +
- OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA + "=true";
- }
- } else {
- if (StringUtils.isNotEmpty(getHttpsAddress())) {
- return "https://" + getHttpsAddress() +
- OZONE_DB_CHECKPOINT_HTTP_ENDPOINT +
- "?" + OZONE_DB_CHECKPOINT_REQUEST_FLUSH + "=true&" +
- OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA + "=true";
+ public URL getOMDBCheckpointEndpointUrl(boolean isHttp, boolean flush,
+ List<String> sstList) throws IOException {
+ URL url;
+ try {
+ URIBuilder urlBuilder = new URIBuilder().
+ setScheme(isHttp ? "http" : "https").
+ setHost(isHttp ? getHttpAddress() : getHttpsAddress()).
+ setPath(OZONE_DB_CHECKPOINT_HTTP_ENDPOINT).
+ addParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA, "true").
+ addParameter(OZONE_DB_CHECKPOINT_REQUEST_FLUSH,
+ flush ? "true" : "false");
+ if (sstList != null && !sstList.isEmpty()) {
+ for (String s: sstList) {
+ urlBuilder.addParameter(
+ OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST, s);
+ }
}
+ url = urlBuilder.build().toURL();
+ } catch (URISyntaxException | MalformedURLException e) {
+ throw new IOException("Could not get OM DB Checkpoint Endpoint Url", e);
}
- return null;
+ return url;
}
public String getOMPrintInfo() {
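For a concrete picture, the builder above yields URLs of the form (hypothetical host, port, and SST names):

    http://om2.example.com:9874/dbCheckpoint?includeSnapshotData=true
        &flushBeforeCheckpoint=true&toExcludeSST=000123.sst&toExcludeSST=000456.sst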
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
index 8c47a197d6..28184a8ff8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
@@ -120,7 +120,7 @@ public class TestSCMDbCheckpointServlet {
Collections.emptyList(),
false);
doCallRealMethod().when(scmDbCheckpointServletMock)
- .writeDbDataToStream(any(), any(), any());
+ .writeDbDataToStream(any(), any(), any(), any(), any());
HttpServletRequest requestMock = mock(HttpServletRequest.class);
HttpServletResponse responseMock = mock(HttpServletResponse.class);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
index 316130c2e2..637b402acd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
@@ -26,6 +26,7 @@ import javax.servlet.http.HttpServletResponse;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -193,7 +194,7 @@ public class TestOMDbCheckpointServlet {
responseMock);
doCallRealMethod().when(omDbCheckpointServletMock)
- .writeDbDataToStream(any(), any(), any());
+ .writeDbDataToStream(any(), any(), any(), any(), any());
}
@Test
@@ -300,7 +301,7 @@ public class TestOMDbCheckpointServlet {
// Get the tarball.
try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
omDbCheckpointServletMock.writeDbDataToStream(dbCheckpoint, requestMock,
- fileOutputStream);
+ fileOutputStream, new ArrayList<>(), new ArrayList<>());
}
// Untar the file into a temp folder to be examined.
@@ -380,7 +381,7 @@ public class TestOMDbCheckpointServlet {
// Get the tarball.
try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
omDbCheckpointServletMock.writeDbDataToStream(dbCheckpoint, requestMock,
- fileOutputStream);
+ fileOutputStream, new ArrayList<>(), new ArrayList<>());
}
// Untar the file into a temp folder to be examined.
@@ -399,6 +400,49 @@ public class TestOMDbCheckpointServlet {
Assert.assertEquals(initialCheckpointSet, finalCheckpointSet);
}
+ @Test
+ public void testWriteDbDataWithToExcludeFileList()
+ throws Exception {
+ prepSnapshotData();
+
+ File dummyFile = new File(dbCheckpoint.getCheckpointLocation().toString(),
+ "dummy.sst");
+ try (OutputStreamWriter writer = new OutputStreamWriter(
+ new FileOutputStream(dummyFile), StandardCharsets.UTF_8)) {
+ writer.write("Dummy data.");
+ }
+ Assert.assertTrue(dummyFile.exists());
+ List<String> toExcludeList = new ArrayList<>();
+ List<String> excludedList = new ArrayList<>();
+ toExcludeList.add(dummyFile.getName());
+
+ // Set http param to exclude snapshot data.
+ when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA))
+ .thenReturn(null);
+
+ // Get the tarball.
+ try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
+ omDbCheckpointServletMock.writeDbDataToStream(dbCheckpoint, requestMock,
+ fileOutputStream, toExcludeList, excludedList);
+ }
+
+ // Untar the file into a temp folder to be examined.
+ String testDirName = folder.newFolder().getAbsolutePath();
+ int testDirLength = testDirName.length() + 1;
+ FileUtil.unTar(tempFile, new File(testDirName));
+
+ // Confirm the checkpoint directories match.
+ Path checkpointLocation = dbCheckpoint.getCheckpointLocation();
+ Set<String> initialCheckpointSet = getFiles(checkpointLocation,
+ checkpointLocation.toString().length() + 1);
+ Path finalCheckpointLocation = Paths.get(testDirName);
+ Set<String> finalCheckpointSet = getFiles(finalCheckpointLocation,
+ testDirLength);
+
+ initialCheckpointSet.removeAll(finalCheckpointSet);
+ Assert.assertTrue(initialCheckpointSet.contains(dummyFile.getName()));
+ }
+
private void prepSnapshotData() throws Exception {
setupCluster();
metaDir = OMStorage.getOmDbDir(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
index dd124be723..1850ba0b5a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -20,6 +20,9 @@ import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.ExitManager;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.utils.DBCheckpointMetrics;
+import org.apache.hadoop.hdds.utils.FaultInjector;
+import org.apache.hadoop.hdds.utils.HAUtils;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
@@ -61,8 +64,10 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -73,7 +78,9 @@ import java.util.stream.Stream;
import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPath;
import static org.apache.hadoop.ozone.om.TestOzoneManagerHAWithData.createKey;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
/**
* Tests the Ratis snapshots feature in OM.
@@ -305,6 +312,236 @@ public class TestOMRatisSnapshots {
Assertions.assertTrue(hardLinkCount > 0, "No hard links were found");
}
+ @Test
+ @Timeout(300)
+ public void testInstallIncrementalSnapshot() throws Exception {
+ // Get the leader OM
+ String leaderOMNodeId = OmFailoverProxyUtil
+ .getFailoverProxyProvider(objectStore.getClientProxy())
+ .getCurrentProxyOMNodeId();
+
+ OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+ OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer();
+
+ // Find the inactive OM
+ String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
+ if (cluster.isOMActive(followerNodeId)) {
+ followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
+ }
+ OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+
+ // Set fault injector to pause before install
+ FaultInjector faultInjector = new SnapshotPauseInjector();
+ followerOM.getOmSnapshotProvider().setInjector(faultInjector);
+
+ // Do some transactions so that the log index increases
+ List<String> firstKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
+ 80);
+
+ // Start the inactive OM. Checkpoint installation will happen automatically.
+ cluster.startInactiveOM(followerNodeId);
+
+ // Wait for the follower to download the snapshot, but it gets stuck by the injector
+ GenericTestUtils.waitFor(() -> {
+ return followerOM.getOmSnapshotProvider().getNumDownloaded() == 1;
+ }, 1000, 10000);
+
+ // Do some transactions so that the leader OM takes a new snapshot and
+ // purges the old logs; the follower must then download a snapshot again.
+ List<String> secondKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
+ 160);
+
+ // Resume the follower thread; it will download the incremental snapshot.
+ faultInjector.resume();
+
+ // Get the latest db checkpoint from the leader OM.
+ TransactionInfo transactionInfo =
+ TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager());
+ TermIndex leaderOMTermIndex =
+ TermIndex.valueOf(transactionInfo.getTerm(),
+ transactionInfo.getTransactionIndex());
+ long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
+
+ // The recently started OM should be lagging behind the leader OM.
+ // Wait & for follower to update transactions to leader snapshot index.
+ // Timeout error if follower does not load update within 10s
+ GenericTestUtils.waitFor(() -> {
+ return followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
+ >= leaderOMSnapshotIndex - 1;
+ }, 1000, 10000);
+
+ assertEquals(2, followerOM.getOmSnapshotProvider().getNumDownloaded());
+
+ // Verify that the follower OM's DB contains the transactions which were
+ // made while it was inactive.
+ OMMetadataManager followerOMMetaMngr = followerOM.getMetadataManager();
+ assertNotNull(followerOMMetaMngr.getVolumeTable().get(
+ followerOMMetaMngr.getVolumeKey(volumeName)));
+ assertNotNull(followerOMMetaMngr.getBucketTable().get(
+ followerOMMetaMngr.getBucketKey(volumeName, bucketName)));
+
+ for (String key : firstKeys) {
+ assertNotNull(followerOMMetaMngr.getKeyTable(TEST_BUCKET_LAYOUT)
+ .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
+ }
+ for (String key : secondKeys) {
+ assertNotNull(followerOMMetaMngr.getKeyTable(TEST_BUCKET_LAYOUT)
+ .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
+ }
+
+ // Verify the metrics recording the incremental checkpoint at leader side
+ DBCheckpointMetrics dbMetrics = leaderOM.getMetrics().
+ getDBCheckpointMetrics();
+ Assertions.assertTrue(
+ dbMetrics.getLastCheckpointStreamingNumSSTExcluded() > 0);
+ assertEquals(1, dbMetrics.getNumIncrementalCheckpoints());
+
+ // Verify RPC server is running
+ GenericTestUtils.waitFor(() -> {
+ return followerOM.isOmRpcServerRunning();
+ }, 100, 5000);
+
+ // Read & Write after snapshot installed.
+ List<String> newKeys = writeKeys(1);
+ readKeys(newKeys);
+ assertNotNull(followerOMMetaMngr.getKeyTable(
+ TEST_BUCKET_LAYOUT).get(followerOMMetaMngr.getOzoneKey(
+ volumeName, bucketName, newKeys.get(0))));
+
+ // Verify the follower candidate directory gets cleaned
+ String[] filesInCandidate = followerOM.getOmSnapshotProvider().
+ getCandidateDir().list();
+ assertNotNull(filesInCandidate);
+ assertEquals(0, filesInCandidate.length);
+ }
+
+ @Test
+ @Timeout(300)
+ public void testInstallIncrementalSnapshotWithFailure() throws Exception {
+ // Get the leader OM
+ String leaderOMNodeId = OmFailoverProxyUtil
+ .getFailoverProxyProvider(objectStore.getClientProxy())
+ .getCurrentProxyOMNodeId();
+
+ OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+ OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer();
+
+ // Find the inactive OM
+ String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
+ if (cluster.isOMActive(followerNodeId)) {
+ followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
+ }
+ OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+
+ // Set fault injector to pause before install
+ FaultInjector faultInjector = new SnapshotPauseInjector();
+ followerOM.getOmSnapshotProvider().setInjector(faultInjector);
+
+ // Do some transactions so that the log index increases
+ List<String> firstKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
+ 80);
+
+ // Start the inactive OM. Checkpoint installation will happen automatically.
+ cluster.startInactiveOM(followerNodeId);
+
+ // Wait for the follower to download the snapshot, but it gets stuck by the injector
+ GenericTestUtils.waitFor(() -> {
+ return followerOM.getOmSnapshotProvider().getNumDownloaded() == 1;
+ }, 1000, 10000);
+
+ // Do some transactions so that the leader OM takes a new snapshot and
+ // purges the old logs; the follower must then download a snapshot again.
+ List<String> secondKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
+ 160);
+
+ // Resume the follower thread; it will download the incremental snapshot.
+ faultInjector.resume();
+
+ // Pause the follower thread again to block the second install
+ faultInjector.reset();
+
+ // Wait for the follower to download the incremental snapshot, but it
+ // gets stuck by the injector
+ GenericTestUtils.waitFor(() -> {
+ return followerOM.getOmSnapshotProvider().getNumDownloaded() == 2;
+ }, 1000, 10000);
+
+ // Corrupt the mixed checkpoint in the candidate DB dir
+ File followerCandidateDir = followerOM.getOmSnapshotProvider().
+ getCandidateDir();
+ List<String> sstList = HAUtils.getExistingSstFiles(followerCandidateDir);
+ Assertions.assertTrue(sstList.size() > 0);
+ Collections.shuffle(sstList);
+ List<String> victimSstList = sstList.subList(0, sstList.size() / 3);
+ for (String sst: victimSstList) {
+ File victimSst = new File(followerCandidateDir, sst);
+ Assertions.assertTrue(victimSst.delete());
+ }
+
+ // Resume the follower thread; it will download the full snapshot again
+ // because the installation fails due to the detected corruption.
+ faultInjector.resume();
+
+ // Get the latest db checkpoint from the leader OM.
+ TransactionInfo transactionInfo =
+ TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager());
+ TermIndex leaderOMTermIndex =
+ TermIndex.valueOf(transactionInfo.getTerm(),
+ transactionInfo.getTransactionIndex());
+ long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
+
+ // Wait & for follower to update transactions to leader snapshot index.
+ // Timeout error if follower does not load update within 10s
+ GenericTestUtils.waitFor(() -> {
+ return followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
+ >= leaderOMSnapshotIndex - 1;
+ }, 1000, 10000);
+
+ // Verify that the follower OM's DB contains the transactions which were
+ // made while it was inactive.
+ OMMetadataManager followerOMMetaMngr = followerOM.getMetadataManager();
+ assertNotNull(followerOMMetaMngr.getVolumeTable().get(
+ followerOMMetaMngr.getVolumeKey(volumeName)));
+ assertNotNull(followerOMMetaMngr.getBucketTable().get(
+ followerOMMetaMngr.getBucketKey(volumeName, bucketName)));
+
+ for (String key : firstKeys) {
+ assertNotNull(followerOMMetaMngr.getKeyTable(TEST_BUCKET_LAYOUT)
+ .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
+ }
+ for (String key : secondKeys) {
+ assertNotNull(followerOMMetaMngr.getKeyTable(TEST_BUCKET_LAYOUT)
+ .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
+ }
+
+ // Verify the metrics
+ DBCheckpointMetrics dbMetrics = leaderOM.getMetrics().
+ getDBCheckpointMetrics();
+ assertEquals(0, dbMetrics.getLastCheckpointStreamingNumSSTExcluded());
+ assertTrue(dbMetrics.getNumIncrementalCheckpoints() >= 1);
+ assertTrue(dbMetrics.getNumCheckpoints() >= 3);
+
+ // Verify RPC server is running
+ GenericTestUtils.waitFor(() -> {
+ return followerOM.isOmRpcServerRunning();
+ }, 100, 5000);
+
+ // Read & write after the snapshot is installed.
+ List<String> newKeys = writeKeys(1);
+ readKeys(newKeys);
+ assertNotNull(followerOMMetaMngr.getKeyTable(
+ TEST_BUCKET_LAYOUT).get(followerOMMetaMngr.getOzoneKey(
+ volumeName, bucketName, newKeys.get(0))));
+
+ // Verify that the follower candidate directory gets cleaned
+ String[] filesInCandidate = followerOM.getOmSnapshotProvider().
+ getCandidateDir().list();
+ assertNotNull(filesInCandidate);
+ assertEquals(0, filesInCandidate.length);
+ }
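For context: the corruption step above relies on HAUtils.getExistingSstFiles()
to enumerate the SST files sitting in the candidate directory. A minimal
sketch of such an enumeration, assuming it simply filters on the
ROCKSDB_SST_SUFFIX constant added to OzoneConsts (the class and method names
below are illustrative, not the committed implementation):

    import java.io.File;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    final class SstListingSketch {
      // Illustrative only: assumes filtering on the ".sst" suffix
      // (ROCKSDB_SST_SUFFIX); not the committed HAUtils code.
      static List<String> listSstFiles(File dir) {
        String[] names = dir.list((d, name) -> name.endsWith(".sst"));
        return names == null ? Collections.emptyList() : Arrays.asList(names);
      }
    }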
+
@Ignore("Enable this unit test after RATIS-1481 used")
public void testInstallSnapshotWithClientWrite() throws Exception {
// Get the leader OM
@@ -642,8 +879,7 @@ public class TestOMRatisSnapshots {
return keys;
}
- private List<String> writeKeys(long keyCount) throws IOException,
- InterruptedException {
+ private List<String> writeKeys(long keyCount) throws IOException {
List<String> keys = new ArrayList<>();
long index = 0;
while (index < keyCount) {
@@ -687,4 +923,39 @@ public class TestOMRatisSnapshots {
log.error("System Exit: " + message, throwable);
}
}
+
+ private static class SnapshotPauseInjector extends FaultInjector {
+ private CountDownLatch ready;
+ private CountDownLatch wait;
+
+ SnapshotPauseInjector() {
+ init();
+ }
+
+ @Override
+ public void init() {
+ this.ready = new CountDownLatch(1);
+ this.wait = new CountDownLatch(1);
+ }
+
+ @Override
+ public void pause() throws IOException {
+ ready.countDown();
+ try {
+ wait.await();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void resume() throws IOException {
+ wait.countDown();
+ }
+
+ @Override
+ public void reset() throws IOException {
+ init();
+ }
+ }
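The injector above is a two-phase rendezvous: the code under test calls
pause() at the injection point and blocks on the wait latch until the test
calls resume(), while reset() re-arms both latches for the next round. A
hedged sketch of how a snapshot provider might invoke such a hook (the class,
field, and null check below are assumptions for illustration, not lines from
this patch):

    import java.io.IOException;
    import org.apache.hadoop.hdds.utils.FaultInjector;

    abstract class PausableProviderSketch {
      // Illustrative wiring of a FaultInjector hook; names are assumed.
      private volatile FaultInjector injector;

      void setInjector(FaultInjector injector) {
        this.injector = injector;
      }

      void maybePause() throws IOException {
        FaultInjector current = injector;
        if (current != null) {
          current.pause(); // blocks until the test calls resume()
        }
      }
    }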
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java
index 32146a8269..6b39c5d0fc 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java
@@ -123,7 +123,7 @@ public class TestOzoneManagerSnapshotProvider {
// Download latest checkpoint from leader OM to follower OM
DBCheckpoint omSnapshot = followerOM.getOmSnapshotProvider()
- .getOzoneManagerDBSnapshot(leaderOMNodeId);
+ .downloadDBSnapshotFromLeader(leaderOMNodeId);
long leaderSnapshotIndex = leaderOM.getRatisSnapshotIndex();
long downloadedSnapshotIndex = getDownloadedSnapshotIndex(omSnapshot);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
index 0f8e8b7edb..15423a16da 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
@@ -47,7 +47,9 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -115,8 +117,13 @@ public class OMDBCheckpointServlet extends DBCheckpointServlet {
@Override
public void writeDbDataToStream(DBCheckpoint checkpoint,
HttpServletRequest request,
- OutputStream destination)
+ OutputStream destination,
+ List<String> toExcludeList,
+ List<String> excludedList)
throws IOException, InterruptedException {
+ Objects.requireNonNull(toExcludeList);
+ Objects.requireNonNull(excludedList);
+
// Map of inodes to path.
Map<Object, Path> copyFiles = new HashMap<>();
// Map of link to path.
@@ -125,13 +132,24 @@ public class OMDBCheckpointServlet extends DBCheckpointServlet {
getFilesForArchive(checkpoint, copyFiles, hardLinkFiles,
includeSnapshotData(request));
+ // Exclude files the follower already has
+ Map<Object, Path> finalCopyFiles = new HashMap<>();
+ copyFiles.forEach((o, path) -> {
+ String fName = path.getFileName().toString();
+ if (!toExcludeList.contains(fName)) {
+ finalCopyFiles.put(o, path);
+ } else {
+ excludedList.add(fName);
+ }
+ });
+
try (TarArchiveOutputStream archiveOutputStream =
new TarArchiveOutputStream(destination)) {
archiveOutputStream
.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
archiveOutputStream
.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
- writeFilesToArchive(copyFiles, hardLinkFiles, archiveOutputStream);
+ writeFilesToArchive(finalCopyFiles, hardLinkFiles, archiveOutputStream);
}
}
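On the wire, the follower advertises the SST files it already holds so the
leader can drop them from the tarball as shown above. A rough sketch of the
request a follower might issue, assuming the toExcludeSST parameter is
repeated once per file (the host, port, and servlet path are illustrative;
only the parameter name comes from this patch):

    // Hypothetical checkpoint request with SST exclusions; per-file
    // repetition of toExcludeSST is an assumption of this sketch.
    // existingSstFiles would come from HAUtils.getExistingSstFiles(...).
    StringBuilder url = new StringBuilder(
        "http://om-host:9874/dbCheckpoint?flushBeforeCheckpoint=true");
    for (String sst : existingSstFiles) {
      url.append("&toExcludeSST=").append(sst);
    }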
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index bc4162177f..14a7a0c815 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
@@ -2199,7 +2200,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
jvmPauseMonitor.stop();
}
if (omRatisSnapshotProvider != null) {
- omRatisSnapshotProvider.stop();
+ omRatisSnapshotProvider.close();
}
OMPerformanceMetrics.unregister();
RatisDropwizardExports.clear(ratisMetricsMap, ratisReporterList);
@@ -3536,12 +3537,14 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
return null;
}
- DBCheckpoint omDBCheckpoint = getDBCheckpointFromLeader(leaderId);
- if (omDBCheckpoint == null) {
+ DBCheckpoint omDBCheckpoint;
+ try {
+ omDBCheckpoint = omRatisSnapshotProvider.
+ downloadDBSnapshotFromLeader(leaderId);
+ } catch (IOException ex) {
+ LOG.error("Failed to download snapshot from Leader {}.", leaderId, ex);
return null;
}
- LOG.info("Downloaded checkpoint from Leader {} to the location {}",
- leaderId, omDBCheckpoint.getCheckpointLocation());
TermIndex termIndex = null;
try {
@@ -3553,7 +3556,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
}
/**
- * Install checkpoint. If the checkpoints snapshot index is greater than
+ * Install checkpoint. If the checkpoint snapshot index is greater than
* OM's last applied transaction index, then re-initialize the OM
* state via this checkpoint. Before re-initializing OM state, the OM Ratis
* server should be stopped so that no new transactions can be applied.
@@ -3633,8 +3636,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
}
try {
time = Time.monotonicNow();
- dbBackup = replaceOMDBWithCheckpoint(lastAppliedIndex, oldDBLocation,
- checkpointLocation);
+ dbBackup = replaceOMDBWithCheckpoint(lastAppliedIndex,
+ oldDBLocation, checkpointLocation);
term = checkpointTrxnInfo.getTerm();
lastAppliedIndex = checkpointTrxnInfo.getTransactionIndex();
LOG.info("Replaced DB with checkpoint from OM: {}, term: {}, " +
@@ -3642,7 +3645,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
Time.monotonicNow() - time);
} catch (Exception e) {
LOG.error("Failed to install Snapshot from {} as OM failed to replace" +
- " DB with downloaded checkpoint. Reloading old OM state.", e);
+ " DB with downloaded checkpoint. Reloading old OM state.",
+ leaderId, e);
}
} else {
LOG.warn("Cannot proceed with InstallSnapshot as OM is at TermIndex {} " +
@@ -3701,7 +3705,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
FileUtils.deleteFully(dbBackup);
}
} catch (Exception e) {
- LOG.error("Failed to delete the backup of the original DB {}", dbBackup);
+ LOG.error("Failed to delete the backup of the original DB {}",
+ dbBackup, e);
}
if (lastAppliedIndex != checkpointTrxnInfo.getTransactionIndex()) {
@@ -3719,25 +3724,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
return newTermIndex;
}
-
- /**
- * Download the latest OM DB checkpoint from the leader OM.
- *
- * @param leaderId OMNodeID of the leader OM node.
- * @return latest DB checkpoint from leader OM.
- */
- private DBCheckpoint getDBCheckpointFromLeader(String leaderId) {
- LOG.info("Downloading checkpoint from leader OM {} and reloading state " +
- "from the checkpoint.", leaderId);
-
- try {
- return omRatisSnapshotProvider.getOzoneManagerDBSnapshot(leaderId);
- } catch (IOException e) {
- LOG.error("Failed to download checkpoint from OM leader {}", leaderId, e);
- }
- return null;
- }
-
private void stopTrashEmptier() {
if (this.emptier != null) {
emptier.interrupt();
@@ -3793,7 +3779,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
// an inconsistent state and this marker file will prevent OM from
// starting up.
Files.createFile(markerFile);
- FileUtils.moveDirectory(checkpointPath, oldDB.toPath());
+ // Copy the candidate DB to the real DB dir
+ org.apache.commons.io.FileUtils.copyDirectory(checkpointPath.toFile(),
+ oldDB);
moveOmSnapshotData(oldDB.toPath(), dbSnapshotsDir.toPath());
Files.deleteIfExists(markerFile);
} catch (IOException e) {
@@ -3801,6 +3789,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
"directory {}. Resetting to original DB.", checkpointPath,
oldDB.toPath());
try {
+ FileUtil.fullyDelete(oldDB);
Files.move(dbBackup.toPath(), oldDB.toPath());
if (dbSnapshotsBackup.exists()) {
Files.move(dbSnapshotsBackup.toPath(), dbSnapshotsDir.toPath());
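The marker file written around the candidate-to-real DB copy acts as a crash
guard: if the OM dies mid-copy, the leftover marker keeps it from starting up
on a half-copied DB, and the backup is restored instead. A minimal sketch of
that guard pattern (the marker name and path variables are illustrative):

    // Sketch of the marker-file guard around the DB swap; names assumed.
    // dbDir is the real DB location (Path), candidateDir the download (File).
    Path marker = dbDir.resolve("db.install.marker");
    Files.createFile(marker);        // crash before delete => OM refuses to start
    org.apache.commons.io.FileUtils.copyDirectory(candidateDir, dbDir.toFile());
    Files.deleteIfExists(marker);    // swap completed cleanly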
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index a7775f5bc0..12343524c4 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -55,6 +55,7 @@ import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.StateMachineException;
@@ -208,6 +209,31 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
ozoneManager.updatePeerList(newPeerIds);
}
+ /**
+ * Called to notify the state machine about the snapshot install result.
+ * Triggers the cleanup of the candidate DB dir.
+ * @param result InstallSnapshotResult
+ * @param snapshotIndex the index of the installed snapshot
+ * @param peer the peer which finished the snapshot installation
+ */
+ @Override
+ public void notifySnapshotInstalled(RaftProtos.InstallSnapshotResult result,
+ long snapshotIndex, RaftPeer peer) {
+ LOG.info("Receive notifySnapshotInstalled event {} for the peer: {}" +
+ " snapshotIndex: {}.", result, peer.getId(), snapshotIndex);
+ switch (result) {
+ case SUCCESS:
+ case SNAPSHOT_UNAVAILABLE:
+ // Currently, only trigger cleanup on the peer that installed the snapshot
+ if (ozoneManager.getOmRatisServer().getServer().getPeer().equals(peer)) {
+ ozoneManager.getOmSnapshotProvider().init();
+ }
+ break;
+ default:
+ break;
+ }
+ }
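The init() call above is what keeps a stale candidate directory from leaking
into the next install attempt, matching the test assertion that the candidate
dir ends up empty. A hedged sketch of what such a reset could amount to (the
real RDBSnapshotProvider.init() is not shown in this excerpt):

    // Assumed cleanup behavior, sketched with commons-io; not the
    // committed RDBSnapshotProvider.init() implementation.
    public void init() throws IOException {
      org.apache.commons.io.FileUtils.cleanDirectory(getCandidateDir());
    }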
+
/**
* Validate/pre-process the incoming update request in the state machine.
* @return the content to be written to the log entry. Null means the request
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java
index e5b2fdf963..b924a2294f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java
@@ -23,18 +23,15 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.conf.MutableConfigurationSource;
import org.apache.hadoop.hdds.server.http.HttpConfig;
+import org.apache.hadoop.hdds.utils.HAUtils;
+import org.apache.hadoop.hdds.utils.RDBSnapshotProvider;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
-import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
-import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;
@@ -71,24 +68,21 @@ import org.slf4j.LoggerFactory;
* bootstrap. The follower needs these copies to respond to users'
* snapshot requests when it becomes the leader.
*/
-public class OmRatisSnapshotProvider {
+public class OmRatisSnapshotProvider extends RDBSnapshotProvider {
private static final Logger LOG =
LoggerFactory.getLogger(OmRatisSnapshotProvider.class);
- private final File omSnapshotDir;
- private Map<String, OMNodeDetails> peerNodesMap;
+ private final Map<String, OMNodeDetails> peerNodesMap;
private final HttpConfig.Policy httpPolicy;
private final boolean spnegoEnabled;
private final URLConnectionFactory connectionFactory;
public OmRatisSnapshotProvider(MutableConfigurationSource conf,
File omRatisSnapshotDir, Map<String, OMNodeDetails> peerNodeDetails) {
-
+ super(omRatisSnapshotDir, OM_DB_NAME);
LOG.info("Initializing OM Snapshot Provider");
- this.omSnapshotDir = omRatisSnapshotDir;
-
- this.peerNodesMap = new HashMap<>();
+ this.peerNodesMap = new ConcurrentHashMap<>();
peerNodesMap.putAll(peerNodeDetails);
this.httpPolicy = HttpConfig.getHttpPolicy(conf);
@@ -115,78 +109,60 @@ public class OmRatisSnapshotProvider {
}
/**
- * Download the latest checkpoint from OM Leader via HTTP.
- * @param leaderOMNodeID leader OM Node ID.
- * @return the DB checkpoint (including the ratis snapshot index)
+ * When a new OM is bootstrapped, add it to the peerNode map.
*/
- public DBCheckpoint getOzoneManagerDBSnapshot(String leaderOMNodeID)
- throws IOException {
- String snapshotTime = Long.toString(System.currentTimeMillis());
- String snapshotFileName = OM_DB_NAME + "-" + leaderOMNodeID
- + "-" + snapshotTime;
- String snapshotFilePath = Paths.get(omSnapshotDir.getAbsolutePath(),
- snapshotFileName).toFile().getAbsolutePath();
- File targetFile = new File(snapshotFilePath + ".tar");
+ public void addNewPeerNode(OMNodeDetails newOMNode) {
+ peerNodesMap.put(newOMNode.getNodeId(), newOMNode);
+ }
- String omCheckpointUrl = peerNodesMap.get(leaderOMNodeID)
- .getOMDBCheckpointEnpointUrl(httpPolicy.isHttpEnabled());
+ /**
+ * When an OM is decommissioned, remove it from the peerNode map.
+ */
+ public void removeDecommissionedPeerNode(String decommNodeId) {
+ peerNodesMap.remove(decommNodeId);
+ }
+ @Override
+ public void downloadSnapshot(String leaderNodeID, File targetFile)
+ throws IOException {
+ OMNodeDetails leader = peerNodesMap.get(leaderNodeID);
+ URL omCheckpointUrl = leader.getOMDBCheckpointEndpointUrl(
+ httpPolicy.isHttpEnabled(), true,
+ HAUtils.getExistingSstFiles(getCandidateDir()));
LOG.info("Downloading latest checkpoint from Leader OM {}. Checkpoint " +
- "URL: {}", leaderOMNodeID, omCheckpointUrl);
+ "URL: {}", leaderNodeID, omCheckpointUrl);
SecurityUtil.doAsCurrentUser(() -> {
- HttpURLConnection httpURLConnection = (HttpURLConnection)
- connectionFactory.openConnection(new URL(omCheckpointUrl),
- spnegoEnabled);
- httpURLConnection.connect();
- int errorCode = httpURLConnection.getResponseCode();
+ HttpURLConnection connection = (HttpURLConnection)
+ connectionFactory.openConnection(omCheckpointUrl, spnegoEnabled);
+ connection.setRequestMethod("GET");
+ connection.connect();
+ int errorCode = connection.getResponseCode();
if ((errorCode != HTTP_OK) && (errorCode != HTTP_CREATED)) {
throw new IOException("Unexpected exception when trying to reach " +
"OM to download latest checkpoint. Checkpoint URL: " +
omCheckpointUrl + ". ErrorCode: " + errorCode);
}
- try (InputStream inputStream = httpURLConnection.getInputStream()) {
+ try (InputStream inputStream = connection.getInputStream()) {
FileUtils.copyInputStreamToFile(inputStream, targetFile);
} catch (IOException ex) {
- LOG.error("OM snapshot {} cannot be downloaded.", targetFile, ex);
boolean deleted = FileUtils.deleteQuietly(targetFile);
if (!deleted) {
LOG.error("OM snapshot which failed to download {} cannot be deleted",
targetFile);
}
+ throw ex;
+ } finally {
+ connection.disconnect();
}
return null;
});
-
- // Untar the checkpoint file.
- Path untarredDbDir = Paths.get(snapshotFilePath);
- FileUtil.unTar(targetFile, untarredDbDir.toFile());
- FileUtils.deleteQuietly(targetFile);
-
- LOG.info("Successfully downloaded latest checkpoint from leader OM: {}",
- leaderOMNodeID);
-
- RocksDBCheckpoint omCheckpoint = new RocksDBCheckpoint(untarredDbDir);
- return omCheckpoint;
}
- public void stop() {
+ @Override
+ public void close() throws IOException {
if (connectionFactory != null) {
connectionFactory.destroy();
}
}
-
- /**
- * When a new OM is bootstrapped, add it to the peerNode map.
- */
- public void addNewPeerNode(OMNodeDetails newOMNode) {
- peerNodesMap.put(newOMNode.getNodeId(), newOMNode);
- }
-
- /**
- * When an OM is decommissioned, remove it from the peerNode map.
- */
- public void removeDecommissionedPeerNode(String decommNodeId) {
- peerNodesMap.remove(decommNodeId);
- }
}
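The refactor above is a template-method split: RDBSnapshotProvider owns the
download lifecycle (candidate directory, untarring, download counting) while
subclasses such as OmRatisSnapshotProvider supply only the transport in
downloadSnapshot(). A hedged sketch of the base-class flow, reconstructed
from the call sites in this diff (untarToCandidateDir and the numDownloaded
counter are assumptions; only downloadSnapshot(), getCandidateDir(), and
getNumDownloaded() appear in this excerpt):

    // Sketch of the assumed base-class template, not the committed code.
    public DBCheckpoint downloadDBSnapshotFromLeader(String leaderNodeID)
        throws IOException {
      File tarball = new File(getCandidateDir(), dbName + ".tar");
      downloadSnapshot(leaderNodeID, tarball);  // subclass-provided transport
      untarToCandidateDir(tarball);             // merge into the candidate dir
      numDownloaded.incrementAndGet();          // polled by the tests above
      return new RocksDBCheckpoint(getCandidateDir().toPath());
    }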