You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by el...@apache.org on 2021/02/24 08:45:53 UTC
[ozone] 04/27: closed container replication
This is an automated email from the ASF dual-hosted git repository.
elek pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 664bd5e83c681a9e8feff805f2c56a7bffd91cda
Author: Elek Márton <el...@apache.org>
AuthorDate: Fri Jan 22 06:33:19 2021 +0100
closed container replication
---
.../ozone/container/common/impl/ContainerData.java | 67 ++++++--
.../container/common/impl/ContainerDataYaml.java | 14 +-
.../container/common/impl/HddsDispatcher.java | 4 +
.../container/common/interfaces/Container.java | 24 +--
.../common/interfaces/ContainerPacker.java | 9 +-
.../ozone/container/common/interfaces/Handler.java | 21 ---
.../common/statemachine/DatanodeStateMachine.java | 8 +-
.../container/keyvalue/KeyValueContainer.java | 177 ++++++---------------
.../container/keyvalue/KeyValueContainerCheck.java | 26 +--
.../container/keyvalue/KeyValueContainerData.java | 106 +++++++-----
.../ozone/container/keyvalue/KeyValueHandler.java | 31 ----
.../container/keyvalue/TarContainerPacker.java | 22 +--
.../container/ozoneimpl/ContainerController.java | 39 +----
.../ozone/container/ozoneimpl/OzoneContainer.java | 4 +
.../container/replication/ContainerDownloader.java | 7 +-
.../container/replication/ContainerReplicator.java | 2 +-
.../replication/DownloadAndImportReplicator.java | 119 +++++++-------
.../replication/GrpcReplicationClient.java | 80 +++-------
.../OnDemandContainerReplicationSource.java | 10 +-
.../replication/ReplicationSupervisor.java | 3 +
.../replication/SimpleContainerDownloader.java | 74 +++++----
.../container/keyvalue/TestKeyValueContainer.java | 124 ++++-----------
.../container/keyvalue/TestTarContainerPacker.java | 29 ++--
.../replication/TestReplicationSupervisor.java | 17 +-
.../replication/TestSimpleContainerDownloader.java | 64 ++++----
.../ozone/freon/ClosedContainerReplicator.java | 40 +----
pom.xml | 2 +-
27 files changed, 436 insertions(+), 687 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
index 19cc1e2..f65e54a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
@@ -17,31 +17,28 @@
*/
package org.apache.hadoop.ozone.container.common.impl;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import javax.annotation.Nullable;
+import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
-import java.util.List;
-
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
- ContainerType;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ContainerDataProto;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
-import org.yaml.snakeyaml.Yaml;
-import javax.annotation.Nullable;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import static org.apache.hadoop.ozone.OzoneConsts.CHECKSUM;
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_ID;
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_TYPE;
@@ -52,6 +49,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.METADATA;
import static org.apache.hadoop.ozone.OzoneConsts.ORIGIN_NODE_ID;
import static org.apache.hadoop.ozone.OzoneConsts.ORIGIN_PIPELINE_ID;
import static org.apache.hadoop.ozone.OzoneConsts.STATE;
+import org.yaml.snakeyaml.Yaml;
/**
* ContainerData is the in-memory representation of container metadata and is
@@ -67,7 +65,7 @@ public abstract class ContainerData {
private final long containerID;
// Layout version of the container data
- private final int layOutVersion;
+ private int layOutVersion;
// Metadata of the container will be a key value pair.
// This can hold information like volume name, owner etc.,
@@ -83,6 +81,9 @@ public abstract class ContainerData {
private boolean committedSpace;
+ // Path to Container metadata Level DB/RocksDB Store and .container file.
+ private String metadataPath;
+
//ID of the pipeline where this container is created
private String originPipelineId;
//ID of the datanode where this container is created
@@ -98,6 +99,8 @@ public abstract class ContainerData {
private HddsVolume volume;
+ private File containerFile;
+
private String checksum;
/** Timestamp of last data scan (milliseconds since Unix Epoch).
@@ -630,4 +633,38 @@ public abstract class ContainerData {
incrWriteBytes(bytesWritten);
}
+ public void setLayoutVersion(int version) {
+ this.layOutVersion = version;
+ }
+
+ public File getContainerFile() {
+ return ContainerData.getContainerFile(getMetadataPath(), getContainerID());
+ }
+
+ public static File getContainerFile(String metadataPath, long containerId) {
+ return new File(metadataPath,
+ containerId + OzoneConsts.CONTAINER_EXTENSION);
+ }
+
+ /**
+ * Returns container metadata path.
+ * @return - Physical path where container file and checksum is stored.
+ */
+ public String getMetadataPath() {
+ return metadataPath;
+ }
+
+ /**
+ * Sets container metadata path.
+ *
+ * @param path - String.
+ */
+ public void setMetadataPath(String path) {
+ this.metadataPath = path;
+ }
+
+
+ public static List<String> getYamlFields() {
+ return YAML_FIELDS;
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
index 757d7e8..3f54e25 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
@@ -32,18 +32,15 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
-import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ContainerType;
-import org.apache.hadoop.hdds.scm.container.common.helpers
- .StorageContainerException;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import com.google.common.base.Preconditions;
-import static org.apache.hadoop.ozone.container.keyvalue
- .KeyValueContainerData.KEYVALUE_YAML_TAG;
+import org.apache.commons.io.IOUtils;
+import static org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData.KEYVALUE_YAML_TAG;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
@@ -156,8 +153,7 @@ public final class ContainerDataYaml {
Yaml yaml = new Yaml(containerDataConstructor, representer);
yaml.setBeanAccess(BeanAccess.FIELD);
- containerData = (ContainerData)
- yaml.load(input);
+ containerData = yaml.load(input);
return containerData;
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 13c1780..fa451a3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -648,6 +648,10 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
.build();
}
+ public String getScmId() {
+ return scmID;
+ }
+
enum EventType {
READ,
WRITE
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
index 67a7a16..063ac33 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
@@ -19,18 +19,12 @@
package org.apache.hadoop.ozone.container.common.interfaces;
import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.time.Instant;
import java.util.Map;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
-import org.apache.hadoop.hdds.scm.container.common.helpers
- .StorageContainerException;
-
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.hdfs.util.RwLock;
@@ -127,20 +121,6 @@ public interface Container<CONTAINERDATA extends ContainerData> extends RwLock {
void updateDeleteTransactionId(long deleteTransactionId);
/**
- * Import the container from an external archive.
- */
- void importContainerData(InputStream stream,
- ContainerPacker<CONTAINERDATA> packer) throws IOException;
-
- /**
- * Export all the data of the container to one output archive with the help
- * of the packer.
- *
- */
- void exportContainerData(OutputStream stream,
- ContainerPacker<CONTAINERDATA> packer) throws IOException;
-
- /**
* Returns containerReport for the container.
*/
ContainerReplicaProto getContainerReport()
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java
index 8308c23..a76e6f0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java
@@ -22,13 +22,13 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
/**
* Service to pack/unpack ContainerData container data to/from a single byte
* stream.
*/
-public interface ContainerPacker<CONTAINERDATA extends ContainerData> {
+public interface ContainerPacker {
/**
* Extract the container data to the path defined by the container.
@@ -38,7 +38,8 @@ public interface ContainerPacker<CONTAINERDATA extends ContainerData> {
* @return the byte content of the descriptor (which won't be written to a
* file but returned).
*/
- byte[] unpackContainerData(Container<CONTAINERDATA> container,
+ byte[] unpackContainerData(
+ KeyValueContainerData container,
InputStream inputStream)
throws IOException;
@@ -46,7 +47,7 @@ public interface ContainerPacker<CONTAINERDATA extends ContainerData> {
* Compress all the container data (chunk data, metadata db AND container
* descriptor) to one single archive.
*/
- void pack(Container<CONTAINERDATA> container, OutputStream destination)
+ void pack(KeyValueContainerData containerData, OutputStream destination)
throws IOException;
/**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 4ba7572..04317e9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -19,8 +19,6 @@
package org.apache.hadoop.ozone.container.common.interfaces;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.util.function.Consumer;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -31,12 +29,10 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
/**
* Dispatcher sends ContainerCommandRequests to Handler. Each Container Type
@@ -107,23 +103,6 @@ public abstract class Handler {
DispatcherContext dispatcherContext);
/**
- * Imports container from a raw input stream.
- */
- public abstract Container importContainer(
- ContainerData containerData, InputStream rawContainerStream,
- TarContainerPacker packer)
- throws IOException;
-
- /**
- * Exports container to the output stream.
- */
- public abstract void exportContainer(
- Container container,
- OutputStream outputStream,
- TarContainerPacker packer)
- throws IOException;
-
- /**
* Stop the Handler.
*/
public abstract void stop();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index a7cb2d3..0969d64 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.Dele
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteContainerCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReplicateContainerCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.SetNodeOperationalStateCommandHandler;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
import org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
@@ -126,11 +125,12 @@ public class DatanodeStateMachine implements Closeable {
nextHB = new AtomicLong(Time.monotonicNow());
ContainerReplicator replicator =
- new DownloadAndImportReplicator(container.getContainerSet(),
- container.getController(),
+ new DownloadAndImportReplicator(conf,
+ () -> container.getScmId(),
+ container.getContainerSet(),
new SimpleContainerDownloader(conf,
dnCertClient != null ? dnCertClient.getCACertificate() : null),
- new TarContainerPacker());
+ container.getVolumeSet());
supervisor =
new ReplicationSupervisor(container.getContainerSet(), replicator,
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 53d6162..d2111f7 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.container.keyvalue;
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
@@ -54,7 +53,6 @@ import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import com.google.common.base.Preconditions;
-import org.apache.commons.io.FileUtils;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_FILES_CREATE_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR;
@@ -108,36 +106,29 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
long maxSize = containerData.getMaxSize();
volumeSet.readLock();
try {
+
HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
.getVolumesList(), maxSize);
- String hddsVolumeDir = containerVolume.getHddsRootDir().toString();
long containerID = containerData.getContainerID();
- containerMetaDataPath = KeyValueContainerLocationUtil
- .getContainerMetaDataPath(hddsVolumeDir, scmId, containerID);
- containerData.setMetadataPath(containerMetaDataPath.getPath());
-
- File chunksPath = KeyValueContainerLocationUtil.getChunksLocationPath(
- hddsVolumeDir, scmId, containerID);
+ containerData.assignToVolume(scmId, containerVolume);
+ containerMetaDataPath = new File(containerData.getMetadataPath());
// Check if it is new Container.
ContainerUtils.verifyIsNewContainer(containerMetaDataPath);
- //Create Metadata path chunks path and metadata db
- File dbFile = getContainerDBFile();
-
// This method is only called when creating new containers.
// Therefore, always use the newest schema version.
containerData.setSchemaVersion(OzoneConsts.SCHEMA_LATEST);
KeyValueContainerUtil.createContainerMetaData(containerID,
- containerMetaDataPath, chunksPath, dbFile,
- containerData.getSchemaVersion(), config);
+ containerMetaDataPath,
+ new File(containerData.getChunksPath()),
+ containerData.getDbFile(),
+ containerData.getSchemaVersion(),
+ config);
+
- //Set containerData for the KeyValueContainer.
- containerData.setChunksPath(chunksPath.getPath());
- containerData.setDbFile(dbFile);
- containerData.setVolume(containerVolume);
// Create .container file
File containerFile = getContainerFile();
@@ -455,100 +446,6 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
containerData.updateDeleteTransactionId(deleteTransactionId);
}
- @Override
- public void importContainerData(InputStream input,
- ContainerPacker<KeyValueContainerData> packer) throws IOException {
- writeLock();
- try {
- if (getContainerFile().exists()) {
- String errorMessage = String.format(
- "Can't import container (cid=%d) data to a specific location"
- + " as the container descriptor (%s) has already been exist.",
- getContainerData().getContainerID(),
- getContainerFile().getAbsolutePath());
- throw new IOException(errorMessage);
- }
- //copy the values from the input stream to the final destination
- // directory.
- byte[] descriptorContent = packer.unpackContainerData(this, input);
-
- Preconditions.checkNotNull(descriptorContent,
- "Container descriptor is missing from the container archive: "
- + getContainerData().getContainerID());
-
- //now, we have extracted the container descriptor from the previous
- //datanode. We can load it and upload it with the current data
- // (original metadata + current filepath fields)
- KeyValueContainerData originalContainerData =
- (KeyValueContainerData) ContainerDataYaml
- .readContainer(descriptorContent);
-
-
- containerData.setState(originalContainerData.getState());
- containerData
- .setContainerDBType(originalContainerData.getContainerDBType());
- containerData.setSchemaVersion(originalContainerData.getSchemaVersion());
-
- //rewriting the yaml file with new checksum calculation.
- update(originalContainerData.getMetadata(), true);
-
- //fill in memory stat counter (keycount, byte usage)
- KeyValueContainerUtil.parseKVContainerData(containerData, config);
-
- } catch (Exception ex) {
- //delete all the temporary data in case of any exception.
- try {
- FileUtils.deleteDirectory(new File(containerData.getMetadataPath()));
- FileUtils.deleteDirectory(new File(containerData.getChunksPath()));
- FileUtils.deleteDirectory(getContainerFile());
- } catch (Exception deleteex) {
- LOG.error(
- "Can not cleanup destination directories after a container import"
- + " error (cid" +
- containerData.getContainerID() + ")", deleteex);
- }
- throw ex;
- } finally {
- writeUnlock();
- }
- }
-
- @Override
- public void exportContainerData(OutputStream destination,
- ContainerPacker<KeyValueContainerData> packer) throws IOException {
- writeLock();
- try {
- // Closed/ Quasi closed containers are considered for replication by
- // replication manager if they are under-replicated.
- ContainerProtos.ContainerDataProto.State state =
- getContainerData().getState();
- if (!(state == ContainerProtos.ContainerDataProto.State.CLOSED ||
- state == ContainerDataProto.State.QUASI_CLOSED)) {
- throw new IllegalStateException(
- "Only (quasi)closed containers can be exported, but " +
- "ContainerId=" + getContainerData().getContainerID() +
- " is in state " + state);
- }
-
- try {
- compactDB();
- // Close DB (and remove from cache) to avoid concurrent modification
- // while packing it.
- BlockUtils.removeDB(containerData, config);
- } finally {
- readLock();
- writeUnlock();
- }
-
- packer.pack(this, destination);
- } finally {
- if (lock.isWriteLockedByCurrentThread()) {
- writeUnlock();
- } else {
- readUnlock();
- }
- }
- }
/**
* Acquire read lock.
@@ -620,14 +517,9 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
* @return .container File name
*/
@Override
+ @Deprecated
public File getContainerFile() {
- return getContainerFile(containerData.getMetadataPath(),
- containerData.getContainerID());
- }
-
- static File getContainerFile(String metadataPath, long containerId) {
- return new File(metadataPath,
- containerId + OzoneConsts.CONTAINER_EXTENSION);
+ return containerData.getContainerFile();
}
@Override
@@ -697,15 +589,6 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
return state;
}
- /**
- * Returns container DB file.
- * @return
- */
- public File getContainerDBFile() {
- return new File(containerData.getMetadataPath(), containerData
- .getContainerID() + OzoneConsts.DN_CONTAINER_DB);
- }
-
public boolean scanMetaData() {
long containerId = containerData.getContainerID();
KeyValueContainerCheck checker =
@@ -735,6 +618,44 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
return checker.fullCheck(throttler, canceler);
}
+ public void exportContainerData(
+ OutputStream destination,
+ ContainerPacker packer
+ ) throws IOException {
+ writeLock();
+ try {
+ // Closed/ Quasi closed containers are considered for replication by
+ // replication manager if they are under-replicated.
+ ContainerProtos.ContainerDataProto.State state =
+ getContainerData().getState();
+ if (!(state == ContainerProtos.ContainerDataProto.State.CLOSED ||
+ state == ContainerDataProto.State.QUASI_CLOSED)) {
+ throw new IllegalStateException(
+ "Only (quasi)closed containers can be exported, but " +
+ "ContainerId=" + getContainerData().getContainerID() +
+ " is in state " + state);
+ }
+
+ try {
+ compactDB();
+ // Close DB (and remove from cache) to avoid concurrent modification
+ // while packing it.
+ BlockUtils.removeDB(containerData, config);
+ } finally {
+ readLock();
+ writeUnlock();
+ }
+
+ packer.pack(containerData, destination);
+ } finally {
+ if (lock.isWriteLockedByCurrentThread()) {
+ writeUnlock();
+ } else {
+ readUnlock();
+ }
+ }
+ }
+
private enum ContainerCheckLevel {
NO_CHECK, FAST_CHECK, FULL_CHECK
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
index 20c1455..0f47e8a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
@@ -18,7 +18,12 @@
package org.apache.hadoop.ozone.container.keyvalue;
-import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdfs.util.Canceler;
@@ -30,26 +35,21 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
+import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
-import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.Arrays;
+import com.google.common.base.Preconditions;
+import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_TYPE_LEVELDB;
+import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_TYPE_ROCKSDB;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_TYPE_LEVELDB;
-import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_TYPE_ROCKSDB;
-
/**
* Class to run integrity checks on Datanode Containers.
* Provide infra for Data Scrubbing
@@ -324,8 +324,8 @@ public class KeyValueContainerCheck {
}
private void loadContainerData() throws IOException {
- File containerFile = KeyValueContainer
- .getContainerFile(metadataPath, containerID);
+ File containerFile =
+ ContainerData.getContainerFile(metadataPath, containerID);
onDiskContainerData = (KeyValueContainerData) ContainerDataYaml
.readContainerFile(containerFile);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
index 5c20e6e..c9b0b31 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
@@ -18,37 +18,36 @@
package org.apache.hadoop.ozone.container.keyvalue;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
+import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ContainerDataProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
-import org.yaml.snakeyaml.nodes.Tag;
-
-
-import java.io.File;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import static java.lang.Math.max;
-import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_TYPE_ROCKSDB;
+import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_COUNT;
import static org.apache.hadoop.ozone.OzoneConsts.CHUNKS_PATH;
+import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_BYTES_USED;
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_TYPE;
+import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_TYPE_ROCKSDB;
import static org.apache.hadoop.ozone.OzoneConsts.METADATA_PATH;
-import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_VERSION;
-import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_BYTES_USED;
-import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_COUNT;
import static org.apache.hadoop.ozone.OzoneConsts.PENDING_DELETE_BLOCK_COUNT;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_VERSION;
+import org.yaml.snakeyaml.nodes.Tag;
/**
* This class represents the KeyValueContainer metadata, which is the
@@ -63,9 +62,6 @@ public class KeyValueContainerData extends ContainerData {
// Fields need to be stored in .container file.
private static final List<String> KV_YAML_FIELDS;
- // Path to Container metadata Level DB/RocksDB Store and .container file.
- private String metadataPath;
-
//Type of DB used to store key to chunks mapping
private String containerDBType = CONTAINER_DB_TYPE_ROCKSDB;
@@ -92,14 +88,26 @@ public class KeyValueContainerData extends ContainerData {
KV_YAML_FIELDS.add(SCHEMA_VERSION);
}
+ public KeyValueContainerData(long id) {
+ this(id, ChunkLayOutVersion.FILE_PER_BLOCK, 0, "", "");
+ }
+
+ public KeyValueContainerData(long id, String metadataPath) {
+ this(id, ChunkLayOutVersion.FILE_PER_BLOCK, 0, "", "");
+ setMetadataPath(metadataPath);
+ }
+
/**
* Constructs KeyValueContainerData object.
- * @param id - ContainerId
+ *
+ * @param id - ContainerId
* @param layOutVersion chunk layout
- * @param size - maximum size of the container in bytes
+ * @param size - maximum size of the container in bytes
*/
- public KeyValueContainerData(long id, ChunkLayOutVersion layOutVersion,
- long size, String originPipelineId, String originNodeId) {
+ public KeyValueContainerData(
+ long id, ChunkLayOutVersion layOutVersion,
+ long size, String originPipelineId, String originNodeId
+ ) {
super(ContainerProtos.ContainerType.KeyValueContainer, id, layOutVersion,
size, originPipelineId, originNodeId);
this.numPendingDeletionBlocks = new AtomicLong(0);
@@ -148,28 +156,11 @@ public class KeyValueContainerData extends ContainerData {
}
/**
- * Returns container metadata path.
- * @return - Physical path where container file and checksum is stored.
- */
- public String getMetadataPath() {
- return metadataPath;
- }
-
- /**
- * Sets container metadata path.
- *
- * @param path - String.
- */
- public void setMetadataPath(String path) {
- this.metadataPath = path;
- }
-
- /**
* Returns the path to base dir of the container.
* @return Path to base dir
*/
public String getContainerPath() {
- return new File(metadataPath).getParent();
+ return new File(getMetadataPath()).getParent();
}
/**
@@ -290,12 +281,39 @@ public class KeyValueContainerData extends ContainerData {
// Set Bytes used and block count key.
metadataTable.putWithBatch(batchOperation, CONTAINER_BYTES_USED,
- getBytesUsed());
+ getBytesUsed());
metadataTable.putWithBatch(batchOperation, BLOCK_COUNT,
- getKeyCount() - deletedBlockCount);
+ getKeyCount() - deletedBlockCount);
metadataTable.putWithBatch(batchOperation, PENDING_DELETE_BLOCK_COUNT,
- (long)(getNumPendingDeletionBlocks() - deletedBlockCount));
+ (long) (getNumPendingDeletionBlocks() - deletedBlockCount));
db.getStore().getBatchHandler().commitBatchOperation(batchOperation);
}
+
+ public void assignToVolume(String scmId, HddsVolume containerVolume) {
+ String hddsVolumeDir = containerVolume.getHddsRootDir().toString();
+
+ File containerMetaDataPath = KeyValueContainerLocationUtil
+ .getContainerMetaDataPath(hddsVolumeDir, scmId, getContainerID());
+ setMetadataPath(containerMetaDataPath.getPath());
+
+ File chunksPath = KeyValueContainerLocationUtil.getChunksLocationPath(
+ hddsVolumeDir, scmId, getContainerID());
+
+ //Set containerData for the KeyValueContainer.
+ setChunksPath(chunksPath.getPath());
+ setDbFile(getContainerDBFile());
+ setVolume(containerVolume);
+
+ }
+
+ /**
+ * Returns container DB file.
+ *
+ * @return
+ */
+ public File getContainerDBFile() {
+ return new File(getMetadataPath(),
+ getContainerID() + OzoneConsts.DN_CONTAINER_DB);
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index dbc2a97..74c33e4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -19,8 +19,6 @@
package org.apache.hadoop.ozone.container.keyvalue;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
@@ -54,7 +52,6 @@ import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
@@ -910,34 +907,6 @@ public class KeyValueHandler extends Handler {
}
@Override
- public Container importContainer(ContainerData originalContainerData,
- final InputStream rawContainerStream,
- final TarContainerPacker packer)
- throws IOException {
-
- KeyValueContainerData containerData =
- new KeyValueContainerData(originalContainerData);
-
- KeyValueContainer container = new KeyValueContainer(containerData,
- conf);
-
- populateContainerPathFields(container);
- container.importContainerData(rawContainerStream, packer);
- sendICR(container);
- return container;
-
- }
-
- @Override
- public void exportContainer(final Container container,
- final OutputStream outputStream,
- final TarContainerPacker packer)
- throws IOException{
- final KeyValueContainer kvc = (KeyValueContainer) container;
- kvc.exportContainerData(outputStream, packer);
- }
-
- @Override
public void markContainerForClose(Container container)
throws IOException {
container.writeLock();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
index 349a15d..aaafed4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
@@ -32,9 +32,9 @@ import java.util.stream.Stream;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
+import static java.util.stream.Collectors.toList;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.ArchiveInputStream;
import org.apache.commons.compress.archivers.ArchiveOutputStream;
@@ -44,13 +44,11 @@ import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.io.IOUtils;
-import static java.util.stream.Collectors.toList;
-
/**
* Compress/uncompress KeyValueContainer data to a tar.gz archive.
*/
public class TarContainerPacker
- implements ContainerPacker<KeyValueContainerData> {
+ implements ContainerPacker {
static final String CHUNKS_DIR_NAME = OzoneConsts.STORAGE_DIR_CHUNKS;
@@ -62,15 +60,14 @@ public class TarContainerPacker
* Given an input stream (tar file) extract the data to the specified
* directories.
*
- * @param container container which defines the destination structure.
+ * @param containerData container which defines the destination structure.
* @param input the input stream.
*/
@Override
- public byte[] unpackContainerData(Container<KeyValueContainerData> container,
+ public byte[] unpackContainerData(KeyValueContainerData containerData,
InputStream input)
throws IOException {
byte[] descriptorFileContent = null;
- KeyValueContainerData containerData = container.getContainerData();
Path dbRoot = containerData.getDbFile().toPath();
Path chunksRoot = Paths.get(containerData.getChunksPath());
@@ -103,8 +100,7 @@ public class TarContainerPacker
} catch (CompressorException e) {
throw new IOException(
- "Can't uncompress the given container: " + container
- .getContainerData().getContainerID(),
+ "Can't uncompress the given container: " + containerData.getContainerID(),
e);
}
}
@@ -139,16 +135,14 @@ public class TarContainerPacker
* Given a containerData include all the required container data/metadata
* in a tar file.
*
- * @param container Container to archive (data + metadata).
+ * @param containerData Container to archive
* @param output Destination tar file/stream.
*/
@Override
- public void pack(Container<KeyValueContainerData> container,
+ public void pack(KeyValueContainerData containerData,
OutputStream output)
throws IOException {
- KeyValueContainerData containerData = container.getContainerData();
-
try (OutputStream compressed = compress(output);
ArchiveOutputStream archiveOutput = tar(compressed)) {
@@ -158,7 +152,7 @@ public class TarContainerPacker
includePath(Paths.get(containerData.getChunksPath()), CHUNKS_DIR_NAME,
archiveOutput);
- includeFile(container.getContainerFile(), CONTAINER_FILE_NAME,
+ includeFile(containerData.getContainerFile(), CONTAINER_FILE_NAME,
archiveOutput);
} catch (CompressorException e) {
throw new IOException(
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
index 2398c85..4d710f5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
@@ -17,26 +17,19 @@
*/
package org.apache.hadoop.ozone.container.ozoneimpl;
-import org.apache.hadoop.hdds.protocol.datanode.proto
- .ContainerProtos.ContainerType;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ContainerDataProto.State;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
-import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
-
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.time.Instant;
import java.util.Iterator;
import java.util.Map;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+
/**
* Control plane for container management in datanode.
*/
@@ -130,22 +123,6 @@ public class ContainerController {
getHandler(container).closeContainer(container);
}
- public Container importContainer(
- final ContainerData containerData,
- final InputStream rawContainerStream,
- final TarContainerPacker packer)
- throws IOException {
- return handlers.get(containerData.getContainerType())
- .importContainer(containerData, rawContainerStream, packer);
- }
-
- public void exportContainer(final ContainerType type,
- final long containerId, final OutputStream outputStream,
- final TarContainerPacker packer) throws IOException {
- handlers.get(type).exportContainer(
- containerSet.getContainer(containerId), outputStream, packer);
- }
-
/**
* Deletes a container given its Id.
* @param containerId Id of the container to be deleted
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 3ecddac..4ea2d6d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -347,4 +347,8 @@ public class OzoneContainer {
public MutableVolumeSet getVolumeSet() {
return volumeSet;
}
+
+ public String getScmId() {
+ return hddsDispatcher.getScmId();
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java
index 9511241..16578bb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java
@@ -18,11 +18,10 @@
package org.apache.hadoop.ozone.container.replication;
import java.io.Closeable;
-import java.nio.file.Path;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
/**
* Service to download container data from other datanodes.
@@ -34,7 +33,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
*/
public interface ContainerDownloader extends Closeable {
- CompletableFuture<Path> getContainerDataFromReplicas(long containerId,
- List<DatanodeDetails> sources);
+ KeyValueContainerData getContainerDataFromReplicas(
+ KeyValueContainerData containerData, List<DatanodeDetails> sources);
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicator.java
index 827b9d6..939761f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicator.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.container.replication;
/**
* Service to do the real replication task.
- *
+ * <p>
* An implementation should download the container and im
*/
public interface ContainerReplicator {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
index cdab0fd..06b5b33 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -17,22 +17,25 @@
*/
package org.apache.hadoop.ozone.container.replication;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
-import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,74 +52,78 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
private final ContainerSet containerSet;
- private final ContainerController controller;
-
private final ContainerDownloader downloader;
+ private final ConfigurationSource config;
+
+ private final Supplier<String> scmId;
- private final TarContainerPacker packer;
+ private VolumeSet volumeSet;
public DownloadAndImportReplicator(
+ ConfigurationSource config,
+ Supplier<String> scmId,
ContainerSet containerSet,
- ContainerController controller,
ContainerDownloader downloader,
- TarContainerPacker packer) {
+ VolumeSet volumeSet
+ ) {
this.containerSet = containerSet;
- this.controller = controller;
this.downloader = downloader;
- this.packer = packer;
+ this.config = config;
+ this.scmId = scmId;
+ this.volumeSet = volumeSet;
}
- public void importContainer(long containerID, Path tarFilePath)
- throws IOException {
- try {
- ContainerData originalContainerData;
- try (FileInputStream tempContainerTarStream = new FileInputStream(
- tarFilePath.toFile())) {
- byte[] containerDescriptorYaml =
- packer.unpackContainerDescriptor(tempContainerTarStream);
- originalContainerData = ContainerDataYaml.readContainer(
- containerDescriptorYaml);
- }
-
- try (FileInputStream tempContainerTarStream = new FileInputStream(
- tarFilePath.toFile())) {
-
- Container container = controller.importContainer(
- originalContainerData, tempContainerTarStream, packer);
-
- containerSet.addContainer(container);
- }
-
- } finally {
- try {
- Files.delete(tarFilePath);
- } catch (Exception ex) {
- LOG.error("Got exception while deleting downloaded container file: "
- + tarFilePath.toAbsolutePath().toString(), ex);
- }
- }
- }
@Override
public void replicate(ReplicationTask task) {
long containerID = task.getContainerId();
-
+ if (scmId.get() == null) {
+ LOG.error("Replication task is called before first SCM call");
+ task.setStatus(Status.FAILED);
+ }
List<DatanodeDetails> sourceDatanodes = task.getSources();
LOG.info("Starting replication of container {} from {}", containerID,
sourceDatanodes);
- CompletableFuture<Path> tempTarFile = downloader
- .getContainerDataFromReplicas(containerID,
- sourceDatanodes);
-
try {
- //wait for the download. This thread pool is limiting the paralell
- //downloads, so it's ok to block here and wait for the full download.
- Path path = tempTarFile.get();
+
+ VolumeChoosingPolicy volumeChoosingPolicy = config.getClass(
+ HDDS_DATANODE_VOLUME_CHOOSING_POLICY, RoundRobinVolumeChoosingPolicy
+ .class, VolumeChoosingPolicy.class).newInstance();
+
+ long maxContainerSize = (long) config.getStorageSize(
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
+
+ KeyValueContainerData containerData =
+ new KeyValueContainerData(containerID,
+ ChunkLayOutVersion.FILE_PER_BLOCK, maxContainerSize, "", "");
+
+ final KeyValueContainerData loadedContainerData =
+ downloader
+ .getContainerDataFromReplicas(containerData, sourceDatanodes);
+
+ final HddsVolume volume = volumeChoosingPolicy
+ .chooseVolume(volumeSet.getVolumesList(), maxContainerSize);
+ loadedContainerData.assignToVolume(scmId.get(), volume);
+
+ //write out container descriptor
+ KeyValueContainer keyValueContainer =
+ new KeyValueContainer(loadedContainerData, config);
+
+ //rewriting the yaml file with new checksum calculation.
+ keyValueContainer.update(loadedContainerData.getMetadata(), true);
+
+ //fill in memory stat counter (keycount, byte usage)
+ KeyValueContainerUtil.parseKVContainerData(containerData, config);
+
+ //load container
+ containerSet.addContainer(keyValueContainer);
+
LOG.info("Container {} is downloaded, starting to import.",
containerID);
- importContainer(containerID, path);
+
LOG.info("Container {} is replicated successfully", containerID);
task.setStatus(Status.DONE);
} catch (Exception e) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
index 53dac9d..89e23fe 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
@@ -19,14 +19,10 @@
package org.apache.hadoop.ozone.container.replication;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.io.UncheckedIOException;
-import java.nio.file.Files;
import java.nio.file.Path;
import java.security.cert.X509Certificate;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerRequestProto;
@@ -35,8 +31,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServi
import org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceStub;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import com.google.common.base.Preconditions;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
@@ -49,7 +45,7 @@ import org.slf4j.LoggerFactory;
/**
* Client to read container data from gRPC.
*/
-public class GrpcReplicationClient implements AutoCloseable{
+public class GrpcReplicationClient implements AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(GrpcReplicationClient.class);
@@ -91,23 +87,18 @@ public class GrpcReplicationClient implements AutoCloseable{
workingDirectory = workingDir;
}
- public CompletableFuture<Path> download(long containerId) {
+ public void download(
+ KeyValueContainerData containerData,
+ OutputStream outputStream
+ ) {
CopyContainerRequestProto request =
CopyContainerRequestProto.newBuilder()
- .setContainerID(containerId)
+ .setContainerID(containerData.getContainerID())
.setLen(-1)
.setReadOffset(0)
.build();
- CompletableFuture<Path> response = new CompletableFuture<>();
-
- Path destinationPath =
- getWorkingDirectory().resolve("container-" + containerId + ".tar.gz");
-
- client.download(request,
- new StreamDownloader(containerId, response, destinationPath));
-
- return response;
+ client.download(request, new StreamDownloader(outputStream));
}
private Path getWorkingDirectory() {
@@ -134,74 +125,41 @@ public class GrpcReplicationClient implements AutoCloseable{
public static class StreamDownloader
implements StreamObserver<CopyContainerResponseProto> {
- private final CompletableFuture<Path> response;
- private final long containerId;
- private final OutputStream stream;
- private final Path outputPath;
+ private final OutputStream outputStream;
- public StreamDownloader(long containerId, CompletableFuture<Path> response,
- Path outputPath) {
- this.response = response;
- this.containerId = containerId;
- this.outputPath = outputPath;
- try {
- Preconditions.checkNotNull(outputPath, "Output path cannot be null");
- Path parentPath = Preconditions.checkNotNull(outputPath.getParent());
- Files.createDirectories(parentPath);
- stream = new FileOutputStream(outputPath.toFile());
- } catch (IOException e) {
- throw new UncheckedIOException(
- "Output path can't be used: " + outputPath, e);
- }
+ public StreamDownloader(
+ OutputStream output
+ ) {
+ this.outputStream = output;
}
@Override
public void onNext(CopyContainerResponseProto chunk) {
try {
- chunk.getData().writeTo(stream);
+ chunk.getData().writeTo(outputStream);
} catch (IOException e) {
- response.completeExceptionally(e);
+ e.printStackTrace();
}
}
@Override
public void onError(Throwable throwable) {
try {
- LOG.error("Download of container {} was unsuccessful",
- containerId, throwable);
- stream.close();
- deleteOutputOnFailure();
- response.completeExceptionally(throwable);
+ outputStream.close();
} catch (IOException e) {
- LOG.error("Failed to close {} for container {}",
- outputPath, containerId, e);
- response.completeExceptionally(e);
+ e.printStackTrace();
}
}
@Override
public void onCompleted() {
try {
- stream.close();
- LOG.info("Container {} is downloaded to {}", containerId, outputPath);
- response.complete(outputPath);
+ outputStream.close();
} catch (IOException e) {
- LOG.error("Downloaded container {} OK, but failed to close {}",
- containerId, outputPath, e);
- response.completeExceptionally(e);
+ e.printStackTrace();
}
}
- private void deleteOutputOnFailure() {
- try {
- Files.delete(outputPath);
- } catch (IOException ex) {
- LOG.error("Failed to delete temporary destination {} for " +
- "unsuccessful download of container {}",
- outputPath, containerId, ex);
- }
- }
}
-
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
index 40d8e48..474333e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
@@ -20,11 +20,11 @@ package org.apache.hadoop.ozone.container.replication;
import java.io.IOException;
import java.io.OutputStream;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import com.google.common.base.Preconditions;
-import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
/**
* A naive implementation of the replication source which creates a tar file
@@ -51,13 +51,13 @@ public class OnDemandContainerReplicationSource
public void copyData(long containerId, OutputStream destination)
throws IOException {
- Container container = controller.getContainer(containerId);
+ KeyValueContainer container =
+ (KeyValueContainer) controller.getContainer(containerId);
Preconditions.checkNotNull(
container, "Container is not found " + containerId);
- controller.exportContainer(
- container.getContainerType(), containerId, destination, packer);
+ container.exportContainerData(destination, packer);
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index 6becf62..8dcb646 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
@@ -45,6 +46,8 @@ public class ReplicationSupervisor {
private final ContainerReplicator replicator;
private final ExecutorService executor;
+ private ConfigurationSource config;
+
private final AtomicLong requestCounter = new AtomicLong();
private final AtomicLong successCounter = new AtomicLong();
private final AtomicLong failureCounter = new AtomicLong();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
index 0967503..3f0e63a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.ozone.container.replication;
import java.io.IOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.cert.X509Certificate;
@@ -32,6 +34,9 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import com.google.common.annotations.VisibleForTesting;
import org.jetbrains.annotations.NotNull;
@@ -53,6 +58,7 @@ public class SimpleContainerDownloader implements ContainerDownloader {
private final Path workingDirectory;
private final SecurityConfig securityConfig;
private final X509Certificate caCert;
+ private TarContainerPacker packer = new TarContainerPacker();
public SimpleContainerDownloader(
ConfigurationSource conf,
@@ -73,40 +79,27 @@ public class SimpleContainerDownloader implements ContainerDownloader {
}
@Override
- public CompletableFuture<Path> getContainerDataFromReplicas(
- long containerId,
+ public KeyValueContainerData getContainerDataFromReplicas(
+ KeyValueContainerData containerData,
List<DatanodeDetails> sourceDatanodes
) {
- CompletableFuture<Path> result = null;
-
final List<DatanodeDetails> shuffledDatanodes =
shuffleDatanodes(sourceDatanodes);
for (DatanodeDetails datanode : shuffledDatanodes) {
try {
- if (result == null) {
- result = downloadContainer(containerId, datanode);
- } else {
-
- result = result.exceptionally(t -> {
- LOG.error("Error on replicating container: " + containerId, t);
- try {
- return downloadContainer(containerId, datanode).join();
- } catch (Exception e) {
- LOG.error("Error on replicating container: " + containerId,
- e);
- return null;
- }
- });
- }
+ return downloadContainer(containerData, datanode);
} catch (Exception ex) {
LOG.error(String.format(
"Container %s download from datanode %s was unsuccessful. "
- + "Trying the next datanode", containerId, datanode), ex);
+ + "Trying the next datanode", containerData.getContainerID(),
+ datanode), ex);
}
}
- return result;
+ throw new RuntimeException(
+ "Couldn't download container from any of the datanodes " + containerData
+ .getContainerID());
}
@@ -127,8 +120,8 @@ public class SimpleContainerDownloader implements ContainerDownloader {
}
@VisibleForTesting
- protected CompletableFuture<Path> downloadContainer(
- long containerId,
+ protected KeyValueContainerData downloadContainer(
+ KeyValueContainerData containerData,
DatanodeDetails datanode
) throws IOException {
CompletableFuture<Path> result;
@@ -136,17 +129,30 @@ public class SimpleContainerDownloader implements ContainerDownloader {
new GrpcReplicationClient(datanode.getIpAddress(),
datanode.getPort(Name.REPLICATION).getValue(),
workingDirectory, securityConfig, caCert);
- result = grpcReplicationClient.download(containerId)
- .thenApply(r -> {
- try {
- grpcReplicationClient.close();
- } catch (Exception e) {
- LOG.error("Couldn't close Grpc replication client", e);
- }
- return r;
- });
-
- return result;
+
+ PipedOutputStream outputStream = new PipedOutputStream();
+
+ grpcReplicationClient.download(containerData, outputStream);
+ final byte[] descriptor = packer
+ .unpackContainerData(containerData, new PipedInputStream(outputStream));
+
+ //parse descriptor
+ //now, we have extracted the container descriptor from the previous
+ //datanode. We can load it and upload it with the current data
+ // (original metadata + current filepath fields)
+ KeyValueContainerData originalContainerData =
+ (KeyValueContainerData) ContainerDataYaml
+ .readContainer(descriptor);
+
+ containerData.setState(originalContainerData.getState());
+ containerData
+ .setContainerDBType(originalContainerData.getContainerDBType());
+ containerData.setSchemaVersion(originalContainerData.getSchemaVersion());
+ containerData.setLayoutVersion(
+ originalContainerData.getLayOutVersion().getVersion());
+
+ //update descriptor
+ return containerData;
}
@Override
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
index 4000e34..6ca5e44 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
@@ -18,63 +18,56 @@
package org.apache.hadoop.ozone.container.keyvalue;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-
-import org.apache.hadoop.hdds.scm.container.common.helpers
- .StorageContainerException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.common.volume
- .RoundRobinVolumeChoosingPolicy;
-import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.metadata.AbstractDatanodeStore;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
+import static org.apache.ratis.util.Preconditions.assertTrue;
import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.mockito.Mockito;
-import org.rocksdb.ColumnFamilyOptions;
-
-import java.io.File;
-
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.apache.ratis.util.Preconditions.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
+import org.mockito.Mockito;
import static org.mockito.Mockito.mock;
+import org.rocksdb.ColumnFamilyOptions;
/**
* Class to test KeyValue Container operations.
@@ -141,74 +134,11 @@ public class TestKeyValueContainer {
// Check whether container file and container db file exists or not.
assertTrue(keyValueContainer.getContainerFile().exists(),
".Container File does not exist");
- assertTrue(keyValueContainer.getContainerDBFile().exists(), "Container " +
- "DB does not exist");
+// assertTrue(keyValueContainer.getContainerDBFile().exists(), "Container " +
+// "DB does not exist");
}
- @Test
- public void testContainerImportExport() throws Exception {
- long containerId = keyValueContainer.getContainerData().getContainerID();
- createContainer();
- long numberOfKeysToWrite = 12;
- closeContainer();
- populate(numberOfKeysToWrite);
-
- //destination path
- File folderToExport = folder.newFile("exported.tar.gz");
-
- TarContainerPacker packer = new TarContainerPacker();
-
- //export the container
- try (FileOutputStream fos = new FileOutputStream(folderToExport)) {
- keyValueContainer
- .exportContainerData(fos, packer);
- }
-
- //delete the original one
- keyValueContainer.delete();
- //create a new one
- KeyValueContainerData containerData =
- new KeyValueContainerData(containerId,
- keyValueContainerData.getLayOutVersion(),
- keyValueContainerData.getMaxSize(), UUID.randomUUID().toString(),
- datanodeId.toString());
- KeyValueContainer container = new KeyValueContainer(containerData, CONF);
-
- HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
- .getVolumesList(), 1);
- String hddsVolumeDir = containerVolume.getHddsRootDir().toString();
-
- container.populatePathFields(scmId, containerVolume, hddsVolumeDir);
- try (FileInputStream fis = new FileInputStream(folderToExport)) {
- container.importContainerData(fis, packer);
- }
-
- assertEquals("value1", containerData.getMetadata().get("key1"));
- assertEquals(keyValueContainerData.getContainerDBType(),
- containerData.getContainerDBType());
- assertEquals(keyValueContainerData.getState(),
- containerData.getState());
- assertEquals(numberOfKeysToWrite,
- containerData.getKeyCount());
- assertEquals(keyValueContainerData.getLayOutVersion(),
- containerData.getLayOutVersion());
- assertEquals(keyValueContainerData.getMaxSize(),
- containerData.getMaxSize());
- assertEquals(keyValueContainerData.getBytesUsed(),
- containerData.getBytesUsed());
-
- //Can't overwrite existing container
- try {
- try (FileInputStream fis = new FileInputStream(folderToExport)) {
- container.importContainerData(fis, packer);
- }
- fail("Container is imported twice. Previous files are overwritten");
- } catch (IOException ex) {
- //all good
- }
-
- }
/**
* Create the container on disk.
@@ -327,8 +257,8 @@ public class TestKeyValueContainer {
assertFalse("Container File still exists",
keyValueContainer.getContainerFile().exists());
- assertFalse("Container DB file still exists",
- keyValueContainer.getContainerDBFile().exists());
+// assertFalse("Container DB file still exists",
+// keyValueContainer.getContainerDBFile().exists());
}
@Test
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
index d248ac1..15b2526 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
@@ -33,21 +33,22 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.compress.archivers.ArchiveOutputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
-import org.apache.commons.compress.compressors.CompressorOutputStream;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.commons.compress.archivers.ArchiveOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorInputStream;
+import org.apache.commons.compress.compressors.CompressorOutputStream;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import static org.apache.commons.compress.compressors.CompressorStreamFactory.GZIP;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -55,8 +56,6 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import static org.apache.commons.compress.compressors.CompressorStreamFactory.GZIP;
-
/**
* Test the tar/untar for a given container.
*/
@@ -73,7 +72,7 @@ public class TestTarContainerPacker {
private static final String TEST_DESCRIPTOR_FILE_CONTENT = "descriptor";
- private final ContainerPacker<KeyValueContainerData> packer
+ private final ContainerPacker packer
= new TarContainerPacker();
private static final Path SOURCE_CONTAINER_ROOT =
@@ -163,7 +162,7 @@ public class TestTarContainerPacker {
//WHEN: pack it
try (FileOutputStream output = new FileOutputStream(targetFile.toFile())) {
- packer.pack(sourceContainer, output);
+ packer.pack(sourceContainer.getContainerData(), output);
}
//THEN: check the result
@@ -193,15 +192,13 @@ public class TestTarContainerPacker {
KeyValueContainerData destinationContainerData =
createContainer(DEST_CONTAINER_ROOT);
- KeyValueContainer destinationContainer =
- new KeyValueContainer(destinationContainerData, conf);
-
String descriptor;
//unpackContainerData
try (FileInputStream input = new FileInputStream(targetFile.toFile())) {
descriptor =
- new String(packer.unpackContainerData(destinationContainer, input),
+ new String(
+ packer.unpackContainerData(destinationContainerData, input),
StandardCharsets.UTF_8);
}
@@ -214,7 +211,7 @@ public class TestTarContainerPacker {
Assert.assertFalse(
"Descriptor file should not have been extracted by the "
+ "unpackContainerData Call",
- destinationContainer.getContainerFile().exists());
+ destinationContainerData.getContainerFile().exists());
Assert.assertEquals(TEST_DESCRIPTOR_FILE_CONTENT, descriptor);
}
@@ -297,14 +294,14 @@ public class TestTarContainerPacker {
try (FileInputStream input = new FileInputStream(containerFile)) {
OzoneConfiguration conf = new OzoneConfiguration();
KeyValueContainerData data = createContainer(DEST_CONTAINER_ROOT);
- KeyValueContainer container = new KeyValueContainer(data, conf);
- packer.unpackContainerData(container, input);
+ packer.unpackContainerData(data, input);
return data;
}
}
private void writeDescriptor(KeyValueContainer container) throws IOException {
- try (FileWriter writer = new FileWriter(container.getContainerFile())) {
+ try (FileWriter writer = new FileWriter(
+ container.getContainerData().getContainerFile())) {
IOUtils.write(TEST_DESCRIPTOR_FILE_CONTENT, writer);
}
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index df9ffbc..85be438 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.container.replication;
+import javax.annotation.Nonnull;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
@@ -29,14 +30,17 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import org.apache.hadoop.hdds.conf.InMemoryConfiguration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-
import org.apache.hadoop.test.GenericTestUtils;
+
+import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+import static java.util.Collections.emptyList;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -45,11 +49,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
-import javax.annotation.Nonnull;
-
-import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
-import static java.util.Collections.emptyList;
-
/**
* Test the replication supervisor.
*/
@@ -189,12 +188,10 @@ public class TestReplicationSupervisor {
Mockito.mock(SimpleContainerDownloader.class);
CompletableFuture<Path> res = new CompletableFuture<>();
res.complete(Paths.get("file:/tmp/no-such-file"));
- Mockito.when(
- moc.getContainerDataFromReplicas(Mockito.anyLong(), Mockito.anyList()))
- .thenReturn(res);
ContainerReplicator replicatorFactory =
- new DownloadAndImportReplicator(set, null, moc, null);
+ new DownloadAndImportReplicator(new InMemoryConfiguration(),
+ () -> "", set, moc, null);
replicatorRef.set(replicatorFactory);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
index 7070425..f73b022 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
@@ -18,19 +18,17 @@
package org.apache.hadoop.ozone.container.replication;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.junit.Assert;
import org.junit.Test;
@@ -47,17 +45,17 @@ public class TestSimpleContainerDownloader {
//GIVEN
List<DatanodeDetails> datanodes = createDatanodes();
+ KeyValueContainerData kvc = new KeyValueContainerData(1L);
SimpleContainerDownloader downloader =
createDownloaderWithPredefinedFailures(true);
//WHEN
- final Path result =
- downloader.getContainerDataFromReplicas(1L, datanodes)
- .get(1L, TimeUnit.SECONDS);
+ downloader.getContainerDataFromReplicas(kvc, datanodes);
//THEN
- Assert.assertEquals(datanodes.get(0).getUuidString(), result.toString());
+ // Assert.assertEquals(datanodes.get(0).getUuidString(), result
+ // .toString());
}
@Test
@@ -71,13 +69,14 @@ public class TestSimpleContainerDownloader {
createDownloaderWithPredefinedFailures(true, datanodes.get(0));
//WHEN
- final Path result =
- downloader.getContainerDataFromReplicas(1L, datanodes)
- .get(1L, TimeUnit.SECONDS);
+
+ final KeyValueContainerData data = downloader
+ .getContainerDataFromReplicas(new KeyValueContainerData(1L), datanodes);
//THEN
//first datanode is failed, second worked
- Assert.assertEquals(datanodes.get(1).getUuidString(), result.toString());
+ Assert.assertEquals(datanodes.get(1).getUuidString(),
+ data.getContainerDBType());
}
@Test
@@ -90,13 +89,13 @@ public class TestSimpleContainerDownloader {
createDownloaderWithPredefinedFailures(false, datanodes.get(0));
//WHEN
- final Path result =
- downloader.getContainerDataFromReplicas(1L, datanodes)
- .get(1L, TimeUnit.SECONDS);
+ final KeyValueContainerData data = downloader
+ .getContainerDataFromReplicas(new KeyValueContainerData(1L), datanodes);
//THEN
//first datanode is failed, second worked
- Assert.assertEquals(datanodes.get(1).getUuidString(), result.toString());
+ Assert.assertEquals(datanodes.get(1).getUuidString(),
+ data.getContainerDBType());
}
/**
@@ -113,21 +112,22 @@ public class TestSimpleContainerDownloader {
new SimpleContainerDownloader(new OzoneConfiguration(), null) {
@Override
- protected CompletableFuture<Path> downloadContainer(
- long containerId, DatanodeDetails datanode
- ) {
- //download is always successful.
- return CompletableFuture
- .completedFuture(Paths.get(datanode.getUuidString()));
+ protected KeyValueContainerData downloadContainer(
+ KeyValueContainerData containerData, DatanodeDetails datanode
+ ) throws IOException {
+ containerData.setContainerDBType(datanode.getUuidString());
+ return containerData;
}
};
//WHEN executed, THEN at least once the second datanode should be
//returned.
for (int i = 0; i < 10000; i++) {
- Path path =
- downloader.getContainerDataFromReplicas(1L, datanodes).get();
- if (path.toString().equals(datanodes.get(1).getUuidString())) {
+ final KeyValueContainerData containerData =
+ downloader.getContainerDataFromReplicas(new KeyValueContainerData(1L),
+ datanodes);
+ if (containerData.getContainerDBType()
+ .equals(datanodes.get(1).getUuidString())) {
return;
}
}
@@ -166,8 +166,8 @@ public class TestSimpleContainerDownloader {
}
@Override
- protected CompletableFuture<Path> downloadContainer(
- long containerId,
+ protected KeyValueContainerData downloadContainer(
+ KeyValueContainerData containerData,
DatanodeDetails datanode
) {
@@ -175,15 +175,13 @@ public class TestSimpleContainerDownloader {
if (directException) {
throw new RuntimeException("Unavailable datanode");
} else {
- return CompletableFuture.supplyAsync(() -> {
- throw new RuntimeException("Unavailable datanode");
- });
+ throw new RuntimeException("Unavailable datanode");
+
}
} else {
-
//path includes the dn id to make it possible to assert.
- return CompletableFuture.completedFuture(
- Paths.get(datanode.getUuidString()));
+ containerData.setContainerDBType(datanode.getUuidString());
+ return containerData;
}
}
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
index ad2810a..7d7af35 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
@@ -22,9 +22,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
@@ -33,18 +31,13 @@ import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
-import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
import org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
@@ -168,36 +161,17 @@ public class ClosedContainerReplicator extends BaseFreonGenerator implements
}
ContainerSet containerSet = new ContainerSet();
-
- ContainerMetrics metrics = ContainerMetrics.create(conf);
-
+
MutableVolumeSet volumeSet = new MutableVolumeSet(fakeDatanodeUuid, conf);
- Map<ContainerType, Handler> handlers = new HashMap<>();
-
- for (ContainerType containerType : ContainerType.values()) {
- final Handler handler =
- Handler.getHandlerForContainerType(
- containerType,
- conf,
- fakeDatanodeUuid,
- containerSet,
- volumeSet,
- metrics,
- containerReplicaProto -> {
- });
- handler.setScmID(UUID.randomUUID().toString());
- handlers.put(containerType, handler);
- }
-
- ContainerController controller =
- new ContainerController(containerSet, handlers);
-
+ final UUID scmId = UUID.randomUUID();
ContainerReplicator replicator =
- new DownloadAndImportReplicator(containerSet,
- controller,
+ new DownloadAndImportReplicator(
+ conf,
+ () -> scmId.toString(),
+ containerSet,
new SimpleContainerDownloader(conf, null),
- new TarContainerPacker());
+ volumeSet);
supervisor = new ReplicationSupervisor(containerSet, replicator, 10);
}
diff --git a/pom.xml b/pom.xml
index 9800005..c2db18e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -153,7 +153,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
<protobuf.version>2.5.0</protobuf.version>
<findbugs.version>3.0.0</findbugs.version>
- <spotbugs.version>3.1.12</spotbugs.version>
+ <spotbugs.version>4.2.0</spotbugs.version>
<dnsjava.version>2.1.7</dnsjava.version>
<guava.version>28.2-jre</guava.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org