You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by dw...@apache.org on 2021/12/14 16:48:56 UTC

[iceberg] branch master updated: Add FileIO implementation for Google Cloud Storage (#3711)

This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new e70e7d2  Add FileIO implementation for Google Cloud Storage (#3711)
e70e7d2 is described below

commit e70e7d26feb1d56f31c7cdfde515278beeda26a6
Author: Daniel Weeks <dw...@apache.org>
AuthorDate: Tue Dec 14 08:48:40 2021 -0800

    Add FileIO implementation for Google Cloud Storage (#3711)
    
    * Add FileIO implementation for Google Cloud Storage
    
    * Checkstyle
    
    * Checkstyle
    
    * Review comments
    
    * Update version management
---
 build.gradle                                       |  24 ++++
 .../java/org/apache/iceberg/gcp/GCPProperties.java | 103 +++++++++++++++
 .../org/apache/iceberg/gcp/gcs/BaseGCSFile.java    |  76 +++++++++++
 .../java/org/apache/iceberg/gcp/gcs/GCSFileIO.java | 125 ++++++++++++++++++
 .../org/apache/iceberg/gcp/gcs/GCSInputFile.java   |  46 +++++++
 .../org/apache/iceberg/gcp/gcs/GCSInputStream.java | 143 +++++++++++++++++++++
 .../org/apache/iceberg/gcp/gcs/GCSOutputFile.java  |  69 ++++++++++
 .../apache/iceberg/gcp/gcs/GCSOutputStream.java    | 127 ++++++++++++++++++
 .../org/apache/iceberg/gcp/gcs/GCSFileIOTest.java  |  96 ++++++++++++++
 .../apache/iceberg/gcp/gcs/GCSInputStreamTest.java | 137 ++++++++++++++++++++
 .../iceberg/gcp/gcs/GCSOutputStreamTest.java       |  98 ++++++++++++++
 settings.gradle                                    |   2 +
 versions.props                                     |   1 +
 13 files changed, 1047 insertions(+)

diff --git a/build.gradle b/build.gradle
index 85bfe42..e290c88 100644
--- a/build.gradle
+++ b/build.gradle
@@ -354,6 +354,30 @@ project(':iceberg-aws') {
   }
 }
 
+// Build definition for the iceberg-gcp module introduced by this commit.
+project(':iceberg-gcp') {
+  dependencies {
+    implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
+    api project(':iceberg-api')
+    implementation project(':iceberg-common')
+    implementation project(':iceberg-core')
+
+    // No versions are given for the Google artifacts here; presumably the libraries-bom
+    // platform (and the project's version management) supplies them -- verify in versions.props.
+    implementation platform('com.google.cloud:libraries-bom')
+    implementation 'com.google.cloud:google-cloud-storage'
+
+    // The tests import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper
+    // as a local stand-in for the Storage service.
+    testImplementation 'com.google.cloud:google-cloud-nio'
+    testImplementation 'org.assertj:assertj-core'
+
+    testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
+
+    // hadoop-common is test-only; the transitive artifacts listed below are excluded.
+    testImplementation("org.apache.hadoop:hadoop-common") {
+      exclude group: 'org.apache.avro', module: 'avro'
+      exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+      exclude group: 'javax.servlet', module: 'servlet-api'
+      exclude group: 'com.google.code.gson', module: 'gson'
+    }
+  }
+}
+
 project(':iceberg-hive-metastore') {
   dependencies {
     implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
new file mode 100644
index 0000000..0f092be
--- /dev/null
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Optional;
+
+public class GCPProperties implements Serializable {
+  // Service Options
+  public static final String GCS_PROJECT_ID = "gcs.project-id";
+  public static final String GCS_CLIENT_LIB_TOKEN = "gcs.client-lib-token";
+  public static final String GCS_SERVICE_HOST = "gcs.service.host";
+
+  // GCS Configuration Properties
+  public static final String GCS_DECRYPTION_KEY = "gcs.decryption-key";
+  public static final String GCS_ENCRYPTION_KEY = "gcs.encryption-key";
+  public static final String GCS_USER_PROJECT = "gcs.user-project";
+
+  public static final String GCS_CHANNEL_READ_CHUNK_SIZE = "gcs.channel.read.chunk-size-bytes";
+  public static final String GCS_CHANNEL_WRITE_CHUNK_SIZE = "gcs.channel.write.chunk-size-bytes";
+
+  private String projectId;
+  private String clientLibToken;
+  private String serviceHost;
+
+  private String gcsDecryptionKey;
+  private String gcsEncryptionKey;
+  private String gcsUserProject;
+
+  private Integer gcsChannelReadChunkSize;
+  private Integer gcsChannelWriteChunkSize;
+
+  public GCPProperties() {
+  }
+
+  public GCPProperties(Map<String, String> properties) {
+    projectId = properties.get(GCS_PROJECT_ID);
+    clientLibToken = properties.get(GCS_CLIENT_LIB_TOKEN);
+    serviceHost = properties.get(GCS_SERVICE_HOST);
+
+    gcsDecryptionKey = properties.get(GCS_DECRYPTION_KEY);
+    gcsEncryptionKey = properties.get(GCS_ENCRYPTION_KEY);
+    gcsUserProject = properties.get(GCS_USER_PROJECT);
+
+    if (properties.containsKey(GCS_CHANNEL_READ_CHUNK_SIZE)) {
+      gcsChannelReadChunkSize = Integer.parseInt(properties.get(GCS_CHANNEL_READ_CHUNK_SIZE));
+    }
+
+    if (properties.containsKey(GCS_CHANNEL_WRITE_CHUNK_SIZE)) {
+      gcsChannelWriteChunkSize = Integer.parseInt(properties.get(GCS_CHANNEL_WRITE_CHUNK_SIZE));
+    }
+  }
+
+  public Optional<Integer> channelReadChunkSize() {
+    return Optional.ofNullable(gcsChannelReadChunkSize);
+  }
+
+  public Optional<Integer> channelWriteChunkSize() {
+    return Optional.ofNullable(gcsChannelWriteChunkSize);
+  }
+
+  public Optional<String> clientLibToken() {
+    return Optional.ofNullable(clientLibToken);
+  }
+
+  public Optional<String> decryptionKey() {
+    return Optional.ofNullable(gcsDecryptionKey);
+  }
+
+  public Optional<String> encryptionKey() {
+    return Optional.ofNullable(gcsEncryptionKey);
+  }
+
+  public Optional<String> projectId() {
+    return Optional.ofNullable(projectId);
+  }
+
+  public Optional<String> serviceHost() {
+    return Optional.ofNullable(serviceHost);
+  }
+
+  public Optional<String> userProject() {
+    return Optional.ofNullable(gcsUserProject);
+  }
+}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/BaseGCSFile.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/BaseGCSFile.java
new file mode 100644
index 0000000..9842c10
--- /dev/null
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/BaseGCSFile.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import java.net.URI;
+import org.apache.iceberg.gcp.GCPProperties;
+
+abstract class BaseGCSFile {
+  private final Storage storage;
+  private final GCPProperties gcpProperties;
+  private final BlobId blobId;
+  private Blob metadata;
+
+  BaseGCSFile(Storage storage, BlobId blobId, GCPProperties gcpProperties) {
+    this.storage = storage;
+    this.blobId = blobId;
+    this.gcpProperties = gcpProperties;
+  }
+
+  public String location() {
+    return blobId.toGsUtilUri();
+  }
+
+  Storage storage() {
+    return storage;
+  }
+
+  URI uri() {
+    return URI.create(blobId.toGsUtilUri());
+  }
+
+  BlobId blobId() {
+    return blobId;
+  }
+
+  protected GCPProperties gcpProperties() {
+    return gcpProperties;
+  }
+
+  public boolean exists() {
+    return getBlob() != null;
+  }
+
+  protected Blob getBlob() {
+    if (metadata == null) {
+      metadata = storage.get(blobId);
+    }
+
+    return metadata;
+  }
+
+  @Override
+  public String toString() {
+    return blobId.toString();
+  }
+}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
new file mode 100644
index 0000000..d1d947a
--- /dev/null
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FileIO implementation backed by Google Cloud Storage (GCS).
+ * <p>
+ * Locations follow the convention used by
+ * {@link com.google.cloud.storage.BlobId#fromGsUtilUri(String) BlobId.fromGsUtilUri}:
+ * <pre>{@code gs://<bucket>/<blob_path>}</pre>
+ * <p>
+ * See <a href="https://cloud.google.com/storage/docs/folders#overview">Cloud Storage Overview</a>
+ */
+public class GCSFileIO implements FileIO {
+  private static final Logger LOG = LoggerFactory.getLogger(GCSFileIO.class);
+
+  private SerializableSupplier<Storage> storageSupplier;
+  private GCPProperties gcpProperties;
+  // volatile: the client is created lazily and may be requested from several threads
+  private transient volatile Storage storage;
+  private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
+
+  /**
+   * No-arg constructor to load the FileIO dynamically.
+   * <p>
+   * All fields are initialized by calling {@link GCSFileIO#initialize(Map)} later.
+   */
+  public GCSFileIO() {
+  }
+
+  /**
+   * Constructor with custom storage supplier and GCP properties.
+   * <p>
+   * Calling {@link GCSFileIO#initialize(Map)} will overwrite information set in this constructor.
+   *
+   * @param storageSupplier storage supplier
+   * @param gcpProperties gcp properties
+   */
+  public GCSFileIO(SerializableSupplier<Storage> storageSupplier, GCPProperties gcpProperties) {
+    this.storageSupplier = storageSupplier;
+    this.gcpProperties = gcpProperties;
+  }
+
+  @Override
+  public InputFile newInputFile(String path) {
+    return GCSInputFile.fromLocation(path, client(), gcpProperties);
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+    return GCSOutputFile.fromLocation(path, client(), gcpProperties);
+  }
+
+  @Override
+  public void deleteFile(String path) {
+    // There is no specific contract about whether delete should fail
+    // and other FileIO providers ignore failure.  Log the failure for
+    // now as it is not a required operation for Iceberg.
+    if (!client().delete(BlobId.fromGsUtilUri(path))) {
+      LOG.warn("Failed to delete path: {}", path);
+    }
+  }
+
+  /**
+   * Returns the storage client, creating it from the supplier on first use.
+   * <p>
+   * Creation is guarded so that concurrent first calls end up sharing a single client.
+   *
+   * @throws IllegalStateException if no storage supplier has been configured
+   */
+  private Storage client() {
+    Storage current = storage;
+    if (current == null) {
+      synchronized (this) {
+        current = storage;
+        if (current == null) {
+          if (storageSupplier == null) {
+            throw new IllegalStateException(
+                "Cannot create GCS client: no storage supplier, call initialize(Map) first");
+          }
+
+          current = storageSupplier.get();
+          storage = current;
+        }
+      }
+    }
+
+    return current;
+  }
+
+  /**
+   * Replaces the GCP properties and the storage supplier using the given configuration.
+   *
+   * @param properties configuration properties, interpreted by {@link GCPProperties}
+   */
+  @Override
+  public void initialize(Map<String, String> properties) {
+    // Capture a local instead of the field so the supplier does not hold on to this FileIO
+    GCPProperties props = new GCPProperties(properties);
+    this.gcpProperties = props;
+
+    this.storageSupplier = () -> {
+      StorageOptions.Builder builder = StorageOptions.newBuilder();
+
+      props.projectId().ifPresent(builder::setProjectId);
+      props.clientLibToken().ifPresent(builder::setClientLibToken);
+      props.serviceHost().ifPresent(builder::setHost);
+
+      return builder.build().getService();
+    };
+  }
+
+  @Override
+  public void close() {
+    // handles concurrent calls to close()
+    if (isResourceClosed.compareAndSet(false, true)) {
+      if (storage != null) {
+        // GCS Storage does not appear to be closable, so release the reference
+        storage = null;
+      }
+    }
+  }
+}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
new file mode 100644
index 0000000..1f836ec
--- /dev/null
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+
+public class GCSInputFile extends BaseGCSFile implements InputFile {
+  public static GCSInputFile fromLocation(String location, Storage storage, GCPProperties gcpProperties) {
+    return new GCSInputFile(storage, BlobId.fromGsUtilUri(location), gcpProperties);
+  }
+
+  GCSInputFile(Storage storage, BlobId blobId, GCPProperties gcpProperties) {
+    super(storage, blobId, gcpProperties);
+  }
+
+  @Override
+  public long getLength() {
+    return getBlob().getSize();
+  }
+
+  @Override
+  public SeekableInputStream newStream() {
+    return new GCSInputStream(storage(), blobId(), gcpProperties());
+  }
+}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java
new file mode 100644
index 0000000..3a83276
--- /dev/null
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.api.client.util.Lists;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.Storage.BlobSourceOption;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The GCSInputStream leverages native streaming channels from the GCS API
+ * for streaming downloads.
+ * See <a href="https://cloud.google.com/storage/docs/streaming">Streaming Transfers</a>
+ */
+class GCSInputStream extends SeekableInputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(GCSInputStream.class);
+
+  private final StackTraceElement[] createStack;
+  private final Storage storage;
+  private final BlobId blobId;
+  private final GCPProperties gcpProperties;
+
+  private ReadChannel channel;
+  private long pos = 0;
+  private boolean closed = false;
+  private final ByteBuffer singleByteBuffer = ByteBuffer.wrap(new byte[1]);
+  private ByteBuffer byteBuffer;
+
+  GCSInputStream(Storage storage, BlobId blobId, GCPProperties gcpProperties) {
+    this.storage = storage;
+    this.blobId = blobId;
+    this.gcpProperties = gcpProperties;
+
+    // remembered so finalize() can report where an unclosed stream was created
+    createStack = Thread.currentThread().getStackTrace();
+
+    openStream();
+  }
+
+  /**
+   * Opens the read channel, applying the decryption key, user project and
+   * read chunk size from the GCP properties when they are set.
+   */
+  private void openStream() {
+    List<BlobSourceOption> sourceOptions = Lists.newArrayList();
+
+    gcpProperties.decryptionKey().ifPresent(
+        key -> sourceOptions.add(BlobSourceOption.decryptionKey(key)));
+    gcpProperties.userProject().ifPresent(
+        userProject -> sourceOptions.add(BlobSourceOption.userProject(userProject)));
+
+    channel = storage.reader(blobId, sourceOptions.toArray(new BlobSourceOption[0]));
+
+    gcpProperties.channelReadChunkSize().ifPresent(channel::setChunkSize);
+  }
+
+  @Override
+  public long getPos() {
+    return pos;
+  }
+
+  /**
+   * Moves the channel to {@code newPos}.
+   *
+   * @param newPos absolute position, must not be negative
+   * @throws IllegalStateException if the stream is closed
+   * @throws IllegalArgumentException if {@code newPos} is negative
+   * @throws UncheckedIOException if the channel fails to seek
+   */
+  @Override
+  public void seek(long newPos) {
+    Preconditions.checkState(!closed, "already closed");
+    Preconditions.checkArgument(newPos >= 0, "position is negative: %s", newPos);
+
+    try {
+      channel.seek(newPos);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    // only record the new position once the channel has accepted it
+    pos = newPos;
+  }
+
+  /**
+   * Reads a single byte.
+   *
+   * @return the byte as a value in the range 0-255, or -1 if no byte could be read (end of stream)
+   * @throws IOException if the channel read fails
+   */
+  @Override
+  public int read() throws IOException {
+    Preconditions.checkState(!closed, "Cannot read: already closed");
+    singleByteBuffer.position(0);
+
+    int bytesRead = channel.read(singleByteBuffer);
+    if (bytesRead < 1) {
+      // nothing was read: do not advance the position or return stale buffer content
+      return -1;
+    }
+
+    pos += 1;
+
+    // mask so bytes >= 0x80 are not returned as negative values (0xFF would look like EOF)
+    return singleByteBuffer.array()[0] & 0xFF;
+  }
+
+  /**
+   * Reads up to {@code len} bytes into {@code b} starting at {@code off}.
+   *
+   * @return the number of bytes read as reported by the channel (-1 at end of stream)
+   * @throws IOException if the channel read fails
+   */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    Preconditions.checkState(!closed, "Cannot read: already closed");
+
+    // reuse the wrapper while the caller keeps passing the same array
+    byteBuffer = byteBuffer != null && byteBuffer.array() == b ? byteBuffer : ByteBuffer.wrap(b);
+    // drop the limit left over from a previous call; otherwise position(off) throws
+    // IllegalArgumentException whenever off is beyond that old limit
+    byteBuffer.clear();
+    byteBuffer.position(off);
+    byteBuffer.limit(Math.min(off + len, byteBuffer.capacity()));
+
+    int bytesRead = channel.read(byteBuffer);
+    if (bytesRead > 0) {
+      // an end-of-stream result (-1) must not move the position backwards
+      pos += bytesRead;
+    }
+
+    return bytesRead;
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    closed = true;
+    if (channel != null) {
+      channel.close();
+    }
+  }
+
+  @SuppressWarnings("checkstyle:NoFinalizer")
+  @Override
+  protected void finalize() throws Throwable {
+    super.finalize();
+    if (!closed) {
+      close(); // releasing resources is more important than printing the warning
+      String trace = Joiner.on("\n\t").join(
+          Arrays.copyOfRange(createStack, 1, createStack.length));
+      LOG.warn("Unclosed input stream created by:\n\t{}", trace);
+    }
+  }
+}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
new file mode 100644
index 0000000..db11e2c
--- /dev/null
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+
+public class GCSOutputFile extends BaseGCSFile implements OutputFile {
+  public static GCSOutputFile fromLocation(String location, Storage storage, GCPProperties gcpProperties) {
+    return new GCSOutputFile(storage, BlobId.fromGsUtilUri(location), gcpProperties);
+  }
+
+  GCSOutputFile(Storage storage, BlobId blobId, GCPProperties gcpProperties) {
+    super(storage, blobId, gcpProperties);
+  }
+
+  /**
+   * Create an output stream for the specified location if the target object
+   * does not exist in GCS at the time of invocation.
+   *
+   * @return output stream
+   */
+  @Override
+  public PositionOutputStream create() {
+    if (!exists()) {
+      return createOrOverwrite();
+    } else {
+      throw new AlreadyExistsException("Location already exists: %s", uri());
+    }
+  }
+
+  @Override
+  public PositionOutputStream createOrOverwrite() {
+    try {
+      return new GCSOutputStream(storage(), blobId(), gcpProperties());
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to create output stream for location: " + uri(), e);
+    }
+  }
+
+  @Override
+  public InputFile toInputFile() {
+    return new GCSInputFile(storage(), blobId(), gcpProperties());
+  }
+}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputStream.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputStream.java
new file mode 100644
index 0000000..611833d
--- /dev/null
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputStream.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.api.client.util.Lists;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.Storage.BlobWriteOption;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The GCSOutputStream leverages native streaming channels from the GCS API
+ * for streaming uploads. See <a href="https://cloud.google.com/storage/docs/streaming">Streaming Transfers</a>
+ */
+class GCSOutputStream extends PositionOutputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(GCSOutputStream.class);
+
+  private final StackTraceElement[] createStack;
+  private final Storage storage;
+  private final BlobId blobId;
+  private final GCPProperties gcpProperties;
+
+  private OutputStream stream;
+
+  private long pos = 0;
+  private boolean closed = false;
+
+  GCSOutputStream(Storage storage, BlobId blobId, GCPProperties gcpProperties) throws IOException {
+    this.storage = storage;
+    this.blobId = blobId;
+    this.gcpProperties = gcpProperties;
+
+    createStack = Thread.currentThread().getStackTrace();
+
+    openStream();
+  }
+
+  @Override
+  public long getPos() {
+    return pos;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    stream.flush();
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    stream.write(b);
+    pos += 1;
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    stream.write(b, off, len);
+    pos += len;
+  }
+
+  private void openStream() {
+    List<BlobWriteOption> writeOptions = Lists.newArrayList();
+
+    gcpProperties.encryptionKey().ifPresent(
+        key -> writeOptions.add(BlobWriteOption.encryptionKey(key)));
+    gcpProperties.userProject().ifPresent(
+        userProject -> writeOptions.add(BlobWriteOption.userProject(userProject)));
+
+    WriteChannel channel = storage.writer(BlobInfo.newBuilder(blobId).build(),
+        writeOptions.toArray(new BlobWriteOption[0]));
+
+    gcpProperties.channelWriteChunkSize().ifPresent(channel::setChunkSize);
+
+    stream = Channels.newOutputStream(channel);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+
+    super.close();
+    closed = true;
+    stream.close();
+  }
+
+
+  @SuppressWarnings("checkstyle:NoFinalizer")
+  @Override
+  protected void finalize() throws Throwable {
+    super.finalize();
+    if (!closed) {
+      close(); // releasing resources is more important than printing the warning
+      String trace = Joiner.on("\n\t").join(
+          Arrays.copyOfRange(createStack, 1, createStack.length));
+      LOG.warn("Unclosed output stream created by:\n\t{}", trace);
+    }
+  }
+}
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
new file mode 100644
index 0000000..99bb08d
--- /dev/null
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Random;
+import java.util.stream.StreamSupport;
+import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.junit.Before;
+import org.junit.Test;
+
+import static java.lang.String.format;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Tests for {@link GCSFileIO} running against the storage service from {@link LocalStorageHelper}.
+ */
+public class GCSFileIOTest {
+  private static final String TEST_BUCKET = "TEST_BUCKET";
+  private final Random random = new Random(1);
+
+  private final Storage storage = LocalStorageHelper.getOptions().getService();
+  private GCSFileIO io;
+
+  @Before
+  public void before() {
+    io = new GCSFileIO(() -> storage, new GCPProperties());
+  }
+
+  @Test
+  public void newInputFile() throws IOException {
+    String location = format("gs://%s/path/to/file.txt", TEST_BUCKET);
+    byte[] expected = new byte[1024 * 1024];
+    random.nextBytes(expected);
+
+    // the file must not exist before anything has been written
+    InputFile in = io.newInputFile(location);
+    assertFalse(in.exists());
+
+    OutputFile out = io.newOutputFile(location);
+    try (OutputStream sink = out.createOrOverwrite()) {
+      IOUtils.write(expected, sink);
+    }
+
+    assertThat(in.exists()).isTrue();
+
+    // read everything back and compare with what was written
+    byte[] actual = new byte[1024 * 1024];
+    try (InputStream source = in.newStream()) {
+      IOUtils.readFully(source, actual);
+    }
+
+    assertThat(expected).isEqualTo(actual);
+
+    io.deleteFile(in);
+
+    InputFile afterDelete = io.newInputFile(location);
+    assertThat(afterDelete.exists()).isFalse();
+  }
+
+  @Test
+  public void testDelete() {
+    String path = "delete/path/data.dat";
+    storage.create(BlobInfo.newBuilder(TEST_BUCKET, path).build());
+
+    // There should be one blob in the bucket
+    assertThat(blobCount()).isEqualTo(1);
+
+    io.deleteFile(format("gs://%s/%s", TEST_BUCKET, path));
+
+    // The bucket should now be empty
+    assertThat(blobCount()).isZero();
+  }
+
+  /**
+   * Counts the blobs currently listed in the test bucket.
+   */
+  private long blobCount() {
+    return StreamSupport.stream(storage.list(TEST_BUCKET).iterateAll().spliterator(), false).count();
+  }
+}
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSInputStreamTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSInputStreamTest.java
new file mode 100644
index 0000000..d9ca744
--- /dev/null
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSInputStreamTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class GCSInputStreamTest {
+  private final Random random = new Random(1);
+
+  private final GCPProperties gcpProperties = new GCPProperties();
+  private final Storage storage = LocalStorageHelper.getOptions().getService();
+
+  @Test
+  public void testRead() throws Exception {
+    BlobId uri = BlobId.fromGsUtilUri("gs://bucket/path/to/read.dat");
+    int dataSize = 1024 * 1024 * 10;
+    byte[] data = randomData(dataSize);
+
+    writeGCSData(uri, data);
+
+    try (SeekableInputStream in = new GCSInputStream(storage, uri, gcpProperties)) {
+      int readSize = 1024;
+
+      // Sequential reads from the current position, byte-at-a-time and then bulk
+      readAndCheck(in, in.getPos(), readSize, data, false);
+      readAndCheck(in, in.getPos(), readSize, data, true);
+
+      // Seek forward in current stream
+      int seekSize = 1024;
+      readAndCheck(in, in.getPos() + seekSize, readSize, data, false);
+      readAndCheck(in, in.getPos() + seekSize, readSize, data, true);
+
+      // Buffered read
+      readAndCheck(in, in.getPos(), readSize, data, true);
+      readAndCheck(in, in.getPos(), readSize, data, false);
+
+      // Seek with new stream
+      long seekNewStreamPosition = 2 * 1024 * 1024;
+      readAndCheck(in, in.getPos() + seekNewStreamPosition, readSize, data, true);
+      readAndCheck(in, in.getPos() + seekNewStreamPosition, readSize, data, false);
+
+      // Backseek and read
+      readAndCheck(in, 0, readSize, data, true);
+      readAndCheck(in, 0, readSize, data, false);
+    }
+  }
+
+  /** Seeks to {@code rangeStart}, reads {@code size} bytes, then verifies the position and the bytes read. */
+  private void readAndCheck(SeekableInputStream in, long rangeStart, int size, byte[] original, boolean buffered)
+      throws IOException {
+    in.seek(rangeStart);
+    assertEquals(rangeStart, in.getPos());
+
+    long rangeEnd = rangeStart + size;
+    byte[] actual = new byte[size];
+
+    if (buffered) {
+      IOUtils.readFully(in, actual);
+    } else {
+      for (int i = 0; i < size; i++) {
+        actual[i] = (byte) in.read();
+      }
+    }
+
+    assertEquals(rangeEnd, in.getPos());
+    assertArrayEquals(Arrays.copyOfRange(original, (int) rangeStart, (int) rangeEnd), actual);
+  }
+
+  @Test
+  public void testClose() throws Exception {
+    BlobId blobId = BlobId.fromGsUtilUri("gs://bucket/path/to/closed.dat");
+    SeekableInputStream closed = new GCSInputStream(storage, blobId, gcpProperties);
+    closed.close();
+    // seek() after close() must be rejected with IllegalStateException
+    assertThrows(IllegalStateException.class, () -> closed.seek(0));
+  }
+
+  @Test
+  public void testSeek() throws Exception {
+    BlobId blobId = BlobId.fromGsUtilUri("gs://bucket/path/to/seek.dat");
+    byte[] data = randomData(1024 * 1024);
+
+    writeGCSData(blobId, data);
+
+    try (SeekableInputStream in = new GCSInputStream(storage, blobId, gcpProperties)) {
+      int half = data.length / 2;
+      in.seek(half);
+      byte[] actual = new byte[half];
+      IOUtils.readFully(in, actual);
+
+      byte[] expected = Arrays.copyOfRange(data, half, data.length);
+      assertArrayEquals(expected, actual);
+    }
+  }
+
+  private byte[] randomData(int size) {
+    byte[] data = new byte[size];
+    random.nextBytes(data);
+    return data;
+  }
+
+  private void writeGCSData(BlobId blobId, byte[] data) throws IOException {
+    storage.createFrom(BlobInfo.newBuilder(blobId).build(), new ByteArrayInputStream(data));
+  }
+}
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSOutputStreamTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSOutputStreamTest.java
new file mode 100644
index 0000000..06f86c3
--- /dev/null
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSOutputStreamTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class GCSOutputStreamTest {
+  private static final Logger LOG = LoggerFactory.getLogger(GCSOutputStreamTest.class);
+  private static final String BUCKET = "test-bucket";
+
+  private final GCPProperties properties = new GCPProperties();
+  private final Storage storage = LocalStorageHelper.getOptions().getService();
+  private final Random random = new Random(1);
+
+  @Test
+  public void testWrite() {
+    // Run tests for both byte and array write paths
+    Stream.of(true, false).forEach(arrayWrite -> {
+      // Test small file write
+      writeAndVerify(storage, randomBlobId(), randomData(1024), arrayWrite);
+
+      // Test large file
+      writeAndVerify(storage, randomBlobId(), randomData(10 * 1024 * 1024), arrayWrite);
+    });
+  }
+
+  @Test
+  public void testMultipleClose() throws IOException {
+    // Closing an already closed stream must not throw
+    GCSOutputStream stream = new GCSOutputStream(storage, randomBlobId(), properties);
+    stream.close();
+    stream.close();
+  }
+
+  /**
+   * Writes {@code data} to {@code uri} either in a single bulk call or byte-by-byte, checking the reported
+   * position as it goes, then reads the object back through the same {@code client} and compares the content.
+   */
+  private void writeAndVerify(Storage client, BlobId uri, byte[] data, boolean arrayWrite) {
+    try (GCSOutputStream stream = new GCSOutputStream(client, uri, properties)) {
+      if (arrayWrite) {
+        stream.write(data);
+        assertEquals(data.length, stream.getPos());
+      } else {
+        for (int i = 0; i < data.length; i++) {
+          stream.write(data[i]);
+          assertEquals(i + 1, stream.getPos());
+        }
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    byte[] actual = client.get(uri).getContent();
+    assertArrayEquals(data, actual);
+  }
+
+  private byte[] randomData(int size) {
+    byte[] result = new byte[size];
+    random.nextBytes(result);
+    return result;
+  }
+
+  private BlobId randomBlobId() {
+    return BlobId.fromGsUtilUri(String.format("gs://%s/data/%s.dat", BUCKET, UUID.randomUUID()));
+  }
+}
diff --git a/settings.gradle b/settings.gradle
index d6d27bb..5971727 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -32,6 +32,7 @@ include 'spark'
 include 'pig'
 include 'hive-metastore'
 include 'nessie'
+include 'gcp'
 
 project(':api').name = 'iceberg-api'
 project(':common').name = 'iceberg-common'
@@ -47,6 +48,7 @@ project(':spark').name = 'iceberg-spark'
 project(':pig').name = 'iceberg-pig'
 project(':hive-metastore').name = 'iceberg-hive-metastore'
 project(':nessie').name = 'iceberg-nessie'
+project(':gcp').name = 'iceberg-gcp'
 
 List<String> knownFlinkVersions = System.getProperty("knownFlinkVersions").split(",")
 String flinkVersionsString = System.getProperty("flinkVersions") != null ? System.getProperty("flinkVersions") : System.getProperty("defaultFlinkVersions")
diff --git a/versions.props b/versions.props
index c18f124..f54248d 100644
--- a/versions.props
+++ b/versions.props
@@ -25,6 +25,7 @@ org.glassfish.jaxb:jaxb-runtime = 2.3.3
 software.amazon.awssdk:* = 2.15.7
 org.scala-lang:scala-library = 2.12.10
 org.projectnessie:* = 0.17.0
+com.google.cloud:libraries-bom = 24.1.0
 
 # test deps
 org.junit.vintage:junit-vintage-engine = 5.7.2