You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/11/11 16:59:54 UTC
[ozone] branch HDDS-4454 updated: HDDS-7478. [Ozone-Streaming] NPE in when creating a file with o3fs. (#3949)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-4454 by this push:
new c17be7013a HDDS-7478. [Ozone-Streaming] NPE in when creating a file with o3fs. (#3949)
c17be7013a is described below
commit c17be7013ade5920ef2ae056b48b8b1d8b04d136
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Nov 11 08:59:46 2022 -0800
HDDS-7478. [Ozone-Streaming] NPE in when creating a file with o3fs. (#3949)
---
.../fs/ozone/TestOzoneFileSystemWithStreaming.java | 158 +++++++++++++++++++++
.../fs/ozone/BasicOzoneClientAdapterImpl.java | 23 +--
.../ozone/BasicRootedOzoneClientAdapterImpl.java | 23 ++-
3 files changed, 173 insertions(+), 31 deletions(-)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java
new file mode 100644
index 0000000000..f2aa527598
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.ozone;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.junit.AfterClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.rules.Timeout;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_SCHEME;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY;
+
+/**
+ * Ozone file system tests with Streaming.
+ */
+public class TestOzoneFileSystemWithStreaming {
+ @Rule
+ public Timeout timeout = Timeout.seconds(300);
+
+ private static MiniOzoneCluster cluster;
+ private static OzoneBucket bucket;
+
+ private final OzoneConfiguration conf = new OzoneConfiguration();
+
+ {
+ try {
+ init();
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private void init() throws Exception {
+ final int chunkSize = 16 << 10;
+ final int flushSize = 2 * chunkSize;
+ final int maxFlushSize = 2 * flushSize;
+ final int blockSize = 2 * maxFlushSize;
+ final BucketLayout layout = BucketLayout.FILE_SYSTEM_OPTIMIZED;
+
+ conf.setBoolean(DFS_CONTAINER_RATIS_DATASTREAM_ENABLED, true);
+ conf.setBoolean(OZONE_FS_DATASTREAM_ENABLED, true);
+ conf.setBoolean(OZONE_OM_RATIS_ENABLE_KEY, false);
+ conf.set(OZONE_DEFAULT_BUCKET_LAYOUT, layout.name());
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(5)
+ .setTotalPipelineNumLimit(10)
+ .setBlockSize(blockSize)
+ .setChunkSize(chunkSize)
+ .setStreamBufferFlushSize(flushSize)
+ .setStreamBufferMaxSize(maxFlushSize)
+ .setDataStreamBufferFlushize(maxFlushSize)
+ .setStreamBufferSizeUnit(StorageUnit.BYTES)
+ .setDataStreamMinPacketSize(chunkSize)
+ .setDataStreamStreamWindowSize(5 * chunkSize)
+ .build();
+ cluster.waitForClusterToBeReady();
+
+ // create a volume and a bucket to be used by OzoneFileSystem
+ bucket = TestDataUtil.createVolumeAndBucket(cluster, layout);
+ }
+
+ @AfterClass
+ public static void teardown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testO3fsCreateFile() throws Exception {
+ // Set the fs.defaultFS
+ final String rootPath = String.format("%s://%s.%s/",
+ OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName());
+ conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+
+ final Path file = new Path("/file");
+
+ try (FileSystem fs = FileSystem.get(conf)) {
+ runTestCreateFile(fs, file);
+ }
+ }
+
+ @Test
+ public void testOfsCreateFile() throws Exception {
+ // Set the fs.defaultFS
+ final String rootPath = String.format("%s://%s/",
+ OZONE_OFS_URI_SCHEME, conf.get(OZONE_OM_ADDRESS_KEY));
+ conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+
+ final String dir = OZONE_ROOT + bucket.getVolumeName()
+ + OZONE_URI_DELIMITER + bucket.getName();
+ final Path file = new Path(dir, "file");
+
+ try (FileSystem fs = FileSystem.get(conf)) {
+ runTestCreateFile(fs, file);
+ }
+ }
+
+ static void runTestCreateFile(FileSystem fs, Path file) throws Exception {
+ final byte[] bytes = new byte[1 << 20];
+ ThreadLocalRandom.current().nextBytes(bytes);
+
+ ContractTestUtils.createFile(fs, file, true, bytes);
+
+ final byte[] buffer = new byte[4 << 10];
+ int offset = 0;
+ try (FSDataInputStream in = fs.open(file)) {
+ for (; ;) {
+ final int n = in.read(buffer, 0, buffer.length);
+ if (n <= 0) {
+ break;
+ }
+ for (int i = 0; i < n; i++) {
+ Assertions.assertEquals(bytes[offset + i], buffer[i]);
+ }
+ offset += n;
+ }
+ }
+ Assertions.assertEquals(bytes.length, offset);
+ }
+}
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
index 0c21ab6ec0..65607aae5c 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
@@ -272,22 +272,13 @@ public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter {
boolean overWrite, boolean recursive) throws IOException {
incrementCounter(Statistic.OBJECTS_CREATED, 1);
try {
- OzoneDataStreamOutput ozoneDataStreamOutput = null;
- if (replication == ReplicationFactor.ONE.getValue()
- || replication == ReplicationFactor.THREE.getValue()) {
-
- ReplicationConfig customReplicationConfig =
- ReplicationConfig.adjustReplication(bucketReplicationConfig,
- replication, config);
- ozoneDataStreamOutput = bucket
- .createStreamFile(key, 0, customReplicationConfig, overWrite,
- recursive);
- } else {
- ozoneDataStreamOutput = bucket.createStreamFile(
- key, 0, bucketReplicationConfig, overWrite, recursive);
- }
- return new OzoneFSDataStreamOutput(
- ozoneDataStreamOutput.getByteBufStreamOutput());
+ final ReplicationConfig replicationConfig
+ = OzoneClientUtils.resolveClientSideReplicationConfig(
+ replication, clientConfiguredReplicationConfig,
+ getReplicationConfigWithRefreshCheck(), config);
+ final OzoneDataStreamOutput out = bucket.createStreamFile(
+ key, 0, replicationConfig, overWrite, recursive);
+ return new OzoneFSDataStreamOutput(out.getByteBufStreamOutput());
} catch (OMException ex) {
if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS
|| ex.getResult() == OMException.ResultCodes.NOT_A_FILE) {
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index 5d51cec7e9..843bcd8119 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -406,21 +406,14 @@ public class BasicRootedOzoneClientAdapterImpl
String key = ofsPath.getKeyName();
try {
// Hadoop CopyCommands class always sets recursive to true
- OzoneBucket bucket = getBucket(ofsPath, recursive);
- OzoneDataStreamOutput ozoneDataStreamOutput = null;
- if (replication == ReplicationFactor.ONE.getValue()
- || replication == ReplicationFactor.THREE.getValue()) {
-
- ozoneDataStreamOutput = bucket.createStreamFile(key, 0,
- ReplicationConfig.adjustReplication(
- clientConfiguredReplicationConfig, replication, config),
- overWrite, recursive);
- } else {
- ozoneDataStreamOutput = bucket.createStreamFile(
- key, 0, clientConfiguredReplicationConfig, overWrite, recursive);
- }
- return new OzoneFSDataStreamOutput(
- ozoneDataStreamOutput.getByteBufStreamOutput());
+ final OzoneBucket bucket = getBucket(ofsPath, recursive);
+ final ReplicationConfig replicationConfig
+ = OzoneClientUtils.resolveClientSideReplicationConfig(
+ replication, clientConfiguredReplicationConfig,
+ bucket.getReplicationConfig(), config);
+ final OzoneDataStreamOutput out = bucket.createStreamFile(
+ key, 0, replicationConfig, overWrite, recursive);
+ return new OzoneFSDataStreamOutput(out.getByteBufStreamOutput());
} catch (OMException ex) {
if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS
|| ex.getResult() == OMException.ResultCodes.NOT_A_FILE) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org