You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by dw...@apache.org on 2021/12/14 16:48:56 UTC
[iceberg] branch master updated: Add FileIO implementation for Google Cloud Storage (#3711)
This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new e70e7d2 Add FileIO implementation for Google Cloud Storage (#3711)
e70e7d2 is described below
commit e70e7d26feb1d56f31c7cdfde515278beeda26a6
Author: Daniel Weeks <dw...@apache.org>
AuthorDate: Tue Dec 14 08:48:40 2021 -0800
Add FileIO implementation for Google Cloud Storage (#3711)
* Add FileIO implementation for Google Cloud Storage
* Checkstyle
* Checkstyle
* Review comments
* Update version management
---
build.gradle | 24 ++++
.../java/org/apache/iceberg/gcp/GCPProperties.java | 103 +++++++++++++++
.../org/apache/iceberg/gcp/gcs/BaseGCSFile.java | 76 +++++++++++
.../java/org/apache/iceberg/gcp/gcs/GCSFileIO.java | 125 ++++++++++++++++++
.../org/apache/iceberg/gcp/gcs/GCSInputFile.java | 46 +++++++
.../org/apache/iceberg/gcp/gcs/GCSInputStream.java | 143 +++++++++++++++++++++
.../org/apache/iceberg/gcp/gcs/GCSOutputFile.java | 69 ++++++++++
.../apache/iceberg/gcp/gcs/GCSOutputStream.java | 127 ++++++++++++++++++
.../org/apache/iceberg/gcp/gcs/GCSFileIOTest.java | 96 ++++++++++++++
.../apache/iceberg/gcp/gcs/GCSInputStreamTest.java | 137 ++++++++++++++++++++
.../iceberg/gcp/gcs/GCSOutputStreamTest.java | 98 ++++++++++++++
settings.gradle | 2 +
versions.props | 1 +
13 files changed, 1047 insertions(+)
diff --git a/build.gradle b/build.gradle
index 85bfe42..e290c88 100644
--- a/build.gradle
+++ b/build.gradle
@@ -354,6 +354,30 @@ project(':iceberg-aws') {
}
}
+// Build configuration for the GCP module, which hosts the Google Cloud Storage FileIO implementation.
+project(':iceberg-gcp') {
+ dependencies {
+ implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
+ api project(':iceberg-api')
+ implementation project(':iceberg-common')
+ implementation project(':iceberg-core')
+
+ // The Google artifacts below carry no explicit version; the libraries-bom platform is declared to align them.
+ // NOTE(review): the BOM version itself is presumably pinned in versions.props - confirm.
+ implementation platform('com.google.cloud:libraries-bom')
+ implementation 'com.google.cloud:google-cloud-storage'
+
+ // Test-only dependencies.
+ testImplementation 'com.google.cloud:google-cloud-nio'
+ testImplementation 'org.assertj:assertj-core'
+
+ testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
+
+ // hadoop-common is only on the test classpath; the listed transitive modules are excluded from it.
+ testImplementation("org.apache.hadoop:hadoop-common") {
+ exclude group: 'org.apache.avro', module: 'avro'
+ exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+ exclude group: 'javax.servlet', module: 'servlet-api'
+ exclude group: 'com.google.code.gson', module: 'gson'
+ }
+ }
+}
+
project(':iceberg-hive-metastore') {
dependencies {
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
new file mode 100644
index 0000000..0f092be
--- /dev/null
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Typed, serializable view over the GCS-related entries of a string property map.
+ * <p>
+ * Each accessor returns an {@link Optional} that is empty when the corresponding key was not supplied
+ * (or when the instance was created with the no-arg constructor).
+ */
+public class GCPProperties implements Serializable {
+  // Service Options
+  public static final String GCS_PROJECT_ID = "gcs.project-id";
+  public static final String GCS_CLIENT_LIB_TOKEN = "gcs.client-lib-token";
+  public static final String GCS_SERVICE_HOST = "gcs.service.host";
+
+  // GCS Configuration Properties
+  public static final String GCS_DECRYPTION_KEY = "gcs.decryption-key";
+  public static final String GCS_ENCRYPTION_KEY = "gcs.encryption-key";
+  public static final String GCS_USER_PROJECT = "gcs.user-project";
+
+  public static final String GCS_CHANNEL_READ_CHUNK_SIZE = "gcs.channel.read.chunk-size-bytes";
+  public static final String GCS_CHANNEL_WRITE_CHUNK_SIZE = "gcs.channel.write.chunk-size-bytes";
+
+  private String projectId;
+  private String clientLibToken;
+  private String serviceHost;
+
+  private String gcsDecryptionKey;
+  private String gcsEncryptionKey;
+  private String gcsUserProject;
+
+  private Integer gcsChannelReadChunkSize;
+  private Integer gcsChannelWriteChunkSize;
+
+  /**
+   * Creates an instance with no property set; every accessor returns an empty {@link Optional}.
+   */
+  public GCPProperties() {
+  }
+
+  /**
+   * Creates an instance from a property map.
+   *
+   * @param properties map that may contain any of the {@code gcs.*} keys declared on this class
+   * @throws NumberFormatException if a chunk-size key is present but its value is not a parsable integer
+   */
+  public GCPProperties(Map<String, String> properties) {
+    this.projectId = properties.get(GCS_PROJECT_ID);
+    this.clientLibToken = properties.get(GCS_CLIENT_LIB_TOKEN);
+    this.serviceHost = properties.get(GCS_SERVICE_HOST);
+
+    this.gcsDecryptionKey = properties.get(GCS_DECRYPTION_KEY);
+    this.gcsEncryptionKey = properties.get(GCS_ENCRYPTION_KEY);
+    this.gcsUserProject = properties.get(GCS_USER_PROJECT);
+
+    this.gcsChannelReadChunkSize = intOrNull(properties, GCS_CHANNEL_READ_CHUNK_SIZE);
+    this.gcsChannelWriteChunkSize = intOrNull(properties, GCS_CHANNEL_WRITE_CHUNK_SIZE);
+  }
+
+  // Parses the value stored under key as an int, or returns null when the key is absent from the map.
+  private static Integer intOrNull(Map<String, String> properties, String key) {
+    if (!properties.containsKey(key)) {
+      return null;
+    }
+
+    return Integer.parseInt(properties.get(key));
+  }
+
+  /** Returns the value of {@link #GCS_CHANNEL_READ_CHUNK_SIZE}, if configured. */
+  public Optional<Integer> channelReadChunkSize() {
+    return Optional.ofNullable(gcsChannelReadChunkSize);
+  }
+
+  /** Returns the value of {@link #GCS_CHANNEL_WRITE_CHUNK_SIZE}, if configured. */
+  public Optional<Integer> channelWriteChunkSize() {
+    return Optional.ofNullable(gcsChannelWriteChunkSize);
+  }
+
+  /** Returns the value of {@link #GCS_CLIENT_LIB_TOKEN}, if configured. */
+  public Optional<String> clientLibToken() {
+    return Optional.ofNullable(clientLibToken);
+  }
+
+  /** Returns the value of {@link #GCS_DECRYPTION_KEY}, if configured. */
+  public Optional<String> decryptionKey() {
+    return Optional.ofNullable(gcsDecryptionKey);
+  }
+
+  /** Returns the value of {@link #GCS_ENCRYPTION_KEY}, if configured. */
+  public Optional<String> encryptionKey() {
+    return Optional.ofNullable(gcsEncryptionKey);
+  }
+
+  /** Returns the value of {@link #GCS_PROJECT_ID}, if configured. */
+  public Optional<String> projectId() {
+    return Optional.ofNullable(projectId);
+  }
+
+  /** Returns the value of {@link #GCS_SERVICE_HOST}, if configured. */
+  public Optional<String> serviceHost() {
+    return Optional.ofNullable(serviceHost);
+  }
+
+  /** Returns the value of {@link #GCS_USER_PROJECT}, if configured. */
+  public Optional<String> userProject() {
+    return Optional.ofNullable(gcsUserProject);
+  }
+}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/BaseGCSFile.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/BaseGCSFile.java
new file mode 100644
index 0000000..9842c10
--- /dev/null
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/BaseGCSFile.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import java.net.URI;
+import org.apache.iceberg.gcp.GCPProperties;
+
+abstract class BaseGCSFile {
+ private final Storage storage;
+ private final GCPProperties gcpProperties;
+ private final BlobId blobId;
+ private Blob metadata;
+
+ BaseGCSFile(Storage storage, BlobId blobId, GCPProperties gcpProperties) {
+ this.storage = storage;
+ this.blobId = blobId;
+ this.gcpProperties = gcpProperties;
+ }
+
+ public String location() {
+ return blobId.toGsUtilUri();
+ }
+
+ Storage storage() {
+ return storage;
+ }
+
+ URI uri() {
+ return URI.create(blobId.toGsUtilUri());
+ }
+
+ BlobId blobId() {
+ return blobId;
+ }
+
+ protected GCPProperties gcpProperties() {
+ return gcpProperties;
+ }
+
+ public boolean exists() {
+ return getBlob() != null;
+ }
+
+ protected Blob getBlob() {
+ if (metadata == null) {
+ metadata = storage.get(blobId);
+ }
+
+ return metadata;
+ }
+
+ @Override
+ public String toString() {
+ return blobId.toString();
+ }
+}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
new file mode 100644
index 0000000..d1d947a
--- /dev/null
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FileIO Implementation backed by Google Cloud Storage (GCS)
+ * <p>
+ * Locations follow the conventions used by
+ * {@link com.google.cloud.storage.BlobId#fromGsUtilUri(String) BlobId.fromGsUtilUri}
+ * that follow the convention <pre>{@code gs://<bucket>/<blob_path>}</pre>
+ * <p>
+ * See <a href="https://cloud.google.com/storage/docs/folders#overview">Cloud Storage Overview</a>
+ */
+public class GCSFileIO implements FileIO {
+ private static final Logger LOG = LoggerFactory.getLogger(GCSFileIO.class);
+
+ private SerializableSupplier<Storage> storageSupplier;
+ private GCPProperties gcpProperties;
+ private transient Storage storage;
+ private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
+
+ /**
+ * No-arg constructor to load the FileIO dynamically.
+ * <p>
+ * All fields are initialized by calling {@link GCSFileIO#initialize(Map)} later.
+ */
+ public GCSFileIO() {
+ }
+
+ /**
+ * Constructor with custom storage supplier and GCP properties.
+ * <p>
+ * Calling {@link GCSFileIO#initialize(Map)} will overwrite information set in this constructor.
+ *
+ * @param storageSupplier storage supplier
+ * @param gcpProperties gcp properties
+ */
+ public GCSFileIO(SerializableSupplier<Storage> storageSupplier, GCPProperties gcpProperties) {
+ this.storageSupplier = storageSupplier;
+ this.gcpProperties = gcpProperties;
+ }
+
+ @Override
+ public InputFile newInputFile(String path) {
+ return GCSInputFile.fromLocation(path, client(), gcpProperties);
+ }
+
+ @Override
+ public OutputFile newOutputFile(String path) {
+ return GCSOutputFile.fromLocation(path, client(), gcpProperties);
+ }
+
+ @Override
+ public void deleteFile(String path) {
+ // There is no specific contract about whether delete should fail
+ // and other FileIO providers ignore failure. Log the failure for
+ // now as it is not a required operation for Iceberg.
+ if (!client().delete(BlobId.fromGsUtilUri(path))) {
+ LOG.warn("Failed to delete path: {}", path);
+ }
+ }
+
+ private Storage client() {
+ if (storage == null) {
+ storage = storageSupplier.get();
+ }
+ return storage;
+ }
+
+ @Override
+ public void initialize(Map<String, String> properties) {
+ this.gcpProperties = new GCPProperties(properties);
+
+ this.storageSupplier = () -> {
+ StorageOptions.Builder builder = StorageOptions.newBuilder();
+
+ gcpProperties.projectId().ifPresent(builder::setProjectId);
+ gcpProperties.clientLibToken().ifPresent(builder::setClientLibToken);
+ gcpProperties.serviceHost().ifPresent(builder::setHost);
+
+ return builder.build().getService();
+ };
+ }
+
+ @Override
+ public void close() {
+ // handles concurrent calls to close()
+ if (isResourceClosed.compareAndSet(false, true)) {
+ if (storage != null) {
+ // GCS Storage does not appear to be closable, so release the reference
+ storage = null;
+ }
+ }
+ }
+}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
new file mode 100644
index 0000000..1f836ec
--- /dev/null
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+
+public class GCSInputFile extends BaseGCSFile implements InputFile {
+ public static GCSInputFile fromLocation(String location, Storage storage, GCPProperties gcpProperties) {
+ return new GCSInputFile(storage, BlobId.fromGsUtilUri(location), gcpProperties);
+ }
+
+ GCSInputFile(Storage storage, BlobId blobId, GCPProperties gcpProperties) {
+ super(storage, blobId, gcpProperties);
+ }
+
+ @Override
+ public long getLength() {
+ return getBlob().getSize();
+ }
+
+ @Override
+ public SeekableInputStream newStream() {
+ return new GCSInputStream(storage(), blobId(), gcpProperties());
+ }
+}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java
new file mode 100644
index 0000000..3a83276
--- /dev/null
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.api.client.util.Lists;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.Storage.BlobSourceOption;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The GCSInputStream leverages native streaming channels from the GCS API
+ * for streaming reads. See <a href="https://cloud.google.com/storage/docs/streaming">Streaming Transfers</a>
+ */
+class GCSInputStream extends SeekableInputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(GCSInputStream.class);
+
+  // stack of the creating thread, kept only to report streams that were never closed
+  private final StackTraceElement[] createStack;
+  private final Storage storage;
+  private final BlobId blobId;
+  private final GCPProperties gcpProperties;
+
+  private ReadChannel channel;
+  private long pos = 0;
+  private boolean closed = false;
+  private final ByteBuffer singleByteBuffer = ByteBuffer.wrap(new byte[1]);
+  // wrapper around the caller's array from the last bulk read, reused while the same array is passed
+  private ByteBuffer byteBuffer;
+
+  GCSInputStream(Storage storage, BlobId blobId, GCPProperties gcpProperties) {
+    this.storage = storage;
+    this.blobId = blobId;
+    this.gcpProperties = gcpProperties;
+
+    createStack = Thread.currentThread().getStackTrace();
+
+    openStream();
+  }
+
+  // Opens the read channel, applying the decryption key, user project and chunk size when configured.
+  private void openStream() {
+    List<BlobSourceOption> sourceOptions = Lists.newArrayList();
+
+    gcpProperties.decryptionKey().ifPresent(
+        key -> sourceOptions.add(BlobSourceOption.decryptionKey(key)));
+    gcpProperties.userProject().ifPresent(
+        userProject -> sourceOptions.add(BlobSourceOption.userProject(userProject)));
+
+    channel = storage.reader(blobId, sourceOptions.toArray(new BlobSourceOption[0]));
+
+    gcpProperties.channelReadChunkSize().ifPresent(channel::setChunkSize);
+  }
+
+  @Override
+  public long getPos() {
+    return pos;
+  }
+
+  /**
+   * Moves the read position of the underlying channel.
+   *
+   * @param newPos absolute position to read from next; must not be negative
+   * @throws IllegalStateException if the stream is closed
+   * @throws IllegalArgumentException if {@code newPos} is negative
+   * @throws UncheckedIOException if the channel rejects the seek
+   */
+  @Override
+  public void seek(long newPos) {
+    Preconditions.checkState(!closed, "already closed");
+    Preconditions.checkArgument(newPos >= 0, "position is negative: %s", newPos);
+
+    try {
+      channel.seek(newPos);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    // record the position only after the channel accepted it, so a failed seek leaves getPos() accurate
+    pos = newPos;
+  }
+
+  /**
+   * Reads a single byte.
+   *
+   * @return the byte as a value in the range 0-255, or -1 at the end of the stream
+   * @throws IOException if the channel read fails
+   */
+  @Override
+  public int read() throws IOException {
+    Preconditions.checkState(!closed, "Cannot read: already closed");
+    singleByteBuffer.position(0);
+
+    int bytesRead = channel.read(singleByteBuffer);
+    if (bytesRead < 0) {
+      // end of stream: report it and leave the position where it is
+      return -1;
+    }
+
+    pos += 1;
+
+    // mask to an unsigned value; a raw byte >= 0x80 would be negative and 0xFF would look like EOF
+    return singleByteBuffer.array()[0] & 0xFF;
+  }
+
+  /**
+   * Reads up to {@code len} bytes into {@code b} starting at {@code off}.
+   *
+   * @return the number of bytes read as reported by the channel, or -1 at the end of the stream
+   * @throws IOException if the channel read fails
+   */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    Preconditions.checkState(!closed, "Cannot read: already closed");
+
+    byteBuffer = byteBuffer != null && byteBuffer.array() == b ? byteBuffer : ByteBuffer.wrap(b);
+    // a reused buffer still has the limit of the previous call; reset it first, because
+    // position(off) throws IllegalArgumentException when off is beyond the current limit
+    byteBuffer.clear();
+    byteBuffer.position(off);
+    byteBuffer.limit(Math.min(off + len, byteBuffer.capacity()));
+
+    int bytesRead = channel.read(byteBuffer);
+    if (bytesRead > 0) {
+      // do not add the -1 end-of-stream marker to the position
+      pos += bytesRead;
+    }
+
+    return bytesRead;
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    closed = true;
+    if (channel != null) {
+      channel.close();
+    }
+  }
+
+  @SuppressWarnings("checkstyle:NoFinalizer")
+  @Override
+  protected void finalize() throws Throwable {
+    super.finalize();
+    if (!closed) {
+      close(); // releasing resources is more important than printing the warning
+      String trace = Joiner.on("\n\t").join(
+          Arrays.copyOfRange(createStack, 1, createStack.length));
+      LOG.warn("Unclosed input stream created by:\n\t{}", trace);
+    }
+  }
+}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
new file mode 100644
index 0000000..db11e2c
--- /dev/null
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+
+public class GCSOutputFile extends BaseGCSFile implements OutputFile {
+ public static GCSOutputFile fromLocation(String location, Storage storage, GCPProperties gcpProperties) {
+ return new GCSOutputFile(storage, BlobId.fromGsUtilUri(location), gcpProperties);
+ }
+
+ GCSOutputFile(Storage storage, BlobId blobId, GCPProperties gcpProperties) {
+ super(storage, blobId, gcpProperties);
+ }
+
+ /**
+ * Create an output stream for the specified location if the target object
+ * does not exist in GCS at the time of invocation.
+ *
+ * @return output stream
+ */
+ @Override
+ public PositionOutputStream create() {
+ if (!exists()) {
+ return createOrOverwrite();
+ } else {
+ throw new AlreadyExistsException("Location already exists: %s", uri());
+ }
+ }
+
+ @Override
+ public PositionOutputStream createOrOverwrite() {
+ try {
+ return new GCSOutputStream(storage(), blobId(), gcpProperties());
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to create output stream for location: " + uri(), e);
+ }
+ }
+
+ @Override
+ public InputFile toInputFile() {
+ return new GCSInputFile(storage(), blobId(), gcpProperties());
+ }
+}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputStream.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputStream.java
new file mode 100644
index 0000000..611833d
--- /dev/null
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputStream.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.api.client.util.Lists;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.Storage.BlobWriteOption;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The GCSOutputStream leverages native streaming channels from the GCS API
+ * for streaming uploads. See <a href="https://cloud.google.com/storage/docs/streaming">Streaming Transfers</a>
+ */
+class GCSOutputStream extends PositionOutputStream {
+ private static final Logger LOG = LoggerFactory.getLogger(GCSOutputStream.class);
+
+ private final StackTraceElement[] createStack;
+ private final Storage storage;
+ private final BlobId blobId;
+ private final GCPProperties gcpProperties;
+
+ private OutputStream stream;
+
+ private long pos = 0;
+ private boolean closed = false;
+
+ GCSOutputStream(Storage storage, BlobId blobId, GCPProperties gcpProperties) throws IOException {
+ this.storage = storage;
+ this.blobId = blobId;
+ this.gcpProperties = gcpProperties;
+
+ createStack = Thread.currentThread().getStackTrace();
+
+ openStream();
+ }
+
+ @Override
+ public long getPos() {
+ return pos;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ stream.flush();
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ stream.write(b);
+ pos += 1;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ stream.write(b, off, len);
+ pos += len;
+ }
+
+ private void openStream() {
+ List<BlobWriteOption> writeOptions = Lists.newArrayList();
+
+ gcpProperties.encryptionKey().ifPresent(
+ key -> writeOptions.add(BlobWriteOption.encryptionKey(key)));
+ gcpProperties.userProject().ifPresent(
+ userProject -> writeOptions.add(BlobWriteOption.userProject(userProject)));
+
+ WriteChannel channel = storage.writer(BlobInfo.newBuilder(blobId).build(),
+ writeOptions.toArray(new BlobWriteOption[0]));
+
+ gcpProperties.channelWriteChunkSize().ifPresent(channel::setChunkSize);
+
+ stream = Channels.newOutputStream(channel);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+
+ super.close();
+ closed = true;
+ stream.close();
+ }
+
+
+ @SuppressWarnings("checkstyle:NoFinalizer")
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ if (!closed) {
+ close(); // releasing resources is more important than printing the warning
+ String trace = Joiner.on("\n\t").join(
+ Arrays.copyOfRange(createStack, 1, createStack.length));
+ LOG.warn("Unclosed output stream created by:\n\t{}", trace);
+ }
+ }
+}
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
new file mode 100644
index 0000000..99bb08d
--- /dev/null
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Random;
+import java.util.stream.StreamSupport;
+import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.junit.Before;
+import org.junit.Test;
+
+import static java.lang.String.format;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Tests for {@link GCSFileIO} that run against the storage service returned by {@link LocalStorageHelper}.
+ */
+public class GCSFileIOTest {
+  private static final String TEST_BUCKET = "TEST_BUCKET";
+  // fixed seed so the generated payload is the same on every run
+  private final Random random = new Random(1);
+
+  private final Storage storage = LocalStorageHelper.getOptions().getService();
+  private GCSFileIO io;
+
+  @Before
+  public void before() {
+    io = new GCSFileIO(() -> storage, new GCPProperties());
+  }
+
+  /**
+   * Writes random data through an output file and checks that the matching input file
+   * reports existence, returns the same bytes, and is gone after deletion.
+   */
+  @Test
+  public void newInputFile() throws IOException {
+    String location = format("gs://%s/path/to/file.txt", TEST_BUCKET);
+    byte[] expected = new byte[1024 * 1024];
+    random.nextBytes(expected);
+
+    InputFile in = io.newInputFile(location);
+    assertFalse(in.exists());
+
+    OutputFile out = io.newOutputFile(location);
+    try (OutputStream os = out.createOrOverwrite()) {
+      IOUtils.write(expected, os);
+    }
+
+    // the same InputFile instance is asked again now that the blob has been written
+    assertThat(in.exists()).isTrue();
+    byte[] actual = new byte[1024 * 1024];
+
+    try (InputStream is = in.newStream()) {
+      IOUtils.readFully(is, actual);
+    }
+
+    // the value under test is the assertThat subject so that failure output labels both sides correctly
+    assertThat(actual).isEqualTo(expected);
+
+    io.deleteFile(in);
+
+    assertThat(io.newInputFile(location).exists()).isFalse();
+  }
+
+  /**
+   * Creates a blob directly through the storage client and checks that deleting it
+   * through the FileIO empties the bucket.
+   */
+  @Test
+  public void testDelete() {
+    String path = "delete/path/data.dat";
+    storage.create(BlobInfo.newBuilder(TEST_BUCKET, path).build());
+
+    // There should be one blob in the bucket
+    assertThat(StreamSupport.stream(storage.list(TEST_BUCKET).iterateAll().spliterator(), false).count())
+        .isEqualTo(1);
+
+    io.deleteFile(format("gs://%s/%s", TEST_BUCKET, path));
+
+    // The bucket should now be empty
+    assertThat(StreamSupport.stream(storage.list(TEST_BUCKET).iterateAll().spliterator(), false).count())
+        .isZero();
+  }
+}
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSInputStreamTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSInputStreamTest.java
new file mode 100644
index 0000000..d9ca744
--- /dev/null
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSInputStreamTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class GCSInputStreamTest {
+
+ private final Random random = new Random(1);
+
+ private final GCPProperties gcpProperties = new GCPProperties();
+ private final Storage storage = LocalStorageHelper.getOptions().getService();
+
+ @Test
+ public void testRead() throws Exception {
+ BlobId uri = BlobId.fromGsUtilUri("gs://bucket/path/to/read.dat");
+ int dataSize = 1024 * 1024 * 10;
+ byte[] data = randomData(dataSize);
+
+ writeGCSData(uri, data);
+
+ try (SeekableInputStream in = new GCSInputStream(storage, uri, gcpProperties)) {
+ int readSize = 1024;
+ byte [] actual = new byte[readSize];
+
+ readAndCheck(in, in.getPos(), readSize, data, false);
+ readAndCheck(in, in.getPos(), readSize, data, true);
+
+ // Seek forward in current stream
+ int seekSize = 1024;
+ readAndCheck(in, in.getPos() + seekSize, readSize, data, false);
+ readAndCheck(in, in.getPos() + seekSize, readSize, data, true);
+
+ // Buffered read
+ readAndCheck(in, in.getPos(), readSize, data, true);
+ readAndCheck(in, in.getPos(), readSize, data, false);
+
+ // Seek with new stream
+ long seekNewStreamPosition = 2 * 1024 * 1024;
+ readAndCheck(in, in.getPos() + seekNewStreamPosition, readSize, data, true);
+ readAndCheck(in, in.getPos() + seekNewStreamPosition, readSize, data, false);
+
+ // Backseek and read
+ readAndCheck(in, 0, readSize, data, true);
+ readAndCheck(in, 0, readSize, data, false);
+ }
+ }
+
+ private void readAndCheck(SeekableInputStream in, long rangeStart, int size, byte [] original, boolean buffered)
+ throws IOException {
+ in.seek(rangeStart);
+ assertEquals(rangeStart, in.getPos());
+
+ long rangeEnd = rangeStart + size;
+ byte [] actual = new byte[size];
+
+ if (buffered) {
+ IOUtils.readFully(in, actual);
+ } else {
+ int read = 0;
+ while (read < size) {
+ actual[read++] = (byte) in.read();
+ }
+ }
+
+ assertEquals(rangeEnd, in.getPos());
+ assertArrayEquals(Arrays.copyOfRange(original, (int) rangeStart, (int) rangeEnd), actual);
+ }
+
+ @Test
+ public void testClose() throws Exception {
+ BlobId blobId = BlobId.fromGsUtilUri("gs://bucket/path/to/closed.dat");
+ SeekableInputStream closed = new GCSInputStream(storage, blobId, gcpProperties);
+ closed.close();
+ assertThrows(IllegalStateException.class, () -> closed.seek(0));
+ }
+
+ @Test
+ public void testSeek() throws Exception {
+ BlobId blobId = BlobId.fromGsUtilUri("gs://bucket/path/to/seek.dat");
+ byte[] data = randomData(1024 * 1024);
+
+ writeGCSData(blobId, data);
+
+ try (SeekableInputStream in = new GCSInputStream(storage, blobId, gcpProperties)) {
+ in.seek(data.length / 2);
+ byte[] actual = new byte[data.length / 2 ];
+
+ IOUtils.readFully(in, actual, 0, data.length / 2);
+
+ byte [] expected = Arrays.copyOfRange(data, data.length / 2, data.length);
+ assertArrayEquals(expected, actual);
+ }
+ }
+
+ private byte[] randomData(int size) {
+ byte[] data = new byte[size];
+ random.nextBytes(data);
+ return data;
+ }
+
+ private void writeGCSData(BlobId blobId, byte[] data) throws IOException {
+ storage.createFrom(BlobInfo.newBuilder(blobId).build(), new ByteArrayInputStream(data));
+ }
+}
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSOutputStreamTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSOutputStreamTest.java
new file mode 100644
index 0000000..06f86c3
--- /dev/null
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSOutputStreamTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link GCSOutputStream}: content written through the stream is read back from the
+ * {@link Storage} service returned by {@link LocalStorageHelper} and compared with the input, and
+ * closing a stream more than once is exercised.
+ */
+public class GCSOutputStreamTest {
+  private static final Logger LOG = LoggerFactory.getLogger(GCSOutputStreamTest.class);
+  private static final String BUCKET = "test-bucket";
+
+  private final GCPProperties properties = new GCPProperties();
+  private final Storage storage = LocalStorageHelper.getOptions().getService();
+  // Fixed seed keeps the generated payloads identical from run to run
+  private final Random random = new Random(1);
+
+  /**
+   * Runs the small-blob and large-blob scenarios once with array writes and once with
+   * byte-at-a-time writes.
+   */
+  @Test
+  public void testWrite() {
+    Stream.of(true, false).forEach(this::writeSmallAndLargeBlobs);
+  }
+
+  /**
+   * Closes a freshly opened stream twice; the test passes only if neither call throws.
+   */
+  @Test
+  public void testMultipleClose() throws IOException {
+    GCSOutputStream out = new GCSOutputStream(storage, randomBlobId(), properties);
+    out.close();
+    out.close();
+  }
+
+  /**
+   * Writes and verifies a 1 KiB blob followed by a 10 MiB blob using the requested write path.
+   *
+   * @param arrayWrite true to write whole arrays, false to write one byte at a time
+   */
+  private void writeSmallAndLargeBlobs(boolean arrayWrite) {
+    // Small blob
+    BlobId smallId = randomBlobId();
+    byte[] smallPayload = randomData(1024);
+    writeAndVerify(storage, smallId, smallPayload, arrayWrite);
+
+    // Large blob
+    BlobId largeId = randomBlobId();
+    byte[] largePayload = randomData(10 * 1024 * 1024);
+    writeAndVerify(storage, largeId, largePayload, arrayWrite);
+  }
+
+  /**
+   * Writes {@code data} to {@code uri} through a {@link GCSOutputStream}, asserting the reported
+   * position as it goes, then reads the blob back and asserts the stored bytes equal {@code data}.
+   *
+   * @param client storage client handed to the stream under test
+   * @param uri blob to write
+   * @param data content to write
+   * @param arrayWrite true to write the whole array at once, false to write one byte at a time
+   */
+  private void writeAndVerify(Storage client, BlobId uri, byte[] data, boolean arrayWrite) {
+    try (GCSOutputStream out = new GCSOutputStream(client, uri, properties)) {
+      if (!arrayWrite) {
+        int written = 0;
+        for (byte value : data) {
+          out.write(value);
+          written += 1;
+          // The position must advance by exactly one after every single-byte write
+          assertEquals(written, out.getPos());
+        }
+      } else {
+        out.write(data);
+        assertEquals(data.length, out.getPos());
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    assertArrayEquals(data, readGCSData(uri));
+  }
+
+  /**
+   * Fetches the full content of {@code blobId} from the test storage service.
+   *
+   * @param blobId blob to read
+   * @return the blob's content
+   */
+  private byte[] readGCSData(BlobId blobId) {
+    return storage.get(blobId).getContent();
+  }
+
+  /**
+   * Generates {@code size} pseudo-random bytes from the seeded generator.
+   *
+   * @param size number of bytes to generate
+   * @return a new array filled with pseudo-random bytes
+   */
+  private byte[] randomData(int size) {
+    byte[] payload = new byte[size];
+    random.nextBytes(payload);
+    return payload;
+  }
+
+  /**
+   * Builds a blob id under {@code data/} in the test bucket with a random UUID as the file name.
+   *
+   * @return a new blob id
+   */
+  private BlobId randomBlobId() {
+    String location = String.format("gs://%s/data/%s.dat", BUCKET, UUID.randomUUID());
+    return BlobId.fromGsUtilUri(location);
+  }
+}
diff --git a/settings.gradle b/settings.gradle
index d6d27bb..5971727 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -32,6 +32,7 @@ include 'spark'
include 'pig'
include 'hive-metastore'
include 'nessie'
+include 'gcp'
project(':api').name = 'iceberg-api'
project(':common').name = 'iceberg-common'
@@ -47,6 +48,7 @@ project(':spark').name = 'iceberg-spark'
project(':pig').name = 'iceberg-pig'
project(':hive-metastore').name = 'iceberg-hive-metastore'
project(':nessie').name = 'iceberg-nessie'
+project(':gcp').name = 'iceberg-gcp'
List<String> knownFlinkVersions = System.getProperty("knownFlinkVersions").split(",")
String flinkVersionsString = System.getProperty("flinkVersions") != null ? System.getProperty("flinkVersions") : System.getProperty("defaultFlinkVersions")
diff --git a/versions.props b/versions.props
index c18f124..f54248d 100644
--- a/versions.props
+++ b/versions.props
@@ -25,6 +25,7 @@ org.glassfish.jaxb:jaxb-runtime = 2.3.3
software.amazon.awssdk:* = 2.15.7
org.scala-lang:scala-library = 2.12.10
org.projectnessie:* = 0.17.0
+com.google.cloud:libraries-bom = 24.1.0
# test deps
org.junit.vintage:junit-vintage-engine = 5.7.2