Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2021/06/23 10:50:42 UTC

[GitHub] [ozone] elek opened a new pull request #2360: Streaming closed container

elek opened a new pull request #2360:
URL: https://github.com/apache/ozone/pull/2360


   ## What changes were proposed in this pull request?
   
   It's a refactoring of the closed-container replication path:
   
     * Doesn't use `/tmp` any more
     * Based on Netty streaming (instead of gRPC)
     * [Performance](https://github.com/elek/ozone-notes/tree/master/20210610-closed-container-replication) is improved by at least 2x
   
   Please see the Jira description for more details...
    
   ## What is the link to the Apache JIRA
   
   https://issues.apache.org/jira/browse/HDDS-5188
   
   ## How was this patch tested?
   
   Copied 100 TB of data on a high-density datanode cluster.
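   
   As a rough illustration, the new download path reduces to a single try-with-resources call around the Netty client (a simplified sketch based on this patch; `host`, `port`, `destination` and `sslContext` are placeholders for the values built by the replicator):
   
   ```java
   // Sketch: stream all files of one container from a source datanode.
   // StreamingClient and the destination type come from this patch; the
   // surrounding variable setup is illustrative only.
   try (StreamingClient client =
            new StreamingClient(host, port, destination, sslContext)) {
     // The logical stream id is simply the container id rendered as a string.
     client.stream(String.valueOf(containerId));
   }
   ```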



[GitHub] [ozone] adoroszlai commented on a change in pull request #2360: HDDS-5188. Replace GRPC based closed-container replication with Netty based streaming

Posted by GitBox <gi...@apache.org>.
adoroszlai commented on a change in pull request #2360:
URL: https://github.com/apache/ozone/pull/2360#discussion_r672352984



##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java
##########
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.stream.StreamingSource;
+
+/**
+ * Streaming source for closed-container replication.
+ */
+public class ContainerStreamingSource implements StreamingSource {
+
+  private ContainerSet containerSet;
+
+  public ContainerStreamingSource(ContainerSet containerSet) {
+    this.containerSet = containerSet;
+  }
+
+  @Override
+  public Map<String, Path> getFilesToStream(String id) {
+
+    Map<String, Path> filesToStream = new HashMap<>();
+
+    final long containerId = Long.parseLong(id);
+    final KeyValueContainer container =
+        (KeyValueContainer) containerSet.getContainer(containerId);
+
+    try {
+      container.release();
+    } catch (StorageContainerException e) {
+      throw new RuntimeException("Container couldn't be released: " + id, e);

Review comment:
       Can we use `StreamingException` or another specific exception (wherever applicable)?
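   
       For illustration, a minimal sketch (assuming a `StreamingException(String, Throwable)` constructor, which the patch does not currently have):
   
   ```java
   try {
     container.release();
   } catch (StorageContainerException e) {
     // Wrap in the streaming-specific exception rather than a bare RuntimeException.
     throw new StreamingException("Container couldn't be released: " + id, e);
   }
   ```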

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
##########
@@ -155,17 +157,30 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
     dnCertClient = certClient;
     nextHB = new AtomicLong(Time.monotonicNow());
 
+    SslContext sslContext = null;
+    SecurityConfig securityConfig = new SecurityConfig(conf);
+    if (securityConfig.isSecurityEnabled()) {
+
+      sslContext = SslContextBuilder.forClient()
+          .trustManager(InsecureTrustManagerFactory.INSTANCE)

Review comment:
       Is `InsecureTrustManagerFactory` intentional here in non-test code?
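   
       For reference, a stricter client-side setup would trust only the cluster CA and present the datanode's own certificate, e.g. (sketch; this is roughly what a later revision of the patch does):
   
   ```java
   SslContext sslContext = SslContextBuilder.forClient()
       .trustManager(certClient.getCACertificate())   // trust only the cluster CA
       .keyManager(certClient.getPrivateKey(), certClient.getCertificate())
       .clientAuth(ClientAuth.REQUIRE)                // mutual TLS
       .build();
   ```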

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingException.java
##########
@@ -24,7 +24,7 @@
  */
 public class StreamingException extends RuntimeException {
 
-  public StreamingException(InterruptedException ex) {
+  public StreamingException(Exception ex) {

Review comment:
       With the new `StreamingException(Throwable)` constructor, I think this one is no longer needed.
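   
       In other words, the class could shrink to something like (sketch; any message-only constructors elided):
   
   ```java
   public class StreamingException extends RuntimeException {
     // The Throwable overload covers the old Exception-specific one.
     public StreamingException(Throwable ex) {
       super(ex);
     }
   }
   ```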

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java
##########
@@ -83,7 +83,12 @@ public void doRead(ChannelHandlerContext ctx, ByteBuf buffer)
         buffer.skipBytes(1);
         String[] parts = currentFileName.toString().split(" ", 2);
         remaining = Long.parseLong(parts[0]);
-        Path destFilePath = destination.mapToDestination(parts[1]);
+        final String logicalFileName = parts[1];
+        if (currentFileName.toString().equals(END_MARKER)) {

Review comment:
       `currentFileName.toString()` could be stored in a variable (it is used twice).
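   
       A sketch of the suggestion (logic otherwise unchanged):
   
   ```java
   // Read the accumulated header once instead of calling toString() twice.
   final String header = currentFileName.toString();
   String[] parts = header.split(" ", 2);
   remaining = Long.parseLong(parts[0]);
   final String logicalFileName = parts[1];
   if (header.equals(END_MARKER)) {
     // ... end-of-stream handling ...
   }
   ```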

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java
##########
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.stream.StreamingSource;
+
+/**
+ * Streaming source for closed-container replication.
+ */
+public class ContainerStreamingSource implements StreamingSource {
+
+  private ContainerSet containerSet;
+
+  public ContainerStreamingSource(ContainerSet containerSet) {
+    this.containerSet = containerSet;
+  }
+
+  @Override
+  public Map<String, Path> getFilesToStream(String id) {
+
+    Map<String, Path> filesToStream = new HashMap<>();
+
+    final long containerId = Long.parseLong(id);
+    final KeyValueContainer container =
+        (KeyValueContainer) containerSet.getContainer(containerId);
+
+    try {
+      container.release();
+    } catch (StorageContainerException e) {
+      throw new RuntimeException("Container couldn't be released: " + id, e);
+    }
+
+    if (container.getContainerState() != State.CLOSED
+        && container.getContainerState() != State.QUASI_CLOSED) {
+      throw new RuntimeException(
+          "Only (quasi)closed containers can be exported, but ContainerId=: "
+              + id + " is in state " + container.getContainerState());
+    }
+
+    try {
+
+      final File dbPath =
+          container.getContainerData().getContainerDBFile();
+
+      final List<Path> dbFiles =
+          Files.list(dbPath.toPath()).collect(Collectors.toList());

Review comment:
       Sonar reports that `Files.list` (and `Files.walk` below) should be used in try-with-resources.
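   
       A sketch of the try-with-resources form:
   
   ```java
   // Close the directory stream deterministically once the listing is collected.
   final List<Path> dbFiles;
   try (Stream<Path> paths = Files.list(dbPath.toPath())) {
     dbFiles = paths.collect(Collectors.toList());
   }
   ```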

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
##########
@@ -49,86 +60,162 @@
 
   private final ContainerSet containerSet;
 
-  private final ContainerController controller;
-
-  private final ContainerDownloader downloader;
-
-  private final TarContainerPacker packer;
+  private final ConfigurationSource config;
+  private final Supplier<String> clusterId;
+  private VolumeChoosingPolicy volumeChoosingPolicy;
+  private VolumeSet volumeSet;
+  private SslContext sslContext;
 
   public DownloadAndImportReplicator(
+      ConfigurationSource config,
+      Supplier<String> clusterId,
       ContainerSet containerSet,
-      ContainerController controller,
-      ContainerDownloader downloader,
-      TarContainerPacker packer) {
+      VolumeSet volumeSet,
+      SslContext sslContext
+  ) {
     this.containerSet = containerSet;
-    this.controller = controller;
-    this.downloader = downloader;
-    this.packer = packer;
-  }
-
-  public void importContainer(long containerID, Path tarFilePath)
-      throws IOException {
+    this.config = config;
+    this.clusterId = clusterId;
+    this.volumeSet = volumeSet;
+    this.sslContext = sslContext;
+    Class<? extends VolumeChoosingPolicy> volumeChoosingPolicyType = null;
     try {
-      ContainerData originalContainerData;
-      try (FileInputStream tempContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
-        byte[] containerDescriptorYaml =
-            packer.unpackContainerDescriptor(tempContainerTarStream);
-        originalContainerData = ContainerDataYaml.readContainer(
-            containerDescriptorYaml);
-      }
-
-      try (FileInputStream tempContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
-
-        Container container = controller.importContainer(
-            originalContainerData, tempContainerTarStream, packer);
-
-        containerSet.addContainer(container);
-      }
-
-    } finally {
-      try {
-        Files.delete(tarFilePath);
-      } catch (Exception ex) {
-        LOG.error("Got exception while deleting downloaded container file: "
-            + tarFilePath.toAbsolutePath().toString(), ex);
-      }
+      volumeChoosingPolicyType =
+          config.getClass(
+              HDDS_DATANODE_VOLUME_CHOOSING_POLICY,
+              RoundRobinVolumeChoosingPolicy
+                  .class, VolumeChoosingPolicy.class);
+
+      this.volumeChoosingPolicy = volumeChoosingPolicyType.newInstance();
+
+    } catch (InstantiationException ex) {
+      throw new IllegalArgumentException(
+          "Couldn't create volume choosing policy: " + volumeChoosingPolicyType,
+          ex);
+    } catch (IllegalAccessException e) {
+      e.printStackTrace();
     }
+
   }
 
   @Override
   public void replicate(ReplicationTask task) {
     long containerID = task.getContainerId();
-
+    if (clusterId.get() == null) {
+      LOG.error("Replication task is called before first SCM call");
+      task.setStatus(Status.FAILED);
+    }
     List<DatanodeDetails> sourceDatanodes = task.getSources();
 
     LOG.info("Starting replication of container {} from {}", containerID,
         sourceDatanodes);
 
-    CompletableFuture<Path> tempTarFile = downloader
-        .getContainerDataFromReplicas(containerID,
-            sourceDatanodes);
-    if (tempTarFile == null) {
-      task.setStatus(Status.FAILED);
-    } else {
-      try {
-        // Wait for the download. This thread pool is limiting the parallel
-        // downloads, so it's ok to block here and wait for the full download.
-        Path path = tempTarFile.get();
-        long bytes = Files.size(path);
-
-        LOG.info("Container {} is downloaded with size {}, starting to import.",
-                containerID, bytes);
-        task.setTransferredBytes(bytes);
-
-        importContainer(containerID, path);
+    try {
+
+      long maxContainerSize = (long) config.getStorageSize(
+          ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+          ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
+
+      KeyValueContainerData containerData =
+          new KeyValueContainerData(containerID,
+              ChunkLayOutVersion.FILE_PER_BLOCK, maxContainerSize, "", "");
+
+      //choose a volume
+      final HddsVolume volume = volumeChoosingPolicy
+          .chooseVolume(
+              StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()),
+              maxContainerSize);
+
+      //fill the path fields
+      containerData.assignToVolume(clusterId.get(), volume);
+
+      Collections.shuffle(sourceDatanodes);
+
+      //download data
+      final DatanodeDetails datanode = sourceDatanodes.get(0);
+
+      try (StreamingClient client =
+               new StreamingClient(datanode.getIpAddress(),
+                   datanode.getPort(Name.REPLICATION).getValue(),
+                   new ContainerStreamingDestination(containerData),
+                   sslContext)
+      ) {
+        client.stream("" + containerData.getContainerID());
+
+        LOG.info("Container " + containerData.getContainerID()
+            + " is downloaded successfully");

Review comment:
    ```suggestion
            LOG.info("Container {} is downloaded successfully", containerID);
    ```

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
##########
@@ -49,86 +60,162 @@
 
   private final ContainerSet containerSet;
 
-  private final ContainerController controller;
-
-  private final ContainerDownloader downloader;
-
-  private final TarContainerPacker packer;
+  private final ConfigurationSource config;
+  private final Supplier<String> clusterId;
+  private VolumeChoosingPolicy volumeChoosingPolicy;
+  private VolumeSet volumeSet;
+  private SslContext sslContext;
 
   public DownloadAndImportReplicator(
+      ConfigurationSource config,
+      Supplier<String> clusterId,
       ContainerSet containerSet,
-      ContainerController controller,
-      ContainerDownloader downloader,
-      TarContainerPacker packer) {
+      VolumeSet volumeSet,
+      SslContext sslContext
+  ) {
     this.containerSet = containerSet;
-    this.controller = controller;
-    this.downloader = downloader;
-    this.packer = packer;
-  }
-
-  public void importContainer(long containerID, Path tarFilePath)
-      throws IOException {
+    this.config = config;
+    this.clusterId = clusterId;
+    this.volumeSet = volumeSet;
+    this.sslContext = sslContext;
+    Class<? extends VolumeChoosingPolicy> volumeChoosingPolicyType = null;
     try {
-      ContainerData originalContainerData;
-      try (FileInputStream tempContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
-        byte[] containerDescriptorYaml =
-            packer.unpackContainerDescriptor(tempContainerTarStream);
-        originalContainerData = ContainerDataYaml.readContainer(
-            containerDescriptorYaml);
-      }
-
-      try (FileInputStream tempContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
-
-        Container container = controller.importContainer(
-            originalContainerData, tempContainerTarStream, packer);
-
-        containerSet.addContainer(container);
-      }
-
-    } finally {
-      try {
-        Files.delete(tarFilePath);
-      } catch (Exception ex) {
-        LOG.error("Got exception while deleting downloaded container file: "
-            + tarFilePath.toAbsolutePath().toString(), ex);
-      }
+      volumeChoosingPolicyType =
+          config.getClass(
+              HDDS_DATANODE_VOLUME_CHOOSING_POLICY,
+              RoundRobinVolumeChoosingPolicy
+                  .class, VolumeChoosingPolicy.class);
+
+      this.volumeChoosingPolicy = volumeChoosingPolicyType.newInstance();
+
+    } catch (InstantiationException ex) {
+      throw new IllegalArgumentException(
+          "Couldn't create volume choosing policy: " + volumeChoosingPolicyType,
+          ex);
+    } catch (IllegalAccessException e) {
+      e.printStackTrace();

Review comment:
       Leftover `printStackTrace`?
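   
       The two catch blocks could be merged so the access failure is not swallowed, e.g. (this matches what a later revision of the patch does):
   
   ```java
   } catch (InstantiationException | IllegalAccessException ex) {
     // Fail fast instead of printing the stack trace and continuing
     // with a null volumeChoosingPolicy.
     throw new IllegalArgumentException(
         "Couldn't create volume choosing policy: " + volumeChoosingPolicyType,
         ex);
   }
   ```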

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java
##########
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.stream.StreamingSource;
+
+/**
+ * Streaming source for closed-container replication.
+ */
+public class ContainerStreamingSource implements StreamingSource {
+
+  private ContainerSet containerSet;
+
+  public ContainerStreamingSource(ContainerSet containerSet) {
+    this.containerSet = containerSet;
+  }
+
+  @Override
+  public Map<String, Path> getFilesToStream(String id) {
+
+    Map<String, Path> filesToStream = new HashMap<>();
+
+    final long containerId = Long.parseLong(id);
+    final KeyValueContainer container =
+        (KeyValueContainer) containerSet.getContainer(containerId);
+
+    try {
+      container.release();
+    } catch (StorageContainerException e) {
+      throw new RuntimeException("Container couldn't be released: " + id, e);
+    }
+
+    if (container.getContainerState() != State.CLOSED
+        && container.getContainerState() != State.QUASI_CLOSED) {

Review comment:
       Should we check this before `release()`?
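   
       A sketch of the reordered check (same code, state validated before `release()`):
   
   ```java
   // Validate the container state first; only then release it for export.
   if (container.getContainerState() != State.CLOSED
       && container.getContainerState() != State.QUASI_CLOSED) {
     throw new RuntimeException(
         "Only (quasi)closed containers can be exported, but ContainerId="
             + id + " is in state " + container.getContainerState());
   }
   try {
     container.release();
   } catch (StorageContainerException e) {
     throw new RuntimeException("Container couldn't be released: " + id, e);
   }
   ```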





[GitHub] [ozone] ChenSammi commented on pull request #2360: HDDS-5188. Replace GRPC based closed-container replication with Netty based streaming

Posted by GitBox <gi...@apache.org>.
ChenSammi commented on pull request #2360:
URL: https://github.com/apache/ozone/pull/2360#issuecomment-895128615


   Hey @elek, thanks for improving the memory usage of DN replication.
   
   I generally support the idea, which will make the DN more stable. Having gone through the code, one question: what is the expected state of metadata/data consistency and integrity if the files under the container directory are transferred one by one?
   
   We have observed several container replication issues in our cluster, such as the need for more metrics, too many gRPC threads, no flow control on the SCM side, downloaded .tar.gz files failing because of a missing container file or chunks directory, and temp files left behind by a DN crash never being cleaned up. We are trying to mitigate them one by one.
   
   
   



[GitHub] [ozone] adoroszlai commented on pull request #2360: HDDS-5188. Replace GRPC based closed-container replication with Netty based streaming

Posted by GitBox <gi...@apache.org>.
adoroszlai commented on pull request #2360:
URL: https://github.com/apache/ozone/pull/2360#issuecomment-880096223


   Thanks a lot @elek for this improvement.  I have yet to review the change in detail, but I tried running the existing replication smoketest and it failed:
   
   ```
   Creating ozonesecure_datanode_3 ... done
   ==============================================================================
   Wait :: Wait for replication to succeed
   ==============================================================================
   Wait Until Container Replicated                                       | FAIL |
   Test timeout 5 minutes exceeded.
   ------------------------------------------------------------------------------
   Wait :: Wait for replication to succeed                               | FAIL |
   1 test, 0 passed, 1 failed
   ```
   
   Same in unsecure.
   
   The only error message I see:
   
   ```
   datanode_3  | 2021-07-14 16:35:04,130 [ContainerReplicationThread-8] INFO replication.DownloadAndImportReplicator: Starting replication of container 1 from [b6bfac09-f440-4a0c-99df-72a2ef6d7c1a{ip: 172.24.0.4, host: ozonesecure_datanode_2.ozonesecure_default, ports: [REPLICATION=9886, RATIS=9858, RATIS_ADMIN=9857, RATIS_SERVER=9856, STANDALONE=9859], networkLocation: /default-rack, certSerialId: null, persistedOpState: IN_SERVICE, persistedOpStateExpiryEpochSec: 0}, 0e9c8c1e-57de-4556-870d-09e757478913{ip: 172.24.0.7, host: ozonesecure_datanode_1.ozonesecure_default, ports: [REPLICATION=9886, RATIS=9858, RATIS_ADMIN=9857, RATIS_SERVER=9856, STANDALONE=9859], networkLocation: /default-rack, certSerialId: null, persistedOpState: IN_SERVICE, persistedOpStateExpiryEpochSec: 0}]
   ...
   datanode_3  | 2021-07-14 16:35:14,148 [ContainerReplicationThread-8] ERROR replication.DownloadAndImportReplicator: Error on replicating container 1
   datanode_3  | org.apache.hadoop.ozone.container.stream.StreamingException: Streaming is failed. Not all files are streamed. Please check the log of the server. Last (partial?) streamed file: 
   datanode_3  | 	at org.apache.hadoop.ozone.container.stream.StreamingClient.stream(StreamingClient.java:99)
   datanode_3  | 	at org.apache.hadoop.ozone.container.stream.StreamingClient.stream(StreamingClient.java:86)
   datanode_3  | 	at org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator.replicate(DownloadAndImportReplicator.java:142)
   datanode_3  | 	at org.apache.hadoop.ozone.container.replication.ReplicationSupervisor$TaskRunner.run(ReplicationSupervisor.java:142)
   datanode_3  | 2021-07-14 16:35:14,148 [ContainerReplicationThread-8] ERROR replication.ReplicationSupervisor: Container 1 can't be downloaded from any of the datanodes.
   ```
   
   No related messages found on datanode 1 or 2.




[GitHub] [ozone] elek commented on a change in pull request #2360: HDDS-5188. Replace GRPC based closed-container replication with Netty based streaming

Posted by GitBox <gi...@apache.org>.
elek commented on a change in pull request #2360:
URL: https://github.com/apache/ozone/pull/2360#discussion_r673760896



##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java
##########
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.stream.StreamingSource;
+
+/**
+ * Streaming source for closed-container replication.
+ */
+public class ContainerStreamingSource implements StreamingSource {
+
+  private ContainerSet containerSet;
+
+  public ContainerStreamingSource(ContainerSet containerSet) {
+    this.containerSet = containerSet;
+  }
+
+  @Override
+  public Map<String, Path> getFilesToStream(String id) {
+
+    Map<String, Path> filesToStream = new HashMap<>();
+
+    final long containerId = Long.parseLong(id);
+    final KeyValueContainer container =
+        (KeyValueContainer) containerSet.getContainer(containerId);
+
+    try {
+      container.release();
+    } catch (StorageContainerException e) {
+      throw new RuntimeException("Container couldn't be released: " + id, e);
+    }
+
+    if (container.getContainerState() != State.CLOSED
+        && container.getContainerState() != State.QUASI_CLOSED) {
+      throw new RuntimeException(
+          "Only (quasi)closed containers can be exported, but ContainerId=: "
+              + id + " is in state " + container.getContainerState());
+    }
+
+    try {
+
+      final File dbPath =
+          container.getContainerData().getContainerDBFile();
+
+      final List<Path> dbFiles =
+          Files.list(dbPath.toPath()).collect(Collectors.toList());

Review comment:
       That's an interesting question, I never realized that streams are `AutoCloseable`. I tried to find more information and found this one: 
   
   https://www.baeldung.com/java-stream-close
   
   > When dealing with these sorts of streams, we shouldn't close them explicitly.
   
   I checked the implementation of `Files.list`/`Files.walk`, and based on my understanding we don't need to close them, as there are no open resources (I think `AbstractPipeline.close()` is called in both cases). 
   
   But please let me know if I am wrong.






[GitHub] [ozone] elek commented on a change in pull request #2360: HDDS-5188. Replace GRPC based closed-container replication with Netty based streaming

Posted by GitBox <gi...@apache.org>.
elek commented on a change in pull request #2360:
URL: https://github.com/apache/ozone/pull/2360#discussion_r673765404



##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java
##########
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.stream.StreamingSource;
+
+/**
+ * Streaming source for closed-container replication.
+ */
+public class ContainerStreamingSource implements StreamingSource {
+
+  private ContainerSet containerSet;
+
+  public ContainerStreamingSource(ContainerSet containerSet) {
+    this.containerSet = containerSet;
+  }
+
+  @Override
+  public Map<String, Path> getFilesToStream(String id) {
+
+    Map<String, Path> filesToStream = new HashMap<>();
+
+    final long containerId = Long.parseLong(id);
+    final KeyValueContainer container =
+        (KeyValueContainer) containerSet.getContainer(containerId);
+
+    try {
+      container.release();
+    } catch (StorageContainerException e) {
+      throw new RuntimeException("Container couldn't be released: " + id, e);
+    }
+
+    if (container.getContainerState() != State.CLOSED
+        && container.getContainerState() != State.QUASI_CLOSED) {

Review comment:
       Yes, thanks.






[GitHub] [ozone] adoroszlai commented on a change in pull request #2360: HDDS-5188. Replace GRPC based closed-container replication with Netty based streaming

Posted by GitBox <gi...@apache.org>.
adoroszlai commented on a change in pull request #2360:
URL: https://github.com/apache/ozone/pull/2360#discussion_r674658876



##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java
##########
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.stream.StreamingSource;
+
+/**
+ * Streaming source for closed-container replication.
+ */
+public class ContainerStreamingSource implements StreamingSource {
+
+  private ContainerSet containerSet;
+
+  public ContainerStreamingSource(ContainerSet containerSet) {
+    this.containerSet = containerSet;
+  }
+
+  @Override
+  public Map<String, Path> getFilesToStream(String id) {
+
+    Map<String, Path> filesToStream = new HashMap<>();
+
+    final long containerId = Long.parseLong(id);
+    final KeyValueContainer container =
+        (KeyValueContainer) containerSet.getContainer(containerId);
+
+    try {
+      container.release();
+    } catch (StorageContainerException e) {
+      throw new RuntimeException("Container couldn't be released: " + id, e);
+    }
+
+    if (container.getContainerState() != State.CLOSED
+        && container.getContainerState() != State.QUASI_CLOSED) {
+      throw new RuntimeException(
+          "Only (quasi)closed containers can be exported, but ContainerId=: "
+              + id + " is in state " + container.getContainerState());
+    }
+
+    try {
+
+      final File dbPath =
+          container.getContainerData().getContainerDBFile();
+
+      final List<Path> dbFiles =
+          Files.list(dbPath.toPath()).collect(Collectors.toList());

Review comment:
       Javadoc for `Files.list` says:
   
   > If timely disposal of file system resources is required, the try-with-resources construct should be used to ensure that the stream's `close` method is invoked after the stream operations are completed.
   
   You can also verify this by adding an `onClose` handler to the stream:
   
   ```
     @Test
     public void filesListClose() throws Exception {
       org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(java.nio.file.Files.class);
       java.util.concurrent.atomic.AtomicBoolean closed = new java.util.concurrent.atomic.AtomicBoolean();
       java.util.stream.Stream<java.nio.file.Path> list = java.nio.file.Files
           .list(java.nio.file.Paths.get("/etc"))
           .onClose(() -> closed.set(true));
       log.info("ls -l /etc | wc -l: {}", list.count());
       org.apache.ozone.test.GenericTestUtils.waitFor(closed::get, 100, 10000);
     }
   ```






[GitHub] [ozone] adoroszlai commented on pull request #2360: HDDS-5188. Replace GRPC based closed-container replication with Netty based streaming

Posted by GitBox <gi...@apache.org>.
adoroszlai commented on pull request #2360:
URL: https://github.com/apache/ozone/pull/2360#issuecomment-1061714926


   /pending




[GitHub] [ozone] elek closed pull request #2360: HDDS-5188. Replace GRPC based closed-container replication with Netty based streaming

Posted by GitBox <gi...@apache.org>.
elek closed pull request #2360:
URL: https://github.com/apache/ozone/pull/2360


   




[GitHub] [ozone] ChenSammi commented on a change in pull request #2360: HDDS-5188. Replace GRPC based closed-container replication with Netty based streaming

Posted by GitBox <gi...@apache.org>.
ChenSammi commented on a change in pull request #2360:
URL: https://github.com/apache/ozone/pull/2360#discussion_r685053664



##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
##########
@@ -49,86 +60,158 @@
 
   private final ContainerSet containerSet;
 
-  private final ContainerController controller;
-
-  private final ContainerDownloader downloader;
-
-  private final TarContainerPacker packer;
+  private final ConfigurationSource config;
+  private final Supplier<String> clusterId;
+  private VolumeChoosingPolicy volumeChoosingPolicy;
+  private VolumeSet volumeSet;
+  private SslContext sslContext;
 
   public DownloadAndImportReplicator(
+      ConfigurationSource config,
+      Supplier<String> clusterId,
       ContainerSet containerSet,
-      ContainerController controller,
-      ContainerDownloader downloader,
-      TarContainerPacker packer) {
+      VolumeSet volumeSet,
+      SslContext sslContext
+  ) {
     this.containerSet = containerSet;
-    this.controller = controller;
-    this.downloader = downloader;
-    this.packer = packer;
-  }
-
-  public void importContainer(long containerID, Path tarFilePath)
-      throws IOException {
+    this.config = config;
+    this.clusterId = clusterId;
+    this.volumeSet = volumeSet;
+    this.sslContext = sslContext;
+    Class<? extends VolumeChoosingPolicy> volumeChoosingPolicyType = null;
     try {
-      ContainerData originalContainerData;
-      try (FileInputStream tempContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
-        byte[] containerDescriptorYaml =
-            packer.unpackContainerDescriptor(tempContainerTarStream);
-        originalContainerData = ContainerDataYaml.readContainer(
-            containerDescriptorYaml);
-      }
-
-      try (FileInputStream tempContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
-
-        Container container = controller.importContainer(
-            originalContainerData, tempContainerTarStream, packer);
-
-        containerSet.addContainer(container);
-      }
-
-    } finally {
-      try {
-        Files.delete(tarFilePath);
-      } catch (Exception ex) {
-        LOG.error("Got exception while deleting downloaded container file: "
-            + tarFilePath.toAbsolutePath().toString(), ex);
-      }
+      volumeChoosingPolicyType =
+          config.getClass(
+              HDDS_DATANODE_VOLUME_CHOOSING_POLICY,
+              RoundRobinVolumeChoosingPolicy
+                  .class, VolumeChoosingPolicy.class);
+
+      this.volumeChoosingPolicy = volumeChoosingPolicyType.newInstance();
+
+    } catch (InstantiationException | IllegalAccessException ex) {
+      throw new IllegalArgumentException(
+          "Couldn't create volume choosing policy: " + volumeChoosingPolicyType,
+          ex);
     }
   }
 
   @Override
   public void replicate(ReplicationTask task) {
     long containerID = task.getContainerId();
-
+    if (clusterId.get() == null) {
+      LOG.error("Replication task is called before first SCM call");
+      task.setStatus(Status.FAILED);
+    }
     List<DatanodeDetails> sourceDatanodes = task.getSources();
 
     LOG.info("Starting replication of container {} from {}", containerID,
         sourceDatanodes);
 
-    CompletableFuture<Path> tempTarFile = downloader
-        .getContainerDataFromReplicas(containerID,
-            sourceDatanodes);
-    if (tempTarFile == null) {
-      task.setStatus(Status.FAILED);
-    } else {
-      try {
-        // Wait for the download. This thread pool is limiting the parallel
-        // downloads, so it's ok to block here and wait for the full download.
-        Path path = tempTarFile.get();
-        long bytes = Files.size(path);
-
-        LOG.info("Container {} is downloaded with size {}, starting to import.",
-                containerID, bytes);
-        task.setTransferredBytes(bytes);
-
-        importContainer(containerID, path);
+    try {
+
+      long maxContainerSize = (long) config.getStorageSize(
+          ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+          ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
+
+      KeyValueContainerData containerData =
+          new KeyValueContainerData(containerID,
+              ChunkLayOutVersion.FILE_PER_BLOCK, maxContainerSize, "", "");
+
+      //choose a volume
+      final HddsVolume volume = volumeChoosingPolicy
+          .chooseVolume(
+              StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()),
+              maxContainerSize);
+
+      //fill the path fields
+      containerData.assignToVolume(clusterId.get(), volume);
+
+      Collections.shuffle(sourceDatanodes);
+
+      //download data
+      final DatanodeDetails datanode = sourceDatanodes.get(0);
+
+      try (StreamingClient client =
+               new StreamingClient(datanode.getIpAddress(),
+                   datanode.getPort(Name.REPLICATION).getValue(),
+                   new ContainerStreamingDestination(containerData),
+                   sslContext)
+      ) {
+        client.stream("" + containerData.getContainerID());
+
+        LOG.info("Container {} is downloaded successfully", containerID);
+        KeyValueContainerData loadedContainerData =
+            updateContainerData(containerData);
+        LOG.info("Container {} is downloaded, starting to import.",
+            containerID);
+        importContainer(loadedContainerData);
         LOG.info("Container {} is replicated successfully", containerID);
         task.setStatus(Status.DONE);
-      } catch (Exception e) {
-        LOG.error("Container {} replication was unsuccessful.", containerID, e);
-        task.setStatus(Status.FAILED);
+
       }
+    } catch (IOException | RuntimeException ex) {
+      LOG.error("Error on replicating container " + containerID, ex);
+      task.setStatus(Status.FAILED);

Review comment:
       Should the partially downloaded container be deleted in case of failure?
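   
       One possible shape for such cleanup (a sketch only; it assumes `containerData` is hoisted out of the try block, and `getContainerPath()` plus commons-io `FileUtils` are illustrative, not part of the patch):
   
   ```java
   } catch (IOException | RuntimeException ex) {
     LOG.error("Error on replicating container " + containerID, ex);
     task.setStatus(Status.FAILED);
     // Best-effort removal of the partially streamed container directory,
     // so a later retry can start from a clean state.
     try {
       FileUtils.deleteDirectory(new File(containerData.getContainerPath()));
     } catch (IOException cleanupEx) {
       LOG.warn("Could not delete partial container {}", containerID, cleanupEx);
     }
   }
   ```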






[GitHub] [ozone] ChenSammi commented on a change in pull request #2360: HDDS-5188. Replace GRPC based closed-container replication with Netty based streaming

Posted by GitBox <gi...@apache.org>.
ChenSammi commented on a change in pull request #2360:
URL: https://github.com/apache/ozone/pull/2360#discussion_r685034118



##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
##########
@@ -155,17 +156,30 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
     dnCertClient = certClient;
     nextHB = new AtomicLong(Time.monotonicNow());
 
+    SslContext sslContext = null;
+    SecurityConfig securityConfig = new SecurityConfig(conf);
+    if (securityConfig.isSecurityEnabled()) {
+
+      sslContext = SslContextBuilder.forClient()
+          .trustManager(certClient.getCACertificate())
+          .clientAuth(ClientAuth.REQUIRE)
+          .keyManager(certClient.getPrivateKey(), certClient.getCertificate())
+          .build();
+
+    }
+
     ContainerReplicator replicator =
-        new DownloadAndImportReplicator(container.getContainerSet(),
-            container.getController(),
-            new SimpleContainerDownloader(conf, dnCertClient),
-            new TarContainerPacker());
+        new DownloadAndImportReplicator(conf,
+            container::getClusterId,
+            container.getContainerSet(),
+            container.getVolumeSet(),
+            sslContext);
 
     replicatorMetrics = new MeasuredReplicator(replicator);
 
     supervisor =
-        new ReplicationSupervisor(container.getContainerSet(), context,
-            replicatorMetrics, dnConf.getReplicationMaxStreams());
+        new ReplicationSupervisor(container.getContainerSet(), replicator,
+            dnConf.getReplicationMaxStreams());

Review comment:
       Will this lead to `replicatorMetrics` never being used?
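   
       Presumably the measured wrapper should be handed to the supervisor instead of the raw replicator, e.g. (sketch):
   
   ```java
   supervisor =
       new ReplicationSupervisor(container.getContainerSet(), replicatorMetrics,
           dnConf.getReplicationMaxStreams());
   ```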




-                containerID, bytes);
-        task.setTransferredBytes(bytes);
-
-        importContainer(containerID, path);
+    try {
+
+      long maxContainerSize = (long) config.getStorageSize(
+          ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+          ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
+
+      KeyValueContainerData containerData =
+          new KeyValueContainerData(containerID,
+              ChunkLayOutVersion.FILE_PER_BLOCK, maxContainerSize, "", "");
+
+      //choose a volume
+      final HddsVolume volume = volumeChoosingPolicy
+          .chooseVolume(
+              StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()),
+              maxContainerSize);
+
+      //fill the path fields
+      containerData.assignToVolume(clusterId.get(), volume);
+
+      Collections.shuffle(sourceDatanodes);
+
+      //download data
+      final DatanodeDetails datanode = sourceDatanodes.get(0);
+
+      try (StreamingClient client =
+               new StreamingClient(datanode.getIpAddress(),
+                   datanode.getPort(Name.REPLICATION).getValue(),
+                   new ContainerStreamingDestination(containerData),
+                   sslContext)
+      ) {
+        client.stream("" + containerData.getContainerID());
+
+        LOG.info("Container " + containerData.getContainerID()
+            + " is downloaded successfully");

Review comment:
       ```suggestion
           LOG.info("Container {} is downloaded successfully", containerID);
       ```

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
##########
@@ -49,86 +60,162 @@
 
   private final ContainerSet containerSet;
 
-  private final ContainerController controller;
-
-  private final ContainerDownloader downloader;
-
-  private final TarContainerPacker packer;
+  private final ConfigurationSource config;
+  private final Supplier<String> clusterId;
+  private VolumeChoosingPolicy volumeChoosingPolicy;
+  private VolumeSet volumeSet;
+  private SslContext sslContext;
 
   public DownloadAndImportReplicator(
+      ConfigurationSource config,
+      Supplier<String> clusterId,
       ContainerSet containerSet,
-      ContainerController controller,
-      ContainerDownloader downloader,
-      TarContainerPacker packer) {
+      VolumeSet volumeSet,
+      SslContext sslContext
+  ) {
     this.containerSet = containerSet;
-    this.controller = controller;
-    this.downloader = downloader;
-    this.packer = packer;
-  }
-
-  public void importContainer(long containerID, Path tarFilePath)
-      throws IOException {
+    this.config = config;
+    this.clusterId = clusterId;
+    this.volumeSet = volumeSet;
+    this.sslContext = sslContext;
+    Class<? extends VolumeChoosingPolicy> volumeChoosingPolicyType = null;
     try {
-      ContainerData originalContainerData;
-      try (FileInputStream tempContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
-        byte[] containerDescriptorYaml =
-            packer.unpackContainerDescriptor(tempContainerTarStream);
-        originalContainerData = ContainerDataYaml.readContainer(
-            containerDescriptorYaml);
-      }
-
-      try (FileInputStream tempContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
-
-        Container container = controller.importContainer(
-            originalContainerData, tempContainerTarStream, packer);
-
-        containerSet.addContainer(container);
-      }
-
-    } finally {
-      try {
-        Files.delete(tarFilePath);
-      } catch (Exception ex) {
-        LOG.error("Got exception while deleting downloaded container file: "
-            + tarFilePath.toAbsolutePath().toString(), ex);
-      }
+      volumeChoosingPolicyType =
+          config.getClass(
+              HDDS_DATANODE_VOLUME_CHOOSING_POLICY,
+              RoundRobinVolumeChoosingPolicy
+                  .class, VolumeChoosingPolicy.class);
+
+      this.volumeChoosingPolicy = volumeChoosingPolicyType.newInstance();
+
+    } catch (InstantiationException ex) {
+      throw new IllegalArgumentException(
+          "Couldn't create volume choosing policy: " + volumeChoosingPolicyType,
+          ex);
+    } catch (IllegalAccessException e) {
+      e.printStackTrace();

Review comment:
       Leftover `printStackTrace`?
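   One possible cleanup, sketched here by folding it into the existing error handling with a multi-catch, so neither reflection failure is swallowed:
   
   ```java
   } catch (InstantiationException | IllegalAccessException ex) {
     // fail fast instead of printing the stack trace and continuing
     throw new IllegalArgumentException(
         "Couldn't create volume choosing policy: " + volumeChoosingPolicyType,
         ex);
   }
   ```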

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java
##########
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.stream.StreamingSource;
+
+/**
+ * Streaming source for closed-container replication.
+ */
+public class ContainerStreamingSource implements StreamingSource {
+
+  private ContainerSet containerSet;
+
+  public ContainerStreamingSource(ContainerSet containerSet) {
+    this.containerSet = containerSet;
+  }
+
+  @Override
+  public Map<String, Path> getFilesToStream(String id) {
+
+    Map<String, Path> filesToStream = new HashMap<>();
+
+    final long containerId = Long.parseLong(id);
+    final KeyValueContainer container =
+        (KeyValueContainer) containerSet.getContainer(containerId);
+
+    try {
+      container.release();
+    } catch (StorageContainerException e) {
+      throw new RuntimeException("Container couldn't be released: " + id, e);
+    }
+
+    if (container.getContainerState() != State.CLOSED
+        && container.getContainerState() != State.QUASI_CLOSED) {

Review comment:
       Should we check this before `release()`?
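   i.e. roughly this (a sketch that just reorders the quoted code):
   
   ```java
   // validate the state first, so a non-exportable container is never released
   if (container.getContainerState() != State.CLOSED
       && container.getContainerState() != State.QUASI_CLOSED) {
     throw new RuntimeException(
         "Only (quasi)closed containers can be exported, but ContainerId="
             + id + " is in state " + container.getContainerState());
   }
   try {
     container.release();
   } catch (StorageContainerException e) {
     throw new RuntimeException("Container couldn't be released: " + id, e);
   }
   ```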






[GitHub] [ozone] elek commented on a change in pull request #2360: HDDS-5188. Replace GRPC based closed-container replication with Netty based streaming

Posted by GitBox <gi...@apache.org>.
elek commented on a change in pull request #2360:
URL: https://github.com/apache/ozone/pull/2360#discussion_r682390549



##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java
##########
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.stream.StreamingSource;
+
+/**
+ * Streaming source for closed-container replication.
+ */
+public class ContainerStreamingSource implements StreamingSource {
+
+  private ContainerSet containerSet;
+
+  public ContainerStreamingSource(ContainerSet containerSet) {
+    this.containerSet = containerSet;
+  }
+
+  @Override
+  public Map<String, Path> getFilesToStream(String id) {
+
+    Map<String, Path> filesToStream = new HashMap<>();
+
+    final long containerId = Long.parseLong(id);
+    final KeyValueContainer container =
+        (KeyValueContainer) containerSet.getContainer(containerId);
+
+    try {
+      container.release();
+    } catch (StorageContainerException e) {
+      throw new RuntimeException("Container couldn't be released: " + id, e);
+    }
+
+    if (container.getContainerState() != State.CLOSED
+        && container.getContainerState() != State.QUASI_CLOSED) {
+      throw new RuntimeException(
+          "Only (quasi)closed containers can be exported, but ContainerId=: "
+              + id + " is in state " + container.getContainerState());
+    }
+
+    try {
+
+      final File dbPath =
+          container.getContainerData().getContainerDBFile();
+
+      final List<Path> dbFiles =
+          Files.list(dbPath.toPath()).collect(Collectors.toList());

Review comment:
       > If timely disposal of file system resources is required, 
   
   Based on my understanding, this is about the `DirectoryStream`. The full context:
   
   >  The returned stream encapsulates a DirectoryStream. If timely disposal of file system resources is required, the try-with-resources construct should be used to ensure that the stream's close method is invoked after the stream operations are completed. 
   
   My reading is that it suggests explicitly closing the stream only if you need *timely* disposal of resources. I couldn't see any meaningful operation in the related close methods, so I assume the resources will be disposed of anyway.
   
   At least this is my understanding.
   
   But to stay on the safe side, I just added the try-with-resources in 76ed04dd9.
   
   Let me know if you have any other concerns, and please help this PR get merged, as it's quite big...






[GitHub] [ozone] ChenSammi commented on a change in pull request #2360: HDDS-5188. Replace GRPC based closed-container replication with Netty based streaming

Posted by GitBox <gi...@apache.org>.
ChenSammi commented on a change in pull request #2360:
URL: https://github.com/apache/ozone/pull/2360#discussion_r685057499



##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java
##########
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.stream.StreamingException;
+import org.apache.hadoop.ozone.container.stream.StreamingSource;
+
+/**
+ * Streaming source for closed-container replication.
+ */
+public class ContainerStreamingSource implements StreamingSource {
+
+  private ContainerSet containerSet;
+
+  public ContainerStreamingSource(ContainerSet containerSet) {
+    this.containerSet = containerSet;
+  }
+
+  @Override
+  public Map<String, Path> getFilesToStream(String id) {
+
+    Map<String, Path> filesToStream = new HashMap<>();
+
+    final long containerId = Long.parseLong(id);
+    final KeyValueContainer container =
+        (KeyValueContainer) containerSet.getContainer(containerId);
+
+
+    if (container.getContainerState() != State.CLOSED
+        && container.getContainerState() != State.QUASI_CLOSED) {
+      throw new StreamingException(
+          "Only (quasi)closed containers can be exported, but ContainerId=: "
+              + id + " is in state " + container.getContainerState());
+    }
+
+    try {
+      container.release();
+    } catch (StorageContainerException e) {
+      throw new StreamingException("Container couldn't be released: " + id, e);
+    }
+
+
+    try {
+
+      final File dbPath =
+          container.getContainerData().getContainerDBFile();
+
+      List<Path> dbFiles;
+      try (Stream<Path> files = Files.list(dbPath.toPath())) {
+        dbFiles = files.collect(Collectors.toList());
+      }
+
+      for (Path dbFile : dbFiles) {
+        if (dbFile.getFileName() != null) {
+          filesToStream.put("DB/" + dbFile.getFileName(), dbFile);
+        }
+      }
+
+      filesToStream.put("container.yaml",
+          container.getContainerData().getContainerFile().toPath());
+
+      final Path dataPath =
+          Paths.get(container.getContainerData().getChunksPath());
+
+      try (Stream<Path> walk = Files.walk(dataPath)) {
+
+        walk.filter(Files::isRegularFile)
+            .forEach(path -> {
+              filesToStream
+                  .put("DATA/" + dataPath.relativize(path), path);
+            });
+      }
+      
+    } catch (IOException e) {
+      throw new StreamingException("Couldn't stream container " + containerId,
+          e);
+    }
+    return filesToStream;

Review comment:
       Currently there is lock protection during the whole container transfer phase, to prevent concurrent modification while the container is being read, e.g. deleting some blocks.
   
   Now this lock is removed. I'm not sure: if a file in `filesToStream` gets deleted before it is transmitted to the server, what will be the consequence? How do we guarantee that the metadata and the chunk data are consistent?
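   For illustration only, one conceivable shape of such a guard, assuming the `Container` read-lock API can be held for the duration of the transfer (which is exactly the open question here):
   
   ```java
   container.readLock();   // assumption: Container exposes readLock()/readUnlock()
   try {
     // collect DB files, container.yaml and chunk files into filesToStream,
     // and keep the lock held until the last byte has been streamed, so a
     // concurrent block delete cannot remove or change files mid-transfer
   } finally {
     container.readUnlock();
   }
   ```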






[GitHub] [ozone] elek commented on pull request #2360: HDDS-5188. Replace GRPC based closed-container replication with Netty based streaming

Posted by GitBox <gi...@apache.org>.
elek commented on pull request #2360:
URL: https://github.com/apache/ozone/pull/2360#issuecomment-882316584


   Thanks for checking it, @adoroszlai.
   
   I merged this with the latest master and fixed the earlier merge problems. Now the replication acceptance test is passing:
   
   ![image](https://user-images.githubusercontent.com/170549/126121057-c7a48092-0ee5-4e91-a27e-d13934d5a03b.png)
   
   And I had a full green build:
   
   https://github.com/elek/ozone/actions/runs/1037876107
   
   The PR is ready for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org

