Posted to commits@ozone.apache.org by ad...@apache.org on 2023/02/01 21:33:27 UTC

[ozone] branch master updated: HDDS-7821. Let push replication use compression from configuration (#4229)

This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a796f1b54 HDDS-7821. Let push replication use compression from configuration (#4229)
1a796f1b54 is described below

commit 1a796f1b54610efd86d5aba0a91e8ae4d32e4af5
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Wed Feb 1 22:33:21 2023 +0100

    HDDS-7821. Let push replication use compression from configuration (#4229)
---
 .../hadoop/hdds/conf/ConfigurationTarget.java      |   4 +
 hadoop-hdds/container-service/pom.xml              |   4 +
 .../common/statemachine/DatanodeStateMachine.java  |   7 +-
 .../container/keyvalue/TarContainerPacker.java     |  41 ++------
 .../ozone/container/ozoneimpl/OzoneContainer.java  |   3 +-
 .../container/replication/ContainerDownloader.java |   3 +-
 .../container/replication/ContainerImporter.java   |   9 +-
 .../replication/ContainerReplicationSource.java    |   3 +-
 .../container/replication/ContainerUploader.java   |   3 +-
 .../replication/CopyContainerCompression.java      | 103 +++++++++++++++------
 .../replication/DownloadAndImportReplicator.java   |  10 +-
 .../replication/GrpcContainerUploader.java         |   9 +-
 .../replication/GrpcReplicationClient.java         |  10 +-
 .../replication/GrpcReplicationService.java        |   6 +-
 .../OnDemandContainerReplicationSource.java        |  20 +---
 .../container/replication/PushReplicator.java      |  13 +--
 .../replication/SendContainerOutputStream.java     |   7 +-
 .../replication/SendContainerRequestHandler.java   |   4 +-
 .../replication/SimpleContainerDownloader.java     |  12 +--
 .../container/keyvalue/TestKeyValueContainer.java  |  10 +-
 .../container/keyvalue/TestTarContainerPacker.java |  13 +--
 .../replication/GrpcOutputStreamTest.java          |   2 +-
 .../replication/TestCopyContainerCompression.java  |  95 +++++++++++++++++++
 .../container/replication/TestPushReplicator.java  |  48 +++++++---
 .../replication/TestReplicationSupervisor.java     |   6 +-
 .../replication/TestSendContainerOutputStream.java |  27 +++++-
 .../replication/TestSimpleContainerDownloader.java |  18 ++--
 .../upgrade/TestDatanodeUpgradeToScmHA.java        |  10 +-
 .../src/main/proto/DatanodeClientProtocol.proto    |   1 +
 .../ozoneimpl/TestOzoneContainerWithTLS.java       |   5 +-
 .../ozone/debug/container/ExportSubcommand.java    |   4 +-
 .../ozone/freon/ClosedContainerReplicator.java     |   5 +-
 32 files changed, 344 insertions(+), 171 deletions(-)
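
In short: the codec used for container replication is now resolved once from configuration (HDDS_CONTAINER_REPLICATION_COMPRESSION) and threaded through both the push and pull paths, instead of push replication being pinned to NO_COMPRESSION. A minimal usage sketch of the reworked enum, based only on the API in this diff (class name, temp path and payload are illustrative):

    import java.io.OutputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;

    public class CompressionFromConfSketch {
      public static void main(String[] args) throws Exception {
        OzoneConfiguration conf = new OzoneConfiguration();

        // setOn() stores the enum name under the
        // HDDS_CONTAINER_REPLICATION_COMPRESSION key via the new
        // ConfigurationTarget#setEnum default method.
        CopyContainerCompression.GZIP.setOn(conf);

        // getConf() falls back to NO_COMPRESSION for unknown codec names.
        CopyContainerCompression compression =
            CopyContainerCompression.getConf(conf);

        // wrap() is the identity for NO_COMPRESSION and a commons-compress
        // stream otherwise; CompressorException is rethrown as IOException.
        Path tar = Files.createTempFile("container", ".tar");
        try (OutputStream out = compression.wrap(Files.newOutputStream(tar))) {
          out.write(new byte[] {1, 2, 3}); // stand-in for container data
        }
      }
    }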

diff --git a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationTarget.java b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationTarget.java
index b6be89cd0c..d4bfb360b9 100644
--- a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationTarget.java
+++ b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationTarget.java
@@ -44,6 +44,10 @@ public interface ConfigurationTarget {
     set(name, Boolean.toString(value));
   }
 
+  default <T extends Enum<T>> void setEnum(String name, T value) {
+    set(name, value.name());
+  }
+
   default void setTimeDuration(String name, long value, TimeUnit unit) {
     set(name, value + ParsedTimeDuration.unitFor(unit).suffix());
   }
diff --git a/hadoop-hdds/container-service/pom.xml b/hadoop-hdds/container-service/pom.xml
index 21d4b755d9..1144b6018a 100644
--- a/hadoop-hdds/container-service/pom.xml
+++ b/hadoop-hdds/container-service/pom.xml
@@ -140,6 +140,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <artifactId>mockito-junit-jupiter</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-params</artifactId>
+    </dependency>
 
   </dependencies>
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 69b22885e3..01d114245a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -58,7 +58,6 @@ import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.Repl
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.SetNodeOperationalStateCommandHandler;
 import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
 import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionMetrics;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.container.replication.ContainerImporter;
 import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
@@ -181,12 +180,12 @@ public class DatanodeStateMachine implements Closeable {
     ContainerImporter importer = new ContainerImporter(conf,
         container.getContainerSet(),
         container.getController(),
-        new TarContainerPacker(), container.getVolumeSet());
+        container.getVolumeSet());
     ContainerReplicator pullReplicator = new DownloadAndImportReplicator(
-        container.getContainerSet(),
+        conf, container.getContainerSet(),
         importer,
         new SimpleContainerDownloader(conf, dnCertClient));
-    ContainerReplicator pushReplicator = new PushReplicator(
+    ContainerReplicator pushReplicator = new PushReplicator(conf,
         new OnDemandContainerReplicationSource(container.getController()),
         new GrpcContainerUploader(conf, dnCertClient)
     );
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
index f5763958a1..77eb825b39 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
@@ -30,7 +30,6 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.util.stream.Stream;
-import java.util.Objects;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -40,18 +39,16 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
 
-
 import org.apache.commons.compress.archivers.ArchiveEntry;
 import org.apache.commons.compress.archivers.ArchiveInputStream;
 import org.apache.commons.compress.archivers.ArchiveOutputStream;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
-import org.apache.commons.compress.compressors.CompressorException;
-import org.apache.commons.compress.compressors.CompressorStreamFactory;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
 import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaThreeImpl;
+import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;
 
 import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS;
@@ -69,15 +66,9 @@ public class TarContainerPacker
 
   static final String CONTAINER_FILE_NAME = "container.yaml";
 
-  private final String compression;
-
-  private static final String NO_COMPRESSION = "no_compression";
+  private final CopyContainerCompression compression;
 
-  public TarContainerPacker() {
-    this.compression = NO_COMPRESSION;
-  }
-
-  public TarContainerPacker(String compression) {
+  public TarContainerPacker(CopyContainerCompression compression) {
     this.compression = compression;
   }
 
@@ -175,10 +166,6 @@ public class TarContainerPacker
 
       includePath(Paths.get(containerData.getChunksPath()), CHUNKS_DIR_NAME,
           archiveOutput);
-    } catch (CompressorException e) {
-      throw new IOException(
-          "Can't compress the container: " + containerData.getContainerID(),
-          e);
     }
   }
 
@@ -195,10 +182,6 @@ public class TarContainerPacker
         }
         entry = archiveInput.getNextEntry();
       }
-    } catch (CompressorException e) {
-      throw new IOException(
-          "Can't read the container descriptor from the container archive",
-          e);
     }
 
     throw new IOException(
@@ -293,19 +276,13 @@ public class TarContainerPacker
   }
 
   @VisibleForTesting
-  InputStream decompress(InputStream input)
-      throws CompressorException {
-    return Objects.equals(compression, NO_COMPRESSION) ?
-        input : new CompressorStreamFactory()
-        .createCompressorInputStream(compression, input);
+  InputStream decompress(InputStream input) throws IOException {
+    return compression.wrap(input);
   }
 
   @VisibleForTesting
-  OutputStream compress(OutputStream output)
-      throws CompressorException {
-    return Objects.equals(compression, NO_COMPRESSION) ?
-        output : new CompressorStreamFactory()
-        .createCompressorOutputStream(compression, output);
+  OutputStream compress(OutputStream output) throws IOException {
+    return compression.wrap(output);
   }
 
   private byte[] innerUnpack(InputStream input, Path dbRoot, Path chunksRoot)
@@ -337,10 +314,6 @@ public class TarContainerPacker
         entry = archiveInput.getNextEntry();
       }
       return descriptorFileContent;
-
-    } catch (CompressorException e) {
-      throw new IOException("Can't uncompress to dbRoot: " + dbRoot +
-              ", chunksRoot: " + chunksRoot, e);
     }
   }
 }
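
With the compressor selection moved into CopyContainerCompression, TarContainerPacker becomes a cheap per-transfer object bound to one codec. A minimal sketch mirroring ContainerImporter#importContainer further below (class and method names here are illustrative):

    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
    import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;

    public class ImportSketch {
      static byte[] readDescriptor(Path tar) throws Exception {
        // One packer per transfer, bound to the codec the sender advertised;
        // the packer no longer parses codec names itself.
        TarContainerPacker packer =
            new TarContainerPacker(CopyContainerCompression.NO_COMPRESSION);
        try (InputStream in = Files.newInputStream(tar)) {
          return packer.unpackContainerDescriptor(in);
        }
      }
    }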
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 3c63aaa799..b3db557b6f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume.VolumeType;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolumeChecker;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
 import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.StaleRecoveringContainerScrubbingService;
 import org.apache.hadoop.ozone.container.replication.ContainerImporter;
@@ -208,7 +207,7 @@ public class OzoneContainer {
         secConf,
         certClient,
         new ContainerImporter(conf, containerSet, controller,
-            new TarContainerPacker(), volumeSet));
+            volumeSet));
 
     readChannel = new XceiverServerGrpc(
         datanodeDetails, config, hddsDispatcher, certClient);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java
index 45c70f8e72..72ebef9ac9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 public interface ContainerDownloader extends Closeable {
 
   Path getContainerDataFromReplicas(long containerId,
-      List<DatanodeDetails> sources, Path downloadDir);
+      List<DatanodeDetails> sources, Path downloadDir,
+      CopyContainerCompression compression);
 
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
index 59316a8c8f..c4955931e1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
@@ -54,17 +54,15 @@ public class ContainerImporter {
   private static final String CONTAINER_COPY_TMP_DIR = "tmp";
   private final ContainerSet containerSet;
   private final ContainerController controller;
-  private final TarContainerPacker packer;
   private final MutableVolumeSet volumeSet;
   private final VolumeChoosingPolicy volumeChoosingPolicy;
   private final long containerSize;
 
   public ContainerImporter(ConfigurationSource conf, ContainerSet containerSet,
-      ContainerController controller, TarContainerPacker tarContainerPacker,
+      ContainerController controller,
       MutableVolumeSet volumeSet) {
     this.containerSet = containerSet;
     this.controller = controller;
-    this.packer = tarContainerPacker;
     this.volumeSet = volumeSet;
     try {
       volumeChoosingPolicy = conf.getClass(
@@ -79,7 +77,8 @@ public class ContainerImporter {
   }
 
   public void importContainer(long containerID, Path tarFilePath,
-      HddsVolume hddsVolume) throws IOException {
+      HddsVolume hddsVolume, CopyContainerCompression compression)
+      throws IOException {
 
     HddsVolume targetVolume = hddsVolume;
     if (targetVolume == null) {
@@ -88,6 +87,8 @@ public class ContainerImporter {
     try {
       KeyValueContainerData containerData;
 
+      TarContainerPacker packer = new TarContainerPacker(compression);
+
       try (FileInputStream input = new FileInputStream(tarFilePath.toFile())) {
         byte[] containerDescriptorYaml =
             packer.unpackContainerDescriptor(input);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
index 72f32b2089..58c7521a22 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
@@ -44,7 +44,8 @@ public interface ContainerReplicationSource {
    * @param compression Compression algorithm.
    * @throws IOException
    */
-  void copyData(long containerId, OutputStream destination, String compression)
+  void copyData(long containerId, OutputStream destination,
+      CopyContainerCompression compression)
       throws IOException;
 
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerUploader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerUploader.java
index cb9264f2b5..c2b82633bd 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerUploader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerUploader.java
@@ -28,5 +28,6 @@ import java.util.concurrent.CompletableFuture;
  */
 public interface ContainerUploader {
   OutputStream startUpload(long containerId, DatanodeDetails target,
-      CompletableFuture<Void> callback) throws IOException;
+      CompletableFuture<Void> callback, CopyContainerCompression compression)
+      throws IOException;
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerCompression.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerCompression.java
index 67f0d6fc85..fc1ae676d0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerCompression.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerCompression.java
@@ -17,15 +17,17 @@
  */
 package org.apache.hadoop.ozone.container.replication;
 
-import com.google.common.collect.ImmutableMap;
+import org.apache.commons.compress.compressors.CompressorException;
 import org.apache.commons.compress.compressors.CompressorStreamFactory;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.ConfigurationTarget;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-import java.util.HashMap;
-import java.util.Map;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPLICATION_COMPRESSION;
 
@@ -34,45 +36,94 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPLICATION_C
  */
 public enum CopyContainerCompression {
 
-  NO_COMPRESSION,
-  GZIP,
-  LZ4,
-  SNAPPY,
-  ZSTD;
+  NO_COMPRESSION("no_compression") {
+    @Override
+    public InputStream wrap(InputStream input) {
+      return input;
+    }
+
+    @Override
+    public OutputStream wrap(OutputStream output) {
+      return output;
+    }
+  },
+  GZIP(CompressorStreamFactory.GZIP),
+  LZ4(CompressorStreamFactory.LZ4_FRAMED),
+  SNAPPY(CompressorStreamFactory.SNAPPY_FRAMED),
+  ZSTD(CompressorStreamFactory.ZSTANDARD);
+
+  private final String compressorFactoryName;
+
+  CopyContainerCompression(String compressorFactoryName) {
+    this.compressorFactoryName = compressorFactoryName;
+  }
 
   private static final Logger LOG =
       LoggerFactory.getLogger(CopyContainerCompression.class);
 
   private static final CopyContainerCompression DEFAULT_COMPRESSION =
       CopyContainerCompression.NO_COMPRESSION;
-  private static final Map<CopyContainerCompression, String>
-      COMPRESSION_MAPPING = ImmutableMap.copyOf(getMapping());
-
-  private static Map<CopyContainerCompression, String> getMapping() {
-    return new HashMap<CopyContainerCompression, String>() { {
-        put(NO_COMPRESSION, "no_compression");
-        put(GZIP, CompressorStreamFactory.GZIP);
-        put(LZ4, CompressorStreamFactory.LZ4_FRAMED);
-        put(SNAPPY, CompressorStreamFactory.SNAPPY_FRAMED);
-        put(ZSTD, CompressorStreamFactory.ZSTANDARD);
-      }};
-  }
-
-  public static Map<CopyContainerCompression, String> getCompressionMapping() {
-    return COMPRESSION_MAPPING;
-  }
 
   public static CopyContainerCompression getConf(ConfigurationSource conf) {
     try {
       return conf.getEnum(HDDS_CONTAINER_REPLICATION_COMPRESSION,
           DEFAULT_COMPRESSION);
     } catch (IllegalArgumentException e) {
-      LOG.warn("Unsupported compression codec. Skip compression.");
+      LOG.warn("Unsupported compression codec {}, defaulting to {}",
+          conf.get(HDDS_CONTAINER_REPLICATION_COMPRESSION),
+          DEFAULT_COMPRESSION);
       return DEFAULT_COMPRESSION;
     }
   }
 
+  public void setOn(ConfigurationTarget conf) {
+    conf.setEnum(HDDS_CONTAINER_REPLICATION_COMPRESSION, this);
+  }
+
   public static CopyContainerCompression getDefaultCompression() {
     return NO_COMPRESSION;
   }
+
+  public ContainerProtos.CopyContainerCompressProto toProto() {
+    return ContainerProtos.CopyContainerCompressProto.valueOf(name());
+  }
+
+  public static CopyContainerCompression fromProto(
+      ContainerProtos.CopyContainerCompressProto proto) {
+    if (proto == null) {
+      return getDefaultCompression();
+    }
+
+    try {
+      return valueOf(proto.name());
+    } catch (IllegalArgumentException e) {
+      return getDefaultCompression();
+    }
+  }
+
+  public InputStream wrap(InputStream input) throws IOException {
+    try {
+      return new CompressorStreamFactory().createCompressorInputStream(
+          compressorFactoryName, input);
+    } catch (CompressorException e) {
+      throw toIOException(e);
+    }
+  }
+
+  public OutputStream wrap(OutputStream output) throws IOException {
+    try {
+      return new CompressorStreamFactory().createCompressorOutputStream(
+          compressorFactoryName, output);
+    } catch (CompressorException e) {
+      throw toIOException(e);
+    }
+  }
+
+  private static IOException toIOException(CompressorException e) {
+    Throwable cause = e.getCause();
+    if (cause instanceof IOException) {
+      return (IOException) cause;
+    }
+    return new IOException(e);
+  }
 }
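
The proto mapping is deliberately forgiving on the receiving side; a short sketch of the round trip and fallback, matching what TestCopyContainerCompression below exercises (class name is illustrative):

    import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;

    public class ProtoFallbackSketch {
      public static void main(String[] args) {
        // Each enum constant maps onto CopyContainerCompressProto by name.
        CopyContainerCompression zstd = CopyContainerCompression.fromProto(
            CopyContainerCompression.ZSTD.toProto()); // -> ZSTD

        // A missing compression field degrades to the default codec
        // (NO_COMPRESSION) instead of failing the transfer.
        CopyContainerCompression fallback =
            CopyContainerCompression.fromProto(null); // -> NO_COMPRESSION
        System.out.println(zstd + " " + fallback);
      }
    }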
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
index 1c23d17475..5b5f954afd 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -22,6 +22,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
 
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
@@ -44,14 +45,16 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
   private final ContainerDownloader downloader;
   private final ContainerImporter containerImporter;
   private final ContainerSet containerSet;
+  private final CopyContainerCompression compression;
 
   public DownloadAndImportReplicator(
-      ContainerSet containerSet,
+      ConfigurationSource conf, ContainerSet containerSet,
       ContainerImporter containerImporter,
       ContainerDownloader downloader) {
     this.containerSet = containerSet;
     this.downloader = downloader;
     this.containerImporter = containerImporter;
+    compression = CopyContainerCompression.getConf(conf);
   }
 
   @Override
@@ -74,7 +77,7 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
       // downloads, so it's ok to block here and wait for the full download.
       Path tarFilePath =
           downloader.getContainerDataFromReplicas(containerID, sourceDatanodes,
-              ContainerImporter.getUntarDirectory(targetVolume));
+              ContainerImporter.getUntarDirectory(targetVolume), compression);
       if (tarFilePath == null) {
         task.setStatus(Status.FAILED);
         return;
@@ -84,7 +87,8 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
               containerID, bytes);
       task.setTransferredBytes(bytes);
 
-      containerImporter.importContainer(containerID, tarFilePath, targetVolume);
+      containerImporter.importContainer(containerID, tarFilePath, targetVolume,
+          compression);
 
       LOG.info("Container {} is replicated successfully", containerID);
       task.setStatus(Status.DONE);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
index aa7322428e..0f8a8fdd71 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
@@ -42,26 +42,25 @@ public class GrpcContainerUploader implements ContainerUploader {
 
   private final SecurityConfig securityConfig;
   private final CertificateClient certClient;
-  private final CopyContainerCompression compression;
 
   public GrpcContainerUploader(
       ConfigurationSource conf, CertificateClient certClient) {
     this.certClient = certClient;
     securityConfig = new SecurityConfig(conf);
-    compression = CopyContainerCompression.getConf(conf);
   }
 
   @Override
   public OutputStream startUpload(long containerId, DatanodeDetails target,
-      CompletableFuture<Void> callback) throws IOException {
+      CompletableFuture<Void> callback, CopyContainerCompression compression)
+      throws IOException {
     GrpcReplicationClient client =
         new GrpcReplicationClient(target.getIpAddress(),
             target.getPort(Port.Name.REPLICATION).getValue(),
-            securityConfig, certClient, compression.toString());
+            securityConfig, certClient, compression);
     StreamObserver<SendContainerRequest> requestStream = client.upload(
         new SendContainerResponseStreamObserver(containerId, target, callback));
     return new SendContainerOutputStream(requestStream, containerId,
-        GrpcReplicationService.BUFFER_SIZE);
+        GrpcReplicationService.BUFFER_SIZE, compression);
   }
 
   /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
index be9a0ece48..7301a6a2c6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
@@ -27,7 +27,6 @@ import java.nio.file.Path;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest;
@@ -62,12 +61,12 @@ public class GrpcReplicationClient implements AutoCloseable {
 
   private final IntraDatanodeProtocolServiceStub client;
 
-  private final ContainerProtos.CopyContainerCompressProto compression;
+  private final CopyContainerCompression compression;
 
   public GrpcReplicationClient(
       String host, int port,
       SecurityConfig secConfig, CertificateClient certClient,
-      String compression)
+      CopyContainerCompression compression)
       throws IOException {
     NettyChannelBuilder channelBuilder =
         NettyChannelBuilder.forAddress(host, port)
@@ -92,8 +91,7 @@ public class GrpcReplicationClient implements AutoCloseable {
     }
     channel = channelBuilder.build();
     client = IntraDatanodeProtocolServiceGrpc.newStub(channel);
-    this.compression =
-        ContainerProtos.CopyContainerCompressProto.valueOf(compression);
+    this.compression = compression;
   }
 
   public CompletableFuture<Path> download(long containerId, Path dir) {
@@ -102,7 +100,7 @@ public class GrpcReplicationClient implements AutoCloseable {
             .setContainerID(containerId)
             .setLen(-1)
             .setReadOffset(0)
-            .setCompression(compression)
+            .setCompression(compression.toProto())
             .build();
 
     CompletableFuture<Path> response = new CompletableFuture<>();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
index 533a3b40fe..9e8c2c12c9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
@@ -31,6 +31,8 @@ import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.fromProto;
+
 /**
  * Service to make containers available for replication.
  */
@@ -55,9 +57,7 @@ public class GrpcReplicationService extends
   public void download(CopyContainerRequestProto request,
       StreamObserver<CopyContainerResponseProto> responseObserver) {
     long containerID = request.getContainerID();
-    String compression = request.hasCompression() ?
-        request.getCompression().toString() : CopyContainerCompression
-        .getDefaultCompression().toString();
+    CopyContainerCompression compression = fromProto(request.getCompression());
     LOG.info("Streaming container data ({}) to other datanode " +
         "with compression {}", containerID, compression);
     try {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
index d17e29b097..bf05a3ad4d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
@@ -19,13 +19,11 @@ package org.apache.hadoop.ozone.container.replication;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Map;
 
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
@@ -39,16 +37,9 @@ public class OnDemandContainerReplicationSource
 
   private final ContainerController controller;
 
-  private Map<String, TarContainerPacker> packer = new HashMap<>();
-
   public OnDemandContainerReplicationSource(
       ContainerController controller) {
     this.controller = controller;
-    for (Map.Entry<CopyContainerCompression, String> entry :
-        CopyContainerCompression.getCompressionMapping().entrySet()) {
-      packer.put(
-          entry.getKey().toString(), new TarContainerPacker(entry.getValue()));
-    }
   }
 
   @Override
@@ -58,7 +49,7 @@ public class OnDemandContainerReplicationSource
 
   @Override
   public void copyData(long containerId, OutputStream destination,
-                       String compression)
+                       CopyContainerCompression compression)
       throws IOException {
 
     Container container = controller.getContainer(containerId);
@@ -68,13 +59,8 @@ public class OnDemandContainerReplicationSource
           " is not found.", CONTAINER_NOT_FOUND);
     }
 
-    if (!packer.containsKey(compression)) {
-      throw new IOException("Can't compress the container. Compression " +
-          compression + " is not found.");
-    }
     controller.exportContainer(
         container.getContainerType(), containerId, destination,
-        packer.get(compression));
-
+        new TarContainerPacker(compression));
   }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
index 9a0f6f5a44..eb9aafff95 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.ozone.container.replication;
 
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status;
@@ -27,8 +28,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.CompletableFuture;
 
-import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
-
 /**
  * Pushes the container to the target datanode.
  */
@@ -39,11 +38,13 @@ public class PushReplicator implements ContainerReplicator {
 
   private final ContainerReplicationSource source;
   private final ContainerUploader uploader;
+  private final CopyContainerCompression compression;
 
-  public PushReplicator(ContainerReplicationSource source,
-      ContainerUploader uploader) {
+  public PushReplicator(ConfigurationSource conf,
+      ContainerReplicationSource source, ContainerUploader uploader) {
     this.source = source;
     this.uploader = uploader;
+    compression = CopyContainerCompression.getConf(conf);
   }
 
   @Override
@@ -57,8 +58,8 @@ public class PushReplicator implements ContainerReplicator {
     CountingOutputStream output = null;
     try {
       output = new CountingOutputStream(
-          uploader.startUpload(containerID, target, fut));
-      source.copyData(containerID, output, NO_COMPRESSION.name());
+          uploader.startUpload(containerID, target, fut, compression));
+      source.copyData(containerID, output, compression);
       fut.get();
       task.setTransferredBytes(output.getByteCount());
       task.setStatus(Status.DONE);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java
index 622bd4f4a3..b24dcc10f5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java
@@ -25,10 +25,14 @@ import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
  * Output stream adapter for SendContainerResponse.
  */
 class SendContainerOutputStream extends GrpcOutputStream<SendContainerRequest> {
+
+  private final CopyContainerCompression compression;
+
   SendContainerOutputStream(
       StreamObserver<SendContainerRequest> streamObserver,
-      long containerId, int bufferSize) {
+      long containerId, int bufferSize, CopyContainerCompression compression) {
     super(streamObserver, containerId, bufferSize);
+    this.compression = compression;
   }
 
   @Override
@@ -37,6 +41,7 @@ class SendContainerOutputStream extends GrpcOutputStream<SendContainerRequest> {
         .setContainerID(getContainerId())
         .setData(data)
         .setOffset(getWrittenBytes())
+        .setCompression(compression.toProto())
         .build();
     getStreamObserver().onNext(request);
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
index b12aad452c..02a30986c4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
@@ -50,6 +50,7 @@ class SendContainerRequestHandler
   private OutputStream output;
   private HddsVolume volume;
   private Path path;
+  private CopyContainerCompression compression;
 
   SendContainerRequestHandler(
       ContainerImporter importer,
@@ -74,6 +75,7 @@ class SendContainerRequestHandler
         Files.createDirectories(dir);
         path = dir.resolve(ContainerUtils.getContainerTarName(containerId));
         output = Files.newOutputStream(path);
+        compression = CopyContainerCompression.fromProto(req.getCompression());
       }
 
       assertSame(containerId, req.getContainerID(), "containerID");
@@ -105,7 +107,7 @@ class SendContainerRequestHandler
     closeOutput();
 
     try {
-      importer.importContainer(containerId, path, volume);
+      importer.importContainer(containerId, path, volume, compression);
       LOG.info("Imported container {}", containerId);
       responseObserver.onNext(SendContainerResponse.newBuilder().build());
       responseObserver.onCompleted();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
index 0e81e512d3..2a6b6cbf86 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
@@ -51,19 +51,17 @@ public class SimpleContainerDownloader implements ContainerDownloader {
 
   private final SecurityConfig securityConfig;
   private final CertificateClient certClient;
-  private final String compression;
 
   public SimpleContainerDownloader(
       ConfigurationSource conf, CertificateClient certClient) {
     securityConfig = new SecurityConfig(conf);
     this.certClient = certClient;
-    this.compression = CopyContainerCompression.getConf(conf).toString();
   }
 
   @Override
   public Path getContainerDataFromReplicas(
       long containerId, List<DatanodeDetails> sourceDatanodes,
-      Path downloadDir) {
+      Path downloadDir, CopyContainerCompression compression) {
 
     if (downloadDir == null) {
       downloadDir = Paths.get(System.getProperty("java.io.tmpdir"))
@@ -76,7 +74,7 @@ public class SimpleContainerDownloader implements ContainerDownloader {
     for (DatanodeDetails datanode : shuffledDatanodes) {
       try {
         CompletableFuture<Path> result =
-            downloadContainer(containerId, datanode, downloadDir);
+            downloadContainer(containerId, datanode, downloadDir, compression);
         return result.get();
       } catch (ExecutionException | IOException e) {
         LOG.error("Error on replicating container: {} from {}/{}", containerId,
@@ -108,9 +106,9 @@ public class SimpleContainerDownloader implements ContainerDownloader {
   }
 
   @VisibleForTesting
-  protected CompletableFuture<Path> downloadContainer(
-      long containerId, DatanodeDetails datanode, Path downloadDir)
-      throws IOException {
+  protected CompletableFuture<Path> downloadContainer(long containerId,
+      DatanodeDetails datanode, Path downloadDir,
+      CopyContainerCompression compression) throws IOException {
     CompletableFuture<Path> result;
     GrpcReplicationClient grpcReplicationClient =
         new GrpcReplicationClient(datanode.getIpAddress(),
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
index 734b8d3193..835bbab7c6 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
@@ -80,6 +80,7 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE;
+import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
 import static org.apache.ratis.util.Preconditions.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -194,7 +195,7 @@ public class TestKeyValueContainer {
 
     //destination path
     File exportTar = folder.newFile("exported.tar");
-    TarContainerPacker packer = new TarContainerPacker();
+    TarContainerPacker packer = new TarContainerPacker(NO_COMPRESSION);
     //export the container
     try (FileOutputStream fos = new FileOutputStream(exportTar)) {
       keyValueContainer.exportContainerData(fos, packer);
@@ -221,9 +222,8 @@ public class TestKeyValueContainer {
 
     //destination path
     File folderToExport = folder.newFile("exported.tar");
-    for (Map.Entry<CopyContainerCompression, String> entry :
-        CopyContainerCompression.getCompressionMapping().entrySet()) {
-      TarContainerPacker packer = new TarContainerPacker(entry.getValue());
+    for (CopyContainerCompression compr : CopyContainerCompression.values()) {
+      TarContainerPacker packer = new TarContainerPacker(compr);
 
       //export the container
       try (FileOutputStream fos = new FileOutputStream(folderToExport)) {
@@ -364,7 +364,7 @@ public class TestKeyValueContainer {
 
     AtomicReference<String> failed = new AtomicReference<>();
 
-    TarContainerPacker packer = new TarContainerPacker();
+    TarContainerPacker packer = new TarContainerPacker(NO_COMPRESSION);
     List<Thread> threads = IntStream.range(0, 20)
         .mapToObj(i -> new Thread(() -> {
           try {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
index 61110023e2..0b1757155f 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.commons.compress.compressors.CompressorException;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;
@@ -94,8 +93,8 @@ public class TestTarContainerPacker {
   private final String schemaVersion;
   private OzoneConfiguration conf;
 
-  public TestTarContainerPacker(
-      ContainerTestVersionInfo versionInfo, String compression) {
+  public TestTarContainerPacker(ContainerTestVersionInfo versionInfo,
+      CopyContainerCompression compression) {
     this.layout = versionInfo.getLayout();
     this.schemaVersion = versionInfo.getSchemaVersion();
     this.conf = new OzoneConfiguration();
@@ -110,10 +109,8 @@ public class TestTarContainerPacker {
         ContainerTestVersionInfo.getLayoutList();
     List<Object[]> parameterList = new ArrayList<>();
     for (ContainerTestVersionInfo containerTestVersionInfo : layoutList) {
-      for (Map.Entry<CopyContainerCompression, String> entry :
-          CopyContainerCompression.getCompressionMapping().entrySet()) {
-        parameterList.add(
-            new Object[]{containerTestVersionInfo, entry.getValue()});
+      for (CopyContainerCompression compr : CopyContainerCompression.values()) {
+        parameterList.add(new Object[]{containerTestVersionInfo, compr});
       }
     }
     return parameterList;
@@ -170,7 +167,7 @@ public class TestTarContainerPacker {
   }
 
   @Test
-  public void pack() throws IOException, CompressorException {
+  public void pack() throws IOException {
     //GIVEN
     KeyValueContainerData sourceContainerData =
         createContainer(SOURCE_CONTAINER_ROOT);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java
index e342ffb338..fe1b9f785b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java
@@ -207,7 +207,7 @@ abstract class GrpcOutputStreamTest<T> {
     return bytes;
   }
 
-  private static byte[] getRandomBytes(int size) {
+  static byte[] getRandomBytes(int size) {
     byte[] bytes = new byte[size];
     RND.nextBytes(bytes);
     return bytes;
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestCopyContainerCompression.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestCopyContainerCompression.java
new file mode 100644
index 0000000000..d50569c793
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestCopyContainerCompression.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import org.apache.hadoop.hdds.conf.MutableConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import static org.apache.commons.io.IOUtils.readFully;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPLICATION_COMPRESSION;
+import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.fromProto;
+import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.getDefaultCompression;
+import static org.apache.hadoop.ozone.container.replication.GrpcOutputStreamTest.getRandomBytes;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+/**
+ * Test for {@link CopyContainerCompression}.
+ */
+class TestCopyContainerCompression {
+
+  @ParameterizedTest
+  @EnumSource
+  void protoConversion(CopyContainerCompression compression) {
+    assertEquals(compression, fromProto(compression.toProto()));
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void getConfReturnsValidSetting(CopyContainerCompression compression) {
+    MutableConfigurationSource conf = new OzoneConfiguration();
+    compression.setOn(conf);
+
+    assertEquals(compression, CopyContainerCompression.getConf(conf));
+  }
+
+  @Test
+  void getConfReturnsDefaultForUnknown() {
+    MutableConfigurationSource conf = new OzoneConfiguration();
+    conf.set(HDDS_CONTAINER_REPLICATION_COMPRESSION, "garbage");
+
+    assertEquals(getDefaultCompression(),
+        CopyContainerCompression.getConf(conf));
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void testInputOutput(CopyContainerCompression compression) throws Exception {
+    byte[] original = getRandomBytes(16);
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+    try (OutputStream compressed = compression.wrap(out)) {
+      compressed.write(original);
+    }
+
+    byte[] written = out.toByteArray();
+    if (compression == CopyContainerCompression.NO_COMPRESSION) {
+      assertArrayEquals(original, written);
+    } else {
+      assertNotEquals(original, written);
+    }
+
+    ByteArrayInputStream input = new ByteArrayInputStream(written);
+    try (InputStream uncompressed = compression.wrap(input)) {
+      byte[] read = new byte[original.length];
+      readFully(uncompressed, read);
+      assertArrayEquals(original, read);
+      assertEquals(0, uncompressed.available());
+    }
+  }
+
+}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestPushReplicator.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestPushReplicator.java
index 791a0caa14..b678f6e404 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestPushReplicator.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestPushReplicator.java
@@ -17,11 +17,15 @@
  */
 package org.apache.hadoop.ozone.container.replication;
 
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status;
 import org.apache.ozone.test.SpyOutputStream;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.api.Timeout;
 import org.mockito.ArgumentCaptor;
 
@@ -47,16 +51,26 @@ import static org.mockito.Mockito.when;
 @Timeout(30)
 class TestPushReplicator {
 
-  @Test
-  void uploadCompletesNormally() throws IOException {
+  private OzoneConfiguration conf;
+
+  @BeforeEach
+  void setup() {
+    conf = new OzoneConfiguration();
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void uploadCompletesNormally(CopyContainerCompression compression)
+      throws IOException {
     // GIVEN
+    compression.setOn(conf);
     long containerID = randomContainerID();
     DatanodeDetails target = MockDatanodeDetails.randomDatanodeDetails();
     Consumer<CompletableFuture<Void>> completion =
         fut -> fut.complete(null);
     SpyOutputStream output = new SpyOutputStream(NULL_OUTPUT_STREAM);
     ContainerReplicator subject = createSubject(containerID, target,
-        output, completion);
+        output, completion, compression);
     ReplicationTask task = new ReplicationTask(toTarget(containerID, target),
         subject);
 
@@ -78,7 +92,7 @@ class TestPushReplicator {
     Consumer<CompletableFuture<Void>> completion =
         fut -> fut.completeExceptionally(new Exception("testing"));
     ContainerReplicator subject = createSubject(containerID, target,
-        output, completion);
+        output, completion, NO_COMPRESSION);
     ReplicationTask task = new ReplicationTask(toTarget(containerID, target),
         subject);
 
@@ -100,7 +114,7 @@ class TestPushReplicator {
       throw new RuntimeException();
     };
     ContainerReplicator subject = createSubject(containerID, target,
-        output, completion);
+        output, completion, NO_COMPRESSION);
     ReplicationTask task = new ReplicationTask(toTarget(containerID, target),
         subject);
 
@@ -116,25 +130,35 @@ class TestPushReplicator {
     return ThreadLocalRandom.current().nextLong();
   }
 
-  private static ContainerReplicator createSubject(
+  private ContainerReplicator createSubject(
       long containerID, DatanodeDetails target, OutputStream outputStream,
-      Consumer<CompletableFuture<Void>> completion) throws IOException {
+      Consumer<CompletableFuture<Void>> completion,
+      CopyContainerCompression compression
+  ) throws IOException {
     ContainerReplicationSource source = mock(ContainerReplicationSource.class);
     ContainerUploader uploader = mock(ContainerUploader.class);
-    ArgumentCaptor<CompletableFuture<Void>> captor =
+    ArgumentCaptor<CompletableFuture<Void>> futureArgument =
         ArgumentCaptor.forClass(CompletableFuture.class);
+    ArgumentCaptor<CopyContainerCompression> compressionArgument =
+        ArgumentCaptor.forClass(CopyContainerCompression.class);
 
-    when(uploader.startUpload(eq(containerID), eq(target), captor.capture()))
+    when(
+        uploader.startUpload(eq(containerID), eq(target),
+            futureArgument.capture(), compressionArgument.capture()
+        ))
         .thenReturn(outputStream);
 
     doAnswer(invocation -> {
-      completion.accept(captor.getValue());
+      compressionArgument.getAllValues().forEach(
+          c -> assertEquals(compression, c)
+      );
+      completion.accept(futureArgument.getValue());
       return null;
     })
         .when(source)
-        .copyData(eq(containerID), any(), eq(NO_COMPRESSION.name()));
+        .copyData(eq(containerID), any(), compressionArgument.capture());
 
-    return new PushReplicator(source, uploader);
+    return new PushReplicator(conf, source, uploader);
   }
 
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index b598698945..167010923a 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -285,7 +285,7 @@ public class TestReplicationSupervisor {
     Path res = Paths.get("file:/tmp/no-such-file");
     Mockito.when(
         moc.getContainerDataFromReplicas(Mockito.anyLong(), Mockito.anyList(),
-            Mockito.any(Path.class)))
+            Mockito.any(Path.class), Mockito.any()))
         .thenReturn(res);
 
     final String testDir = GenericTestUtils.getTempPath(
@@ -296,9 +296,9 @@ public class TestReplicationSupervisor {
         .thenReturn(Collections.singletonList(
             new HddsVolume.Builder(testDir).conf(conf).build()));
     ContainerImporter importer =
-        new ContainerImporter(conf, set, null, null, volumeSet);
+        new ContainerImporter(conf, set, null, volumeSet);
     ContainerReplicator replicator =
-        new DownloadAndImportReplicator(set, importer, moc);
+        new DownloadAndImportReplicator(conf, set, importer, moc);
 
     replicatorRef.set(replicator);
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerOutputStream.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerOutputStream.java
index ee1ad73b5d..747a4d2935 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerOutputStream.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerOutputStream.java
@@ -19,10 +19,14 @@ package org.apache.hadoop.ozone.container.replication;
 
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.OutputStream;
 
+import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.verify;
 
 /**
  * Test for {@link SendContainerOutputStream}.
@@ -37,7 +41,28 @@ class TestSendContainerOutputStream
   @Override
   protected OutputStream createSubject() {
     return new SendContainerOutputStream(getObserver(),
-        getContainerId(), getBufferSize());
+        getContainerId(), getBufferSize(), NO_COMPRESSION);
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void usesCompression(CopyContainerCompression compression) throws Exception {
+    OutputStream subject = new SendContainerOutputStream(
+        getObserver(), getContainerId(), getBufferSize(), compression);
+
+    byte[] bytes = getRandomBytes(16);
+    subject.write(bytes, 0, bytes.length);
+    subject.close();
+
+    SendContainerRequest req = SendContainerRequest.newBuilder()
+        .setContainerID(getContainerId())
+        .setOffset(0)
+        .setData(ByteString.copyFrom(bytes))
+        .setCompression(compression.toProto())
+        .build();
+
+    verify(getObserver()).onNext(req);
+    verify(getObserver()).onCompleted();
   }
 
   protected ByteString verifyPart(SendContainerRequest response,
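
The new usesCompression test relies on JUnit 5's @EnumSource with no explicit value: JUnit infers the enum type from the method parameter and runs the test once per constant, so every codec exercises the same request-building path. A standalone example of the same pattern (toProto() is taken from the diff above; the assertion is illustrative):

    import static org.junit.jupiter.api.Assertions.assertNotNull;

    import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;
    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.EnumSource;

    class EnumSourceSketch {
      @ParameterizedTest
      @EnumSource  // one invocation per CopyContainerCompression constant
      void everyCodecHasAProtoMapping(CopyContainerCompression compression) {
        assertNotNull(compression.toProto());
      }
    }
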
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
index 42d44a7c23..ad90f3cdfb 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
@@ -44,6 +44,8 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Timeout;
 
+import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
+
 /**
  * Test SimpleContainerDownloader.
  */
@@ -66,7 +68,7 @@ public class TestSimpleContainerDownloader {
     //WHEN
     final Path result =
         downloader.getContainerDataFromReplicas(1L, datanodes,
-            tempDir.newFolder().toPath());
+            tempDir.newFolder().toPath(), NO_COMPRESSION);
 
     //THEN
     Assertions.assertEquals(datanodes.get(0).getUuidString(),
@@ -86,7 +88,7 @@ public class TestSimpleContainerDownloader {
     //WHEN
     final Path result =
         downloader.getContainerDataFromReplicas(1L, datanodes,
-            tempDir.newFolder().toPath());
+            tempDir.newFolder().toPath(), NO_COMPRESSION);
 
     //THEN
     //first datanode is failed, second worked
@@ -106,7 +108,7 @@ public class TestSimpleContainerDownloader {
     //WHEN
     final Path result =
         downloader.getContainerDataFromReplicas(1L, datanodes,
-            tempDir.newFolder().toPath());
+            tempDir.newFolder().toPath(), NO_COMPRESSION);
 
     //THEN
     //first datanode is failed, second worked
@@ -130,8 +132,8 @@ public class TestSimpleContainerDownloader {
 
           @Override
           protected CompletableFuture<Path> downloadContainer(
-              long containerId, DatanodeDetails datanode, Path downloadPath
-          ) {
+              long containerId, DatanodeDetails datanode, Path downloadPath,
+              CopyContainerCompression compression) {
             //download is always successful.
             return CompletableFuture
                 .completedFuture(Paths.get(datanode.getUuidString()));
@@ -142,7 +144,7 @@ public class TestSimpleContainerDownloader {
     //returned.
     for (int i = 0; i < 10000; i++) {
       Path path = downloader.getContainerDataFromReplicas(1L, datanodes,
-          tempDir.newFolder().toPath());
+          tempDir.newFolder().toPath(), NO_COMPRESSION);
       if (path.toString().equals(datanodes.get(1).getUuidString())) {
         return;
       }
@@ -183,8 +185,8 @@ public class TestSimpleContainerDownloader {
 
       @Override
       protected CompletableFuture<Path> downloadContainer(
-          long containerId, DatanodeDetails datanode, Path downloadPath
-      ) {
+          long containerId, DatanodeDetails datanode, Path downloadPath,
+          CopyContainerCompression compression) {
 
         if (datanodes.contains(datanode)) {
           if (directException) {
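
In the downloader test above, both getContainerDataFromReplicas and the protected downloadContainer hook gain a CopyContainerCompression parameter, making the codec a per-call argument rather than downloader state, so one downloader instance can serve requests with different codecs. A hedged sketch of a test fake against the widened hook (the class wiring is illustrative; the super-constructor arguments follow the calls in this diff):

    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.concurrent.CompletableFuture;

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;
    import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;

    class FakeDownloader extends SimpleContainerDownloader {
      FakeDownloader(OzoneConfiguration conf) {
        super(conf, null);  // no CA client needed for a fake
      }

      @Override
      protected CompletableFuture<Path> downloadContainer(
          long containerId, DatanodeDetails datanode, Path downloadPath,
          CopyContainerCompression compression) {
        // Always succeed; report the chosen replica through the path.
        return CompletableFuture.completedFuture(
            Paths.get(datanode.getUuidString()));
      }
    }
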
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
index 85c7bc0f08..3ff347fbdf 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachin
 import org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask;
 import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.hadoop.ozone.container.replication.ContainerImporter;
 import org.apache.hadoop.ozone.container.replication.ContainerReplicationSource;
 import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
@@ -70,6 +69,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
+
 /**
  * Tests upgrading a single datanode from pre-SCM HA volume format that used
  * SCM ID to the post-SCM HA volume format using cluster ID. If SCM HA was
@@ -655,7 +656,7 @@ public class TestDatanodeUpgradeToScmHA {
 
     File destination = tempFolder.newFile();
     try (FileOutputStream fos = new FileOutputStream(destination)) {
-      replicationSource.copyData(containerId, fos, "NO_COMPRESSION");
+      replicationSource.copyData(containerId, fos, NO_COMPRESSION);
     }
     return destination;
   }
@@ -669,13 +670,14 @@ public class TestDatanodeUpgradeToScmHA {
         new ContainerImporter(dsm.getConf(),
             dsm.getContainer().getContainerSet(),
             dsm.getContainer().getController(),
-            new TarContainerPacker(), dsm.getContainer().getVolumeSet());
+            dsm.getContainer().getVolumeSet());
 
     File tempFile = tempFolder.newFile(
         ContainerUtils.getContainerTarName(containerID));
     Files.copy(source.toPath(), tempFile.toPath(),
         StandardCopyOption.REPLACE_EXISTING);
-    replicator.importContainer(containerID, tempFile.toPath(), null);
+    replicator.importContainer(containerID, tempFile.toPath(), null,
+        NO_COMPRESSION);
   }
 
   public void dispatchRequest(
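
The upgrade test tracks two related signature changes: ContainerImporter no longer takes a TarContainerPacker, and importContainer receives the compression the tarball was written with. The likely shape inside the importer, sketched under the assumption that TarContainerPacker can be constructed per codec (the diffstat shows that class changing, but its exact constructor is not visible in this excerpt):

    import java.io.IOException;
    import java.nio.file.Path;

    import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
    import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
    import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;

    class ImporterSketch {
      void importContainer(long containerID, Path tarball, HddsVolume volume,
          CopyContainerCompression compression) throws IOException {
        // Assumed: the packer is parameterized by the codec of this request,
        // so one importer instance can handle mixed-codec sources.
        TarContainerPacker packer = new TarContainerPacker(compression);
        // Unpacking and container registration elided; see ContainerImporter.
      }
    }
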
diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 7312fd6a9e..718e2a108c 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -507,6 +507,7 @@ message SendContainerRequest {
   required uint64 offset = 2;
   required bytes data = 3;
   optional int64 checksum = 4;
+  optional CopyContainerCompressProto compression = 5;
 }
 
 message SendContainerResponse {
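
Making compression an optional field 5 keeps the push stream wire-compatible: a pre-upgrade sender simply omits it, and the receiver falls back to a default. A reader-side sketch using the standard proto2 generated accessors (hasCompression()/getCompression() follow protobuf codegen conventions; the fromProto helper is assumed, mirroring the toProto call in the test diff above):

    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest;
    import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;

    final class CompressionDefaults {
      private CompressionDefaults() { }

      static CopyContainerCompression compressionOf(SendContainerRequest req) {
        // Old senders never set field 5; treat absence as "no compression".
        return req.hasCompression()
            ? CopyContainerCompression.fromProto(req.getCompression())
            : CopyContainerCompression.NO_COMPRESSION;
      }
    }
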
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
index ad96d4fa4b..551cf9aef0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
@@ -80,6 +80,7 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_RENEW_GRACE_DURATI
 import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
 
 /**
  * Tests ozone containers via secure grpc/netty.
@@ -271,7 +272,7 @@ public class TestOzoneContainerWithTLS {
       SimpleContainerDownloader downloader =
           new SimpleContainerDownloader(conf, caClient);
       Path file = downloader.getContainerDataFromReplicas(
-          containerId, sourceDatanodes, null);
+          containerId, sourceDatanodes, null, NO_COMPRESSION);
       downloader.close();
       Assert.assertNull(file);
       Assert.assertTrue(logCapture.getOutput().contains(
@@ -309,7 +310,7 @@ public class TestOzoneContainerWithTLS {
         downloader = new SimpleContainerDownloader(conf, caClient);
         try {
           file = downloader.getContainerDataFromReplicas(cId, sourceDatanodes,
-                  null);
+                  null, NO_COMPRESSION);
           downloader.close();
           Assert.assertNotNull(file);
         } finally {
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ExportSubcommand.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ExportSubcommand.java
index c5e37a1867..d0337f6534 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ExportSubcommand.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ExportSubcommand.java
@@ -31,8 +31,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.util.concurrent.Callable;
 
-import static org.apache.commons.compress.compressors.CompressorStreamFactory.GZIP;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
+import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
 
 /**
  * Handles {@code ozone debug container export} command.
@@ -76,7 +76,7 @@ public class ExportSubcommand implements Callable<Void> {
           new File(destination, "container-" + containerId + ".tar");
       try (FileOutputStream fos = new FileOutputStream(destinationFile)) {
         try {
-          replicationSource.copyData(containerId, fos, GZIP);
+          replicationSource.copyData(containerId, fos, NO_COMPRESSION);
         } catch (StorageContainerException e) {
           if (e.getResult() == CONTAINER_NOT_FOUND) {
             continue;
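
Side effect worth noting in ExportSubcommand: switching from the commons-compress GZIP name to NO_COMPRESSION means `ozone debug container export` now writes a plain tar stream, consistent with the container-<id>.tar file name it already uses. If a compressed export is wanted, it is now a one-argument change; the GZIP constant in this sketch assumes CopyContainerCompression defines one:

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.ozone.container.replication.ContainerReplicationSource;
    import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;

    final class ExportSketch {
      private ExportSketch() { }

      // Hypothetical helper: export one container with compression enabled.
      static void exportCompressed(ContainerReplicationSource source,
          long containerId, File dest) throws IOException {
        try (FileOutputStream fos = new FileOutputStream(dest)) {
          source.copyData(containerId, fos, CopyContainerCompression.GZIP);
        }
      }
    }
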
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
index 1abd8aa273..6d5ae0b988 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.replication.ContainerImporter;
 import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
@@ -206,8 +205,8 @@ public class ClosedContainerReplicator extends BaseFreonGenerator implements
         new ContainerController(containerSet, handlers);
 
     ContainerImporter importer = new ContainerImporter(conf, containerSet,
-        controller, new TarContainerPacker(), null);
-    replicator = new DownloadAndImportReplicator(containerSet, importer,
+        controller, null);
+    replicator = new DownloadAndImportReplicator(conf, containerSet, importer,
         new SimpleContainerDownloader(conf, null));
 
     ReplicationServer.ReplicationConfig replicationConfig
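
Design note on the wiring above: passing the whole configuration into DownloadAndImportReplicator and ContainerImporter, rather than a pre-resolved codec or packer, keeps these constructors stable as further replication settings appear, and it lets the push (PushReplicator) and pull (DownloadAndImportReplicator) paths read the same knob, which is the point of the commit title. Enabling compressed replication then reduces to a single config write; both the key and the value below are illustrative, with the real key expected to live in the datanode's ReplicationConfig:

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;

    final class ConfigSketch {
      private ConfigSketch() { }

      static OzoneConfiguration withCompression() {
        OzoneConfiguration conf = new OzoneConfiguration();
        // Illustrative key and codec name; stored as the enum constant's name.
        conf.set("hdds.container.replication.compression", "GZIP");
        return conf;
      }
    }
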

