You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/11/11 16:59:54 UTC

[ozone] branch HDDS-4454 updated: HDDS-7478. [Ozone-Streaming] NPE in when creating a file with o3fs. (#3949)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-4454 by this push:
     new c17be7013a HDDS-7478. [Ozone-Streaming] NPE in when creating a file with o3fs. (#3949)
c17be7013a is described below

commit c17be7013ade5920ef2ae056b48b8b1d8b04d136
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Nov 11 08:59:46 2022 -0800

    HDDS-7478. [Ozone-Streaming] NPE in when creating a file with o3fs. (#3949)
---
 .../fs/ozone/TestOzoneFileSystemWithStreaming.java | 158 +++++++++++++++++++++
 .../fs/ozone/BasicOzoneClientAdapterImpl.java      |  23 +--
 .../ozone/BasicRootedOzoneClientAdapterImpl.java   |  23 ++-
 3 files changed, 173 insertions(+), 31 deletions(-)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java
new file mode 100644
index 0000000000..f2aa527598
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.ozone;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.junit.AfterClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.rules.Timeout;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_SCHEME;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY;
+
+/**
+ * Ozone file system tests with Streaming.
+ */
+public class TestOzoneFileSystemWithStreaming {
+  @Rule
+  public Timeout timeout = Timeout.seconds(300);
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneBucket bucket;
+
+  private final OzoneConfiguration conf = new OzoneConfiguration();
+
+  {
+    try {
+      init();
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private void init() throws Exception {
+    final int chunkSize = 16 << 10;
+    final int flushSize = 2 * chunkSize;
+    final int maxFlushSize = 2 * flushSize;
+    final int blockSize = 2 * maxFlushSize;
+    final BucketLayout layout = BucketLayout.FILE_SYSTEM_OPTIMIZED;
+
+    conf.setBoolean(DFS_CONTAINER_RATIS_DATASTREAM_ENABLED, true);
+    conf.setBoolean(OZONE_FS_DATASTREAM_ENABLED, true);
+    conf.setBoolean(OZONE_OM_RATIS_ENABLE_KEY, false);
+    conf.set(OZONE_DEFAULT_BUCKET_LAYOUT, layout.name());
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(5)
+        .setTotalPipelineNumLimit(10)
+        .setBlockSize(blockSize)
+        .setChunkSize(chunkSize)
+        .setStreamBufferFlushSize(flushSize)
+        .setStreamBufferMaxSize(maxFlushSize)
+        .setDataStreamBufferFlushize(maxFlushSize)
+        .setStreamBufferSizeUnit(StorageUnit.BYTES)
+        .setDataStreamMinPacketSize(chunkSize)
+        .setDataStreamStreamWindowSize(5 * chunkSize)
+        .build();
+    cluster.waitForClusterToBeReady();
+
+    // create a volume and a bucket to be used by OzoneFileSystem
+    bucket = TestDataUtil.createVolumeAndBucket(cluster, layout);
+  }
+
+  @AfterClass
+  public static void teardown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testO3fsCreateFile() throws Exception {
+    // Set the fs.defaultFS
+    final String rootPath = String.format("%s://%s.%s/",
+        OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName());
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+
+    final Path file = new Path("/file");
+
+    try (FileSystem fs = FileSystem.get(conf)) {
+      runTestCreateFile(fs, file);
+    }
+  }
+
+  @Test
+  public void testOfsCreateFile() throws Exception {
+    // Set the fs.defaultFS
+    final String rootPath = String.format("%s://%s/",
+        OZONE_OFS_URI_SCHEME, conf.get(OZONE_OM_ADDRESS_KEY));
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+
+    final String dir = OZONE_ROOT + bucket.getVolumeName()
+        + OZONE_URI_DELIMITER + bucket.getName();
+    final Path file = new Path(dir, "file");
+
+    try (FileSystem fs = FileSystem.get(conf)) {
+      runTestCreateFile(fs, file);
+    }
+  }
+
+  static void runTestCreateFile(FileSystem fs, Path file) throws Exception {
+    final byte[] bytes = new byte[1 << 20];
+    ThreadLocalRandom.current().nextBytes(bytes);
+
+    ContractTestUtils.createFile(fs, file, true, bytes);
+
+    final byte[] buffer = new byte[4 << 10];
+    int offset = 0;
+    try (FSDataInputStream in = fs.open(file)) {
+      for (; ;) {
+        final int n = in.read(buffer, 0, buffer.length);
+        if (n <= 0) {
+          break;
+        }
+        for (int i = 0; i < n; i++) {
+          Assertions.assertEquals(bytes[offset + i], buffer[i]);
+        }
+        offset += n;
+      }
+    }
+    Assertions.assertEquals(bytes.length, offset);
+  }
+}
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
index 0c21ab6ec0..65607aae5c 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
@@ -272,22 +272,13 @@ public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter {
       boolean overWrite, boolean recursive) throws IOException {
     incrementCounter(Statistic.OBJECTS_CREATED, 1);
     try {
-      OzoneDataStreamOutput ozoneDataStreamOutput = null;
-      if (replication == ReplicationFactor.ONE.getValue()
-          || replication == ReplicationFactor.THREE.getValue()) {
-
-        ReplicationConfig customReplicationConfig =
-            ReplicationConfig.adjustReplication(bucketReplicationConfig,
-                replication, config);
-        ozoneDataStreamOutput = bucket
-            .createStreamFile(key, 0, customReplicationConfig, overWrite,
-                recursive);
-      } else {
-        ozoneDataStreamOutput = bucket.createStreamFile(
-            key, 0, bucketReplicationConfig, overWrite, recursive);
-      }
-      return new OzoneFSDataStreamOutput(
-          ozoneDataStreamOutput.getByteBufStreamOutput());
+      final ReplicationConfig replicationConfig
+          = OzoneClientUtils.resolveClientSideReplicationConfig(
+          replication, clientConfiguredReplicationConfig,
+          getReplicationConfigWithRefreshCheck(), config);
+      final OzoneDataStreamOutput out = bucket.createStreamFile(
+          key, 0, replicationConfig, overWrite, recursive);
+      return new OzoneFSDataStreamOutput(out.getByteBufStreamOutput());
     } catch (OMException ex) {
       if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS
           || ex.getResult() == OMException.ResultCodes.NOT_A_FILE) {
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index 5d51cec7e9..843bcd8119 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -406,21 +406,14 @@ public class BasicRootedOzoneClientAdapterImpl
     String key = ofsPath.getKeyName();
     try {
       // Hadoop CopyCommands class always sets recursive to true
-      OzoneBucket bucket = getBucket(ofsPath, recursive);
-      OzoneDataStreamOutput ozoneDataStreamOutput = null;
-      if (replication == ReplicationFactor.ONE.getValue()
-          || replication == ReplicationFactor.THREE.getValue()) {
-
-        ozoneDataStreamOutput = bucket.createStreamFile(key, 0,
-            ReplicationConfig.adjustReplication(
-                clientConfiguredReplicationConfig, replication, config),
-            overWrite, recursive);
-      } else {
-        ozoneDataStreamOutput = bucket.createStreamFile(
-            key, 0, clientConfiguredReplicationConfig, overWrite, recursive);
-      }
-      return new OzoneFSDataStreamOutput(
-          ozoneDataStreamOutput.getByteBufStreamOutput());
+      final OzoneBucket bucket = getBucket(ofsPath, recursive);
+      final ReplicationConfig replicationConfig
+          = OzoneClientUtils.resolveClientSideReplicationConfig(
+          replication, clientConfiguredReplicationConfig,
+          bucket.getReplicationConfig(), config);
+      final OzoneDataStreamOutput out = bucket.createStreamFile(
+          key, 0, replicationConfig, overWrite, recursive);
+      return new OzoneFSDataStreamOutput(out.getByteBufStreamOutput());
     } catch (OMException ex) {
       if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS
           || ex.getResult() == OMException.ResultCodes.NOT_A_FILE) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org