Posted to commits@ozone.apache.org by el...@apache.org on 2021/02/24 08:45:50 UTC
[ozone] 01/27: disable compression for closed container replication
This is an automated email from the ASF dual-hosted git repository.
elek pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit f0c1a2b09565b83e45be86869caf42551dcbe763
Author: Elek Márton <el...@apache.org>
AuthorDate: Wed Jan 13 15:43:12 2021 +0100
disable compression for closed container replication
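For context, the change amounts to removing the gzip layer around the tar stream on both the pack and unpack paths, so closed containers are replicated as plain tar archives. A minimal before/after sketch using the same commons-compress API the diff touches (the CompressionSketch, compressedTar and plainTar names are illustrative, not Ozone code; the stated motivation here is only what the subject line says, i.e. compression is disabled):

import java.io.OutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import static org.apache.commons.compress.compressors.CompressorStreamFactory.GZIP;

class CompressionSketch {

  // Before this commit: every byte went through a gzip compressor first.
  static TarArchiveOutputStream compressedTar(OutputStream out)
      throws CompressorException {
    return new TarArchiveOutputStream(new CompressorStreamFactory()
        .createCompressorOutputStream(GZIP, out));
  }

  // After this commit: a plain tar stream, with no compressor in between.
  static TarArchiveOutputStream plainTar(OutputStream out) {
    return new TarArchiveOutputStream(out);
  }
}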
---
.../container/keyvalue/TarContainerPacker.java | 63 ++++++++--------------
.../replication/DownloadAndImportReplicator.java | 8 ++-
.../container/keyvalue/TestTarContainerPacker.java | 24 +++------
3 files changed, 36 insertions(+), 59 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
index 349a15d..025cadf 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
@@ -74,39 +74,32 @@ public class TarContainerPacker
Path dbRoot = containerData.getDbFile().toPath();
Path chunksRoot = Paths.get(containerData.getChunksPath());
- try (InputStream decompressed = decompress(input);
- ArchiveInputStream archiveInput = untar(decompressed)) {
-
- ArchiveEntry entry = archiveInput.getNextEntry();
- while (entry != null) {
- String name = entry.getName();
- long size = entry.getSize();
- if (name.startsWith(DB_DIR_NAME + "/")) {
- Path destinationPath = dbRoot
- .resolve(name.substring(DB_DIR_NAME.length() + 1));
- extractEntry(archiveInput, size, dbRoot, destinationPath);
- } else if (name.startsWith(CHUNKS_DIR_NAME + "/")) {
+ ArchiveInputStream archiveInput = untar(input);
+
+ ArchiveEntry entry = archiveInput.getNextEntry();
+ while (entry != null) {
+ String name = entry.getName();
+ long size = entry.getSize();
+ if (name.startsWith(DB_DIR_NAME + "/")) {
+ Path destinationPath = dbRoot
+ .resolve(name.substring(DB_DIR_NAME.length() + 1));
+ extractEntry(archiveInput, size, dbRoot, destinationPath);
+ } else if (name.startsWith(CHUNKS_DIR_NAME + "/")) {
Path destinationPath = chunksRoot
.resolve(name.substring(CHUNKS_DIR_NAME.length() + 1));
extractEntry(archiveInput, size, chunksRoot, destinationPath);
} else if (CONTAINER_FILE_NAME.equals(name)) {
- //Don't do anything. Container file should be unpacked in a
- //separated step by unpackContainerDescriptor call.
- descriptorFileContent = readEntry(archiveInput, size);
- } else {
- throw new IllegalArgumentException(
- "Unknown entry in the tar file: " + "" + name);
- }
- entry = archiveInput.getNextEntry();
+ //Don't do anything. The container file should be unpacked in a
+ //separate step by the unpackContainerDescriptor call.
+ descriptorFileContent = readEntry(archiveInput, size);
+ } else {
+ throw new IllegalArgumentException(
+ "Unknown entry in the tar file: " + name);
}
- return descriptorFileContent;
-
- } catch (CompressorException e) {
- throw new IOException(
- "Can't uncompress the given container: " + container
- .getContainerData().getContainerID(),
- e);
+ entry = archiveInput.getNextEntry();
}
+ return descriptorFileContent;
+
}
private void extractEntry(InputStream input, long size,
@@ -149,8 +142,7 @@ public class TarContainerPacker
KeyValueContainerData containerData = container.getContainerData();
- try (OutputStream compressed = compress(output);
- ArchiveOutputStream archiveOutput = tar(compressed)) {
+ try (ArchiveOutputStream archiveOutput = tar(output)) {
includePath(containerData.getDbFile().toPath(), DB_DIR_NAME,
archiveOutput);
@@ -160,18 +152,14 @@ public class TarContainerPacker
includeFile(container.getContainerFile(), CONTAINER_FILE_NAME,
archiveOutput);
- } catch (CompressorException e) {
- throw new IOException(
- "Can't compress the container: " + containerData.getContainerID(),
- e);
}
+
}
@Override
public byte[] unpackContainerDescriptor(InputStream input)
throws IOException {
- try (InputStream decompressed = decompress(input);
- ArchiveInputStream archiveInput = untar(decompressed)) {
+ try (ArchiveInputStream archiveInput = untar(input)) {
ArchiveEntry entry = archiveInput.getNextEntry();
while (entry != null) {
@@ -181,12 +169,7 @@ public class TarContainerPacker
}
entry = archiveInput.getNextEntry();
}
- } catch (CompressorException e) {
- throw new IOException(
- "Can't read the container descriptor from the container archive",
- e);
}
-
throw new IOException(
"Container descriptor is missing from the container archive.");
}
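With the compressor gone, the unpack path above reads the archive directly. A self-contained sketch of the same entry-walking pattern (UnpackSketch and walkTar are illustrative names; the real unpack() dispatches on the entry-name prefixes shown in the hunk rather than printing). One side effect visible in the hunk: the patched unpack() no longer wraps the ArchiveInputStream in try-with-resources, so closing is presumably left to whoever owns the underlying input stream.

import java.io.IOException;
import java.io.InputStream;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;

class UnpackSketch {

  static void walkTar(InputStream input) throws IOException {
    try (TarArchiveInputStream archive = new TarArchiveInputStream(input)) {
      ArchiveEntry entry;
      while ((entry = archive.getNextEntry()) != null) {
        // The real code routes DB_DIR_NAME/ and CHUNKS_DIR_NAME/ entries to
        // extractEntry, reads CONTAINER_FILE_NAME into memory, and rejects
        // anything else; here we only list what the archive contains.
        System.out.printf("%s (%d bytes)%n", entry.getName(), entry.getSize());
      }
    }
  }
}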
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
index cdab0fd..ee78a3c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -114,8 +114,12 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
//wait for the download. This thread pool is limiting the parallel
//downloads, so it's ok to block here and wait for the full download.
Path path = tempTarFile.get();
- LOG.info("Container {} is downloaded, starting to import.",
- containerID);
+ long bytes = Files.size(path);
+
+ LOG.info("Container {} is downloaded with size {}, starting to import.",
+ containerID, bytes);
+ task.setTransferredBytes(bytes);
+
importContainer(containerID, path);
LOG.info("Container {} is replicated successfully", containerID);
task.setStatus(Status.DONE);
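The replicator change records how many bytes were actually downloaded before importing. Files.size is the NIO call used in the hunk; the standalone rendering below is a hedged sketch (SizeSketch and reportDownloaded are illustrative names, and setTransferredBytes appears only in the diff, so it is not reproduced here):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class SizeSketch {

  // Files.size returns the file's length in bytes and throws IOException
  // if the path does not exist or cannot be read.
  static long reportDownloaded(Path tempTarFile) throws IOException {
    long bytes = Files.size(tempTarFile);
    System.out.printf("Downloaded %d bytes to %s%n", bytes, tempTarFile);
    return bytes;
  }
}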
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
index d248ac1..c60c8ba 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
@@ -33,21 +33,18 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.compress.archivers.ArchiveOutputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
-import org.apache.commons.compress.compressors.CompressorOutputStream;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.commons.compress.archivers.ArchiveOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.CompressorException;
-import org.apache.commons.compress.compressors.CompressorInputStream;
-import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -55,8 +52,6 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import static org.apache.commons.compress.compressors.CompressorStreamFactory.GZIP;
-
/**
* Test the tar/untar for a given container.
*/
@@ -159,7 +154,7 @@ public class TestTarContainerPacker {
//sample container descriptor file
writeDescriptor(sourceContainer);
- Path targetFile = TEMP_DIR.resolve("container.tar.gz");
+ Path targetFile = TEMP_DIR.resolve("container.tar");
//WHEN: pack it
try (FileOutputStream output = new FileOutputStream(targetFile.toFile())) {
@@ -168,16 +163,13 @@ public class TestTarContainerPacker {
//THEN: check the result
try (FileInputStream input = new FileInputStream(targetFile.toFile())) {
- CompressorInputStream uncompressed = new CompressorStreamFactory()
- .createCompressorInputStream(GZIP, input);
- TarArchiveInputStream tarStream = new TarArchiveInputStream(uncompressed);
+ TarArchiveInputStream tarStream = new TarArchiveInputStream(input);
TarArchiveEntry entry;
Map<String, TarArchiveEntry> entries = new HashMap<>();
while ((entry = tarStream.getNextTarEntry()) != null) {
entries.put(entry.getName(), entry);
}
-
Assert.assertTrue(
entries.containsKey("container.yaml"));
@@ -337,11 +329,9 @@ public class TestTarContainerPacker {
private File packContainerWithSingleFile(File file, String entryName)
throws Exception {
- File targetFile = TEMP_DIR.resolve("container.tar.gz").toFile();
+ File targetFile = TEMP_DIR.resolve("container.tar").toFile();
try (FileOutputStream output = new FileOutputStream(targetFile);
- CompressorOutputStream gzipped = new CompressorStreamFactory()
- .createCompressorOutputStream(GZIP, output);
- ArchiveOutputStream archive = new TarArchiveOutputStream(gzipped)) {
+ ArchiveOutputStream archive = new TarArchiveOutputStream(output)) {
TarContainerPacker.includeFile(file, entryName, archive);
}
return targetFile;
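The updated test helper now writes an uncompressed tar, mirroring the production change. A self-contained equivalent of that helper (TestPackSketch and packSingleFile are illustrative names; IOUtils here is the commons-compress utility class, not Apache commons-io):

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.utils.IOUtils;

class TestPackSketch {

  static File packSingleFile(File source, String entryName, File targetTar)
      throws IOException {
    try (FileOutputStream out = new FileOutputStream(targetTar);
         TarArchiveOutputStream tar = new TarArchiveOutputStream(out)) {
      // One entry per file: write the header, stream the payload, close.
      tar.putArchiveEntry(new TarArchiveEntry(source, entryName));
      try (FileInputStream in = new FileInputStream(source)) {
        IOUtils.copy(in, tar);
      }
      tar.closeArchiveEntry();
    }
    return targetTar;
  }
}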