Posted to commits@ozone.apache.org by el...@apache.org on 2021/02/24 08:46:09 UTC
[ozone] 20/27: continue the refactor
This is an automated email from the ASF dual-hosted git repository.
elek pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit ec7d9220a319a5241d96c6df9a12b06c19e252a0
Author: Elek Márton <el...@apache.org>
AuthorDate: Fri Feb 19 10:40:00 2021 +0100
continue the refactor
---
hadoop-hdds/container-service/pom.xml | 15 +
.../common/interfaces/ContainerPacker.java | 59 ----
.../container/keyvalue/KeyValueContainer.java | 39 ---
.../container/keyvalue/TarContainerPacker.java | 217 ------------
.../ozone/container/ozoneimpl/OzoneContainer.java | 2 +-
.../replication/ContainerReplicationSource.java | 49 ---
.../replication/ContainerStreamingDestination.java | 26 ++
.../replication/ContainerStreamingSource.java | 48 +++
.../container/replication/GrpcOutputStream.java | 129 -------
.../replication/GrpcReplicationClient.java | 165 ---------
.../replication/GrpcReplicationService.java | 62 ----
.../replication/NullContainerDownloader.java | 16 -
.../OnDemandContainerReplicationSource.java | 63 ----
.../container/replication/ReplicationServer.java | 79 +----
.../replication/SimpleContainerDownloader.java | 104 +++---
.../container/stream/DirstreamClientHandler.java | 95 ++++++
.../container/stream/DirstreamServerHandler.java | 68 ++++
.../ozone/container/stream/StreamingClient.java | 62 ++++
.../container/stream/StreamingDestination.java | 9 +
.../ozone/container/stream/StreamingServer.java | 74 ++++
.../ozone/container/stream/StreamingSource.java | 10 +
.../container/keyvalue/TestKeyValueContainer.java | 36 --
.../container/keyvalue/TestTarContainerPacker.java | 373 ---------------------
.../replication/TestGrpcOutputStream.java | 213 ------------
.../replication/TestReplicationService.java | 139 +++++---
.../apache/hadoop/ozone/debug/ExportContainer.java | 185 ----------
26 files changed, 572 insertions(+), 1765 deletions(-)
diff --git a/hadoop-hdds/container-service/pom.xml b/hadoop-hdds/container-service/pom.xml
index aaa5302..acb779c 100644
--- a/hadoop-hdds/container-service/pom.xml
+++ b/hadoop-hdds/container-service/pom.xml
@@ -43,6 +43,21 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>commons-compress</artifactId>
</dependency>
<dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdds-common</artifactId>
<type>test-jar</type>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java
deleted file mode 100644
index a76e6f0..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.interfaces;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-
-/**
- * Service to pack/unpack container data to/from a single byte
- * stream.
- */
-public interface ContainerPacker {
-
- /**
- * Extract the container data to the path defined by the container.
- * <p>
- * This doesn't contain the extraction of the container descriptor file.
- *
- * @return the byte content of the descriptor (which won't be written to a
- * file but returned).
- */
- byte[] unpackContainerData(
- KeyValueContainerData container,
- InputStream inputStream)
- throws IOException;
-
- /**
- * Compress all the container data (chunk data, metadata db AND container
- * descriptor) to one single archive.
- */
- void pack(KeyValueContainerData containerData, OutputStream destination)
- throws IOException;
-
- /**
- * Read the descriptor from the finished archive to get the data before
- * importing the container.
- */
- byte[] unpackContainerDescriptor(InputStream inputStream)
- throws IOException;
-}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 763f5ac..9785825 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.container.keyvalue;
import java.io.File;
import java.io.IOException;
-import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.time.Instant;
@@ -42,7 +41,6 @@ import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
@@ -618,43 +616,6 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
return checker.fullCheck(throttler, canceler);
}
- public void exportContainerData(
- OutputStream destination,
- ContainerPacker packer
- ) throws IOException {
- writeLock();
- try {
- // Closed/ Quasi closed containers are considered for replication by
- // replication manager if they are under-replicated.
- ContainerProtos.ContainerDataProto.State state =
- getContainerData().getState();
- if (!(state == ContainerProtos.ContainerDataProto.State.CLOSED ||
- state == ContainerDataProto.State.QUASI_CLOSED)) {
- throw new IllegalStateException(
- "Only (quasi)closed containers can be exported, but " +
- "ContainerId=" + getContainerData().getContainerID() +
- " is in state " + state);
- }
-
- try {
- compactDB();
- // Close DB (and remove from cache) to avoid concurrent modification
- // while packing it.
- BlockUtils.removeDB(containerData, config);
- } finally {
- readLock();
- writeUnlock();
- }
-
- packer.pack(containerData, destination);
- } finally {
- if (lock.isWriteLockedByCurrentThread()) {
- writeUnlock();
- } else {
- readUnlock();
- }
- }
- }
private enum ContainerCheckLevel {
NO_CHECK, FAST_CHECK, FULL_CHECK
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
deleted file mode 100644
index c745fdb9..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.keyvalue;
-
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.stream.Stream;
-
-import org.apache.hadoop.hdds.HddsUtils;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
-
-import static java.util.stream.Collectors.toList;
-import org.apache.commons.compress.archivers.ArchiveEntry;
-import org.apache.commons.compress.archivers.ArchiveInputStream;
-import org.apache.commons.compress.archivers.ArchiveOutputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
-import org.apache.commons.compress.compressors.CompressorException;
-import org.apache.commons.compress.compressors.CompressorStreamFactory;
-import org.apache.commons.io.IOUtils;
-
-/**
- * Compress/uncompress KeyValueContainer data to a tar.gz archive.
- */
-public class TarContainerPacker
- implements ContainerPacker {
-
- static final String CHUNKS_DIR_NAME = OzoneConsts.STORAGE_DIR_CHUNKS;
-
- static final String DB_DIR_NAME = "db";
-
- private static final String CONTAINER_FILE_NAME = "container.yaml";
-
- /**
- * Given an input stream (tar file) extract the data to the specified
- * directories.
- *
- * @param containerData container which defines the destination structure.
- * @param input the input stream.
- */
- @Override
- public byte[] unpackContainerData(KeyValueContainerData containerData,
- InputStream input)
- throws IOException {
- byte[] descriptorFileContent = null;
- Path dbRoot = containerData.getDbFile().toPath();
- Path chunksRoot = Paths.get(containerData.getChunksPath());
-
- try (ArchiveInputStream archiveInput = untar(input)) {
-
- ArchiveEntry entry = archiveInput.getNextEntry();
- while (entry != null) {
- String name = entry.getName();
- long size = entry.getSize();
- if (name.startsWith(DB_DIR_NAME + "/")) {
- Path destinationPath = dbRoot
- .resolve(name.substring(DB_DIR_NAME.length() + 1));
- extractEntry(archiveInput, size, dbRoot, destinationPath);
- } else if (name.startsWith(CHUNKS_DIR_NAME + "/")) {
- Path destinationPath = chunksRoot
- .resolve(name.substring(CHUNKS_DIR_NAME.length() + 1));
- extractEntry(archiveInput, size, chunksRoot, destinationPath);
- } else if (CONTAINER_FILE_NAME.equals(name)) {
- //Don't do anything. Container file should be unpacked in a
- //separated step by unpackContainerDescriptor call.
- descriptorFileContent = readEntry(archiveInput, size);
- } else {
- throw new IllegalArgumentException(
- "Unknown entry in the tar file: " + "" + name);
- }
- entry = archiveInput.getNextEntry();
- }
- return descriptorFileContent;
- }
- }
-
- private void extractEntry(InputStream input, long size,
- Path ancestor, Path path) throws IOException {
- HddsUtils.validatePath(path, ancestor);
- Path parent = path.getParent();
- if (parent != null) {
- Files.createDirectories(parent);
- }
-
- try (OutputStream fileOutput = new FileOutputStream(path.toFile());
- OutputStream output = new BufferedOutputStream(fileOutput)) {
- int bufferSize = 1024;
- byte[] buffer = new byte[bufferSize + 1];
- long remaining = size;
- while (remaining > 0) {
- int len = (int) Math.min(remaining, bufferSize);
- int read = input.read(buffer, 0, len);
- if (read >= 0) {
- remaining -= read;
- output.write(buffer, 0, read);
- } else {
- remaining = 0;
- }
- }
- }
- }
-
- /**
- * Given a containerData include all the required container data/metadata
- * in a tar file.
- *
- * @param containerData Container to archive
- * @param output Destination tar file/stream.
- */
- @Override
- public void pack(KeyValueContainerData containerData,
- OutputStream output)
- throws IOException {
-
- try (ArchiveOutputStream archiveOutput = tar(output)) {
-
- includePath(containerData.getDbFile().toPath(), DB_DIR_NAME,
- archiveOutput);
-
- includePath(Paths.get(containerData.getChunksPath()), CHUNKS_DIR_NAME,
- archiveOutput);
-
- includeFile(containerData.getContainerFile(), CONTAINER_FILE_NAME,
- archiveOutput);
- }
-
- }
-
- @Override
- public byte[] unpackContainerDescriptor(InputStream input)
- throws IOException {
- try (ArchiveInputStream archiveInput = untar(input)) {
-
- ArchiveEntry entry = archiveInput.getNextEntry();
- while (entry != null) {
- String name = entry.getName();
- if (CONTAINER_FILE_NAME.equals(name)) {
- return readEntry(archiveInput, entry.getSize());
- }
- entry = archiveInput.getNextEntry();
- }
- }
- throw new IOException(
- "Container descriptor is missing from the container archive.");
- }
-
- private byte[] readEntry(InputStream input, final long size)
- throws IOException {
- ByteArrayOutputStream output = new ByteArrayOutputStream();
- int bufferSize = 1024;
- byte[] buffer = new byte[bufferSize + 1];
- long remaining = size;
- while (remaining > 0) {
- int len = (int) Math.min(remaining, bufferSize);
- int read = input.read(buffer, 0, len);
- remaining -= read;
- output.write(buffer, 0, read);
- }
- return output.toByteArray();
- }
-
- private void includePath(Path dir, String subdir,
- ArchiveOutputStream archiveOutput) throws IOException {
-
- try (Stream<Path> dirEntries = Files.list(dir)) {
- for (Path path : dirEntries.collect(toList())) {
- String entryName = subdir + "/" + path.getFileName();
- includeFile(path.toFile(), entryName, archiveOutput);
- }
- }
- }
-
- static void includeFile(File file, String entryName,
- ArchiveOutputStream archiveOutput) throws IOException {
- ArchiveEntry entry = archiveOutput.createArchiveEntry(file, entryName);
- archiveOutput.putArchiveEntry(entry);
- try (InputStream input = new FileInputStream(file)) {
- IOUtils.copy(input, archiveOutput);
- }
- archiveOutput.closeArchiveEntry();
- }
-
- private static ArchiveInputStream untar(InputStream input) {
- return new TarArchiveInputStream(input);
- }
-
- private static ArchiveOutputStream tar(OutputStream output) {
- return new TarArchiveOutputStream(output);
- }
-
-
-}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 4ea2d6d..1d83524 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -148,7 +148,7 @@ public class OzoneContainer {
context);
replicationServer = new ReplicationServer(
- controller,
+ containerSet,
conf.getObject(ReplicationConfig.class),
secConf,
certClient);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
deleted file mode 100644
index 69582f7..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.replication;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * Contract to provide the container in binary form.
- * <p>
- * Prepare will be called when the container is closed. An implementation
- * could precache a binary representation of the container and store the
- * pre-packed images.
- */
-public interface ContainerReplicationSource {
-
- /**
- * Prepare for the replication.
- *
- * @param containerId The ID of the container to package.
- */
- void prepare(long containerId);
-
- /**
- * Copy the container data to an output stream.
- *
- * @param containerId Container to replicate
- * @param destination The destination stream to copy all the container data.
- * @throws IOException
- */
- void copyData(long containerId, OutputStream destination)
- throws IOException;
-
-}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingDestination.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingDestination.java
new file mode 100644
index 0000000..b0e60ac
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingDestination.java
@@ -0,0 +1,26 @@
+package org.apache.hadoop.ozone.container.replication;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.stream.StreamingDestination;
+
+public class ContainerStreamingDestination implements StreamingDestination {
+
+ private final KeyValueContainerData containerData;
+
+ public ContainerStreamingDestination(KeyValueContainerData containerData) {
+ this.containerData = containerData;
+ }
+
+ @Override
+ public Path mapToDestination(String name) {
+ String[] parts = name.split("/", 2);
+ if (parts[0].equals("DB")) {
+      return Paths.get(
+          containerData.getContainerDBFile().getAbsolutePath(), parts[1]);
+    }
+    throw new IllegalArgumentException("Unknown container part: " + parts[0]);
+ }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java
new file mode 100644
index 0000000..46fb767
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java
@@ -0,0 +1,48 @@
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.stream.StreamingSource;
+
+public class ContainerStreamingSource implements StreamingSource {
+
+ private ContainerSet containerSet;
+
+ public ContainerStreamingSource(ContainerSet containerSet) {
+ this.containerSet = containerSet;
+ }
+
+ @Override
+ public Map<String, Path> getFilesToStream(String id) {
+
+ Map<String, Path> filesToStream = new HashMap<>();
+
+ final Long containerId = Long.valueOf(id);
+ final KeyValueContainer container =
+ (KeyValueContainer) containerSet.getContainer(containerId);
+ if (container == null) {
+ throw new IllegalArgumentException("No such container " + containerId);
+ }
+ final File dbPath =
+ container.getContainerData().getContainerDBFile();
+ try {
+ final List<Path> dbFiles =
+ Files.list(dbPath.toPath()).collect(Collectors.toList());
+ for (Path dbFile : dbFiles) {
+ filesToStream.put("DB/" + dbFile.getFileName().toString(), dbFile);
+ }
+    } catch (IOException e) {
+      throw new UncheckedIOException(
+          "Couldn't list the db files of container " + containerId, e);
+    }
+ return filesToStream;
+ }
+}
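The two classes above rely on a shared naming convention: the source publishes every RocksDB file under a "DB/<fileName>" key, and the destination resolves the same key back to a path under the container's local db directory. A tiny standalone sketch of that round trip (the db path and class name below are made up for illustration):

    import java.nio.file.Path;
    import java.nio.file.Paths;

    public final class NameConventionDemo {
      public static void main(String[] args) {
        // Assumed container db directory (illustrative only).
        Path dbRoot = Paths.get("/data/hdds/container-1/db");
        String entryName = "DB/000003.sst";       // key built by the source
        String[] parts = entryName.split("/", 2); // parsing in the destination
        Path destination = Paths.get(dbRoot.toString(), parts[1]);
        // Prints: DB/000003.sst -> /data/hdds/container-1/db/000003.sst
        System.out.println(entryName + " -> " + destination);
      }
    }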
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java
deleted file mode 100644
index c09c8f6..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.replication;
-
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nonnull;
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * Adapter from {@code OutputStream} to gRPC {@code StreamObserver}.
- * Data is buffered in a limited buffer of the specified size.
- */
-class GrpcOutputStream extends OutputStream {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(GrpcOutputStream.class);
-
- private final StreamObserver<CopyContainerResponseProto> responseObserver;
-
- private final ByteString.Output buffer;
-
- private final long containerId;
-
- private final int bufferSize;
-
- private long writtenBytes;
-
- GrpcOutputStream(
- StreamObserver<CopyContainerResponseProto> responseObserver,
- long containerId, int bufferSize) {
- this.responseObserver = responseObserver;
- this.containerId = containerId;
- this.bufferSize = bufferSize;
- buffer = ByteString.newOutput(bufferSize);
- }
-
- @Override
- public void write(int b) {
- try {
- buffer.write(b);
- if (buffer.size() >= bufferSize) {
- flushBuffer(false);
- }
- } catch (Exception ex) {
- responseObserver.onError(ex);
- }
- }
-
- @Override
- public void write(@Nonnull byte[] data, int offset, int length) {
- if ((offset < 0) || (offset > data.length) || (length < 0) ||
- ((offset + length) > data.length) || ((offset + length) < 0)) {
- throw new IndexOutOfBoundsException();
- } else if (length == 0) {
- return;
- }
-
- try {
- if (buffer.size() >= bufferSize) {
- flushBuffer(false);
- }
-
- int remaining = length;
- int off = offset;
- int len = Math.min(remaining, bufferSize - buffer.size());
- while (remaining > 0) {
- buffer.write(data, off, len);
- if (buffer.size() >= bufferSize) {
- flushBuffer(false);
- }
- off += len;
- remaining -= len;
- len = Math.min(bufferSize, remaining);
- }
- } catch (Exception ex) {
- responseObserver.onError(ex);
- }
- }
-
- @Override
- public void close() throws IOException {
- flushBuffer(true);
- LOG.info("Sent {} bytes for container {}",
- writtenBytes, containerId);
- responseObserver.onCompleted();
- buffer.close();
- }
-
- private void flushBuffer(boolean eof) {
- int length = buffer.size();
- if (length > 0) {
- ByteString data = buffer.toByteString();
- LOG.debug("Sending {} bytes (of type {}) for container {}",
- length, data.getClass().getSimpleName(), containerId);
- CopyContainerResponseProto response =
- CopyContainerResponseProto.newBuilder()
- .setContainerID(containerId)
- .setData(data)
- .setEof(eof)
- .setReadOffset(writtenBytes)
- .setLen(length)
- .build();
- responseObserver.onNext(response);
- writtenBytes += length;
- buffer.reset();
- }
- }
-}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
deleted file mode 100644
index 89e23fe..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.replication;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.file.Path;
-import java.security.cert.X509Certificate;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc;
-import org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceStub;
-import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-
-import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
-import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
-import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
-import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
-import org.apache.ratis.thirdparty.io.netty.handler.ssl.ClientAuth;
-import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Client to read container data from gRPC.
- */
-public class GrpcReplicationClient implements AutoCloseable {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(GrpcReplicationClient.class);
-
- private final ManagedChannel channel;
-
- private final IntraDatanodeProtocolServiceStub client;
-
- private final Path workingDirectory;
-
- public GrpcReplicationClient(
- String host, int port, Path workingDir,
- SecurityConfig secConfig, X509Certificate caCert
- ) throws IOException {
- NettyChannelBuilder channelBuilder =
- NettyChannelBuilder.forAddress(host, port)
- .usePlaintext()
- .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
-
- if (secConfig.isSecurityEnabled()) {
- channelBuilder.useTransportSecurity();
-
- SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
- if (caCert != null) {
- sslContextBuilder.trustManager(caCert);
- }
-
- sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
- sslContextBuilder.keyManager(
- new File(secConfig.getCertificateFileName()),
- new File(secConfig.getPrivateKeyFileName()));
- if (secConfig.useTestCert()) {
- channelBuilder.overrideAuthority("localhost");
- }
- channelBuilder.sslContext(sslContextBuilder.build());
- }
- channel = channelBuilder.build();
- client = IntraDatanodeProtocolServiceGrpc.newStub(channel);
- workingDirectory = workingDir;
- }
-
- public void download(
- KeyValueContainerData containerData,
- OutputStream outputStream
- ) {
- CopyContainerRequestProto request =
- CopyContainerRequestProto.newBuilder()
- .setContainerID(containerData.getContainerID())
- .setLen(-1)
- .setReadOffset(0)
- .build();
-
- client.download(request, new StreamDownloader(outputStream));
- }
-
- private Path getWorkingDirectory() {
- return workingDirectory;
- }
-
- public void shutdown() {
- channel.shutdown();
- try {
- channel.awaitTermination(5, TimeUnit.SECONDS);
- } catch (Exception e) {
- LOG.error("failed to shutdown replication channel", e);
- }
- }
-
- @Override
- public void close() throws Exception {
- shutdown();
- }
-
- /**
- * gRPC stream observer to CompletableFuture adapter.
- */
- public static class StreamDownloader
- implements StreamObserver<CopyContainerResponseProto> {
-
- private final OutputStream outputStream;
-
- public StreamDownloader(
- OutputStream output
- ) {
- this.outputStream = output;
- }
-
- @Override
- public void onNext(CopyContainerResponseProto chunk) {
- try {
- chunk.getData().writeTo(outputStream);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void onError(Throwable throwable) {
- try {
- outputStream.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void onCompleted() {
- try {
- outputStream.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- }
-
- }
-}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
deleted file mode 100644
index 00bf844..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.replication;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc;
-
-import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Service to make containers available for replication.
- */
-public class GrpcReplicationService extends
- IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceImplBase {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(GrpcReplicationService.class);
-
- private static final int BUFFER_SIZE = 1024 * 1024;
-
- private final ContainerReplicationSource source;
-
- public GrpcReplicationService(ContainerReplicationSource source) {
- this.source = source;
- }
-
- @Override
- public void download(CopyContainerRequestProto request,
- StreamObserver<CopyContainerResponseProto> responseObserver) {
- long containerID = request.getContainerID();
- LOG.info("Streaming container data ({}) to other datanode", containerID);
- try (GrpcOutputStream outputStream =
- new GrpcOutputStream(responseObserver, containerID, BUFFER_SIZE)) {
- source.copyData(containerID, outputStream);
- } catch (IOException e) {
- LOG.error("Error streaming container {}", containerID, e);
- responseObserver.onError(e);
- }
- }
-
-}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/NullContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/NullContainerDownloader.java
index 106fccb..1821663 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/NullContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/NullContainerDownloader.java
@@ -19,14 +19,10 @@
package org.apache.hadoop.ozone.container.replication;
import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.file.Path;
import java.security.cert.X509Certificate;
-import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import com.google.common.annotations.VisibleForTesting;
@@ -50,19 +46,7 @@ public class NullContainerDownloader extends SimpleContainerDownloader {
KeyValueContainerData preCreated,
DatanodeDetails datanode
) throws IOException {
- CompletableFuture<Path> result;
- GrpcReplicationClient grpcReplicationClient =
- new GrpcReplicationClient(datanode.getIpAddress(),
- datanode.getPort(Name.REPLICATION).getValue(),
- workingDirectory, securityConfig, caCert);
- OutputStream nullOutputStream = new OutputStream() {
- @Override
- public void write(int i) throws IOException {
- //
- }
- };
- grpcReplicationClient.download(preCreated, nullOutputStream);
return preCreated;
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
deleted file mode 100644
index 474333e..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.replication;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
-import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
-
-import com.google.common.base.Preconditions;
-
-/**
- * A naive implementation of the replication source which creates a tar file
- * on-demand without pre-creating the compressed archives.
- */
-public class OnDemandContainerReplicationSource
- implements ContainerReplicationSource {
-
- private final ContainerController controller;
-
- private final TarContainerPacker packer = new TarContainerPacker();
-
- public OnDemandContainerReplicationSource(
- ContainerController controller) {
- this.controller = controller;
- }
-
- @Override
- public void prepare(long containerId) {
- // no pre-create in this implementation
- }
-
- @Override
- public void copyData(long containerId, OutputStream destination)
- throws IOException {
-
- KeyValueContainer container =
- (KeyValueContainer) controller.getContainer(containerId);
-
- Preconditions.checkNotNull(
- container, "Container is not found " + containerId);
-
- container.exportContainerData(destination, packer);
-
- }
-}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
index 1288b22..e5b0b33 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
@@ -17,26 +17,16 @@
*/
package org.apache.hadoop.ozone.container.replication;
-import javax.net.ssl.SSLException;
import java.io.IOException;
-import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigTag;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
-import org.apache.hadoop.hdds.tracing.GrpcServerInterceptor;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
-
-import org.apache.ratis.thirdparty.io.grpc.Server;
-import org.apache.ratis.thirdparty.io.grpc.ServerBuilder;
-import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
-import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
-import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
-import org.apache.ratis.thirdparty.io.netty.handler.ssl.ClientAuth;
-import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.stream.StreamingServer;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,84 +38,48 @@ public class ReplicationServer {
private static final Logger LOG =
LoggerFactory.getLogger(ReplicationServer.class);
- private Server server;
-
private SecurityConfig secConf;
private CertificateClient caClient;
- private ContainerController controller;
+ private ContainerSet containerSet;
private int port;
+ private StreamingServer server;
+
public ReplicationServer(
- ContainerController controller,
+ ContainerSet containerSet,
ReplicationConfig replicationConfig,
SecurityConfig secConf,
CertificateClient caClient
) {
this.secConf = secConf;
this.caClient = caClient;
- this.controller = controller;
+ this.containerSet = containerSet;
this.port = replicationConfig.getPort();
init();
}
public void init() {
- NettyServerBuilder nettyServerBuilder =
- ((NettyServerBuilder) ServerBuilder.forPort(port))
- .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
-
- GrpcServerInterceptor tracingInterceptor = new GrpcServerInterceptor();
- nettyServerBuilder
- .addService(ServerInterceptors.intercept(new GrpcReplicationService(
- new OnDemandContainerReplicationSource(controller)
- ), tracingInterceptor));
-
- if (secConf.isSecurityEnabled()) {
- try {
- SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(
- caClient.getPrivateKey(), caClient.getCertificate());
-
- sslContextBuilder = GrpcSslContexts.configure(
- sslContextBuilder, secConf.getGrpcSslProvider());
-
- sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
- sslContextBuilder.trustManager(caClient.getCACertificate());
-
- nettyServerBuilder.sslContext(sslContextBuilder.build());
- } catch (SSLException ex) {
- throw new IllegalArgumentException(
- "Unable to setup TLS for secure datanode replication GRPC "
- + "endpoint.", ex);
- }
- }
-
- server = nettyServerBuilder.build();
+ server = new StreamingServer(new ContainerStreamingSource(containerSet),
+ this.port);
}
public void start() throws IOException {
- server.start();
-
- if (port == 0) {
- LOG.info("{} is started using port {}", getClass().getSimpleName(),
- server.getPort());
+    try {
+      server.start();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Couldn't start replication server", e);
}
-
- port = server.getPort();
-
}
public void stop() {
- try {
- server.shutdown().awaitTermination(10L, TimeUnit.SECONDS);
- } catch (InterruptedException ex) {
- LOG.warn("{} couldn't be stopped gracefully", getClass().getSimpleName());
- }
+ server.stop();
}
public int getPort() {
- return port;
+ return server.getPort();
}
@ConfigGroup(prefix = "hdds.datanode.replication")
@@ -145,5 +99,4 @@ public class ReplicationServer {
return this;
}
}
-
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
index 666a412..ac57896 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
@@ -19,26 +19,23 @@
package org.apache.hadoop.ozone.container.replication;
import java.io.IOException;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+import org.apache.hadoop.ozone.container.stream.StreamingClient;
import com.google.common.annotations.VisibleForTesting;
+import io.netty.channel.Channel;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +55,6 @@ public class SimpleContainerDownloader implements ContainerDownloader {
protected final Path workingDirectory;
protected final SecurityConfig securityConfig;
protected final X509Certificate caCert;
- protected TarContainerPacker packer = new TarContainerPacker();
public SimpleContainerDownloader(
ConfigurationSource conf,
@@ -124,50 +120,58 @@ public class SimpleContainerDownloader implements ContainerDownloader {
KeyValueContainerData preCreated,
DatanodeDetails datanode
) throws IOException {
- CompletableFuture<Path> result;
- GrpcReplicationClient grpcReplicationClient =
- new GrpcReplicationClient(datanode.getIpAddress(),
- datanode.getPort(Name.REPLICATION).getValue(),
- workingDirectory, securityConfig, caCert);
-
- PipedOutputStream outputStream = new PipedOutputStream();
-
- grpcReplicationClient.download(preCreated, outputStream);
- final byte[] descriptor = packer
- .unpackContainerData(preCreated, new PipedInputStream(outputStream));
-
- //parse descriptor
- //now, we have extracted the container descriptor from the previous
- //datanode. We can load it and upload it with the current data
- // (original metadata + current filepath fields)
- KeyValueContainerData replicated =
- (KeyValueContainerData) ContainerDataYaml
- .readContainer(descriptor);
-
- KeyValueContainerData updated = new KeyValueContainerData(
- replicated.getContainerID(),
- replicated.getLayOutVersion(),
- replicated.getMaxSize(),
- replicated.getOriginPipelineId(),
- replicated.getOriginNodeId());
-
- //inherited from the replicated
- updated
- .setState(replicated.getState());
- updated
- .setContainerDBType(replicated.getContainerDBType());
- updated
- .updateBlockCommitSequenceId(replicated.getBlockCommitSequenceId());
- updated
- .setSchemaVersion(replicated.getSchemaVersion());
-
- //inherited from the pre-created seed container
- updated.setMetadataPath(preCreated.getMetadataPath());
- updated.setDbFile(preCreated.getDbFile());
- updated.setChunksPath(preCreated.getChunksPath());
- updated.setVolume(preCreated.getVolume());
-
- return updated;
+ try {
+ StreamingClient client =
+ new StreamingClient(datanode.getIpAddress(), datanode.getPort(
+ Name.REPLICATION).getValue(),
+ new ContainerStreamingDestination(preCreated));
+ final Channel channel = client.connect();
+ channel.writeAndFlush(preCreated.getContainerID() + "\n");
+      channel.closeFuture().sync();
+      LOG.info("Container {} is downloaded successfully",
+          preCreated.getContainerID());
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+
+    // The tar-based descriptor parsing of the old gRPC download is gone:
+    // the seed container was pre-created locally, so its metadata is
+    // already in place and the streamed db files have been written
+    // directly to their final location.
+    return preCreated;
}
@Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java
new file mode 100644
index 0000000..d6076e5
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java
@@ -0,0 +1,95 @@
+package org.apache.hadoop.ozone.container.stream;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.ByteProcessor;
+
+public class DirstreamClientHandler extends ChannelInboundHandlerAdapter {
+
+ private final StreamingDestination destination;
+ private boolean headerMode = true;
+ private StringBuilder currentFileName = new StringBuilder();
+ private RandomAccessFile destFile;
+
+ private FileChannel destFileChannel;
+
+ private long remaining;
+
+ public DirstreamClientHandler(StreamingDestination streamingDestination) {
+ this.destination = streamingDestination;
+ }
+
+  @Override
+  public void channelRead(ChannelHandlerContext ctx, Object msg)
+      throws IOException {
+    ByteBuf buffer = (ByteBuf) msg;
+    try {
+      doRead(ctx, buffer);
+    } finally {
+      // The handler owns the inbound buffer: release it exactly once,
+      // after the (possibly recursive) processing has finished.
+      buffer.release();
+    }
+  }
+
+  private void doRead(ChannelHandlerContext ctx, ByteBuf buffer)
+      throws IOException {
+    if (headerMode) {
+      int eolPosition = buffer.forEachByte(ByteProcessor.FIND_LF) - buffer
+          .readerIndex();
+      if (eolPosition > 0) {
+        headerMode = false;
+        // Header line: "<size> <name>\n"
+        currentFileName.append(
+            buffer.readCharSequence(eolPosition, Charset.defaultCharset()));
+        buffer.skipBytes(1);
+        String[] parts = currentFileName.toString().split(" ", 2);
+        remaining = Long.parseLong(parts[0]);
+        Path destFilePath = destination.mapToDestination(parts[1]);
+        Files.createDirectories(destFilePath.getParent());
+        this.destFile =
+            new RandomAccessFile(destFilePath.toFile(), "rw");
+        destFileChannel = this.destFile.getChannel();
+      } else {
+        // No complete header line yet: buffer the fragment, wait for more.
+        currentFileName
+            .append(buffer.toString(Charset.defaultCharset()));
+        buffer.skipBytes(buffer.readableBytes());
+      }
+    }
+    if (!headerMode) {
+      if (remaining > buffer.readableBytes()) {
+        remaining -=
+            buffer.readBytes(destFileChannel, buffer.readableBytes());
+      } else {
+        remaining -= buffer.readBytes(destFileChannel, (int) remaining);
+        // This file is complete: close it and reset for the next header.
+        destFileChannel.close();
+        destFile.close();
+        currentFileName = new StringBuilder();
+        headerMode = true;
+        if (buffer.readableBytes() > 0) {
+          // The next header (or its fragment) starts in the same buffer.
+          doRead(ctx, buffer);
+        }
+      }
+    }
+  }
+
+ @Override
+ public void channelUnregistered(ChannelHandlerContext ctx) {
+ try {
+ if (destFile != null) {
+ destFile.close();
+ }
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+    cause.printStackTrace();
+    try {
+      // The destination file may not be open yet when the error happens.
+      // Closing the RandomAccessFile also closes its channel.
+      if (destFile != null) {
+        destFile.close();
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    ctx.close();
+  }
+
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java
new file mode 100644
index 0000000..4264f71
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java
@@ -0,0 +1,68 @@
+package org.apache.hadoop.ozone.container.stream;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.DefaultFileRegion;
+
+public class DirstreamServerHandler extends ChannelInboundHandlerAdapter {
+
+ private StreamingSource source;
+
+ public DirstreamServerHandler(StreamingSource source) {
+ this.source = source;
+ }
+
+  @Override
+  public void channelRead(ChannelHandlerContext ctx, Object msg)
+      throws Exception {
+    // The client sends the id of the requested entity as a single line,
+    // already decoded to a String by the server pipeline.
+    String id = ((String) msg).trim();
+
+    // Netty preserves the write order on a channel, so each entry can be
+    // written sequentially: a "<size> <name>\n" header followed by the
+    // raw file content as a zero-copy FileRegion.
+    ChannelFuture future = null;
+    for (Map.Entry<String, Path> entry : source.getFilesToStream(id)
+        .entrySet()) {
+      Path file = entry.getValue();
+      String name = entry.getKey();
+      long fileSize = Files.size(file);
+      ctx.write(fileSize + " " + name + "\n");
+      future = ctx.writeAndFlush(
+          new DefaultFileRegion(file.toFile(), 0, fileSize));
+    }
+
+    if (future != null) {
+      // Close the channel only after the last file has been flushed.
+      future.addListener(f -> ctx.channel().close());
+    }
+  }
+
+ @Override
+ public void channelReadComplete(ChannelHandlerContext ctx) {
+ ctx.flush();
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+ throws Exception {
+ cause.printStackTrace();
+ if (ctx.channel().isActive()) {
+ ctx.writeAndFlush("ERR: " +
+ cause.getClass().getSimpleName() + ": " +
+ cause.getMessage() + '\n').addListener(
+ ChannelFutureListener.CLOSE);
+ }
+ ctx.close();
+ }
+}
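Between them, DirstreamServerHandler and DirstreamClientHandler define the entire wire format: for each entry, a text header of the form "<size> <name>\n", followed by exactly <size> raw bytes, with no delimiter before the next header. A minimal plain-Java sketch of that framing (no Netty involved; the class name and sample content are invented for illustration):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public final class FramingDemo {
      public static void main(String[] args) throws IOException {
        byte[] payload = "rocksdb-bytes".getBytes(StandardCharsets.UTF_8);

        // Sender side: "<size> <name>\n" header, then the raw content.
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        wire.write((payload.length + " DB/000003.sst\n")
            .getBytes(StandardCharsets.UTF_8));
        wire.write(payload);

        // Receiver side: split at the first LF, then read <size> bytes.
        byte[] bytes = wire.toByteArray();
        int lf = 0;
        while (bytes[lf] != '\n') {
          lf++;
        }
        String[] header =
            new String(bytes, 0, lf, StandardCharsets.UTF_8).split(" ", 2);
        int size = Integer.parseInt(header[0]);
        String body = new String(bytes, lf + 1, size, StandardCharsets.UTF_8);
        System.out.println(header[1] + " (" + size + " bytes): " + body);
      }
    }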
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java
new file mode 100644
index 0000000..0676a74
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java
@@ -0,0 +1,62 @@
+package org.apache.hadoop.ozone.container.stream;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.string.StringEncoder;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.util.CharsetUtil;
+
+public class StreamingClient {
+
+  private final EventLoopGroup group;
+ private final Bootstrap b;
+ private ChannelFuture f;
+ private int port;
+ private String host;
+ private StreamingDestination streamingDestination;
+
+ public StreamingClient(
+ String host,
+ int port,
+ StreamingDestination streamingDestination
+  ) {
+ this.port = port;
+ this.host = host;
+ this.streamingDestination = streamingDestination;
+
+ group = new NioEventLoopGroup();
+
+ b = new Bootstrap();
+ b.group(group)
+ .channel(NioSocketChannel.class)
+ .handler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline p = ch.pipeline();
+ p.addLast(new LoggingHandler(LogLevel.INFO),
+ new StringEncoder(CharsetUtil.UTF_8),
+ new DirstreamClientHandler(streamingDestination));
+ }
+ });
+
+ }
+
+ public Channel connect() throws InterruptedException {
+ f = b.connect(host, port).sync();
+ return f.channel();
+ }
+
+ public void close() throws InterruptedException {
+ f.channel().closeFuture().sync();
+ group.shutdownGracefully();
+ }
+
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingDestination.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingDestination.java
new file mode 100644
index 0000000..63c3ea8
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingDestination.java
@@ -0,0 +1,9 @@
+package org.apache.hadoop.ozone.container.stream;
+
+import java.nio.file.Path;
+
+public interface StreamingDestination {
+
+ Path mapToDestination(String name);
+
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingServer.java
new file mode 100644
index 0000000..c81ce96
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingServer.java
@@ -0,0 +1,74 @@
+package org.apache.hadoop.ozone.container.stream;
+
+import java.net.InetSocketAddress;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.LineBasedFrameDecoder;
+import io.netty.handler.codec.string.StringDecoder;
+import io.netty.handler.codec.string.StringEncoder;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.util.CharsetUtil;
+
+public class StreamingServer {
+
+ private int port;
+
+ private StreamingSource source;
+
+ private EventLoopGroup bossGroup;
+
+ private EventLoopGroup workerGroup;
+
+ public StreamingServer(
+ StreamingSource source, int port
+ ) {
+ this.port = port;
+ this.source = source;
+ }
+
+ public void start() throws InterruptedException {
+ ServerBootstrap b = new ServerBootstrap();
+ bossGroup = new NioEventLoopGroup(1);
+ workerGroup = new NioEventLoopGroup();
+
+ b.group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .option(ChannelOption.SO_BACKLOG, 100)
+ .handler(new LoggingHandler(LogLevel.INFO))
+ .childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ ch.pipeline().addLast(
+ new StringEncoder(CharsetUtil.UTF_8),
+ new LineBasedFrameDecoder(8192),
+ new StringDecoder(CharsetUtil.UTF_8),
+ new ChunkedWriteHandler(),
+ new DirstreamServerHandler(source));
+ }
+ });
+
+ ChannelFuture f = b.bind(port).sync();
+ // re-read the bound port: with port 0 the OS assigns a free one
+ final InetSocketAddress socketAddress =
+ (InetSocketAddress) f.channel().localAddress();
+ port = socketAddress.getPort();
+ }
+
+ public void stop() {
+ bossGroup.shutdownGracefully();
+ workerGroup.shutdownGracefully();
+ }
+
+ public int getPort() {
+ return port;
+ }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingSource.java
new file mode 100644
index 0000000..c299eae
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingSource.java
@@ -0,0 +1,10 @@
+package org.apache.hadoop.ozone.container.stream;
+
+import java.nio.file.Path;
+import java.util.Map;
+
+public interface StreamingSource {
+
+ Map<String, Path> getFilesToStream(String id);
+
+}
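On the sending side, StreamingSource supplies the name-to-path map for a requested id and StreamingServer serves it through the pipeline above. A minimal sketch with illustrative paths and id; passing port 0 works because start() re-reads the port actually bound:

    package org.apache.hadoop.ozone.container.stream;

    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.HashMap;
    import java.util.Map;

    public final class StreamingServerExample {

      public static void main(String[] args) throws Exception {
        // publish two files under the logical id "1" (paths are illustrative)
        StreamingSource source = id -> {
          Map<String, Path> files = new HashMap<>();
          files.put("container.yaml",
              Paths.get("/tmp/source/container.yaml"));
          files.put("chunks/chunk1",
              Paths.get("/tmp/source/chunks/chunk1"));
          return files;
        };

        StreamingServer server = new StreamingServer(source, 0);
        server.start();
        System.out.println("Streaming on port " + server.getPort());

        // ... let clients connect, then shut down
        server.stop();
      }
    }
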
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
index 6ca5e44..da03318 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
@@ -19,16 +19,10 @@
package org.apache.hadoop.ozone.container.keyvalue;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.BlockID;
@@ -55,7 +49,6 @@ import static org.apache.ratis.util.Preconditions.assertTrue;
import org.junit.Assert;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Rule;
@@ -180,35 +173,6 @@ public class TestKeyValueContainer {
ContainerProtos.ContainerDataProto.State.CLOSED);
}
- @Test
- public void concurrentExport() throws Exception {
- createContainer();
- populate(100);
- closeContainer();
-
- AtomicReference<String> failed = new AtomicReference<>();
-
- TarContainerPacker packer = new TarContainerPacker();
- List<Thread> threads = IntStream.range(0, 20)
- .mapToObj(i -> new Thread(() -> {
- try {
- File file = folder.newFile("concurrent" + i + ".tar.gz");
- try (OutputStream out = new FileOutputStream(file)) {
- keyValueContainer.exportContainerData(out, packer);
- }
- } catch (Exception e) {
- failed.compareAndSet(null, e.getMessage());
- }
- }))
- .collect(Collectors.toList());
-
- threads.forEach(Thread::start);
- for (Thread thread : threads) {
- thread.join();
- }
-
- assertNull(failed.get());
- }
@Test
public void testDuplicateContainer() throws Exception {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
deleted file mode 100644
index c067a59..0000000
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.keyvalue;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
-import org.apache.hadoop.test.LambdaTestUtils;
-
-import org.apache.commons.compress.archivers.ArchiveOutputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
-import org.apache.commons.compress.compressors.CompressorException;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-/**
- * Test the tar/untar for a given container.
- */
-@RunWith(Parameterized.class)
-public class TestTarContainerPacker {
-
- private static final String TEST_DB_FILE_NAME = "test1";
-
- private static final String TEST_DB_FILE_CONTENT = "test1";
-
- private static final String TEST_CHUNK_FILE_NAME = "chunk1";
-
- private static final String TEST_CHUNK_FILE_CONTENT = "This is a chunk";
-
- private static final String TEST_DESCRIPTOR_FILE_CONTENT = "descriptor";
-
- private final ContainerPacker packer
- = new TarContainerPacker();
-
- private static final Path SOURCE_CONTAINER_ROOT =
- Paths.get("target/test/data/packer-source-dir");
-
- private static final Path DEST_CONTAINER_ROOT =
- Paths.get("target/test/data/packer-dest-dir");
-
- private static final Path TEMP_DIR =
- Paths.get("target/test/data/packer-tmp-dir");
-
- private static final AtomicInteger CONTAINER_ID = new AtomicInteger(1);
-
- private final ChunkLayOutVersion layout;
-
- public TestTarContainerPacker(ChunkLayOutVersion layout) {
- this.layout = layout;
- }
-
- @Parameterized.Parameters
- public static Iterable<Object[]> parameters() {
- return ChunkLayoutTestInfo.chunkLayoutParameters();
- }
-
- @BeforeClass
- public static void init() throws IOException {
- initDir(SOURCE_CONTAINER_ROOT);
- initDir(DEST_CONTAINER_ROOT);
- initDir(TEMP_DIR);
- }
-
- @AfterClass
- public static void cleanup() throws IOException {
- FileUtils.deleteDirectory(SOURCE_CONTAINER_ROOT.toFile());
- FileUtils.deleteDirectory(DEST_CONTAINER_ROOT.toFile());
- FileUtils.deleteDirectory(TEMP_DIR.toFile());
- }
-
- private static void initDir(Path path) throws IOException {
- if (path.toFile().exists()) {
- FileUtils.deleteDirectory(path.toFile());
- }
- Files.createDirectories(path);
- }
-
- private KeyValueContainerData createContainer(Path dir) throws IOException {
- long id = CONTAINER_ID.getAndIncrement();
-
- Path containerDir = dir.resolve("container" + id);
- Path dbDir = containerDir.resolve("db");
- Path dataDir = containerDir.resolve("data");
- Files.createDirectories(dbDir);
- Files.createDirectories(dataDir);
-
- KeyValueContainerData containerData = new KeyValueContainerData(
- id, layout,
- -1, UUID.randomUUID().toString(), UUID.randomUUID().toString());
- containerData.setChunksPath(dataDir.toString());
- containerData.setMetadataPath(dbDir.getParent().toString());
- containerData.setDbFile(dbDir.toFile());
-
- return containerData;
- }
-
- @Test
- public void pack() throws IOException, CompressorException {
-
- //GIVEN
- OzoneConfiguration conf = new OzoneConfiguration();
-
- KeyValueContainerData sourceContainerData =
- createContainer(SOURCE_CONTAINER_ROOT);
-
- KeyValueContainer sourceContainer =
- new KeyValueContainer(sourceContainerData, conf);
-
- //sample db file in the metadata directory
- writeDbFile(sourceContainerData, TEST_DB_FILE_NAME);
-
- //sample chunk file in the chunk directory
- writeChunkFile(sourceContainerData, TEST_CHUNK_FILE_NAME);
-
- //sample container descriptor file
- writeDescriptor(sourceContainer);
-
- Path targetFile = TEMP_DIR.resolve("container.tar");
-
- //WHEN: pack it
- try (FileOutputStream output = new FileOutputStream(targetFile.toFile())) {
- packer.pack(sourceContainer.getContainerData(), output);
- }
-
- //THEN: check the result
- try (FileInputStream input = new FileInputStream(targetFile.toFile())) {
- TarArchiveInputStream tarStream = new TarArchiveInputStream(input);
-
- TarArchiveEntry entry;
- Map<String, TarArchiveEntry> entries = new HashMap<>();
- while ((entry = tarStream.getNextTarEntry()) != null) {
- entries.put(entry.getName(), entry);
- }
- Assert.assertTrue(
- entries.containsKey("container.yaml"));
-
- }
-
- //read the container descriptor only
- try (FileInputStream input = new FileInputStream(targetFile.toFile())) {
- String containerYaml = new String(packer.unpackContainerDescriptor(input),
- StandardCharsets.UTF_8);
- Assert.assertEquals(TEST_DESCRIPTOR_FILE_CONTENT, containerYaml);
- }
-
- KeyValueContainerData destinationContainerData =
- createContainer(DEST_CONTAINER_ROOT);
-
- String descriptor;
-
- //unpackContainerData
- try (FileInputStream input = new FileInputStream(targetFile.toFile())) {
- descriptor =
- new String(
- packer.unpackContainerData(destinationContainerData, input),
- StandardCharsets.UTF_8);
- }
-
- assertExampleMetadataDbIsGood(
- destinationContainerData.getDbFile().toPath(),
- TEST_DB_FILE_NAME);
- assertExampleChunkFileIsGood(
- Paths.get(destinationContainerData.getChunksPath()),
- TEST_CHUNK_FILE_NAME);
- Assert.assertFalse(
- "Descriptor file should not have been extracted by the "
- + "unpackContainerData Call",
- destinationContainerData.getContainerFile().exists());
- Assert.assertEquals(TEST_DESCRIPTOR_FILE_CONTENT, descriptor);
- }
-
- @Test
- public void unpackContainerDataWithValidRelativeDbFilePath()
- throws Exception {
- //GIVEN
- KeyValueContainerData sourceContainerData =
- createContainer(SOURCE_CONTAINER_ROOT);
-
- String fileName = "sub/dir/" + TEST_DB_FILE_NAME;
- File file = writeDbFile(sourceContainerData, fileName);
- String entryName = TarContainerPacker.DB_DIR_NAME + "/" + fileName;
-
- File containerFile = packContainerWithSingleFile(file, entryName);
-
- // WHEN
- KeyValueContainerData dest = unpackContainerData(containerFile);
-
- // THEN
- assertExampleMetadataDbIsGood(dest.getDbFile().toPath(), fileName);
- }
-
- @Test
- public void unpackContainerDataWithValidRelativeChunkFilePath()
- throws Exception {
- //GIVEN
- KeyValueContainerData sourceContainerData =
- createContainer(SOURCE_CONTAINER_ROOT);
-
- String fileName = "sub/dir/" + TEST_CHUNK_FILE_NAME;
- File file = writeChunkFile(sourceContainerData, fileName);
- String entryName = TarContainerPacker.CHUNKS_DIR_NAME + "/" + fileName;
-
- File containerFile = packContainerWithSingleFile(file, entryName);
-
- // WHEN
- KeyValueContainerData dest = unpackContainerData(containerFile);
-
- // THEN
- assertExampleChunkFileIsGood(Paths.get(dest.getChunksPath()), fileName);
- }
-
- @Test
- public void unpackContainerDataWithInvalidRelativeDbFilePath()
- throws Exception {
- //GIVEN
- KeyValueContainerData sourceContainerData =
- createContainer(SOURCE_CONTAINER_ROOT);
-
- String fileName = "../db_file";
- File file = writeDbFile(sourceContainerData, fileName);
- String entryName = TarContainerPacker.DB_DIR_NAME + "/" + fileName;
-
- File containerFile = packContainerWithSingleFile(file, entryName);
-
- LambdaTestUtils.intercept(IllegalArgumentException.class,
- () -> unpackContainerData(containerFile));
- }
-
- @Test
- public void unpackContainerDataWithInvalidRelativeChunkFilePath()
- throws Exception {
- //GIVEN
- KeyValueContainerData sourceContainerData =
- createContainer(SOURCE_CONTAINER_ROOT);
-
- String fileName = "../chunk_file";
- File file = writeChunkFile(sourceContainerData, fileName);
- String entryName = TarContainerPacker.CHUNKS_DIR_NAME + "/" + fileName;
-
- File containerFile = packContainerWithSingleFile(file, entryName);
-
- LambdaTestUtils.intercept(IllegalArgumentException.class,
- () -> unpackContainerData(containerFile));
- }
-
- private KeyValueContainerData unpackContainerData(File containerFile)
- throws IOException {
- try (FileInputStream input = new FileInputStream(containerFile)) {
- OzoneConfiguration conf = new OzoneConfiguration();
- KeyValueContainerData data = createContainer(DEST_CONTAINER_ROOT);
- packer.unpackContainerData(data, input);
- return data;
- }
- }
-
- private void writeDescriptor(KeyValueContainer container) throws IOException {
- try (FileWriter writer = new FileWriter(
- container.getContainerData().getContainerFile())) {
- IOUtils.write(TEST_DESCRIPTOR_FILE_CONTENT, writer);
- }
- }
-
- private File writeChunkFile(
- KeyValueContainerData containerData, String chunkFileName)
- throws IOException {
- Path path = Paths.get(containerData.getChunksPath())
- .resolve(chunkFileName);
- Files.createDirectories(path.getParent());
- File file = path.toFile();
- try (FileWriter writer = new FileWriter(file)) {
- IOUtils.write(TEST_CHUNK_FILE_CONTENT, writer);
- }
- return file;
- }
-
- private File writeDbFile(
- KeyValueContainerData containerData, String dbFileName)
- throws IOException {
- Path path = containerData.getDbFile().toPath()
- .resolve(dbFileName);
- Files.createDirectories(path.getParent());
- File file = path.toFile();
- try (FileWriter writer = new FileWriter(file)) {
- IOUtils.write(TEST_DB_FILE_CONTENT, writer);
- }
- return file;
- }
-
- private File packContainerWithSingleFile(File file, String entryName)
- throws Exception {
- File targetFile = TEMP_DIR.resolve("container.tar").toFile();
- try (FileOutputStream output = new FileOutputStream(targetFile);
- ArchiveOutputStream archive = new TarArchiveOutputStream(output)) {
- TarContainerPacker.includeFile(file, entryName, archive);
- }
- return targetFile;
- }
-
- private void assertExampleMetadataDbIsGood(Path dbPath, String filename)
- throws IOException {
-
- Path dbFile = dbPath.resolve(filename);
-
- Assert.assertTrue(
- "example DB file is missing after pack/unpackContainerData: " + dbFile,
- Files.exists(dbFile));
-
- try (FileInputStream testFile = new FileInputStream(dbFile.toFile())) {
- List<String> strings = IOUtils
- .readLines(testFile, StandardCharsets.UTF_8);
- Assert.assertEquals(1, strings.size());
- Assert.assertEquals(TEST_DB_FILE_CONTENT, strings.get(0));
- }
- }
-
- private void assertExampleChunkFileIsGood(Path chunkPath, String filename)
- throws IOException {
-
- Path chunkFile = chunkPath.resolve(filename);
-
- Assert.assertTrue(
- "example chunk file is missing after pack/unpackContainerData: "
- + chunkFile,
- Files.exists(chunkFile));
-
- try (FileInputStream testFile = new FileInputStream(chunkFile.toFile())) {
- List<String> strings = IOUtils
- .readLines(testFile, StandardCharsets.UTF_8);
- Assert.assertEquals(1, strings.size());
- Assert.assertEquals(TEST_CHUNK_FILE_CONTENT, strings.get(0));
- }
- }
-
-}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcOutputStream.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcOutputStream.java
deleted file mode 100644
index cf6ece3..0000000
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcOutputStream.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.replication;
-
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-/**
- * Tests for {@code GrpcOutputStream}.
- */
-@RunWith(MockitoJUnitRunner.class)
-public class TestGrpcOutputStream {
-
- private static final Random RND = new Random();
-
- private final long containerId = RND.nextLong();
- private final int bufferSize = RND.nextInt(1024) + 128 + 1;
-
- @Mock
- private StreamObserver<CopyContainerResponseProto> observer;
-
- private OutputStream subject;
-
- @Before
- public void setUp() throws Exception {
- subject = new GrpcOutputStream(observer, containerId, bufferSize);
- }
-
- @Test
- public void seriesOfBytesInSingleResponse() throws IOException {
- byte[] bytes = getRandomBytes(5);
- for (byte b : bytes) {
- subject.write(b);
- }
- subject.close();
-
- verifyResponses(bytes);
- }
-
- @Test
- public void mixedBytesAndArraysInSingleResponse() throws IOException {
- byte[] bytes = getRandomBytes(16);
- subject.write(bytes[0]);
- subject.write(bytes, 1, 14);
- subject.write(bytes[15]);
- subject.close();
-
- verifyResponses(bytes);
- }
-
- @Test
- public void mixedArraysAndBytesInSingleResponse() throws IOException {
- byte[] bytes = getRandomBytes(10);
-
- subject.write(bytes, 0, 5);
- subject.write(bytes[5]);
- subject.write(bytes, 6, 4);
- subject.close();
-
- verifyResponses(bytes);
- }
-
- @Test
- public void seriesOfArraysInSingleResponse() throws IOException {
- byte[] bytes = getRandomBytes(8);
-
- subject.write(bytes, 0, 5);
- subject.write(bytes, 5, 3);
- subject.close();
-
- verifyResponses(bytes);
- }
-
- @Test
- public void seriesOfArraysExactlyFillBuffer() throws IOException {
- int half = bufferSize / 2, otherHalf = bufferSize - half;
- byte[] bytes = getRandomBytes(2 * bufferSize);
-
- // fill buffer
- subject.write(bytes, 0, half);
- subject.write(bytes, half, otherHalf);
- // fill buffer again
- subject.write(bytes, bufferSize, half);
- subject.write(bytes, bufferSize + half, otherHalf);
- subject.close();
-
- verifyResponses(bytes);
- }
-
- @Test
- public void bufferFlushedWhenFull() throws IOException {
- byte[] bytes = getRandomBytes(bufferSize);
-
- subject.write(bytes, 0, bufferSize-1);
- subject.write(bytes[bufferSize-1]);
- verify(observer).onNext(any());
-
- subject.write(bytes[0]);
- subject.write(bytes, 1, bufferSize-1);
- verify(observer, times(2)).onNext(any());
- }
-
- @Test
- public void singleArraySpansMultipleResponses() throws IOException {
- byte[] bytes = writeBytes(subject, 2 * bufferSize + bufferSize/2);
- subject.close();
-
- verifyResponses(bytes);
- }
-
- @Test
- public void secondWriteSpillsToNextResponse() throws IOException {
- byte[] bytes1 = writeBytes(subject, bufferSize / 2);
- byte[] bytes2 = writeBytes(subject, 2 * bufferSize);
- subject.close();
-
- verifyResponses(concat(bytes1, bytes2));
- }
-
- private void verifyResponses(byte[] bytes) {
- int expectedResponseCount = bytes.length / bufferSize;
- if (bytes.length % bufferSize > 0) {
- expectedResponseCount++;
- }
-
- ArgumentCaptor<CopyContainerResponseProto> captor =
- ArgumentCaptor.forClass(CopyContainerResponseProto.class);
- verify(observer, times(expectedResponseCount)).onNext(captor.capture());
-
- List<CopyContainerResponseProto> responses =
- new ArrayList<>(captor.getAllValues());
- for (int i = 0; i < expectedResponseCount; i++) {
- CopyContainerResponseProto response = responses.get(i);
- assertEquals(containerId, response.getContainerID());
-
- int expectedOffset = i * bufferSize;
- assertEquals(expectedOffset, response.getReadOffset());
-
- int size = Math.min(bufferSize, bytes.length - expectedOffset);
- assertEquals(size, response.getLen());
-
- byte[] part = new byte[size];
- System.arraycopy(bytes, expectedOffset, part, 0, size);
- ByteString data = response.getData();
- assertArrayEquals(part, data.toByteArray());
-
- // we don't want concatenated ByteStrings
- assertEquals("LiteralByteString", data.getClass().getSimpleName());
- }
-
- verify(observer, times(1)).onCompleted();
- }
-
- private static byte[] concat(byte[]... parts) {
- int length = Arrays.stream(parts).mapToInt(each -> each.length).sum();
- byte[] bytes = new byte[length];
- int pos = 0;
- for (byte[] part : parts) {
- System.arraycopy(part, 0, bytes, pos, part.length);
- pos += part.length;
- }
- return bytes;
- }
-
- private static byte[] writeBytes(OutputStream subject, int size)
- throws IOException {
- byte[] bytes = getRandomBytes(size);
- subject.write(bytes);
- return bytes;
- }
-
- private static byte[] getRandomBytes(int size) {
- byte[] bytes = new byte[size];
- RND.nextBytes(bytes);
- return bytes;
- }
-
-}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationService.java
index 5360eac..c9cd1eb 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationService.java
@@ -1,13 +1,13 @@
package org.apache.hadoop.ozone.container.replication;
import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.TimeoutException;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
@@ -39,7 +39,9 @@ import org.apache.hadoop.test.GenericTestUtils;
import com.google.common.base.Supplier;
import com.google.common.collect.Maps;
+import org.apache.commons.io.FileUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.jetbrains.annotations.NotNull;
import org.junit.Test;
/**
@@ -48,20 +50,97 @@ import org.junit.Test;
public class TestReplicationService {
@Test
- public void test() throws IOException, TimeoutException,
- InterruptedException {
- final UUID scmUuid = UUID.randomUUID();
+ public void test() throws Exception {
+
+ final String scmUuid = "99335668-b0b2-46ff-bd6e-7bc9cc0e1404";
+ final String clusterUuid = "7001371d-d474-4ae2-bd80-d942b31f8bc9";
//start server
- ConfigurationSource ozoneConfig = new OzoneConfiguration();
+ final Path sourceDir = Paths
+ .get(System.getProperty("user.dir"), "target", "test-data", "source");
+ final Path destDir = Paths
+ .get(System.getProperty("user.dir"), "target", "test-data", "dest");
+ FileUtils.deleteDirectory(sourceDir.toFile());
+ FileUtils.deleteDirectory(destDir.toFile());
+
+ final String sourceDnUUID = "d6979383-5fd5-4fa5-be02-9b39f06d763d";
+ final String destDnUUID = "bb11a0cc-8902-4f07-adae-a853ba891132";
+
+ ReplicationServer replicationServer =
+ initSource(clusterUuid, scmUuid, sourceDnUUID, sourceDir.toString());
+
+ //start client
+ ContainerSet destinationContainerSet =
+ replicateContainer(
+ scmUuid,
+ sourceDnUUID,
+ replicationServer.getPort(),
+ destDnUUID,
+ destDir);
+
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return destinationContainerSet.getContainer(1L) != null;
+ }
+ }, 1000, 10_000);
+ }
+
+ @NotNull
+ private ContainerSet replicateContainer(
+ String scmUuid,
+ String sourceDnUUID,
+ int port,
+ String destDnUUID,
+ Path destDir
+ ) throws IOException {
+ ContainerSet destinationContainerSet = new ContainerSet();
+
+ OzoneConfiguration clientConfig = new OzoneConfiguration();
+ clientConfig.set("hdds.datanode.dir", destDir.toString());
+ MutableVolumeSet volumeSet =
+ new MutableVolumeSet(destDnUUID, clientConfig);
+
+ DownloadAndImportReplicator replicator = new DownloadAndImportReplicator(
+ clientConfig,
+ () -> scmUuid,
+ destinationContainerSet,
+ new SimpleContainerDownloader(clientConfig, null),
+ volumeSet);
+
+ DatanodeDetails source =
+ DatanodeDetails.newBuilder()
+ .setIpAddress("127.0.0.1")
+ .setUuid(UUID.fromString(sourceDnUUID))
+ .build();
+
+ source.setPort(Name.REPLICATION, port);
+ List<DatanodeDetails> sourceDatanodes = new ArrayList<>();
+ sourceDatanodes.add(source);
+
+ // the replicator imports into destinationContainerSet, declared above
+ ReplicationSupervisor supervisor =
+ new ReplicationSupervisor(destinationContainerSet, replicator, 10);
+ replicator.replicate(new ReplicationTask(1L, sourceDatanodes));
+ return destinationContainerSet;
+ }
+
+ private ReplicationServer initSource(
+ String clusterUuid,
+ String scmUuid,
+ String sourceDnUUID,
+ String sourceDir
+ )
+ throws Exception {
+ OzoneConfiguration ozoneConfig = new OzoneConfiguration();
+ ozoneConfig.set("hdds.datanode.dir", sourceDir);
- final String sourceDnUUID = UUID.randomUUID().toString();
- final String destDnUUID = UUID.randomUUID().toString();
MutableVolumeSet sourceVolumes =
new MutableVolumeSet(sourceDnUUID, ozoneConfig);
VolumeChoosingPolicy v = new RoundRobinVolumeChoosingPolicy();
final HddsVolume volume =
v.chooseVolume(sourceVolumes.getVolumesList(), 5L);
+ volume.format(clusterUuid);
KeyValueContainerData kvd = new KeyValueContainerData(1L, "/tmp/asd");
kvd.setState(State.OPEN);
@@ -82,7 +161,7 @@ public class TestReplicationService {
final ContainerCommandRequestProto containerCommandRequest =
ContainerCommandRequestProto.newBuilder()
.setCmdType(Type.WriteChunk)
- .setDatanodeUuid(destDnUUID)
+ .setDatanodeUuid(sourceDnUUID)
.setContainerID(kvc.getContainerData().getContainerID())
.setWriteChunk(WriteChunkRequestProto.newBuilder()
.setBlockID(DatanodeBlockID.newBuilder()
@@ -103,7 +182,8 @@ public class TestReplicationService {
.build())
.build();
- handler.handle(containerCommandRequest, kvc, new DispatcherContext.Builder().build());
+ handler.handle(containerCommandRequest, kvc,
+ new DispatcherContext.Builder().build());
HashMap<ContainerType, Handler> handlers = Maps.newHashMap();
ContainerController controller =
@@ -114,45 +194,14 @@ public class TestReplicationService {
SecurityConfig securityConfig = new SecurityConfig(ozoneConfig);
ReplicationServer replicationServer =
- new ReplicationServer(controller, replicationConfig, securityConfig,
+ new ReplicationServer(sourceContainerSet, replicationConfig, securityConfig,
null);
+ kvd.setState(State.CLOSED);
+
replicationServer.init();
replicationServer.start();
-
- //start client
- OzoneConfiguration clientConfig = new OzoneConfiguration();
- clientConfig.set("hdds.datanode.dir","tmp/qwe");
- MutableVolumeSet volumeSet =
- new MutableVolumeSet(destDnUUID, clientConfig);
-
- DownloadAndImportReplicator replicator = new DownloadAndImportReplicator(
- ozoneConfig,
- () -> scmUuid.toString(),
- sourceContainerSet,
- new SimpleContainerDownloader(ozoneConfig, null),
- volumeSet);
-
- DatanodeDetails source =
- DatanodeDetails.newBuilder()
- .setIpAddress("127.0.0.1")
- .setUuid(UUID.randomUUID())
- .build();
- source.setPort(Name.REPLICATION, replicationServer.getPort());
- List<DatanodeDetails> sourceDatanodes = new ArrayList<>();
- sourceDatanodes.add(source);
-
- ContainerSet destinationContainerSet = new ContainerSet();
- ReplicationSupervisor supervisor =
- new ReplicationSupervisor(destinationContainerSet, replicator, 10);
- replicator.replicate(new ReplicationTask(1L, sourceDatanodes));
-
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- return destinationContainerSet.getContainer(1L) != null;
- }
- }, 1000, 10_000);
+ return replicationServer;
}
}
\ No newline at end of file
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ExportContainer.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ExportContainer.java
deleted file mode 100644
index 686c8be..0000000
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ExportContainer.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.debug;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Callable;
-
-import org.apache.hadoop.hdds.cli.GenericParentCommand;
-import org.apache.hadoop.hdds.cli.SubcommandWithParent;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
-import org.apache.hadoop.ozone.container.common.helpers.DatanodeVersionFile;
-import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
-import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
-import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
-import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
-import org.apache.hadoop.ozone.container.ozoneimpl.ContainerReader;
-import org.apache.hadoop.ozone.container.replication.ContainerReplicationSource;
-import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
-
-import com.google.common.base.Preconditions;
-import org.kohsuke.MetaInfServices;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import picocli.CommandLine;
-import picocli.CommandLine.Command;
-import picocli.CommandLine.ParentCommand;
-
-@Command(name = "export-container",
- description = "Export one container to a tarball")
-@MetaInfServices(SubcommandWithParent.class)
-public class ExportContainer implements SubcommandWithParent, Callable<Void> {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(ExportContainer.class);
- @ParentCommand
- private GenericParentCommand parent;
-
- @CommandLine.Option(names = {"--container"},
- required = true,
- description = "Container Id")
- private long containerId;
-
- @CommandLine.Option(names = {"--dest"},
- defaultValue = "/tmp",
- description = "Destination directory")
- private String destination;
-
- @Override
- public Class<?> getParentType() {
- return OzoneDebug.class;
- }
-
- @Override
- public Void call() throws Exception {
-
- ConfigurationSource conf = parent.createOzoneConfiguration();
-
- ContainerSet containerSet = new ContainerSet();
-
- ContainerMetrics metrics = ContainerMetrics.create(conf);
-
- String firstStorageDir = getFirstStorageDir(conf);
-
- String datanodeUuid = getDatanodeUUID(firstStorageDir, conf);
-
- String scmId = getScmId(firstStorageDir);
-
- MutableVolumeSet volumeSet = new MutableVolumeSet(datanodeUuid, conf);
-
- Map<ContainerType, Handler> handlers = new HashMap<>();
-
- for (ContainerType containerType : ContainerType.values()) {
- final Handler handler =
- Handler.getHandlerForContainerType(
- containerType,
- conf,
- datanodeUuid,
- containerSet,
- volumeSet,
- metrics,
- containerReplicaProto -> {
- });
- handler.setScmID(scmId);
- handlers.put(containerType, handler);
- }
-
- ContainerController controller =
- new ContainerController(containerSet, handlers);
-
- final ContainerReplicationSource replicationSource =
- new OnDemandContainerReplicationSource(controller);
-
- Iterator<HddsVolume> volumeSetIterator = volumeSet.getVolumesList()
- .iterator();
-
- LOG.info("Starting the read all the container metadata");
-
- while (volumeSetIterator.hasNext()) {
- HddsVolume volume = volumeSetIterator.next();
- LOG.info("Loading container metadata from volume " + volume.toString());
- final ContainerReader reader =
- new ContainerReader(volumeSet, volume, containerSet, conf);
- reader.run();
- }
-
- LOG.info("All the container metadata is loaded. Starting to replication");
-
- replicationSource.prepare(containerId);
- LOG.info("Preparation is done");
-
- final File destinationFile =
- new File(destination, "container-" + containerId + ".tar.gz");
- try (FileOutputStream fos = new FileOutputStream(destinationFile)) {
- replicationSource.copyData(containerId, fos);
- }
- LOG.info("Container is exported to {}", destinationFile);
-
- return null;
- }
-
- public String getScmId(String storageDir) throws IOException {
- Preconditions.checkNotNull(storageDir);
- return Files.list(Paths.get(storageDir, "hdds"))
- .filter(Files::isDirectory)
- .findFirst().get().getFileName().toString();
- }
-
- public String getDatanodeUUID(String storageDir, ConfigurationSource config)
- throws IOException {
-
- final File versionFile = new File(storageDir, "hdds/VERSION");
-
- Properties props = DatanodeVersionFile.readFrom(versionFile);
- if (props.isEmpty()) {
- throw new InconsistentStorageStateException(
- "Version file " + versionFile + " is missing");
- }
-
- return HddsVolumeUtil
- .getProperty(props, OzoneConsts.DATANODE_UUID, versionFile);
- }
-
- private String getFirstStorageDir(ConfigurationSource config)
- throws IOException {
- final Collection<String> storageDirs =
- MutableVolumeSet.getDatanodeStorageDirs(config);
-
- return
- StorageLocation.parse(storageDirs.iterator().next())
- .getUri().getPath();
- }
-
-}