Posted to commits@ozone.apache.org by el...@apache.org on 2021/02/24 08:45:54 UTC
[ozone] 05/27: fixing closed container replication
This is an automated email from the ASF dual-hosted git repository.
elek pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit c308e1b25f11abb22d4e7526f23933792090ae0d
Author: Elek Márton <el...@apache.org>
AuthorDate: Mon Jan 25 11:57:29 2021 +0100
fixing closed container replication
---
.../container/keyvalue/KeyValueContainer.java     |  2 +-
.../container/keyvalue/KeyValueContainerData.java |  8 ----
.../replication/DownloadAndImportReplicator.java  | 19 ++++++----
.../replication/SimpleContainerDownloader.java    | 41 ++++++++++++++-------
.../rpc/TestContainerReplicationEndToEnd.java     | 43 +++++++++++-----------
5 files changed, 62 insertions(+), 51 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index d2111f7..763f5ac 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -413,7 +413,7 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
// to flush the update container data to disk.
long containerId = containerData.getContainerID();
if(!containerData.isValid()) {
- LOG.debug("Invalid container data. ContainerID: {}", containerId);
+ LOG.warn("Invalid container data. ContainerID: {}", containerId);
throw new StorageContainerException("Invalid container data. " +
"ContainerID: " + containerId, INVALID_CONTAINER_STATE);
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
index c9b0b31..7c3ec7b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import static java.lang.Math.max;
import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_COUNT;
@@ -114,13 +113,6 @@ public class KeyValueContainerData extends ContainerData {
this.deleteTransactionId = 0;
}
- public KeyValueContainerData(ContainerData source) {
- super(source);
- Preconditions.checkArgument(source.getContainerType()
- == ContainerProtos.ContainerType.KeyValueContainer);
- this.numPendingDeletionBlocks = new AtomicLong(0);
- this.deleteTransactionId = 0;
- }
/**
* @param version The schema version indicating the table layout of the
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
index 06b5b33..21fd50e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -100,13 +100,21 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
new KeyValueContainerData(containerID,
ChunkLayOutVersion.FILE_PER_BLOCK, maxContainerSize, "", "");
+
+ //choose a volume
+ final HddsVolume volume = volumeChoosingPolicy
+ .chooseVolume(volumeSet.getVolumesList(), maxContainerSize);
+
+ //fill the path fields
+ containerData.assignToVolume(scmId.get(), volume);
+
+ //download data
final KeyValueContainerData loadedContainerData =
downloader
.getContainerDataFromReplicas(containerData, sourceDatanodes);
- final HddsVolume volume = volumeChoosingPolicy
- .chooseVolume(volumeSet.getVolumesList(), maxContainerSize);
- loadedContainerData.assignToVolume(scmId.get(), volume);
+ LOG.info("Container {} is downloaded, starting to import.",
+ containerID);
//write out container descriptor
KeyValueContainer keyValueContainer =
@@ -116,14 +124,11 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
keyValueContainer.update(loadedContainerData.getMetadata(), true);
//fill in memory stat counter (keycount, byte usage)
- KeyValueContainerUtil.parseKVContainerData(containerData, config);
+ KeyValueContainerUtil.parseKVContainerData(loadedContainerData, config);
//load container
containerSet.addContainer(keyValueContainer);
- LOG.info("Container {} is downloaded, starting to import.",
- containerID);
-
LOG.info("Container {} is replicated successfully", containerID);
task.setStatus(Status.DONE);
} catch (Exception e) {
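
Taken together, the hunk above reorders the replication flow: the target volume is chosen and the path fields are assigned on the pre-created "seed" KeyValueContainerData before the download starts, so the incoming replica is written straight into its final location, and parseKVContainerData now runs on the downloaded descriptor instead of the empty seed. A minimal sketch of the resulting flow, using only identifiers visible in this diff (the enclosing method is not shown in the hunk, and the KeyValueContainer constructor arguments are an assumption):

    // Sketch of the reordered flow inside DownloadAndImportReplicator
    // (fields such as volumeChoosingPolicy, volumeSet, downloader, containerSet
    // and config are those referenced by the hunk; not a verbatim excerpt).

    // 1. Pre-create an empty "seed" descriptor for the container.
    KeyValueContainerData containerData = new KeyValueContainerData(containerID,
        ChunkLayOutVersion.FILE_PER_BLOCK, maxContainerSize, "", "");

    // 2. Choose a volume and fill the path fields *before* downloading, so the
    //    replica lands directly in its final location.
    final HddsVolume volume = volumeChoosingPolicy
        .chooseVolume(volumeSet.getVolumesList(), maxContainerSize);
    containerData.assignToVolume(scmId.get(), volume);

    // 3. Download the container data from one of the source datanodes.
    final KeyValueContainerData loadedContainerData =
        downloader.getContainerDataFromReplicas(containerData, sourceDatanodes);

    // 4. Import: persist the descriptor, rebuild the in-memory stat counters
    //    (key count, byte usage) from the downloaded data, then register it.
    KeyValueContainer keyValueContainer =
        new KeyValueContainer(loadedContainerData, config); // args assumed
    keyValueContainer.update(loadedContainerData.getMetadata(), true);
    KeyValueContainerUtil.parseKVContainerData(loadedContainerData, config);
    containerSet.addContainer(keyValueContainer);
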
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
index 3f0e63a..d3e7029 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
@@ -121,7 +121,7 @@ public class SimpleContainerDownloader implements ContainerDownloader {
@VisibleForTesting
protected KeyValueContainerData downloadContainer(
- KeyValueContainerData containerData,
+ KeyValueContainerData preCreated,
DatanodeDetails datanode
) throws IOException {
CompletableFuture<Path> result;
@@ -132,27 +132,42 @@ public class SimpleContainerDownloader implements ContainerDownloader {
PipedOutputStream outputStream = new PipedOutputStream();
- grpcReplicationClient.download(containerData, outputStream);
+ grpcReplicationClient.download(preCreated, outputStream);
final byte[] descriptor = packer
- .unpackContainerData(containerData, new PipedInputStream(outputStream));
+ .unpackContainerData(preCreated, new PipedInputStream(outputStream));
//parse descriptor
//now, we have extracted the container descriptor from the previous
//datanode. We can load it and upload it with the current data
// (original metadata + current filepath fields)
- KeyValueContainerData originalContainerData =
+ KeyValueContainerData replicated =
(KeyValueContainerData) ContainerDataYaml
.readContainer(descriptor);
- containerData.setState(originalContainerData.getState());
- containerData
- .setContainerDBType(originalContainerData.getContainerDBType());
- containerData.setSchemaVersion(originalContainerData.getSchemaVersion());
- containerData.setLayoutVersion(
- originalContainerData.getLayOutVersion().getVersion());
-
- //update descriptor
- return containerData;
+ KeyValueContainerData updated = new KeyValueContainerData(
+ replicated.getContainerID(),
+ replicated.getLayOutVersion(),
+ replicated.getMaxSize(),
+ replicated.getOriginPipelineId(),
+ replicated.getOriginNodeId());
+
+ //inherited from the replicated
+ updated
+ .setState(replicated.getState());
+ updated
+ .setContainerDBType(replicated.getContainerDBType());
+ updated
+ .updateBlockCommitSequenceId(replicated.getBlockCommitSequenceId());
+ updated
+ .setSchemaVersion(replicated.getSchemaVersion());
+
+ //inherited from the pre-created seed container
+ updated.setMetadataPath(preCreated.getMetadataPath());
+ updated.setDbFile(preCreated.getDbFile());
+ updated.setChunksPath(preCreated.getChunksPath());
+ updated.setVolume(preCreated.getVolume());
+
+ return updated;
}
@Override
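
In other words, downloadContainer no longer mutates and returns the seed it was given: it builds a fresh KeyValueContainerData and merges two sources. Expressed as a hypothetical standalone helper (the name mergeContainerData is illustrative only, not part of the patch; the constructor and setters are the ones visible in the hunk):

    // Hypothetical helper summarizing the merge performed above.
    // Field provenance:
    //   - state, DB type, blockCommitSequenceId, schema version: from the
    //     descriptor downloaded from the remote replica ("replicated");
    //   - metadata path, DB file, chunks path, volume: from the locally
    //     pre-created seed ("preCreated").
    static KeyValueContainerData mergeContainerData(
        KeyValueContainerData replicated, KeyValueContainerData preCreated) {
      KeyValueContainerData updated = new KeyValueContainerData(
          replicated.getContainerID(),
          replicated.getLayOutVersion(),
          replicated.getMaxSize(),
          replicated.getOriginPipelineId(),
          replicated.getOriginNodeId());
      // inherited from the downloaded replica
      updated.setState(replicated.getState());
      updated.setContainerDBType(replicated.getContainerDBType());
      updated.updateBlockCommitSequenceId(replicated.getBlockCommitSequenceId());
      updated.setSchemaVersion(replicated.getSchemaVersion());
      // inherited from the pre-created seed container
      updated.setMetadataPath(preCreated.getMetadataPath());
      updated.setDbFile(preCreated.getDbFile());
      updated.setChunksPath(preCreated.getChunksPath());
      updated.setVolume(preCreated.getVolume());
      return updated;
    }

Starting from a fresh instance, rather than copying the seed (this commit also removes the copy constructor from KeyValueContainerData), keeps stale fields of the empty seed from leaking into the imported container.
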
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
index d9f7578..5e73a56 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
@@ -17,8 +17,17 @@
package org.apache.hadoop.ozone.client.rpc;
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -37,30 +46,20 @@ import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.ozone.container.TestHelper;
-
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.test.GenericTestUtils;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
-
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
-
/**
* Tests delete key operation with a slow follower in the datanode
* pipeline.
@@ -180,6 +179,7 @@ public class TestContainerReplicationEndToEnd {
// wait for container to move to OPEN state in SCM
Thread.sleep(2 * containerReportInterval);
DatanodeDetails oldReplicaNode = pipeline.getFirstNode();
+
// now move the container to the closed on the datanode.
XceiverClientSpi xceiverClient =
xceiverClientManager.acquireClient(pipeline);
@@ -191,13 +191,16 @@ public class TestContainerReplicationEndToEnd {
request.setCloseContainer(
ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
xceiverClient.sendCommand(request.build());
+
// wait for container to move to closed state in SCM
Thread.sleep(2 * containerReportInterval);
Assert.assertTrue(
cluster.getStorageContainerManager().getContainerInfo(containerID)
.getState() == HddsProtos.LifeCycleState.CLOSED);
+
// shutdown the replica node
cluster.shutdownHddsDatanode(oldReplicaNode);
+
// now the container is under replicated and will be moved to a different dn
HddsDatanodeService dnService = null;
@@ -212,13 +215,9 @@ public class TestContainerReplicationEndToEnd {
Assert.assertNotNull(dnService);
final HddsDatanodeService newReplicaNode = dnService;
// wait for the container to get replicated
- GenericTestUtils.waitFor(() -> {
- return newReplicaNode.getDatanodeStateMachine().getContainer()
- .getContainerSet().getContainer(containerID) != null;
- }, 500, 100000);
- Assert.assertTrue(newReplicaNode.getDatanodeStateMachine().getContainer()
- .getContainerSet().getContainer(containerID).getContainerData()
- .getBlockCommitSequenceId() > 0);
+ GenericTestUtils.waitFor(() -> newReplicaNode.getDatanodeStateMachine().getContainer()
+ .getContainerSet().getContainer(containerID) != null, 500, 100000);
+
// wait for SCM to update the replica Map
Thread.sleep(5 * containerReportInterval);
// now shutdown the other two dns of the original pipeline and try reading
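
For reference, GenericTestUtils.waitFor (org.apache.hadoop.test) polls the supplied condition until it returns true, re-checking at the given interval and throwing a TimeoutException once the overall wait budget is exhausted; the call above therefore retries every 500 ms for up to 100 seconds. A minimal usage sketch with placeholder names (containerSet and containerID stand in for whatever the surrounding test provides):

    // Poll until the condition holds; signature as used above:
    // waitFor(Supplier<Boolean> check, int checkEveryMillis, int waitForMillis),
    // throws TimeoutException (and InterruptedException) on failure.
    GenericTestUtils.waitFor(
        () -> containerSet.getContainer(containerID) != null, // condition
        500,      // re-check every 500 ms
        100000);  // fail with TimeoutException after 100 s
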
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org