You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by el...@apache.org on 2021/02/24 08:46:14 UTC
[ozone] 25/27: fix replication
This is an automated email from the ASF dual-hosted git repository.
elek pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 4afb0856b87b13b04da916f4691b9b000d1671c7
Author: Elek Márton <el...@apache.org>
AuthorDate: Tue Feb 23 14:17:43 2021 +0100
fix replication
---
.../replication/ContainerStreamingDestination.java | 2 +-
.../replication/ContainerStreamingSource.java | 19 +++++++++----------
.../container/stream/DirstreamClientHandler.java | 1 -
.../container/stream/DirstreamServerHandler.java | 3 ++-
.../container/replication/TestReplicationService.java | 13 ++++++++++---
5 files changed, 22 insertions(+), 16 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingDestination.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingDestination.java
index 28bf2f9..4f95997 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingDestination.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingDestination.java
@@ -26,7 +26,7 @@ public class ContainerStreamingDestination implements StreamingDestination {
return Paths.get(containerData.getContainerDBFile().getAbsolutePath(),
parts[1]);
} else if (parts[0].equals("DATA")) {
- return Paths.get(containerData.getContainerPath(),
+ return Paths.get(containerData.getChunksPath(),
parts[1]);
}
throw new IllegalArgumentException("Unknown container part: " + parts[0]);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java
index 00997de..b289e95 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java
@@ -47,16 +47,15 @@ public class ContainerStreamingSource implements StreamingSource {
filesToStream.put("container.yaml",
container.getContainerData().getContainerFile().toPath());
- final String dataPath =
- container.getContainerData().getContainerPath();
- final List<Path> dataFiles =
- Files.list(Paths.get(dataPath)).collect(Collectors.toList());
- for (Path dataFile : dataFiles) {
- if (!Files.isDirectory(dataFile)) {
- filesToStream
- .put("DATA/" + dataFile.getFileName().toString(), dataFile);
- }
- }
+ final Path dataPath =
+ Paths.get(container.getContainerData().getChunksPath());
+
+ Files.walk(dataPath)
+ .filter(Files::isRegularFile)
+ .forEach(path -> {
+ filesToStream
+ .put("DATA/" + dataPath.relativize(path), path);
+ });
} catch (IOException e) {
throw new RuntimeException("Couldn't stream countainer " + containerId,
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java
index d6076e5..5182d50 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java
@@ -42,7 +42,6 @@ public class DirstreamClientHandler extends ChannelInboundHandlerAdapter {
buffer.readBytes(1);
String[] parts = currentFileName.toString().split(" ", 2);
remaining = Long.parseLong(parts[0]);
- System.out.println("Starting to write to " + parts[1]);
Path destFilePath = destination.mapToDestination(parts[1]);
Files.createDirectories(destFilePath.getParent());
this.destFile =
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java
index d4b99eb..84572ed 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java
@@ -23,7 +23,8 @@ public class DirstreamServerHandler extends ChannelInboundHandlerAdapter {
throws Exception {
ChannelFuture future = null;
- for (Map.Entry<String, Path> entries : source.getFilesToStream("1")
+ for (Map.Entry<String, Path> entries : source
+ .getFilesToStream((String) msg)
.entrySet()) {
Path file = entries.getValue();
String name = entries.getKey();
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationService.java
index 7c680d4..43589aa 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationService.java
@@ -1,6 +1,7 @@
package org.apache.hadoop.ozone.container.replication;
import java.io.IOException;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -42,6 +43,7 @@ import com.google.common.collect.Maps;
import org.apache.commons.io.FileUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.jetbrains.annotations.NotNull;
+import org.junit.Assert;
import org.junit.Test;
/**
@@ -80,9 +82,14 @@ public class TestReplicationService {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
- return destinationContainerSet.getContainer(1L) != null;
+ return destinationContainerSet.getContainer(2L) != null;
}
}, 1000, 10_000);
+
+ final Path firstBlockFile = destDir.resolve(
+ "hdds/" + scmUuid + "/current/containerDir0/" + 2L + "/chunks/1.block");
+ Assert.assertTrue("Block files is missing " + firstBlockFile,
+ Files.exists(firstBlockFile));
}
@NotNull
@@ -119,7 +126,7 @@ public class TestReplicationService {
ReplicationSupervisor supervisor =
new ReplicationSupervisor(destinationContainerSet, replicator, 10);
- replicator.replicate(new ReplicationTask(1L, sourceDatanodes));
+ replicator.replicate(new ReplicationTask(2L, sourceDatanodes));
return destinationContainerSet;
}
@@ -141,7 +148,7 @@ public class TestReplicationService {
v.chooseVolume(sourceVolumes.getVolumesList(), 5L);
volume.format(clusterUuid);
- KeyValueContainerData kvd = new KeyValueContainerData(1L, "/tmp/asd");
+ KeyValueContainerData kvd = new KeyValueContainerData(2L, "/tmp/asd");
kvd.setState(State.OPEN);
kvd.assignToVolume(scmUuid.toString(), volume);
kvd.setSchemaVersion(OzoneConsts.SCHEMA_V2);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org