Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2021/07/19 16:08:30 UTC

[GitHub] [ozone] adoroszlai commented on a change in pull request #2360: HDDS-5188. Replace GRPC based closed-container replication with Netty based streaming

adoroszlai commented on a change in pull request #2360:
URL: https://github.com/apache/ozone/pull/2360#discussion_r672352984



##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java
##########
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.stream.StreamingSource;
+
+/**
+ * Streaming source for closed-container replication.
+ */
+public class ContainerStreamingSource implements StreamingSource {
+
+  private ContainerSet containerSet;
+
+  public ContainerStreamingSource(ContainerSet containerSet) {
+    this.containerSet = containerSet;
+  }
+
+  @Override
+  public Map<String, Path> getFilesToStream(String id) {
+
+    Map<String, Path> filesToStream = new HashMap<>();
+
+    final long containerId = Long.parseLong(id);
+    final KeyValueContainer container =
+        (KeyValueContainer) containerSet.getContainer(containerId);
+
+    try {
+      container.release();
+    } catch (StorageContainerException e) {
+      throw new RuntimeException("Container couldn't be released: " + id, e);

Review comment:
       Can we use `StreamingException` or another specific exception (wherever applicable)?
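A minimal sketch of the suggested wrapping, assuming `StreamingException` offers a `(String, Throwable)` constructor (an assumption; the diff only mentions a `Throwable` one). `ReleasableContainer` and `ReleaseHelper` are hypothetical stand-ins for `KeyValueContainer` and the code in `getFilesToStream`; the checked exception thrown by `release()` is modelled here as `IOException`.

```java
import java.io.IOException;

// Hypothetical simplified StreamingException with a (message, cause) constructor.
class StreamingException extends RuntimeException {
  StreamingException(String message, Throwable cause) {
    super(message, cause);
  }
}

// Hypothetical stand-in for KeyValueContainer: release() throws a checked exception.
interface ReleasableContainer {
  void release() throws IOException;
}

final class ReleaseHelper {
  private ReleaseHelper() { }

  static void releaseOrFail(ReleasableContainer container, String id) {
    try {
      container.release();
    } catch (IOException e) {
      // Use the streaming-specific unchecked exception instead of a bare
      // RuntimeException, keeping the original exception as the cause.
      throw new StreamingException("Container couldn't be released: " + id, e);
    }
  }
}
```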

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
##########
@@ -155,17 +157,30 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
     dnCertClient = certClient;
     nextHB = new AtomicLong(Time.monotonicNow());
 
+    SslContext sslContext = null;
+    SecurityConfig securityConfig = new SecurityConfig(conf);
+    if (securityConfig.isSecurityEnabled()) {
+
+      sslContext = SslContextBuilder.forClient()
+          .trustManager(InsecureTrustManagerFactory.INSTANCE)

Review comment:
       Is `InsecureTrustManagerFactory` intentional here in non-test code?
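A sketch of a non-insecure alternative, using only JDK APIs: build a `TrustManagerFactory` from a trust store holding the CA certificate(s) that should be trusted. Netty's `SslContextBuilder.forClient().trustManager(TrustManagerFactory)` accepts such a factory. Obtaining the CA certificate (for example from the datanode's certificate client, `certClient` in the diff) is an assumption and is not shown; `ReplicationTrust` is a hypothetical helper name.

```java
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.cert.X509Certificate;
import java.util.List;
import javax.net.ssl.TrustManagerFactory;

final class ReplicationTrust {
  private ReplicationTrust() { }

  // Builds an in-memory trust store containing only the given CA certificates.
  static KeyStore trustStoreOf(List<X509Certificate> caCerts)
      throws GeneralSecurityException, IOException {
    KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
    trustStore.load(null, null); // initialize an empty store
    for (int i = 0; i < caCerts.size(); i++) {
      trustStore.setCertificateEntry("ca-" + i, caCerts.get(i));
    }
    return trustStore;
  }

  // Creates a factory that validates peers against the trust store.
  // Passing null falls back to the JDK's default trusted CAs.
  static TrustManagerFactory fromTrustStore(KeyStore trustStore)
      throws GeneralSecurityException {
    TrustManagerFactory tmf =
        TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
    tmf.init(trustStore);
    return tmf;
  }
}
```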

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingException.java
##########
@@ -24,7 +24,7 @@
  */
 public class StreamingException extends RuntimeException {
 
-  public StreamingException(InterruptedException ex) {
+  public StreamingException(Exception ex) {

Review comment:
       With the new `StreamingException(Throwable)` constructor, I think this one is no longer needed.
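A sketch of why the narrower constructor becomes redundant: a single `Throwable` constructor already accepts `InterruptedException` and any other exception. The simplified `StreamingException` and the hypothetical `Waiting.awaitQuietly` helper also show a common idiom when wrapping `InterruptedException`: restore the thread's interrupt flag first.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical simplified StreamingException: one Throwable-based constructor
// covers InterruptedException, IOException, and everything else.
class StreamingException extends RuntimeException {
  StreamingException(Throwable cause) {
    super(cause);
  }

  StreamingException(String message, Throwable cause) {
    super(message, cause);
  }
}

final class Waiting {
  private Waiting() { }

  static void awaitQuietly(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      // Restore the interrupt status so callers can still observe it.
      Thread.currentThread().interrupt();
      throw new StreamingException(e);
    }
  }
}
```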

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java
##########
@@ -83,7 +83,12 @@ public void doRead(ChannelHandlerContext ctx, ByteBuf buffer)
         buffer.skipBytes(1);
         String[] parts = currentFileName.toString().split(" ", 2);
         remaining = Long.parseLong(parts[0]);
-        Path destFilePath = destination.mapToDestination(parts[1]);
+        final String logicalFileName = parts[1];
+        if (currentFileName.toString().equals(END_MARKER)) {

Review comment:
       `currentFileName.toString()` could be stored in a variable (it is used twice).
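A sketch of the header parsing with the string converted once and reused. The `"<size> <logical name>"` layout follows the diff; the `END_MARKER` value and the `FileHeader` class are hypothetical, since the real constant is not shown. The marker is checked before splitting, so it would be handled even if it did not follow the size-name layout.

```java
final class FileHeader {
  // Hypothetical marker value; the actual END_MARKER is not visible in the diff.
  static final String END_MARKER = "0 END_OF_STREAM";

  final long size;
  final String logicalName;

  private FileHeader(long size, String logicalName) {
    this.size = size;
    this.logicalName = logicalName;
  }

  /** Parses "size name"; returns null when the header is the end marker. */
  static FileHeader parse(CharSequence currentFileName) {
    // Convert once and reuse instead of calling toString() at each use.
    final String header = currentFileName.toString();
    if (header.equals(END_MARKER)) {
      return null;
    }
    final String[] parts = header.split(" ", 2);
    if (parts.length != 2) {
      throw new IllegalArgumentException("Malformed file header: " + header);
    }
    return new FileHeader(Long.parseLong(parts[0]), parts[1]);
  }
}
```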

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java
##########
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.stream.StreamingSource;
+
+/**
+ * Streaming source for closed-container replication.
+ */
+public class ContainerStreamingSource implements StreamingSource {
+
+  private ContainerSet containerSet;
+
+  public ContainerStreamingSource(ContainerSet containerSet) {
+    this.containerSet = containerSet;
+  }
+
+  @Override
+  public Map<String, Path> getFilesToStream(String id) {
+
+    Map<String, Path> filesToStream = new HashMap<>();
+
+    final long containerId = Long.parseLong(id);
+    final KeyValueContainer container =
+        (KeyValueContainer) containerSet.getContainer(containerId);
+
+    try {
+      container.release();
+    } catch (StorageContainerException e) {
+      throw new RuntimeException("Container couldn't be released: " + id, e);
+    }
+
+    if (container.getContainerState() != State.CLOSED
+        && container.getContainerState() != State.QUASI_CLOSED) {
+      throw new RuntimeException(
+          "Only (quasi)closed containers can be exported, but ContainerId=: "
+              + id + " is in state " + container.getContainerState());
+    }
+
+    try {
+
+      final File dbPath =
+          container.getContainerData().getContainerDBFile();
+
+      final List<Path> dbFiles =
+          Files.list(dbPath.toPath()).collect(Collectors.toList());

Review comment:
       Sonar reports that `Files.list` (and `Files.walk` below) should be used in try-with-resources.
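A sketch of the try-with-resources form. `Files.list` and `Files.walk` return streams backed by open directory handles, which are released only when the stream is closed. `DbFiles` is a hypothetical helper; the regular-file filter on the walk is an illustrative choice, not taken from the diff.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class DbFiles {
  private DbFiles() { }

  // Direct children of dir; the stream (and its directory handle) is closed on exit.
  static List<Path> listEntries(Path dir) throws IOException {
    try (Stream<Path> entries = Files.list(dir)) {
      return entries.collect(Collectors.toList());
    }
  }

  // All regular files below root, recursively; also closed via try-with-resources.
  static List<Path> walkRegularFiles(Path root) throws IOException {
    try (Stream<Path> tree = Files.walk(root)) {
      return tree.filter(Files::isRegularFile).collect(Collectors.toList());
    }
  }
}
```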

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
##########
@@ -49,86 +60,162 @@
 
   private final ContainerSet containerSet;
 
-  private final ContainerController controller;
-
-  private final ContainerDownloader downloader;
-
-  private final TarContainerPacker packer;
+  private final ConfigurationSource config;
+  private final Supplier<String> clusterId;
+  private VolumeChoosingPolicy volumeChoosingPolicy;
+  private VolumeSet volumeSet;
+  private SslContext sslContext;
 
   public DownloadAndImportReplicator(
+      ConfigurationSource config,
+      Supplier<String> clusterId,
       ContainerSet containerSet,
-      ContainerController controller,
-      ContainerDownloader downloader,
-      TarContainerPacker packer) {
+      VolumeSet volumeSet,
+      SslContext sslContext
+  ) {
     this.containerSet = containerSet;
-    this.controller = controller;
-    this.downloader = downloader;
-    this.packer = packer;
-  }
-
-  public void importContainer(long containerID, Path tarFilePath)
-      throws IOException {
+    this.config = config;
+    this.clusterId = clusterId;
+    this.volumeSet = volumeSet;
+    this.sslContext = sslContext;
+    Class<? extends VolumeChoosingPolicy> volumeChoosingPolicyType = null;
     try {
-      ContainerData originalContainerData;
-      try (FileInputStream tempContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
-        byte[] containerDescriptorYaml =
-            packer.unpackContainerDescriptor(tempContainerTarStream);
-        originalContainerData = ContainerDataYaml.readContainer(
-            containerDescriptorYaml);
-      }
-
-      try (FileInputStream tempContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
-
-        Container container = controller.importContainer(
-            originalContainerData, tempContainerTarStream, packer);
-
-        containerSet.addContainer(container);
-      }
-
-    } finally {
-      try {
-        Files.delete(tarFilePath);
-      } catch (Exception ex) {
-        LOG.error("Got exception while deleting downloaded container file: "
-            + tarFilePath.toAbsolutePath().toString(), ex);
-      }
+      volumeChoosingPolicyType =
+          config.getClass(
+              HDDS_DATANODE_VOLUME_CHOOSING_POLICY,
+              RoundRobinVolumeChoosingPolicy
+                  .class, VolumeChoosingPolicy.class);
+
+      this.volumeChoosingPolicy = volumeChoosingPolicyType.newInstance();
+
+    } catch (InstantiationException ex) {
+      throw new IllegalArgumentException(
+          "Couldn't create volume choosing policy: " + volumeChoosingPolicyType,
+          ex);
+    } catch (IllegalAccessException e) {
+      e.printStackTrace();
     }
+
   }
 
   @Override
   public void replicate(ReplicationTask task) {
     long containerID = task.getContainerId();
-
+    if (clusterId.get() == null) {
+      LOG.error("Replication task is called before first SCM call");
+      task.setStatus(Status.FAILED);
+    }
     List<DatanodeDetails> sourceDatanodes = task.getSources();
 
     LOG.info("Starting replication of container {} from {}", containerID,
         sourceDatanodes);
 
-    CompletableFuture<Path> tempTarFile = downloader
-        .getContainerDataFromReplicas(containerID,
-            sourceDatanodes);
-    if (tempTarFile == null) {
-      task.setStatus(Status.FAILED);
-    } else {
-      try {
-        // Wait for the download. This thread pool is limiting the parallel
-        // downloads, so it's ok to block here and wait for the full download.
-        Path path = tempTarFile.get();
-        long bytes = Files.size(path);
-
-        LOG.info("Container {} is downloaded with size {}, starting to import.",
-                containerID, bytes);
-        task.setTransferredBytes(bytes);
-
-        importContainer(containerID, path);
+    try {
+
+      long maxContainerSize = (long) config.getStorageSize(
+          ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+          ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
+
+      KeyValueContainerData containerData =
+          new KeyValueContainerData(containerID,
+              ChunkLayOutVersion.FILE_PER_BLOCK, maxContainerSize, "", "");
+
+      //choose a volume
+      final HddsVolume volume = volumeChoosingPolicy
+          .chooseVolume(
+              StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()),
+              maxContainerSize);
+
+      //fill the path fields
+      containerData.assignToVolume(clusterId.get(), volume);
+
+      Collections.shuffle(sourceDatanodes);
+
+      //download data
+      final DatanodeDetails datanode = sourceDatanodes.get(0);
+
+      try (StreamingClient client =
+               new StreamingClient(datanode.getIpAddress(),
+                   datanode.getPort(Name.REPLICATION).getValue(),
+                   new ContainerStreamingDestination(containerData),
+                   sslContext)
+      ) {
+        client.stream("" + containerData.getContainerID());
+
+        LOG.info("Container " + containerData.getContainerID()
+            + " is downloaded successfully");

Review comment:
       ```suggestion
           LOG.info("Container {} is downloaded successfully", containerID);
       ```

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
##########
@@ -49,86 +60,162 @@
 
   private final ContainerSet containerSet;
 
-  private final ContainerController controller;
-
-  private final ContainerDownloader downloader;
-
-  private final TarContainerPacker packer;
+  private final ConfigurationSource config;
+  private final Supplier<String> clusterId;
+  private VolumeChoosingPolicy volumeChoosingPolicy;
+  private VolumeSet volumeSet;
+  private SslContext sslContext;
 
   public DownloadAndImportReplicator(
+      ConfigurationSource config,
+      Supplier<String> clusterId,
       ContainerSet containerSet,
-      ContainerController controller,
-      ContainerDownloader downloader,
-      TarContainerPacker packer) {
+      VolumeSet volumeSet,
+      SslContext sslContext
+  ) {
     this.containerSet = containerSet;
-    this.controller = controller;
-    this.downloader = downloader;
-    this.packer = packer;
-  }
-
-  public void importContainer(long containerID, Path tarFilePath)
-      throws IOException {
+    this.config = config;
+    this.clusterId = clusterId;
+    this.volumeSet = volumeSet;
+    this.sslContext = sslContext;
+    Class<? extends VolumeChoosingPolicy> volumeChoosingPolicyType = null;
     try {
-      ContainerData originalContainerData;
-      try (FileInputStream tempContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
-        byte[] containerDescriptorYaml =
-            packer.unpackContainerDescriptor(tempContainerTarStream);
-        originalContainerData = ContainerDataYaml.readContainer(
-            containerDescriptorYaml);
-      }
-
-      try (FileInputStream tempContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
-
-        Container container = controller.importContainer(
-            originalContainerData, tempContainerTarStream, packer);
-
-        containerSet.addContainer(container);
-      }
-
-    } finally {
-      try {
-        Files.delete(tarFilePath);
-      } catch (Exception ex) {
-        LOG.error("Got exception while deleting downloaded container file: "
-            + tarFilePath.toAbsolutePath().toString(), ex);
-      }
+      volumeChoosingPolicyType =
+          config.getClass(
+              HDDS_DATANODE_VOLUME_CHOOSING_POLICY,
+              RoundRobinVolumeChoosingPolicy
+                  .class, VolumeChoosingPolicy.class);
+
+      this.volumeChoosingPolicy = volumeChoosingPolicyType.newInstance();
+
+    } catch (InstantiationException ex) {
+      throw new IllegalArgumentException(
+          "Couldn't create volume choosing policy: " + volumeChoosingPolicyType,
+          ex);
+    } catch (IllegalAccessException e) {
+      e.printStackTrace();

Review comment:
       Leftover `printStackTrace`?
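A sketch of instantiating the configured policy class without swallowing errors: every reflective failure is rethrown with context instead of being printed. It uses `getDeclaredConstructor().newInstance()`, since `Class.newInstance()` is deprecated since Java 9, and catches `ReflectiveOperationException`, which covers both `InstantiationException` and `IllegalAccessException`. `VolumePolicy`, `RoundRobinPolicy`, and `MisconfiguredPolicy` are hypothetical stand-ins for the volume choosing policies.

```java
// Hypothetical stand-in for VolumeChoosingPolicy.
interface VolumePolicy {
  String name();
}

class RoundRobinPolicy implements VolumePolicy {
  public RoundRobinPolicy() { }

  public String name() {
    return "round-robin";
  }
}

// Hypothetical misconfigured policy: it has no no-arg constructor.
class MisconfiguredPolicy implements VolumePolicy {
  MisconfiguredPolicy(int unused) { }

  public String name() {
    return "misconfigured";
  }
}

final class PolicyLoader {
  private PolicyLoader() { }

  static <T> T instantiate(Class<? extends T> type) {
    try {
      return type.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      // Fail fast with context instead of printStackTrace() and a null field.
      throw new IllegalArgumentException(
          "Couldn't create volume choosing policy: " + type, e);
    }
  }
}
```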

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java
##########
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.stream.StreamingSource;
+
+/**
+ * Streaming source for closed-container replication.
+ */
+public class ContainerStreamingSource implements StreamingSource {
+
+  private ContainerSet containerSet;
+
+  public ContainerStreamingSource(ContainerSet containerSet) {
+    this.containerSet = containerSet;
+  }
+
+  @Override
+  public Map<String, Path> getFilesToStream(String id) {
+
+    Map<String, Path> filesToStream = new HashMap<>();
+
+    final long containerId = Long.parseLong(id);
+    final KeyValueContainer container =
+        (KeyValueContainer) containerSet.getContainer(containerId);
+
+    try {
+      container.release();
+    } catch (StorageContainerException e) {
+      throw new RuntimeException("Container couldn't be released: " + id, e);
+    }
+
+    if (container.getContainerState() != State.CLOSED
+        && container.getContainerState() != State.QUASI_CLOSED) {

Review comment:
       Should we check this before `release()`?
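A sketch of the suggested ordering: validate the state first, so a container that cannot be exported is rejected before `release()` is called. `ContainerState` is a hypothetical, simplified subset of the real state enum, and `ExportableContainer` is a hypothetical stand-in for `KeyValueContainer`; what `release()` does internally is not assumed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical, simplified subset of container states.
enum ContainerState { OPEN, CLOSING, QUASI_CLOSED, CLOSED }

// Hypothetical stand-in for KeyValueContainer.
interface ExportableContainer {
  ContainerState getState();

  void release() throws IOException;
}

final class ExportPreconditions {
  private ExportPreconditions() { }

  static void checkStateThenRelease(ExportableContainer container, String id) {
    // Check the state first; release() is only reached for exportable containers.
    ContainerState state = container.getState();
    if (state != ContainerState.CLOSED && state != ContainerState.QUASI_CLOSED) {
      throw new IllegalStateException("Only (quasi)closed containers can be exported, "
          + "but container " + id + " is in state " + state);
    }
    try {
      container.release();
    } catch (IOException e) {
      throw new UncheckedIOException("Container couldn't be released: " + id, e);
    }
  }
}
```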




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
For additional commands, e-mail: issues-help@ozone.apache.org