You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/05/06 15:45:22 UTC
[ozone] 17/35: HDDS-5743. [Ozone-Streaming] Add option to write files via streaming api in ofs and o3fs. (#2770)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit e1d9db9899310d3a9541239c72babf3e83d45d8f
Author: Sadanand Shenoy <sa...@gmail.com>
AuthorDate: Fri Nov 19 12:14:05 2021 +0530
HDDS-5743. [Ozone-Streaming] Add option to write files via streaming api in ofs and o3fs. (#2770)
---
.../org/apache/hadoop/ozone/OzoneConfigKeys.java | 7 ++
.../common/src/main/resources/ozone-default.xml | 7 ++
.../apache/hadoop/ozone/client/OzoneBucket.java | 8 ++
.../ozone/client/protocol/ClientProtocol.java | 5 +
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 20 ++++
.../fs/ozone/BasicOzoneClientAdapterImpl.java | 33 +++++++
.../hadoop/fs/ozone/BasicOzoneFileSystem.java | 8 ++
.../ozone/BasicRootedOzoneClientAdapterImpl.java | 39 ++++++++
.../fs/ozone/BasicRootedOzoneFileSystem.java | 8 ++
.../apache/hadoop/fs/ozone/OzoneClientAdapter.java | 3 +
.../hadoop/fs/ozone/OzoneFSDataStreamOutput.java | 103 +++++++++++++++++++++
11 files changed, 241 insertions(+)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index ee0556dd43..305be444af 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -97,6 +97,13 @@ public final class OzoneConfigKeys {
public static final int DFS_CONTAINER_RATIS_DATASTREAM_PORT_DEFAULT
= 9855;
+ /**
+ * Flag to enable ratis streaming on filesystem writes.
+ */
+ public static final String OZONE_FS_DATASTREAM_ENABLE =
+ "ozone.fs.datastream.enable";
+ public static final boolean OZONE_FS_DATASTREAM_ENABLE_DEFAULT = false;
+
/**
* When set to true, allocate a random free port for ozone container, so that
* a mini cluster is able to launch multiple containers on a node.
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index e0732fc4c6..49a81cb968 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3188,4 +3188,11 @@
log level is debug. Ex: "CREATE_CONTAINER,READ_CONTAINER,UPDATE_CONTAINER".
</description>
</property>
+ <property>
+ <name>ozone.fs.datastream.enable</name>
+ <value>false</value>
+ <tag>OZONE, DATANODE</tag>
+ <description> To enable/disable filesystem write via ratis streaming.
+ </description>
+ </property>
</configuration>
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index bf7f618b96..3cd0ca7f62 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -952,6 +952,14 @@ public class OzoneBucket extends WithMetadata {
overWrite, recursive);
}
+ public OzoneDataStreamOutput createStreamFile(String keyName, long size,
+ ReplicationConfig replicationConfig, boolean overWrite,
+ boolean recursive) throws IOException {
+ return proxy
+ .createStreamFile(volumeName, name, keyName, size, replicationConfig,
+ overWrite, recursive);
+ }
+
/**
* List the status for a file or a directory and its contents.
*
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index 5083f5a78a..6e0cc6602f 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -702,6 +702,11 @@ public interface ClientProtocol {
String keyName, long size, ReplicationConfig replicationConfig,
boolean overWrite, boolean recursive) throws IOException;
+ @SuppressWarnings("checkstyle:parameternumber")
+ OzoneDataStreamOutput createStreamFile(String volumeName, String bucketName,
+ String keyName, long size, ReplicationConfig replicationConfig,
+ boolean overWrite, boolean recursive) throws IOException;
+
/**
* List the status for a file or a directory and its contents.
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index c3002a5abc..42891c492b 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -1630,6 +1630,26 @@ public class RpcClient implements ClientProtocol {
return createOutputStream(keySession, UUID.randomUUID().toString());
}
+ @Override
+ public OzoneDataStreamOutput createStreamFile(String volumeName,
+ String bucketName, String keyName, long size,
+ ReplicationConfig replicationConfig, boolean overWrite, boolean recursive)
+ throws IOException {
+ OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+ .setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .setKeyName(keyName)
+ .setDataSize(size)
+ .setReplicationConfig(replicationConfig)
+ .setAcls(getAclList())
+ .setLatestVersionLocation(getLatestVersionLocation)
+ .build();
+ OpenKeySession keySession =
+ ozoneManagerClient.createFile(keyArgs, overWrite, recursive);
+ return createDataStreamOutput(keySession, UUID.randomUUID().toString(),
+ replicationConfig);
+ }
+
@Override
public List<OzoneFileStatus> listStatus(String volumeName, String bucketName,
String keyName, boolean recursive, String startKey, long numEntries)
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
index bec00e9299..be919b76fe 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.common.MonotonicClock;
import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -265,6 +266,38 @@ public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter {
return this.bucketReplicationConfig;
}
+ @Override
+ public OzoneFSDataStreamOutput createStreamFile(String key, short replication,
+ boolean overWrite, boolean recursive) throws IOException {
+ incrementCounter(Statistic.OBJECTS_CREATED, 1);
+ try {
+ OzoneDataStreamOutput ozoneDataStreamOutput = null;
+ if (replication == ReplicationFactor.ONE.getValue()
+ || replication == ReplicationFactor.THREE.getValue()) {
+
+ ReplicationConfig customReplicationConfig =
+ ReplicationConfig.adjustReplication(bucketReplicationConfig,
+ replication, config);
+ ozoneDataStreamOutput = bucket
+ .createStreamFile(key, 0, customReplicationConfig, overWrite,
+ recursive);
+ } else {
+ ozoneDataStreamOutput = bucket.createStreamFile(
+ key, 0, bucketReplicationConfig, overWrite, recursive);
+ }
+ return new OzoneFSDataStreamOutput(
+ ozoneDataStreamOutput.getByteBufStreamOutput());
+ } catch (OMException ex) {
+ if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS
+ || ex.getResult() == OMException.ResultCodes.NOT_A_FILE) {
+ throw new FileAlreadyExistsException(
+ ex.getResult().name() + ": " + ex.getMessage());
+ } else {
+ throw ex;
+ }
+ }
+ }
+
@Override
public void renameKey(String key, String newKeyName) throws IOException {
incrementCounter(Statistic.OBJECTS_RENAMED, 1);
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
index 34961050ad..e669f1c5cc 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
import org.apache.hadoop.security.UserGroupInformation;
@@ -259,6 +260,13 @@ public class BasicOzoneFileSystem extends FileSystem {
private FSDataOutputStream createOutputStream(String key, short replication,
boolean overwrite, boolean recursive) throws IOException {
+ boolean isRatisStreamingEnabled = getConf().getBoolean(
+ OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLE,
+ OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLE_DEFAULT);
+ if (isRatisStreamingEnabled){
+ return new FSDataOutputStream(adapter.createStreamFile(key,
+ replication, overwrite, recursive), statistics);
+ }
return new FSDataOutputStream(adapter.createFile(key,
replication, overwrite, recursive), statistics);
}
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index 052c6c8ba6..5295aa33d6 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -352,6 +353,44 @@ public class BasicRootedOzoneClientAdapterImpl
}
}
+ @Override
+ public OzoneFSDataStreamOutput createStreamFile(String pathStr,
+ short replication, boolean overWrite, boolean recursive)
+ throws IOException {
+ incrementCounter(Statistic.OBJECTS_CREATED, 1);
+ OFSPath ofsPath = new OFSPath(pathStr);
+ if (ofsPath.isRoot() || ofsPath.isVolume() || ofsPath.isBucket()) {
+ throw new IOException("Cannot create file under root or volume.");
+ }
+ String key = ofsPath.getKeyName();
+ try {
+ // Hadoop CopyCommands class always sets recursive to true
+ OzoneBucket bucket = getBucket(ofsPath, recursive);
+ OzoneDataStreamOutput ozoneDataStreamOutput = null;
+ if (replication == ReplicationFactor.ONE.getValue()
+ || replication == ReplicationFactor.THREE.getValue()) {
+
+ ozoneDataStreamOutput = bucket.createStreamFile(key, 0,
+ ReplicationConfig.adjustReplication(
+ clientConfiguredReplicationConfig, replication, config),
+ overWrite, recursive);
+ } else {
+ ozoneDataStreamOutput = bucket.createStreamFile(
+ key, 0, clientConfiguredReplicationConfig, overWrite, recursive);
+ }
+ return new OzoneFSDataStreamOutput(
+ ozoneDataStreamOutput.getByteBufStreamOutput());
+ } catch (OMException ex) {
+ if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS
+ || ex.getResult() == OMException.ResultCodes.NOT_A_FILE) {
+ throw new FileAlreadyExistsException(
+ ex.getResult().name() + ": " + ex.getMessage());
+ } else {
+ throw ex;
+ }
+ }
+ }
+
@Override
public void renameKey(String key, String newKeyName) throws IOException {
throw new IOException("OFS doesn't support renameKey, use rename instead.");
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
index 0360e345b2..0bed09ba8a 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
import org.apache.hadoop.ozone.OFSPath;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -235,6 +236,13 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
private FSDataOutputStream createOutputStream(String key, short replication,
boolean overwrite, boolean recursive) throws IOException {
+ boolean isRatisStreamingEnabled = getConf().getBoolean(
+ OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLE,
+ OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLE_DEFAULT);
+ if (isRatisStreamingEnabled){
+ return new FSDataOutputStream(adapter.createStreamFile(key,
+ replication, overwrite, recursive), statistics);
+ }
return new FSDataOutputStream(adapter.createFile(key,
replication, overwrite, recursive), statistics);
}
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
index 31bf351f01..24566cb83f 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
@@ -45,6 +45,9 @@ public interface OzoneClientAdapter {
OzoneFSOutputStream createFile(String key, short replication,
boolean overWrite, boolean recursive) throws IOException;
+ OzoneFSDataStreamOutput createStreamFile(String key, short replication,
+ boolean overWrite, boolean recursive) throws IOException;
+
void renameKey(String key, String newKeyName) throws IOException;
// Users should use rename instead of renameKey in OFS.
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java
new file mode 100644
index 0000000000..515dbca92b
--- /dev/null
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.ozone;
+
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * The ByteBuffer output stream for Ozone file system.
+ */
+public class OzoneFSDataStreamOutput extends OutputStream
+ implements ByteBufferStreamOutput {
+
+ private final ByteBufferStreamOutput byteBufferStreamOutput;
+
+ public OzoneFSDataStreamOutput(
+ ByteBufferStreamOutput byteBufferStreamOutput) {
+ this.byteBufferStreamOutput = byteBufferStreamOutput;
+ }
+
+ /**
+ * Try to write the [off:off + len) slice in ByteBuf b to DataStream.
+ *
+ * @param b the data.
+ * @param off the start offset in the data.
+ * @param len the number of bytes to write.
+ * @throws IOException if an I/O error occurs.
+ */
+ @Override
+ public void write(ByteBuffer b, int off, int len)
+ throws IOException {
+ byteBufferStreamOutput.write(b, off, len);
+ }
+
+ /**
+ * Writes the specified byte to this output stream. The general
+ * contract for <code>write</code> is that one byte is written
+ * to the output stream. The byte to be written is the eight
+ * low-order bits of the argument <code>b</code>. The 24
+ * high-order bits of <code>b</code> are ignored.
+ * <p>
+ * Subclasses of <code>OutputStream</code> must provide an
+ * implementation for this method.
+ *
+ * @param b the <code>byte</code>.
+ * @throws IOException if an I/O error occurs. In particular,
+ * an <code>IOException</code> may be thrown if the
+ * output stream has been closed.
+ */
+ @Override
+ public void write(int b) throws IOException {
+ byte[] singleBytes = new byte[1];
+ singleBytes[0] = (byte) b;
+ byteBufferStreamOutput.write(ByteBuffer.wrap(singleBytes));
+ }
+
+ /**
+ * Flushes this DataStream output and forces any buffered output bytes
+ * to be written out.
+ *
+ * @throws IOException if an I/O error occurs.
+ */
+ @Override
+ public void flush() throws IOException {
+ byteBufferStreamOutput.flush();
+ }
+
+ /**
+ * Closes this stream and releases any system resources associated
+ * with it. If the stream is already closed then invoking this
+ * method has no effect.
+ *
+ * <p> As noted in {@link AutoCloseable#close()}, cases where the
+ * close may fail require careful attention. It is strongly advised
+ * to relinquish the underlying resources and to internally
+ * <em>mark</em> the {@code Closeable} as closed, prior to throwing
+ * the {@code IOException}.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ @Override
+ public void close() throws IOException {
+ byteBufferStreamOutput.close();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org