You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ca...@apache.org on 2022/07/12 10:49:14 UTC
[ozone] branch HDDS-4454 updated: HDDS-6955. [Ozone-streaming] Add explicit stream flag in ozone shell (#3559)
This is an automated email from the ASF dual-hosted git repository.
captainzmc pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-4454 by this push:
new 4c7151b5e8 HDDS-6955. [Ozone-streaming] Add explicit stream flag in ozone shell (#3559)
4c7151b5e8 is described below
commit 4c7151b5e861c3f3c8bfd02db73e8731f10eafd6
Author: Kaijie Chen <ch...@kaijie.org>
AuthorDate: Tue Jul 12 18:49:09 2022 +0800
HDDS-6955. [Ozone-streaming] Add explicit stream flag in ozone shell (#3559)
* Add stream option in ozone sh key
---
.../hadoop/hdds/client/ReplicationConfig.java | 11 +++
.../hadoop/ozone/shell/keys/PutKeyHandler.java | 90 ++++++++++++++--------
2 files changed, 67 insertions(+), 34 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java
index 6135883158..2b88f943de 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java
@@ -76,6 +76,17 @@ public interface ReplicationConfig {
return parse(null, replication, config);
}
+ static ReplicationConfig resolve(ReplicationConfig replicationConfig,
+ ReplicationConfig bucketReplicationConfig, ConfigurationSource conf) {
+ if (replicationConfig == null) {
+ replicationConfig = bucketReplicationConfig;
+ }
+ if (replicationConfig == null) {
+ replicationConfig = getDefault(conf);
+ }
+ return replicationConfig;
+ }
+
/**
* Helper method to serialize from proto.
* <p>
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
index 1f7c1ef7f4..16af2c5fd1 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
@@ -29,6 +29,7 @@ import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.Map;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
@@ -42,13 +43,13 @@ import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.shell.OzoneAddress;
import org.apache.commons.codec.digest.DigestUtils;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY;
import org.apache.hadoop.ozone.shell.ShellReplicationOptions;
import picocli.CommandLine.Command;
import picocli.CommandLine.Mixin;
+import picocli.CommandLine.Option;
import picocli.CommandLine.Parameters;
/**
@@ -61,6 +62,9 @@ public class PutKeyHandler extends KeyHandler {
@Parameters(index = "1", arity = "1..1", description = "File to upload")
private String fileName;
+ @Option(names = "--stream")
+ private boolean stream;
+
@Mixin
private ShellReplicationOptions replication;
@@ -96,41 +100,59 @@ public class PutKeyHandler extends KeyHandler {
int chunkSize = (int) getConf().getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY,
OZONE_SCM_CHUNK_SIZE_DEFAULT, StorageUnit.BYTES);
- Boolean useAsync = false;
- if (dataFile.length() <= chunkSize ||
- (replicationConfig != null &&
- replicationConfig.getReplicationType() == EC) ||
- bucket.getReplicationConfig() instanceof ECReplicationConfig) {
- useAsync = true;
- }
- if (useAsync) {
- if (isVerbose()) {
- out().println("API: async");
- }
- try (InputStream input = new FileInputStream(dataFile);
- OutputStream output = bucket.createKey(keyName, dataFile.length(),
- replicationConfig, keyMetadata)) {
- IOUtils.copyBytes(input, output, chunkSize);
- }
+ if (stream) {
+ stream(dataFile, bucket, keyName, keyMetadata,
+ replicationConfig, chunkSize);
} else {
- if (isVerbose()) {
- out().println("API: streaming");
- }
- try (RandomAccessFile raf = new RandomAccessFile(dataFile, "r");
- OzoneDataStreamOutput out = bucket.createStreamKey(keyName,
- dataFile.length(), replicationConfig, keyMetadata)) {
- FileChannel ch = raf.getChannel();
- long len = raf.length();
- long off = 0;
- while (len > 0) {
- long writeLen = Math.min(len, chunkSize);
- ByteBuffer bb = ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen);
- out.write(bb);
- off += writeLen;
- len -= writeLen;
- }
- }
+ async(dataFile, bucket, keyName, keyMetadata,
+ replicationConfig, chunkSize);
+ }
+ }
+
+ void async(
+ File dataFile, OzoneBucket bucket,
+ String keyName, Map<String, String> keyMetadata,
+ ReplicationConfig replicationConfig, int chunkSize)
+ throws IOException {
+ if (isVerbose()) {
+ out().println("API: async");
+ }
+ try (InputStream input = new FileInputStream(dataFile);
+ OutputStream output = bucket.createKey(keyName, dataFile.length(),
+ replicationConfig, keyMetadata)) {
+ IOUtils.copyBytes(input, output, chunkSize);
}
}
+ void stream(
+ File dataFile, OzoneBucket bucket,
+ String keyName, Map<String, String> keyMetadata,
+ ReplicationConfig replicationConfig, int chunkSize)
+ throws IOException {
+ if (isVerbose()) {
+ out().println("API: streaming");
+ }
+ // In streaming mode, always resolve replication config at client side,
+ // because streaming is not compatible with writing EC keys.
+ replicationConfig = ReplicationConfig.resolve(replicationConfig,
+ bucket.getReplicationConfig(), getConf());
+ Preconditions.checkArgument(
+ !(replicationConfig instanceof ECReplicationConfig),
+ "Can not put EC key by streaming");
+
+ try (RandomAccessFile raf = new RandomAccessFile(dataFile, "r");
+ OzoneDataStreamOutput out = bucket.createStreamKey(keyName,
+ dataFile.length(), replicationConfig, keyMetadata)) {
+ FileChannel ch = raf.getChannel();
+ long len = raf.length();
+ long off = 0;
+ while (len > 0) {
+ long writeLen = Math.min(len, chunkSize);
+ ByteBuffer bb = ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen);
+ out.write(bb);
+ off += writeLen;
+ len -= writeLen;
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org