You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ad...@apache.org on 2023/02/01 21:33:27 UTC
[ozone] branch master updated: HDDS-7821. Let push replication use compression from configuration (#4229)
This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 1a796f1b54 HDDS-7821. Let push replication use compression from configuration (#4229)
1a796f1b54 is described below
commit 1a796f1b54610efd86d5aba0a91e8ae4d32e4af5
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Wed Feb 1 22:33:21 2023 +0100
HDDS-7821. Let push replication use compression from configuration (#4229)
---
.../hadoop/hdds/conf/ConfigurationTarget.java | 4 +
hadoop-hdds/container-service/pom.xml | 4 +
.../common/statemachine/DatanodeStateMachine.java | 7 +-
.../container/keyvalue/TarContainerPacker.java | 41 ++------
.../ozone/container/ozoneimpl/OzoneContainer.java | 3 +-
.../container/replication/ContainerDownloader.java | 3 +-
.../container/replication/ContainerImporter.java | 9 +-
.../replication/ContainerReplicationSource.java | 3 +-
.../container/replication/ContainerUploader.java | 3 +-
.../replication/CopyContainerCompression.java | 103 +++++++++++++++------
.../replication/DownloadAndImportReplicator.java | 10 +-
.../replication/GrpcContainerUploader.java | 9 +-
.../replication/GrpcReplicationClient.java | 10 +-
.../replication/GrpcReplicationService.java | 6 +-
.../OnDemandContainerReplicationSource.java | 20 +---
.../container/replication/PushReplicator.java | 13 +--
.../replication/SendContainerOutputStream.java | 7 +-
.../replication/SendContainerRequestHandler.java | 4 +-
.../replication/SimpleContainerDownloader.java | 12 +--
.../container/keyvalue/TestKeyValueContainer.java | 10 +-
.../container/keyvalue/TestTarContainerPacker.java | 13 +--
.../replication/GrpcOutputStreamTest.java | 2 +-
.../replication/TestCopyContainerCompression.java | 95 +++++++++++++++++++
.../container/replication/TestPushReplicator.java | 48 +++++++---
.../replication/TestReplicationSupervisor.java | 6 +-
.../replication/TestSendContainerOutputStream.java | 27 +++++-
.../replication/TestSimpleContainerDownloader.java | 18 ++--
.../upgrade/TestDatanodeUpgradeToScmHA.java | 10 +-
.../src/main/proto/DatanodeClientProtocol.proto | 1 +
.../ozoneimpl/TestOzoneContainerWithTLS.java | 5 +-
.../ozone/debug/container/ExportSubcommand.java | 4 +-
.../ozone/freon/ClosedContainerReplicator.java | 5 +-
32 files changed, 344 insertions(+), 171 deletions(-)
diff --git a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationTarget.java b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationTarget.java
index b6be89cd0c..d4bfb360b9 100644
--- a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationTarget.java
+++ b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationTarget.java
@@ -44,6 +44,10 @@ public interface ConfigurationTarget {
set(name, Boolean.toString(value));
}
+ default <T extends Enum<T>> void setEnum(String name, T value) {
+ set(name, value.name());
+ }
+
default void setTimeDuration(String name, long value, TimeUnit unit) {
set(name, value + ParsedTimeDuration.unitFor(unit).suffix());
}
diff --git a/hadoop-hdds/container-service/pom.xml b/hadoop-hdds/container-service/pom.xml
index 21d4b755d9..1144b6018a 100644
--- a/hadoop-hdds/container-service/pom.xml
+++ b/hadoop-hdds/container-service/pom.xml
@@ -140,6 +140,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ </dependency>
</dependencies>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 69b22885e3..01d114245a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -58,7 +58,6 @@ import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.Repl
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.SetNodeOperationalStateCommandHandler;
import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionMetrics;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.container.replication.ContainerImporter;
import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
@@ -181,12 +180,12 @@ public class DatanodeStateMachine implements Closeable {
ContainerImporter importer = new ContainerImporter(conf,
container.getContainerSet(),
container.getController(),
- new TarContainerPacker(), container.getVolumeSet());
+ container.getVolumeSet());
ContainerReplicator pullReplicator = new DownloadAndImportReplicator(
- container.getContainerSet(),
+ conf, container.getContainerSet(),
importer,
new SimpleContainerDownloader(conf, dnCertClient));
- ContainerReplicator pushReplicator = new PushReplicator(
+ ContainerReplicator pushReplicator = new PushReplicator(conf,
new OnDemandContainerReplicationSource(container.getController()),
new GrpcContainerUploader(conf, dnCertClient)
);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
index f5763958a1..77eb825b39 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
@@ -30,7 +30,6 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;
-import java.util.Objects;
import com.google.common.annotations.VisibleForTesting;
@@ -40,18 +39,16 @@ import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
-
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.ArchiveInputStream;
import org.apache.commons.compress.archivers.ArchiveOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
-import org.apache.commons.compress.compressors.CompressorException;
-import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaThreeImpl;
+import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS;
@@ -69,15 +66,9 @@ public class TarContainerPacker
static final String CONTAINER_FILE_NAME = "container.yaml";
- private final String compression;
-
- private static final String NO_COMPRESSION = "no_compression";
+ private final CopyContainerCompression compression;
- public TarContainerPacker() {
- this.compression = NO_COMPRESSION;
- }
-
- public TarContainerPacker(String compression) {
+ public TarContainerPacker(CopyContainerCompression compression) {
this.compression = compression;
}
@@ -175,10 +166,6 @@ public class TarContainerPacker
includePath(Paths.get(containerData.getChunksPath()), CHUNKS_DIR_NAME,
archiveOutput);
- } catch (CompressorException e) {
- throw new IOException(
- "Can't compress the container: " + containerData.getContainerID(),
- e);
}
}
@@ -195,10 +182,6 @@ public class TarContainerPacker
}
entry = archiveInput.getNextEntry();
}
- } catch (CompressorException e) {
- throw new IOException(
- "Can't read the container descriptor from the container archive",
- e);
}
throw new IOException(
@@ -293,19 +276,13 @@ public class TarContainerPacker
}
@VisibleForTesting
- InputStream decompress(InputStream input)
- throws CompressorException {
- return Objects.equals(compression, NO_COMPRESSION) ?
- input : new CompressorStreamFactory()
- .createCompressorInputStream(compression, input);
+ InputStream decompress(InputStream input) throws IOException {
+ return compression.wrap(input);
}
@VisibleForTesting
- OutputStream compress(OutputStream output)
- throws CompressorException {
- return Objects.equals(compression, NO_COMPRESSION) ?
- output : new CompressorStreamFactory()
- .createCompressorOutputStream(compression, output);
+ OutputStream compress(OutputStream output) throws IOException {
+ return compression.wrap(output);
}
private byte[] innerUnpack(InputStream input, Path dbRoot, Path chunksRoot)
@@ -337,10 +314,6 @@ public class TarContainerPacker
entry = archiveInput.getNextEntry();
}
return descriptorFileContent;
-
- } catch (CompressorException e) {
- throw new IOException("Can't uncompress to dbRoot: " + dbRoot +
- ", chunksRoot: " + chunksRoot, e);
}
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 3c63aaa799..b3db557b6f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume.VolumeType;
import org.apache.hadoop.ozone.container.common.volume.StorageVolumeChecker;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.StaleRecoveringContainerScrubbingService;
import org.apache.hadoop.ozone.container.replication.ContainerImporter;
@@ -208,7 +207,7 @@ public class OzoneContainer {
secConf,
certClient,
new ContainerImporter(conf, containerSet, controller,
- new TarContainerPacker(), volumeSet));
+ volumeSet));
readChannel = new XceiverServerGrpc(
datanodeDetails, config, hddsDispatcher, certClient);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java
index 45c70f8e72..72ebef9ac9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
public interface ContainerDownloader extends Closeable {
Path getContainerDataFromReplicas(long containerId,
- List<DatanodeDetails> sources, Path downloadDir);
+ List<DatanodeDetails> sources, Path downloadDir,
+ CopyContainerCompression compression);
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
index 59316a8c8f..c4955931e1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
@@ -54,17 +54,15 @@ public class ContainerImporter {
private static final String CONTAINER_COPY_TMP_DIR = "tmp";
private final ContainerSet containerSet;
private final ContainerController controller;
- private final TarContainerPacker packer;
private final MutableVolumeSet volumeSet;
private final VolumeChoosingPolicy volumeChoosingPolicy;
private final long containerSize;
public ContainerImporter(ConfigurationSource conf, ContainerSet containerSet,
- ContainerController controller, TarContainerPacker tarContainerPacker,
+ ContainerController controller,
MutableVolumeSet volumeSet) {
this.containerSet = containerSet;
this.controller = controller;
- this.packer = tarContainerPacker;
this.volumeSet = volumeSet;
try {
volumeChoosingPolicy = conf.getClass(
@@ -79,7 +77,8 @@ public class ContainerImporter {
}
public void importContainer(long containerID, Path tarFilePath,
- HddsVolume hddsVolume) throws IOException {
+ HddsVolume hddsVolume, CopyContainerCompression compression)
+ throws IOException {
HddsVolume targetVolume = hddsVolume;
if (targetVolume == null) {
@@ -88,6 +87,8 @@ public class ContainerImporter {
try {
KeyValueContainerData containerData;
+ TarContainerPacker packer = new TarContainerPacker(compression);
+
try (FileInputStream input = new FileInputStream(tarFilePath.toFile())) {
byte[] containerDescriptorYaml =
packer.unpackContainerDescriptor(input);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
index 72f32b2089..58c7521a22 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
@@ -44,7 +44,8 @@ public interface ContainerReplicationSource {
* @param compression Compression algorithm.
* @throws IOException
*/
- void copyData(long containerId, OutputStream destination, String compression)
+ void copyData(long containerId, OutputStream destination,
+ CopyContainerCompression compression)
throws IOException;
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerUploader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerUploader.java
index cb9264f2b5..c2b82633bd 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerUploader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerUploader.java
@@ -28,5 +28,6 @@ import java.util.concurrent.CompletableFuture;
*/
public interface ContainerUploader {
OutputStream startUpload(long containerId, DatanodeDetails target,
- CompletableFuture<Void> callback) throws IOException;
+ CompletableFuture<Void> callback, CopyContainerCompression compression)
+ throws IOException;
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerCompression.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerCompression.java
index 67f0d6fc85..fc1ae676d0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerCompression.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerCompression.java
@@ -17,15 +17,17 @@
*/
package org.apache.hadoop.ozone.container.replication;
-import com.google.common.collect.ImmutableMap;
+import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.ConfigurationTarget;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPLICATION_COMPRESSION;
@@ -34,45 +36,94 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPLICATION_C
*/
public enum CopyContainerCompression {
- NO_COMPRESSION,
- GZIP,
- LZ4,
- SNAPPY,
- ZSTD;
+ NO_COMPRESSION("no_compression") {
+ @Override
+ public InputStream wrap(InputStream input) {
+ return input;
+ }
+
+ @Override
+ public OutputStream wrap(OutputStream output) {
+ return output;
+ }
+ },
+ GZIP(CompressorStreamFactory.GZIP),
+ LZ4(CompressorStreamFactory.LZ4_FRAMED),
+ SNAPPY(CompressorStreamFactory.SNAPPY_FRAMED),
+ ZSTD(CompressorStreamFactory.ZSTANDARD);
+
+ private final String compressorFactoryName;
+
+ CopyContainerCompression(String compressorFactoryName) {
+ this.compressorFactoryName = compressorFactoryName;
+ }
private static final Logger LOG =
LoggerFactory.getLogger(CopyContainerCompression.class);
private static final CopyContainerCompression DEFAULT_COMPRESSION =
CopyContainerCompression.NO_COMPRESSION;
- private static final Map<CopyContainerCompression, String>
- COMPRESSION_MAPPING = ImmutableMap.copyOf(getMapping());
-
- private static Map<CopyContainerCompression, String> getMapping() {
- return new HashMap<CopyContainerCompression, String>() { {
- put(NO_COMPRESSION, "no_compression");
- put(GZIP, CompressorStreamFactory.GZIP);
- put(LZ4, CompressorStreamFactory.LZ4_FRAMED);
- put(SNAPPY, CompressorStreamFactory.SNAPPY_FRAMED);
- put(ZSTD, CompressorStreamFactory.ZSTANDARD);
- }};
- }
-
- public static Map<CopyContainerCompression, String> getCompressionMapping() {
- return COMPRESSION_MAPPING;
- }
public static CopyContainerCompression getConf(ConfigurationSource conf) {
try {
return conf.getEnum(HDDS_CONTAINER_REPLICATION_COMPRESSION,
DEFAULT_COMPRESSION);
} catch (IllegalArgumentException e) {
- LOG.warn("Unsupported compression codec. Skip compression.");
+ LOG.warn("Unsupported compression codec {}, defaulting to {}",
+ conf.get(HDDS_CONTAINER_REPLICATION_COMPRESSION),
+ DEFAULT_COMPRESSION);
return DEFAULT_COMPRESSION;
}
}
+ public void setOn(ConfigurationTarget conf) {
+ conf.setEnum(HDDS_CONTAINER_REPLICATION_COMPRESSION, this);
+ }
+
public static CopyContainerCompression getDefaultCompression() {
return NO_COMPRESSION;
}
+
+ public ContainerProtos.CopyContainerCompressProto toProto() {
+ return ContainerProtos.CopyContainerCompressProto.valueOf(name());
+ }
+
+ public static CopyContainerCompression fromProto(
+ ContainerProtos.CopyContainerCompressProto proto) {
+ if (proto == null) {
+ return getDefaultCompression();
+ }
+
+ try {
+ return valueOf(proto.name());
+ } catch (IllegalArgumentException e) {
+ return getDefaultCompression();
+ }
+ }
+
+ public InputStream wrap(InputStream input) throws IOException {
+ try {
+ return new CompressorStreamFactory().createCompressorInputStream(
+ compressorFactoryName, input);
+ } catch (CompressorException e) {
+ throw toIOException(e);
+ }
+ }
+
+ public OutputStream wrap(OutputStream output) throws IOException {
+ try {
+ return new CompressorStreamFactory().createCompressorOutputStream(
+ compressorFactoryName, output);
+ } catch (CompressorException e) {
+ throw toIOException(e);
+ }
+ }
+
+ private static IOException toIOException(CompressorException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof IOException) {
+ return (IOException) cause;
+ }
+ return new IOException(e);
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
index 1c23d17475..5b5f954afd 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -22,6 +22,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
@@ -44,14 +45,16 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
private final ContainerDownloader downloader;
private final ContainerImporter containerImporter;
private final ContainerSet containerSet;
+ private final CopyContainerCompression compression;
public DownloadAndImportReplicator(
- ContainerSet containerSet,
+ ConfigurationSource conf, ContainerSet containerSet,
ContainerImporter containerImporter,
ContainerDownloader downloader) {
this.containerSet = containerSet;
this.downloader = downloader;
this.containerImporter = containerImporter;
+ compression = CopyContainerCompression.getConf(conf);
}
@Override
@@ -74,7 +77,7 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
// downloads, so it's ok to block here and wait for the full download.
Path tarFilePath =
downloader.getContainerDataFromReplicas(containerID, sourceDatanodes,
- ContainerImporter.getUntarDirectory(targetVolume));
+ ContainerImporter.getUntarDirectory(targetVolume), compression);
if (tarFilePath == null) {
task.setStatus(Status.FAILED);
return;
@@ -84,7 +87,8 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
containerID, bytes);
task.setTransferredBytes(bytes);
- containerImporter.importContainer(containerID, tarFilePath, targetVolume);
+ containerImporter.importContainer(containerID, tarFilePath, targetVolume,
+ compression);
LOG.info("Container {} is replicated successfully", containerID);
task.setStatus(Status.DONE);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
index aa7322428e..0f8a8fdd71 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
@@ -42,26 +42,25 @@ public class GrpcContainerUploader implements ContainerUploader {
private final SecurityConfig securityConfig;
private final CertificateClient certClient;
- private final CopyContainerCompression compression;
public GrpcContainerUploader(
ConfigurationSource conf, CertificateClient certClient) {
this.certClient = certClient;
securityConfig = new SecurityConfig(conf);
- compression = CopyContainerCompression.getConf(conf);
}
@Override
public OutputStream startUpload(long containerId, DatanodeDetails target,
- CompletableFuture<Void> callback) throws IOException {
+ CompletableFuture<Void> callback, CopyContainerCompression compression)
+ throws IOException {
GrpcReplicationClient client =
new GrpcReplicationClient(target.getIpAddress(),
target.getPort(Port.Name.REPLICATION).getValue(),
- securityConfig, certClient, compression.toString());
+ securityConfig, certClient, compression);
StreamObserver<SendContainerRequest> requestStream = client.upload(
new SendContainerResponseStreamObserver(containerId, target, callback));
return new SendContainerOutputStream(requestStream, containerId,
- GrpcReplicationService.BUFFER_SIZE);
+ GrpcReplicationService.BUFFER_SIZE, compression);
}
/**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
index be9a0ece48..7301a6a2c6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
@@ -27,7 +27,6 @@ import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest;
@@ -62,12 +61,12 @@ public class GrpcReplicationClient implements AutoCloseable {
private final IntraDatanodeProtocolServiceStub client;
- private final ContainerProtos.CopyContainerCompressProto compression;
+ private final CopyContainerCompression compression;
public GrpcReplicationClient(
String host, int port,
SecurityConfig secConfig, CertificateClient certClient,
- String compression)
+ CopyContainerCompression compression)
throws IOException {
NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(host, port)
@@ -92,8 +91,7 @@ public class GrpcReplicationClient implements AutoCloseable {
}
channel = channelBuilder.build();
client = IntraDatanodeProtocolServiceGrpc.newStub(channel);
- this.compression =
- ContainerProtos.CopyContainerCompressProto.valueOf(compression);
+ this.compression = compression;
}
public CompletableFuture<Path> download(long containerId, Path dir) {
@@ -102,7 +100,7 @@ public class GrpcReplicationClient implements AutoCloseable {
.setContainerID(containerId)
.setLen(-1)
.setReadOffset(0)
- .setCompression(compression)
+ .setCompression(compression.toProto())
.build();
CompletableFuture<Path> response = new CompletableFuture<>();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
index 533a3b40fe..9e8c2c12c9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
@@ -31,6 +31,8 @@ import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.fromProto;
+
/**
* Service to make containers available for replication.
*/
@@ -55,9 +57,7 @@ public class GrpcReplicationService extends
public void download(CopyContainerRequestProto request,
StreamObserver<CopyContainerResponseProto> responseObserver) {
long containerID = request.getContainerID();
- String compression = request.hasCompression() ?
- request.getCompression().toString() : CopyContainerCompression
- .getDefaultCompression().toString();
+ CopyContainerCompression compression = fromProto(request.getCompression());
LOG.info("Streaming container data ({}) to other datanode " +
"with compression {}", containerID, compression);
try {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
index d17e29b097..bf05a3ad4d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
@@ -19,13 +19,11 @@ package org.apache.hadoop.ozone.container.replication;
import java.io.IOException;
import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
@@ -39,16 +37,9 @@ public class OnDemandContainerReplicationSource
private final ContainerController controller;
- private Map<String, TarContainerPacker> packer = new HashMap<>();
-
public OnDemandContainerReplicationSource(
ContainerController controller) {
this.controller = controller;
- for (Map.Entry<CopyContainerCompression, String> entry :
- CopyContainerCompression.getCompressionMapping().entrySet()) {
- packer.put(
- entry.getKey().toString(), new TarContainerPacker(entry.getValue()));
- }
}
@Override
@@ -58,7 +49,7 @@ public class OnDemandContainerReplicationSource
@Override
public void copyData(long containerId, OutputStream destination,
- String compression)
+ CopyContainerCompression compression)
throws IOException {
Container container = controller.getContainer(containerId);
@@ -68,13 +59,8 @@ public class OnDemandContainerReplicationSource
" is not found.", CONTAINER_NOT_FOUND);
}
- if (!packer.containsKey(compression)) {
- throw new IOException("Can't compress the container. Compression " +
- compression + " is not found.");
- }
controller.exportContainer(
container.getContainerType(), containerId, destination,
- packer.get(compression));
-
+ new TarContainerPacker(compression));
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
index 9a0f6f5a44..eb9aafff95 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ozone.container.replication;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status;
@@ -27,8 +28,6 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
-import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
-
/**
* Pushes the container to the target datanode.
*/
@@ -39,11 +38,13 @@ public class PushReplicator implements ContainerReplicator {
private final ContainerReplicationSource source;
private final ContainerUploader uploader;
+ private final CopyContainerCompression compression;
- public PushReplicator(ContainerReplicationSource source,
- ContainerUploader uploader) {
+ public PushReplicator(ConfigurationSource conf,
+ ContainerReplicationSource source, ContainerUploader uploader) {
this.source = source;
this.uploader = uploader;
+ compression = CopyContainerCompression.getConf(conf);
}
@Override
@@ -57,8 +58,8 @@ public class PushReplicator implements ContainerReplicator {
CountingOutputStream output = null;
try {
output = new CountingOutputStream(
- uploader.startUpload(containerID, target, fut));
- source.copyData(containerID, output, NO_COMPRESSION.name());
+ uploader.startUpload(containerID, target, fut, compression));
+ source.copyData(containerID, output, compression);
fut.get();
task.setTransferredBytes(output.getByteCount());
task.setStatus(Status.DONE);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java
index 622bd4f4a3..b24dcc10f5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java
@@ -25,10 +25,14 @@ import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
* Output stream adapter for SendContainerResponse.
*/
class SendContainerOutputStream extends GrpcOutputStream<SendContainerRequest> {
+
+ private final CopyContainerCompression compression;
+
SendContainerOutputStream(
StreamObserver<SendContainerRequest> streamObserver,
- long containerId, int bufferSize) {
+ long containerId, int bufferSize, CopyContainerCompression compression) {
super(streamObserver, containerId, bufferSize);
+ this.compression = compression;
}
@Override
@@ -37,6 +41,7 @@ class SendContainerOutputStream extends GrpcOutputStream<SendContainerRequest> {
.setContainerID(getContainerId())
.setData(data)
.setOffset(getWrittenBytes())
+ .setCompression(compression.toProto())
.build();
getStreamObserver().onNext(request);
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
index b12aad452c..02a30986c4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
@@ -50,6 +50,7 @@ class SendContainerRequestHandler
private OutputStream output;
private HddsVolume volume;
private Path path;
+ private CopyContainerCompression compression;
SendContainerRequestHandler(
ContainerImporter importer,
@@ -74,6 +75,7 @@ class SendContainerRequestHandler
Files.createDirectories(dir);
path = dir.resolve(ContainerUtils.getContainerTarName(containerId));
output = Files.newOutputStream(path);
+ compression = CopyContainerCompression.fromProto(req.getCompression());
}
assertSame(containerId, req.getContainerID(), "containerID");
@@ -105,7 +107,7 @@ class SendContainerRequestHandler
closeOutput();
try {
- importer.importContainer(containerId, path, volume);
+ importer.importContainer(containerId, path, volume, compression);
LOG.info("Imported container {}", containerId);
responseObserver.onNext(SendContainerResponse.newBuilder().build());
responseObserver.onCompleted();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
index 0e81e512d3..2a6b6cbf86 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
@@ -51,19 +51,17 @@ public class SimpleContainerDownloader implements ContainerDownloader {
private final SecurityConfig securityConfig;
private final CertificateClient certClient;
- private final String compression;
public SimpleContainerDownloader(
ConfigurationSource conf, CertificateClient certClient) {
securityConfig = new SecurityConfig(conf);
this.certClient = certClient;
- this.compression = CopyContainerCompression.getConf(conf).toString();
}
@Override
public Path getContainerDataFromReplicas(
long containerId, List<DatanodeDetails> sourceDatanodes,
- Path downloadDir) {
+ Path downloadDir, CopyContainerCompression compression) {
if (downloadDir == null) {
downloadDir = Paths.get(System.getProperty("java.io.tmpdir"))
@@ -76,7 +74,7 @@ public class SimpleContainerDownloader implements ContainerDownloader {
for (DatanodeDetails datanode : shuffledDatanodes) {
try {
CompletableFuture<Path> result =
- downloadContainer(containerId, datanode, downloadDir);
+ downloadContainer(containerId, datanode, downloadDir, compression);
return result.get();
} catch (ExecutionException | IOException e) {
LOG.error("Error on replicating container: {} from {}/{}", containerId,
@@ -108,9 +106,9 @@ public class SimpleContainerDownloader implements ContainerDownloader {
}
@VisibleForTesting
- protected CompletableFuture<Path> downloadContainer(
- long containerId, DatanodeDetails datanode, Path downloadDir)
- throws IOException {
+ protected CompletableFuture<Path> downloadContainer(long containerId,
+ DatanodeDetails datanode, Path downloadDir,
+ CopyContainerCompression compression) throws IOException {
CompletableFuture<Path> result;
GrpcReplicationClient grpcReplicationClient =
new GrpcReplicationClient(datanode.getIpAddress(),
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
index 734b8d3193..835bbab7c6 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
@@ -80,6 +80,7 @@ import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE;
+import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
import static org.apache.ratis.util.Preconditions.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -194,7 +195,7 @@ public class TestKeyValueContainer {
//destination path
File exportTar = folder.newFile("exported.tar");
- TarContainerPacker packer = new TarContainerPacker();
+ TarContainerPacker packer = new TarContainerPacker(NO_COMPRESSION);
//export the container
try (FileOutputStream fos = new FileOutputStream(exportTar)) {
keyValueContainer.exportContainerData(fos, packer);
@@ -221,9 +222,8 @@ public class TestKeyValueContainer {
//destination path
File folderToExport = folder.newFile("exported.tar");
- for (Map.Entry<CopyContainerCompression, String> entry :
- CopyContainerCompression.getCompressionMapping().entrySet()) {
- TarContainerPacker packer = new TarContainerPacker(entry.getValue());
+ for (CopyContainerCompression compr : CopyContainerCompression.values()) {
+ TarContainerPacker packer = new TarContainerPacker(compr);
//export the container
try (FileOutputStream fos = new FileOutputStream(folderToExport)) {
@@ -364,7 +364,7 @@ public class TestKeyValueContainer {
AtomicReference<String> failed = new AtomicReference<>();
- TarContainerPacker packer = new TarContainerPacker();
+ TarContainerPacker packer = new TarContainerPacker(NO_COMPRESSION);
List<Thread> threads = IntStream.range(0, 20)
.mapToObj(i -> new Thread(() -> {
try {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
index 61110023e2..0b1757155f 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;
@@ -94,8 +93,8 @@ public class TestTarContainerPacker {
private final String schemaVersion;
private OzoneConfiguration conf;
- public TestTarContainerPacker(
- ContainerTestVersionInfo versionInfo, String compression) {
+ public TestTarContainerPacker(ContainerTestVersionInfo versionInfo,
+ CopyContainerCompression compression) {
this.layout = versionInfo.getLayout();
this.schemaVersion = versionInfo.getSchemaVersion();
this.conf = new OzoneConfiguration();
@@ -110,10 +109,8 @@ public class TestTarContainerPacker {
ContainerTestVersionInfo.getLayoutList();
List<Object[]> parameterList = new ArrayList<>();
for (ContainerTestVersionInfo containerTestVersionInfo : layoutList) {
- for (Map.Entry<CopyContainerCompression, String> entry :
- CopyContainerCompression.getCompressionMapping().entrySet()) {
- parameterList.add(
- new Object[]{containerTestVersionInfo, entry.getValue()});
+ for (CopyContainerCompression compr : CopyContainerCompression.values()) {
+ parameterList.add(new Object[]{containerTestVersionInfo, compr});
}
}
return parameterList;
@@ -170,7 +167,7 @@ public class TestTarContainerPacker {
}
@Test
- public void pack() throws IOException, CompressorException {
+ public void pack() throws IOException {
//GIVEN
KeyValueContainerData sourceContainerData =
createContainer(SOURCE_CONTAINER_ROOT);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java
index e342ffb338..fe1b9f785b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java
@@ -207,7 +207,7 @@ abstract class GrpcOutputStreamTest<T> {
return bytes;
}
- private static byte[] getRandomBytes(int size) {
+ static byte[] getRandomBytes(int size) {
byte[] bytes = new byte[size];
RND.nextBytes(bytes);
return bytes;
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestCopyContainerCompression.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestCopyContainerCompression.java
new file mode 100644
index 0000000000..d50569c793
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestCopyContainerCompression.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import org.apache.hadoop.hdds.conf.MutableConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import static org.apache.commons.io.IOUtils.readFully;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPLICATION_COMPRESSION;
+import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.fromProto;
+import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.getDefaultCompression;
+import static org.apache.hadoop.ozone.container.replication.GrpcOutputStreamTest.getRandomBytes;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+/**
+ * Test for {@link CopyContainerCompression}.
+ */
+class TestCopyContainerCompression {
+
+ @ParameterizedTest
+ @EnumSource
+ void protoConversion(CopyContainerCompression compression) {
+ assertEquals(compression, fromProto(compression.toProto()));
+ }
+
+ @ParameterizedTest
+ @EnumSource
+ void getConfReturnsValidSetting(CopyContainerCompression compression) {
+ MutableConfigurationSource conf = new OzoneConfiguration();
+ compression.setOn(conf);
+
+ assertEquals(compression, CopyContainerCompression.getConf(conf));
+ }
+
+ @Test
+ void getConfReturnsDefaultForUnknown() {
+ MutableConfigurationSource conf = new OzoneConfiguration();
+ conf.set(HDDS_CONTAINER_REPLICATION_COMPRESSION, "garbage");
+
+ assertEquals(getDefaultCompression(),
+ CopyContainerCompression.getConf(conf));
+ }
+
+ @ParameterizedTest
+ @EnumSource
+ void testInputOutput(CopyContainerCompression compression) throws Exception {
+ byte[] original = getRandomBytes(16);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+ try (OutputStream compressed = compression.wrap(out)) {
+ compressed.write(original);
+ }
+
+ byte[] written = out.toByteArray();
+ if (compression == CopyContainerCompression.NO_COMPRESSION) {
+ assertArrayEquals(original, written);
+ } else {
+ assertNotEquals(original, written);
+ }
+
+ ByteArrayInputStream input = new ByteArrayInputStream(written);
+ try (InputStream uncompressed = compression.wrap(input)) {
+ byte[] read = new byte[original.length];
+ readFully(uncompressed, read);
+ assertArrayEquals(original, read);
+ assertEquals(0, uncompressed.available());
+ }
+ }
+
+}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestPushReplicator.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestPushReplicator.java
index 791a0caa14..b678f6e404 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestPushReplicator.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestPushReplicator.java
@@ -17,11 +17,15 @@
*/
package org.apache.hadoop.ozone.container.replication;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status;
import org.apache.ozone.test.SpyOutputStream;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentCaptor;
@@ -47,16 +51,26 @@ import static org.mockito.Mockito.when;
@Timeout(30)
class TestPushReplicator {
- @Test
- void uploadCompletesNormally() throws IOException {
+ private OzoneConfiguration conf;
+
+ @BeforeEach
+ void setup() {
+ conf = new OzoneConfiguration();
+ }
+
+ @ParameterizedTest
+ @EnumSource
+ void uploadCompletesNormally(CopyContainerCompression compression)
+ throws IOException {
// GIVEN
+ compression.setOn(conf);
long containerID = randomContainerID();
DatanodeDetails target = MockDatanodeDetails.randomDatanodeDetails();
Consumer<CompletableFuture<Void>> completion =
fut -> fut.complete(null);
SpyOutputStream output = new SpyOutputStream(NULL_OUTPUT_STREAM);
ContainerReplicator subject = createSubject(containerID, target,
- output, completion);
+ output, completion, compression);
ReplicationTask task = new ReplicationTask(toTarget(containerID, target),
subject);
@@ -78,7 +92,7 @@ class TestPushReplicator {
Consumer<CompletableFuture<Void>> completion =
fut -> fut.completeExceptionally(new Exception("testing"));
ContainerReplicator subject = createSubject(containerID, target,
- output, completion);
+ output, completion, NO_COMPRESSION);
ReplicationTask task = new ReplicationTask(toTarget(containerID, target),
subject);
@@ -100,7 +114,7 @@ class TestPushReplicator {
throw new RuntimeException();
};
ContainerReplicator subject = createSubject(containerID, target,
- output, completion);
+ output, completion, NO_COMPRESSION);
ReplicationTask task = new ReplicationTask(toTarget(containerID, target),
subject);
@@ -116,25 +130,35 @@ class TestPushReplicator {
return ThreadLocalRandom.current().nextLong();
}
- private static ContainerReplicator createSubject(
+ private ContainerReplicator createSubject(
long containerID, DatanodeDetails target, OutputStream outputStream,
- Consumer<CompletableFuture<Void>> completion) throws IOException {
+ Consumer<CompletableFuture<Void>> completion,
+ CopyContainerCompression compression
+ ) throws IOException {
ContainerReplicationSource source = mock(ContainerReplicationSource.class);
ContainerUploader uploader = mock(ContainerUploader.class);
- ArgumentCaptor<CompletableFuture<Void>> captor =
+ ArgumentCaptor<CompletableFuture<Void>> futureArgument =
ArgumentCaptor.forClass(CompletableFuture.class);
+ ArgumentCaptor<CopyContainerCompression> compressionArgument =
+ ArgumentCaptor.forClass(CopyContainerCompression.class);
- when(uploader.startUpload(eq(containerID), eq(target), captor.capture()))
+ when(
+ uploader.startUpload(eq(containerID), eq(target),
+ futureArgument.capture(), compressionArgument.capture()
+ ))
.thenReturn(outputStream);
doAnswer(invocation -> {
- completion.accept(captor.getValue());
+ compressionArgument.getAllValues().forEach(
+ c -> assertEquals(compression, c)
+ );
+ completion.accept(futureArgument.getValue());
return null;
})
.when(source)
- .copyData(eq(containerID), any(), eq(NO_COMPRESSION.name()));
+ .copyData(eq(containerID), any(), compressionArgument.capture());
- return new PushReplicator(source, uploader);
+ return new PushReplicator(conf, source, uploader);
}
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index b598698945..167010923a 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -285,7 +285,7 @@ public class TestReplicationSupervisor {
Path res = Paths.get("file:/tmp/no-such-file");
Mockito.when(
moc.getContainerDataFromReplicas(Mockito.anyLong(), Mockito.anyList(),
- Mockito.any(Path.class)))
+ Mockito.any(Path.class), Mockito.any()))
.thenReturn(res);
final String testDir = GenericTestUtils.getTempPath(
@@ -296,9 +296,9 @@ public class TestReplicationSupervisor {
.thenReturn(Collections.singletonList(
new HddsVolume.Builder(testDir).conf(conf).build()));
ContainerImporter importer =
- new ContainerImporter(conf, set, null, null, volumeSet);
+ new ContainerImporter(conf, set, null, volumeSet);
ContainerReplicator replicator =
- new DownloadAndImportReplicator(set, importer, moc);
+ new DownloadAndImportReplicator(conf, set, importer, moc);
replicatorRef.set(replicator);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerOutputStream.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerOutputStream.java
index ee1ad73b5d..747a4d2935 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerOutputStream.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerOutputStream.java
@@ -19,10 +19,14 @@ package org.apache.hadoop.ozone.container.replication;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import java.io.OutputStream;
+import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.verify;
/**
* Test for {@link SendContainerOutputStream}.
@@ -37,7 +41,28 @@ class TestSendContainerOutputStream
@Override
protected OutputStream createSubject() {
return new SendContainerOutputStream(getObserver(),
- getContainerId(), getBufferSize());
+ getContainerId(), getBufferSize(), NO_COMPRESSION);
+ }
+
+ @ParameterizedTest
+ @EnumSource
+ void usesCompression(CopyContainerCompression compression) throws Exception {
+ OutputStream subject = new SendContainerOutputStream(
+ getObserver(), getContainerId(), getBufferSize(), compression);
+
+ byte[] bytes = getRandomBytes(16);
+ subject.write(bytes, 0, bytes.length);
+ subject.close();
+
+ SendContainerRequest req = SendContainerRequest.newBuilder()
+ .setContainerID(getContainerId())
+ .setOffset(0)
+ .setData(ByteString.copyFrom(bytes))
+ .setCompression(compression.toProto())
+ .build();
+
+ verify(getObserver()).onNext(req);
+ verify(getObserver()).onCompleted();
}
protected ByteString verifyPart(SendContainerRequest response,
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
index 42d44a7c23..ad90f3cdfb 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
@@ -44,6 +44,8 @@ import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Timeout;
+import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
+
/**
* Test SimpleContainerDownloader.
*/
@@ -66,7 +68,7 @@ public class TestSimpleContainerDownloader {
//WHEN
final Path result =
downloader.getContainerDataFromReplicas(1L, datanodes,
- tempDir.newFolder().toPath());
+ tempDir.newFolder().toPath(), NO_COMPRESSION);
//THEN
Assertions.assertEquals(datanodes.get(0).getUuidString(),
@@ -86,7 +88,7 @@ public class TestSimpleContainerDownloader {
//WHEN
final Path result =
downloader.getContainerDataFromReplicas(1L, datanodes,
- tempDir.newFolder().toPath());
+ tempDir.newFolder().toPath(), NO_COMPRESSION);
//THEN
//first datanode is failed, second worked
@@ -106,7 +108,7 @@ public class TestSimpleContainerDownloader {
//WHEN
final Path result =
downloader.getContainerDataFromReplicas(1L, datanodes,
- tempDir.newFolder().toPath());
+ tempDir.newFolder().toPath(), NO_COMPRESSION);
//THEN
//first datanode is failed, second worked
@@ -130,8 +132,8 @@ public class TestSimpleContainerDownloader {
@Override
protected CompletableFuture<Path> downloadContainer(
- long containerId, DatanodeDetails datanode, Path downloadPath
- ) {
+ long containerId, DatanodeDetails datanode, Path downloadPath,
+ CopyContainerCompression compression) {
//download is always successful.
return CompletableFuture
.completedFuture(Paths.get(datanode.getUuidString()));
@@ -142,7 +144,7 @@ public class TestSimpleContainerDownloader {
//returned.
for (int i = 0; i < 10000; i++) {
Path path = downloader.getContainerDataFromReplicas(1L, datanodes,
- tempDir.newFolder().toPath());
+ tempDir.newFolder().toPath(), NO_COMPRESSION);
if (path.toString().equals(datanodes.get(1).getUuidString())) {
return;
}
@@ -183,8 +185,8 @@ public class TestSimpleContainerDownloader {
@Override
protected CompletableFuture<Path> downloadContainer(
- long containerId, DatanodeDetails datanode, Path downloadPath
- ) {
+ long containerId, DatanodeDetails datanode, Path downloadPath,
+ CopyContainerCompression compression) {
if (datanodes.contains(datanode)) {
if (directException) {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
index 85c7bc0f08..3ff347fbdf 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachin
import org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask;
import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.replication.ContainerImporter;
import org.apache.hadoop.ozone.container.replication.ContainerReplicationSource;
import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
@@ -70,6 +69,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
+
/**
* Tests upgrading a single datanode from pre-SCM HA volume format that used
* SCM ID to the post-SCM HA volume format using cluster ID. If SCM HA was
@@ -655,7 +656,7 @@ public class TestDatanodeUpgradeToScmHA {
File destination = tempFolder.newFile();
try (FileOutputStream fos = new FileOutputStream(destination)) {
- replicationSource.copyData(containerId, fos, "NO_COMPRESSION");
+ replicationSource.copyData(containerId, fos, NO_COMPRESSION);
}
return destination;
}
@@ -669,13 +670,14 @@ public class TestDatanodeUpgradeToScmHA {
new ContainerImporter(dsm.getConf(),
dsm.getContainer().getContainerSet(),
dsm.getContainer().getController(),
- new TarContainerPacker(), dsm.getContainer().getVolumeSet());
+ dsm.getContainer().getVolumeSet());
File tempFile = tempFolder.newFile(
ContainerUtils.getContainerTarName(containerID));
Files.copy(source.toPath(), tempFile.toPath(),
StandardCopyOption.REPLACE_EXISTING);
- replicator.importContainer(containerID, tempFile.toPath(), null);
+ replicator.importContainer(containerID, tempFile.toPath(), null,
+ NO_COMPRESSION);
}
public void dispatchRequest(
diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 7312fd6a9e..718e2a108c 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -507,6 +507,7 @@ message SendContainerRequest {
required uint64 offset = 2;
required bytes data = 3;
optional int64 checksum = 4;
+ optional CopyContainerCompressProto compression = 5;
}
message SendContainerResponse {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
index ad96d4fa4b..551cf9aef0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
@@ -80,6 +80,7 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_RENEW_GRACE_DURATI
import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
/**
* Tests ozone containers via secure grpc/netty.
@@ -271,7 +272,7 @@ public class TestOzoneContainerWithTLS {
SimpleContainerDownloader downloader =
new SimpleContainerDownloader(conf, caClient);
Path file = downloader.getContainerDataFromReplicas(
- containerId, sourceDatanodes, null);
+ containerId, sourceDatanodes, null, NO_COMPRESSION);
downloader.close();
Assert.assertNull(file);
Assert.assertTrue(logCapture.getOutput().contains(
@@ -309,7 +310,7 @@ public class TestOzoneContainerWithTLS {
downloader = new SimpleContainerDownloader(conf, caClient);
try {
file = downloader.getContainerDataFromReplicas(cId, sourceDatanodes,
- null);
+ null, NO_COMPRESSION);
downloader.close();
Assert.assertNotNull(file);
} finally {
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ExportSubcommand.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ExportSubcommand.java
index c5e37a1867..d0337f6534 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ExportSubcommand.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ExportSubcommand.java
@@ -31,8 +31,8 @@ import java.io.File;
import java.io.FileOutputStream;
import java.util.concurrent.Callable;
-import static org.apache.commons.compress.compressors.CompressorStreamFactory.GZIP;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
+import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
/**
* Handles {@code ozone debug container export} command.
@@ -76,7 +76,7 @@ public class ExportSubcommand implements Callable<Void> {
new File(destination, "container-" + containerId + ".tar");
try (FileOutputStream fos = new FileOutputStream(destinationFile)) {
try {
- replicationSource.copyData(containerId, fos, GZIP);
+ replicationSource.copyData(containerId, fos, NO_COMPRESSION);
} catch (StorageContainerException e) {
if (e.getResult() == CONTAINER_NOT_FOUND) {
continue;
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
index 1abd8aa273..6d5ae0b988 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.replication.ContainerImporter;
import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
@@ -206,8 +205,8 @@ public class ClosedContainerReplicator extends BaseFreonGenerator implements
new ContainerController(containerSet, handlers);
ContainerImporter importer = new ContainerImporter(conf, containerSet,
- controller, new TarContainerPacker(), null);
- replicator = new DownloadAndImportReplicator(containerSet, importer,
+ controller, null);
+ replicator = new DownloadAndImportReplicator(conf, containerSet, importer,
new SimpleContainerDownloader(conf, null));
ReplicationServer.ReplicationConfig replicationConfig
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org