You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by el...@apache.org on 2021/02/24 08:45:50 UTC

[ozone] 01/27: disable compression for closed container replication

This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit f0c1a2b09565b83e45be86869caf42551dcbe763
Author: Elek Márton <el...@apache.org>
AuthorDate: Wed Jan 13 15:43:12 2021 +0100

    disable compression for closed container replication
---
 .../container/keyvalue/TarContainerPacker.java     | 63 ++++++++--------------
 .../replication/DownloadAndImportReplicator.java   |  8 ++-
 .../container/keyvalue/TestTarContainerPacker.java | 24 +++------
 3 files changed, 36 insertions(+), 59 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
index 349a15d..025cadf 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
@@ -74,39 +74,32 @@ public class TarContainerPacker
     Path dbRoot = containerData.getDbFile().toPath();
     Path chunksRoot = Paths.get(containerData.getChunksPath());
 
-    try (InputStream decompressed = decompress(input);
-         ArchiveInputStream archiveInput = untar(decompressed)) {
-
-      ArchiveEntry entry = archiveInput.getNextEntry();
-      while (entry != null) {
-        String name = entry.getName();
-        long size = entry.getSize();
-        if (name.startsWith(DB_DIR_NAME + "/")) {
-          Path destinationPath = dbRoot
-              .resolve(name.substring(DB_DIR_NAME.length() + 1));
-          extractEntry(archiveInput, size, dbRoot, destinationPath);
-        } else if (name.startsWith(CHUNKS_DIR_NAME + "/")) {
+    ArchiveInputStream archiveInput = untar(input);
+
+    ArchiveEntry entry = archiveInput.getNextEntry();
+    while (entry != null) {
+      String name = entry.getName();
+      long size = entry.getSize();
+      if (name.startsWith(DB_DIR_NAME + "/")) {
+        Path destinationPath = dbRoot
+            .resolve(name.substring(DB_DIR_NAME.length() + 1));
+        extractEntry(archiveInput, size, dbRoot, destinationPath);
+      } else if (name.startsWith(CHUNKS_DIR_NAME + "/")) {
           Path destinationPath = chunksRoot
               .resolve(name.substring(CHUNKS_DIR_NAME.length() + 1));
           extractEntry(archiveInput, size, chunksRoot, destinationPath);
         } else if (CONTAINER_FILE_NAME.equals(name)) {
-          //Don't do anything. Container file should be unpacked in a
-          //separated step by unpackContainerDescriptor call.
-          descriptorFileContent = readEntry(archiveInput, size);
-        } else {
-          throw new IllegalArgumentException(
-              "Unknown entry in the tar file: " + "" + name);
-        }
-        entry = archiveInput.getNextEntry();
+        //Don't do anything. Container file should be unpacked in a
+        //separated step by unpackContainerDescriptor call.
+        descriptorFileContent = readEntry(archiveInput, size);
+      } else {
+        throw new IllegalArgumentException(
+            "Unknown entry in the tar file: " + "" + name);
       }
-      return descriptorFileContent;
-
-    } catch (CompressorException e) {
-      throw new IOException(
-          "Can't uncompress the given container: " + container
-              .getContainerData().getContainerID(),
-          e);
+      entry = archiveInput.getNextEntry();
     }
+    return descriptorFileContent;
+
   }
 
   private void extractEntry(InputStream input, long size,
@@ -149,8 +142,7 @@ public class TarContainerPacker
 
     KeyValueContainerData containerData = container.getContainerData();
 
-    try (OutputStream compressed = compress(output);
-         ArchiveOutputStream archiveOutput = tar(compressed)) {
+    try (ArchiveOutputStream archiveOutput = tar(output)) {
 
       includePath(containerData.getDbFile().toPath(), DB_DIR_NAME,
           archiveOutput);
@@ -160,18 +152,14 @@ public class TarContainerPacker
 
       includeFile(container.getContainerFile(), CONTAINER_FILE_NAME,
           archiveOutput);
-    } catch (CompressorException e) {
-      throw new IOException(
-          "Can't compress the container: " + containerData.getContainerID(),
-          e);
     }
+
   }
 
   @Override
   public byte[] unpackContainerDescriptor(InputStream input)
       throws IOException {
-    try (InputStream decompressed = decompress(input);
-         ArchiveInputStream archiveInput = untar(decompressed)) {
+    try (ArchiveInputStream archiveInput = untar(input)) {
 
       ArchiveEntry entry = archiveInput.getNextEntry();
       while (entry != null) {
@@ -181,12 +169,7 @@ public class TarContainerPacker
         }
         entry = archiveInput.getNextEntry();
       }
-    } catch (CompressorException e) {
-      throw new IOException(
-          "Can't read the container descriptor from the container archive",
-          e);
     }
-
     throw new IOException(
         "Container descriptor is missing from the container archive.");
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
index cdab0fd..ee78a3c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -114,8 +114,12 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
       //wait for the download. This thread pool is limiting the paralell
       //downloads, so it's ok to block here and wait for the full download.
       Path path = tempTarFile.get();
-      LOG.info("Container {} is downloaded, starting to import.",
-          containerID);
+      long bytes = Files.size(path);
+
+      LOG.info("Container {} is downloaded with size {}, starting to import.",
+          containerID, bytes);
+      task.setTransferredBytes(bytes);
+
       importContainer(containerID, path);
       LOG.info("Container {} is replicated successfully", containerID);
       task.setStatus(Status.DONE);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
index d248ac1..c60c8ba 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
@@ -33,21 +33,18 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.compress.archivers.ArchiveOutputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
-import org.apache.commons.compress.compressors.CompressorOutputStream;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
+import org.apache.hadoop.test.LambdaTestUtils;
 
+import org.apache.commons.compress.archivers.ArchiveOutputStream;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 import org.apache.commons.compress.compressors.CompressorException;
-import org.apache.commons.compress.compressors.CompressorInputStream;
-import org.apache.commons.compress.compressors.CompressorStreamFactory;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -55,8 +52,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import static org.apache.commons.compress.compressors.CompressorStreamFactory.GZIP;
-
 /**
  * Test the tar/untar for a given container.
  */
@@ -159,7 +154,7 @@ public class TestTarContainerPacker {
     //sample container descriptor file
     writeDescriptor(sourceContainer);
 
-    Path targetFile = TEMP_DIR.resolve("container.tar.gz");
+    Path targetFile = TEMP_DIR.resolve("container.tar");
 
     //WHEN: pack it
     try (FileOutputStream output = new FileOutputStream(targetFile.toFile())) {
@@ -168,16 +163,13 @@ public class TestTarContainerPacker {
 
     //THEN: check the result
     try (FileInputStream input = new FileInputStream(targetFile.toFile())) {
-      CompressorInputStream uncompressed = new CompressorStreamFactory()
-          .createCompressorInputStream(GZIP, input);
-      TarArchiveInputStream tarStream = new TarArchiveInputStream(uncompressed);
+      TarArchiveInputStream tarStream = new TarArchiveInputStream(input);
 
       TarArchiveEntry entry;
       Map<String, TarArchiveEntry> entries = new HashMap<>();
       while ((entry = tarStream.getNextTarEntry()) != null) {
         entries.put(entry.getName(), entry);
       }
-
       Assert.assertTrue(
           entries.containsKey("container.yaml"));
 
@@ -337,11 +329,9 @@ public class TestTarContainerPacker {
 
   private File packContainerWithSingleFile(File file, String entryName)
       throws Exception {
-    File targetFile = TEMP_DIR.resolve("container.tar.gz").toFile();
+    File targetFile = TEMP_DIR.resolve("container.tar").toFile();
     try (FileOutputStream output = new FileOutputStream(targetFile);
-         CompressorOutputStream gzipped = new CompressorStreamFactory()
-             .createCompressorOutputStream(GZIP, output);
-         ArchiveOutputStream archive = new TarArchiveOutputStream(gzipped)) {
+         ArchiveOutputStream archive = new TarArchiveOutputStream(output)) {
       TarContainerPacker.includeFile(file, entryName, archive);
     }
     return targetFile;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org