You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by el...@apache.org on 2021/02/24 08:46:14 UTC

[ozone] 25/27: fix replication

This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 4afb0856b87b13b04da916f4691b9b000d1671c7
Author: Elek Márton <el...@apache.org>
AuthorDate: Tue Feb 23 14:17:43 2021 +0100

    fix replication
---
 .../replication/ContainerStreamingDestination.java    |  2 +-
 .../replication/ContainerStreamingSource.java         | 19 +++++++++----------
 .../container/stream/DirstreamClientHandler.java      |  1 -
 .../container/stream/DirstreamServerHandler.java      |  3 ++-
 .../container/replication/TestReplicationService.java | 13 ++++++++++---
 5 files changed, 22 insertions(+), 16 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingDestination.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingDestination.java
index 28bf2f9..4f95997 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingDestination.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingDestination.java
@@ -26,7 +26,7 @@ public class ContainerStreamingDestination implements StreamingDestination {
       return Paths.get(containerData.getContainerDBFile().getAbsolutePath(),
           parts[1]);
     } else if (parts[0].equals("DATA")) {
-      return Paths.get(containerData.getContainerPath(),
+      return Paths.get(containerData.getChunksPath(),
           parts[1]);
     }
     throw new IllegalArgumentException("Unknown container part: " + parts[0]);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java
index 00997de..b289e95 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java
@@ -47,16 +47,15 @@ public class ContainerStreamingSource implements StreamingSource {
       filesToStream.put("container.yaml",
           container.getContainerData().getContainerFile().toPath());
 
-      final String dataPath =
-          container.getContainerData().getContainerPath();
-      final List<Path> dataFiles =
-          Files.list(Paths.get(dataPath)).collect(Collectors.toList());
-      for (Path dataFile : dataFiles) {
-        if (!Files.isDirectory(dataFile)) {
-          filesToStream
-              .put("DATA/" + dataFile.getFileName().toString(), dataFile);
-        }
-      }
+      final Path dataPath =
+          Paths.get(container.getContainerData().getChunksPath());
+
+      Files.walk(dataPath)
+          .filter(Files::isRegularFile)
+          .forEach(path -> {
+            filesToStream
+                .put("DATA/" + dataPath.relativize(path), path);
+          });
 
     } catch (IOException e) {
       throw new RuntimeException("Couldn't stream countainer " + containerId,
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java
index d6076e5..5182d50 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java
@@ -42,7 +42,6 @@ public class DirstreamClientHandler extends ChannelInboundHandlerAdapter {
                 buffer.readBytes(1);
                 String[] parts = currentFileName.toString().split(" ", 2);
                 remaining = Long.parseLong(parts[0]);
-                System.out.println("Starting to write to " + parts[1]);
                 Path destFilePath = destination.mapToDestination(parts[1]);
                 Files.createDirectories(destFilePath.getParent());
                 this.destFile =
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java
index d4b99eb..84572ed 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java
@@ -23,7 +23,8 @@ public class DirstreamServerHandler extends ChannelInboundHandlerAdapter {
         throws Exception {
         ChannelFuture future = null;
 
-        for (Map.Entry<String, Path> entries : source.getFilesToStream("1")
+        for (Map.Entry<String, Path> entries : source
+            .getFilesToStream((String) msg)
             .entrySet()) {
             Path file = entries.getValue();
             String name = entries.getKey();
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationService.java
index 7c680d4..43589aa 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationService.java
@@ -1,6 +1,7 @@
 package org.apache.hadoop.ozone.container.replication;
 
 import java.io.IOException;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -42,6 +43,7 @@ import com.google.common.collect.Maps;
 import org.apache.commons.io.FileUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.jetbrains.annotations.NotNull;
+import org.junit.Assert;
 import org.junit.Test;
 
 /**
@@ -80,9 +82,14 @@ public class TestReplicationService {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        return destinationContainerSet.getContainer(1L) != null;
+        return destinationContainerSet.getContainer(2L) != null;
       }
     }, 1000, 10_000);
+
+    final Path firstBlockFile = destDir.resolve(
+        "hdds/" + scmUuid + "/current/containerDir0/" + 2L + "/chunks/1.block");
+    Assert.assertTrue("Block files is missing " + firstBlockFile,
+        Files.exists(firstBlockFile));
   }
 
   @NotNull
@@ -119,7 +126,7 @@ public class TestReplicationService {
 
     ReplicationSupervisor supervisor =
         new ReplicationSupervisor(destinationContainerSet, replicator, 10);
-    replicator.replicate(new ReplicationTask(1L, sourceDatanodes));
+    replicator.replicate(new ReplicationTask(2L, sourceDatanodes));
     return destinationContainerSet;
   }
 
@@ -141,7 +148,7 @@ public class TestReplicationService {
         v.chooseVolume(sourceVolumes.getVolumesList(), 5L);
     volume.format(clusterUuid);
 
-    KeyValueContainerData kvd = new KeyValueContainerData(1L, "/tmp/asd");
+    KeyValueContainerData kvd = new KeyValueContainerData(2L, "/tmp/asd");
     kvd.setState(State.OPEN);
     kvd.assignToVolume(scmUuid.toString(), volume);
     kvd.setSchemaVersion(OzoneConsts.SCHEMA_V2);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org