You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sa...@apache.org on 2022/05/16 09:42:44 UTC
[ozone] branch HDDS-3630 updated: HDDS-6544. [Merge rocksdb in datanode] New container replication operations for schema v3 container. (#3398)
This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-3630
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3630 by this push:
new 4a94b609fa HDDS-6544. [Merge rocksdb in datanode] New container replication operations for schema v3 container. (#3398)
4a94b609fa is described below
commit 4a94b609fa08eb67eea4286b090dfc2711f9fbe5
Author: Gui Hecheng <ma...@tencent.com>
AuthorDate: Mon May 16 17:42:38 2022 +0800
HDDS-6544. [Merge rocksdb in datanode] New container replication operations for schema v3 container. (#3398)
---
.../container/keyvalue/KeyValueContainer.java | 39 ++++++++--
.../container/keyvalue/TarContainerPacker.java | 15 +++-
.../container/keyvalue/helpers/BlockUtils.java | 85 ++++++++++++++++++++
.../metadata/DatanodeStoreSchemaThreeImpl.java | 39 ++++++++++
.../ozone/container/metadata/DatanodeTable.java | 12 +++
.../container/keyvalue/TestKeyValueContainer.java | 7 --
.../container/keyvalue/TestTarContainerPacker.java | 77 +++++++-----------
.../hadoop/hdds/utils/db/DumpFileLoader.java | 32 ++++++++
.../hadoop/hdds/utils/db/DumpFileWriter.java | 46 +++++++++++
.../hadoop/hdds/utils/db/RDBSstFileLoader.java | 65 ++++++++++++++++
.../hadoop/hdds/utils/db/RDBSstFileWriter.java | 86 ++++++++++++++++++++
.../org/apache/hadoop/hdds/utils/db/RDBTable.java | 20 +++++
.../org/apache/hadoop/hdds/utils/db/Table.java | 17 ++++
.../apache/hadoop/hdds/utils/db/TypedTable.java | 13 ++++
.../hadoop/hdds/utils/db/TestRDBTableStore.java | 91 +++++++++++++++++++++-
.../src/main/proto/DatanodeClientProtocol.proto | 2 +
16 files changed, 584 insertions(+), 62 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 6e47cb9f45..742e1018c3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -88,6 +88,8 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
// Use a non-fair RW lock for better throughput, we may revisit this decision
// if this causes fairness issues.
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ // Simple lock to synchronize container metadata dump operation.
+ private final Object dumpLock = new Object();
private final KeyValueContainerData containerData;
private ConfigurationSource config;
@@ -534,6 +536,11 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
//rewriting the yaml file with new checksum calculation.
update(originalContainerData.getMetadata(), true);
+ if (containerData.getSchemaVersion().equals(OzoneConsts.SCHEMA_V3)) {
+ // load metadata from received dump files before we try to parse kv
+ BlockUtils.loadKVContainerDataFromFiles(containerData, config);
+ }
+
//fill in memory stat counter (keycount, byte usage)
KeyValueContainerUtil.parseKVContainerData(containerData, config);
@@ -545,6 +552,10 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
}
//delete all the temporary data in case of any exception.
try {
+ if (containerData.getSchemaVersion() != null &&
+ containerData.getSchemaVersion().equals(OzoneConsts.SCHEMA_V3)) {
+ BlockUtils.removeContainerFromDB(containerData, config);
+ }
FileUtils.deleteDirectory(new File(containerData.getMetadataPath()));
FileUtils.deleteDirectory(new File(containerData.getChunksPath()));
FileUtils.deleteDirectory(
@@ -579,16 +590,18 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
}
try {
- compactDB();
- // Close DB (and remove from cache) to avoid concurrent modification
- // while packing it.
- BlockUtils.removeDB(containerData, config);
+ if (!containerData.getSchemaVersion().equals(OzoneConsts.SCHEMA_V3)) {
+ compactDB();
+ // Close DB (and remove from cache) to avoid concurrent modification
+ // while packing it.
+ BlockUtils.removeDB(containerData, config);
+ }
} finally {
readLock();
writeUnlock();
}
- packer.pack(this, destination);
+ packContainerToDestination(destination, packer);
} finally {
if (lock.isWriteLockedByCurrentThread()) {
writeUnlock();
@@ -850,4 +863,20 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
file.getName(), file.getParentFile());
}
+  /**
+   * Packs this container into {@code destination} using {@code packer}.
+   * For a schema v3 container the KV metadata is first dumped to files via
+   * {@code BlockUtils.dumpKVContainerDataToFiles}, and the dump and the pack
+   * run together under {@code dumpLock}. Any other schema version is packed
+   * directly.
+   *
+   * @param destination stream that receives the packed container
+   * @param packer packer used to write the container
+   * @throws IOException if dumping the metadata or packing fails
+   */
+  private void packContainerToDestination(OutputStream destination,
+      ContainerPacker<KeyValueContainerData> packer)
+      throws IOException {
+    // Constant-first comparison: a null schema version is treated as
+    // "not schema v3" instead of raising a NullPointerException.
+    if (OzoneConsts.SCHEMA_V3.equals(containerData.getSchemaVersion())) {
+      // Synchronize the dump and pack operation,
+      // so concurrent exports don't get dump files overwritten.
+      // Concurrent exports of one container are rare,
+      // so this should not influence performance much.
+      synchronized (dumpLock) {
+        BlockUtils.dumpKVContainerDataToFiles(containerData, config);
+        packer.pack(this, destination);
+      }
+    } else {
+      packer.pack(this, destination);
+    }
+  }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
index 5b9d2f7aab..e555c10814 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
@@ -43,8 +43,10 @@ import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaThreeImpl;
import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V3;
/**
* Compress/uncompress KeyValueContainer data to a tar.gz archive.
@@ -71,7 +73,7 @@ public class TarContainerPacker
throws IOException {
byte[] descriptorFileContent = null;
KeyValueContainerData containerData = container.getContainerData();
- Path dbRoot = containerData.getDbFile().toPath();
+ Path dbRoot = getDbPath(containerData);
Path chunksRoot = Paths.get(containerData.getChunksPath());
try (InputStream decompressed = decompress(input);
@@ -159,7 +161,7 @@ public class TarContainerPacker
try (OutputStream compressed = compress(output);
ArchiveOutputStream archiveOutput = tar(compressed)) {
- includePath(containerData.getDbFile().toPath(), DB_DIR_NAME,
+ includePath(getDbPath(containerData), DB_DIR_NAME,
archiveOutput);
includePath(Paths.get(containerData.getChunksPath()), CHUNKS_DIR_NAME,
@@ -198,6 +200,15 @@ public class TarContainerPacker
"Container descriptor is missing from the container archive.");
}
+ public static Path getDbPath(KeyValueContainerData containerData) {
+ if (containerData.getSchemaVersion().equals(SCHEMA_V3)) {
+ return DatanodeStoreSchemaThreeImpl.getDumpDir(
+ new File(containerData.getMetadataPath())).toPath();
+ } else {
+ return containerData.getDbFile().toPath();
+ }
+ }
+
private byte[] readEntry(InputStream input, final long size)
throws IOException {
ByteArrayOutputStream output = new ByteArrayOutputStream();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java
index 7f908a8302..73881f3a99 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java
@@ -18,8 +18,10 @@
package org.apache.hadoop.ozone.container.keyvalue.helpers;
+import java.io.File;
import java.io.IOException;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -40,6 +42,8 @@ import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaOneImpl;
import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaThreeImpl;
import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.EXPORT_CONTAINER_METADATA_FAILED;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.IMPORT_CONTAINER_METADATA_FAILED;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNABLE_TO_READ_METADATA_DB;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID;
@@ -249,4 +253,85 @@ public final class BlockUtils {
containerData.getContainerID());
}
}
+
+  /**
+   * Dump container KV metadata to external files.
+   * Any existing dump directory is deleted first. If the dump fails, the
+   * partially written files are deleted as well.
+   *
+   * @param containerData container whose metadata is dumped; the dump
+   *                      directory is derived from its metadata path
+   * @param conf configuration used to obtain the container DB
+   * @throws StorageContainerException with result
+   *         {@code EXPORT_CONTAINER_METADATA_FAILED} if creating the dump
+   *         directory or dumping the metadata fails
+   * @throws IOException if the DB cannot be obtained or old dump files
+   *         cannot be removed
+   */
+  public static void dumpKVContainerDataToFiles(
+      KeyValueContainerData containerData,
+      ConfigurationSource conf) throws IOException {
+    try (DBHandle db = getDB(containerData, conf)) {
+      Preconditions.checkState(db.getStore()
+          instanceof DatanodeStoreSchemaThreeImpl);
+
+      DatanodeStoreSchemaThreeImpl store = (DatanodeStoreSchemaThreeImpl)
+          db.getStore();
+      long containerID = containerData.getContainerID();
+      File metaDir = new File(containerData.getMetadataPath());
+      File dumpDir = DatanodeStoreSchemaThreeImpl.getDumpDir(metaDir);
+      // cleanup old files first
+      deleteAllDumpFiles(dumpDir);
+
+      try {
+        if (!dumpDir.mkdirs() && !dumpDir.exists()) {
+          throw new IOException("Failed to create dir "
+              + dumpDir.getAbsolutePath() + " for container " + containerID +
+              " to dump metadata to files");
+        }
+        store.dumpKVContainerData(containerID, dumpDir);
+      } catch (IOException e) {
+        // cleanup partially dumped files; a failing cleanup must not hide
+        // the dump failure, so it is only recorded as suppressed
+        try {
+          deleteAllDumpFiles(dumpDir);
+        } catch (IOException cleanupFailure) {
+          e.addSuppressed(cleanupFailure);
+        }
+        throw new StorageContainerException("Failed to dump metadata" +
+            " for container " + containerID, e,
+            EXPORT_CONTAINER_METADATA_FAILED);
+      }
+    }
+  }
+
+  /**
+   * Load container KV metadata from external files.
+   * The dump directory is deleted afterwards whether or not the load
+   * succeeded. If the load fails, the metadata already loaded for the
+   * container is removed from the DB again.
+   *
+   * @param containerData container whose metadata is loaded; the dump
+   *                      directory is derived from its metadata path
+   * @param conf configuration used to obtain the container DB
+   * @throws StorageContainerException with result
+   *         {@code IMPORT_CONTAINER_METADATA_FAILED} if loading fails;
+   *         rollback and cleanup failures are attached as suppressed
+   * @throws IOException if the DB cannot be obtained, or if the load
+   *         succeeded but the dump files cannot be deleted
+   */
+  public static void loadKVContainerDataFromFiles(
+      KeyValueContainerData containerData,
+      ConfigurationSource conf) throws IOException {
+    try (DBHandle db = getDB(containerData, conf)) {
+      Preconditions.checkState(db.getStore()
+          instanceof DatanodeStoreSchemaThreeImpl);
+
+      DatanodeStoreSchemaThreeImpl store = (DatanodeStoreSchemaThreeImpl)
+          db.getStore();
+      long containerID = containerData.getContainerID();
+      File metaDir = new File(containerData.getMetadataPath());
+      File dumpDir = DatanodeStoreSchemaThreeImpl.getDumpDir(metaDir);
+      // Remembered so the cleanup below cannot replace the load failure.
+      StorageContainerException loadFailure = null;
+      try {
+        store.loadKVContainerData(dumpDir);
+      } catch (IOException e) {
+        // Don't delete unloaded or partially loaded files on failure,
+        // but delete all partially loaded metadata. The rollback is
+        // best-effort: its own failure is recorded as suppressed so the
+        // load failure stays the primary error.
+        try {
+          store.removeKVContainerData(containerID);
+        } catch (Exception rollbackFailure) {
+          e.addSuppressed(rollbackFailure);
+        }
+        loadFailure = new StorageContainerException(
+            "Failed to load metadata " +
+            "from files for container " + containerID, e,
+            IMPORT_CONTAINER_METADATA_FAILED);
+        throw loadFailure;
+      } finally {
+        // cleanup already loaded files all together
+        try {
+          deleteAllDumpFiles(dumpDir);
+        } catch (IOException cleanupFailure) {
+          if (loadFailure == null) {
+            throw cleanupFailure;
+          }
+          loadFailure.addSuppressed(cleanupFailure);
+        }
+      }
+    }
+  }
+
+  /**
+   * Deletes the dump directory together with everything under it.
+   *
+   * @param dumpDir directory holding the dump files
+   * @throws IOException wrapping the underlying failure, with the absolute
+   *         path of the directory in the message, if the deletion fails
+   */
+  public static void deleteAllDumpFiles(File dumpDir) throws IOException {
+    try {
+      FileUtils.deleteDirectory(dumpDir);
+    } catch (IOException cause) {
+      final String location = dumpDir.getAbsolutePath();
+      throw new IOException(
+          "Failed to delete dump files under " + location, cause);
+    }
+  }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java
index 8b1e9e0b20..949bcc6d21 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
+import java.io.File;
import java.io.IOException;
import static org.apache.hadoop.ozone.container.metadata.DatanodeSchemaThreeDBDefinition.getContainerKeyPrefix;
@@ -43,6 +44,9 @@ import static org.apache.hadoop.ozone.container.metadata.DatanodeSchemaThreeDBDe
public class DatanodeStoreSchemaThreeImpl extends AbstractDatanodeStore
implements DeleteTransactionStore<String> {
+ public static final String DUMP_FILE_SUFFIX = ".data";
+ public static final String DUMP_DIR = "db";
+
private final Table<String, DeletedBlocksTransaction> deleteTransactionTable;
public DatanodeStoreSchemaThreeImpl(ConfigurationSource config,
@@ -88,4 +92,39 @@ public class DatanodeStoreSchemaThreeImpl extends AbstractDatanodeStore
getBatchHandler().commitBatchOperation(batch);
}
}
+
+  /**
+   * Dumps the data selected by the given container's key prefix from the
+   * metadata, block data, deleted blocks and delete transaction tables,
+   * in that order, each into its own dump file under {@code dumpDir}.
+   *
+   * @param containerID ID of the container whose data is dumped
+   * @param dumpDir directory that receives one dump file per table
+   * @throws IOException if dumping any of the tables fails
+   */
+  public void dumpKVContainerData(long containerID, File dumpDir)
+      throws IOException {
+    final String containerPrefix = getContainerKeyPrefix(containerID);
+
+    final Table<String, ?> metadata = getMetadataTable();
+    metadata.dumpToFileWithPrefix(
+        getTableDumpFile(metadata, dumpDir), containerPrefix);
+
+    final Table<String, ?> blockData = getBlockDataTable();
+    blockData.dumpToFileWithPrefix(
+        getTableDumpFile(blockData, dumpDir), containerPrefix);
+
+    final Table<String, ?> deletedBlocks = getDeletedBlocksTable();
+    deletedBlocks.dumpToFileWithPrefix(
+        getTableDumpFile(deletedBlocks, dumpDir), containerPrefix);
+
+    final Table<String, ?> deleteTransactions = getDeleteTransactionTable();
+    deleteTransactions.dumpToFileWithPrefix(
+        getTableDumpFile(deleteTransactions, dumpDir), containerPrefix);
+  }
+
+  /**
+   * Loads the per-table dump files found under {@code dumpDir} into the
+   * metadata, block data, deleted blocks and delete transaction tables,
+   * in that order.
+   *
+   * @param dumpDir directory holding one dump file per table
+   * @throws IOException if loading any of the files fails
+   */
+  public void loadKVContainerData(File dumpDir)
+      throws IOException {
+    final Table<String, ?> metadata = getMetadataTable();
+    metadata.loadFromFile(getTableDumpFile(metadata, dumpDir));
+
+    final Table<String, ?> blockData = getBlockDataTable();
+    blockData.loadFromFile(getTableDumpFile(blockData, dumpDir));
+
+    final Table<String, ?> deletedBlocks = getDeletedBlocksTable();
+    deletedBlocks.loadFromFile(getTableDumpFile(deletedBlocks, dumpDir));
+
+    final Table<String, ?> deleteTransactions = getDeleteTransactionTable();
+    deleteTransactions.loadFromFile(
+        getTableDumpFile(deleteTransactions, dumpDir));
+  }
+
+  /**
+   * Builds the dump file for a table: a file in {@code dumpDir} named after
+   * the table, with {@code DUMP_FILE_SUFFIX} appended.
+   *
+   * @param table table whose name determines the file name
+   * @param dumpDir directory that contains the dump file
+   * @return the dump file location (the file itself is not created here)
+   * @throws IOException NOTE(review): presumably propagated from
+   *         {@code table.getName()} -- confirm against Table
+   */
+  public static File getTableDumpFile(Table<String, ?> table,
+      File dumpDir) throws IOException {
+    final String dumpFileName = table.getName() + DUMP_FILE_SUFFIX;
+    return new File(dumpDir, dumpFileName);
+  }
+
+  /**
+   * Returns the dump directory of a container: the child of the given
+   * metadata directory named {@code DUMP_DIR}.
+   *
+   * @param metaDir metadata directory of the container
+   * @return the dump directory location (the directory is not created here)
+   */
+  public static File getDumpDir(File metaDir) {
+    final File dumpDirUnderMeta = new File(metaDir, DUMP_DIR);
+    return dumpDirUnderMeta;
+  }
}
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java
index 6254e1ecf7..1d35fab9d5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.TableIterator;
+import java.io.File;
import java.io.IOException;
import java.util.List;
@@ -137,6 +138,17 @@ public class DatanodeTable<KEY, VALUE> implements Table<KEY, VALUE> {
table.deleteBatchWithPrefix(batch, prefix);
}
+ /**
+ * Delegates the prefix dump to the wrapped table unchanged.
+ *
+ * @param externalFile file handed to the wrapped table as dump target
+ * @param prefix key prefix handed to the wrapped table
+ * @throws IOException if the wrapped table fails to dump
+ */
+ @Override
+ public void dumpToFileWithPrefix(File externalFile, KEY prefix)
+ throws IOException {
+ table.dumpToFileWithPrefix(externalFile, prefix);
+ }
+
+ /**
+ * Delegates loading of the external file to the wrapped table unchanged.
+ *
+ * @param externalFile file handed to the wrapped table as load source
+ * @throws IOException if the wrapped table fails to load the file
+ */
+ @Override
+ public void loadFromFile(File externalFile) throws IOException {
+ table.loadFromFile(externalFile);
+ }
+
@Override
public void close() throws Exception {
table.close();
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
index 669401896b..594c873339 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
@@ -85,7 +85,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeFalse;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
@@ -185,8 +184,6 @@ public class TestKeyValueContainer {
@Test
public void testEmptyContainerImportExport() throws Exception {
- assumeFalse(schemaVersion.equals(OzoneConsts.SCHEMA_V3));
-
createContainer();
closeContainer();
@@ -216,8 +213,6 @@ public class TestKeyValueContainer {
@Test
public void testContainerImportExport() throws Exception {
- assumeFalse(schemaVersion.equals(OzoneConsts.SCHEMA_V3));
-
long containerId = keyValueContainer.getContainerData().getContainerID();
createContainer();
long numberOfKeysToWrite = 12;
@@ -361,8 +356,6 @@ public class TestKeyValueContainer {
@Test
public void concurrentExport() throws Exception {
- assumeFalse(schemaVersion.equals(OzoneConsts.SCHEMA_V3));
-
createContainer();
populate(100);
closeContainer();
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
index c4fe8d9574..c15841b001 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
@@ -36,7 +36,6 @@ import org.apache.commons.compress.archivers.ArchiveOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.CompressorOutputStream;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
@@ -57,7 +56,6 @@ import org.junit.runners.Parameterized;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.commons.compress.compressors.CompressorStreamFactory.GZIP;
-import static org.junit.Assume.assumeFalse;
/**
* Test the tar/untar for a given container.
@@ -132,14 +130,17 @@ public class TestTarContainerPacker {
Path containerDir = dir.resolve("container" + id);
Path dbDir = containerDir.resolve("db");
Path dataDir = containerDir.resolve("data");
+ Path metaDir = containerDir.resolve("metadata");
+ Files.createDirectories(metaDir);
Files.createDirectories(dbDir);
Files.createDirectories(dataDir);
KeyValueContainerData containerData = new KeyValueContainerData(
id, layout,
-1, UUID.randomUUID().toString(), UUID.randomUUID().toString());
+ containerData.setSchemaVersion(schemaVersion);
containerData.setChunksPath(dataDir.toString());
- containerData.setMetadataPath(dbDir.getParent().toString());
+ containerData.setMetadataPath(metaDir.toString());
containerData.setDbFile(dbDir.toFile());
return containerData;
@@ -147,8 +148,6 @@ public class TestTarContainerPacker {
@Test
public void pack() throws IOException, CompressorException {
- assumeFalse(schemaVersion.equals(OzoneConsts.SCHEMA_V3));
-
//GIVEN
KeyValueContainerData sourceContainerData =
createContainer(SOURCE_CONTAINER_ROOT);
@@ -217,7 +216,7 @@ public class TestTarContainerPacker {
}
assertExampleMetadataDbIsGood(
- destinationContainerData.getDbFile().toPath(),
+ TarContainerPacker.getDbPath(destinationContainerData),
TEST_DB_FILE_NAME);
assertExampleChunkFileIsGood(
Paths.get(destinationContainerData.getChunksPath()),
@@ -232,8 +231,6 @@ public class TestTarContainerPacker {
@Test
public void unpackContainerDataWithValidRelativeDbFilePath()
throws Exception {
- assumeFalse(schemaVersion.equals(OzoneConsts.SCHEMA_V3));
-
//GIVEN
KeyValueContainerData sourceContainerData =
createContainer(SOURCE_CONTAINER_ROOT);
@@ -248,14 +245,13 @@ public class TestTarContainerPacker {
KeyValueContainerData dest = unpackContainerData(containerFile);
// THEN
- assertExampleMetadataDbIsGood(dest.getDbFile().toPath(), fileName);
+ assertExampleMetadataDbIsGood(
+ TarContainerPacker.getDbPath(dest), fileName);
}
@Test
public void unpackContainerDataWithValidRelativeChunkFilePath()
throws Exception {
- assumeFalse(schemaVersion.equals(OzoneConsts.SCHEMA_V3));
-
//GIVEN
KeyValueContainerData sourceContainerData =
createContainer(SOURCE_CONTAINER_ROOT);
@@ -276,8 +272,6 @@ public class TestTarContainerPacker {
@Test
public void unpackContainerDataWithInvalidRelativeDbFilePath()
throws Exception {
- assumeFalse(schemaVersion.equals(OzoneConsts.SCHEMA_V3));
-
//GIVEN
KeyValueContainerData sourceContainerData =
createContainer(SOURCE_CONTAINER_ROOT);
@@ -295,8 +289,6 @@ public class TestTarContainerPacker {
@Test
public void unpackContainerDataWithInvalidRelativeChunkFilePath()
throws Exception {
- assumeFalse(schemaVersion.equals(OzoneConsts.SCHEMA_V3));
-
//GIVEN
KeyValueContainerData sourceContainerData =
createContainer(SOURCE_CONTAINER_ROOT);
@@ -333,29 +325,26 @@ public class TestTarContainerPacker {
private File writeChunkFile(
KeyValueContainerData containerData, String chunkFileName)
throws IOException {
- Path path = Paths.get(containerData.getChunksPath())
- .resolve(chunkFileName);
- Files.createDirectories(path.getParent());
- File file = path.toFile();
- FileOutputStream fileStream = new FileOutputStream(file);
- try (OutputStreamWriter writer = new OutputStreamWriter(fileStream,
- UTF_8)) {
- IOUtils.write(TEST_CHUNK_FILE_CONTENT, writer);
- }
- return file;
+ return writeSingleFile(Paths.get(containerData.getChunksPath()),
+ chunkFileName, TEST_CHUNK_FILE_CONTENT);
}
private File writeDbFile(
KeyValueContainerData containerData, String dbFileName)
throws IOException {
- Path path = containerData.getDbFile().toPath()
- .resolve(dbFileName);
+ return writeSingleFile(TarContainerPacker.getDbPath(containerData),
+ dbFileName, TEST_DB_FILE_CONTENT);
+ }
+
+ private File writeSingleFile(Path parentPath, String fileName,
+ String content) throws IOException {
+ Path path = parentPath.resolve(fileName);
Files.createDirectories(path.getParent());
File file = path.toFile();
FileOutputStream fileStream = new FileOutputStream(file);
try (OutputStreamWriter writer = new OutputStreamWriter(fileStream,
UTF_8)) {
- IOUtils.write(TEST_DB_FILE_CONTENT, writer);
+ IOUtils.write(content, writer);
}
return file;
}
@@ -374,35 +363,29 @@ public class TestTarContainerPacker {
private void assertExampleMetadataDbIsGood(Path dbPath, String filename)
throws IOException {
-
- Path dbFile = dbPath.resolve(filename);
-
- Assert.assertTrue(
- "example DB file is missing after pack/unpackContainerData: " + dbFile,
- Files.exists(dbFile));
-
- try (FileInputStream testFile = new FileInputStream(dbFile.toFile())) {
- List<String> strings = IOUtils.readLines(testFile, UTF_8);
- Assert.assertEquals(1, strings.size());
- Assert.assertEquals(TEST_DB_FILE_CONTENT, strings.get(0));
- }
+ assertExampleFileIsGood(dbPath, filename, TEST_DB_FILE_CONTENT);
}
private void assertExampleChunkFileIsGood(Path chunkPath, String filename)
throws IOException {
+ assertExampleFileIsGood(chunkPath, filename, TEST_CHUNK_FILE_CONTENT);
+ }
+
+ private void assertExampleFileIsGood(Path parentPath, String filename,
+ String content) throws IOException {
- Path chunkFile = chunkPath.resolve(filename);
+ Path exampleFile = parentPath.resolve(filename);
Assert.assertTrue(
- "example chunk file is missing after pack/unpackContainerData: "
- + chunkFile,
- Files.exists(chunkFile));
+ "example file is missing after pack/unpackContainerData: "
+ + exampleFile,
+ Files.exists(exampleFile));
- try (FileInputStream testFile = new FileInputStream(chunkFile.toFile())) {
+ try (FileInputStream testFile =
+ new FileInputStream(exampleFile.toFile())) {
List<String> strings = IOUtils.readLines(testFile, UTF_8);
Assert.assertEquals(1, strings.size());
- Assert.assertEquals(TEST_CHUNK_FILE_CONTENT, strings.get(0));
+ Assert.assertEquals(content, strings.get(0));
}
}
-
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DumpFileLoader.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DumpFileLoader.java
new file mode 100644
index 0000000000..9355ba7399
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DumpFileLoader.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils.db;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Interface for loading data from a dump file.
+ */
+public interface DumpFileLoader {
+
+ /**
+ * Load key value pairs from an external dump file.
+ *
+ * @param externalFile the dump file to load from
+ * @throws IOException if the file cannot be loaded
+ */
+ void load(File externalFile) throws IOException;
+}
\ No newline at end of file
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DumpFileWriter.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DumpFileWriter.java
new file mode 100644
index 0000000000..f26ba50b59
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DumpFileWriter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils.db;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Interface for writing data into a dump file.
+ */
+public interface DumpFileWriter extends Closeable {
+ /**
+ * Open an external file for dump.
+ * @param externalFile the file that receives the dumped data
+ * @throws IOException if the file cannot be opened
+ */
+ void open(File externalFile) throws IOException;
+
+ /**
+ * Put a key value pair into the file.
+ * @param key key bytes of the pair
+ * @param value value bytes of the pair
+ * @throws IOException if the pair cannot be written
+ */
+ void put(byte[] key, byte[] value) throws IOException;
+
+ /**
+ * Finish dumping.
+ * @throws IOException if finishing the dump fails
+ */
+ @Override
+ void close() throws IOException;
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileLoader.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileLoader.java
new file mode 100644
index 0000000000..f1d4149b83
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileLoader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils.db;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.IngestExternalFileOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.toIOException;
+
+/**
+ * DumpFileLoader using rocksdb sst files.
+ * <p>
+ * The native {@link IngestExternalFileOptions} is created for each
+ * {@link #load(File)} call and closed when the call finishes, so the loader
+ * itself holds no native resource that would need an explicit close.
+ */
+public class RDBSstFileLoader implements DumpFileLoader {
+
+  /** Value passed to {@code setIngestBehind} for every ingestion. */
+  private static final boolean INGEST_BEHIND = false;
+
+  private final RocksDB db;
+  private final ColumnFamilyHandle handle;
+
+  /**
+   * @param db RocksDB instance that ingests the files
+   * @param handle column family the files are ingested into
+   */
+  public RDBSstFileLoader(RocksDB db, ColumnFamilyHandle handle) {
+    this.db = db;
+    this.handle = handle;
+  }
+
+  /**
+   * Ingests the given sst file into the column family of this loader.
+   * A file of length zero is skipped without error.
+   *
+   * @param externalFile sst file to ingest
+   * @throws IOException if RocksDB rejects the file
+   */
+  @Override
+  public void load(File externalFile) throws IOException {
+    // Ingest an empty sst file results in exception.
+    // File.length() is also 0 for a path that does not exist, so a missing
+    // file is skipped here as well.
+    if (externalFile.length() == 0) {
+      return;
+    }
+
+    final String path = externalFile.getAbsolutePath();
+    // The options object wraps native memory; try-with-resources releases
+    // it deterministically instead of leaking it with the loader.
+    try (IngestExternalFileOptions ingestOptions =
+        new IngestExternalFileOptions().setIngestBehind(INGEST_BEHIND)) {
+      db.ingestExternalFile(handle,
+          Collections.singletonList(path),
+          ingestOptions);
+    } catch (RocksDBException e) {
+      throw toIOException("Failed to ingest external file "
+          + path + ", ingestBehind:"
+          + INGEST_BEHIND, e);
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
new file mode 100644
index 0000000000..a568632f91
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils.db;
+
+import org.rocksdb.EnvOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.SstFileWriter;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.toIOException;
+
+/**
+ * {@link DumpFileWriter} that writes key value pairs into a rocksdb sst
+ * file.
+ */
+public class RDBSstFileWriter implements DumpFileWriter {
+
+  // The option objects wrap native resources; they are kept as fields so
+  // that close() can release them together with the writer.
+  private final EnvOptions envOptions;
+  private final Options options;
+  private final SstFileWriter sstFileWriter;
+  // Number of successful put() calls since construction or the last close().
+  private final AtomicLong keyCounter;
+  // Target of the dump; null until open() has been called.
+  private File sstFile;
+
+  public RDBSstFileWriter() {
+    this.envOptions = new EnvOptions();
+    this.options = new Options();
+    this.sstFileWriter = new SstFileWriter(envOptions, options);
+    this.keyCounter = new AtomicLong(0);
+  }
+
+  /**
+   * Opens {@code externalFile} as the target of the dump.
+   *
+   * @param externalFile the sst file to write
+   * @throws IOException if rocksdb fails to open the file
+   */
+  @Override
+  public void open(File externalFile) throws IOException {
+    this.sstFile = externalFile;
+    try {
+      // A new sst file is created on every open, nothing is appended to an
+      // existing file.
+      sstFileWriter.open(sstFile.getAbsolutePath());
+    } catch (RocksDBException e) {
+      throw toIOException("Failed to open external file for dump "
+          + dumpFilePath(), e);
+    }
+  }
+
+  /**
+   * Adds one key value pair to the dump file and counts it.
+   *
+   * @param key the key to write
+   * @param value the value to write
+   * @throws IOException if rocksdb rejects the pair
+   */
+  @Override
+  public void put(byte[] key, byte[] value) throws IOException {
+    try {
+      sstFileWriter.put(key, value);
+      keyCounter.incrementAndGet();
+    } catch (RocksDBException e) {
+      throw toIOException("Failed to put kv into dump file "
+          + dumpFilePath(), e);
+    }
+  }
+
+  /**
+   * Finishes the sst file when at least one key was written and releases
+   * the writer together with its option objects.
+   *
+   * @throws IOException if rocksdb fails to finish the file
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      // Finishing an sst file without any key results in an exception, so
+      // only finish when something was written.
+      if (keyCounter.get() > 0) {
+        sstFileWriter.finish();
+      }
+    } catch (RocksDBException e) {
+      throw toIOException("Failed to finish dumping into file "
+          + dumpFilePath(), e);
+    } finally {
+      // Release the writer first, then the options it was created with.
+      sstFileWriter.close();
+      options.close();
+      envOptions.close();
+    }
+
+    keyCounter.set(0);
+  }
+
+  /**
+   * Returns the dump file path for error messages; safe to call before
+   * open(), so a NullPointerException cannot mask the real failure.
+   */
+  private String dumpFilePath() {
+    return sstFile == null ? "<not opened>" : sstFile.getAbsolutePath();
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
index 81b8872a82..3767fa7474 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.utils.db;
+import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
@@ -270,6 +271,25 @@ class RDBTable implements Table<byte[], byte[]> {
}
}
+  /**
+   * Writes every entry returned by {@code iterator(prefix)} into
+   * {@code externalFile} through a new {@link RDBSstFileWriter}.
+   *
+   * @param externalFile the file that receives the entries
+   * @param prefix the prefix handed to the table iterator
+   * @throws IOException if iterating or writing fails
+   */
+  @Override
+  public void dumpToFileWithPrefix(File externalFile, byte[] prefix)
+      throws IOException {
+    // Resources close in reverse order: the writer first, then the iterator.
+    try (TableIterator<byte[], ByteArrayKeyValue> prefixedEntries =
+             iterator(prefix);
+         DumpFileWriter sstDump = new RDBSstFileWriter()) {
+      sstDump.open(externalFile);
+      while (prefixedEntries.hasNext()) {
+        final ByteArrayKeyValue kv = prefixedEntries.next();
+        final byte[] key = kv.getKey();
+        final byte[] value = kv.getValue();
+        sstDump.put(key, value);
+      }
+    }
+  }
+
+  /**
+   * Loads {@code externalFile} through a new {@link RDBSstFileLoader} that
+   * is built from {@code db} and {@code handle}.
+   *
+   * @param externalFile the dump file to load
+   * @throws IOException if the loader fails
+   */
+  @Override
+  public void loadFromFile(File externalFile) throws IOException {
+    new RDBSstFileLoader(db, handle).load(externalFile);
+  }
+
private List<ByteArrayKeyValue> getRangeKVs(byte[] startKey,
int count, boolean sequential, byte[] prefix,
MetadataKeyFilters.MetadataKeyFilter... filters)
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
index b99e1d24ec..3202431474 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.utils.db;
+import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
@@ -281,6 +282,22 @@ public interface Table<KEY, VALUE> extends AutoCloseable {
void deleteBatchWithPrefix(BatchOperation batch, KEY prefix)
throws IOException;
+ /**
+ * Dump all key value pairs with a prefix into an external file.
+ * @param externalFile the file that receives the dumped key value pairs
+ * @param prefix the key prefix that selects the pairs to dump
+ * @throws IOException if the dump cannot be written
+ */
+ void dumpToFileWithPrefix(File externalFile, KEY prefix) throws IOException;
+
+ /**
+ * Load key value pairs from an external file created by
+ * dumpToFileWithPrefix.
+ * @param externalFile a file previously written by dumpToFileWithPrefix
+ * @throws IOException if the file cannot be loaded
+ */
+ void loadFromFile(File externalFile) throws IOException;
+
/**
* Class used to represent the key and value pair of a db entry.
*/
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index 6469bfe911..c43855a065 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hdds.utils.db;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
@@ -380,6 +381,18 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
rawTable.deleteBatchWithPrefix(batch, codecRegistry.asRawData(prefix));
}
+  /**
+   * Converts {@code prefix} to its raw form with the codec registry and
+   * delegates the dump to the raw table.
+   *
+   * @param externalFile the file that receives the dump
+   * @param prefix the typed key prefix
+   * @throws IOException if the conversion or the dump fails
+   */
+  @Override
+  public void dumpToFileWithPrefix(File externalFile, KEY prefix)
+      throws IOException {
+    final byte[] rawPrefix = codecRegistry.asRawData(prefix);
+    rawTable.dumpToFileWithPrefix(externalFile, rawPrefix);
+  }
+
+  /**
+   * Delegates to the raw table; the file is passed through unchanged.
+   *
+   * @param externalFile the dump file to load
+   * @throws IOException if the raw table fails to load the file
+   */
+  @Override
+  public void loadFromFile(File externalFile) throws IOException {
+    this.rawTable.loadFromFile(externalFile);
+  }
+
@Override
public void cleanupCache(List<Long> epochs) {
cache.cleanup(epochs);
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
index 387c299628..e721efc3ac 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
@@ -58,7 +58,9 @@ public class TestRDBTableStore {
"Sixth", "Seventh",
"Eighth");
private final List<String> prefixedFamilies = Arrays.asList(
- "PrefixFirst"
+ "PrefixFirst",
+ "PrefixTwo", "PrefixThree",
+ "PrefixFour", "PrefixFifth"
);
private static final int PREFIX_LENGTH = 8;
@Rule
@@ -524,6 +526,93 @@ public class TestRDBTableStore {
}
}
+  /**
+   * Dumps the entries of one prefix from the "PrefixTwo" table into a file,
+   * loads that file into the "PrefixThree" table and checks that exactly
+   * {@code blockCount} keys with that prefix arrive.
+   */
+  @Test
+  public void testDumpAndLoadBasic() throws Exception {
+    int containerCount = 3;
+    int blockCount = 5;
+    List<String> testPrefixes = generatePrefixes(containerCount);
+    List<Map<String, String>> testData = generateKVs(testPrefixes, blockCount);
+    File dumpFile = folder.newFile("PrefixTwo.dump");
+    byte[] samplePrefix = testPrefixes.get(2).getBytes(StandardCharsets.UTF_8);
+
+    try (Table<byte[], byte[]> testTable1 = rdbStore.getTable("PrefixTwo")) {
+      // write data
+      populatePrefixedTable(testTable1, testData);
+
+      // dump to external file
+      testTable1.dumpToFileWithPrefix(dumpFile, samplePrefix);
+
+      // check that the dump file exists and is not empty
+      Assert.assertTrue(dumpFile.exists());
+      Assert.assertNotEquals(0L, dumpFile.length());
+    }
+
+    // load dump file into another table
+    try (Table<byte[], byte[]> testTable2 = rdbStore.getTable("PrefixThree")) {
+      testTable2.loadFromFile(dumpFile);
+
+      // check loaded keys
+      try (TableIterator<byte[],
+          ? extends Table.KeyValue<byte[], byte[]>> iter = testTable2.iterator(
+          samplePrefix)) {
+        int keyCount = 0;
+        while (iter.hasNext()) {
+          // check prefix; assertArrayEquals reports the differing bytes
+          Assert.assertArrayEquals(samplePrefix,
+              Arrays.copyOf(iter.next().getKey(), PREFIX_LENGTH));
+          keyCount++;
+        }
+
+        // check block count
+        Assert.assertEquals(blockCount, keyCount);
+      }
+    }
+  }
+
+  /**
+   * Dumps from the empty "PrefixFour" table, expects an empty dump file,
+   * loads it into the "PrefixFifth" table and checks that no key arrives.
+   */
+  @Test
+  public void testDumpAndLoadEmpty() throws Exception {
+    int containerCount = 3;
+    List<String> testPrefixes = generatePrefixes(containerCount);
+
+    File dumpFile = folder.newFile("PrefixFour.dump");
+    byte[] samplePrefix = testPrefixes.get(2).getBytes(StandardCharsets.UTF_8);
+
+    try (Table<byte[], byte[]> testTable1 = rdbStore.getTable("PrefixFour")) {
+      // no data
+
+      // dump to external file
+      testTable1.dumpToFileWithPrefix(dumpFile, samplePrefix);
+
+      // check that the dump file exists
+      Assert.assertTrue(dumpFile.exists());
+      // empty dump file
+      Assert.assertEquals(0L, dumpFile.length());
+    }
+
+    // load dump file into another table
+    try (Table<byte[], byte[]> testTable2 = rdbStore.getTable("PrefixFifth")) {
+      testTable2.loadFromFile(dumpFile);
+
+      // check loaded keys
+      try (TableIterator<byte[],
+          ? extends Table.KeyValue<byte[], byte[]>> iter = testTable2.iterator(
+          samplePrefix)) {
+        int keyCount = 0;
+        while (iter.hasNext()) {
+          // check prefix; assertArrayEquals reports the differing bytes
+          Assert.assertArrayEquals(samplePrefix,
+              Arrays.copyOf(iter.next().getKey(), PREFIX_LENGTH));
+          keyCount++;
+        }
+
+        // check block count
+        Assert.assertEquals(0, keyCount);
+      }
+    }
+  }
+
private List<String> generatePrefixes(int prefixCount) {
List<String> prefixes = new ArrayList<>();
for (int i = 0; i < prefixCount; i++) {
diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 540ac7b571..20a2231450 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -146,6 +146,8 @@ enum Result {
BLOCK_TOKEN_VERIFICATION_FAILED = 41;
ERROR_IN_DB_SYNC = 42;
CHUNK_FILE_INCONSISTENCY = 43;
+ EXPORT_CONTAINER_METADATA_FAILED = 44;
+ IMPORT_CONTAINER_METADATA_FAILED = 45;
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org