You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/10/25 01:15:47 UTC

[ozone] 38/38: HDDS-6955. [Ozone-streaming] Add explicit stream flag in ozone shell (#3559)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 653a120236f70829f592da88e70b87d94c36dc02
Author: Kaijie Chen <ch...@kaijie.org>
AuthorDate: Tue Jul 12 18:49:09 2022 +0800

    HDDS-6955. [Ozone-streaming] Add explicit stream flag in ozone shell (#3559)
    
    * Add stream option in ozone sh key
    
    (cherry picked from commit 4c7151b5e861c3f3c8bfd02db73e8731f10eafd6)
    (cherry picked from commit c84da64fbcd34e62b17f1bd2a52238b0497952e8)
---
 .../hadoop/hdds/client/ReplicationConfig.java      | 11 +++
 .../hadoop/ozone/shell/keys/PutKeyHandler.java     | 90 ++++++++++++++--------
 2 files changed, 67 insertions(+), 34 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java
index 610419527a..7542409679 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java
@@ -76,6 +76,17 @@ public interface ReplicationConfig {
     return parse(null, replication, config);
   }
 
+  static ReplicationConfig resolve(ReplicationConfig replicationConfig,
+      ReplicationConfig bucketReplicationConfig, ConfigurationSource conf) {
+    if (replicationConfig == null) {
+      replicationConfig = bucketReplicationConfig;
+    }
+    if (replicationConfig == null) {
+      replicationConfig = getDefault(conf);
+    }
+    return replicationConfig;
+  }
+
   /**
    * Helper method to serialize from proto.
    * <p>
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
index 1f7c1ef7f4..16af2c5fd1 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
@@ -29,6 +29,7 @@ import java.nio.channels.FileChannel;
 import java.util.HashMap;
 import java.util.Map;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
@@ -42,13 +43,13 @@ import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.shell.OzoneAddress;
 
 import org.apache.commons.codec.digest.DigestUtils;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY;
 
 import org.apache.hadoop.ozone.shell.ShellReplicationOptions;
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Mixin;
+import picocli.CommandLine.Option;
 import picocli.CommandLine.Parameters;
 
 /**
@@ -61,6 +62,9 @@ public class PutKeyHandler extends KeyHandler {
   @Parameters(index = "1", arity = "1..1", description = "File to upload")
   private String fileName;
 
+  @Option(names = "--stream")
+  private boolean stream;
+
   @Mixin
   private ShellReplicationOptions replication;
 
@@ -96,41 +100,59 @@ public class PutKeyHandler extends KeyHandler {
     int chunkSize = (int) getConf().getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY,
         OZONE_SCM_CHUNK_SIZE_DEFAULT, StorageUnit.BYTES);
 
-    Boolean useAsync = false;
-    if (dataFile.length() <= chunkSize ||
-        (replicationConfig != null &&
-        replicationConfig.getReplicationType()  == EC) ||
-        bucket.getReplicationConfig() instanceof ECReplicationConfig) {
-      useAsync = true;
-    }
-    if (useAsync) {
-      if (isVerbose()) {
-        out().println("API: async");
-      }
-      try (InputStream input = new FileInputStream(dataFile);
-           OutputStream output = bucket.createKey(keyName, dataFile.length(),
-               replicationConfig, keyMetadata)) {
-        IOUtils.copyBytes(input, output, chunkSize);
-      }
+    if (stream) {
+      stream(dataFile, bucket, keyName, keyMetadata,
+          replicationConfig, chunkSize);
     } else {
-      if (isVerbose()) {
-        out().println("API: streaming");
-      }
-      try (RandomAccessFile raf = new RandomAccessFile(dataFile, "r");
-           OzoneDataStreamOutput out = bucket.createStreamKey(keyName,
-               dataFile.length(), replicationConfig, keyMetadata)) {
-        FileChannel ch = raf.getChannel();
-        long len = raf.length();
-        long off = 0;
-        while (len > 0) {
-          long writeLen = Math.min(len, chunkSize);
-          ByteBuffer bb = ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen);
-          out.write(bb);
-          off += writeLen;
-          len -= writeLen;
-        }
-      }
+      async(dataFile, bucket, keyName, keyMetadata,
+          replicationConfig, chunkSize);
+    }
+  }
+
+  void async(
+      File dataFile, OzoneBucket bucket,
+      String keyName, Map<String, String> keyMetadata,
+      ReplicationConfig replicationConfig, int chunkSize)
+      throws IOException {
+    if (isVerbose()) {
+      out().println("API: async");
+    }
+    try (InputStream input = new FileInputStream(dataFile);
+         OutputStream output = bucket.createKey(keyName, dataFile.length(),
+             replicationConfig, keyMetadata)) {
+      IOUtils.copyBytes(input, output, chunkSize);
     }
   }
 
+  void stream(
+      File dataFile, OzoneBucket bucket,
+      String keyName, Map<String, String> keyMetadata,
+      ReplicationConfig replicationConfig, int chunkSize)
+      throws IOException {
+    if (isVerbose()) {
+      out().println("API: streaming");
+    }
+    // In streaming mode, always resolve replication config at client side,
+    // because streaming is not compatible for writing EC keys.
+    replicationConfig = ReplicationConfig.resolve(replicationConfig,
+        bucket.getReplicationConfig(), getConf());
+    Preconditions.checkArgument(
+        !(replicationConfig instanceof ECReplicationConfig),
+        "Can not put EC key by streaming");
+
+    try (RandomAccessFile raf = new RandomAccessFile(dataFile, "r");
+         OzoneDataStreamOutput out = bucket.createStreamKey(keyName,
+             dataFile.length(), replicationConfig, keyMetadata)) {
+      FileChannel ch = raf.getChannel();
+      long len = raf.length();
+      long off = 0;
+      while (len > 0) {
+        long writeLen = Math.min(len, chunkSize);
+        ByteBuffer bb = ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen);
+        out.write(bb);
+        off += writeLen;
+        len -= writeLen;
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org