You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2021/11/19 06:44:21 UTC
[ozone] branch HDDS-4454 updated: HDDS-5743.[Ozone-Streaming] Add option to write files via streaming api in ofs and o3fs. (#2770)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-4454 by this push:
new c117885 HDDS-5743.[Ozone-Streaming] Add option to write files via streaming api in ofs and o3fs. (#2770)
c117885 is described below
commit c11788569fcf5741bb5c885f779988fdea7ab485
Author: Sadanand Shenoy <sa...@gmail.com>
AuthorDate: Fri Nov 19 12:14:05 2021 +0530
HDDS-5743.[Ozone-Streaming] Add option to write files via streaming api in ofs and o3fs. (#2770)
---
.../org/apache/hadoop/ozone/OzoneConfigKeys.java | 7 ++
.../common/src/main/resources/ozone-default.xml | 7 ++
.../apache/hadoop/ozone/client/OzoneBucket.java | 8 ++
.../ozone/client/protocol/ClientProtocol.java | 5 +
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 20 ++++
.../fs/ozone/BasicOzoneClientAdapterImpl.java | 32 +++++++
.../hadoop/fs/ozone/BasicOzoneFileSystem.java | 8 ++
.../ozone/BasicRootedOzoneClientAdapterImpl.java | 38 ++++++++
.../fs/ozone/BasicRootedOzoneFileSystem.java | 8 ++
.../apache/hadoop/fs/ozone/OzoneClientAdapter.java | 3 +
.../hadoop/fs/ozone/OzoneFSDataStreamOutput.java | 103 +++++++++++++++++++++
11 files changed, 239 insertions(+)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index b60f754..01b4eab 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -98,6 +98,13 @@ public final class OzoneConfigKeys {
= 9855;
/**
+ * Flag to enable ratis streaming on filesystem writes.
+ */
+ public static final String OZONE_FS_DATASTREAM_ENABLE =
+ "ozone.fs.datastream.enable";
+ public static final boolean OZONE_FS_DATASTREAM_ENABLE_DEFAULT = false;
+
+ /**
* When set to true, allocate a random free port for ozone container, so that
* a mini cluster is able to launch multiple containers on a node.
*/
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 54f8e5c..31ed276 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3030,4 +3030,11 @@
will create intermediate directories.
</description>
</property>
+ <property>
+ <name>ozone.fs.datastream.enable</name>
+ <value>false</value>
+ <tag>OZONE, DATANODE</tag>
+ <description>Whether file writes through the Ozone file systems (ofs and
+ o3fs) use the Ratis streaming API. When false, the regular write path is used.
+ </description>
+ </property>
</configuration>
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index 0c5f3cc..a166fbb 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -874,6 +874,14 @@ public class OzoneBucket extends WithMetadata {
overWrite, recursive);
}
+  /**
+   * Asks the client proxy to create a file in this bucket and hands back the
+   * data stream output used to write its content.
+   *
+   * @param keyName name of the key (file) to create
+   * @param size data size forwarded to the client proxy
+   * @param replicationConfig replication configuration forwarded to the proxy
+   * @param overWrite overwrite flag, forwarded unchanged
+   * @param recursive recursive flag, forwarded unchanged
+   * @return the data stream output produced by the client proxy
+   * @throws IOException if the client proxy call fails
+   */
+  public OzoneDataStreamOutput createStreamFile(String keyName, long size,
+      ReplicationConfig replicationConfig, boolean overWrite,
+      boolean recursive) throws IOException {
+    // Volume and bucket names come from this bucket instance.
+    final OzoneDataStreamOutput streamOutput = proxy.createStreamFile(
+        volumeName, name, keyName, size, replicationConfig, overWrite,
+        recursive);
+    return streamOutput;
+  }
+
/**
* List the status for a file or a directory and its contents.
*
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index c059fe0..3d5e713 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -698,6 +698,11 @@ public interface ClientProtocol {
String keyName, long size, ReplicationConfig replicationConfig,
boolean overWrite, boolean recursive) throws IOException;
+ /**
+ * Creates a file and returns a data stream output for writing its content.
+ *
+ * @param volumeName name of the volume containing the bucket
+ * @param bucketName name of the bucket in which the file is created
+ * @param keyName name of the key (file)
+ * @param size data size of the file as supplied by the caller
+ * @param replicationConfig replication configuration for the file
+ * @param overWrite overwrite flag; presumably whether an existing file may
+ * be replaced (confirm against the implementation)
+ * @param recursive recursive flag; presumably whether missing parent
+ * directories are created (confirm against the implementation)
+ * @return a data stream output for the created file
+ * @throws IOException if the file cannot be created
+ */
+ @SuppressWarnings("checkstyle:parameternumber")
+ OzoneDataStreamOutput createStreamFile(String volumeName, String bucketName,
+ String keyName, long size, ReplicationConfig replicationConfig,
+ boolean overWrite, boolean recursive) throws IOException;
+
/**
* List the status for a file or a directory and its contents.
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 7c8d14a..bd32691 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -1387,6 +1387,26 @@ public class RpcClient implements ClientProtocol {
}
@Override
+ public OzoneDataStreamOutput createStreamFile(String volumeName,
+ String bucketName, String keyName, long size,
+ ReplicationConfig replicationConfig, boolean overWrite, boolean recursive)
+ throws IOException {
+ OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+ .setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .setKeyName(keyName)
+ .setDataSize(size)
+ .setReplicationConfig(replicationConfig)
+ .setAcls(getAclList())
+ .setLatestVersionLocation(getLatestVersionLocation)
+ .build();
+ OpenKeySession keySession =
+ ozoneManagerClient.createFile(keyArgs, overWrite, recursive);
+ return createDataStreamOutput(keySession, UUID.randomUUID().toString(),
+ replicationConfig);
+ }
+
+ @Override
public List<OzoneFileStatus> listStatus(String volumeName, String bucketName,
String keyName, boolean recursive, String startKey, long numEntries)
throws IOException {
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
index b966825..70750bb 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -220,6 +221,37 @@ public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter {
}
@Override
+ public OzoneFSDataStreamOutput createStreamFile(String key, short replication,
+ boolean overWrite, boolean recursive) throws IOException {
+ incrementCounter(Statistic.OBJECTS_CREATED, 1);
+ try {
+ OzoneDataStreamOutput ozoneDataStreamOutput = null;
+ if (replication == ReplicationFactor.ONE.getValue()
+ || replication == ReplicationFactor.THREE.getValue()) {
+
+ ReplicationConfig customReplicationConfig =
+ ReplicationConfig.adjustReplication(replicationConfig, replication);
+ ozoneDataStreamOutput = bucket
+ .createStreamFile(key, 0, customReplicationConfig, overWrite,
+ recursive);
+ } else {
+ ozoneDataStreamOutput = bucket
+ .createStreamFile(key, 0, replicationConfig, overWrite, recursive);
+ }
+ return new OzoneFSDataStreamOutput(
+ ozoneDataStreamOutput.getByteBufStreamOutput());
+ } catch (OMException ex) {
+ if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS
+ || ex.getResult() == OMException.ResultCodes.NOT_A_FILE) {
+ throw new FileAlreadyExistsException(
+ ex.getResult().name() + ": " + ex.getMessage());
+ } else {
+ throw ex;
+ }
+ }
+ }
+
+ @Override
public void renameKey(String key, String newKeyName) throws IOException {
incrementCounter(Statistic.OBJECTS_RENAMED, 1);
bucket.renameKey(key, newKeyName);
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
index c920747..1592032 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
import org.apache.hadoop.security.UserGroupInformation;
@@ -259,6 +260,13 @@ public class BasicOzoneFileSystem extends FileSystem {
+ /**
+ * Opens the output stream for a new file, choosing between the streaming
+ * write path and the regular write path.
+ *
+ * @param key key of the file to create
+ * @param replication replication value passed through to the adapter
+ * @param overwrite overwrite flag passed through to the adapter
+ * @param recursive recursive flag passed through to the adapter
+ * @return the adapter's output wrapped in an FSDataOutputStream that is
+ * constructed with this file system's statistics
+ * @throws IOException if the adapter fails to create the file
+ */
private FSDataOutputStream createOutputStream(String key, short replication,
boolean overwrite, boolean recursive) throws IOException {
+ // The flag is read from the configuration on every call.
+ boolean isRatisStreamingEnabled = getConf().getBoolean(
+ OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLE,
+ OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLE_DEFAULT);
+ if (isRatisStreamingEnabled){
+ // Streaming enabled: create the file through the data stream API.
+ return new FSDataOutputStream(adapter.createStreamFile(key,
+ replication, overwrite, recursive), statistics);
+ }
+ // Streaming disabled: fall back to the regular createFile path.
return new FSDataOutputStream(adapter.createFile(key,
replication, overwrite, recursive), statistics);
}
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index 43406b7..e5d8bca 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -340,6 +341,43 @@ public class BasicRootedOzoneClientAdapterImpl
}
@Override
+ public OzoneFSDataStreamOutput createStreamFile(String pathStr,
+ short replication, boolean overWrite, boolean recursive)
+ throws IOException {
+ incrementCounter(Statistic.OBJECTS_CREATED, 1);
+ OFSPath ofsPath = new OFSPath(pathStr);
+ if (ofsPath.isRoot() || ofsPath.isVolume() || ofsPath.isBucket()) {
+ throw new IOException("Cannot create file under root or volume.");
+ }
+ String key = ofsPath.getKeyName();
+ try {
+ // Hadoop CopyCommands class always sets recursive to true
+ OzoneBucket bucket = getBucket(ofsPath, recursive);
+ OzoneDataStreamOutput ozoneDataStreamOutput = null;
+ if (replication == ReplicationFactor.ONE.getValue()
+ || replication == ReplicationFactor.THREE.getValue()) {
+
+ ozoneDataStreamOutput = bucket.createStreamFile(key, 0,
+ ReplicationConfig.adjustReplication(replicationConfig, replication),
+ overWrite, recursive);
+ } else {
+ ozoneDataStreamOutput = bucket
+ .createStreamFile(key, 0, replicationConfig, overWrite, recursive);
+ }
+ return new OzoneFSDataStreamOutput(
+ ozoneDataStreamOutput.getByteBufStreamOutput());
+ } catch (OMException ex) {
+ if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS
+ || ex.getResult() == OMException.ResultCodes.NOT_A_FILE) {
+ throw new FileAlreadyExistsException(
+ ex.getResult().name() + ": " + ex.getMessage());
+ } else {
+ throw ex;
+ }
+ }
+ }
+
+ @Override
public void renameKey(String key, String newKeyName) throws IOException {
throw new IOException("OFS doesn't support renameKey, use rename instead.");
}
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
index 35065f0..aea8fe2 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
import org.apache.hadoop.ozone.OFSPath;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -235,6 +236,13 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
+ /**
+ * Opens the output stream for a new file, choosing between the streaming
+ * write path and the regular write path.
+ *
+ * @param key path string of the file to create, handed to the adapter as is
+ * @param replication replication value passed through to the adapter
+ * @param overwrite overwrite flag passed through to the adapter
+ * @param recursive recursive flag passed through to the adapter
+ * @return the adapter's output wrapped in an FSDataOutputStream that is
+ * constructed with this file system's statistics
+ * @throws IOException if the adapter fails to create the file
+ */
private FSDataOutputStream createOutputStream(String key, short replication,
boolean overwrite, boolean recursive) throws IOException {
+ // The flag is read from the configuration on every call.
+ boolean isRatisStreamingEnabled = getConf().getBoolean(
+ OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLE,
+ OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLE_DEFAULT);
+ if (isRatisStreamingEnabled){
+ // Streaming enabled: create the file through the data stream API.
+ return new FSDataOutputStream(adapter.createStreamFile(key,
+ replication, overwrite, recursive), statistics);
+ }
+ // Streaming disabled: fall back to the regular createFile path.
return new FSDataOutputStream(adapter.createFile(key,
replication, overwrite, recursive), statistics);
}
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
index 0258f69..d34c97b 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
@@ -44,6 +44,9 @@ public interface OzoneClientAdapter {
OzoneFSOutputStream createFile(String key, short replication,
boolean overWrite, boolean recursive) throws IOException;
+ /**
+ * Creates a file and returns an output that writes it through the data
+ * stream API; the streaming counterpart of createFile.
+ *
+ * @param key key (or path string) of the file to create
+ * @param replication requested replication; the adapter implementations
+ * apply it only when it equals replication factor ONE or THREE and
+ * otherwise use their configured replication
+ * @param overWrite overwrite flag forwarded to the bucket
+ * @param recursive recursive flag forwarded to the bucket
+ * @return the output used to write the file content
+ * @throws IOException if the file cannot be created; the adapter
+ * implementations report the FILE_ALREADY_EXISTS and NOT_A_FILE results
+ * as FileAlreadyExistsException
+ */
+ OzoneFSDataStreamOutput createStreamFile(String key, short replication,
+ boolean overWrite, boolean recursive) throws IOException;
+
void renameKey(String key, String newKeyName) throws IOException;
// Users should use rename instead of renameKey in OFS.
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java
new file mode 100644
index 0000000..515dbca
--- /dev/null
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.ozone;
+
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * The ByteBuffer output stream for Ozone file system.
+ *
+ * <p>Adapts a {@link ByteBufferStreamOutput} to {@link OutputStream} so that
+ * it can be wrapped by a file system output stream. Every operation is
+ * forwarded to the wrapped output; bytes supplied as {@code int} or
+ * {@code byte[]} are first copied into a freshly allocated array.
+ */
+public class OzoneFSDataStreamOutput extends OutputStream
+    implements ByteBufferStreamOutput {
+
+  private final ByteBufferStreamOutput byteBufferStreamOutput;
+
+  /**
+   * Wraps the given output.
+   *
+   * @param byteBufferStreamOutput the output that receives all writes.
+   */
+  public OzoneFSDataStreamOutput(
+      ByteBufferStreamOutput byteBufferStreamOutput) {
+    this.byteBufferStreamOutput = byteBufferStreamOutput;
+  }
+
+  /**
+   * Try to write the [off:off + len) slice in ByteBuf b to DataStream.
+   *
+   * @param b the data.
+   * @param off the start offset in the data.
+   * @param len the number of bytes to write.
+   * @throws IOException if an I/O error occurs.
+   */
+  @Override
+  public void write(ByteBuffer b, int off, int len)
+      throws IOException {
+    byteBufferStreamOutput.write(b, off, len);
+  }
+
+  /**
+   * Writes a single byte. The byte written is the eight low-order bits of
+   * {@code b}; the 24 high-order bits are ignored.
+   *
+   * @param b the <code>byte</code>.
+   * @throws IOException if an I/O error occurs.
+   */
+  @Override
+  public void write(int b) throws IOException {
+    byteBufferStreamOutput.write(ByteBuffer.wrap(new byte[] {(byte) b}));
+  }
+
+  /**
+   * Writes {@code len} bytes of {@code b}, starting at {@code off}, with a
+   * single call to the wrapped output.
+   *
+   * <p>Without this override the inherited {@link OutputStream} method would
+   * call {@link #write(int)} once per byte, allocating and forwarding a
+   * one-byte buffer each time.
+   *
+   * @param b the data.
+   * @param off the start offset in the data.
+   * @param len the number of bytes to write.
+   * @throws NullPointerException if {@code b} is null.
+   * @throws IndexOutOfBoundsException if {@code off} or {@code len} is
+   *     negative, or {@code off + len} exceeds {@code b.length}.
+   * @throws IOException if an I/O error occurs.
+   */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    // Written as a subtraction so that off + len cannot overflow.
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException(
+          "off=" + off + ", len=" + len + ", b.length=" + b.length);
+    }
+    if (len == 0) {
+      return;
+    }
+    // Copy the slice: whether the wrapped output keeps a reference to the
+    // buffer after write returns is not visible here, and write(int) never
+    // hands it a caller-owned array either.
+    final byte[] copy = new byte[len];
+    System.arraycopy(b, off, copy, 0, len);
+    byteBufferStreamOutput.write(ByteBuffer.wrap(copy));
+  }
+
+  /**
+   * Flushes this DataStream output and forces any buffered output bytes
+   * to be written out.
+   *
+   * @throws IOException if an I/O error occurs.
+   */
+  @Override
+  public void flush() throws IOException {
+    byteBufferStreamOutput.flush();
+  }
+
+  /**
+   * Closes the wrapped output.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  public void close() throws IOException {
+    byteBufferStreamOutput.close();
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org