You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by op...@apache.org on 2021/10/19 11:53:35 UTC
[iceberg] branch master updated: Aliyun: Add OSSOutputStream (#3288)
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 329fa5b Aliyun: Add OSSOutputStream (#3288)
329fa5b is described below
commit 329fa5b6d8789f5a367215b349096006cb889110
Author: mikewu <xi...@gmail.com>
AuthorDate: Tue Oct 19 19:53:25 2021 +0800
Aliyun: Add OSSOutputStream (#3288)
---
.../apache/iceberg/aliyun/AliyunProperties.java | 41 ++++++
.../apache/iceberg/aliyun/oss/OSSOutputStream.java | 164 +++++++++++++++++++++
.../java/org/apache/iceberg/aliyun/oss/OSSURI.java | 109 ++++++++++++++
.../iceberg/aliyun/oss/AliyunOSSTestBase.java | 53 +++++++
.../iceberg/aliyun/oss/AliyunOSSTestRule.java | 12 ++
.../iceberg/aliyun/oss/TestOSSOutputStream.java | 126 ++++++++++++++++
.../org/apache/iceberg/aliyun/oss/TestOSSURI.java | 111 ++++++++++++++
.../iceberg/aliyun/oss/mock/AliyunOSSMockRule.java | 5 +
build.gradle | 2 +
9 files changed, 623 insertions(+)
diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java
new file mode 100644
index 0000000..f0a1d19
--- /dev/null
+++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aliyun;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class AliyunProperties implements Serializable {
+ /**
+ * Location to put staging files for uploading to OSS, defaults to the directory value of java.io.tmpdir.
+ */
+ public static final String OSS_STAGING_DIRECTORY = "oss.staging-dir";
+ private final String ossStagingDirectory;
+
+ public AliyunProperties(Map<String, String> properties) {
+ this.ossStagingDirectory = PropertyUtil.propertyAsString(properties, OSS_STAGING_DIRECTORY,
+ System.getProperty("java.io.tmpdir"));
+ }
+
+ public String ossStagingDirectory() {
+ return ossStagingDirectory;
+ }
+}
diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSOutputStream.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSOutputStream.java
new file mode 100644
index 0000000..3d4758f
--- /dev/null
+++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSOutputStream.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aliyun.oss;
+
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.model.ObjectMetadata;
+import com.aliyun.oss.model.PutObjectRequest;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import org.apache.iceberg.aliyun.AliyunProperties;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PositionOutputStream} that buffers written bytes in a local temporary staging file and
+ * uploads the staged file to OSS with a single PutObject request when the stream is closed.
+ * The staging directory comes from {@link AliyunProperties#ossStagingDirectory()}.
+ */
+public class OSSOutputStream extends PositionOutputStream {
+ private static final Logger LOG = LoggerFactory.getLogger(OSSOutputStream.class);
+ // Captured at construction so finalize() can report where an unclosed stream was created.
+ private final StackTraceElement[] createStack;
+
+ private final OSS client;
+ private final OSSURI uri;
+
+ private final File currentStagingFile;
+ private final OutputStream stream;
+ private long pos = 0;
+ private boolean closed = false;
+
+ OSSOutputStream(OSS client, OSSURI uri, AliyunProperties aliyunProperties) {
+ this.client = client;
+ this.uri = uri;
+ this.createStack = Thread.currentThread().getStackTrace();
+
+ this.currentStagingFile = newStagingFile(aliyunProperties.ossStagingDirectory());
+ this.stream = newStream(currentStagingFile);
+ }
+
+ /**
+ * Creates an empty temporary file in {@code ossStagingDirectory} to hold bytes until upload.
+ */
+ private static File newStagingFile(String ossStagingDirectory) {
+ try {
+ File stagingFile = File.createTempFile("oss-file-io-", ".tmp", new File(ossStagingDirectory));
+ // Best-effort cleanup in case the JVM exits before close() deletes the file.
+ stagingFile.deleteOnExit();
+ return stagingFile;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /**
+ * Opens a buffered output stream that writes into the staging file.
+ */
+ private static OutputStream newStream(File currentStagingFile) {
+ try {
+ return new BufferedOutputStream(new FileOutputStream(currentStagingFile));
+ } catch (FileNotFoundException e) {
+ throw new NotFoundException(e, "Failed to create file: %s", currentStagingFile);
+ }
+ }
+
+ /**
+ * Opens {@code file} for reading, rethrowing any I/O failure as {@link UncheckedIOException}.
+ */
+ private static InputStream uncheckedInputStream(File file) {
+ try {
+ return new FileInputStream(file);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /**
+ * Returns the number of bytes written to this stream so far.
+ */
+ @Override
+ public long getPos() {
+ return pos;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ Preconditions.checkState(!closed, "Already closed.");
+ stream.flush();
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ Preconditions.checkState(!closed, "Already closed.");
+ stream.write(b);
+ pos += 1;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ Preconditions.checkState(!closed, "Already closed.");
+ stream.write(b, off, len);
+ pos += len;
+ }
+
+ /**
+ * Closes the staging file, uploads its contents to OSS and then deletes it.
+ * Calling this method again after the first call has no effect.
+ */
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+
+ super.close();
+ closed = true;
+
+ try {
+ stream.close();
+ completeUploads();
+ } finally {
+ // The staging file is removed whether or not the upload succeeded.
+ cleanUpStagingFiles();
+ }
+ }
+
+ /**
+ * Uploads the staged bytes to {@code uri} with a single PutObject request.
+ * <p>
+ * An empty staging file is still uploaded (as a zero-length object) so that closing a stream
+ * without writing anything leaves an object at the target location.
+ *
+ * @throws IOException if the staging file's input stream cannot be closed
+ */
+ private void completeUploads() throws IOException {
+ long contentLength = currentStagingFile.length();
+ LOG.debug("Uploading {} staged bytes to OSS", contentLength);
+
+ ObjectMetadata metadata = new ObjectMetadata();
+ metadata.setContentLength(contentLength);
+
+ // Close the staging file's input stream after the request so its file descriptor is not leaked.
+ try (InputStream contentStream = uncheckedInputStream(currentStagingFile)) {
+ PutObjectRequest request = new PutObjectRequest(uri.bucket(), uri.key(), contentStream, metadata);
+ client.putObject(request);
+ }
+ }
+
+ /**
+ * Deletes the staging file, logging (not throwing) if the delete fails.
+ */
+ private void cleanUpStagingFiles() {
+ if (!currentStagingFile.delete()) {
+ LOG.warn("Failed to delete staging file: {}", currentStagingFile);
+ }
+ }
+
+ @SuppressWarnings("checkstyle:NoFinalizer")
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ if (!closed) {
+ close(); // releasing resources is more important than printing the warning.
+ String trace = Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length));
+ LOG.warn("Unclosed output stream created by:\n\t{}", trace);
+ }
+ }
+}
diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSURI.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSURI.java
new file mode 100644
index 0000000..7898e75
--- /dev/null
+++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSURI.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aliyun.oss;
+
+import com.aliyun.oss.internal.OSSUtils;
+import java.util.Locale;
+import java.util.Set;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+
+/**
+ * This class represents a fully qualified location in OSS for input/output
+ * operations expressed as a URI. This implementation is provided to
+ * ensure compatibility with Hadoop Path implementations that may introduce
+ * encoding issues with native URI implementation.
+ *
+ * Note: Path-style access is deprecated and not supported by this
+ * implementation.
+ */
+public class OSSURI {
+ private static final String SCHEME_DELIM = "://";
+ private static final String PATH_DELIM = "/";
+ private static final String QUERY_DELIM = "\\?";
+ private static final String FRAGMENT_DELIM = "#";
+ private static final Set<String> VALID_SCHEMES = ImmutableSet.of("https", "oss");
+ private final String location;
+ private final String bucket;
+ private final String key;
+
+ /**
+ * Creates a new OSSURI based on the bucket and key parsed from the location.
+ * The location in string form has the syntax below, following RFC 2396:
+ * [scheme:][//bucket][object key][#fragment]
+ * [scheme:][//bucket][object key][?query][#fragment]
+ *
+ * The Aliyun OSS documentation specifies precisely which characters are permitted in each
+ * component of the location:
+ * Bucket: https://help.aliyun.com/document_detail/257087.html
+ * Object: https://help.aliyun.com/document_detail/273129.html
+ * Scheme: https or oss
+ *
+ * <p>
+ * Supported access styles are https and oss://... URIs. Any query string and fragment are
+ * stripped from the object key.
+ *
+ * @param location fully qualified URI.
+ * @throws NullPointerException if {@code location} is null
+ * @throws ValidationException if the location is malformed, uses an unsupported scheme, or has no key
+ * @throws IllegalArgumentException if the OSS SDK rejects the bucket name or object key
+ */
+ public OSSURI(String location) {
+ Preconditions.checkNotNull(location, "OSS location cannot be null.");
+
+ this.location = location;
+ String[] schemeSplit = location.split(SCHEME_DELIM, -1);
+ ValidationException.check(schemeSplit.length == 2, "Invalid OSS location: %s", location);
+
+ String scheme = schemeSplit[0];
+ // Locale.ROOT keeps scheme matching independent of the JVM's default locale.
+ ValidationException.check(VALID_SCHEMES.contains(scheme.toLowerCase(Locale.ROOT)),
+ "Invalid scheme: %s in OSS location %s", scheme, location);
+
+ String[] authoritySplit = schemeSplit[1].split(PATH_DELIM, 2);
+ ValidationException.check(authoritySplit.length == 2,
+ "Invalid bucket or key in OSS location: %s", location);
+ ValidationException.check(!authoritySplit[1].trim().isEmpty(),
+ "Missing key in OSS location: %s", location);
+ this.bucket = authoritySplit[0];
+ OSSUtils.ensureBucketNameValid(bucket);
+
+ // Strip query and fragment if they exist
+ String path = authoritySplit[1];
+ path = path.split(QUERY_DELIM, -1)[0];
+ path = path.split(FRAGMENT_DELIM, -1)[0];
+ this.key = path;
+ OSSUtils.ensureObjectKeyValid(key);
+ }
+
+ /**
+ * Return OSS bucket name.
+ */
+ public String bucket() {
+ return bucket;
+ }
+
+ /**
+ * Return OSS object key name, without any query string or fragment.
+ */
+ public String key() {
+ return key;
+ }
+
+ /**
+ * Returns the original, unmodified location string.
+ */
+ @Override
+ public String toString() {
+ return location;
+ }
+}
diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestBase.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestBase.java
new file mode 100644
index 0000000..24e89bf
--- /dev/null
+++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestBase.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aliyun.oss;
+
+import com.aliyun.oss.OSS;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+
+/**
+ * Shared base for OSS tests. It calls the class rule's {@code setUpBucket} before each test and
+ * {@code tearDownBucket} after it, and exposes the rule's client supplier and key-prefixed locations.
+ */
+public abstract class AliyunOSSTestBase {
+ @ClassRule
+ public static final AliyunOSSTestRule OSS_TEST_RULE = AliyunOSSTestUtility.initialize();
+
+ private final SerializableSupplier<OSS> ossClient = OSS_TEST_RULE::createOSSClient;
+ private final String bucketName = OSS_TEST_RULE.testBucketName();
+ private final String keyPrefix = OSS_TEST_RULE.keyPrefix();
+
+ @Before
+ public void before() {
+ OSS_TEST_RULE.setUpBucket(bucketName);
+ }
+
+ @After
+ public void after() {
+ OSS_TEST_RULE.tearDownBucket(bucketName);
+ }
+
+ /**
+ * Builds an {@code oss://bucket/prefix+key} location for {@code key} in the test bucket.
+ */
+ protected String location(String key) {
+ return "oss://" + bucketName + "/" + keyPrefix + key;
+ }
+
+ /**
+ * Returns a supplier that asks the test rule for a new OSS client.
+ */
+ protected SerializableSupplier<OSS> ossClient() {
+ return ossClient;
+ }
+}
diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestRule.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestRule.java
index 1c327cf..3e43e5d 100644
--- a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestRule.java
+++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestRule.java
@@ -56,6 +56,18 @@ public interface AliyunOSSTestRule extends TestRule {
}
/**
+ * Returns the common key prefix for those newly created objects in test cases. For example, we set the test bucket
+ * to be 'oss-testing-bucket' and the key prefix to be 'iceberg-objects/', then the produced objects in test cases
+ * will be:
+ * <pre>
+ * oss://oss-testing-bucket/iceberg-objects/a.dat
+ * oss://oss-testing-bucket/iceberg-objects/b.dat
+ * ...
+ * </pre>
+ */
+ String keyPrefix();
+
+ /**
* Start the Aliyun Object storage services application that the OSS client could connect to.
*/
void start();
diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSOutputStream.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSOutputStream.java
new file mode 100644
index 0000000..4f566b6
--- /dev/null
+++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSOutputStream.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aliyun.oss;
+
+import com.aliyun.oss.OSS;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.aliyun.AliyunProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Round-trip tests for {@link OSSOutputStream}: data written through the stream must be uploaded
+ * with exactly one PutObject call, readable back unchanged, and leave no staging files behind.
+ */
+public class TestOSSOutputStream extends AliyunOSSTestBase {
+ private static final Logger LOG = LoggerFactory.getLogger(TestOSSOutputStream.class);
+
+ private final OSS ossClient = ossClient().get();
+ // Forwards every call to the real client while recording invocations for verify().
+ private final OSS ossMock = mock(OSS.class, delegatesTo(ossClient));
+
+ private final Path tmpDir = Files.createTempDirectory("oss-file-io-test-");
+ private static final Random random = ThreadLocalRandom.current();
+
+ private final AliyunProperties props = new AliyunProperties(ImmutableMap.of(
+ AliyunProperties.OSS_STAGING_DIRECTORY, tmpDir.toString()
+ ));
+
+ public TestOSSOutputStream() throws IOException {
+ }
+
+ @Test
+ public void testWrite() throws IOException {
+ OSSURI uri = randomURI();
+
+ for (int i = 0; i < 2; i++) {
+ boolean arrayWrite = i % 2 == 0;
+ // Write small file.
+ writeAndVerify(ossMock, uri, data256(), arrayWrite);
+ verify(ossMock, times(1)).putObject(any());
+ reset(ossMock);
+
+ // Write large file.
+ writeAndVerify(ossMock, uri, randomData(32 * 1024 * 1024), arrayWrite);
+ verify(ossMock, times(1)).putObject(any());
+ reset(ossMock);
+ }
+ }
+
+ /**
+ * Writes {@code data} through an {@link OSSOutputStream} backed by {@code mock}, either as one array
+ * write or byte by byte. It then checks the uploaded object's length and content and that the
+ * staging directory is empty.
+ */
+ private void writeAndVerify(OSS mock, OSSURI uri, byte[] data, boolean arrayWrite)
+ throws IOException {
+ LOG.info("Write and verify for arguments uri: {}, data length: {}, arrayWrite: {}",
+ uri, data.length, arrayWrite);
+
+ try (OSSOutputStream out = new OSSOutputStream(mock, uri, props)) {
+ if (arrayWrite) {
+ out.write(data);
+ Assert.assertEquals("OSSOutputStream position", data.length, out.getPos());
+ } else {
+ for (int i = 0; i < data.length; i++) {
+ out.write(data[i]);
+ Assert.assertEquals("OSSOutputStream position", i + 1, out.getPos());
+ }
+ }
+ }
+
+ Assert.assertTrue("OSS object should exist", ossClient.doesObjectExist(uri.bucket(), uri.key()));
+
+ // Fetch the object once and close it so the underlying HTTP connection is released.
+ try (com.aliyun.oss.model.OSSObject object = ossClient.getObject(uri.bucket(), uri.key())) {
+ Assert.assertEquals("Object length",
+ data.length, object.getObjectMetadata().getContentLength());
+
+ byte[] actual = new byte[data.length];
+ IOUtils.readFully(object.getObjectContent(), actual);
+ Assert.assertArrayEquals("Object content", data, actual);
+ }
+
+ // Verify all staging files are cleaned up (File#list avoids an unclosed directory stream).
+ String[] stagingFiles = Paths.get(props.ossStagingDirectory()).toFile().list();
+ Assert.assertNotNull("Staging directory should be listable", stagingFiles);
+ Assert.assertEquals("Staging files should clean up", 0, stagingFiles.length);
+ }
+
+ private OSSURI randomURI() {
+ return new OSSURI(location(String.format("%s.dat", UUID.randomUUID())));
+ }
+
+ /**
+ * Returns the 256 byte values 0..255 in order.
+ */
+ private byte[] data256() {
+ byte[] data = new byte[256];
+ for (int i = 0; i < 256; i++) {
+ data[i] = (byte) i;
+ }
+ return data;
+ }
+
+ private byte[] randomData(int size) {
+ byte[] data = new byte[size];
+ random.nextBytes(data);
+ return data;
+ }
+}
diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSURI.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSURI.java
new file mode 100644
index 0000000..f76383d
--- /dev/null
+++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSURI.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aliyun.oss;
+
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static com.aliyun.oss.internal.OSSUtils.OSS_RESOURCE_MANAGER;
+
+/**
+ * Tests for parsing and validating OSS locations with {@link OSSURI}.
+ */
+public class TestOSSURI {
+ @Test
+ public void testUrlParsing() {
+ String location = "oss://bucket/path/to/file";
+ OSSURI parsed = new OSSURI(location);
+
+ assertBucketAndKey("bucket", "path/to/file", parsed);
+ Assert.assertEquals(location, parsed.toString());
+ }
+
+ @Test
+ public void testEncodedString() {
+ String location = "oss://bucket/path%20to%20file";
+ OSSURI parsed = new OSSURI(location);
+
+ // Percent-encoded sequences are kept verbatim in the key, not decoded.
+ assertBucketAndKey("bucket", "path%20to%20file", parsed);
+ Assert.assertEquals(location, parsed.toString());
+ }
+
+ @Test
+ public void invalidBucket() {
+ String expectedMessage = OSS_RESOURCE_MANAGER.getFormattedString("BucketNameInvalid", "test_bucket");
+ AssertHelpers.assertThrows("Invalid bucket", IllegalArgumentException.class, expectedMessage,
+ () -> new OSSURI("https://test_bucket/path/to/file"));
+ }
+
+ @Test
+ public void missingKey() {
+ AssertHelpers.assertThrows("Missing key", ValidationException.class, "Missing key in OSS location",
+ () -> new OSSURI("https://bucket/"));
+ }
+
+ @Test
+ public void invalidKey() {
+ String expectedMessage = OSS_RESOURCE_MANAGER.getFormattedString("ObjectKeyInvalid", "\\path/to/file");
+ AssertHelpers.assertThrows("Invalid key", IllegalArgumentException.class, expectedMessage,
+ () -> new OSSURI("https://bucket/\\path/to/file"));
+ }
+
+ @Test
+ public void relativePathing() {
+ AssertHelpers.assertThrows("Cannot use relative oss location.", ValidationException.class,
+ "Invalid OSS location", () -> new OSSURI("/path/to/file"));
+ }
+
+ @Test
+ public void invalidScheme() {
+ AssertHelpers.assertThrows("Only support scheme: oss/https", ValidationException.class,
+ "Invalid scheme", () -> new OSSURI("invalid://bucket/"));
+ }
+
+ @Test
+ public void testFragment() {
+ String location = "oss://bucket/path/to/file#print";
+ OSSURI parsed = new OSSURI(location);
+
+ assertBucketAndKey("bucket", "path/to/file", parsed);
+ Assert.assertEquals(location, parsed.toString());
+ }
+
+ @Test
+ public void testQueryAndFragment() {
+ String location = "oss://bucket/path/to/file?query=foo#bar";
+ OSSURI parsed = new OSSURI(location);
+
+ assertBucketAndKey("bucket", "path/to/file", parsed);
+ Assert.assertEquals(location, parsed.toString());
+ }
+
+ @Test
+ public void testValidSchemes() {
+ Lists.newArrayList("https", "oss").forEach(scheme ->
+ assertBucketAndKey("bucket", "path/to/file", new OSSURI(scheme + "://bucket/path/to/file")));
+ }
+
+ /**
+ * Asserts that {@code uri} was parsed into {@code expectedBucket} and {@code expectedKey}.
+ */
+ private static void assertBucketAndKey(String expectedBucket, String expectedKey, OSSURI uri) {
+ Assert.assertEquals(expectedBucket, uri.bucket());
+ Assert.assertEquals(expectedKey, uri.key());
+ }
+}
diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockRule.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockRule.java
index 80f25c0..b0f3785 100644
--- a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockRule.java
+++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockRule.java
@@ -47,6 +47,11 @@ public class AliyunOSSMockRule implements AliyunOSSTestRule {
}
@Override
+ public String keyPrefix() {
+ // Common key prefix for objects created by test cases while this mock rule is in use
+ // (see AliyunOSSTestRule#keyPrefix).
+ return "mock-objects/";
+ }
+
+ @Override
public void start() {
ossMockApp = AliyunOSSMockApp.start(properties);
}
diff --git a/build.gradle b/build.gradle
index 57dcce6..936fd5a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -282,6 +282,7 @@ project(':iceberg-aliyun') {
dependencies {
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
api project(':iceberg-api')
+ implementation project(':iceberg-core')
implementation project(':iceberg-common')
compileOnly 'com.aliyun.oss:aliyun-sdk-oss'
@@ -296,6 +297,7 @@ project(':iceberg-aliyun') {
}
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
+ testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
testImplementation 'org.springframework:spring-web'
testImplementation('org.springframework.boot:spring-boot-starter-jetty') {
exclude module: 'logback-classic'