You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/05/24 16:27:35 UTC

[ozone] 17/36: HDDS-5743. [Ozone-Streaming] Add option to write files via streaming api in ofs and o3fs. (#2770)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 49564be02a68d4fbd4146ca04a4ccde993384301
Author: Sadanand Shenoy <sa...@gmail.com>
AuthorDate: Fri Nov 19 12:14:05 2021 +0530

    HDDS-5743. [Ozone-Streaming] Add option to write files via streaming api in ofs and o3fs. (#2770)
---
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   7 ++
 .../common/src/main/resources/ozone-default.xml    |   7 ++
 .../apache/hadoop/ozone/client/OzoneBucket.java    |   8 ++
 .../ozone/client/protocol/ClientProtocol.java      |   5 +
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  20 ++++
 .../fs/ozone/BasicOzoneClientAdapterImpl.java      |  33 +++++++
 .../hadoop/fs/ozone/BasicOzoneFileSystem.java      |   8 ++
 .../ozone/BasicRootedOzoneClientAdapterImpl.java   |  39 ++++++++
 .../fs/ozone/BasicRootedOzoneFileSystem.java       |   8 ++
 .../apache/hadoop/fs/ozone/OzoneClientAdapter.java |   3 +
 .../hadoop/fs/ozone/OzoneFSDataStreamOutput.java   | 103 +++++++++++++++++++++
 11 files changed, 241 insertions(+)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 7ee874a60b..de238e207f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -97,6 +97,13 @@ public final class OzoneConfigKeys {
   public static final int DFS_CONTAINER_RATIS_DATASTREAM_PORT_DEFAULT
       = 9855;
 
+  /**
+   * Flag that switches file system writes to the Ratis streaming API.
+   */
+  public static final String OZONE_FS_DATASTREAM_ENABLE =
+      "ozone.fs.datastream.enable";
+  public static final boolean OZONE_FS_DATASTREAM_ENABLE_DEFAULT = false;
+
   /**
    * When set to true, allocate a random free port for ozone container, so that
    * a mini cluster is able to launch multiple containers on a node.
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index e4d753d1b9..8dfa81a633 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3209,4 +3209,11 @@
       log level is debug. Ex: "CREATE_CONTAINER,READ_CONTAINER,UPDATE_CONTAINER".
     </description>
   </property>
+  <property>
+    <name>ozone.fs.datastream.enable</name>
+    <value>false</value>
+    <tag>OZONE, DATANODE</tag>
+    <description>Enables or disables file system writes via Ratis streaming.
+    </description>
+  </property>
 </configuration>
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index bf7f618b96..3cd0ca7f62 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -952,6 +952,14 @@ public class OzoneBucket extends WithMetadata {
             overWrite, recursive);
   }
 
+  public OzoneDataStreamOutput createStreamFile(String keyName, long size,
+      ReplicationConfig replicationConfig, boolean overWrite,
+      boolean recursive) throws IOException {
+    return proxy
+        .createStreamFile(volumeName, name, keyName, size, replicationConfig,
+            overWrite, recursive);
+  }
+
   /**
    * List the status for a file or a directory and its contents.
    *
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index 5083f5a78a..6e0cc6602f 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -702,6 +702,11 @@ public interface ClientProtocol {
       String keyName, long size, ReplicationConfig replicationConfig,
       boolean overWrite, boolean recursive) throws IOException;
 
+  @SuppressWarnings("checkstyle:parameternumber")
+  OzoneDataStreamOutput createStreamFile(String volumeName, String bucketName,
+      String keyName, long size, ReplicationConfig replicationConfig,
+      boolean overWrite, boolean recursive) throws IOException;
+
 
   /**
    * List the status for a file or a directory and its contents.
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index a576b9e93e..af03a97b09 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -1639,6 +1639,26 @@ public class RpcClient implements ClientProtocol {
     return createOutputStream(keySession, UUID.randomUUID().toString());
   }
 
+  @Override
+  public OzoneDataStreamOutput createStreamFile(String volumeName,
+      String bucketName, String keyName, long size,
+      ReplicationConfig replicationConfig, boolean overWrite, boolean recursive)
+      throws IOException {
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setDataSize(size)
+        .setReplicationConfig(replicationConfig)
+        .setAcls(getAclList())
+        .setLatestVersionLocation(getLatestVersionLocation)
+        .build();
+    OpenKeySession keySession =
+        ozoneManagerClient.createFile(keyArgs, overWrite, recursive);
+    return createDataStreamOutput(keySession, UUID.randomUUID().toString(),
+        replicationConfig);
+  }
+
   @Override
   public List<OzoneFileStatus> listStatus(String volumeName, String bucketName,
       String keyName, boolean recursive, String startKey, long numEntries)
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
index bec00e9299..be919b76fe 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.OzoneKey;
 import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.common.MonotonicClock;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -265,6 +266,38 @@ public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter {
     return this.bucketReplicationConfig;
   }
 
+  @Override
+  public OzoneFSDataStreamOutput createStreamFile(String key, short replication,
+      boolean overWrite, boolean recursive) throws IOException {
+    incrementCounter(Statistic.OBJECTS_CREATED, 1);
+    try {
+      OzoneDataStreamOutput ozoneDataStreamOutput = null;
+      if (replication == ReplicationFactor.ONE.getValue()
+          || replication == ReplicationFactor.THREE.getValue()) {
+
+        ReplicationConfig customReplicationConfig =
+            ReplicationConfig.adjustReplication(bucketReplicationConfig,
+                replication, config);
+        ozoneDataStreamOutput = bucket
+            .createStreamFile(key, 0, customReplicationConfig, overWrite,
+                recursive);
+      } else {
+        ozoneDataStreamOutput = bucket.createStreamFile(
+            key, 0, bucketReplicationConfig, overWrite, recursive);
+      }
+      return new OzoneFSDataStreamOutput(
+          ozoneDataStreamOutput.getByteBufStreamOutput());
+    } catch (OMException ex) {
+      if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS
+          || ex.getResult() == OMException.ResultCodes.NOT_A_FILE) {
+        throw new FileAlreadyExistsException(
+            ex.getResult().name() + ": " + ex.getMessage());
+      } else {
+        throw ex;
+      }
+    }
+  }
+
   @Override
   public void renameKey(String key, String newKeyName) throws IOException {
     incrementCounter(Statistic.OBJECTS_RENAMED, 1);
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
index 910ca45584..645992c3ab 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -259,6 +260,13 @@ public class BasicOzoneFileSystem extends FileSystem {
 
   private FSDataOutputStream createOutputStream(String key, short replication,
       boolean overwrite, boolean recursive) throws IOException {
+    boolean isRatisStreamingEnabled = getConf().getBoolean(
+        OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLE,
+        OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLE_DEFAULT);
+    if (isRatisStreamingEnabled){
+      return new FSDataOutputStream(adapter.createStreamFile(key,
+          replication, overwrite, recursive), statistics);
+    }
     return new FSDataOutputStream(adapter.createFile(key,
         replication, overwrite, recursive), statistics);
   }
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index 052c6c8ba6..5295aa33d6 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.OzoneKey;
 import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -352,6 +353,44 @@ public class BasicRootedOzoneClientAdapterImpl
     }
   }
 
+  @Override
+  public OzoneFSDataStreamOutput createStreamFile(String pathStr,
+      short replication, boolean overWrite, boolean recursive)
+      throws IOException {
+    incrementCounter(Statistic.OBJECTS_CREATED, 1);
+    OFSPath ofsPath = new OFSPath(pathStr);
+    if (ofsPath.isRoot() || ofsPath.isVolume() || ofsPath.isBucket()) {
+      throw new IOException("Cannot create file under root or volume.");
+    }
+    String key = ofsPath.getKeyName();
+    try {
+      // Hadoop CopyCommands class always sets recursive to true
+      OzoneBucket bucket = getBucket(ofsPath, recursive);
+      OzoneDataStreamOutput ozoneDataStreamOutput = null;
+      if (replication == ReplicationFactor.ONE.getValue()
+          || replication == ReplicationFactor.THREE.getValue()) {
+
+        ozoneDataStreamOutput = bucket.createStreamFile(key, 0,
+            ReplicationConfig.adjustReplication(
+                clientConfiguredReplicationConfig, replication, config),
+            overWrite, recursive);
+      } else {
+        ozoneDataStreamOutput = bucket.createStreamFile(
+            key, 0, clientConfiguredReplicationConfig, overWrite, recursive);
+      }
+      return new OzoneFSDataStreamOutput(
+          ozoneDataStreamOutput.getByteBufStreamOutput());
+    } catch (OMException ex) {
+      if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS
+          || ex.getResult() == OMException.ResultCodes.NOT_A_FILE) {
+        throw new FileAlreadyExistsException(
+            ex.getResult().name() + ": " + ex.getMessage());
+      } else {
+        throw ex;
+      }
+    }
+  }
+
   @Override
   public void renameKey(String key, String newKeyName) throws IOException {
     throw new IOException("OFS doesn't support renameKey, use rename instead.");
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
index 0360e345b2..0bed09ba8a 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
 import org.apache.hadoop.ozone.OFSPath;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -235,6 +236,13 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
 
   private FSDataOutputStream createOutputStream(String key, short replication,
       boolean overwrite, boolean recursive) throws IOException {
+    boolean isRatisStreamingEnabled = getConf().getBoolean(
+        OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLE,
+        OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLE_DEFAULT);
+    if (isRatisStreamingEnabled){
+      return new FSDataOutputStream(adapter.createStreamFile(key,
+          replication, overwrite, recursive), statistics);
+    }
     return new FSDataOutputStream(adapter.createFile(key,
         replication, overwrite, recursive), statistics);
   }
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
index 31bf351f01..24566cb83f 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
@@ -45,6 +45,9 @@ public interface OzoneClientAdapter {
   OzoneFSOutputStream createFile(String key, short replication,
       boolean overWrite, boolean recursive) throws IOException;
 
+  OzoneFSDataStreamOutput createStreamFile(String key, short replication,
+      boolean overWrite, boolean recursive) throws IOException;
+
   void renameKey(String key, String newKeyName) throws IOException;
 
   // Users should use rename instead of renameKey in OFS.
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java
new file mode 100644
index 0000000000..515dbca92b
--- /dev/null
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.fs.ozone;
+
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * An OutputStream for Ozone FS that delegates to a ByteBufferStreamOutput.
+ */
+public class OzoneFSDataStreamOutput extends OutputStream
+    implements ByteBufferStreamOutput {
+
+  private final ByteBufferStreamOutput byteBufferStreamOutput;
+
+  public OzoneFSDataStreamOutput(
+      ByteBufferStreamOutput byteBufferStreamOutput) {
+    this.byteBufferStreamOutput = byteBufferStreamOutput;
+  }
+
+  /**
+   * Passes the [off, off + len) slice of ByteBuffer b to the wrapped stream.
+   *
+   * @param b   the buffer holding the data.
+   * @param off the start offset in the data.
+   * @param len the number of bytes to write.
+   * @throws IOException if the wrapped stream reports an I/O error.
+   */
+  @Override
+  public void write(ByteBuffer b, int off, int len)
+      throws IOException {
+    byteBufferStreamOutput.write(b, off, len);
+  }
+
+  /**
+   * Writes a single byte to the wrapped stream. The byte written is the
+   * eight low-order bits of the argument <code>b</code>; the 24
+   * high-order bits of <code>b</code> are ignored.
+   * <p>
+   * Each call allocates a one-element array and wraps it in a ByteBuffer
+   * before handing it to the wrapped stream. This class does not override
+   * the array-based <code>write</code> methods of OutputStream.
+   * <p>
+   * NOTE(review): bulk array writes presumably arrive here one byte at a
+   * time through OutputStream's default implementations; confirm.
+   *
+   * @param b the <code>byte</code>.
+   * @throws IOException if the wrapped stream reports an I/O error.
+   */
+  @Override
+  public void write(int b) throws IOException {
+    byte[] singleBytes = new byte[1];
+    singleBytes[0] = (byte) b;
+    byteBufferStreamOutput.write(ByteBuffer.wrap(singleBytes));
+  }
+
+  /**
+   * Flushes the wrapped ByteBufferStreamOutput; this class keeps no buffer
+   * of its own.
+   *
+   * @throws IOException if the wrapped stream reports an I/O error.
+   */
+  @Override
+  public void flush() throws IOException {
+    byteBufferStreamOutput.flush();
+  }
+
+  /**
+   * Closes the wrapped ByteBufferStreamOutput. The wrapped stream is the
+   * only state this class holds, so nothing else is released here.
+   *
+   * <p> The call is forwarded unconditionally: this class keeps no closed
+   * flag of its own, so the effect of calling close more than once is
+   * whatever the wrapped stream provides.
+   * <p>
+   * NOTE(review): {@link java.io.Closeable#close()} asks that closing an
+   * already closed stream has no effect; confirm the wrapped stream does.
+   *
+   * @throws IOException if the wrapped stream reports an I/O error
+   */
+  @Override
+  public void close() throws IOException {
+    byteBufferStreamOutput.close();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org