You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by op...@apache.org on 2021/10/19 11:53:35 UTC

[iceberg] branch master updated: Aliyun: Add OSSOutputStream (#3288)

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 329fa5b  Aliyun: Add OSSOutputStream (#3288)
329fa5b is described below

commit 329fa5b6d8789f5a367215b349096006cb889110
Author: mikewu <xi...@gmail.com>
AuthorDate: Tue Oct 19 19:53:25 2021 +0800

    Aliyun: Add OSSOutputStream (#3288)
---
 .../apache/iceberg/aliyun/AliyunProperties.java    |  41 ++++++
 .../apache/iceberg/aliyun/oss/OSSOutputStream.java | 164 +++++++++++++++++++++
 .../java/org/apache/iceberg/aliyun/oss/OSSURI.java | 109 ++++++++++++++
 .../iceberg/aliyun/oss/AliyunOSSTestBase.java      |  53 +++++++
 .../iceberg/aliyun/oss/AliyunOSSTestRule.java      |  12 ++
 .../iceberg/aliyun/oss/TestOSSOutputStream.java    | 126 ++++++++++++++++
 .../org/apache/iceberg/aliyun/oss/TestOSSURI.java  | 111 ++++++++++++++
 .../iceberg/aliyun/oss/mock/AliyunOSSMockRule.java |   5 +
 build.gradle                                       |   2 +
 9 files changed, 623 insertions(+)

diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java
new file mode 100644
index 0000000..f0a1d19
--- /dev/null
+++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aliyun;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class AliyunProperties implements Serializable {
+  /**
+   * Location for staging files to be uploaded to OSS; defaults to the value of the java.io.tmpdir system property.
+   */
+  public static final String OSS_STAGING_DIRECTORY = "oss.staging-dir";
+  private final String ossStagingDirectory;
+
+  public AliyunProperties(Map<String, String> properties) {
+    this.ossStagingDirectory = PropertyUtil.propertyAsString(properties, OSS_STAGING_DIRECTORY,
+        System.getProperty("java.io.tmpdir"));
+  }
+
+  public String ossStagingDirectory() {
+    return ossStagingDirectory;
+  }
+}
diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSOutputStream.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSOutputStream.java
new file mode 100644
index 0000000..3d4758f
--- /dev/null
+++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSOutputStream.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aliyun.oss;
+
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.model.ObjectMetadata;
+import com.aliyun.oss.model.PutObjectRequest;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import org.apache.iceberg.aliyun.AliyunProperties;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OSSOutputStream extends PositionOutputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(OSSOutputStream.class);
+  private final StackTraceElement[] createStack;
+
+  private final OSS client;
+  private final OSSURI uri;
+
+  private final File currentStagingFile;
+  private final OutputStream stream;
+  private long pos = 0;
+  private boolean closed = false;
+
+  OSSOutputStream(OSS client, OSSURI uri, AliyunProperties aliyunProperties) {
+    this.client = client;
+    this.uri = uri;
+    this.createStack = Thread.currentThread().getStackTrace();
+
+    this.currentStagingFile = newStagingFile(aliyunProperties.ossStagingDirectory());
+    this.stream = newStream(currentStagingFile);
+  }
+
+  private static File newStagingFile(String ossStagingDirectory) {
+    try {
+      File stagingFile = File.createTempFile("oss-file-io-", ".tmp", new File(ossStagingDirectory));
+      stagingFile.deleteOnExit();
+      return stagingFile;
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  private static OutputStream newStream(File currentStagingFile) {
+    try {
+      return new BufferedOutputStream(new FileOutputStream(currentStagingFile));
+    } catch (FileNotFoundException e) {
+      throw new NotFoundException(e, "Failed to create file: %s", currentStagingFile);
+    }
+  }
+
+  private static InputStream uncheckedInputStream(File file) {
+    try {
+      return new FileInputStream(file);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Override
+  public long getPos() {
+    return pos;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    Preconditions.checkState(!closed, "Already closed.");
+    stream.flush();
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    Preconditions.checkState(!closed, "Already closed.");
+    stream.write(b);
+    pos += 1;
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    Preconditions.checkState(!closed, "Already closed.");
+    stream.write(b, off, len);
+    pos += len;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+
+    super.close();
+    closed = true;
+
+    try {
+      stream.close();
+      completeUploads();
+    } finally {
+      cleanUpStagingFiles();
+    }
+  }
+
+  private void completeUploads() {
+    long contentLength = currentStagingFile.length();
+    if (contentLength == 0) {
+      LOG.debug("Skipping empty upload to OSS");
+      return;
+    }
+
+    LOG.debug("Uploading {} staged bytes to OSS", contentLength);
+    InputStream contentStream = uncheckedInputStream(currentStagingFile);
+    ObjectMetadata metadata = new ObjectMetadata();
+    metadata.setContentLength(contentLength);
+
+    PutObjectRequest request = new PutObjectRequest(uri.bucket(), uri.key(), contentStream, metadata);
+    client.putObject(request);
+  }
+
+  private void cleanUpStagingFiles() {
+    if (!currentStagingFile.delete()) {
+      LOG.warn("Failed to delete staging file: {}", currentStagingFile);
+    }
+  }
+
+  @SuppressWarnings("checkstyle:NoFinalizer")
+  @Override
+  protected void finalize() throws Throwable {
+    super.finalize();
+    if (!closed) {
+      close(); // releasing resources is more important than printing the warning.
+      String trace = Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length));
+      LOG.warn("Unclosed output stream created by:\n\t{}", trace);
+    }
+  }
+}
diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSURI.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSURI.java
new file mode 100644
index 0000000..7898e75
--- /dev/null
+++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSURI.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aliyun.oss;
+
+import com.aliyun.oss.internal.OSSUtils;
+import java.util.Set;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+
+/**
+ * This class represents a fully qualified location in OSS for input/output
+ * operations expressed as a URI.  This implementation is provided to
+ * ensure compatibility with Hadoop Path implementations that may introduce
+ * encoding issues with native URI implementation.
+ *
+ * Note: Path-style access is deprecated and not supported by this
+ * implementation.
+ */
+public class OSSURI {
+  private static final String SCHEME_DELIM = "://";
+  private static final String PATH_DELIM = "/";
+  private static final String QUERY_DELIM = "\\?";
+  private static final String FRAGMENT_DELIM = "#";
+  private static final Set<String> VALID_SCHEMES = ImmutableSet.of("https", "oss");
+  private final String location;
+  private final String bucket;
+  private final String key;
+
+  /**
+   * Creates a new OSSURI based on the bucket and key parsed from the location.
+   * The location in string form has the syntax as below, which refers to RFC2396:
+   * [scheme:][//bucket][object key][#fragment]
+   * [scheme:][//bucket][object key][?query][#fragment]
+   *
+   * The Aliyun OSS documentation specifies precisely which characters are permitted in each of these components,
+   * as listed below:
+   * Bucket: https://help.aliyun.com/document_detail/257087.html
+   * Object: https://help.aliyun.com/document_detail/273129.html
+   * Scheme: https or oss
+   *
+   * <p>
+   * Supported access styles are https://... and oss://... URIs.
+   *
+   * @param location fully qualified URI.
+   */
+  public OSSURI(String location) {
+    Preconditions.checkNotNull(location, "OSS location cannot be null.");
+
+    this.location = location;
+    String[] schemeSplit = location.split(SCHEME_DELIM, -1);
+    ValidationException.check(schemeSplit.length == 2, "Invalid OSS location: %s", location);
+
+    String scheme = schemeSplit[0];
+    ValidationException.check(VALID_SCHEMES.contains(scheme.toLowerCase()),
+            "Invalid scheme: %s in OSS location %s", scheme, location);
+
+    String[] authoritySplit = schemeSplit[1].split(PATH_DELIM, 2);
+    ValidationException.check(authoritySplit.length == 2,
+            "Invalid bucket or key in OSS location: %s", location);
+    ValidationException.check(!authoritySplit[1].trim().isEmpty(),
+            "Missing key in OSS location: %s", location);
+    this.bucket = authoritySplit[0];
+    OSSUtils.ensureBucketNameValid(bucket);
+
+    // Strip query and fragment if they exist
+    String path = authoritySplit[1];
+    path = path.split(QUERY_DELIM, -1)[0];
+    path = path.split(FRAGMENT_DELIM, -1)[0];
+    this.key = path;
+    OSSUtils.ensureObjectKeyValid(key);
+  }
+
+  /**
+   * Return OSS bucket name.
+   */
+  public String bucket() {
+    return bucket;
+  }
+
+  /**
+   * Return OSS object key name.
+   */
+  public String key() {
+    return key;
+  }
+
+  @Override
+  public String toString() {
+    return location;
+  }
+}
diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestBase.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestBase.java
new file mode 100644
index 0000000..24e89bf
--- /dev/null
+++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestBase.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aliyun.oss;
+
+import com.aliyun.oss.OSS;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+
+public abstract class AliyunOSSTestBase {
+  @ClassRule
+  public static final AliyunOSSTestRule OSS_TEST_RULE = AliyunOSSTestUtility.initialize();
+
+  private final SerializableSupplier<OSS> ossClient = OSS_TEST_RULE::createOSSClient;
+  private final String bucketName = OSS_TEST_RULE.testBucketName();
+  private final String keyPrefix = OSS_TEST_RULE.keyPrefix();
+
+  @Before
+  public void before() {
+    OSS_TEST_RULE.setUpBucket(bucketName);
+  }
+
+  @After
+  public void after() {
+    OSS_TEST_RULE.tearDownBucket(bucketName);
+  }
+
+  protected String location(String key) {
+    return String.format("oss://%s/%s%s", bucketName, keyPrefix, key);
+  }
+
+  protected SerializableSupplier<OSS> ossClient() {
+    return ossClient;
+  }
+}
diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestRule.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestRule.java
index 1c327cf..3e43e5d 100644
--- a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestRule.java
+++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestRule.java
@@ -56,6 +56,18 @@ public interface AliyunOSSTestRule extends TestRule {
   }
 
   /**
+   * Returns the common key prefix for objects newly created in test cases. For example, if the test bucket is
+   * 'oss-testing-bucket' and the key prefix is 'iceberg-objects/', then the objects produced in test cases
+   * will be:
+   * <pre>
+   *   oss://oss-testing-bucket/iceberg-objects/a.dat
+   *   oss://oss-testing-bucket/iceberg-objects/b.dat
+   *   ...
+   * </pre>
+   */
+  String keyPrefix();
+
+  /**
    * Start the Aliyun Object storage services application that the OSS client could connect to.
    */
   void start();
diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSOutputStream.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSOutputStream.java
new file mode 100644
index 0000000..4f566b6
--- /dev/null
+++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSOutputStream.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aliyun.oss;
+
+import com.aliyun.oss.OSS;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.aliyun.AliyunProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class TestOSSOutputStream extends AliyunOSSTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(TestOSSOutputStream.class);
+
+  private final OSS ossClient = ossClient().get();
+  private final OSS ossMock = mock(OSS.class, delegatesTo(ossClient));
+
+  private final Path tmpDir = Files.createTempDirectory("oss-file-io-test-");
+  private static final Random random = ThreadLocalRandom.current();
+
+  private final AliyunProperties props = new AliyunProperties(ImmutableMap.of(
+      AliyunProperties.OSS_STAGING_DIRECTORY, tmpDir.toString()
+  ));
+
+  public TestOSSOutputStream() throws IOException {
+  }
+
+  @Test
+  public void testWrite() throws IOException {
+    OSSURI uri = randomURI();
+
+    for (int i = 0; i < 2; i++) {
+      boolean arrayWrite = i % 2 == 0;
+      // Write small file.
+      writeAndVerify(ossMock, uri, data256(), arrayWrite);
+      verify(ossMock, times(1)).putObject(any());
+      reset(ossMock);
+
+      // Write large file.
+      writeAndVerify(ossMock, uri, randomData(32 * 1024 * 1024), arrayWrite);
+      verify(ossMock, times(1)).putObject(any());
+      reset(ossMock);
+    }
+  }
+
+  private void writeAndVerify(OSS mock, OSSURI uri, byte[] data, boolean arrayWrite)
+      throws IOException {
+    LOG.info("Write and verify for arguments uri: {}, data length: {}, arrayWrite: {}",
+            uri, data.length, arrayWrite);
+
+    try (OSSOutputStream out = new OSSOutputStream(mock, uri, props)) {
+      if (arrayWrite) {
+        out.write(data);
+        Assert.assertEquals("OSSOutputStream position", data.length, out.getPos());
+      } else {
+        for (int i = 0; i < data.length; i++) {
+          out.write(data[i]);
+          Assert.assertEquals("OSSOutputStream position", i + 1, out.getPos());
+        }
+      }
+    }
+
+    Assert.assertTrue("OSS object should exist", ossClient.doesObjectExist(uri.bucket(), uri.key()));
+    Assert.assertEquals("Object length",
+        ossClient.getObject(uri.bucket(), uri.key()).getObjectMetadata().getContentLength(), data.length);
+
+    byte[] actual = new byte[data.length];
+    IOUtils.readFully(ossClient.getObject(uri.bucket(), uri.key()).getObjectContent(), actual);
+    Assert.assertArrayEquals("Object content", data, actual);
+
+    // Verify all staging files are cleaned up.
+    Assert.assertEquals("Staging files should clean up",
+        0, Files.list(Paths.get(props.ossStagingDirectory())).count());
+  }
+
+  private OSSURI randomURI() {
+    return new OSSURI(location(String.format("%s.dat", UUID.randomUUID())));
+  }
+
+  private byte[] data256() {
+    byte[] data = new byte[256];
+    for (int i = 0; i < 256; i++) {
+      data[i] = (byte) i;
+    }
+    return data;
+  }
+
+  private byte[] randomData(int size) {
+    byte[] data = new byte[size];
+    random.nextBytes(data);
+    return data;
+  }
+}
diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSURI.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSURI.java
new file mode 100644
index 0000000..f76383d
--- /dev/null
+++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSURI.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aliyun.oss;
+
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static com.aliyun.oss.internal.OSSUtils.OSS_RESOURCE_MANAGER;
+
+public class TestOSSURI {
+  @Test
+  public void testUrlParsing() {
+    String location = "oss://bucket/path/to/file";
+    OSSURI uri = new OSSURI(location);
+
+    Assert.assertEquals("bucket", uri.bucket());
+    Assert.assertEquals("path/to/file", uri.key());
+    Assert.assertEquals(location, uri.toString());
+  }
+
+  @Test
+  public void testEncodedString() {
+    String location = "oss://bucket/path%20to%20file";
+    OSSURI uri = new OSSURI(location);
+
+    Assert.assertEquals("bucket", uri.bucket());
+    Assert.assertEquals("path%20to%20file", uri.key());
+    Assert.assertEquals(location, uri.toString());
+  }
+
+  @Test
+  public void invalidBucket() {
+    AssertHelpers.assertThrows("Invalid bucket", IllegalArgumentException.class,
+        OSS_RESOURCE_MANAGER.getFormattedString("BucketNameInvalid", "test_bucket"),
+        () -> new OSSURI("https://test_bucket/path/to/file"));
+  }
+
+  @Test
+  public void missingKey() {
+    AssertHelpers.assertThrows("Missing key", ValidationException.class,
+        "Missing key in OSS location", () -> new OSSURI("https://bucket/"));
+  }
+
+  @Test
+  public void invalidKey() {
+    AssertHelpers.assertThrows("Invalid key", IllegalArgumentException.class,
+        OSS_RESOURCE_MANAGER.getFormattedString("ObjectKeyInvalid", "\\path/to/file"),
+        () -> new OSSURI("https://bucket/\\path/to/file"));
+  }
+
+  @Test
+  public void relativePathing() {
+    AssertHelpers.assertThrows("Cannot use relative oss location.", ValidationException.class,
+        "Invalid OSS location", () -> new OSSURI("/path/to/file"));
+  }
+
+  @Test
+  public void invalidScheme() {
+    AssertHelpers.assertThrows("Only support scheme: oss/https", ValidationException.class,
+        "Invalid scheme", () -> new OSSURI("invalid://bucket/"));
+  }
+
+  @Test
+  public void testFragment() {
+    String location = "oss://bucket/path/to/file#print";
+    OSSURI uri = new OSSURI(location);
+
+    Assert.assertEquals("bucket", uri.bucket());
+    Assert.assertEquals("path/to/file", uri.key());
+    Assert.assertEquals(location, uri.toString());
+  }
+
+  @Test
+  public void testQueryAndFragment() {
+    String location = "oss://bucket/path/to/file?query=foo#bar";
+    OSSURI uri = new OSSURI(location);
+
+    Assert.assertEquals("bucket", uri.bucket());
+    Assert.assertEquals("path/to/file", uri.key());
+    Assert.assertEquals(location, uri.toString());
+  }
+
+  @Test
+  public void testValidSchemes() {
+    for (String scheme : Lists.newArrayList("https", "oss")) {
+      OSSURI uri = new OSSURI(scheme + "://bucket/path/to/file");
+      Assert.assertEquals("bucket", uri.bucket());
+      Assert.assertEquals("path/to/file", uri.key());
+    }
+  }
+}
diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockRule.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockRule.java
index 80f25c0..b0f3785 100644
--- a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockRule.java
+++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockRule.java
@@ -47,6 +47,11 @@ public class AliyunOSSMockRule implements AliyunOSSTestRule {
   }
 
   @Override
+  public String keyPrefix() {
+    return "mock-objects/";
+  }
+
+  @Override
   public void start() {
     ossMockApp = AliyunOSSMockApp.start(properties);
   }
diff --git a/build.gradle b/build.gradle
index 57dcce6..936fd5a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -282,6 +282,7 @@ project(':iceberg-aliyun') {
   dependencies {
     implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
     api project(':iceberg-api')
+    implementation project(':iceberg-core')
     implementation project(':iceberg-common')
 
     compileOnly 'com.aliyun.oss:aliyun-sdk-oss'
@@ -296,6 +297,7 @@ project(':iceberg-aliyun') {
     }
 
     testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
+    testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
     testImplementation 'org.springframework:spring-web'
     testImplementation('org.springframework.boot:spring-boot-starter-jetty') {
       exclude module: 'logback-classic'